"""
|
华鑫交易结果处理器
|
"""
|
import copy
|
import json
|
import time
|
|
from log_module import async_log_util
|
from log_module.log import logger_trade, hx_logger_trade_debug
|
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
|
from utils import huaxin_util
|
import concurrent.futures
|
|
|
class HuaxinOrderEntity:
    """Plain holder for the fields of one Huaxin order.

    Every constructor argument is stored unchanged on the instance under an
    attribute of the same name; nothing is validated or converted.

    @param code: security code the order belongs to
    @param orderStatus: order status value (interpreted elsewhere through
        huaxin_util helpers)
    @param orderRef: order reference
    @param accountID: account identifier
    @param orderSysID: system order id
    @param direction: order direction; other code in this module compares it
        against huaxin_util.TORA_TSTP_D_Buy / TORA_TSTP_D_Sell
    @param insertDate: insert date of the order
    @param insertTime: insert time of the order
    @param acceptTime: accept time of the order
    """

    def __init__(self, code, orderStatus, orderRef, accountID, orderSysID, direction=None, insertDate=None,
                 insertTime=None,
                 acceptTime=None):
        # Mandatory identification fields.
        self.code, self.orderStatus, self.orderRef = code, orderStatus, orderRef
        self.accountID, self.orderSysID = accountID, orderSysID
        # Optional timing information.
        self.insertTime, self.insertDate, self.acceptTime = insertTime, insertDate, acceptTime
        # Optional order direction.
        self.direction = direction
