From ad3cc1a24c4b413bae3069cc6d1c2a1923540ce3 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 25 七月 2025 15:51:48 +0800
Subject: [PATCH] 开盘啦TCP请求/委托数据持久化异步处理

---
 third_data/kpl_api.py                       |   23 +----
 api/outside_api_command_callback.py         |    2 
 third_data/kpl_app_util_of_tcp.py           |  152 ++++++++++++++++++++++++++++++++++++++
 constant.py                                 |    2 
 servers/huaxin_trade_server.py              |    2 
 trade/huaxin/huaxin_trade_record_manager.py |   30 +++++-
 6 files changed, 183 insertions(+), 28 deletions(-)

diff --git a/api/outside_api_command_callback.py b/api/outside_api_command_callback.py
index 60bdf01..7060f65 100644
--- a/api/outside_api_command_callback.py
+++ b/api/outside_api_command_callback.py
@@ -1556,7 +1556,7 @@
                                        request_id)
                     return
                 rate = round(rate, 2)
-                old_rate = LCancelRateManager().get_cancel_rate(0)[0]
+                old_rate = LCancelRateManager().get_cancel_rate(code)[0]
                 CancelRateHumanSettingManager().set_l_down(code, rate)
                 # L鍚庨噸鏂板泭鎷�
                 # if rate < old_rate:
diff --git a/constant.py b/constant.py
index 8a2db3e..8e3ca06 100644
--- a/constant.py
+++ b/constant.py
@@ -134,7 +134,7 @@
 
 L_CANCEL_MIN_WATCH_COUNT = 10
 # 鎾ゅ崟姣斾緥
-L_CANCEL_RATE = 0.39  # L鍚�
+L_CANCEL_RATE = 0.3  # L鍚�
 
 L_CANCEL_RATE_WITH_MUST_BUY = 0.9  # 鍔犵孩L鍚�
 
diff --git a/servers/huaxin_trade_server.py b/servers/huaxin_trade_server.py
index 72d109c..c19d537 100644
--- a/servers/huaxin_trade_server.py
+++ b/servers/huaxin_trade_server.py
@@ -1153,6 +1153,8 @@
     gpcode_manager.WantBuyCodesManager()
     # 鍔犺浇鍘嗗彶K绾挎暟鎹�
     HistoryKDataManager().load_data()
+    # 闃熷垪鎸佷箙鍖�
+    threading.Thread(target=lambda: DelegateRecordManager().run(), daemon=True).start()
 
 
 def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):
diff --git a/third_data/kpl_api.py b/third_data/kpl_api.py
index 28b5e54..57e702c 100644
--- a/third_data/kpl_api.py
+++ b/third_data/kpl_api.py
@@ -79,23 +79,6 @@
     return json.dumps({"errcode": 0, "list": fresults, "day": day})
 
 
-def getLimitUpInfo():
-    list_ = []
-    page_size = 20
-    MAX_SIZE = 150
-    for i in range(0, 10):
-        result_str = daBanList(DABAN_TYPE_LIMIT_UP, page_size=page_size, index=len(list_))
-        result = json.loads(result_str)
-        temp_list = result["list"]
-        list_ += temp_list
-        if len(temp_list) < page_size:
-            result['list'] = list_
-            return json.dumps(result)
-        elif len(list_) > MAX_SIZE:
-            return json.dumps(result)
-    return None
-
-
 def getHistoryLimitUpInfo(day):
     fresults = []
     for i in range(0, 100):
@@ -357,8 +340,10 @@
     #     print(r)
     # result = getCodeJingXuanBlocks("000756", True)
     # for x in result:
-    result = getHistoryCodesByPlateOrderByLZCS("801074", "2025-05-16", "1025")
-    print(result)
+    results = getLimitUpInfoNew()
+    results = json.loads(results)
+    for d in results['list']:
+        print(len(d), d)
 
     # request_new_blocks_codes([("鏈哄櫒浜�", "801159")])
     # result = getCodesByPlate("801159")  # getHistoryLimitUpInfo("2024-02-19")
