Administrator
2023-08-15 457f3d4ca0c33ea41055de2e45c4218e3f541d02
++++++++++++++++
bug修复-
5个文件已修改
67 ■■■■ 已修改文件
huaxin_api/l1_client.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_api/l2_client.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_api/l1_client.py
@@ -58,7 +58,12 @@
class MdSpi(xmdapi.CTORATstpXMdSpi):
    def __init__(self, api):
        self.codes_sh, self.codes_sz = get_level1_codes()
        for i in range(3):
            try:
                self.codes_sh, self.codes_sz = get_level1_codes()
                break
            except:
                time.sleep(2)
        xmdapi.CTORATstpXMdSpi.__init__(self)
        self.__api = api
@@ -150,7 +155,6 @@
        {"type": type_, "data": {"data": datas}})
    if pipe_l2 is not None:
        pipe_l2.send(fdata)
def run(pipe_l2):
huaxin_api/l2_client.py
@@ -506,7 +506,7 @@
    api.Init()
def __receive_from_pipe(pipe):
def __receive_from_pipe_trade(pipe):
    while True:
        try:
            value = pipe.recv()
@@ -517,17 +517,31 @@
                    volume = data["data"]["volume"]
                    code = data["data"]["code"]
                    spi.set_code_special_watch_volume(code, volume)
                elif data["type"] == "set_l2_codes":
        except:
            pass
def __receive_from_pipe_strategy(pipe):
    while True:
        try:
            value = pipe.recv()
            if value:
                value = value.decode("utf-8")
                data = json.loads(value)
                if data["type"] == "set_l2_codes":
                    data = data["data"]
                    l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)
        except:
            pass
def run(pipe_trade=None):
def run(pipe_trade=None, pipe_strategy=None):
    __init_l2()
    if pipe_trade is not None:
        t1 = threading.Thread(target=lambda: __receive_from_pipe(pipe_trade), daemon=True)
        t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(pipe_trade), daemon=True)
        t1.start()
    if pipe_strategy is not None:
        t1 = threading.Thread(target=lambda: __receive_from_pipe_strategy(pipe_strategy), daemon=True)
        t1.start()
    l2_data_manager.run_upload_common()
    global l2CommandManager
main.py
@@ -19,7 +19,7 @@
# from huaxin_api import trade_client, l2_client, l1_client
def createTradeServer(pipe_server, pipe_trade, pipe_l1):
def createTradeServer(pipe_server, pipe_trade, pipe_l1, pipe_l2):
    # 初始化参数
    global_data_loader.init()
@@ -28,7 +28,7 @@
    t1.start()
    # 交易接口服务
    t1 = threading.Thread(target=trade_api_server.run, args=(pipe_server,), daemon=True)
    t1 = threading.Thread(target=trade_api_server.run, args=(pipe_server, pipe_l2), daemon=True)
    t1.start()
    # redis后台服务
@@ -61,21 +61,22 @@
    pst_trade, pst_strategy = multiprocessing.Pipe()
    # 交易与l2之间的通信
    ptl2_trade, ptl2_l2 = multiprocessing.Pipe()
    # l1与trade间的通信
    pl1t_l1, pl1t_trade = multiprocessing.Pipe()
    # 策略与l2之间的通信
    psl2_strategy, psl2_l2 = multiprocessing.Pipe()
    # l1与策略间的通信
    pl1t_l1, pl1t_strategy = multiprocessing.Pipe()
    serverProcess = multiprocessing.Process(target=createServer, args=(pss_server,))
    serverProcess.start()
    tradeServerProcess = multiprocessing.Process(target=createTradeServer, args=(pss_strategy, pst_strategy, pl1t_trade,))
    tradeServerProcess = multiprocessing.Process(target=createTradeServer,
                                                 args=(pss_strategy, pst_strategy, pl1t_strategy, psl2_strategy))
    tradeServerProcess.start()
    # 等待服务器启动
    time.sleep(10)
    # 交易进程与L2进程
    tradeProcess = multiprocessing.Process(target=huaxin_api.trade_client.run, args=(ptl2_trade, pst_trade))
    l2Process = multiprocessing.Process(target=huaxin_api.l2_client.run, args=(ptl2_l2,))
    l2Process = multiprocessing.Process(target=huaxin_api.l2_client.run, args=(ptl2_l2, psl2_l2,))
    tradeProcess.start()
    l2Process.start()
    # L1订阅数据
trade/huaxin/huaxin_trade_api.py
@@ -198,7 +198,7 @@
                     "data": data,
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        pipe_trade.send(json.dumps(root_data).encode(encoding='utf-8'))
        pipe_trade.send(json.dumps(root_data))
        hx_logger_trade_loop.info("请求发送成功:request_id-{}", request_id)
    except BrokenPipeError as e:
        hx_logger_trade_loop.info("请求发送异常:request_id-{} error-{}", request_id, str(e))
trade/huaxin/trade_api_server.py
@@ -22,6 +22,7 @@
from trade.huaxin import huaxin_trade_api, huaxin_trade_record_manager, \
    huaxin_trade_data_update
from trade.huaxin.huaxin_trade_api import ClientSocketManager
from trade.trade_manager import TradeTargetCodeModeManager
from utils import socket_util, tool, huaxin_util, data_export_util
@@ -433,8 +434,12 @@
                    codes = [d[0] for d in datas]
                    for code in codes:
                        block_info.init_code(code)
                    result = huaxin_trade_api.set_l2_codes_data(datas)
                    print("设置L2代码结果:", result)
                    root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                 "data": datas,
                                 "request_id": f"{ ClientSocketManager.CLIENT_TYPE_CMD_L2}_{round(time.time()*1000)}"}
                    root_data = socket_util.encryp_client_params_sign(root_data)
                    pipe_l2.send(json.dumps(root_data))
                    print("设置L2代码结束")
        except Exception as e:
            logging.exception(e)
        finally:
@@ -486,7 +491,10 @@
            time.sleep(1)
def run(pipe):
def run(pipe_server, pipe_l2_):
    global pipe_l2
    pipe_l2 = pipe_l2_
    print("create TradeApiServer")
    # 拉取交易信息
    huaxin_trade_data_update.run()
@@ -494,7 +502,7 @@
    t1 = threading.Thread(target=lambda: __set_target_codes(), daemon=True)
    t1.start()
    t1 = threading.Thread(target=lambda: __read_sync_task(pipe), daemon=True)
    t1 = threading.Thread(target=lambda: __read_sync_task(pipe_server), daemon=True)
    t1.start()
    laddr = "0.0.0.0", 10009