Money talks

환율 변동에 빠르게 대응하는 방법: 메신저 알림 시스템 구축

shadefly 2025. 5. 16. 16:57

해외 주식이나 외환 거래를 하다 보면 환율의 순간적인 변동이 큰 기회로 다가올 때가 많습니다. 특히 원/달러 환율이 급등하거나 급락할 때, 그 영향이 엔/원 환율에도 연쇄적으로 작용하게 되죠. 이 흐름을 빠르게 포착하고 대응할 수 있다면, 적지 않은 환차익을 기대할 수 있습니다.
하지만 문제는 ‘속도’입니다.
환율 변동 정보는 누구나 볼 수 있지만, 누가 먼저 보고 반응하느냐에 따라 수익의 차이가 발생합니다. 그래서 저는 이를 해결하기 위해 실시간 메신저 알림 프로그램을 직접 만들었습니다.


왜 메신저 알림인가?

  1. 속도: 뉴스나 포털을 수시로 새로고침할 수는 없습니다. 하지만 메신저는 푸시 알림으로 즉각 반응할 수 있습니다.
  2. 모바일 연동: 언제 어디서든 휴대폰으로 실시간 확인 가능
  3. 자동화: 정해진 기준을 넘는 환율 변화가 감지되면 자동으로 알림이 발송됩니다.

시스템 간단 소개

  1. 데이터 수집: 실시간 환율 API를 통해 원/달러, 엔/원 환율을 지속적으로 모니터링
  2. 조건 판단: 일정 퍼센트 이상 급등/급락 시 알림 조건 충족
  3. 메신저 전송: 텔레그램으로 즉시 알림 발송

예를 들어,

  • 원/달러 환율이 급등 → 곧 엔/원 환율도 영향을 받을 것으로 예측
  • 미리 세팅한 기준에 따라 매수/매도 타이밍 포착
  • 알림 받는 즉시 거래소 앱을 열고 빠르게 매도 또는 매수

기대 효과

  • 환차익 극대화
  • 불필요한 모니터링 시간 단축
  • 심리적 부담 감소 (시세 변동 감시에 대한 압박)

 

 
동작하게끔 짠 코드 (참고하세용)

import threading
import re
import time
import asyncio
import nest_asyncio
import copy
import yfinance as yf
import telegram
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, MessageHandler, filters, CommandHandler
import logging
import logging.config
import json

TARGET_FILENAME = "symbols.json"
config = json.load(open('./logger.json'))
logging.config.dictConfig(config)
logger = logging.getLogger(__name__)
DBG = logger.debug
INFO = logger.info
WARN = logger.warning
ERR = logger.error

nest_asyncio.apply()

class jf_data:
    def __init__(self):
        try:
            with open(TARGET_FILENAME, "r") as f:
                self.target = json.load(f)
        except:
            self.target =  {
                "^KS11": { "name" : "KOSPI지수", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 5, "notify" : 0},
                "KRW=X": { "name" : "USD/KRW", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 2, "notify" : 0},
                "JPY=X": { "name" : "USD/JPY", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 0.5, "notify" : 0},
                "JPYKRW=X": { "name" : "JPY/KRW", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 4, "notify" : 0},
                "MCL=F": { "name" : "WTI 원유 선물", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 4, "notify" : 0},
                "2621.T": { "name" : "iShares 20+ Year US", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 5, "notify" : 0},
                "261220.KS": { "name" : "KODEX WTI원유선물(H)", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 4, "notify" : 0},
                "472830.KS": { "name" : "RISE 미국30년국채커버드콜", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 4, "notify" : 0},
                "252670.KS": { "name" : "KODEX 인버스 2x", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 4, "notify" : 0},
                "122630.KS": { "name" : "KODEX 레버리지", "available" : 0, "p_value" : "", "value" : "", "t_value" : [0,0], "gap" : 4, "notify" : 0}
                }
        self.tickers = list(self.target.keys())
        self.send_msg = ""
        self.rcv_msg = ""
        self.bot_id = "YOUR BOT_ID"
        self.chan_id = "YOUR_CHANNEL_ID"
        self.token = "YOUR_TOKEN"



def jf_event_handler(app_data):
    if 0:
        print(app_data)
    id = app_data['id']
    price = round(float(app_data['price']),2)

    jf.target[id]['value'] = price
    jf.target[id]['available'] = 1
    if jf.target[id]['p_value'] == "":
        jf.target[id]['p_value'] = jf.target[id]['value']

# -----------------------------
# 텔레그램 핸들러 함수
# -----------------------------
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE):
    message = update.message

    if message is not None:
        INFO('We got new message--------------------')
        INFO(message)
        INFO('--------------------------------------')

        try:
            await context.bot.sendMessage(chat_id=message.chat_id, text=message.text)
        except Exception as e:
            ERR(e)

        try:
            await context.bot.sendMessage(chat_id=jf.chan_id, text=message.text)
        except Exception as e:
            ERR(e)


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a message when the command /start is issued."""
    await update.message.reply_text("start")

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a message when the command /help is issued."""
    await update.message.reply_text("Help!")


