From ca310f014336d93eba73ed5010c1c5645424a1e0 Mon Sep 17 00:00:00 2001 From: admin <weikou2014> Date: 星期五, 04 八月 2023 16:06:28 +0800 Subject: [PATCH] 交易优化 --- l2_data_manager.py | 43 ++++++++++++++++++++++++++++++++----------- 1 files changed, 32 insertions(+), 11 deletions(-) diff --git a/l2_data_manager.py b/l2_data_manager.py index 512803d..89d2e49 100644 --- a/l2_data_manager.py +++ b/l2_data_manager.py @@ -4,6 +4,7 @@ import random import threading import time +import socket_util from client_network import SendResponseSkManager from mylog import logger_l2_error, logger_l2_upload @@ -15,6 +16,7 @@ tmep_order_detail_queue_dict = {} tmep_transaction_queue_dict = {} target_codes = set() +common_queue = queue.Queue() # 娣诲姞濮旀墭璇︽儏 @@ -59,24 +61,24 @@ def add_market_data(data): code = data['securityID'] - upload_data(code, "l2_market_data", data) + # 鍔犲叆涓婁紶闃熷垪 + common_queue.put((code, "l2_market_data", data)) def add_subscript_codes(codes): - upload_data('', "l2_subscript_codes", list(codes)) + print("add_subscript_codes", codes) + # 鍔犲叆涓婁紶闃熷垪 + common_queue.put(('', "l2_subscript_codes", list(codes))) def __send_response(sk, msg): - msg = SendResponseSkManager.format_response(msg) + msg = socket_util.load_header(msg) sk.sendall(msg) - while True: - result = sk.recv(1024) - if result: - result = result.decode("utf-8") - result_json = json.loads(result) - if result_json.get("code") == 0: - return True - break + result, header_str = socket_util.recv_data(sk) + if result: + result_json = json.loads(result) + if result_json.get("code") == 0: + return True return False @@ -110,6 +112,7 @@ # print("璇锋眰寮�濮�", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}") result = None start_time = time.time() + logger_l2_upload.info(f"{code} 涓婁紶鏁版嵁寮�濮�-{_type}") try: result = send_response(key, fdata.encode('utf-8')) except Exception as e: @@ -164,6 +167,19 @@ logger_l2_error.error(f"涓婁紶鎴愪氦鏁版嵁鍑洪敊锛歿str(e)}") +def __run_upload_common(): + print("__run_upload_common") + while True: + try: + while not common_queue.empty(): + temp = common_queue.get() + upload_data(temp[0], temp[1], temp[2]) + time.sleep(0.01) + except Exception as e: + logger_l2_error.exception(e) + logger_l2_error.error(f"涓婁紶鏅�氭暟鎹嚭閿欙細{str(e)}") + + # 杩愯涓婁紶浠诲姟 def run_upload_task(code): # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛� @@ -179,6 +195,11 @@ t.start() +def run_upload_common(): + t = threading.Thread(target=lambda: __run_upload_common(), daemon=True) + t.start() + + if __name__ == "__main__": code = "603809" target_codes.add(code) -- Gitblit v1.8.0