""" 下单信号管理 """ import time from l2 import l2_log from l2.huaxin import l2_huaxin_util from log_module import async_log_util from log_module.log import logger_l2_trade_buy, logger_debug from utils import tool class L2TradeSingleDataProcessor: """ L2逐笔成交数据信号管理 """ __latest_sell_data = {} # 最近的涨停卖委托列表 __latest_limit_up_sell_list_dict = {} __latest_limit_up_sell_order_no_set_dict = {} # 主动卖订单号集合 __active_sell_order_no_set_dict = {} # 主动买单数据,格式:{"code":[买单号,当前成交手数, 当前成交金额, 开始时间, 结束时间]} __latest_active_buy_order_data_dict = {} # 所有的买单数据,格式:{code:[[买单数据],...]} __active_buy_order_datas_dict = {} @classmethod def clear_passive_sell_data(cls, code): """ 清除被动卖相关的数据 @param code: @return: """ if code in cls.__latest_active_buy_order_data_dict: cls.__latest_active_buy_order_data_dict.pop(code) if code in cls.__active_buy_order_datas_dict: cls.__active_buy_order_datas_dict.pop(code) if code in cls.__latest_limit_up_sell_list_dict: cls.__latest_limit_up_sell_list_dict.pop(code) if code in cls.__latest_limit_up_sell_order_no_set_dict: cls.__latest_limit_up_sell_order_no_set_dict.pop(code) @classmethod def add_l2_delegate_limit_up_sell(cls, code, data): """ 添加涨停卖单数据,当前不是涨停成交时才加 @param code: @param data: @return: """ if code not in cls.__latest_limit_up_sell_list_dict: cls.__latest_limit_up_sell_list_dict[code] = [] cls.__latest_limit_up_sell_list_dict[code].append(data) # async_log_util.info(logger_debug, f"添加涨停卖数据:{code}-{data}") if code not in cls.__latest_limit_up_sell_order_no_set_dict: cls.__latest_limit_up_sell_order_no_set_dict[code] = set() cls.__latest_limit_up_sell_order_no_set_dict[code].add(data['val']['orderNo']) # 只保留前20的数据 if len(cls.__latest_limit_up_sell_list_dict[code]) > 20: delete_datas = cls.__latest_limit_up_sell_list_dict[code][:-20] cls.__latest_limit_up_sell_list_dict[code] = cls.__latest_limit_up_sell_list_dict[code][-20:] # 删除之前的map for d in delete_datas: cls.__latest_limit_up_sell_order_no_set_dict[code].discard(d["val"]["orderNo"]) @classmethod def add_l2_delegate_limit_up_sell_cancel(cls, code, order_no): """ 涨停卖撤 @param code: @param order_no: @return: """ if code not in cls.__latest_limit_up_sell_order_no_set_dict: return order_no_set = cls.__latest_limit_up_sell_order_no_set_dict[code] if order_no not in order_no_set: return cls.__latest_limit_up_sell_order_no_set_dict[code].discard(order_no) for i in range(0, len(cls.__latest_limit_up_sell_list_dict[code])): if cls.__latest_limit_up_sell_list_dict[code][i]['val']['orderNo'] == order_no: cls.__latest_limit_up_sell_list_dict[code].pop(i) break @classmethod def get_latest_limit_up_sell_order_count(cls, code): sell_list = cls.__latest_limit_up_sell_list_dict.get(code) if not sell_list: return 0 return len(sell_list) @classmethod def process_passive_limit_up_sell_data(cls, code, fdatas): """ 添加涨停被动卖成交数据 @param fdata: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'], # data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], # data['SellNo'], data['ExecType']) [(数据本身, 是否主动买, 是否涨停, 总成交额, 不含ms时间,含ms时间)] @return: 涨停卖是否已经吃完 """ try: start_time = time.time() sell_list = cls.__latest_limit_up_sell_list_dict.get(code) if not sell_list: return False last_sell_info = sell_list[-1] for data in fdatas: # 过滤被动买 if not data[1]: # 出现被动买需要将历史大单清空 if cls.__active_buy_order_datas_dict.get(code): cls.__active_buy_order_datas_dict[code].clear() continue money = data[3] # 统计买单数据 if code not in cls.__latest_active_buy_order_data_dict: # [买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间] cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3], data[0][3]] else: if data[0][6] == cls.__latest_active_buy_order_data_dict[code][0]: # 同一买单号 cls.__latest_active_buy_order_data_dict[code][1] += data[0][2] cls.__latest_active_buy_order_data_dict[code][2] += money cls.__latest_active_buy_order_data_dict[code][4] = data[0][3] else: # 不同买单号 if cls.__latest_active_buy_order_data_dict[code][2] >= 2990000: # 加入大单列表 if code not in cls.__active_buy_order_datas_dict: cls.__active_buy_order_datas_dict[code] = [] cls.__active_buy_order_datas_dict[code].append( cls.__latest_active_buy_order_data_dict[code]) cls.__latest_active_buy_order_data_dict[code] = [data[0][6], data[0][2], money, data[0][3], data[0][3]] if not data[2]: # 排除主动卖/非涨停卖 continue sell_no = data[0][7] if sell_no != last_sell_info['val']['orderNo']: continue # 需要判断当前单是否已经成交完成 if code not in cls.__latest_sell_data: cls.__latest_sell_data[code] = [sell_no, data[0][2]] else: if cls.__latest_sell_data[code][0] == sell_no: cls.__latest_sell_data[code][1] += data[0][2] else: cls.__latest_sell_data[code] = [sell_no, data[0][2]] sell_info_num = last_sell_info['val']['num'] deal_num = cls.__latest_sell_data[code][1] // 100 if sell_info_num == deal_num: use_time = round((time.time() - start_time) * 1000, 3) l2_log.info(code, logger_l2_trade_buy, f"找到最近的被动涨停卖单数据:{last_sell_info['val']['orderNo']}, 成交数据:{data} 计算耗时:{use_time}ms, 可以触发下单") # 将历史大单列表与最近的大单加入列表 big_buy_order_datas = [] if code in cls.__active_buy_order_datas_dict: big_buy_order_datas.extend(cls.__active_buy_order_datas_dict[code]) if cls.__latest_active_buy_order_data_dict[code][2] >= 2990000: big_buy_order_datas.append(cls.__latest_active_buy_order_data_dict[code]) # 成交完成 L2TradeSingleDataManager.set_latest_sell_data(code, data, big_buy_order_datas) l2_log.info(code, logger_l2_trade_buy, "被动卖数据处理完毕") if tool.is_sz_code(code): # 涨停主动卖已经被吃完,可以清除 return True break except Exception as e: logger_debug.exception(e) return False @classmethod def filter_last_limit_up_sell_data(cls, code, fdatas): """ 筛选出最后一条涨停卖成交数据 @param code: @param fdatas: @return: (成交数据, 卖单数据) """ def compute_last_sell(): # 最大买单号 max_buy_order_no = fdatas[-1][0][6] for i in range(len(sell_list) - 1, -1, -1): if sell_list[i]['val']['orderNo'] > max_buy_order_no: continue return sell_list[i] return None if not fdatas[-1][2]: # 最后一条数据不是涨停成交数据 return None sell_list = cls.__latest_limit_up_sell_list_dict.get(code) if not sell_list: return None last_sell_info = compute_last_sell() if not last_sell_info: return None for data in fdatas: if not data[2]: # 排除主动卖/非涨停卖 continue sell_no = data[0][7] if sell_no != last_sell_info['val']['orderNo']: continue # 需要判断当前单是否已经成交完成 if code not in cls.__latest_sell_data: cls.__latest_sell_data[code] = [sell_no, data[0][2]] else: if cls.__latest_sell_data[code][0] == sell_no: cls.__latest_sell_data[code][1] += data[0][2] else: cls.__latest_sell_data[code] = [sell_no, data[0][2]] sell_info_num = last_sell_info['val']['num'] deal_num = cls.__latest_sell_data[code][1] // 100 if sell_info_num == deal_num: # 最后一笔涨停卖已经成交完成 l2_log.info(code, logger_l2_trade_buy, f"找到最近的被动涨停卖单数据:{last_sell_info}, 成交数据:{data} 可以触发下单") return data, last_sell_info return None @classmethod def add_active_limit_up_sell_data(cls, data): """ 添加主动卖数据 @param data: @return: """ try: code = data[0] sell_no = data[7] if code not in cls.__active_sell_order_no_set_dict: cls.__active_sell_order_no_set_dict[code] = set() cls.__active_sell_order_no_set_dict[code].add(sell_no) except Exception as e: logger_debug.exception(e) class L2TradeSingleCallback: """ 交易信号回调 """ def OnTradeSingle(self, code, big_buy_order_count, _type, data): """ 交易数据信号回调 @param code: @param big_buy_order_count: 大买单数量 @param _type: 类型:0-TYPE_PASSIVE 1-TYPE_ACTIVE @param data: (逐笔成交数据,生效时间) @return: """ def OnLimitUpActiveBuy(self, code, transaction_data, no_left_limit_up_sell): """ 涨停主动买触发 @param code: @param transaction_data: 成交数据 @param no_left_limit_up_sell: 是否还剩余涨停卖 @return: """ def OnLastLimitUpSellDeal(self, code, data): """ 最后一笔涨停卖成交 @param code: 代码 @param data: 成交的数据 @return: """ class L2TradeSingleDataManager: __callback = None TYPE_PASSIVE = 0 TYPE_ACTIVE = 1 """ 买入信号管理 """ # 最近的涨停卖被动成交数据 __latest_sell_data_dict = {} # 格式为:{code:(逐笔成交数据,生效时间(带ms))} # 由被动向主动卖成交转变的数据 __latest_sell_active_deal_data_dict = {} # 格式为:{code:(逐笔成交数据,生效时间(带ms))} @classmethod def set_callback(cls, callback: L2TradeSingleCallback): cls.__callback = callback @classmethod def set_latest_sell_data(cls, code, fdata, big_active_buy_order_datas): """ 设置最近成交的涨停卖被动成交数据 @param code: 代码 @param data: L2逐笔成交数据 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'], # data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], # data['SellNo'], data['ExecType']) @param big_active_buy_order_datas: 大主动买单数据:[[买单号,当前成交股数, 当前成交金额, 开始时间, 结束时间],....] @return: """ deal_time = fdata[5] # 生效时间在1s以内 cls.__latest_sell_data_dict[code] = (fdata[0], tool.trade_time_add_millionsecond(deal_time, 1000)) if cls.__callback: big_buy_order_count = 0 if big_active_buy_order_datas: for b in big_active_buy_order_datas: if b[0] > fdata[0][7]: # 买单在卖单之后 big_buy_order_count += 1 cls.__callback.OnTradeSingle(code, big_buy_order_count, cls.TYPE_PASSIVE, cls.__latest_sell_data_dict[code]) @classmethod def set_sell_passive_to_active_datas(cls, code, passive_data, active_data): """ 设置被动卖成交向主动卖成交的转变数据 @param code: 代码 @param passive_data: 被动卖成交逐笔 @param active_data: 主动卖成交逐笔 @return: """ # 暂时不需要生效 l2_log.info(code, logger_l2_trade_buy, f"被动卖变主动卖:{passive_data} => {active_data}") # deal_time = l2_huaxin_util.convert_time(passive_data[3], True) # # 生效时间在1s以内 # cls.__latest_sell_active_deal_data_dict[code] = ( # active_data, tool.trade_time_add_millionsecond(deal_time, 1000)) # if cls.__callback: # cls.__callback.OnTradeSingle(code, 0, cls.TYPE_ACTIVE, cls.__latest_sell_active_deal_data_dict[code]) @classmethod def set_limit_up_active_buy(cls, code, transaction_datas, no_left_limit_up_sell): if transaction_datas: cls.__callback.OnLimitUpActiveBuy(code, transaction_datas, no_left_limit_up_sell) @classmethod def get_valid_trade_single(cls, code, latest_time_with_ms): """ 获取有效的成交下单信号 @param code: @param latest_time_with_ms: @return: (逐笔成交数据, 类型) """ # 如果有最近卖涨停成交完就用最近的数据 data = cls.__latest_sell_data_dict.get(code) if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0: return data, cls.TYPE_PASSIVE data = cls.__latest_sell_active_deal_data_dict.get(code) if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0: return data, cls.TYPE_ACTIVE return None @classmethod def is_can_place_order(cls, code, buy_data): """ 是否可以下单 @param code: 代码 @param buy_data: L2逐笔委托大于50w的买入数据 @return: (是否可以下单, 原因) """ buy_data_time_with_ms = tool.to_time_with_ms(buy_data['val']['time'], buy_data['val']['tms']) single = cls.get_valid_trade_single(code, buy_data_time_with_ms) if not single: return False, "没有找到可用信号" if single[0][0][6] < buy_data['val']['orderNo']: # 信号位置的成交买单号要小于买入数据的买单号才行 return True, f"找到信号:{single}" else: return False, f"买单没有在信号位之后 信号:{single}" @classmethod def clear_data(cls, code): if code in cls.__latest_sell_data_dict: cls.__latest_sell_data_dict.pop(code) if code in cls.__latest_sell_active_deal_data_dict: cls.__latest_sell_active_deal_data_dict.pop(code) L2TradeSingleDataProcessor.clear_passive_sell_data(code)