From 3a87b1c89a76d858e8e7e4e54ff360dc0b8670f5 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 11 九月 2025 16:27:20 +0800 Subject: [PATCH] L撤删除L前撤单与L后后半段撤单 --- huaxin_client/l2_data_manager.py | 354 +++++++++++++++++++++++++++++++++++++++++----------------- 1 files changed, 250 insertions(+), 104 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index de05165..f47415a 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -1,20 +1,25 @@ # -*- coding: utf-8 -*- import json import logging -import multiprocessing +import marshal import queue import threading import time + +import constant from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager # 娲诲姩鏃堕棿 -from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager +from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager from log_module import async_log_util -from log_module.log import logger_local_huaxin_l2_error, logger_system +from log_module.async_log_util import huaxin_l2_log +from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ + logger_local_huaxin_l2_special_volume, logger_debug, logger_local_huaxin_l2_orderdetail from utils import tool import collections +import zmq order_detail_upload_active_time_dict = {} transaction_upload_active_time_dict = {} @@ -23,30 +28,37 @@ tmep_transaction_queue_dict = {} target_codes = set() target_codes_add_time = {} -common_queue = queue.Queue() +common_queue = queue.Queue(maxsize=1000) # L2涓婁紶鏁版嵁绠$悊鍣� class L2DataUploadManager: - def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, - transaction_queue_distribute_manager: CodeQueueDistributeManager, - market_data_queue: multiprocessing.Queue): - self.order_queue_distribute_manager = order_queue_distribute_manager - self.transaction_queue_distribute_manager = transaction_queue_distribute_manager - self.market_data_queue = market_data_queue + def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager): + self.data_callback_distribute_manager = data_callback_distribute_manager + # 浠g爜鍒嗛厤鐨勫璞� self.temp_order_queue_dict = {} self.temp_transaction_queue_dict = {} + self.temp_log_queue_dict = {} + self.filter_order_condition_dict = {} + self.upload_l2_data_task_dict = {} + self.l2_order_codes = set() + self.l2_transaction_codes = set() # 璁剧疆璁㈠崟杩囨护鏉′欢 - def set_order_fileter_condition(self, code, min_volume, limit_up_price, special_volumes=None, - special_volumes_expire_time=None): - if special_volumes is None: + # special_price:杩囨护鐨�1鎵嬬殑浠锋牸 + def set_order_fileter_condition(self, code, min_volume, limit_up_price, shadow_price, buy_volume, special_volumes): + if not special_volumes: special_volumes = set() - if code in self.filter_order_condition_dict and not special_volumes and not special_volumes_expire_time: - self.filter_order_condition_dict[code][0] = (min_volume, limit_up_price) - else: - self.filter_order_condition_dict[code] = [(min_volume, limit_up_price), special_volumes, special_volumes_expire_time] + # if code not in self.filter_order_condition_dict: + try: + # (鏈�灏忕殑閲�, 娑ㄥ仠浠锋牸, 褰卞瓙鍗曚环鏍�, 涔扮殑閲�, 搴熷純浣跨敤, 鐗规畩鐨勯噺闆嗗悎) + self.filter_order_condition_dict[code] = [(min_volume, limit_up_price, shadow_price, buy_volume, + int(min_volume) // 50, set(special_volumes))] + # huaxin_l2_log.info(logger_local_huaxin_l2_subscript, + # f"({code})甯歌杩囨护鏉′欢璁剧疆锛歿self.filter_order_condition_dict[code]}") + except Exception as e: + logger_debug.error(f"{str(e)} - min_volume-{min_volume}") # 杩囨护璁㈠崟 def __filter_order(self, item): @@ -55,16 +67,25 @@ # item[2]涓洪噺 if item[2] >= filter_condition[0][0]: return item - if filter_condition[1] and item[2] in filter_condition[1]: - if filter_condition[2] and time.time() > filter_condition[2]: - # 瓒呮椂浜嗭紝闇�瑕佹竻闄ょ壒娈婇噺鏁版嵁 - filter_condition[1] = set() - filter_condition[2] = None - return None + # 1鎵嬬殑涔板崟婊¤冻浠锋牸 + # if item[2] == 100 and abs(filter_condition[0][2] - item[1]) < 0.001: + # return item + # 涔伴噺 + if item[2] == filter_condition[0][3]: return item + + # 鎵�鏈夌殑娑ㄥ仠鍗� + if item[3] != '1': + # 鍗栦笌鍗栨挙 + if abs(item[1] - filter_condition[0][1]) < 0.001: + # 娑ㄥ仠浠� + return item + else: + if item[2] in filter_condition[0][5]: + # 鐗规畩鎵嬫暟 + return item return None return item - # 杩囨护璁㈠崟 def __filter_transaction(self, item): @@ -86,10 +107,16 @@ # queue_info[1].put_nowait( # (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], # data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) + # if data['Volume'] == 100: + # log_queue = self.temp_log_queue_dict.get(code) + # if log_queue: + # log_queue.put_nowait(data) q: collections.deque = self.temp_order_queue_dict.get(code) - q.append((data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], - data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) + if q is not None: + q.append( + (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], + data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) # 娣诲姞閫愮瑪鎴愪氦 def add_transaction_detail(self, data): @@ -104,92 +131,205 @@ # data['SellNo'], data['ExecType'])) q: collections.deque = self.temp_transaction_queue_dict.get(code) - q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], - data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], - data['SellNo'], data['ExecType'])) + if q is not None: + q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], + data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], + data['SellNo'], data['ExecType'], time.time())) def add_market_data(self, data): # 鍔犲叆涓婁紶闃熷垪 - self.market_data_queue.put_nowait(data) + # self.market_data_queue.put_nowait(data) + code = data['securityID'] + callback = self.data_callback_distribute_manager.get_distributed_callback(code) + if callback: + callback.OnMarketData(code, data) # 鍒嗛厤涓婁紶闃熷垪 - def distribute_upload_queue(self, code): - self.order_queue_distribute_manager.distribute_queue(code) - self.transaction_queue_distribute_manager.distribute_queue(code) - self.temp_order_queue_dict[code] = collections.deque() - self.temp_transaction_queue_dict[code] = collections.deque() - threading.Thread(target=lambda: self.__run_upload_order_task(code)).start() - threading.Thread(target=lambda: self.__run_upload_transaction_task(code)).start() + def distribute_upload_queue(self, code, _target_codes=None): + """ + 鍒嗛厤涓婁紶闃熷垪 + @param code: 浠g爜 + @param _target_codes: 鎵�鏈夌殑鐩爣浠g爜 + @return: + """ + if not self.data_callback_distribute_manager.get_distributed_callback(code): + self.data_callback_distribute_manager.distribute_callback(code, _target_codes) - def __upload_l2_data(self, code, _queue, datas): - _queue.put_nowait((code, datas, time.time())) + if code not in self.temp_order_queue_dict: + self.temp_order_queue_dict[code] = collections.deque() + if code not in self.temp_transaction_queue_dict: + self.temp_transaction_queue_dict[code] = collections.deque() + if code not in self.temp_log_queue_dict: + self.temp_log_queue_dict[code] = queue.Queue(maxsize=1000) + if code not in self.upload_l2_data_task_dict: + t1 = threading.Thread(target=lambda: self.__run_upload_order_task(code), daemon=True) + t1.start() + t2 = threading.Thread(target=lambda: self.__run_upload_transaction_task(code), daemon=True) + t2.start() + # t3 = threading.Thread(target=lambda: self.__run_log_task(code), daemon=True) + # t3.start() + self.upload_l2_data_task_dict[code] = (t1, t2) + # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒� - # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼� - def __run_upload_order_task(self, code): - q: collections.deque = self.temp_order_queue_dict.get(code) - temp_list = [] - queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) - upload_queue = queue_info[1] - while True: - try: - if not q: - data = q.popleft() - # 鍓嶇疆鏁版嵁澶勭悊锛岃繃婊ゆ帀鏃犵敤鐨勬暟鎹� - data = self.__filter_order(data) - if data: - temp_list.append(data) - else: - if temp_list: - # 涓婁紶鏁版嵁 - self.__upload_l2_data(code, upload_queue, temp_list) - temp_list.clear() - else: - if code not in self.temp_order_queue_dict: - break - time.sleep(0.001) - - except: - pass - finally: - pass - - # 澶勭悊鎴愪氦鏁版嵁骞朵笂浼� - def __run_upload_transaction_task(self, code): - q: collections.deque = self.temp_transaction_queue_dict.get(code) - queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) - upload_queue = queue_info[1] - temp_list = [] - while True: - try: - if not q: - data = q.popleft() - data = self.__filter_transaction(data) - if data: - temp_list.append(data) - else: - if temp_list: - # 涓婁紶鏁版嵁 - self.__upload_l2_data(code, upload_queue, temp_list) - temp_list.clear() - else: - if code not in self.temp_transaction_queue_dict: - break - time.sleep(0.002) - except: - pass - finally: - pass - - # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒� def release_distributed_upload_queue(self, code): - self.order_queue_distribute_manager.release_distribute_queue(code) - self.transaction_queue_distribute_manager.release_distribute_queue(code) + self.data_callback_distribute_manager.release_distribute_callback(code) if code in self.temp_order_queue_dict: self.temp_order_queue_dict[code].clear() self.temp_order_queue_dict.pop(code) if code in self.temp_transaction_queue_dict: self.temp_transaction_queue_dict[code].clear() self.temp_transaction_queue_dict.pop(code) + if code in self.temp_log_queue_dict: + self.temp_log_queue_dict.pop(code) + + if code in self.upload_l2_data_task_dict: + self.upload_l2_data_task_dict.pop(code) + + def __upload_l2_data(self, code, _queue, datas): + _queue.put_nowait(marshal.dumps([code, datas, time.time()])) + + # 澶勭悊璁㈠崟鏁版嵁骞朵笂浼� + def __run_upload_order_task(self, code): + q: collections.deque = self.temp_order_queue_dict.get(code) + temp_list = [] + filter_condition = self.filter_order_condition_dict.get(code) + while True: + try: + while len(q) > 0: + data = q.popleft() + # 鍓嶇疆鏁版嵁澶勭悊锛岃繃婊ゆ帀鏃犵敤鐨勬暟鎹� + data = self.__filter_order(data) + if data: + temp_list.append(data) + + if temp_list: + # 涓婁紶鏁版嵁 + # self.__upload_l2_data(code, upload_queue, temp_list) + # self.__upload_l2_order_data(code, temp_list) + __start_time = time.time() + last_data = temp_list[-1] + self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, + time.time()) + use_time = time.time() - __start_time + if use_time > 0.01: + # 璁板綍10ms浠ヤ笂鐨勬暟鎹� + huaxin_l2_log.info(logger_local_huaxin_l2_error, f"鑰楁椂:{use_time}s 缁撴潫鏁版嵁锛歿last_data}") + + # 璁板綍鎵�鏈夌殑璁㈠崟鍙� + if filter_condition: + huaxin_l2_log.info(logger_local_huaxin_l2_orderdetail, + f"{[(x[0], x[1], x[2], x[4], x[8]) for x in temp_list if x[2] >= filter_condition[0][0]]}") + temp_list.clear() + else: + if code not in self.temp_order_queue_dict: + self.l2_order_codes.discard(code) + break + self.l2_order_codes.add(code) + time.sleep(0.001) + + except Exception as e: + logging.exception(e) + finally: + temp_list.clear() + + # 澶勭悊鎴愪氦鏁版嵁骞朵笂浼� + def __run_upload_transaction_task(self, code): + q: collections.deque = self.temp_transaction_queue_dict.get(code) + temp_list = [] + while True: + try: + while len(q) > 0: + data = q.popleft() + data = self.__filter_transaction(data) + if data: + temp_list.append(data) + if temp_list: + # 涓婁紶鏁版嵁 + # self.__upload_l2_data(code, upload_queue, temp_list) + self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, + temp_list) + temp_list = [] + else: + if code not in self.temp_transaction_queue_dict: + self.l2_transaction_codes.discard(code) + break + self.l2_transaction_codes.add(code) + time.sleep(0.001) + except: + pass + finally: + temp_list.clear() + + def __run_log_task(self, code): + q: queue.Queue = self.temp_log_queue_dict.get(code) + while True: + try: + temp = q.get(timeout=10) + huaxin_l2_log.info(logger_local_huaxin_l2_special_volume, + f"{temp}") + except: + time.sleep(0.02) + finally: + if code not in self.temp_log_queue_dict: + break + + +class L2DataUploadProtocolManager: + + # ipchosts IPC鍗忚 + def __init__(self, ipchosts): + self.ipchosts = ipchosts + # 鎵�鏈夌殑client + self.socket_client_dict = {} + # 淇濆瓨浠g爜鍒嗛厤鐨刢lient 鏍煎紡锛歿code:(host, socket)} + self.code_socket_client_dict = {} + self.rlock = threading.RLock() + context = zmq.Context() + if constant.is_windows(): + return + for host in self.ipchosts: + socket = context.socket(zmq.REQ) + socket.connect(host) + self.socket_client_dict[host] = socket + + # 鑾峰彇 + def __get_available_ipchost(self): + if len(self.code_socket_client_dict) >= len(self.socket_client_dict): + raise Exception("鏃犲彲鐢╤ost") + used_hosts = set([self.code_socket_client_dict[k][0] for k in self.code_socket_client_dict]) + for host in self.socket_client_dict: + if host not in used_hosts: + return host, self.socket_client_dict[host] + raise Exception("鏃犲彲鐢╤ost") + + # 鍒嗛厤HOST + def distribute_upload_host(self, code): + if code in self.code_socket_client_dict: + return + self.rlock.acquire() + try: + host_info = self.__get_available_ipchost() + if host_info: + self.code_socket_client_dict[code] = host_info + finally: + self.rlock.release() + + def release_distributed_upload_host(self, code): + if code not in self.code_socket_client_dict: + return + self.rlock.acquire() + try: + if code in self.code_socket_client_dict: + self.code_socket_client_dict.pop(code) + finally: + self.rlock.release() + + def upload_data_as_json(self, code, data): + if code not in self.code_socket_client_dict: + raise Exception("灏氭湭鍒嗛厤host") + host, socket = self.code_socket_client_dict[code] + socket.send(marshal.dumps(data)) + socket.recv_string() def add_target_code(code): @@ -205,7 +345,7 @@ def add_subscript_codes(codes): - print("add_subscript_codes", codes) + # print("add_subscript_codes", codes) # 鍔犲叆涓婁紶闃熷垪 common_queue.put(('', "l2_subscript_codes", list(codes))) @@ -229,7 +369,7 @@ return True else: # 鍐嶆鍙戦�� - print("鍐嶆鍙戦��") + # print("鍐嶆鍙戦��") return __send_response(sk, msg) except ConnectionResetError as e: SendResponseSkManager.del_send_response_sk(type) @@ -260,7 +400,7 @@ def __run_upload_common(): - print("__run_upload_common") + # print("__run_upload_common") logger_system.info(f"l2_client __run_upload_common 绾跨▼ID:{tool.get_thread_id()}") while True: try: @@ -276,7 +416,7 @@ def __run_log(): - print("__run_log") + # print("__run_log") logger_system.info(f"l2_client __run_log 绾跨▼ID:{tool.get_thread_id()}") async_log_util.huaxin_l2_log.run_sync() @@ -293,7 +433,7 @@ def __test(): - code = "002073" + # 鍒嗛厤鏁版嵁 pass @@ -303,4 +443,10 @@ def test(): - pass + ipclist = [] + for i in range(0, 70): + ipclist.append(f"ipc://l2order{i}.ipc") + manager = L2DataUploadProtocolManager(ipclist) + code = "000333" + manager.distribute_upload_host(code) + manager.upload_data_as_json(code, {"test": "test"}) -- Gitblit v1.8.0