From f273791e2337215a2a3bd7e3c46c23c69bcb1c7c Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期三, 23 八月 2023 12:12:16 +0800
Subject: [PATCH] bug修复

---
 huaxin_client/l1_subscript_codes_manager.py |   98 ++++++++++++++++++++++++
 trade/huaxin/trade_server.py                |   12 ++
 outside_api_command_manager.py              |    6 +
 huaxin_client/l1_client.py                  |   48 +++--------
 huaxin_client/l2_data_manager.py            |    2 
 test/test.py                                |   17 +--
 6 files changed, 135 insertions(+), 48 deletions(-)

diff --git a/huaxin_client/l1_client.py b/huaxin_client/l1_client.py
index ba3b01e..2a62460 100644
--- a/huaxin_client/l1_client.py
+++ b/huaxin_client/l1_client.py
@@ -5,7 +5,7 @@
 import threading
 import time
 
-from huaxin_client import socket_util
+from huaxin_client import socket_util, l1_subscript_codes_manager
 import xmdapi
 from huaxin_client import tool
 from huaxin_client.client_network import SendResponseSkManager
@@ -25,38 +25,6 @@
         if result_json.get("code") == 0:
             return True
     return False
-
-
-def get_level1_codes():
-    type_ = "get_level1_codes"
-    fdata = json.dumps(
-        {"type": type_, "data": {}})
-    msg = fdata.encode("utf-8")
-    # 鍙戦�佹秷鎭�
-    for i in range(3):
-        try:
-            sk = SendResponseSkManager.create_send_response_sk()
-            msg = socket_util.load_header(msg)
-            sk.sendall(msg)
-            result, header_str = socket_util.recv_data(sk)
-            # 璇诲彇浠g爜
-            result_json = json.loads(result)
-            if result_json["code"] == 0:
-                codes = result_json["data"]
-                codes_sh = []
-                codes_sz = []
-                for code in codes:
-                    if code.find("00") == 0:
-                        codes_sz.append(code.encode("utf-8"))
-                    else:
-                        codes_sh.append(code.encode("utf-8"))
-                print("鑾峰彇璁㈤槄鐩爣浠f暟閲忥細", len(codes_sh), len(codes_sz))
-                return codes_sh, codes_sz
-        except ConnectionResetError:
-            SendResponseSkManager.del_send_response_sk(type_)
-        except BrokenPipeError:
-            SendResponseSkManager.del_send_response_sk(type_)
-    return None, None
 
 
 class MdSpi(xmdapi.CTORATstpXMdSpi):
@@ -149,6 +117,9 @@
         #        pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
 
 
+__latest_subscript_codes = set()
+
+
 def __upload_codes_info(pipe_l2, datas):
     if not tool.is_trade_time():
         return
@@ -158,6 +129,13 @@
         {"type": type_, "data": {"data": datas}})
     if pipe_l2 is not None:
         pipe_l2.send(fdata)
+    # 璁板綍鏂板鍔犵殑浠g爜
+    codes = set([x[0] for x in datas])
+    add_codes = codes - __latest_subscript_codes
+    __latest_subscript_codes.clear()
+    for c in codes:
+        __latest_subscript_codes.add(c)
+    logger_local_huaxin_l1.info(f"鏂板鍔犺闃呯殑浠g爜锛歿add_codes}")
 
 
 def run(pipe_l2):
@@ -166,7 +144,7 @@
     codes_sz = []
     for i in range(15):
         try:
-            codes_sh, codes_sz = get_level1_codes()
+            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
             logger_local_huaxin_l1.info(f"鑾峰彇涓婅瘉锛屾繁璇佷唬鐮佹暟閲忥細sh-{len(codes_sh)} sz-{len(codes_sz)}")
             break
         except Exception as e:
@@ -204,6 +182,8 @@
             if len(level1_data_dict) < 1:
                 continue
             # 鏍规嵁娑ㄥ箙鎺掑簭
+
+            # (浠g爜,鐜颁环,娑ㄥ箙,閲�,鏃堕棿)
             list_ = [level1_data_dict[k] for k in level1_data_dict]
             flist = []
             for d in list_:
