""" 下单信号管理 """ import time from l2 import l2_log from l2.huaxin import l2_huaxin_util from log_module.log import logger_l2_trade_buy, logger_debug from utils import tool class L2TradeSingleDataProcessor: """ L2逐笔成交数据信号管理 """ __latest_sell_data = {} __latest_limit_up_sell_list_dict = {} __latest_limit_up_sell_order_no_set_dict = {} # 主动卖订单号集合 __active_sell_order_no_set_dict = {} @classmethod def add_l2_delegate_limit_up_sell(cls, code, data): """ 添加涨停卖单数据,当前不是涨停成交时才加 @param code: @param data: @return: """ if code not in cls.__latest_limit_up_sell_list_dict: cls.__latest_limit_up_sell_list_dict[code] = [] cls.__latest_limit_up_sell_list_dict[code].append(data) if code not in cls.__latest_limit_up_sell_order_no_set_dict: cls.__latest_limit_up_sell_order_no_set_dict[code] = set() cls.__latest_limit_up_sell_order_no_set_dict[code].add(data['val']['orderNo']) # 只保留前20的数据 if len(cls.__latest_limit_up_sell_list_dict[code]) > 20: cls.__latest_limit_up_sell_list_dict[code] = cls.__latest_limit_up_sell_list_dict[code][-20:] # 删除之前的map for d in cls.__latest_limit_up_sell_list_dict[code][0:-20]: cls.__latest_limit_up_sell_order_no_set_dict[code].discard(d["val"]["orderNo"]) @classmethod def add_l2_delegate_limit_up_sell_cancel(cls, code, order_no): """ 涨停卖撤 @param code: @param order_no: @return: """ if code not in cls.__latest_limit_up_sell_order_no_set_dict: return order_no_set = cls.__latest_limit_up_sell_order_no_set_dict[code] if order_no not in order_no_set: return cls.__latest_limit_up_sell_order_no_set_dict[code].discard(order_no) for i in range(0, len(cls.__latest_limit_up_sell_list_dict[code])): if cls.__latest_limit_up_sell_list_dict[code][i]['val']['orderNo'] == order_no: cls.__latest_limit_up_sell_list_dict[code].pop(i) break @classmethod def get_latest_limit_up_sell_order_count(cls, code): sell_list = cls.__latest_limit_up_sell_list_dict.get(code) if not sell_list: return 0 return len(sell_list) @classmethod def process_passive_limit_up_sell_data(cls, code, datas, limit_up_price): """ 添加涨停被动卖成交数据 @param data: 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'], # data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], # data['SellNo'], data['ExecType']) @return: """ try: start_time = time.time() sell_list = cls.__latest_limit_up_sell_list_dict.get(code) if not sell_list: return sell_info = sell_list[-1] for data in datas: if data[6] < data[7] or data[1] != limit_up_price: # 排除主动卖/非涨停卖 continue sell_no = data[7] if sell_no != sell_info['val']['orderNo']: continue # 需要判断当前单是否已经成交完成 if code not in cls.__latest_sell_data: cls.__latest_sell_data[code] = [sell_no, data[2]] else: if cls.__latest_sell_data[code][0] == sell_no: cls.__latest_sell_data[code][1] += data[2] else: cls.__latest_sell_data[code] = [sell_no, data[2]] sell_info_num = sell_info['val']['num'] deal_num = cls.__latest_sell_data[code][1] // 100 if sell_info_num == deal_num: use_time = round((time.time() - start_time) * 1000, 3) l2_log.info(code, logger_l2_trade_buy, f"找到最近的被动涨停卖单数据:{sell_info['val']['orderNo']}, 成交数据:{data} 计算耗时:{use_time}ms, 可以触发下单") # 成交完成 L2TradeSingleDataManager.set_latest_sell_data(code, data) l2_log.info(code, logger_l2_trade_buy, "被动卖数据处理完毕") break # l2_log.info(code, logger_l2_trade_buy, f"找到最近的被动涨停卖单数据:{data['val']['orderNo']}, 可以触发下单") except Exception as e: logger_debug.exception(e) @classmethod def add_active_limit_up_sell_data(cls, data): """ 添加主动卖数据 @param data: @return: """ try: code = data[0] sell_no = data[7] if code not in cls.__active_sell_order_no_set_dict: cls.__active_sell_order_no_set_dict[code] = set() cls.__active_sell_order_no_set_dict[code].add(sell_no) except Exception as e: logger_debug.exception(e) class L2TradeSingleCallback: """ 交易信号回调 """ def OnTradeSingle(self, code, _type, data): """ 交易数据信号回调 @param code: @param _type: 类型:0-TYPE_PASSIVE 1-TYPE_ACTIVE @param data: (逐笔成交数据,生效时间) @return: """ class L2TradeSingleDataManager: __callback = None TYPE_PASSIVE = 0 TYPE_ACTIVE = 1 """ 买入信号管理 """ # 最近的涨停卖被动成交数据 __latest_sell_data_dict = {} # 格式为:{code:(逐笔成交数据,生效时间(带ms))} # 由被动向主动卖成交转变的数据 __latest_sell_active_deal_data_dict = {} # 格式为:{code:(逐笔成交数据,生效时间(带ms))} @classmethod def set_callback(cls, callback: L2TradeSingleCallback): cls.__callback = callback @classmethod def set_latest_sell_data(cls, code, data): """ 设置最近成交的涨停卖被动成交数据 @param code: 代码 @param data: L2逐笔成交数据 数据格式:(data['SecurityID'], data['TradePrice'], data['TradeVolume'], # data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], # data['SellNo'], data['ExecType']) @return: """ deal_time = l2_huaxin_util.convert_time(data[3], True) # 生效时间在1s以内 cls.__latest_sell_data_dict[code] = (data, tool.trade_time_add_millionsecond(deal_time, 1000)) if cls.__callback: cls.__callback.OnTradeSingle(code, cls.TYPE_PASSIVE, cls.__latest_sell_data_dict[code]) @classmethod def set_sell_passive_to_active_datas(cls, code, passive_data, active_data): """ 设置被动卖成交向主动卖成交的转变数据 @param code: 代码 @param passive_data: 被动卖成交逐笔 @param active_data: 主动卖成交逐笔 @return: """ l2_log.info(code, logger_l2_trade_buy, f"被动卖变主动卖:{passive_data} => {active_data}") deal_time = l2_huaxin_util.convert_time(passive_data[3], True) # 生效时间在1s以内 cls.__latest_sell_active_deal_data_dict[code] = ( active_data, tool.trade_time_add_millionsecond(deal_time, 1000)) if cls.__callback: cls.__callback.OnTradeSingle(code, cls.TYPE_ACTIVE, cls.__latest_sell_active_deal_data_dict[code]) @classmethod def get_valid_trade_single(cls, code, latest_time_with_ms): """ 获取有效的成交下单信号 @param code: @param latest_time_with_ms: @return: (逐笔成交数据, 类型) """ # 如果有最近卖涨停成交完就用最近的数据 data = cls.__latest_sell_data_dict.get(code) if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0: return data, cls.TYPE_PASSIVE data = cls.__latest_sell_active_deal_data_dict.get(code) if data and tool.trade_time_sub_with_ms(latest_time_with_ms, data[1]) <= 0: return data, cls.TYPE_ACTIVE return None @classmethod def is_can_place_order(cls, code, buy_data): """ 是否可以下单 @param code: 代码 @param buy_data: L2逐笔委托大于50w的买入数据 @return: (是否可以下单, 原因) """ buy_data_time_with_ms = tool.to_time_with_ms(buy_data['val']['time'], buy_data['val']['tms']) single = cls.get_valid_trade_single(code, buy_data_time_with_ms) if not single: return False, "没有找到可用信号" if single[0][0][6] < buy_data['val']['orderNo']: # 信号位置的成交买单号要小于买入数据的买单号才行 return True, f"找到信号:{single}" else: return False, f"买单没有在信号位之后 信号:{single}" @classmethod def clear_data(cls, code): if code in cls.__latest_sell_data_dict: cls.__latest_sell_data_dict.pop(code) if code in cls.__latest_sell_active_deal_data_dict: cls.__latest_sell_active_deal_data_dict.pop(code)