diff --git a/third_data/kpl_app_util_of_tcp.py b/third_data/kpl_app_util_of_tcp.py
new file mode 100644
index 0000000..922f776
--- /dev/null
+++ b/third_data/kpl_app_util_of_tcp.py
@@ -0,0 +1,152 @@
+"""
+TSL-TCP鐗堢殑鎺ュ彛
+"""
+import codecs
+import logging
+import re
+import socket
+import ssl
+
+
+class DataParseUtil:
+    @classmethod
+    def parse_limit_up_data(cls, hex_str, enable_log=False):
+        limit_up_data_list = []
+        results = re.split(r"\sA2\s01\s\w{2}\s01\s0A\s06\s", hex_str)
+        # 娑ㄥ仠鐨勬棩鏈�
+        day = None
+        for i in range(1, len(results)):
+            try:
+                result = results[i]
+                if enable_log:
+                    print(bytes.fromhex(result))
+                temp_results = re.finditer(r"\s12\s\w{2}\s", result)
+                for match in temp_results:
+                    code = bytes.fromhex(result[:match.start()]).decode('utf-8')
+                    result = result[match.end():]
+                    break
+                # print(bytes.fromhex(result))
+                # print(result)
+                temp_results = re.split(r"\sA2\s06\s[A-Z0-9]{2}\s", result)
+                contents = [code]
+                pattern = r'[^\u4e00-\u9fa5a-zA-Z0-9\u3001\.\-]'  # 涓枃/瀛楁瘝/鏁板瓧/涓枃椤垮彿
+                for j in range(len(temp_results)):
+                    content = temp_results[j]
+                    if j == 0:
+                        # 浠g爜鍚嶇О
+                        code_name = re.split(r"\s1A\s06", content)[0]
+                        code_name = bytes.fromhex(code_name).decode('utf-8')
+                        # 鏇挎崲鎵�鏈夐潪涓嫳鏂囨暟瀛楀瓧绗︿负绌哄瓧绗︿覆
+                        code_name = re.sub(pattern, '', code_name)
+                        contents.append(code_name)
+                    else:
+                        content_str = bytes.fromhex(content).decode('utf-8', errors='ignore')
+                        content_str = re.sub(pattern, '', content_str)
+
+                        if len(contents) >= 20:
+                            # 鏈�澶�20椤规暟鎹�
+                            day = content_str
+                            break
+                        contents.append(content_str)
+
+                # contents 鏍煎紡: [浠g爜,鍚嶇О,鏈煡,鏈煡,鏈煡,鎴愪氦棰�,娑ㄥ仠鏃堕棿,楂樺害, 娑ㄥ仠鍘熷洜, 鎺ㄨ崘鍘熷洜,灏佸崟閲戦, 涓诲姏鍑�棰�, 瀹為檯鎹㈡墜鐧惧垎姣�, 瀹為檯娴侀�氬競鍊�, 鏈�澶у皝鍗曢噾棰�, 鏈�杩戞定鍋滄椂闂�, 娑ㄥ仠鍘熷洜鐨勪唬鐮佷釜鏁�, 娑ㄥ仠鍘熷洜鐨勭紪鐮�, 鏈煡, 鏈煡]
+                if enable_log:
+                    print(i, len(contents), contents)
+                limit_up_data_list.append(contents)
+            except Exception as e:
+                logging.exception(e)
+        return limit_up_data_list, day
+
+
+def __recieve_data(ssl_sock):
+    """
+    鎺ユ敹鏁版嵁璇锋眰
+    """
+    response = ssl_sock.read(3)
+    hex_str = codecs.encode(response, 'hex').decode('utf-8')
+    content_length = int(hex_str[2:6], 16)
+    total_content = b''
+    while content_length > len(total_content):
+        content = ssl_sock.recv(4096)
+        total_content += content
+    return total_content
+
+
+def __base_request(request_body, ssl_sock=None):
+    """
+
+    """
+    if not ssl_sock:
+        host, port = "hwsockapp.longhuvip.com", 14000
+        sock = socket.create_connection((host, port))
+        sock.settimeout(2)
+        context = ssl.create_default_context()
+        # 涓嶉獙璇佽瘉涔�
+        context.check_hostname = False
+        context.verify_mode = ssl.CERT_NONE
+        ssl_sock = context.wrap_socket(sock, server_hostname=host)
+        try:
+            # 娉ㄥ唽浣跨敤
+            register_body = "30 00 65 00 02 02 59 10 00 00 0A 24 62 36 39 32 65 35 31 63 2D 31 62 63 34 2D 33 65 38 63 2D 61 30 31 62 2D 36 32 30 61 61 36 32 34 30 65 32 38 10 01 1A 08 35 2E 32 30 2E 30 2E 36 20 01 2A 20 63 65 30 36 34 63 66 38 62 35 34 62 62 66 35 30 63 33 66 36 62 37 36 63 30 38 38 64 64 63 33 36 32 01 30 3A 01 30 40 63"
+            ssl_sock.sendall(bytes.fromhex(register_body))
+
+            __recieve_data(ssl_sock)
+            __recieve_data(ssl_sock)
+
+        except Exception as e:
+            raise e
+    ssl_sock.sendall(bytes.fromhex(request_body))
+    response = __recieve_data(ssl_sock)
+    hex_str = codecs.encode(response, 'hex').decode('utf-8').upper()
+    hex_array = []
+    for i in range(0, len(hex_str), 2):
+        str_ = hex_str[i: i + 2]
+        hex_array.append(str_)
+    return " ".join(hex_array), ssl_sock
+
+
+def get_limit_up_list():
+    # 鑾峰彇娑ㄥ仠鍒楄〃
+    pages = ["400014006F0837080003020837080110061801281E3801",
+             "400016008B08370800030208370801100618012011282B3801",
+             "400016008C0837080003020837080110061801202F282B3801",
+             "400016008D0837080003020837080110061801202F282B3801",
+             "400016008E0837080003020837080110061801202F282B3801",
+             "400016008F0837080003020837080110061801202F282B3801"
+             ]
+    limit_up_list = []
+    ssl_sock = None
+    day = None
+    for page in pages:
+        # print("======================")
+        result, ssl_sock = __base_request(page, ssl_sock)
+        fresults, day = DataParseUtil.parse_limit_up_data(result)
+        limit_up_list.extend(fresults)
+        if len(fresults) < 30:
+            break
+    seen = set()
+    # 鍘婚噸
+    unique_limit_up_list = [x for x in limit_up_list if x[0] not in seen and not seen.add(x[0])]
+
+    unique_limit_up_list.sort(key=lambda e: int(e[6], ), reverse=True)
+
+    # 鏍煎紡鍖栨暟鎹�
+    format_limit_up_data_list = []
+    if unique_limit_up_list:
+        for d in unique_limit_up_list:
+            format_data = [0] * 21
+            format_data[0], format_data[1], format_data[3], format_data[4], format_data[5] = d[0], d[1], '', int(d[6]), \
+                                                                                             d[8]
+            format_data[6], format_data[7], format_data[8] = int(d[10]), int(d[14]), int(d[11])
+
+            format_data[12], format_data[13], format_data[14] = d[9], int(d[13]), round(float(d[12]), 2)
+            format_data[18], format_data[19], format_data[20] = d[7], d[17], int(d[16])
+
+            format_limit_up_data_list.append(format_data)
+    fdata = {"errcode": 0, "list": format_limit_up_data_list, "day": day}
+    return fdata
+
+
+if __name__ == "__main__":
+    result = get_limit_up_list()
+    print(len(result["list"]))
diff --git a/trade/huaxin/huaxin_trade_record_manager.py b/trade/huaxin/huaxin_trade_record_manager.py
index a48f811..f96e5b2 100644
--- a/trade/huaxin/huaxin_trade_record_manager.py
+++ b/trade/huaxin/huaxin_trade_record_manager.py
@@ -6,6 +6,7 @@
 import copy
 import datetime
 import json
