From f51466f1d4563f97b1ec620b70a1c94f01a6a2e1 Mon Sep 17 00:00:00 2001
From: admin <weikou2014>
Date: 星期五, 21 七月 2023 16:23:51 +0800
Subject: [PATCH] 交易优化

---
 l2_client.py |  193 ++++++++++++++++++++++++------------------------
 1 files changed, 96 insertions(+), 97 deletions(-)

diff --git a/l2_client.py b/l2_client.py
index e89cb5c..30239db 100644
--- a/l2_client.py
+++ b/l2_client.py
@@ -2,6 +2,7 @@
 # -*- coding: UTF-8 -*-
 import json
 import logging
+import queue
 import threading
 import time
 
@@ -36,6 +37,9 @@
 SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725",
                  b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"]
 SZ_Bond_Securities = [b"100303", b"109559", b"112617"]
+spi = None
+set_codes_data_queue = queue.Queue()
+market_code_dict = {}
 
 
 class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi):
@@ -50,58 +54,57 @@
         self.__api = api
         self.is_login = False
 
-    # 璁㈤槄浠g爜,[(浠g爜,鏈�浣庢墜鏁�,娑ㄥ仠浠�)]
-    def set_codes_data(self, codes_data):
-        print("set_codes_data", codes_data)
+    def __split_codes(self, codes):
+        szse_codes = []
+        sse_codes = []
+        for code in codes:
+            if code.find("00") == 0:
+                szse_codes.append(code.encode())
+            elif code.find("60") == 0:
+                sse_codes.append(code.encode())
+        return sse_codes, szse_codes
 
-        def split_codes(codes):
-            szse_codes = []
-            sse_codes = []
-            for code in codes:
-                if code.find("00") == 0:
-                    szse_codes.append(code.encode())
-                elif code.find("60") == 0:
-                    sse_codes.append(code.encode())
-            return sse_codes, szse_codes
+    # 鏂板璁㈤槄
 
-        # 鏂板璁㈤槄
-        def subscribe(_codes):
-            sh, sz = split_codes(_codes)
-            logger_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}")
-            logger_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}")
-            if sh:
-                # 璁㈤槄閫愮瑪濮旀墭
-                result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}")
-                # 璁㈤槄閫愮瑪鎴愪氦
-                result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}")
+    # 鍙栨秷璁㈤槄
+    def __unsubscribe(self, _codes):
+        sh, sz = self.__split_codes(_codes)
+        logger_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}")
+        logger_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}")
+        if sh:
+            # 鍙栨秷璁㈤槄閫愮瑪濮旀墭
+            self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+            # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦
+            self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+            self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+        if sz:
+            self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
+            self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
+            self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
 
-                result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}")
-            if sz:
-                result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-                logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz锛歿result}")
-                result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-                logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz锛歿result}")
-                result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-                logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz锛歿result}")
+    def __subscribe(self, _codes):
+        sh, sz = self.__split_codes(_codes)
+        logger_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}")
+        logger_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}")
+        if sh:
+            # 璁㈤槄閫愮瑪濮旀墭
+            result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+            logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}")
+            # 璁㈤槄閫愮瑪鎴愪氦
+            result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+            logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}")
 
-        # 鍙栨秷璁㈤槄
-        def unsubscribe(_codes):
-            sh, sz = split_codes(_codes)
-            logger_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}")
-            logger_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}")
-            if sh:
-                # 鍙栨秷璁㈤槄閫愮瑪濮旀墭
-                self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦
-                self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-                self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
-            if sz:
-                self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-                self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
-                self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
+            result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE)
+            logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}")
+        if sz:
+            result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
+            logger_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sz锛歿result}")
+            result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
+            logger_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sz锛歿result}")
+            result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE)
+            logger_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sz锛歿result}")
+
+    def __process_codes_data(self, codes_data):
 
         if not self.is_login and not constant.TEST:
             raise Exception("L2灏氭湭鐧诲綍")
@@ -120,13 +123,15 @@
             l2_data_manager.target_codes.discard(c)
         for c in add_codes:
             l2_data_manager.run_upload_task(c)
-        if len(add_codes) > 10:
-            unsubscribe(add_codes)
-        subscribe(add_codes)
-        unsubscribe(del_codes)
+        self.__subscribe(add_codes)
+        self.__unsubscribe(del_codes)
 
         # 璁剧疆鏈�杩戠殑浠g爜鍒楄〃
         self.latest_codes_set = codes
+
+    # 璁㈤槄浠g爜,[(浠g爜,鏈�浣庢墜鏁�,娑ㄥ仠浠�)]
+    def set_codes_data(self, codes_data):
+        self.__process_codes_data(codes_data)
 
     def set_code_special_watch_volume(self, code, volume):
         # 鏈夋晥鏈熶负3s
@@ -134,44 +139,23 @@
 
     def OnFrontConnected(self):
         print("OnFrontConnected")
+
+        logout_req = lev2mdapi.CTORATstpUserLogoutField()
+        self.__api.ReqUserLogout(logout_req, 1)
+        time.sleep(1)
         # 璇锋眰鐧诲綍
         login_req = lev2mdapi.CTORATstpReqUserLoginField()
