From f273791e2337215a2a3bd7e3c46c23c69bcb1c7c Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 23 八月 2023 12:12:16 +0800 Subject: [PATCH] bug修复 --- huaxin_client/l1_subscript_codes_manager.py | 98 ++++++++++++++++++++++++ trade/huaxin/trade_server.py | 12 ++ outside_api_command_manager.py | 6 + huaxin_client/l1_client.py | 48 +++-------- huaxin_client/l2_data_manager.py | 2 test/test.py | 17 +-- 6 files changed, 135 insertions(+), 48 deletions(-) diff --git a/huaxin_client/l1_client.py b/huaxin_client/l1_client.py index ba3b01e..2a62460 100644 --- a/huaxin_client/l1_client.py +++ b/huaxin_client/l1_client.py @@ -5,7 +5,7 @@ import threading import time -from huaxin_client import socket_util +from huaxin_client import socket_util, l1_subscript_codes_manager import xmdapi from huaxin_client import tool from huaxin_client.client_network import SendResponseSkManager @@ -25,38 +25,6 @@ if result_json.get("code") == 0: return True return False - - -def get_level1_codes(): - type_ = "get_level1_codes" - fdata = json.dumps( - {"type": type_, "data": {}}) - msg = fdata.encode("utf-8") - # 鍙戦�佹秷鎭� - for i in range(3): - try: - sk = SendResponseSkManager.create_send_response_sk() - msg = socket_util.load_header(msg) - sk.sendall(msg) - result, header_str = socket_util.recv_data(sk) - # 璇诲彇浠g爜 - result_json = json.loads(result) - if result_json["code"] == 0: - codes = result_json["data"] - codes_sh = [] - codes_sz = [] - for code in codes: - if code.find("00") == 0: - codes_sz.append(code.encode("utf-8")) - else: - codes_sh.append(code.encode("utf-8")) - print("鑾峰彇璁㈤槄鐩爣浠f暟閲忥細", len(codes_sh), len(codes_sz)) - return codes_sh, codes_sz - except ConnectionResetError: - SendResponseSkManager.del_send_response_sk(type_) - except BrokenPipeError: - SendResponseSkManager.del_send_response_sk(type_) - return None, None class MdSpi(xmdapi.CTORATstpXMdSpi): @@ -149,6 +117,9 @@ # pMarketDataField.AskVolume1, pMarketDataField.UpperLimitPrice, pMarketDataField.LowerLimitPrice)) +__latest_subscript_codes = set() + + def __upload_codes_info(pipe_l2, datas): if not tool.is_trade_time(): return @@ -158,6 +129,13 @@ {"type": type_, "data": {"data": datas}}) if pipe_l2 is not None: pipe_l2.send(fdata) + # 璁板綍鏂板鍔犵殑浠g爜 + codes = set([x[0] for x in datas]) + add_codes = codes - __latest_subscript_codes + __latest_subscript_codes.clear() + for c in codes: + __latest_subscript_codes.add(c) + logger_local_huaxin_l1.info(f"鏂板鍔犺闃呯殑浠g爜锛歿add_codes}") def run(pipe_l2): @@ -166,7 +144,7 @@ codes_sz = [] for i in range(15): try: - codes_sh, codes_sz = get_level1_codes() + codes_sh, codes_sz = l1_subscript_codes_manager.get_codes() logger_local_huaxin_l1.info(f"鑾峰彇涓婅瘉锛屾繁璇佷唬鐮佹暟閲忥細sh-{len(codes_sh)} sz-{len(codes_sz)}") break except Exception as e: @@ -204,6 +182,8 @@ if len(level1_data_dict) < 1: continue # 鏍规嵁娑ㄥ箙鎺掑簭 + + # (浠g爜,鐜颁环,娑ㄥ箙,閲�,鏃堕棿) list_ = [level1_data_dict[k] for k in level1_data_dict] flist = [] for d in list_: diff --git a/huaxin_client/l1_subscript_codes_manager.py b/huaxin_client/l1_subscript_codes_manager.py new file mode 100644 index 0000000..fb89b63 --- /dev/null +++ b/huaxin_client/l1_subscript_codes_manager.py @@ -0,0 +1,98 @@ +""" +L1闇�瑕佽闃呯殑浠g爜绠$悊 +""" +import json +import os + +import constant +from huaxin_client import socket_util +from huaxin_client.client_network import SendResponseSkManager + + +# 璇锋眰l1璁㈤槄鐨勭洰鏍囦唬鐮� +def request_l1_subscript_target_codes(): + type_ = "get_level1_codes" + fdata = json.dumps( + {"type": type_, "data": {}}) + msg = fdata.encode("utf-8") + # 鍙戦�佹秷鎭� + for i in range(3): + try: + sk = SendResponseSkManager.create_send_response_sk() + msg = socket_util.load_header(msg) + sk.sendall(msg) + result, header_str = socket_util.recv_data(sk) + # 璇诲彇浠g爜 + result_json = json.loads(result) + if result_json["code"] == 0: + codes = result_json["data"] + codes_sh = [] + codes_sz = [] + for code in codes: + if code.find("00") == 0: + codes_sz.append(code.encode("utf-8")) + else: + codes_sh.append(code.encode("utf-8")) + print("鑾峰彇璁㈤槄鐩爣浠f暟閲忥細", len(codes_sh), len(codes_sz)) + return codes_sh, codes_sz + except ConnectionResetError: + SendResponseSkManager.del_send_response_sk(type_) + except BrokenPipeError: + SendResponseSkManager.del_send_response_sk(type_) + return None, None + + +__DIR_PATH = f"{constant.get_path_prefix()}/codes" +__CODE_SH_PATH = f"{__DIR_PATH}/codes_sh.text" +__CODE_SZ_PATH = f"{__DIR_PATH}/codes_sz.text" + + +# 淇濆瓨鐩爣浠g爜 +def save_codes(codes_sh, codes_sz): + if not os.path.exists(__DIR_PATH): + os.mkdir(__DIR_PATH) + with open(__CODE_SH_PATH, 'w') as f: + for c in codes_sh: + if type(c) == bytes: + f.write(c.encode('utf-8')) + else: + f.write(c) + f.write("\n") + + with open(__CODE_SZ_PATH, 'w') as f: + for c in codes_sz: + if type(c) == bytes: + f.write(c.decode('utf-8')) + else: + f.write(c) + f.write("\n") + + +def get_codes_from_file(): + codes_sh, codes_sz = [], [] + if os.path.exists(__CODE_SH_PATH): + with open(__CODE_SH_PATH, 'r') as f: + line = f.readline() + while line: + if line.strip(): + codes_sh.append(line.strip().encode('utf-8')) + line = f.readline() + if os.path.exists(__CODE_SZ_PATH): + with open(__CODE_SZ_PATH, 'r') as f: + line = f.readline() + while line: + if line.strip(): + codes_sz.append(line.strip().encode('utf-8')) + line = f.readline() + return codes_sh, codes_sz + + +def get_codes(): + codes_sh, codes_sz = get_codes_from_file() + if not codes_sh or not codes_sz: + return request_l1_subscript_target_codes() + return codes_sh, codes_sz + + +if __name__ == '__main__': + pass diff --git a/huaxin_client/l2_data_manager.py b/huaxin_client/l2_data_manager.py index 56355dc..a881b80 100644 --- a/huaxin_client/l2_data_manager.py +++ b/huaxin_client/l2_data_manager.py @@ -60,7 +60,7 @@ tmep_order_detail_queue_dict[code].put( (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], - data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus']), int(time.time()*1000)) + data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], int(time.time()*1000))) # 娣诲姞閫愮瑪鎴愪氦 diff --git a/outside_api_command_manager.py b/outside_api_command_manager.py index eaa9221..29755c1 100644 --- a/outside_api_command_manager.py +++ b/outside_api_command_manager.py @@ -51,6 +51,7 @@ API_TYPE_CODE_ATRRIBUTE = "code_attribute" # 浠g爜灞炴�� API_TYPE_CODE_TRADE_STATE = "code_trade_state" # 浠g爜浜ゆ槗鐘舵�� API_TYPE_GET_ENV = "get_env" # 鑾峰彇鐜淇℃伅 +API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes" # 鍚屾L1闇�瑕佽闃呯殑浠g爜 class ActionCallback(object): @@ -86,6 +87,9 @@ pass def OnGetEnvInfo(self, client_id, request_id, data): + pass + + def OnSyncL2SubscriptCodes(self, client_id, request_id): pass @@ -181,6 +185,8 @@ cls.action_callback.OnGetCodeTradeState(client_id, request_id, data) elif content_type == API_TYPE_GET_ENV: cls.action_callback.OnGetEnvInfo(client_id, request_id, data) + elif content_type == API_TYPE_GET_ENV: + cls.action_callback.OnSyncL2SubscriptCodes(client_id, request_id) except Exception as e: logging.exception(e) pass diff --git a/test/test.py b/test/test.py index 63e1579..5d8df2b 100644 --- a/test/test.py +++ b/test/test.py @@ -1,9 +1,8 @@ -import logging import queue -import threading import time -from log_module.log import logger_l2_process_time, logger_debug +from huaxin_client import l1_subscript_codes_manager +from log_module.log import logger_debug _dict = {} @@ -14,14 +13,10 @@ start_time = time.time() _queue.put(index) end_time = time.time() - logger_debug.debug(end_time -start_time) + logger_debug.debug(end_time - start_time) if __name__ == "__main__": - for k in range(1): - _dict.clear() - for i in range(0, 1000): - threading.Thread(target=lambda: add(i), daemon=True).start() - time.sleep(2) - - input() + # l1_subscript_codes_manager.save_codes(["600100", "600102"], ["000123", "000146"]) + code_sh,codes_sz =l1_subscript_codes_manager.request_l1_subscript_target_codes() + l1_subscript_codes_manager.save_codes(code_sh,codes_sz) diff --git a/trade/huaxin/trade_server.py b/trade/huaxin/trade_server.py index 7aad862..b0ff212 100644 --- a/trade/huaxin/trade_server.py +++ b/trade/huaxin/trade_server.py @@ -21,6 +21,7 @@ from code_attribute import gpcode_manager from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils +from huaxin_client import l1_subscript_codes_manager from huaxin_client.client_network import SendResponseSkManager from l2 import l2_data_manager_new, l2_log, code_price_manager, l2_data_util, l2_data_manager, transaction_progress from l2.cancel_buy_strategy import HourCancelBigNumComputer, LCancelBigNumComputer, DCancelBigNumComputer @@ -217,8 +218,7 @@ code) logger_l2_trade_buy_queue.info("鑾峰彇鎴愪氦浣嶇疆鎴愬姛锛� code-{} index-{}", code, buy_progress_index) - buy_time = total_datas[buy_progress_index]["val"][ - "time"] + buy_time = total_datas[buy_progress_index]["val"]["time"] limit_up_price = gpcode_manager.get_limit_up_price(code) if buy_exec_index: need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code, @@ -680,6 +680,14 @@ except Exception as e: self.send_response({"code": 1, "msg": str(e)}, client_id, request_id) + # 鍚屾L2璁㈤槄浠g爜 + def OnSyncL2SubscriptCodes(self, client_id, request_id): + codes_sh, codes_sz = l1_subscript_codes_manager.request_l1_subscript_target_codes() + if codes_sh and codes_sz: + l1_subscript_codes_manager.save_codes(codes_sh, codes_sz) + result = {"code": 0, "data": {"codes_sh": len(codes_sh), "codes_sz": len(codes_sz)}} + self.send_response(result, client_id, request_id) + def run(pipe_trade, pipe_l1): # 鎵ц涓�浜涘垵濮嬪寲鏁版嵁 -- Gitblit v1.8.0