import pandas as pd
import numpy as np

# plotly is only needed by SMC.plot(); keep the analysis/backtest helpers
# importable in environments where it is not installed.
try:
    import plotly.graph_objects as go
    from plotly.subplots import make_subplots
except ImportError:
    go = None
    make_subplots = None


class SMC:
    """
    Smart Money Concept indicators computed from an OHLC DataFrame.

    On construction the order blocks, the swing highs/lows and the
    BOS/CHoCH structure map are computed once and stored on the instance as
    ``order_blocks``, ``swing_hl`` and ``structure_map``.
    """

    def __init__(self, data, swing_hl_window_sz=10):
        """
        Smart Money Concept

        Note: a ``'Date'`` column (copy of the index) is written into ``data``
        itself; the frame is not copied.

        :param data: Should contain Open, High, Low, Close columns and 'Date' as index.
        :type data: pd.DataFrame
        :param swing_hl_window_sz: {int} CHoCH Detection Period.
        """
        self.data = data
        self.data['Date'] = self.data.index.to_series()
        self.swing_hl_window_sz = swing_hl_window_sz
        # order_block() only relies on self.data and self.swing_hl_window_sz.
        self.order_blocks = self.order_block()
        self.swing_hl = self.swing_highs_lows_v2(self.swing_hl_window_sz)
        self.structure_map = self.bos_choch(self.swing_hl)

    def _signal_array(self, bar_positions, value):
        """
        Build a zero array as long as the data and write ``value`` at the given bars.

        :param bar_positions: Positional bar indices (may be stored as floats).
        :type bar_positions: pd.Series
        :param value: Marker to write (1 for buy, -1 for sell).
        :return: Signal array of length ``len(self.data)``.
        :rtype: np.ndarray
        """
        signals = np.zeros(len(self.data))
        # Explicit integer conversion: NumPy rejects float index arrays, which
        # is what an empty selection would otherwise produce.
        signals[bar_positions.to_numpy(dtype=np.int64)] = value
        return signals

    def backtest_buy_signal_ob(self):
        """
        :return: Get buy signals from order blocks mitigation index.
        :rtype: np.ndarray
        """
        # Get only bullish order blocks which are mitigated.
        blocks = self.order_blocks
        bull_ob = blocks[(blocks['OB'] == 1) & (blocks['MitigatedIndex'] != 0)]
        # Mark the mitigated indices with 1.
        return self._signal_array(bull_ob['MitigatedIndex'], 1)

    def backtest_sell_signal_ob(self):
        """
        :return: Get sell signals from order blocks mitigation index.
        :rtype: np.ndarray
        """
        # Get only bearish order blocks which are mitigated.
        blocks = self.order_blocks
        bear_ob = blocks[(blocks['OB'] == -1) & (blocks['MitigatedIndex'] != 0)]
        # Mark the mitigated indices with -1.
        return self._signal_array(bear_ob['MitigatedIndex'], -1)

    def backtest_buy_signal_structure(self):
        """
        :return: Get buy signals from bullish structure broken index.
        :rtype: np.ndarray
        """
        # Get only bullish structure.
        structure = self.structure_map
        bull_struct = structure[(structure['BOS'] == 1) | (structure['CHOCH'] == 1)]
        # Mark the broken indices with 1.
        return self._signal_array(bull_struct['BrokenIndex'], 1)

    def backtest_sell_signal_structure(self):
        """
        :return: Get sell signals from bearish structure broken index.
        :rtype: np.ndarray
        """
        # Get only bearish structure.
        structure = self.structure_map
        bear_struct = structure[(structure['BOS'] == -1) | (structure['CHOCH'] == -1)]
        # Mark the broken indices with -1 (sell), mirroring backtest_sell_signal_ob.
        return self._signal_array(bear_struct['BrokenIndex'], -1)

    def swing_highs_lows(self, window_size):
        """
        Basic version of swing highs and lows. Suitable for finding swing order blocks.

        A bar is flagged when its High (Low) equals the rolling max (min) of the
        centered window around it; bars whose window is incomplete are not flagged.

        :param window_size: Window size for searching swing highs and lows
        :type window_size: int
        :return: DataFrame with Date, highs(bool), lows(bool) columns
        :rtype: pd.DataFrame
        """
        lows = self.data['Low'].reset_index(drop=True)
        highs = self.data['High'].reset_index(drop=True)
        swing_highs = highs.rolling(window_size, center=True).max() == highs
        swing_lows = lows.rolling(window_size, center=True).min() == lows
        return pd.DataFrame({
            'Date': self.data.index.to_series(),
            'highs': swing_highs.values,
            'lows': swing_lows.values,
        })

    def swing_highs_lows_v2(self, window_size):
        """
        Updated version of swing_highs_lows function. Suitable for BOS and CHoCH.

        :param window_size: Window size for searching swing highs and lows.
        :type window_size: int
        :return: DataFrame with HighLow(1 for swing high, -1 for swing low), Level columns.
        :rtype: pd.DataFrame
        """
        # Reversing the datapoints so that .rolling() looks from each bar
        # towards the following bars (right to left).
        lows = self.data['Low'].iloc[::-1].reset_index(drop=True)
        highs = self.data['High'].iloc[::-1].reset_index(drop=True)
        is_high = (highs.rolling(window_size, min_periods=1).max() == highs).to_numpy()[::-1]
        is_low = (lows.rolling(window_size, min_periods=1).min() == lows).to_numpy()[::-1]

        # Mark swing highs as 1 and swing lows as -1 (a high wins when both hold).
        swings = np.where(is_high, 1, np.where(is_low, -1, 0))

        # Filtering only one swing high between two swing lows and vice-versa.
        # Position 0 is left untouched here; it is fixed up below.
        state = 1
        for i in range(1, swings.shape[0]):
            if swings[i] == 0 or swings[i] == state:
                swings[i] = 0
            else:
                state = -state

        # Replace 0 with NaN.
        swing_highs_lows = np.where(swings == 0, np.nan, swings)

        # Get positions of swing_highs_lows where elements are not np.nan
        pos = np.flatnonzero(swings)

        # Set first position and last position of swing_highs_lows.
        # NOTE(review): these are sequential `if`s on purpose-preserved logic;
        # when pos[0] == 0 (or pos[-1] is the last bar) the second check reads
        # the value the first one may just have written. Confirm this is the
        # intended endpoint behaviour before changing it.
        if len(pos) > 0:
            if swing_highs_lows[pos[0]] == 1:
                swing_highs_lows[0] = -1
            if swing_highs_lows[pos[0]] == -1:
                swing_highs_lows[0] = 1
            if swing_highs_lows[pos[-1]] == -1:
                swing_highs_lows[-1] = 1
            if swing_highs_lows[pos[-1]] == 1:
                swing_highs_lows[-1] = -1

        # A swing high takes the bar's High as level, a swing low the bar's Low.
        level = np.where(
            np.isnan(swing_highs_lows),
            np.nan,
            np.where(
                swing_highs_lows == 1,
                self.data['High'].to_numpy(),
                self.data['Low'].to_numpy(),
            ),
        )

        return pd.DataFrame({"HighLow": swing_highs_lows, "Level": level})

    def bos_choch(self, swing_highs_lows):
        """
        Break of Structure and Change of Character

        Every run of four consecutive swing points is checked against the four
        patterns below. A match is recorded at the bar of the second point of
        the run (the level that gets broken) and is kept only if a later Close
        actually crosses that level.

        :param swing_highs_lows: A DataFrame which contains swing highs and lows.
            Format should be same as swing_highs_lows_v2() function.
        :type swing_highs_lows: pd.DataFrame
        :return: A DataFrame with BOS(1 for bull, -1 for bear),
            CHOCH(1 for bull, -1 for bear), Level, BrokenIndex as columns.
        :rtype: pd.DataFrame
        """
        n = len(self.data)
        bos = np.zeros(n, dtype=np.int32)
        choch = np.zeros(n, dtype=np.int32)
        # float64 so the stored level is exactly the price it was taken from.
        level = np.zeros(n, dtype=np.float64)

        hl_values = swing_highs_lows["HighLow"].to_numpy(dtype=float)
        level_values = swing_highs_lows["Level"].to_numpy(dtype=float)
        swing_pos = np.flatnonzero(~np.isnan(hl_values))

        for last in range(3, len(swing_pos)):
            window = swing_pos[last - 3:last + 1]
            pattern = hl_values[window].tolist()
            # p4, p3, p2, p1 are the levels labelled -4, -3, -2, -1 below.
            p4, p3, p2, p1 = level_values[window]
            marker = window[1]

            if pattern == [-1, 1, -1, 1]:
                # bullish bos
                #                              -1
                #       -3       __BOS__      /  \
                #      /  \                  /    \
                #     /    \                /
                #  \ /      \              /
                #   \/        \           /
                #   -4          \        /
                #                  -2
                if p4 < p2 < p3 < p1:
                    bos[marker] = 1
                # bullish CHoCH
                #                              -1
                #       -3     __CHoCH__      /  \
                #      /  \                  /    \
                #     /    \                /
                #  \ /      \              /
                #   \/        \           /
                #   -4          \        /
                #                \      /
                #                  -2        (-2 undercuts -4)
                elif p2 < p4 < p3 < p1:
                    choch[marker] = 1
            elif pattern == [1, -1, 1, -1]:
                # bearish bos
                #   -4
                #   / \          -2
                #  /   \        /  \
                #       \      /    \
                #        \    /      \
                #         \  /__BOS__ \     /
                #          -3          \   /
                #                       -1
                if p4 > p2 > p3 > p1:
                    bos[marker] = -1
                # bearish CHoCH
                #                -2          (-2 exceeds -4)
                #   -4          /  \
                #   / \        /    \
                #  /   \      /      \
                #       \    /        \
                #        \  /          \
                #         -3__CHoCH__   \   /
                #                        \ /
                #                        -1
                elif p2 > p4 > p3 > p1:
                    choch[marker] = -1

            if bos[marker] != 0 or choch[marker] != 0:
                level[marker] = p3

        close = self.data['Close'].to_numpy()
        broken = np.zeros(n, dtype=np.int32)
        candidates = np.flatnonzero((bos != 0) | (choch != 0))

        for i in candidates:
            # Bullish structure breaks when a close goes above the level,
            # bearish structure when a close goes below it. The search starts
            # two bars after the marker.
            if bos[i] == 1 or choch[i] == 1:
                crossed = close[i + 2:] > level[i]
            else:
                crossed = close[i + 2:] < level[i]
            if not crossed.any():
                continue

            j = int(np.argmax(crossed)) + i + 2
            broken[i] = j

            # if there are any earlier bos or CHoCH that started before this
            # one and ended at/after this one then remove them
            earlier = candidates[candidates < i]
            stale = earlier[broken[earlier] >= j]
            bos[stale] = 0
            choch[stale] = 0
            level[stale] = 0
            # Also clear the broken index so discarded rows are fully empty.
            broken[stale] = 0

        # remove the ones that aren't broken (j >= 2, so 0 means "never broken")
        unbroken = broken == 0
        bos[unbroken] = 0
        choch[unbroken] = 0
        level[unbroken] = 0

        # replace all the 0s with np.nan
        return pd.DataFrame({
            "BOS": np.where(bos != 0, bos, np.nan),
            "CHOCH": np.where(choch != 0, choch, np.nan),
            "Level": np.where(level != 0, level, np.nan),
            "BrokenIndex": np.where(broken != 0, broken, np.nan),
        })

    def _mitigation_index(self, direction, top, bottom, lookahead):
        """
        Find, for every marked zone, the first later bar that trades back into it.

        :param direction: 1 for bullish zone, -1 for bearish zone, NaN for no zone.
        :type direction: np.ndarray
        :param top: Zone tops (compared against later Lows for bullish zones).
        :type top: np.ndarray
        :param bottom: Zone bottoms (compared against later Highs for bearish zones).
        :type bottom: np.ndarray
        :param lookahead: Number of bars after the zone bar at which the search starts.
        :type lookahead: int
        :return: Positional index of the mitigating bar, 0 if the zone was never
            mitigated, NaN where there is no zone.
        :rtype: np.ndarray
        """
        low = self.data['Low'].to_numpy()
        high = self.data['High'].to_numpy()
        mitigated = np.zeros(len(self.data), dtype=np.int32)
        no_zone = np.isnan(direction)
        for i in np.flatnonzero(~no_zone):
            start = i + lookahead
            if direction[i] == 1:
                touched = low[start:] <= top[i]
            else:
                touched = high[start:] >= bottom[i]
            if touched.any():
                mitigated[i] = start + int(np.argmax(touched))
        return np.where(no_zone, np.nan, mitigated)

    def fvg(self):
        """
        FVG - Fair Value Gap
        A fair value gap is when the previous high is lower than the next low if the current candle is bullish.
        Or when the previous low is higher than the next high if the current candle is bearish.

        :return:
            FVG = 1 if bullish fair value gap, -1 if bearish fair value gap
            Top = the top of the fair value gap
            Bottom = the bottom of the fair value gap
            MitigatedIndex = the index of the candle that mitigated the fair value gap
            (0 when the gap was never mitigated)
        :rtype: pd.DataFrame
        """
        open_ = self.data['Open'].to_numpy()
        close = self.data['Close'].to_numpy()
        prev_high = self.data['High'].shift(1).to_numpy()
        next_high = self.data['High'].shift(-1).to_numpy()
        prev_low = self.data['Low'].shift(1).to_numpy()
        next_low = self.data['Low'].shift(-1).to_numpy()

        up_candle = close > open_
        down_candle = close < open_
        # Comparisons against the NaN neighbours of the first/last bar are False.
        has_gap = ((prev_high < next_low) & up_candle) | ((prev_low > next_high) & down_candle)

        fvg = np.where(has_gap, np.where(up_candle, 1, -1), np.nan)
        top = np.where(has_gap, np.where(up_candle, next_low, prev_low), np.nan)
        bottom = np.where(has_gap, np.where(up_candle, prev_high, next_high), np.nan)
        mitigated_index = self._mitigation_index(fvg, top, bottom, lookahead=2)

        return pd.DataFrame({
            "FVG": fvg,
            "Top": top,
            "Bottom": bottom,
            "MitigatedIndex": mitigated_index,
        })

    def order_block(self, imb_perc=.1, join_consecutive=True):
        """
        OB - Order Block
        Order block is the presence of a chunk of market orders that results in a sudden rise or fall in the market

        :param imb_perc: Minimum imbalance, in percent, between this bar and the bar two ahead.
        :type imb_perc: float
        :param join_consecutive: Currently unused; kept for interface compatibility.
        :type join_consecutive: bool
        :return:
            OB = 1 if bullish order block, -1 if bearish order block
            Top = the top of the order block
            Bottom = the bottom of the order block
            MitigatedIndex = the index of the candle that mitigated the order block
            (0 when the block was never mitigated)
            Only rows that are order blocks are returned; the index is the bar position.
        :rtype: pd.DataFrame
        """
        hl = self.swing_highs_lows(self.swing_hl_window_sz)
        # A bar qualifies when it, or the bar right before it, is a swing point.
        near_low = (hl['lows'] | hl['lows'].shift(1, fill_value=False)).to_numpy()
        near_high = (hl['highs'] | hl['highs'].shift(1, fill_value=False)).to_numpy()

        open_ = self.data['Open'].to_numpy()
        close = self.data['Close'].to_numpy()
        high = self.data['High'].to_numpy()
        low = self.data['Low'].to_numpy()
        high_ahead = self.data['High'].shift(-2).to_numpy()
        low_ahead = self.data['Low'].shift(-2).to_numpy()

        factor = (100 + imb_perc) / 100
        bullish_imbalance = (high * factor < low_ahead) & near_low
        bearish_imbalance = (low > high_ahead * factor) & near_high
        is_block = bullish_imbalance | bearish_imbalance

        ob = np.where(is_block, np.where(near_low, 1, -1), np.nan)

        # NOTE(review): top/bottom are chosen by candle colour, not by the
        # block direction - behaviour kept as is; confirm this is intended.
        up_candle = close > open_
        top = np.where(is_block, np.where(up_candle, low_ahead, low), np.nan)
        bottom = np.where(is_block, np.where(up_candle, high, high_ahead), np.nan)
        mitigated_index = self._mitigation_index(ob, top, bottom, lookahead=3)

        return pd.DataFrame({
            "OB": ob,
            "Top": top,
            "Bottom": bottom,
            "MitigatedIndex": mitigated_index,
        }).dropna(subset=['OB'])

    def plot(self, order_blocks=False, swing_hl=False, swing_hl_v2=False, structure=False, show=True,
             bar_interval='5min'):
        """
        Draw the candlestick chart with optional SMC overlays.

        :param order_blocks: Mark the bars that are order blocks.
        :param swing_hl: Mark swing highs/lows from swing_highs_lows().
        :param swing_hl_v2: Mark swing highs/lows from swing_highs_lows_v2().
        :param structure: Draw BOS / CHoCH levels from the structure map.
        :param show: Call fig.show() before returning.
        :param bar_interval: Grid used to detect missing timestamps that are hidden
            from the x axis; must be accepted by both pd.date_range(freq=...) and
            pd.Timedelta(...).
        :type bar_interval: str
        :return: The plotly figure.
        :raises ImportError: If plotly is not installed.
        """
        if go is None or make_subplots is None:
            raise ImportError("plotly is required for SMC.plot()")

        fig = make_subplots(1, 1)

        # plot the candle stick graph
        fig.add_trace(go.Candlestick(x=self.data.index.to_series(),
                                     open=self.data['Open'],
                                     high=self.data['High'],
                                     low=self.data['Low'],
                                     close=self.data['Close'],
                                     name='ohlc'))

        # grab first and last observations from df.date and make a continuous date range from that
        stamp_format = "%Y-%m-%d %H:%M:%S"
        dt_all = pd.date_range(start=self.data['Date'].iloc[0], end=self.data['Date'].iloc[-1], freq=bar_interval)

        # check which dates from your source that also occur in the continuous date range
        # (a set keeps the membership test below cheap)
        dt_obs = {d.strftime(stamp_format) for d in self.data['Date']}

        # isolate missing timestamps
        dt_breaks = [d for d in dt_all.strftime(stamp_format).tolist() if d not in dt_obs]

        # adjust xaxis for rangebreaks; dvalue is the width of one hidden slot in milliseconds
        interval_ms = int(pd.Timedelta(bar_interval).total_seconds() * 1000)
        fig.update_xaxes(rangebreaks=[dict(dvalue=interval_ms, values=dt_breaks)])

        if order_blocks:
            ob_df = self.data.iloc[self.order_blocks.index.to_list()]

            fig.add_trace(go.Scatter(
                x=ob_df['Date'],
                y=ob_df['Low'],
                name="Order Block",
                mode='markers',
                marker_symbol='diamond-dot',
                marker_size=13,
                marker_line_width=2,
            ))

        if swing_hl:
            hl = self.swing_highs_lows(self.swing_hl_window_sz)
            h = hl[hl['highs'] == True]
            l = hl[hl['lows'] == True]

            # Markers are offset by 0.1% so they do not sit on the candle wicks.
            fig.add_trace(go.Scatter(
                x=h['Date'],
                y=self.data[self.data.Date.isin(h['Date'])]['High']*(100.1/100),
                mode='markers',
                marker_symbol="triangle-up-dot",
                marker_size=10,
                name='Swing High',
            ))
            fig.add_trace(go.Scatter(
                x=l['Date'],
                y=self.data[self.data.Date.isin(l['Date'])]['Low']*(99.9/100),
                mode='markers',
                marker_symbol="triangle-down-dot",
                marker_size=10,
                name='Swing Low',
                marker_color='red',
            ))

        if swing_hl_v2:
            hl = self.swing_hl
            h = hl[hl['HighLow'] == 1]
            l = hl[hl['HighLow'] == -1]

            fig.add_trace(go.Scatter(
                x=self.data['Date'].iloc[h.index],
                y=h['Level'],
                mode='markers',
                marker_symbol="triangle-up-dot",
                marker_size=10,
                name='Swing High',
                marker_color='green',
            ))
            fig.add_trace(go.Scatter(
                x=self.data['Date'].iloc[l.index],
                y=l['Level'],
                mode='markers',
                marker_symbol="triangle-down-dot",
                marker_size=10,
                name='Swing Low',
                marker_color='red',
            ))

        if structure:
            # Work on a filtered copy; plotting must not alter self.structure_map.
            struct = self.structure_map.dropna(subset=['Level'])

            rows = zip(struct.index, struct['BrokenIndex'], struct['Level'], struct['BOS'], struct['CHOCH'])
            for start_pos, broken_pos, y, bos_dir, choch_dir in rows:
                x0 = self.data['Date'].iloc[start_pos]
                x1 = self.data['Date'].iloc[int(broken_pos)]
                label = "BOS" if np.isnan(choch_dir) else "CHOCH"
                direction = bos_dir if label == "BOS" else choch_dir

                # Add scatter trace for the line
                fig.add_trace(go.Scatter(
                    x=[x0, x1],  # x-coordinates of the line
                    y=[y, y],  # y-coordinates of the line
                    mode="lines+text",  # Line and optional label
                    line=dict(color="blue" if label == "BOS" else "orange"),  # Customize line color
                    text=[None, label],  # Add label only at one end
                    textposition="top left" if direction == 1 else "bottom left",  # Adjust label position
                    name=label,  # Legend entry for this line
                    showlegend=False
                ))

        fig.update_layout(xaxis_rangeslider_visible=False)
        if show:
            fig.show()
        return fig


def EMA(array, n):
    """
    :param array: price of the stock
    :param n: window size (used as the ``span`` of the exponential weighting)
    :type n: int
    :return: Exponential moving average (recursive form, ``adjust=False``)
    :rtype: pd.Series
    """
    return pd.Series(array).ewm(span=n, adjust=False).mean()


if __name__ == "__main__":
    from utils import fetch
    # NOTE(review): the first fetch result is overwritten by the second one.
    data = fetch('ICICIBANK.NS', period='1mo', interval='15m')
    data = fetch('RELIANCE.NS', period='1mo', interval='15m')
    data['Date'] = data.index.to_series()
    filter = pd.to_datetime('2024-12-17 09:50:00.0000000011', format='%Y-%m-%d %H:%M:%S.%f')
    # data = data[data['Date']