From eb33b717023d9871bd74e6dce47a065228cffefc Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期四, 02 十一月 2023 11:23:09 +0800 Subject: [PATCH] L2进程与策略进程分开 --- huaxin_client/l2_data_manager.py | 315 +++++++++------------------------------------------ 1 files changed, 59 insertions(+), 256 deletions(-) diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 9a4ae30..0c00579 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -1,19 +1,18 @@ # -*- coding: utf-8 -*- import json import logging +import multiprocessing import queue -import random import threading import time -from huaxin_client import socket_util, l2_data_transform_protocol +from huaxin_client import socket_util from huaxin_client.client_network import SendResponseSkManager # 娲诲姩鏃堕棿 -from huaxin_client.l2_data_transform_protocol import L2DataCallBack -from log_module import log_export, async_log_util -from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ - logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail +from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager +from log_module import async_log_util +from log_module.log import logger_local_huaxin_l2_error, logger_system from utils import tool order_detail_upload_active_time_dict = {} @@ -24,12 +23,51 @@ target_codes = set() target_codes_add_time = {} common_queue = queue.Queue() -trading_canceled_queue = queue.Queue() -log_buy_no_queue = queue.Queue() -# 涔板叆璁㈠崟鍙风殑瀛楀吀 -buy_order_nos_dict = {} -# 鏈�杩戠殑澶у崟鎴愪氦鍗曞彿 -latest_big_order_transaction_orders_dict = {} + + +# L2涓婁紶鏁版嵁绠$悊鍣� +class L2DataUploadManager: + def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, + transaction_queue_distribute_manager: CodeQueueDistributeManager, + market_data_queue: multiprocessing.Queue): + self.order_queue_distribute_manager = order_queue_distribute_manager + self.transaction_queue_distribute_manager = transaction_queue_distribute_manager + self.market_data_queue = market_data_queue + + # 娣诲姞濮旀墭璇︽儏 + def add_l2_order_detail(self, data, start_time, istransaction=False): + code = data["SecurityID"] + queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) + if not queue_info: + return + queue_info[1].put_nowait( + (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], + data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) + + # 娣诲姞閫愮瑪鎴愪氦 + def add_transaction_detail(self, data): + code = data["SecurityID"] + queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) + if not queue_info: + return + # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜� + queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'], + data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], + data['SellNo'], data['ExecType'])) + + def add_market_data(self, data): + # 鍔犲叆涓婁紶闃熷垪 + self.market_data_queue.put_nowait(data) + + # 鍒嗛厤涓婁紶闃熷垪 + def distribute_upload_queue(self, code): + self.order_queue_distribute_manager.distribute_queue(code) + self.transaction_queue_distribute_manager.distribute_queue(code) + + # 閲婃斁宸茬粡鍒嗛厤鐨勯槦鍒� + def release_distributed_upload_queue(self, code): + self.order_queue_distribute_manager.release_distribute_queue(code) + self.transaction_queue_distribute_manager.release_distribute_queue(code) def add_target_code(code): @@ -42,89 +80,6 @@ target_codes.discard(code) if code in target_codes_add_time: target_codes_add_time.pop(code) - - -# 鑾峰彇鏈�杩戠殑澶у崟鎴愪氦璁㈠崟鍙� -def get_latest_transaction_order_nos(code): - return latest_big_order_transaction_orders_dict.get(code) - - -# 姝e湪鎴愪氦鐨勮鍗曟挙鍗曚簡 -def trading_order_canceled(code_, order_no): - trading_canceled_queue.put((code_, order_no)) - - -# 娣诲姞濮旀墭璇︽儏 -def add_l2_order_detail(data, start_time, istransaction=False): - code = data["SecurityID"] - # 寮傛鏃ュ織璁板綍 - if code not in tmep_order_detail_queue_dict: - tmep_order_detail_queue_dict[code] = queue.Queue() - # 鍘熸潵鐨勬牸寮� - # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], - # "Volume": pOrderDetail['Volume'], - # "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(), - # "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], - # "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], - # "OrderStatus": pOrderDetail['OrderStatus'].decode()} - - # 鐢ㄤ簬G鎾ょ殑鏁版嵁锛屾殏鏃舵敞閲� - # if data['Side'] == "1": - # # 璁板綍鎵�鏈変拱鍏ョ殑璁㈠崟鍙� - # if data['SecurityID'] not in buy_order_nos_dict: - # buy_order_nos_dict[data['SecurityID']] = set() - # buy_order_nos_dict[data['SecurityID']].add(data['OrderNO']) - # # 涔板叆璁㈠崟鍙烽渶瑕佽褰曟棩蹇� - # async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}") - - tmep_order_detail_queue_dict[code].put( - (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], - data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) - - -# 娣诲姞閫愮瑪鎴愪氦 -def add_transaction_detail(data): - code = data["SecurityID"] - if code not in tmep_transaction_queue_dict: - tmep_transaction_queue_dict[code] = queue.Queue() - # 鍘熸潵鐨勬牸寮� - # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], - # "TradeVolume": pTransaction['TradeVolume'], - # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], - # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'], - # "ExecType": pTransaction['ExecType'].decode()} - - # 鍒ゆ柇鏄惁涓哄ぇ鍗曟垚浜� - code = data['SecurityID'] - # G鎾ょ浉鍏虫暟鎹搷浣滄殏鏃舵敞閲� - # if code in buy_order_nos_dict: - # if data['BuyNo'] in buy_order_nos_dict[code]: - # try: - # temp_list = latest_big_order_transaction_orders_dict.get(code) - # if not temp_list: - # temp_list = [] - # if temp_list: - # if temp_list[-1] != data['BuyNo']: - # # 涓嶅姞鍏ラ噸澶嶈鍗曞彿 - # temp_list.append(data['BuyNo']) - # if len(temp_list) > 10: - # # 鏈�澶氬姞10涓鍗曞彿 - # temp_list = temp_list[-10:] - # else: - # temp_list.append(data['BuyNo']) - # latest_big_order_transaction_orders_dict[code] = temp_list - # except: - # pass - tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'], - data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], - data['SellNo'], data['ExecType'])) - - -def add_market_data(data): - code = data['securityID'] - # 鍔犲叆涓婁紶闃熷垪 - common_queue.put((code, "l2_market_data", data)) - def add_subscript_codes(codes): print("add_subscript_codes", codes) @@ -165,7 +120,6 @@ # 涓婁紶鏁版嵁 def upload_data(code, _type, datas, new_sk=False): - uid = random.randint(0, 100000) key = f"{_type}_{code}" fdata = json.dumps( {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}}) @@ -180,76 +134,16 @@ logging.exception(e) finally: pass - # print("璇锋眰缁撴潫", uid, result) - # logger_local_huaxin_l2_upload.info( - # f"{code} 涓婁紶鏁版嵁鑰楁椂-{_type}锛� {round((time.time() - start_time) * 1000, 1)} 鏁版嵁閲�:{len(datas)}") - # print("涓婁紶缁撴灉", result) -# 寰幆璇诲彇涓婁紶鏁版嵁 -def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None: - if code not in tmep_order_detail_queue_dict: - tmep_order_detail_queue_dict[code] = queue.Queue() - if True: - while True: - # print("order task") - try: - if code not in target_codes: - break - order_detail_upload_active_time_dict[code] = time.time() - udatas = [] - while not tmep_order_detail_queue_dict[code].empty(): - temp = tmep_order_detail_queue_dict[code].get() - udatas.append(temp) - if udatas: - # start_time = time.time() - # upload_data(code, "l2_order", udatas) - l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) - # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) - # use_time = int((time.time() - start_time) * 1000) - # if use_time > 10: - # async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-涓婁紶浠g爜鑰楁椂锛歿use_time}ms") - else: - # 娌℃湁鏁版嵁鐨勬椂鍊欓渶绛夊緟锛屾湁鏁版嵁鏃朵笉闇�绛夊緟 - time.sleep(0.001) - except Exception as e: - hx_logger_contact_debug.exception(e) - logger_local_huaxin_l2_error.error(f"涓婁紶璁㈠崟鏁版嵁鍑洪敊锛歿str(e)}") - pass - - -def __run_upload_trans(code, l2_data_callback: L2DataCallBack): - if code not in tmep_transaction_queue_dict: - tmep_transaction_queue_dict[code] = queue.Queue() - while True: - # print("trans task") - try: - if code not in target_codes: - break - transaction_upload_active_time_dict[code] = time.time() - udatas = [] - while not tmep_transaction_queue_dict[code].empty(): - temp = tmep_transaction_queue_dict[code].get() - udatas.append(temp) - if udatas: - # upload_data(code, "l2_trans", udatas) - l2_data_callback.OnL2Transaction(code, udatas) - time.sleep(0.01) - except Exception as e: - logger_local_huaxin_l2_error.error(f"涓婁紶鎴愪氦鏁版嵁鍑洪敊锛歿str(e)}") - - -def __run_upload_common(l2_data_callback: L2DataCallBack): +def __run_upload_common(): print("__run_upload_common") logger_system.info(f"l2_client __run_upload_common 绾跨▼ID:{tool.get_thread_id()}") while True: try: while not common_queue.empty(): temp = common_queue.get() - if temp[1] == "l2_market_data": - l2_data_callback.OnMarketData(temp[0], temp[2]) - else: - upload_data(temp[0], temp[1], temp[2]) + upload_data(temp[0], temp[1], temp[2]) except Exception as e: logger_local_huaxin_l2_error.exception(e) @@ -258,121 +152,30 @@ time.sleep(0.01) -def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack): - print("__run_upload_trading_canceled") - logger_system.info(f"l2_client __run_upload_trading_canceled 绾跨▼ID:{tool.get_thread_id()}") - while True: - try: - temp = trading_canceled_queue.get() - if temp: - logger_local_huaxin_g_cancel.info(f"鍑嗗涓婃姤锛歿temp}") - # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) - l2_data_callback.OnTradingOrderCancel(temp[0], temp[1]) - logger_local_huaxin_g_cancel.info(f"涓婃姤鎴愬姛锛歿temp}") - except Exception as e: - logger_local_huaxin_l2_error.exception(e) - logger_local_huaxin_l2_error.error(f"涓婁紶鏅�氭暟鎹嚭閿欙細{str(e)}") - - def __run_log(): print("__run_log") logger_system.info(f"l2_client __run_log 绾跨▼ID:{tool.get_thread_id()}") async_log_util.huaxin_l2_log.run_sync() -__upload_order_threads = {} -__upload_trans_threads = {} - - -# 杩愯涓婁紶浠诲姟 -def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None: - try: - # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛� - if code not in target_codes: - return - # 濡傛灉鏈�杩戠殑娲诲姩鏃堕棿灏忎簬2s灏变笉闇�瑕佽繍琛� - if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[ - code] > 2: - t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) - t.start() - __upload_order_threads[code] = t - - if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[ - code] > 2: - t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True) - t.start() - __upload_trans_threads[code] = t - finally: - pass - - -def run_upload_common(l2_data_callback: L2DataCallBack): - t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True) - t.start() - - -def run_upload_trading_canceled(l2_data_callback: L2DataCallBack): - t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True) +# 閲囩敤socket浼犺緭鏁版嵁 +def run_upload_common(): + t = threading.Thread(target=lambda: __run_upload_common(), daemon=True) t.start() def run_log(): - # G鎾ょ浉鍏虫暟鎹紝鏆傛椂娉ㄩ噴 - # fdatas = log_export.load_huaxin_local_buy_no() - # global buy_order_nos_dict - # buy_order_nos_dict = fdatas t = threading.Thread(target=lambda: __run_log(), daemon=True) t.start() -# 杩愯瀹堟姢绾跨▼ -def run_upload_daemon(_l2_data_callback): - def upload_daemon(): - logger_system.info(f"l2_client upload_daemon 绾跨▼ID:{tool.get_thread_id()}") - while True: - try: - for code in target_codes_add_time: - # 鐩爣浠g爜鍔犲叆2s涔嬪悗鍚姩瀹堟姢 - if time.time() - target_codes_add_time[code] > 2: - if code not in __upload_order_threads or not __upload_order_threads[code].is_alive(): - t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), - daemon=True) - t.start() - __upload_order_threads[code] = t - logger_local_huaxin_l2_upload.info(f"閲嶆柊鍒涘缓L2璁㈠崟涓婁紶绾跨▼锛歿code}") - if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive(): - t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback), - daemon=True) - t.start() - __upload_trans_threads[code] = t - logger_local_huaxin_l2_upload.info(f"閲嶆柊鍒涘缓L2鎴愪氦涓婁紶绾跨▼锛歿code}") - except: - pass - finally: - time.sleep(3) - - t = threading.Thread(target=lambda: upload_daemon(), daemon=True) - t.start() - - -def __test(_l2_data_callback): +def __test(): code = "002073" - if code not in tmep_order_detail_queue_dict: - tmep_order_detail_queue_dict[code] = queue.Queue() - target_codes.add(code) - t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True) - t.start() - while True: - try: - tmep_order_detail_queue_dict[code].put_nowait( - ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224]) - time.sleep(5) - except: - pass + pass -def run_test(_l2_data_callback): - t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True) +def run_test(): + t = threading.Thread(target=lambda: __test(), daemon=True) t.start() -- Gitblit v1.8.0