Administrator
2023-09-25 c0bcfe746b97bc126636a658b1f01fc6a51f9f95
trade/huaxin/huaxin_trade_api.py
@@ -1,27 +1,24 @@
"""
交易API
"""
import copy
import json
import logging
import multiprocessing
import random
import threading
import time
from huaxin_client.trade_transform_protocol import TradeRequest
from log_module import async_log_util
from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade, \
    logger_system
from trade.huaxin import huaxin_trade_data_update
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from trade.huaxin.huaxin_trade_order_processor import CancelOrderManager, HuaxinOrderEntity, TradeResultProcessor
from utils import socket_util, huaxin_util, tool
# Trade channel handed in from outside this module (the original note calls
# it a queue; the name says pipe). Stays None until a caller provides one.
pipe_trade = None
def __run_recv_pipe_trade():
    logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}")
def __run_recv_queue_trade(queue: multiprocessing.Queue):
    # 设置结果
    def __set_response(data_json):
        if 'request_id' not in data_json:
@@ -31,10 +28,12 @@
        # 设置响应内容
        set_response(data_json["request_id"], data_json['data'])
    if pipe_trade is not None:
    logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}")
    if queue is not None:
        while True:
            try:
                val = pipe_trade.recv()
                val = queue.get()
                if val:
                    data_json = json.loads(val)
                    # 处理数据
@@ -43,7 +42,6 @@
                        # 主动触发的响应
                        async_log_util.info(hx_logger_trade_callback,
                                            f"response:request_id-{data_json['request_id']}")
                        # 设置响应内容
                        threading.Thread(target=lambda: __set_response(data_json), daemon=True).start()
                    elif type_ == "trade_callback":
                        try:
@@ -51,11 +49,26 @@
                            data_json = data_json["data"]
                            ctype = data_json["type"]
                            # 记录交易反馈日志
                            async_log_util.info(hx_logger_trade_callback, data_json)
                            # 重新请求委托列表与资金
                            huaxin_trade_data_update.add_delegate_list("来自交易管道")
                            huaxin_trade_data_update.add_deal_list()
                            huaxin_trade_data_update.add_money_list()
                            async_log_util.info(hx_logger_trade_callback, f"{data_json}")
                            if ctype == 0:
                                data = data_json.get("data")
                                # 获取订单状态
                                code = data["securityID"]
                                accountID = data["accountID"]
                                orderStatus = data["orderStatus"]
                                orderRef = data["orderRef"]
                                orderSysID = data["orderSysID"]
                                insertTime = data.get("insertTime")
                                acceptTime = data.get("acceptTime")
                                order = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID,
                                                          insertTime=insertTime, acceptTime=acceptTime)
                                TradeResultProcessor.process_order(order)
                                # 订单相关回调
                                # 重新请求委托列表与资金
                                huaxin_trade_data_update.add_delegate_list("来自交易管道")
                                huaxin_trade_data_update.add_deal_list()
                                huaxin_trade_data_update.add_money_list()
                            # print("响应结果:", data_json['data'])
                        finally:
                            pass
@@ -64,13 +77,15 @@
# 设置交易通信队列
def run_pipe_trade(pipe_trade_, queue_strategy_trade_):
    global pipe_trade
    pipe_trade = pipe_trade_
    global queue_strategy_trade
    queue_strategy_trade = queue_strategy_trade_
    t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True)
def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_):
    """Register the trade queues and start the two background workers.

    ``queue_strategy_w_trade_r_`` is stored in the module-level
    ``queue_strategy_w_trade_r``. Two daemon threads are then started, in
    this order:

    1. ``__run_recv_queue_trade`` reading from ``queue_strategy_r_trade_w_``;
    2. ``CancelOrderManager().run(cancel_order)``.
    """
    global queue_strategy_w_trade_r
    queue_strategy_w_trade_r = queue_strategy_w_trade_r_

    def _receive_from_trade():
        __run_recv_queue_trade(queue_strategy_r_trade_w_)

    def _run_cancel_manager():
        # The manager is built inside the worker thread, not in the caller.
        CancelOrderManager().run(cancel_order)

    for worker in (_receive_from_trade, _run_cancel_manager):
        threading.Thread(target=worker, daemon=True).start()
# 交易通道的错误次数
@@ -251,8 +266,7 @@
                     "request_id": request_id}
        root_data = socket_util.encryp_client_params_sign(root_data)
        start_time = time.time()
        queue_strategy_trade.put_nowait(root_data)
        # pipe_trade.send(json.dumps(root_data).encode("utf-8"))
        queue_strategy_w_trade_r.put_nowait(root_data)
        use_time = int((time.time() - start_time) * 1000)
        if use_time > 10:
            async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}")
@@ -353,12 +367,22 @@
        huaxin_trade_data_update.add_money_list()
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None):
# Cancel requests recorded per code: maps code -> set of JSON-encoded
# (orderRef, orderSysID) pairs added by cancel_order, kept so the outcome of
# the final cancel can be checked later.
__canceling_order_dict = {}
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None,
                 recancel=False):
    if not recancel:
        CancelOrderManager.start_cancel(code, orderRef, orderSysID)
    if not sinfo:
        sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
    order_action_ref = huaxin_util.create_order_ref()
    if not request_id:
        request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE)
    # 加入撤单记录,用于校验最后的撤单是否成功
    if code not in __canceling_order_dict:
        __canceling_order_dict[code] = set()
    __canceling_order_dict[code].add(json.dumps((orderRef, orderSysID)))
    # 执行2次撤单,防止没有撤到
    for i in range(2):
        request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
@@ -444,5 +468,4 @@
if __name__ == "__main__":
    # Scratch check when run as a script: pop the "id" key from a sample
    # dict and print the value that was removed.
    d = {"id": "123123"}
    removed_id = d.pop("id")
    print(removed_id)