| | |
| | | """ |
| | | 交易API |
| | | """ |
| | | import copy |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import random |
| | | import threading |
| | | import time |
| | | |
| | | from huaxin_client.trade_transform_protocol import TradeRequest |
| | | from log_module import async_log_util |
| | | from log_module.log import hx_logger_trade_debug, hx_logger_trade_loop, hx_logger_trade_callback, logger_trade, \ |
| | | logger_system |
| | | from trade.huaxin import huaxin_trade_data_update |
| | | from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager |
| | | from trade.huaxin.huaxin_trade_order_processor import CancelOrderManager, HuaxinOrderEntity, TradeResultProcessor |
| | | from utils import socket_util, huaxin_util, tool |
| | | |
| | | # 外部传入的交易队列 |
| | | pipe_trade = None |
| | | |
| | | |
| | | def __run_recv_pipe_trade(): |
| | | logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | |
| | | def __run_recv_queue_trade(queue: multiprocessing.Queue): |
| | | # 设置结果 |
| | | def __set_response(data_json): |
| | | if 'request_id' not in data_json: |
| | |
| | | # 设置响应内容 |
| | | set_response(data_json["request_id"], data_json['data']) |
| | | |
| | | if pipe_trade is not None: |
| | | logger_system.info(f"huaxin_trade_api __run_recv_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | |
| | | if queue is not None: |
| | | while True: |
| | | try: |
| | | val = pipe_trade.recv() |
| | | val = queue.get() |
| | | if val: |
| | | data_json = json.loads(val) |
| | | # 处理数据 |
| | |
| | | # 主动触发的响应 |
| | | async_log_util.info(hx_logger_trade_callback, |
| | | f"response:request_id-{data_json['request_id']}") |
| | | # 设置响应内容 |
| | | threading.Thread(target=lambda: __set_response(data_json), daemon=True).start() |
| | | elif type_ == "trade_callback": |
| | | try: |
| | |
| | | data_json = data_json["data"] |
| | | ctype = data_json["type"] |
| | | # 记录交易反馈日志 |
| | | async_log_util.info(hx_logger_trade_callback, data_json) |
| | | # 重新请求委托列表与资金 |
| | | huaxin_trade_data_update.add_delegate_list("来自交易管道") |
| | | huaxin_trade_data_update.add_deal_list() |
| | | huaxin_trade_data_update.add_money_list() |
| | | async_log_util.info(hx_logger_trade_callback, f"{data_json}") |
| | | if ctype == 0: |
| | | data = data_json.get("data") |
| | | # 获取订单状态 |
| | | code = data["securityID"] |
| | | accountID = data["accountID"] |
| | | orderStatus = data["orderStatus"] |
| | | orderRef = data["orderRef"] |
| | | orderSysID = data["orderSysID"] |
| | | insertTime = data.get("insertTime") |
| | | acceptTime = data.get("acceptTime") |
| | | |
| | | order = HuaxinOrderEntity(code, orderStatus, orderRef, accountID, orderSysID, |
| | | insertTime=insertTime, acceptTime=acceptTime) |
| | | TradeResultProcessor.process_order(order) |
| | | # 订单相关回调 |
| | | # 重新请求委托列表与资金 |
| | | huaxin_trade_data_update.add_delegate_list("来自交易管道") |
| | | huaxin_trade_data_update.add_deal_list() |
| | | huaxin_trade_data_update.add_money_list() |
| | | # print("响应结果:", data_json['data']) |
| | | finally: |
| | | pass |
| | |
| | | |
| | | |
| | | # 设置交易通信队列 |
| | | def run_pipe_trade(pipe_trade_, queue_strategy_trade_): |
| | | global pipe_trade |
| | | pipe_trade = pipe_trade_ |
| | | global queue_strategy_trade |
| | | queue_strategy_trade = queue_strategy_trade_ |
| | | t1 = threading.Thread(target=lambda: __run_recv_pipe_trade(), daemon=True) |
| | | def run_pipe_trade(queue_strategy_r_trade_w_, queue_strategy_w_trade_r_): |
| | | global queue_strategy_w_trade_r |
| | | queue_strategy_w_trade_r = queue_strategy_w_trade_r_ |
| | | t1 = threading.Thread(target=lambda: __run_recv_queue_trade(queue_strategy_r_trade_w_), daemon=True) |
| | | t1.start() |
| | | t1 = threading.Thread(target=lambda: CancelOrderManager().run(cancel_order), daemon=True) |
| | | t1.start() |
| | | |
| | | |
| | | |
| | | |
| | | # 交易通道的错误次数 |
| | |
| | | "request_id": request_id} |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | start_time = time.time() |
| | | queue_strategy_trade.put_nowait(root_data) |
| | | # pipe_trade.send(json.dumps(root_data).encode("utf-8")) |
| | | queue_strategy_w_trade_r.put_nowait(root_data) |
| | | use_time = int((time.time() - start_time) * 1000) |
| | | if use_time > 10: |
| | | async_log_util.info(hx_logger_trade_loop, f"发送耗时:request_id-{request_id} 耗时时间:{use_time}") |
| | |
| | | huaxin_trade_data_update.add_money_list() |
| | | |
| | | |
| | | def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None): |
| | | __canceling_order_dict = {} |
| | | |
| | | |
| | | def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None, |
| | | recancel=False): |
| | | if not recancel: |
| | | CancelOrderManager.start_cancel(code, orderRef, orderSysID) |
| | | if not sinfo: |
| | | sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}" |
| | | order_action_ref = huaxin_util.create_order_ref() |
| | | if not request_id: |
| | | request_id = __get_request_id(ClientSocketManager.CLIENT_TYPE_TRADE) |
| | | # 加入撤单记录,用于校验最后的撤单是否成功 |
| | | if code not in __canceling_order_dict: |
| | | __canceling_order_dict[code] = set() |
| | | __canceling_order_dict[code].add(json.dumps((orderRef, orderSysID))) |
| | | # 执行2次撤单,防止没有撤到 |
| | | for i in range(2): |
| | | request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE, |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | d = {"id": "123123"} |
| | | print(d.pop("id")) |
| | | pass |