async def set_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a message when the command /help is issued."""
    args = context.args

    if len(args) >= 3:
        cmd = args[1]
        parm = args[0]
        val = args[2]

        response_text = f"You sended command {cmd} {parm} {val}"
        _text = ""        #print(jf.tickers, parm)

        if parm in jf.tickers:
            if cmd == 'gap':
                jf.target[parm]['gap'] = round(float(val), 2)
                _text = f"\n{jf.target[parm]['name']} : {parm}  {jf.target[parm]['value']}\n gap is set to {jf.target[parm]['gap']}"
            elif cmd == 'down':
                jf.target[parm]['t_value'][0] = round(float(val), 2)
                _text = f"\n{jf.target[parm]['name']} : {parm}  {jf.target[parm]['value']}\n down is set to {jf.target[parm]['t_value'][0]}"
            elif cmd == 'up':
                jf.target[parm]['t_value'][1] = round(float(val), 2)
                _text = f"\n{jf.target[parm]['name']} : {parm}  {jf.target[parm]['value']}\n up is set to {jf.target[parm]['t_value'][1]}"
            else:
                _text = "\nFailed!!!\nPlease check parameters!"
        else:
            _text = "\nFailed!!!\nPlease check parameters!"


        response_text += _text
        await update.message.reply_text(response_text)
    else:
        code = ''
        for id in jf.tickers:
            temp_code = f"{jf.target[id]['name']} : {id}"
            if code != '':
                code += '\n'
            code += temp_code

        #print(code)
        help_text = f"잘 못된 명령어를 입력하였습니다.\n ex\n /set gap/up/down KOSPI레버리지 값\n /set gap ^KS11 50\n\n 종목 코드\n {code}"
        await update.message.reply_text(help_text)


async def get_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a message when the command /help is issued."""
    args = context.args

    if len(args) >= 1:
        parm = args[0]

        response_text = f"You want to get {parm}"
        #print(jf.tickers, parm)

        if parm in jf.tickers:
            _text = f"\n{jf.target[parm]['name']}({parm})\n  {jf.target[parm]['value']}\n up: {jf.target[parm]['t_value'][1]}\n down:{jf.target[parm]['t_value'][0]}\n gap: {jf.target[parm]['gap']} "
        else:
            _text = "\nFailed!!!\nPlease check parameters!"


        response_text += _text
        await update.message.reply_text(response_text)
    else:
        code = ''
        for id in jf.tickers:
            temp_code = f"{jf.target[id]['name']} : {id}"
            if code != '':
                code += '\n'
            code += temp_code

        #print(code)
        help_text = f"잘 못된 명령어를 입력하였습니다.\n ex\n /get KOSPI레버리지\n /get ^KS11\n\n 종목 코드\n {code}"
        await update.message.reply_text(help_text)
 

# -----------------------------
# 백그라운드 이벤트에서 메시지 전송
# -----------------------------
async def send_telegram_message(bot, msg: str):
    await bot.send_message(chat_id=jf.chan_id, text=msg)


# 스레드에서 이벤트 발생 시 텔레그램 메시지 전송 예약
def on_finance_event(application, msg: str):
    async def send_msg(context: ContextTypes.DEFAULT_TYPE):
        await context.bot.send_message(chat_id=jf.chan_id, text=msg)

    # job_queue는 스레드에서 안전하게 사용 가능
    application.job_queue.run_once(lambda context: asyncio.create_task(send_msg(context)), when=0)



