from tvDatafeed import TvDatafeed, Interval as TVInterval from argparse import ArgumentParser import numpy as np from fastapi import FastAPI from fastapi.responses import HTMLResponse import uvicorn import telebot import multiprocessing import time import os bot = telebot.TeleBot(os.getenv('TELEGRAM_TOKEN')) all_processes = {} def get_data(symbol, exchange, interval): data = tv.get_hist(symbol=symbol, exchange=exchange, interval=interval_dict[interval], n_bars=50) period = 14 rsi, k, d = StochRSI(data['close'], period=period) data = data[period:] data['k'] = k.to_numpy() data['d'] = d.to_numpy() # long buy indicator data['k_higher'] = data['k'] > data['d'] data['below_20'] = data['k'] < 0.2 data['long_crossover'] = data['k_higher'] != data['k_higher'].shift() # short buy indicator data['k_lower'] = data['k'] < data['d'] data['above_80'] = data['k'] > 0.8 data['short_crossover'] = data['k_lower'] != data['k_lower'].shift() data = data[-10:] return data def get_buy_long_indicator(data): time = data.index.strftime('%Y-%m-%d %H:%M') buy_long = (data['k_higher'] == True) & (data['k_higher'] == data['below_20']) & ( data['below_20'] == data['long_crossover']) table = np.vstack([time.to_numpy(), buy_long.to_numpy()]).T return table def get_buy_short_indicator(data): time = data.index.strftime('%Y-%m-%d %H:%M') buy_long = (data['k_lower'] == True) & (data['k_lower'] == data['above_80']) & ( data['above_80'] == data['short_crossover']) table = np.vstack([time.to_numpy(), buy_long.to_numpy()]).T return table def check_risk(symbol, exchange, type='long'): reverse_indicator = { 'long': max, 'short': min } three_hour = get_data(symbol, exchange, '3H') time.sleep(2) four_hour = get_data(symbol, exchange, '4H') time.sleep(2) one_day = get_data(symbol, exchange, '1D') time.sleep(2) three_hour = three_hour[-8:] four_hour = four_hour[-6:] one_day = one_day[-2:] k_three_hour = reverse_indicator[type](three_hour['k']) k_four_hour = reverse_indicator[type](four_hour['k']) k_one_day = reverse_indicator[type](one_day['k']) def risk_evaluate(greatest_point, type): return 3 if (type == 'long' and greatest_point >= 0.8) or ( type == 'short' and greatest_point <= 0.2) else 2 if 0.2 < greatest_point < 0.8 else 0 # if long evaluate risk on long three_hour_risk = risk_evaluate(k_three_hour, type) four_hour_risk = risk_evaluate(k_four_hour, type) one_day_risk = risk_evaluate(k_one_day, type) risk = three_hour_risk + four_hour_risk + one_day_risk if risk >= 6: return 'HIGH risk' elif risk >= 4: return 'MEDIUM risk' else: return 'LOW risk' def push_notification(symbol, exchange, interval): while True: data = get_data(symbol, exchange, interval) time.sleep(1) # send long buy message table = get_buy_long_indicator(data) long_risk = check_risk(symbol, exchange, 'long') if table[-1, 1]: bot.send_message(os.getenv('TELEGRAM_CHAT_ID'), f'LONG now ({long_risk}): {symbol} {exchange} {interval}', timeout=20) # send short buy message table = get_buy_short_indicator(data) short_risk = check_risk(symbol, exchange, 'short') if table[-1, 1]: bot.send_message(os.getenv('TELEGRAM_CHAT_ID'), f'SHORT now ({short_risk}): {symbol} {exchange} {interval}', timeout=20) if 'H' in interval: time.sleep(3600) else: time.sleep(int(interval) * 4) def data_arg_parser(): parser = ArgumentParser() parser.add_argument('--symbol', type=str, default='LTCUSDT.P') parser.add_argument('--exchange', type=str, default='MEXC') parser.add_argument('--window_length', type=int, default=50) parser.add_argument('--target_length', type=str, default=5) parser.add_argument('--interval', type=str, default='15', choices=['1', '3', '5', '15', '30', '45', '1H', '2H', '3H', '4H', '1D', '1W', '1M']) parser.add_argument('--n_bars', type=str, default=5000) parser.add_argument('--training', type=bool, default=True) return parser # calculating Stoch RSI (gives the same values as TradingView) # https://www.tradingview.com/wiki/Stochastic_RSI_(STOCH_RSI) def StochRSI(series, period=14, smoothK=3, smoothD=3): # Calculate RSI delta = series.diff().dropna() ups = delta * 0 downs = ups.copy() ups[delta > 0] = delta[delta > 0] downs[delta < 0] = -delta[delta < 0] ups[ups.index[period-1]] = np.mean( ups[:period] ) #first value is sum of avg gains ups = ups.drop(ups.index[:(period-1)]) downs[downs.index[period-1]] = np.mean( downs[:period] ) #first value is sum of avg losses downs = downs.drop(downs.index[:(period-1)]) rs = ups.ewm(com=period-1,min_periods=0,adjust=False,ignore_na=False).mean() / \ downs.ewm(com=period-1,min_periods=0,adjust=False,ignore_na=False).mean() rsi = 100 - 100 / (1 + rs) # Calculate StochRSI stochrsi = (rsi - rsi.rolling(period).min()) / (rsi.rolling(period).max() - rsi.rolling(period).min()) stochrsi_K = stochrsi.rolling(smoothK).mean() stochrsi_D = stochrsi_K.rolling(smoothD).mean() return stochrsi, stochrsi_K, stochrsi_D app = FastAPI() interval_dict = {i.value: i for i in TVInterval} tv = TvDatafeed() def create_html_table(data, headers): # Start building the HTML table html_table = "\n" # Add the table headers html_table += "" for header in headers: html_table += "".format(header) html_table += "\n" # Add the table rows for row in data: html_table += "" for value in row: if value is True: html_table += "".format(value) else: html_table += "".format(value) html_table += "\n" # Close the table html_table += "
{}
{}{}
" return html_table @app.get('/startpolling/{exchange}/{symbol}/{interval}') def startpolling(exchange, symbol, interval): global all_processes if all_processes.get(f'{symbol}_{exchange}_{interval}', None) is not None: return False bot.send_message(os.getenv('TELEGRAM_CHAT_ID'), f'STARTING BOT FOR: {symbol} {exchange} {interval}', timeout=20) process = multiprocessing.Process(target=push_notification, args=(symbol, exchange, interval)) process.start() all_processes[f'{symbol}_{exchange}_{interval}'] = {'process_stop': False, 'process': process} return True @app.get('/endpolling/{exchange}/{symbol}/{interval}') def endpolling(exchange, symbol, interval): global all_processes if all_processes.get(f'{symbol}_{exchange}_{interval}', None) is None: return False selected_process = all_processes[f'{symbol}_{exchange}_{interval}'] selected_process['process_stop'] = True bot.send_message(os.getenv('TELEGRAM_CHAT_ID'), f'STOPPING BOT FOR: {symbol} {exchange} {interval}', timeout=20) selected_process['process'].terminate() all_processes.pop(f'{symbol}_{exchange}_{interval}') bot.send_message(os.getenv('TELEGRAM_CHAT_ID'), f'BOT STOPPED FOR: {symbol} {exchange} {interval}', timeout=20) return True @app.get('/endall') def end_all_processes(): global all_processes for process_name, process_obj in all_processes.items(): process_obj['process_stop'] = True process_obj['process'].terminate() all_processes = {} bot.send_message(os.getenv('TELEGRAM_CHAT_ID'), f'ALL NOTIFICATIONS ENDED', timeout=20) return True @app.get('/{exchange}/{symbol}/{interval}') def get_exchange_data(exchange, symbol, interval): data = get_data(symbol, exchange, interval) table = get_buy_long_indicator(data) html_content = create_html_table(table, ['time', 'long buy']) return HTMLResponse(content=html_content, status_code=200) if __name__ == '__main__': startpolling('MEXC', 'BTCUSDT.P', '30') time.sleep(1) startpolling('MEXC', 'SOLUSDT.P', '30') time.sleep(1) startpolling('MEXC', 'ADAUSDT.P', '30') time.sleep(1) startpolling('MEXC', 'XRPUSDT.P', '30') time.sleep(1) startpolling('MEXC', 'DOGEUSDT.P', '30') time.sleep(1) startpolling('TSX', 'CNQ', '4H') time.sleep(1) startpolling('TSX', 'TEC', '4H') time.sleep(1) startpolling('TSX', 'SHOP', '4H') time.sleep(1) startpolling('TSX', 'CP', '4H') time.sleep(1) startpolling('TSX', 'ATD', '4H') time.sleep(1) time.sleep(1) startpolling('TSX', 'XEQT', '4H') time.sleep(1) startpolling('TSX', 'VFV', '4H') time.sleep(1) uvicorn.run(app, host='0.0.0.0', port=7860) bot.infinity_polling(timeout=10, long_polling_timeout=5)