diff --git a/huaxin_client/l1_subscript_codes_manager.py b/huaxin_client/l1_subscript_codes_manager.py
new file mode 100644
index 0000000..fb89b63
--- /dev/null
+++ b/huaxin_client/l1_subscript_codes_manager.py
@@ -0,0 +1,98 @@
+"""
+L1闇�瑕佽闃呯殑浠g爜绠$悊
+"""
+import json
+import os
+
+import constant
+from huaxin_client import socket_util
+from huaxin_client.client_network import SendResponseSkManager
+
+
+# 璇锋眰l1璁㈤槄鐨勭洰鏍囦唬鐮�
+def request_l1_subscript_target_codes():
+    type_ = "get_level1_codes"
+    fdata = json.dumps(
+        {"type": type_, "data": {}})
+    msg = fdata.encode("utf-8")
+    # 鍙戦�佹秷鎭�
+    for i in range(3):
+        try:
+            sk = SendResponseSkManager.create_send_response_sk()
+            msg = socket_util.load_header(msg)
+            sk.sendall(msg)
+            result, header_str = socket_util.recv_data(sk)
+            # 璇诲彇浠g爜
+            result_json = json.loads(result)
+            if result_json["code"] == 0:
+                codes = result_json["data"]
+                codes_sh = []
+                codes_sz = []
+                for code in codes:
+                    if code.find("00") == 0:
+                        codes_sz.append(code.encode("utf-8"))
+                    else:
+                        codes_sh.append(code.encode("utf-8"))
+                print("鑾峰彇璁㈤槄鐩爣浠f暟閲忥細", len(codes_sh), len(codes_sz))
+                return codes_sh, codes_sz
+        except ConnectionResetError:
+            SendResponseSkManager.del_send_response_sk(type_)
+        except BrokenPipeError:
+            SendResponseSkManager.del_send_response_sk(type_)
+    return None, None
+
+
+__DIR_PATH = f"{constant.get_path_prefix()}/codes"
+__CODE_SH_PATH = f"{__DIR_PATH}/codes_sh.text"
+__CODE_SZ_PATH = f"{__DIR_PATH}/codes_sz.text"
+
+
+# 淇濆瓨鐩爣浠g爜
+def save_codes(codes_sh, codes_sz):
+    if not os.path.exists(__DIR_PATH):
+        os.mkdir(__DIR_PATH)
+    with open(__CODE_SH_PATH, 'w') as f:
+        for c in codes_sh:
+            if type(c) == bytes:
+                f.write(c.encode('utf-8'))
+            else:
+                f.write(c)
+            f.write("\n")
+
+    with open(__CODE_SZ_PATH, 'w') as f:
+        for c in codes_sz:
+            if type(c) == bytes:
+                f.write(c.decode('utf-8'))
+            else:
+                f.write(c)
+            f.write("\n")
+
+
+def get_codes_from_file():
+    codes_sh, codes_sz = [], []
+    if os.path.exists(__CODE_SH_PATH):
+        with open(__CODE_SH_PATH, 'r') as f:
+            line = f.readline()
+            while line:
+                if line.strip():
+                    codes_sh.append(line.strip().encode('utf-8'))
+                line = f.readline()
+    if os.path.exists(__CODE_SZ_PATH):
+        with open(__CODE_SZ_PATH, 'r') as f:
+            line = f.readline()
+            while line:
+                if line.strip():
+                    codes_sz.append(line.strip().encode('utf-8'))
+                line = f.readline()
+    return codes_sh, codes_sz
+
+
+def get_codes():
+    codes_sh, codes_sz = get_codes_from_file()
+    if not codes_sh or not codes_sz:
+        return request_l1_subscript_target_codes()
+    return codes_sh, codes_sz
+
+
+if __name__ == '__main__':
+    pass
diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py
index 56355dc..a881b80 100644
--- a/huaxin_client/l2_data_manager.py
+++ b/huaxin_client/l2_data_manager.py
@@ -60,7 +60,7 @@
 
     tmep_order_detail_queue_dict[code].put(
         (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'],
-         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus']), int(time.time()*1000))
+         data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], int(time.time()*1000)))
 
 
 # 娣诲姞閫愮瑪鎴愪氦