-        self.__api.ReqUserLogin(login_req, 1)
+        self.__api.ReqUserLogin(login_req, 2)
 
     def OnRspUserLogin(self, pRspUserLogin, pRspInfo, nRequestID, bIsLast):
         print("OnRspUserLogin: ErrorID[%d] ErrorMsg[%s] RequestID[%d] IsLast[%d]" % (
             pRspInfo['ErrorID'], pRspInfo['ErrorMsg'], nRequestID, bIsLast))
         if pRspInfo['ErrorID'] == 0:
+            print("----L2琛屾儏鐧诲綍鎴愬姛----")
             self.is_login = True
-            if g_SubMarketData:
-                self.__api.SubscribeMarketData(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
-                self.__api.SubscribeMarketData(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
-
-            if g_SubTransaction:
-                self.__api.SubscribeTransaction(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
-                self.__api.SubscribeTransaction(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
-
-            if g_SubOrderDetail:
-                self.__api.SubscribeOrderDetail(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
-                self.__api.SubscribeOrderDetail(SZ_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
-
-            if g_SubXTSTick:
-                self.__api.SubscribeXTSTick(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
-
-            if g_SubXTSMarketData:
-                self.__api.SubscribeXTSMarketData(SH_XTS_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
-
-            if g_SubBondMarketData:
-                self.__api.SubscribeBondMarketData(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
-
-            if g_SubBondTransaction:
-                self.__api.SubscribeBondTransaction(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
-
-            if g_SubBondOrderDetail:
-                self.__api.SubscribeBondOrderDetail(SZ_Bond_Securities, lev2mdapi.TORA_TSTP_EXD_SZSE);
-            # 4.0.5鐗堟湰鎺ュ彛
-            if g_SubNGTSTick:
-                self.__api.SubscribeNGTSTick(SH_Securities, lev2mdapi.TORA_TSTP_EXD_SSE);
+            # t1 = threading.Thread(target=lambda: self.__set_codes_data(), daemon=True)
+            # # 鍚庡彴杩愯
+            # t1.start()
 
     def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspSubMarketData")
@@ -184,22 +168,32 @@
 
     def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspSubOrderDetail")
+        # try:
         print("璁㈤槄缁撴灉锛�", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"],
               pRspInfo["ErrorMsg"])
-        logger_l2_subscript.info(f"璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
+        logger_l2_subscript.info(
+            f"璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}")
         if pRspInfo["ErrorID"] == 0:
             print("璁㈤槄鎴愬姛")
             self.subscripted_codes.add(pSpecificSecurity['SecurityID'])
         if bIsLast == 1:
-            l2_data_manager.add_subscript_codes(self.subscripted_codes)
+            t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes),
+                                  daemon=True)
+            # 鍚庡彴杩愯
+            t1.start()
 
     def OnRspUnSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspUnSubOrderDetail")
-        code = pSpecificSecurity['SecurityID']
-        if code in self.subscripted_codes:
-            self.subscripted_codes.remove(code)
-        if bIsLast == 1:
-            l2_data_manager.add_subscript_codes(self.subscripted_codes)
+        try:
+            code = pSpecificSecurity['SecurityID']
+            self.subscripted_codes.discard(code)
+            if bIsLast == 1:
+                t1 = threading.Thread(target=lambda: l2_data_manager.add_subscript_codes(self.subscripted_codes),
+                                      daemon=True)
+                # 鍚庡彴杩愯
+                t1.start()
+        except Exception as e:
+            logging.exception(e)
 
     def OnRspSubBondMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast):
         print("OnRspSubBondMarketData")
@@ -238,7 +232,10 @@
                  (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']),
                  (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5'])
              ]}
-        l2_data_manager.add_market_data(d)
+        market_code_dict[pDepthMarketData['SecurityID']] = time.time()
+
+        t1 = threading.Thread(target=lambda: l2_data_manager.add_market_data(d), daemon=True)
+        t1.start()
 
         # 杈撳嚭琛屾儏蹇収鏁版嵁
         # print(
@@ -273,14 +270,14 @@
     def OnRtnTransaction(self, pTransaction):
         code = str(pTransaction['SecurityID'])
         min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code)
-        if min_volume is None:
-            # 榛樿绛涢��50w
-            if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
-                return
-        elif pTransaction['TradeVolume'] < min_volume:
-            return
         # 杈撳嚭閫愮瑪鎴愪氦鏁版嵁
         if pTransaction['ExecType'] == b"2":
+            if min_volume is None:
+                # 榛樿绛涢��50w
+                if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000:
+                    return
+            elif pTransaction['TradeVolume'] < min_volume:
+                return
             # 鎾ゅ崟
             item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'],
                     "Volume": pTransaction['TradeVolume'],
@@ -302,7 +299,7 @@
             print("閫愮瑪濮旀墭", item)
             l2_data_manager.add_l2_order_detail(item, True)
         else:
-            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.001:
+            if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201:
                 # 娑ㄥ仠浠�
                 # 鎴愪氦
                 item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'],
@@ -483,6 +480,8 @@
             SendResponseSkManager.send_normal_response("l2_cmd",
                                                        SendResponseSkManager.load_response(client_id, request_id,
                                                                                            {"code": 0, "msg": "璁剧疆鎴愬姛"}))
+
+
         except Exception as e:
             logging.exception(e)
             SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))

--
Gitblit v1.8.0