From 6bbfbbb16d792f7737ec86cabdba5c0e98dcf4b4 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 29 八月 2025 17:41:29 +0800
Subject: [PATCH] 有涨停买撤单要触发撤单计算

---
 huaxin_client/l1_client.py |  231 +++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 174 insertions(+), 57 deletions(-)

diff --git a/huaxin_client/l1_client.py b/huaxin_client/l1_client.py
index 2a62460..0ee112f 100644
--- a/huaxin_client/l1_client.py
+++ b/huaxin_client/l1_client.py
@@ -1,15 +1,24 @@
 # -*- coding: utf-8 -*-
 import json
 import logging
+import multiprocessing
 import os
 import threading
 import time
 
 from huaxin_client import socket_util, l1_subscript_codes_manager
 import xmdapi
-from huaxin_client import tool
-from huaxin_client.client_network import SendResponseSkManager
-from log_module.log import logger_system, logger_local_huaxin_l1
+from huaxin_client import tool, constant
+from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript, logger_debug
+from third_data import custom_block_in_money_manager
+from utils import tool as out_tool
+
+################B绫�##################
+ADDRESS = "udp://224.224.1.19:7880"
+
+################A绫�##################
+if constant.IS_A:
+    ADDRESS = "udp://224.224.1.9:7880"
 
 level1_data_dict = {
 
@@ -45,9 +54,29 @@
         login_req = xmdapi.CTORATstpReqUserLoginField()
         self.__api.ReqUserLogin(login_req, 1)
 
+    def subscribe_codes(self, codes_sh, codes_sz):
+        # 閲嶆柊璁㈤槄浠g爜
+        if codes_sh:
+            ret = self.__api.SubscribeMarketData(codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
+            if ret != 0:
+                # print('SubscribeMarketData fail, ret[%d]' % ret)
+                pass
+            else:
+                # print('SubscribeMarketData success, ret[%d]' % ret)
+                pass
+
+        if codes_sz:
+            ret = self.__api.SubscribeMarketData(codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
+            if ret != 0:
+                # print('SubscribeMarketData fail, ret[%d]' % ret)
+                pass
+            else:
+                # print('SubscribeMarketData success, ret[%d]' % ret)
+                pass
+
     def OnRspUserLogin(self, pRspUserLoginField, pRspInfoField, nRequestID):
         if pRspInfoField.ErrorID == 0:
-            print('Login success! [%d]' % nRequestID)
+            # print('Login success! [%d]' % nRequestID)
 
             '''
             璁㈤槄琛屾儏
@@ -56,21 +85,7 @@
 			鍏跺畠鎯呭喌锛岃闃卻ub_arr闆嗗悎涓殑鍚堢害琛屾儏
             '''
 
-            print(f"璁㈤槄鏁伴噺锛歴h-{len(self.codes_sh)}  sz-{len(self.codes_sz)}")
-            if self.codes_sh:
-                ret = self.__api.SubscribeMarketData(self.codes_sh, xmdapi.TORA_TSTP_EXD_SSE)
-                if ret != 0:
-                    print('SubscribeMarketData fail, ret[%d]' % ret)
-                else:
-                    print('SubscribeMarketData success, ret[%d]' % ret)
-
-            if self.codes_sz:
-                ret = self.__api.SubscribeMarketData(self.codes_sz, xmdapi.TORA_TSTP_EXD_SZSE)
-                if ret != 0:
-                    print('SubscribeMarketData fail, ret[%d]' % ret)
-                else:
-                    print('SubscribeMarketData success, ret[%d]' % ret)
-
+            self.subscribe_codes(self.codes_sh, self.codes_sz)
             # sub_arr = [b'600004']
             # ret = self.__api.UnSubscribeMarketData(sub_arr, xmdapi.TORA_TSTP_EXD_SSE)
             # if ret != 0:
@@ -80,77 +95,147 @@
 
 
         else:
-            print('Login fail!!! [%d] [%d] [%s]'
-                  % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
+            pass
+            # print('Login fail!!! [%d] [%d] [%s]'
+            #       % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
 
     def OnRspSubMarketData(self, pSpecificSecurityField, pRspInfoField):
         if pRspInfoField.ErrorID == 0:
-            print('OnRspSubMarketData: OK!')
+            # print('OnRspSubMarketData: OK!')
+            pass
         else:
-            print('OnRspSubMarketData: Error! [%d] [%s]'
-                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
+            # print('OnRspSubMarketData: Error! [%d] [%s]'
+            #       % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
+            pass
 
     def OnRspUnSubMarketData(self, pSpecificSecurityField, pRspInfoField):
         if pRspInfoField.ErrorID == 0:
-            print('OnRspUnSubMarketData: OK!')
+            # print('OnRspUnSubMarketData: OK!')
+            pass
         else:
-            print('OnRspUnSubMarketData: Error! [%d] [%s]'
-                  % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
+            pass
+            # print('OnRspUnSubMarketData: Error! [%d] [%s]'
+            #       % (pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
 
     def OnRtnMarketData(self, pMarketDataField):
         if pMarketDataField.SecurityName.find("S") == 0:
             return
         if pMarketDataField.SecurityName.find("ST") >= 0:
             return
-        close_price = round(pMarketDataField.UpperLimitPrice / 1.1, 2)
-        rate = round((pMarketDataField.LastPrice - close_price) * 100 / close_price, 2)
-        # print(pMarketDataField.SecurityID, pMarketDataField.SecurityName, rate, pMarketDataField.Volume)
-
+        close_price = pMarketDataField.PreClosePrice
+        lastPrice = pMarketDataField.LastPrice
+        if pMarketDataField.BidPrice1:
+            lastPrice = pMarketDataField.BidPrice1
+        rate = round((lastPrice - close_price) * 100 / close_price, 2)
+        if out_tool.get_limit_up_rate(pMarketDataField.SecurityID) > 1.1001:
+            # 娑ㄥ仠鏉�20%浠ヤ笂鐨勬墦鎶�
+            rate = rate / 2
+        # (浠g爜, 鐜颁环, 娑ㄥ箙, 閲�, 褰撳墠鏃堕棿, 涔�1浠�, 涔�1閲�, 涔�2浠�, 涔�2閲�, 鏇存柊鏃堕棿, 鍗�1浠�, 鍗�1閲�)
         level1_data_dict[pMarketDataField.SecurityID] = (
-            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time())
-        # print(
-        #     "SecurityID[%s] SecurityName[%s] LastPrice[%.2f] Volume[%d] Turnover[%.2f] BidPrice1[%.2f] BidVolume1[%d] AskPrice1[%.2f] AskVolume1[%d] UpperLimitPrice[%.2f] LowerLimitPrice[%.2f]"
-        #     % (pMarketDataField.SecurityID, pMarketDataField.SecurityName, pMarketDataField.LastPrice,
-        #        pMarketDataField.Volume,
-        #        pMarketDataField.Turnover, pMarketDataField.BidPrice1, pMarketDataField.BidVolume1,
-        #        pMarketDataField.AskPrice1,
-        #        pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice))
+            pMarketDataField.SecurityID, pMarketDataField.LastPrice, rate, pMarketDataField.Volume, time.time(),
+            pMarketDataField.BidPrice1, pMarketDataField.BidVolume1, pMarketDataField.BidPrice2,
+            pMarketDataField.BidVolume2, pMarketDataField.UpdateTime, pMarketDataField.AskPrice1,
+            pMarketDataField.AskVolume1)
 
 
 __latest_subscript_codes = set()
 
 
-def __upload_codes_info(pipe_l2, datas):
-    if not tool.is_trade_time():
+def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas):
+    if not tool.is_trade_time() and not tool.is_pre_trade_time():
         return
     # 涓婁紶鏁版嵁
     type_ = "set_target_codes"
+    request_id = f"sb_{int(time.time() * 1000)}"
     fdata = json.dumps(
-        {"type": type_, "data": {"data": datas}})
-    if pipe_l2 is not None:
-        pipe_l2.send(fdata)
+        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
+    if queue_l1_w_strategy_r is not None:
+        queue_l1_w_strategy_r.put_nowait(fdata)
     # 璁板綍鏂板鍔犵殑浠g爜
     codes = set([x[0] for x in datas])
     add_codes = codes - __latest_subscript_codes
     __latest_subscript_codes.clear()
     for c in codes:
         __latest_subscript_codes.add(c)
-    logger_local_huaxin_l1.info(f"鏂板鍔犺闃呯殑浠g爜锛歿add_codes}")
+    if add_codes:
+        logger_local_huaxin_l1.info(f"({request_id})鏂板鍔犺闃呯殑浠g爜锛歿add_codes}")
 
 
-def run(pipe_l2):
-    logger_local_huaxin_l1.info("杩愯l1璁㈤槄鏈嶅姟")
+# 閲嶆柊璁㈤槄浠g爜
+def re_subscript(spi: MdSpi):
+    try:
+        codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes()
+        if len(codes_sh) > 100 and len(codes_sz) > 100:
+            logger_local_huaxin_l1.info(f"閲嶆柊璁㈤槄 sh-{len(codes_sh)} sz-{len(codes_sz)}")
+            spi.subscribe_codes(codes_sh, codes_sz)
+    except:
+        pass
+
+
+__position_codes = set()
+
+
+def __read_from_strategy(queue_l1_r_strategy_w: multiprocessing.Queue):
+    while True:
+        try:
+            data = queue_l1_r_strategy_w.get()
+            if type(data) == str:
+                data = json.loads(data)
+            if data["type"] == "set_position_codes":
+                codes = set(data["data"])
+                global __position_codes
+                __position_codes = codes
+            logger_local_huaxin_l1.info(f"鏀跺埌绛栫暐娑堟伅锛歿data}", )
+        except:
+            pass
+        finally:
+            time.sleep(1)
+
+
+def __run_subscript_task(spi):
+    """
+    杩愯璁㈤槄浠诲姟锛屽湪9:19鍒�9:29涔嬮棿寮�濮嬭闃�
+    @return:
+    """
+    is_re_subscript = False
+    while True:
+        try:
+            # 鍒ゆ柇鏄惁闇�瑕侀噸鏂拌闃�
+            if tool.is_pre_trade_time():
+                re_subscript(spi)
+                is_re_subscript = True
+            if is_re_subscript:
+                break
+        except:
+            pass
+        finally:
+            time.sleep(3)
+
+
+def run(queue_l1_w_strategy_r, queue_l1_r_strategy_w, fixed_codes=None):
+    """
+    杩愯l1璁㈤槄浠诲姟
+
+    @param queue_l1_w_strategy_r: L1鏂瑰啓锛岀瓥鐣ユ柟璇�
+    @param queue_l1_r_strategy_w: L1鏂硅锛岀瓥鐣ユ柟鍐�
+    @param fixed_codes: 鍥哄畾瑕佽繑鍥炴暟鎹殑浠g爜
+    @return:
+    """
+    if fixed_codes is None:
+        fixed_codes = set()
+    logger_local_huaxin_l1.info(f"杩愯l1璁㈤槄鏈嶅姟锛屽浐瀹氫唬鐮侊細{fixed_codes}")
     codes_sh = []
     codes_sz = []
     for i in range(15):
         try:
+            logger_local_huaxin_l1.info("寮�濮嬭幏鍙栫洰鏍囦唬鐮�")
             codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
             logger_local_huaxin_l1.info(f"鑾峰彇涓婅瘉锛屾繁璇佷唬鐮佹暟閲忥細sh-{len(codes_sh)} sz-{len(codes_sz)}")
             break
         except Exception as e:
             logger_local_huaxin_l1.exception(e)
             time.sleep(4)
-
+    logger_system.info(f"鑾峰彇L1璁㈤槄鐩爣绁ㄦ暟閲忥細sh-{len(codes_sh)} sz-{len(codes_sz)}")
     # 鎵撳嵃鎺ュ彛鐗堟湰鍙�
     print(xmdapi.CTORATstpXMdApi_GetApiVersion())
 
@@ -164,20 +249,36 @@
     api.RegisterSpi(spi)
 
     # 娉ㄥ唽鍗曚釜琛屾儏鍓嶇疆鏈嶅姟鍦板潃
-    # api.RegisterFront("tcp://224.224.1.19:7880")
+    # api.RegisterFront("tcp://210.14.72.16:9402")
     # 娉ㄥ唽澶氫釜琛屾儏鍓嶇疆鏈嶅姟鍦板潃锛岀敤閫楀彿闅斿紑
     # api.RegisterFront("tcp://10.0.1.101:6402,tcp://10.0.1.101:16402")
     # 娉ㄥ唽鍚嶅瓧鏈嶅姟鍣ㄥ湴鍧�锛屾敮鎸佸鏈嶅姟鍦板潃閫楀彿闅斿紑
     # api.RegisterNameServer('tcp://224.224.3.19:7888')
     # api.RegisterNameServer('tcp://10.0.1.101:52370,tcp://10.0.1.101:62370')
-    api.RegisterMulticast("udp://224.224.1.19:7880", None, "")
+
+    # -------------------------姝e紡鍦板潃B绫�-------------------------------
+    api.RegisterMulticast(ADDRESS, None, "")
+
+    # -------------------------姝e紡鍦板潃A绫�-------------------------------
+    # api.RegisterMulticast("udp://224.224.1.9:7880", None, "")
 
     # 鍚姩鎺ュ彛
     api.Init()
 
+    logger_system.info("L1璁㈤槄鏈嶅姟鍚姩鎴愬姛")
+    # 娴嬭瘯閾捐矾
+    # level1_data_dict["000969"] = (
+    #     "000969", 9.46, 9.11, 771000*100, time.time())
+    # level1_data_dict["002292"] = (
+    #     "002292", 8.06, 9.96, 969500 * 100, time.time())
+
+    threading.Thread(target=__read_from_strategy, args=(queue_l1_r_strategy_w,), daemon=True).start()
+
+    threading.Thread(target=__run_subscript_task, args=(spi,), daemon=True).start()
+
     # 绛夊緟绋嬪簭缁撴潫
     while True:
-        print("鏁伴噺", len(level1_data_dict))
+        # print("鏁伴噺", len(level1_data_dict))
         try:
             if len(level1_data_dict) < 1:
                 continue
@@ -186,19 +287,35 @@
             # (浠g爜,鐜颁环,娑ㄥ箙,閲�,鏃堕棿)
             list_ = [level1_data_dict[k] for k in level1_data_dict]
             flist = []
+            now_time_int = int(tool.get_now_time_str().replace(":", ""))
+            threshold_rate = constant.L1_MIN_RATE
             for d in list_:
-                if d[2] >= 5:
-                    # 娑ㄥ箙灏忎簬5%鐨勯渶瑕佸垹闄�
+                if d[2] >= threshold_rate or d[0] in fixed_codes:
+                    # 娑ㄥ箙灏忎簬3%鐨勯渶瑕佸垹闄�
                     flist.append(d)
             flist.sort(key=lambda x: x[2], reverse=True)
-            datas = flist[:100]
-            codes = [x[0] for x in datas]
-            print("浠g爜鏁伴噺:", len(datas))
-            __upload_codes_info(pipe_l2, datas)
+            # 灏嗗浐瀹氫唬鐮佺殑鎺掑湪鏈�鍓�
+            for code in fixed_codes:
+                if code in level1_data_dict:
+                    flist.insert(0, level1_data_dict[code])
+            # 姝e紡浜ゆ槗涔嬪墠鍏堝鐞嗘瘮杈冨皯鐨勬暟鎹紝涓嶇劧澶勭悊鏃堕棿涔呴�犳垚鏁版嵁鎷ュ牭
+            MAX_COUNT = 500
+            if now_time_int < int("092600"):
+                MAX_COUNT = 200
+            elif now_time_int < int("092800"):
+                MAX_COUNT = 300
+            elif now_time_int < int("092900"):
+                MAX_COUNT = 400
+            datas = flist[:MAX_COUNT]
+            if len(datas) > 0:
+                logger_l2_codes_subscript.info("寮�濮�#鍗庨懌L1涓婁紶浠g爜锛氭暟閲�-{}", len(datas))
+                __upload_codes_info(queue_l1_w_strategy_r, datas)
         except Exception as e:
             logging.exception(e)
+            logger_debug.exception(e)
         finally:
             time.sleep(3)
+
     # 閲婃斁鎺ュ彛瀵硅薄
     api.Release()
 

--
Gitblit v1.8.0