From 6bbfbbb16d792f7737ec86cabdba5c0e98dcf4b4 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 29 八月 2025 17:41:29 +0800 Subject: [PATCH] 有涨停买撤单要触发撤单计算 --- huaxin_client/l2_data_manager.py | 63 ++++++++++++++++++++----------- 1 files changed, 41 insertions(+), 22 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index e62e624..f47415a 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -16,7 +16,7 @@ from log_module import async_log_util from log_module.async_log_util import huaxin_l2_log from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ - logger_local_huaxin_l2_special_volume + logger_local_huaxin_l2_special_volume, logger_debug, logger_local_huaxin_l2_orderdetail from utils import tool import collections import zmq @@ -28,7 +28,7 @@ tmep_transaction_queue_dict = {} target_codes = set() target_codes_add_time = {} -common_queue = queue.Queue() +common_queue = queue.Queue(maxsize=1000) # L2涓婁紶鏁版嵁绠$悊鍣� @@ -47,12 +47,18 @@ # 璁剧疆璁㈠崟杩囨护鏉′欢 # special_price:杩囨护鐨�1鎵嬬殑浠锋牸 - def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume): - if code not in self.filter_order_condition_dict: + def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume, special_volumes): + if not special_volumes: + special_volumes = set() + # if code not in self.filter_order_condition_dict: + try: + # (鏈�灏忕殑閲�, 娑ㄥ仠浠锋牸, 褰卞瓙鍗曚环鏍�, 涔扮殑閲�, 搴熷純浣跨敤, 鐗规畩鐨勯噺闆嗗悎) self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume, - min_volume // 50)] - huaxin_l2_log.info(logger_local_huaxin_l2_subscript, - f"({code})甯歌杩囨护鏉′欢璁剧疆锛歿self.filter_order_condition_dict[code]}") + int(min_volume) // 50, set(special_volumes))] + # huaxin_l2_log.info(logger_local_huaxin_l2_subscript, + # f"({code})甯歌杩囨护鏉′欢璁剧疆锛歿self.filter_order_condition_dict[code]}") + except Exception as e: + logger_debug.error(f"{str(e)} - min_volume-{min_volume}") # 杩囨护璁㈠崟 def __filter_order(self, item): @@ -62,16 +68,22 @@ if item[2] >= filter_condition[0][0]: return item # 1鎵嬬殑涔板崟婊¤冻浠锋牸 - if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001: - return item + # if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001: + # return item # 涔伴噺 if item[2] == filter_condition[0][3]: return item - # 鍗栧ぇ浜�2w涓旀槸娑ㄥ仠鍗� - if item[3] != '1' and item[2] > filter_condition[0][4] and item[1] == filter_condition[0][1]: - return item - + # 鎵�鏈夌殑娑ㄥ仠鍗� + if item[3] != '1': + # 鍗栦笌鍗栨挙 + if abs(item[1] - filter_condition[0][1]) < 0.001: + # 娑ㄥ仠浠� + return item + else: + if item[2] in filter_condition[0][5]: + # 鐗规畩鎵嬫暟 + return item return None return item # 杩囨护璁㈠崟 @@ -102,8 +114,9 @@ q: collections.deque = self.temp_order_queue_dict.get(code) if q is not None: - q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], - data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) + q.append( + (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], + data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) # 娣诲姞閫愮瑪鎴愪氦 def add_transaction_detail(self, data): @@ -121,7 +134,7 @@ if q is not None: q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], - data['SellNo'], data['ExecType'])) + data['SellNo'], data['ExecType'], time.time())) def add_market_data(self, data): # 鍔犲叆涓婁紶闃熷垪 @@ -147,7 +160,7 @@ if code not in self.temp_transaction_queue_dict: self.temp_transaction_queue_dict[code] = collections.deque() if code not in self.temp_log_queue_dict: - self.temp_log_queue_dict[code] = queue.Queue() + self.temp_log_queue_dict[code] = queue.Queue(maxsize=1000) if code not in self.upload_l2_data_task_dict: t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) t1.start() @@ -179,6 +192,7 @@ def __run_upload_order_task(self, code): q: collections.deque = self.temp_order_queue_dict.get(code) temp_list = [] + filter_condition = self.filter_order_condition_dict.get(code) while True: try: while len(q) > 0: @@ -200,7 +214,12 @@ if use_time > 0.01: # 璁板綍10ms浠ヤ笂鐨勬暟鎹� huaxin_l2_log.info(logger_local_huaxin_l2_error, f"鑰楁椂:{use_time}s 缁撴潫鏁版嵁锛歿last_data}") - temp_list = [] + + # 璁板綍鎵�鏈夌殑璁㈠崟鍙� + if filter_condition: + huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail, + f"{[(x[0], x[1], x[2], x[4], x[8]) for x in temp_list if x[2] >= filter_condition[0][0]]}") + temp_list.clear() else: if code not in self.temp_order_queue_dict: self.l2_order_codes.discard(code) @@ -326,7 +345,7 @@ def add_subscript_codes(codes): - print("add_subscript_codes", codes) + # print("add_subscript_codes", codes) # 鍔犲叆涓婁紶闃熷垪 common_queue.put(('', "l2_subscript_codes", list(codes))) @@ -350,7 +369,7 @@ return True else: # 鍐嶆鍙戦�� - print("鍐嶆鍙戦��") + # print("鍐嶆鍙戦��") return __send_response(sk, msg) except ConnectionResetError as e: SendResponseSkManager.del_send_response_sk(type) @@ -381,7 +400,7 @@ def __run_upload_common(): - print("__run_upload_common") + # print("__run_upload_common") logger_system.info(f"l2_client __run_upload_common 绾跨▼ID:{tool.get_thread_id()}") while True: try: @@ -397,7 +416,7 @@ def __run_log(): - print("__run_log") + # print("__run_log") logger_system.info(f"l2_client __run_log 绾跨▼ID:{tool.get_thread_id()}") async_log_util.huaxin_l2_log.run_sync() -- Gitblit v1.8.0