+import queue
 
 from db.redis_manager_delegate import RedisUtils, RedisManager
 from utils import tool, huaxin_util
@@ -19,6 +20,7 @@
     __current_delegate_records_dict_cache = {}
     mysqldb = mysql_data.Mysqldb()
     __instance = None
+    __queue = queue.Queue(maxsize=4096)
 
     def __new__(cls, *args, **kwargs):
         if not cls.__instance:
@@ -57,12 +59,7 @@
             pass
 
     @classmethod
-    def add_one(cls, d):
-        if huaxin_util.is_can_cancel(str(d["orderStatus"])):
-            cls.__current_delegate_records_dict_cache[d['orderSysID']] = d
-        else:
-            if d['orderSysID'] in cls.__current_delegate_records_dict_cache:
-                cls.__current_delegate_records_dict_cache.pop(d['orderSysID'])
+    def __add_one(cls, d):
         # 鏌ヨ鏄惁鏈夋暟鎹�
         _id = f"{d['insertDate']}-{d['orderLocalID']}"
         result = cls.mysqldb.select_one(
@@ -114,6 +111,16 @@
                     f"update hx_trade_delegate_record set {','.join(where_list)} where id='{result[0]}'")
 
     @classmethod
+    def add_one(cls, d):
+        if huaxin_util.is_can_cancel(str(d["orderStatus"])):
+            cls.__current_delegate_records_dict_cache[d['orderSysID']] = d
+        else:
+            if d['orderSysID'] in cls.__current_delegate_records_dict_cache:
+                cls.__current_delegate_records_dict_cache.pop(d['orderSysID'])
+        # pass
+        cls.__queue.put_nowait(d)
+
+    @classmethod
     def list_by_day(cls, day, min_update_time, orderStatus=[]):
         mysqldb = mysql_data.Mysqldb()
         try:
@@ -149,6 +156,15 @@
             return fresults, max_update_time
         finally:
             pass
+
+    @classmethod
+    def run(cls):
+        while True:
+            try:
+                data = cls.__queue.get()
+                cls.__add_one(data)
+            except:
+                pass
 
 
 # 鎸佷粨璁板綍
@@ -485,4 +501,4 @@
 
 
 if __name__ == "__main__":
-    print(DelegateRecordManager().list_current_delegates("600239"))
+    pass

--
Gitblit v1.8.0