# -*- coding: utf-8 -*- import logging import marshal import queue import threading import time from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager from log_module.async_log_util import huaxin_l2_log import collections from log_module.log import logger_local_huaxin_contact_debug order_detail_upload_active_time_dict = {} transaction_upload_active_time_dict = {} # 临时数据 tmep_order_detail_queue_dict = {} tmep_transaction_queue_dict = {} target_codes = set() target_codes_add_time = {} common_queue = queue.Queue() # L2上传数据管理器 class L2DataUploadManager: def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager): self.data_callback_distribute_manager = data_callback_distribute_manager # 代码分配的对象 self.temp_order_queue_dict = {} self.temp_transaction_queue_dict = {} self.temp_log_queue_dict = {} self.filter_order_condition_dict = {} self.upload_l2_data_task_dict = {} self.l2_order_codes = set() self.l2_transaction_codes = set() self.__real_time_buy1_data = {} # 过滤订单 def __filter_order(self, item): if item[1] * item[2] < 500000: return None return item # 过滤订单 def __filter_transaction(self, item): return item # 添加委托详情 def add_l2_order_detail(self, data, start_time=0, istransaction=False): code = data["SecurityID"] if code in self.__real_time_buy1_data: if self.__real_time_buy1_data[code][1] == data["Price"]: # 与买的价格一致 if data["Side"] == '1': if data["OrderStatus"] == 'D': # 买撤 self.__real_time_buy1_data[code][3] -= data["Volume"] else: # 买 self.__real_time_buy1_data[code][3] += data["Volume"] q: collections.deque = self.temp_order_queue_dict.get(code) q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) # 添加逐笔成交 def add_transaction_detail(self, data): code = data["SecurityID"] if code in self.__real_time_buy1_data: if self.__real_time_buy1_data[code][1] == data["TradePrice"]: # 与买的价格一致 self.__real_time_buy1_data[code][3] -= data["TradeVolume"] q: collections.deque = self.temp_transaction_queue_dict.get(code) q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], data['SellNo'], data['ExecType'])) def add_market_data(self, data): code = data["securityID"] # [时间,买1价格,原始买1量,计算后的买1量]} self.__real_time_buy1_data[code] = [data["dataTimeStamp"], data["buy"][0][0], data["buy"][0][1], data["buy"][0][1]] self.data_callback_distribute_manager.get_distributed_callback(code).OnMarketData(code, [data]) # 分配上传队列 def distribute_upload_queue(self, code): if not self.data_callback_distribute_manager.get_distributed_callback(code): self.data_callback_distribute_manager.distribute_callback(code) if code not in self.temp_order_queue_dict: self.temp_order_queue_dict[code] = collections.deque() if code not in self.temp_transaction_queue_dict: self.temp_transaction_queue_dict[code] = collections.deque() if code not in self.temp_log_queue_dict: self.temp_log_queue_dict[code] = queue.Queue() if code not in self.upload_l2_data_task_dict: t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) t1.start() t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True) t2.start() t3 = threading.Thread(target=lambda: self.__run_upload_real_time_buy1_task(code), daemon=True) t3.start() self.upload_l2_data_task_dict[code] = (t1, t2, t3) # 释放已经分配的队列 def release_distributed_upload_queue(self, code): self.data_callback_distribute_manager.release_distribute_callback(code) if code in self.temp_order_queue_dict: self.temp_order_queue_dict[code].clear() self.temp_order_queue_dict.pop(code) if code in self.temp_transaction_queue_dict: self.temp_transaction_queue_dict[code].clear() self.temp_transaction_queue_dict.pop(code) if code in self.temp_log_queue_dict: self.temp_log_queue_dict.pop(code) if code in self.upload_l2_data_task_dict: self.upload_l2_data_task_dict.pop(code) def __upload_l2_data(self, code, _queue, datas): _queue.put_nowait(marshal.dumps([code, datas, time.time()])) # 处理订单数据并上传 def __run_upload_order_task(self, code): q: collections.deque = self.temp_order_queue_dict.get(code) temp_list = [] while True: try: while len(q) > 0: data = q.popleft() # 前置数据处理,过滤掉无用的数据 data = self.__filter_order(data) if data: temp_list.append(data) if temp_list: # 上传数据 # self.__upload_l2_data(code, upload_queue, temp_list) # self.__upload_l2_order_data(code, temp_list) __start_time = time.time() last_data = temp_list[-1] self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time()) use_time = time.time() - __start_time if use_time > 0.01: # 记录10ms以上的数据 huaxin_l2_log.info(logger_local_huaxin_contact_debug, f"耗时:{use_time}s 结束数据:{last_data}") temp_list = [] else: if code not in self.temp_order_queue_dict: self.l2_order_codes.discard(code) break self.l2_order_codes.add(code) time.sleep(0.001) except Exception as e: logging.exception(e) finally: pass # 处理成交数据并上传 def __run_upload_transaction_task(self, code): q: collections.deque = self.temp_transaction_queue_dict.get(code) temp_list = [] while True: try: while len(q) > 0: data = q.popleft() data = self.__filter_transaction(data) if data: temp_list.append(data) if temp_list: # 上传数据 # self.__upload_l2_data(code, upload_queue, temp_list) self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list) temp_list = [] else: if code not in self.temp_transaction_queue_dict: self.l2_transaction_codes.discard(code) break self.l2_transaction_codes.add(code) time.sleep(0.001) except: pass finally: pass # 处理实时买1数据 def __run_upload_real_time_buy1_task(self, code): while True: try: if code in self.__real_time_buy1_data: data = self.__real_time_buy1_data[code] # 如果最新的买1是原来买1的1/2时开始上传 if data[2] > 0 and data[3] / data[2] <= 0.5: self.data_callback_distribute_manager.get_distributed_callback(code).OnRealTimeBuy1Info(code, data) except: pass finally: time.sleep(0.1) def add_target_code(code): target_codes.add(code) # 记录代码加入时间 target_codes_add_time[code] = time.time() def del_target_code(code): target_codes.discard(code) if code in target_codes_add_time: target_codes_add_time.pop(code) def add_subscript_codes(codes): pass if __name__ == "__main__": add_subscript_codes(["000333"])