# -----------------------------
# 봇 실행 및 스레드에서 금융 이벤트 발생 시뮬레이션
# -----------------------------
async def main():
    application = ApplicationBuilder().token(jf.token).build()

    # 텔레그램 명령/메시지 핸들러 추가
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("set", set_command))
    application.add_handler(CommandHandler("get", get_command))

    #echo_handler = MessageHandler(filters.ALL, echo)
    echo_handler = MessageHandler(filters.TEXT & ~filters.COMMAND, echo)
    application.add_handler(echo_handler)


    def jfinance_threads():
        INFO('run jf')
        try:
            ticker = yf.Tickers(jf.tickers)
            data = ticker.live(message_handler=jf_event_handler)
        except :
            print("jfinance connection failed")

        INFO('end jf')

    threading.Thread(target=jfinance_threads, daemon=True).start()


    # 스레드에서 금융 이벤트 발생 시뮬레이션
    def finance_threads():
        INFO('finance thread started')
        b_send_notify = False
        time.sleep(5)
        while True:
            for id in jf.tickers:
                if jf.target[id]['available'] == 0 or jf.target[id]['p_value'] == '':
                    continue

                if jf.target[id]['value'] > jf.target[id]['p_value'] + jf.target[id]['gap']:
                    jf.target[id]['notify'] = 1  # 갭 상승
                elif jf.target[id]['value'] < jf.target[id]['p_value'] - jf.target[id]['gap']:
                    jf.target[id]['notify'] = 4 #갭 하락
                if jf.target[id]['t_value'][0] != 0 and jf.target[id]['t_value'][1] != 0:
                    if jf.target[id]['value'] < jf.target[id]['t_value'][0]:
                        jf.target[id]['notify'] = 2 # 하방 돌파
                    elif jf.target[id]['value'] > jf.target[id]['t_value'][1]:
                        jf.target[id]['notify'] = 3 # 상방 돌파

            msg_text = ""
            for id in jf.tickers:
                sta = ""
                if jf.target[id]['notify'] == 0:
                    continue
                elif jf.target[id]['notify']  == 1:
                    sta = f"갭 상승 {jf.target[id]['gap']}"
                elif jf.target[id]['notify'] == 2:
                    sta = f"하방 돌파 {jf.target[id]['t_value'][0]}"
                elif jf.target[id]['notify'] == 3:
                    sta = f"상방 돌파 {jf.target[id]['t_value'][1]}"
                elif jf.target[id]['notify'] == 4:
                    sta = f"갭 하락 {jf.target[id]['gap']}"

                tmp_text = f"{jf.target[id]['name'] } 종목이 {sta} 하였습니다.\n {id} :  {jf.target[id]['value']}"
                if msg_text != '':
                    msg_text += '\n\n'
                msg_text += tmp_text
                if jf.target[id]['notify']  == 2:
                    jf.target[id]['t_value'][0] = jf.target[id]['t_value'][0] - jf.target[id]['gap']
                    
                    jf.target[id]['t_value'][0] = jf.target[id]['t_value'][0] - jf.target[id]['gap']
                elif jf.target[id]['notify'] == 3:
                    jf.target[id]['t_value'][1] = jf.target[id]['t_value'][1] + jf.target[id]['gap']

                jf.target[id]['notify'] = 0
                jf.target[id]['p_value'] = jf.target[id]['value']

            if msg_text != "":
                try :
                    INFO(msg_text)
                    on_finance_event(application, msg_text)

                except Exception as e:
                    ERR(f'send msg {id} asyncio run error please fix it?')
                    ERR(e)

            time.sleep(0.1)
        INFO('end finance thread')

    threading.Thread(target=finance_threads, daemon=True).start()


    # 봇 시작
    await application.run_polling()

if __name__ == '__main__':
    INFO("Start finance data")
    jf = jf_data()

    asyncio.get_event_loop().run_until_complete(main())

    with open("test.json", "w") as f:
        json.dump(jf.target, f, indent=4)

 

{
    "^KS11": {
        "name" : "KOSPI지수",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [2620,2630],
        "gap" : 10,
        "notify" : 0
    },
    "KRW=X": {
        "name" : "USD/KRW",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [1350,1400],
        "gap" : 5,
        "notify" : 0
    },
    "JPY=X": {
        "name" : "USD/JPY",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [145,160],
        "gap" : 0.5,
        "notify" : 0
    },
    "JPYKRW=X": {
        "name" : "JPY/KRW",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [0,0],
        "gap" : 4,
        "notify" : 0
    },
    "MCL=F": {
        "name" : "WTI 원유 선물",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [0,0],
        "gap" : 4,
        "notify" : 0
    },
    "2621.T": {
        "name" : "iShares 20+ Year US",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [0,0],
        "gap" : 5,
        "notify" : 0
    },
    "261220.KS": {
        "name" : "KODEX WTI원유선물(H)",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [0,0],
        "gap" : 50,
        "notify" : 0
    },
    "472830.KS": {
        "name" : "RISE 미국30년국채커버드콜",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [0,0],
        "gap" : 100,
        "notify" : 0
    },
    "252670.KS": {
        "name" : "KODEX 인버스 2x",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [2000,2040],
        "gap" : 50,
        "notify" : 0
    },
        "122630.KS": {
        "name" : "KODEX 레버리지",
        "available" : 0,
        "p_value" : "",
        "value" : "",
        "t_value" : [16300,16700],
        "gap" : 200,
        "notify" : 0
    }
}

 

#시작
$nohup python kuma_bot.py &
 
#종료 python PID를 찾은 다음
$ps -ef | grep python
$kill -9 PID

마무리

환율 시장은 빠르게 움직입니다.
빠르게 움직이는 시장에서는 빠른 정보빠른 실행이 전부입니다.
이 시스템을 통해 단 몇 초 차이로 수익과 손실이 갈리는 환율 시장에서 한 발 앞선 대응을 하고자 합니다.