From ad3cc1a24c4b413bae3069cc6d1c2a1923540ce3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期五, 25 七月 2025 15:51:48 +0800 Subject: [PATCH] 开盘啦TCP请求/委托数据持久化异步处理 --- third_data/kpl_api.py | 23 +---- api/outside_api_command_callback.py | 2 third_data/kpl_app_util_of_tcp.py | 152 ++++++++++++++++++++++++++++++++++++++ constant.py | 2 servers/huaxin_trade_server.py | 2 trade/huaxin/huaxin_trade_record_manager.py | 30 +++++- 6 files changed, 183 insertions(+), 28 deletions(-) diff --git a/api/outside_api_command_callback.py b/api/outside_api_command_callback.py index 60bdf01..7060f65 100644 --- a/api/outside_api_command_callback.py +++ b/api/outside_api_command_callback.py @@ -1556,7 +1556,7 @@ request_id) return rate = round(rate, 2) - old_rate = LCancelRateManager().get_cancel_rate(0)[0] + old_rate = LCancelRateManager().get_cancel_rate(code)[0] CancelRateHumanSettingManager().set_l_down(code, rate) # L鍚庨噸鏂板泭鎷� # if rate < old_rate: diff --git a/constant.py b/constant.py index 8a2db3e..8e3ca06 100644 --- a/constant.py +++ b/constant.py @@ -134,7 +134,7 @@ L_CANCEL_MIN_WATCH_COUNT = 10 # 鎾ゅ崟姣斾緥 -L_CANCEL_RATE = 0.39 # L鍚� +L_CANCEL_RATE = 0.3 # L鍚� L_CANCEL_RATE_WITH_MUST_BUY = 0.9 # 鍔犵孩L鍚� diff --git a/servers/huaxin_trade_server.py b/servers/huaxin_trade_server.py index 72d109c..c19d537 100644 --- a/servers/huaxin_trade_server.py +++ b/servers/huaxin_trade_server.py @@ -1153,6 +1153,8 @@ gpcode_manager.WantBuyCodesManager() # 鍔犺浇鍘嗗彶K绾挎暟鎹� HistoryKDataManager().load_data() + # 闃熷垪鎸佷箙鍖� + threading.Thread(target=lambda: DelegateRecordManager().run(), daemon=True).start() def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr): diff --git a/third_data/kpl_api.py b/third_data/kpl_api.py index 28b5e54..57e702c 100644 --- a/third_data/kpl_api.py +++ b/third_data/kpl_api.py @@ -79,23 +79,6 @@ return json.dumps({"errcode": 0, "list": fresults, "day": day}) -def getLimitUpInfo(): - list_ = [] - page_size = 20 - MAX_SIZE = 150 - for i in range(0, 10): - result_str = daBanList(DABAN_TYPE_LIMIT_UP, page_size=page_size, index=len(list_)) - result = json.loads(result_str) - temp_list = result["list"] - list_ += temp_list - if len(temp_list) < page_size: - result['list'] = list_ - return json.dumps(result) - elif len(list_) > MAX_SIZE: - return json.dumps(result) - return None - - def getHistoryLimitUpInfo(day): fresults = [] for i in range(0, 100): @@ -357,8 +340,10 @@ # print(r) # result = getCodeJingXuanBlocks("000756", True) # for x in result: - result = getHistoryCodesByPlateOrderByLZCS("801074", "2025-05-16", "1025") - print(result) + results = getLimitUpInfoNew() + results = json.loads(results) + for d in results['list']: + print(len(d), d) # request_new_blocks_codes([("鏈哄櫒浜�", "801159")]) # result = getCodesByPlate("801159") # getHistoryLimitUpInfo("2024-02-19") diff --git a/third_data/kpl_app_util_of_tcp.py b/third_data/kpl_app_util_of_tcp.py new file mode 100644 index 0000000..922f776 --- /dev/null +++ b/third_data/kpl_app_util_of_tcp.py @@ -0,0 +1,152 @@ +""" +TSL-TCP鐗堢殑鎺ュ彛 +""" +import codecs +import logging +import re +import socket +import ssl + + +class DataParseUtil: + @classmethod + def parse_limit_up_data(cls, hex_str, enable_log=False): + limit_up_data_list = [] + results = re.split(r"\sA2\s01\s\w{2}\s01\s0A\s06\s", hex_str) + # 娑ㄥ仠鐨勬棩鏈� + day = None + for i in range(1, len(results)): + try: + result = results[i] + if enable_log: + print(bytes.fromhex(result)) + temp_results = re.finditer(r"\s12\s\w{2}\s", result) + for match in temp_results: + code = bytes.fromhex(result[:match.start()]).decode('utf-8') + result = result[match.end():] + break + # print(bytes.fromhex(result)) + # print(result) + temp_results = re.split(r"\sA2\s06\s[A-Z0-9]{2}\s", result) + contents = [code] + pattern = r'[^\u4e00-\u9fa5a-zA-Z0-9\u3001\.\-]' # 涓枃/瀛楁瘝/鏁板瓧/涓枃椤垮彿 + for j in range(len(temp_results)): + content = temp_results[j] + if j == 0: + # 浠g爜鍚嶇О + code_name = re.split(r"\s1A\s06", content)[0] + code_name = bytes.fromhex(code_name).decode('utf-8') + # 鏇挎崲鎵�鏈夐潪涓嫳鏂囨暟瀛楀瓧绗︿负绌哄瓧绗︿覆 + code_name = re.sub(pattern, '', code_name) + contents.append(code_name) + else: + content_str = bytes.fromhex(content).decode('utf-8', errors='ignore') + content_str = re.sub(pattern, '', content_str) + + if len(contents) >= 20: + # 鏈�澶�20椤规暟鎹� + day = content_str + break + contents.append(content_str) + + # contents 鏍煎紡: [浠g爜,鍚嶇О,鏈煡,鏈煡,鏈煡,鎴愪氦棰�,娑ㄥ仠鏃堕棿,楂樺害, 娑ㄥ仠鍘熷洜, 鎺ㄨ崘鍘熷洜,灏佸崟閲戦, 涓诲姏鍑�棰�, 瀹為檯鎹㈡墜鐧惧垎姣�, 瀹為檯娴侀�氬競鍊�, 鏈�澶у皝鍗曢噾棰�, 鏈�杩戞定鍋滄椂闂�, 娑ㄥ仠鍘熷洜鐨勪唬鐮佷釜鏁�, 娑ㄥ仠鍘熷洜鐨勭紪鐮�, 鏈煡, 鏈煡] + if enable_log: + print(i, len(contents), contents) + limit_up_data_list.append(contents) + except Exception as e: + logging.exception(e) + return limit_up_data_list, day + + +def __recieve_data(ssl_sock): + """ + 鎺ユ敹鏁版嵁璇锋眰 + """ + response = ssl_sock.read(3) + hex_str = codecs.encode(response, 'hex').decode('utf-8') + content_length = int(hex_str[2:6], 16) + total_content = b'' + while content_length > len(total_content): + content = ssl_sock.recv(4096) + total_content += content + return total_content + + +def __base_request(request_body, ssl_sock=None): + """ + + """ + if not ssl_sock: + host, port = "hwsockapp.longhuvip.com", 14000 + sock = socket.create_connection((host, port)) + sock.settimeout(2) + context = ssl.create_default_context() + # 涓嶉獙璇佽瘉涔� + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + ssl_sock = context.wrap_socket(sock, server_hostname=host) + try: + # 娉ㄥ唽浣跨敤 + register_body = "30 00 65 00 02 02 59 10 00 00 0A 24 62 36 39 32 65 35 31 63 2D 31 62 63 34 2D 33 65 38 63 2D 61 30 31 62 2D 36 32 30 61 61 36 32 34 30 65 32 38 10 01 1A 08 35 2E 32 30 2E 30 2E 36 20 01 2A 20 63 65 30 36 34 63 66 38 62 35 34 62 62 66 35 30 63 33 66 36 62 37 36 63 30 38 38 64 64 63 33 36 32 01 30 3A 01 30 40 63" + ssl_sock.sendall(bytes.fromhex(register_body)) + + __recieve_data(ssl_sock) + __recieve_data(ssl_sock) + + except Exception as e: + raise e + ssl_sock.sendall(bytes.fromhex(request_body)) + response = __recieve_data(ssl_sock) + hex_str = codecs.encode(response, 'hex').decode('utf-8').upper() + hex_array = [] + for i in range(0, len(hex_str), 2): + str_ = hex_str[i: i + 2] + hex_array.append(str_) + return " ".join(hex_array), ssl_sock + + +def get_limit_up_list(): + # 鑾峰彇娑ㄥ仠鍒楄〃 + pages = ["400014006F0837080003020837080110061801281E3801", + "400016008B08370800030208370801100618012011282B3801", + "400016008C0837080003020837080110061801202F282B3801", + "400016008D0837080003020837080110061801202F282B3801", + "400016008E0837080003020837080110061801202F282B3801", + "400016008F0837080003020837080110061801202F282B3801" + ] + limit_up_list = [] + ssl_sock = None + day = None + for page in pages: + # print("======================") + result, ssl_sock = __base_request(page, ssl_sock) + fresults, day = DataParseUtil.parse_limit_up_data(result) + limit_up_list.extend(fresults) + if len(fresults) < 30: + break + seen = set() + # 鍘婚噸 + unique_limit_up_list = [x for x in limit_up_list if x[0] not in seen and not seen.add(x[0])] + + unique_limit_up_list.sort(key=lambda e: int(e[6], ), reverse=True) + + # 鏍煎紡鍖栨暟鎹� + format_limit_up_data_list = [] + if unique_limit_up_list: + for d in unique_limit_up_list: + format_data = [0] * 21 + format_data[0], format_data[1], format_data[3], format_data[4], format_data[5] = d[0], d[1], '', int(d[6]), \ + d[8] + format_data[6], format_data[7], format_data[8] = int(d[10]), int(d[14]), int(d[11]) + + format_data[12], format_data[13], format_data[14] = d[9], int(d[13]), round(float(d[12]), 2) + format_data[18], format_data[19], format_data[20] = d[7], d[17], int(d[16]) + + format_limit_up_data_list.append(format_data) + fdata = {"errcode": 0, "list": format_limit_up_data_list, "day": day} + return fdata + + +if __name__ == "__main__": + result = get_limit_up_list() + print(len(result["list"])) diff --git a/trade/huaxin/huaxin_trade_record_manager.py b/trade/huaxin/huaxin_trade_record_manager.py index a48f811..f96e5b2 100644 --- a/trade/huaxin/huaxin_trade_record_manager.py +++ b/trade/huaxin/huaxin_trade_record_manager.py @@ -6,6 +6,7 @@ import copy import datetime import json +import queue from db.redis_manager_delegate import RedisUtils, RedisManager from utils import tool, huaxin_util @@ -19,6 +20,7 @@ __current_delegate_records_dict_cache = {} mysqldb = mysql_data.Mysqldb() __instance = None + __queue = queue.Queue(maxsize=4096) def __new__(cls, *args, **kwargs): if not cls.__instance: @@ -57,12 +59,7 @@ pass @classmethod - def add_one(cls, d): - if huaxin_util.is_can_cancel(str(d["orderStatus"])): - cls.__current_delegate_records_dict_cache[d['orderSysID']] = d - else: - if d['orderSysID'] in cls.__current_delegate_records_dict_cache: - cls.__current_delegate_records_dict_cache.pop(d['orderSysID']) + def __add_one(cls, d): # 鏌ヨ鏄惁鏈夋暟鎹� _id = f"{d['insertDate']}-{d['orderLocalID']}" result = cls.mysqldb.select_one( @@ -114,6 +111,16 @@ f"update hx_trade_delegate_record set {','.join(where_list)} where id='{result[0]}'") @classmethod + def add_one(cls, d): + if huaxin_util.is_can_cancel(str(d["orderStatus"])): + cls.__current_delegate_records_dict_cache[d['orderSysID']] = d + else: + if d['orderSysID'] in cls.__current_delegate_records_dict_cache: + cls.__current_delegate_records_dict_cache.pop(d['orderSysID']) + # pass + cls.__queue.put_nowait(d) + + @classmethod def list_by_day(cls, day, min_update_time, orderStatus=[]): mysqldb = mysql_data.Mysqldb() try: @@ -149,6 +156,15 @@ return fresults, max_update_time finally: pass + + @classmethod + def run(cls): + while True: + try: + data = cls.__queue.get() + cls.__add_one(data) + except: + pass # 鎸佷粨璁板綍 @@ -485,4 +501,4 @@ if __name__ == "__main__": - print(DelegateRecordManager().list_current_delegates("600239")) + pass -- Gitblit v1.8.0