""" 华鑫交易结果处理器 """ import copy import json import time from log_module import async_log_util from log_module.log import logger_trade, hx_logger_trade_debug from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager from utils import huaxin_util import concurrent.futures class HuaxinOrderEntity: def __init__(self, code, orderStatus, orderRef, accountID, orderSysID, direction=None, insertDate=None, insertTime=None, acceptTime=None): self.code = code self.orderStatus = orderStatus self.orderRef = orderRef self.accountID = accountID self.orderSysID = orderSysID self.insertTime = insertTime self.insertDate = insertDate self.acceptTime = acceptTime self.direction = direction class CancelOrderManager: __canceling_order_dict = {} __recancel_order_count = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(CancelOrderManager, cls).__new__(cls, *args, **kwargs) return cls.__instance # 开始撤单 def start_cancel(self, code, order_ref, order_sys_id): if code not in self.__canceling_order_dict: self.__canceling_order_dict[code] = set() self.__canceling_order_dict[code].add(json.dumps((order_ref, order_sys_id, int(time.time() * 1000)))) def __cancel_finish(self, code, order_ref, order_sys_id): if code not in self.__canceling_order_dict: return if not self.__canceling_order_dict[code]: return infos = copy.deepcopy(self.__canceling_order_dict[code]) for info in infos: _info = json.loads(info) if _info[0] == order_ref or _info[1] == order_sys_id: # 匹配到目标数据 self.__canceling_order_dict[code].discard(info) def __add_recancel_count(self, code, order_ref, order_sys_id): key = f"{code}_{order_ref}_{order_sys_id}" if key not in self.__recancel_order_count: self.__recancel_order_count[key] = 0 self.__recancel_order_count[key] += 1 def __can_recancel(self, code, order_ref, order_sys_id): key = f"{code}_{order_ref}_{order_sys_id}" if key not in self.__recancel_order_count: return True return self.__recancel_order_count[key] < 2 # 撤单成功 def cancel_success(self, code, order_ref, order_sys_id): self.__cancel_finish(code, order_ref, order_sys_id) # 买入成功 def buy_success(self, code, order_ref, order_sys_id): self.__cancel_finish(code, order_ref, order_sys_id) # 传入重新下单 def run(self, re_cancel_method): while True: try: if self.__canceling_order_dict: for code in self.__canceling_order_dict: infos = self.__canceling_order_dict[code] infos = copy.deepcopy(infos) for info in infos: _info = json.loads(info) timestamp = _info[2] # 查询是否还能重新撤单 if not self.__can_recancel(code, _info[0], _info[1]): self.__canceling_order_dict[code].discard(info) continue if time.time() * 1000 - timestamp > 100: async_log_util.info(logger_trade, f"{code}触发重新撤单:{info}") # 100ms后才进行 self.__add_recancel_count(code, _info[0], _info[1]) re_cancel_method(1, code, _info[1], orderRef=_info[0], recancel=True) time.sleep(0.05) except: pass class TradeResultProcessor: __TradeOrderIdManager = TradeOrderIdManager() __processed_keys = set() __thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=50) # 订单索引号字典 order_ref_dict = {} # 处理买单 @classmethod def process_buy_order(cls, order: HuaxinOrderEntity): """ 处理买单 @param order: @return: 是否需要撤单 """ # 处理下单成功 def process_order_success(order_: HuaxinOrderEntity, delay_s=0.0): if delay_s > 0: time.sleep(delay_s) # TODO 处理下单成功 # 只处理买入单 if order.direction != str(huaxin_util.TORA_TSTP_D_Buy): return False # 同一订单号只有状态变化了才需要处理 key = f"{order.insertDate}_{order.code}_{order.orderSysID}_{order.orderStatus}" if key in cls.__processed_keys: return False try: async_log_util.info(hx_logger_trade_debug, f"处理华鑫订单:{key}") cls.__processed_keys.add(key) if huaxin_util.is_can_cancel(order.orderStatus): # 设置下单成功 process_order_success(order) elif huaxin_util.is_canceled(order.orderStatus) or huaxin_util.is_deal( order.orderStatus): # 已经撤单/已经成交,需要处理临时保存的系统订单号 cls.__TradeOrderIdManager.remove_order_id(order.code, order.accountID, order.orderSysID) if huaxin_util.is_deal(order.orderStatus): # TODO 处理成交 pass elif huaxin_util.is_canceled(order.orderStatus): CancelOrderManager().cancel_success(order.code, order.orderRef, order.orderSysID) except Exception as e: async_log_util.exception(hx_logger_trade_debug, e) return False @classmethod def get_huaxin_order_by_order_ref(cls, order_ref) -> HuaxinOrderEntity: return cls.order_ref_dict.get(order_ref) @classmethod def get_huaxin_sell_order_by_code(cls, code): results = [] for k in cls.order_ref_dict: entity = cls.order_ref_dict[k] if entity.code == code and entity.direction == huaxin_util.TORA_TSTP_D_Sell: results.append(entity) return results @classmethod def order_success(cls, order: HuaxinOrderEntity): async_log_util.info(hx_logger_trade_debug, f"处理华鑫订单下单成功:{order.code}, {order.orderRef}, {order.orderSysID}") # 加入系统订单号 cls.__TradeOrderIdManager.add_order_id(order.code, order.accountID, order.orderSysID) # 删除临时订单号 cls.__TradeOrderIdManager.remove_order_ref(order.code, order.orderRef) return None @classmethod def cancel_order_success(cls, code, accountId, orderSysID): cls.__TradeOrderIdManager.remove_order_id(code, accountId, orderSysID) if __name__ == "__main__": CancelOrderManager().start_cancel("000333", 1, "123123") # CancelOrderManager().cancel_success("000333", 1, "123123") # CancelOrderManager().buy_success("000333", 1, "123123")