|
|
|
class CancelOrderManager:
    """Singleton registry of cancel requests that have not been confirmed yet.

    ``start_cancel`` records a pending cancel, ``cancel_success`` and
    ``buy_success`` clear it again, and ``run`` is an endless loop that
    re-issues a cancel for entries that stay pending for too long.

    All state lives in class-level containers, so it is shared by every
    reference to the (single) instance.
    """

    # code -> set of JSON strings "[order_ref, order_sys_id, start_timestamp_ms]"
    __canceling_order_dict = {}
    # "{code}_{order_ref}_{order_sys_id}" -> number of re-cancel attempts issued so far
    __recancel_order_count = {}

    # Maximum number of re-cancel attempts per order
    __MAX_RECANCEL_COUNT = 2
    # A pending cancel older than this many milliseconds is re-issued
    __RECANCEL_DELAY_MS = 100
    # Pause between two scans of the pending cancels, in seconds
    __SCAN_INTERVAL_S = 0.05

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use.

        Extra arguments are accepted for call-site compatibility but are not
        forwarded: ``object.__new__`` raises TypeError when it receives
        arguments and ``__new__`` is overridden.
        """
        if cls.__instance is None:
            cls.__instance = super(CancelOrderManager, cls).__new__(cls)
        return cls.__instance

    # Start canceling
    def start_cancel(self, code, order_ref, order_sys_id):
        """Record that a cancel was requested for the given order.

        The entry is stored as a JSON string together with the current time in
        milliseconds so that ``run`` can tell how long it has been pending.

        @param code: security code
        @param order_ref: order reference
        @param order_sys_id: system order id
        """
        entry = json.dumps((order_ref, order_sys_id, int(time.time() * 1000)))
        self.__canceling_order_dict.setdefault(code, set()).add(entry)

    def __cancel_finish(self, code, order_ref, order_sys_id):
        """Drop every pending cancel of ``code`` matching the order.

        An entry matches when either its order reference or its system order
        id equals the given value.
        """
        pending = self.__canceling_order_dict.get(code)
        if not pending:
            return
        # Iterate over a snapshot because matching entries are removed from the set.
        for info in tuple(pending):
            saved_ref, saved_sys_id = json.loads(info)[:2]
            if saved_ref == order_ref or saved_sys_id == order_sys_id:
                # Found the target entry
                pending.discard(info)

    def __add_recancel_count(self, code, order_ref, order_sys_id):
        """Increase the number of re-cancel attempts recorded for the order."""
        key = f"{code}_{order_ref}_{order_sys_id}"
        self.__recancel_order_count[key] = self.__recancel_order_count.get(key, 0) + 1

    def __can_recancel(self, code, order_ref, order_sys_id):
        """Return True while the order has used fewer than the allowed attempts."""
        key = f"{code}_{order_ref}_{order_sys_id}"
        return self.__recancel_order_count.get(key, 0) < self.__MAX_RECANCEL_COUNT

    # Cancel succeeded
    def cancel_success(self, code, order_ref, order_sys_id):
        """Clear the pending cancel of an order whose cancel was confirmed."""
        self.__cancel_finish(code, order_ref, order_sys_id)

    # Buy succeeded
    def buy_success(self, code, order_ref, order_sys_id):
        """Clear the pending cancel of an order that was bought instead."""
        self.__cancel_finish(code, order_ref, order_sys_id)

    def __recancel_overdue(self, re_cancel_method):
        """Run one scan over the pending cancels.

        Entries that already used all re-cancel attempts are dropped; entries
        pending for longer than the delay get one more attempt through
        ``re_cancel_method``.
        """
        # Snapshots are taken because other threads may register or clear
        # entries while the scan is in progress.
        for code, pending in list(self.__canceling_order_dict.items()):
            for info in tuple(pending):
                order_ref, order_sys_id, started_ms = json.loads(info)
                # Check whether the order may still be re-canceled
                if not self.__can_recancel(code, order_ref, order_sys_id):
                    pending.discard(info)
                    continue
                # Only re-cancel after the delay has elapsed
                if time.time() * 1000 - started_ms > self.__RECANCEL_DELAY_MS:
                    async_log_util.info(logger_trade, f"{code}触发重新撤单:{info}")
                    self.__add_recancel_count(code, order_ref, order_sys_id)
                    re_cancel_method(1, code, order_sys_id, orderRef=order_ref, recancel=True)

    # Takes the method used to re-issue a cancel
    def run(self, re_cancel_method):
        """Scan the pending cancels forever, re-issuing overdue ones.

        This method never returns; it is meant to run on its own thread.

        @param re_cancel_method: callable invoked as
            ``re_cancel_method(1, code, order_sys_id, orderRef=order_ref, recancel=True)``
            (the meaning of the leading ``1`` is defined by the callee)
        """
        while True:
            try:
                self.__recancel_overdue(re_cancel_method)
            except Exception as e:
                # Keep the loop alive, but leave a trace instead of hiding the failure.
                async_log_util.exception(logger_trade, e)
            # Always pause, even after a failed pass, so the loop cannot spin.
            time.sleep(self.__SCAN_INTERVAL_S)
|
|
|
class TradeResultProcessor:
    """Class-level handlers for Huaxin order updates.

    All methods are classmethods operating on shared class attributes.
    """

    __TradeOrderIdManager = TradeOrderIdManager()
    # Keys of order updates that have already been handled
    __processed_keys = set()
    __thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=50)
    # Dictionary of orders indexed by order reference
    order_ref_dict = {}

    # Handle a buy order
    @classmethod
    def process_buy_order(cls, order: HuaxinOrderEntity):
        """
        Handle a buy order update.

        Non-buy orders and updates whose (insert date, code, system order id,
        status) combination was handled before are ignored.

        @param order: the order update to handle
        @return: whether a cancel is required (the current implementation
            always returns False)
        """

        # Handle a successfully placed order
        def _on_order_placed(order_: HuaxinOrderEntity, delay_s=0.0):
            if delay_s > 0:
                time.sleep(delay_s)
            # TODO handle order placed successfully

        # Only buy orders are handled
        # NOTE(review): the sell lookup below compares without str(); confirm
        # which form matches the type of the huaxin_util constants.
        if order.direction != str(huaxin_util.TORA_TSTP_D_Buy):
            return False

        # The same order only needs handling again when its status changed
        key = f"{order.insertDate}_{order.code}_{order.orderSysID}_{order.orderStatus}"
        if key in cls.__processed_keys:
            return False

        try:
            async_log_util.info(hx_logger_trade_debug, f"处理华鑫订单:{key}")
            cls.__processed_keys.add(key)
            if huaxin_util.is_can_cancel(order.orderStatus):
                # Mark the order as placed
                _on_order_placed(order)
            elif huaxin_util.is_canceled(order.orderStatus) or huaxin_util.is_deal(order.orderStatus):
                # Canceled or dealt: drop the temporarily saved system order id
                cls.__TradeOrderIdManager.remove_order_id(order.code, order.accountID, order.orderSysID)
                # TODO handle the deal case; only the canceled case acts for now
                if not huaxin_util.is_deal(order.orderStatus) and huaxin_util.is_canceled(order.orderStatus):
                    CancelOrderManager().cancel_success(order.code, order.orderRef, order.orderSysID)
        except Exception as e:
            async_log_util.exception(hx_logger_trade_debug, e)
        return False

    @classmethod
    def get_huaxin_order_by_order_ref(cls, order_ref) -> HuaxinOrderEntity:
        """Look up an order by its order reference; None when it is unknown."""
        return cls.order_ref_dict.get(order_ref)

    @classmethod
    def get_huaxin_sell_order_by_code(cls, code):
        """Return the stored orders of ``code`` whose direction is the sell constant."""
        return [
            entity
            for entity in cls.order_ref_dict.values()
            if entity.code == code and entity.direction == huaxin_util.TORA_TSTP_D_Sell
        ]

    @classmethod
    def order_success(cls, order: HuaxinOrderEntity):
        """Record a successfully placed order; returns None."""
        async_log_util.info(hx_logger_trade_debug, f"处理华鑫订单下单成功:{order.code}, {order.orderRef}, {order.orderSysID}")
        id_manager = cls.__TradeOrderIdManager
        # Add the system order id
        id_manager.add_order_id(order.code, order.accountID, order.orderSysID)
        # Remove the temporary order reference
        id_manager.remove_order_ref(order.code, order.orderRef)

    @classmethod
    def cancel_order_success(cls, code, accountId, orderSysID):
        """Remove the system order id of a canceled order."""
        cls.__TradeOrderIdManager.remove_order_id(code, accountId, orderSysID)
|
|
|
if __name__ == "__main__":
    # Manual check: register one pending cancel on the shared manager.
    # Calling cancel_success or buy_success with the same arguments would
    # clear the entry again.
    CancelOrderManager().start_cancel(code="000333", order_ref=1, order_sys_id="123123")
|