diff --git a/outside_api_command_manager.py b/outside_api_command_manager.py
index eaa9221..29755c1 100644
--- a/outside_api_command_manager.py
+++ b/outside_api_command_manager.py
@@ -51,6 +51,7 @@
 API_TYPE_CODE_ATRRIBUTE = "code_attribute"  # 浠g爜灞炴��
 API_TYPE_CODE_TRADE_STATE = "code_trade_state"  # 浠g爜浜ゆ槗鐘舵��
 API_TYPE_GET_ENV = "get_env"  # 鑾峰彇鐜淇℃伅
+API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes"  # 鍚屾L1闇�瑕佽闃呯殑浠g爜
 
 
 class ActionCallback(object):
@@ -86,6 +87,9 @@
         pass
 
     def OnGetEnvInfo(self, client_id, request_id, data):
+        pass
+
+    def OnSyncL2SubscriptCodes(self, client_id, request_id):
         pass
 
 
@@ -181,6 +185,8 @@
                             cls.action_callback.OnGetCodeTradeState(client_id, request_id, data)
                         elif content_type == API_TYPE_GET_ENV:
                             cls.action_callback.OnGetEnvInfo(client_id, request_id, data)
+                        elif content_type == API_TYPE_GET_ENV:
+                            cls.action_callback.OnSyncL2SubscriptCodes(client_id, request_id)
                     except Exception as e:
                         logging.exception(e)
                         pass
diff --git a/test/test.py b/test/test.py
index 63e1579..5d8df2b 100644
--- a/test/test.py
+++ b/test/test.py
@@ -1,9 +1,8 @@
-import logging
 import queue
-import threading
 import time
 
-from log_module.log import logger_l2_process_time, logger_debug
+from huaxin_client import l1_subscript_codes_manager
+from log_module.log import logger_debug
 
 _dict = {}
 
@@ -14,14 +13,10 @@
     start_time = time.time()
     _queue.put(index)
     end_time = time.time()
-    logger_debug.debug(end_time -start_time)
+    logger_debug.debug(end_time - start_time)
 
 
 if __name__ == "__main__":
-    for k in range(1):
-        _dict.clear()
-        for i in range(0, 1000):
-            threading.Thread(target=lambda: add(i), daemon=True).start()
-        time.sleep(2)
-
-    input()
+    # l1_subscript_codes_manager.save_codes(["600100", "600102"], ["000123", "000146"])
+    code_sh,codes_sz =l1_subscript_codes_manager.request_l1_subscript_target_codes()
+    l1_subscript_codes_manager.save_codes(code_sh,codes_sz)
diff --git a/trade/huaxin/trade_server.py b/trade/huaxin/trade_server.py
index 7aad862..b0ff212 100644
--- a/trade/huaxin/trade_server.py
+++ b/trade/huaxin/trade_server.py
@@ -21,6 +21,7 @@
 from code_attribute import gpcode_manager
 from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
 from db.redis_manager_delegate import RedisUtils
+from huaxin_client import l1_subscript_codes_manager
 from huaxin_client.client_network import SendResponseSkManager
 from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress
 from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer
@@ -217,8 +218,7 @@
                                             code)
                                         logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code,
                                                                        buy_progress_index)
-                                        buy_time = total_datas[buy_progress_index]["val"][
-                                            "time"]
+                                        buy_time = total_datas[buy_progress_index]["val"]["time"]
                                         limit_up_price = gpcode_manager.get_limit_up_price(code)
                                         if buy_exec_index:
                                             need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
@@ -680,6 +680,14 @@
         except Exception as e:
             self.send_response({"code": 1, "msg": str(e)}, client_id, request_id)
 
+    # 鍚屾L2璁㈤槄浠g爜
+    def OnSyncL2SubscriptCodes(self, client_id, request_id):
+        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
+        if codes_sh and codes_sz:
+            l1_subscript_codes_manager.save_codes(codes_sh, codes_sz)
+        result = {"code": 0, "data": {"codes_sh": len(codes_sh), "codes_sz": len(codes_sz)}}
+        self.send_response(result, client_id, request_id)
+
 
 def run(pipe_trade, pipe_l1):
     # 鎵ц涓�浜涘垵濮嬪寲鏁版嵁

--
Gitblit v1.8.0