From ca310f014336d93eba73ed5010c1c5645424a1e0 Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期五, 04 八月 2023 16:06:28 +0800
Subject: [PATCH] 交易优化

---
 l2_data_manager.py |   43 ++++++++++++++++++++++++++++++++-----------
 1 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/l2_data_manager.py b/l2_data_manager.py
index 512803d..89d2e49 100644
--- a/l2_data_manager.py
+++ b/l2_data_manager.py
@@ -4,6 +4,7 @@
 import random
 import threading
 import time
+import socket_util
 
 from client_network import SendResponseSkManager
 from mylog import logger_l2_error, logger_l2_upload
@@ -15,6 +16,7 @@
 tmep_order_detail_queue_dict = {}
 tmep_transaction_queue_dict = {}
 target_codes = set()
+common_queue = queue.Queue()
 
 
 # 娣诲姞濮旀墭璇︽儏
@@ -59,24 +61,24 @@
 
 def add_market_data(data):
     code = data['securityID']
-    upload_data(code, "l2_market_data", data)
+    # 鍔犲叆涓婁紶闃熷垪
+    common_queue.put((code, "l2_market_data", data))
 
 
 def add_subscript_codes(codes):
-    upload_data('', "l2_subscript_codes", list(codes))
+    print("add_subscript_codes", codes)
+    # 鍔犲叆涓婁紶闃熷垪
+    common_queue.put(('', "l2_subscript_codes", list(codes)))
 
 
 def __send_response(sk, msg):
-    msg = SendResponseSkManager.format_response(msg)
+    msg = socket_util.load_header(msg)
     sk.sendall(msg)
-    while True:
-        result = sk.recv(1024)
-        if result:
-            result = result.decode("utf-8")
-            result_json = json.loads(result)
-            if result_json.get("code") == 0:
-                return True
-            break
+    result, header_str = socket_util.recv_data(sk)
+    if result:
+        result_json = json.loads(result)
+        if result_json.get("code") == 0:
+            return True
     return False
 
 
@@ -110,6 +112,7 @@
     # print("璇锋眰寮�濮�", uid, len(datas), len(fdata), f"{fdata[:20]}...{fdata[-20:]}")
     result = None
     start_time = time.time()
+    logger_l2_upload.info(f"{code} 涓婁紶鏁版嵁寮�濮�-{_type}")
     try:
         result = send_response(key, fdata.encode('utf-8'))
     except Exception as e:
@@ -164,6 +167,19 @@
             logger_l2_error.error(f"涓婁紶鎴愪氦鏁版嵁鍑洪敊锛歿str(e)}")
 
 
+def __run_upload_common():
+    print("__run_upload_common")
+    while True:
+        try:
+            while not common_queue.empty():
+                temp = common_queue.get()
+                upload_data(temp[0], temp[1], temp[2])
+            time.sleep(0.01)
+        except Exception as e:
+            logger_l2_error.exception(e)
+            logger_l2_error.error(f"涓婁紶鏅�氭暟鎹嚭閿欙細{str(e)}")
+
+
 # 杩愯涓婁紶浠诲姟
 def run_upload_task(code):
     # 濡傛灉浠g爜娌℃湁鍦ㄧ洰鏍囦唬鐮佷腑灏变笉闇�瑕佽繍琛�
@@ -179,6 +195,11 @@
         t.start()
 
 
+def run_upload_common():
+    t = threading.Thread(target=lambda: __run_upload_common(), daemon=True)
+    t.start()
+
+
 if __name__ == "__main__":
     code = "603809"
     target_codes.add(code)

--
Gitblit v1.8.0