"""
|
华鑫交易结果处理器
|
"""
|
import copy
|
import json
|
import time
|
|
from l2 import cancel_buy_strategy, l2_data_util
|
from l2.huaxin import huaxin_delegate_postion_manager
|
from l2.l2_data_manager import TradePointManager
|
from log_module import async_log_util
|
from log_module.log import logger_trade, hx_logger_trade_debug
|
from trade import trade_manager, trade_record_log_util
|
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
|
from utils import huaxin_util, tool
|
import concurrent.futures
|
|
|
class HuaxinOrderEntity:
    """Value object describing a single order report received from Huaxin.

    The constructor stores every argument unchanged on the instance under the
    same name; no validation or conversion is performed.

    Notes grounded in how this file reads the fields:
      * ``direction`` is compared against ``str(huaxin_util.TORA_TSTP_D_Buy)``
        and ``str(huaxin_util.TORA_TSTP_D_Sell)``.
      * ``insertTime`` is treated as a colon-separated time string
        (``replace(":", "")`` followed by ``int``).
      * ``is_shadow_order`` marks shadow orders, which the buy-order handler
        does not process as regular orders.
    """

    def __init__(self, code, orderStatus, orderRef, accountID, orderSysID, direction=None, insertDate=None,
                 insertTime=None,
                 acceptTime=None, is_shadow_order=False):
        # Security code and the status reported for the order.
        self.code, self.orderStatus = code, orderStatus
        # Identifiers: local order reference, account, and system order id.
        self.orderRef, self.accountID, self.orderSysID = orderRef, accountID, orderSysID
        # Timing information (all optional).
        self.insertTime, self.insertDate, self.acceptTime = insertTime, insertDate, acceptTime
        # Side of the order and the shadow-order flag.
        self.direction, self.is_shadow_order = direction, is_shadow_order
|
|
|
class CancelOrderManager:
    """Singleton that tracks pending cancel requests and re-issues stale ones.

    ``start_cancel`` registers a cancel request; ``cancel_success`` and
    ``buy_success`` remove it again.  ``run`` is an endless polling loop that
    re-sends a cancel (through the supplied callback) for every request that
    has been pending for more than 100 ms, at most twice per order.

    All state is stored on the class, so it is shared by every reference to
    the singleton.
    """
    # code -> set of JSON strings "[order_ref, order_sys_id, start_ms]".
    __canceling_order_dict = {}
    # "{code}_{order_ref}_{order_sys_id}" -> number of re-cancel attempts already issued.
    __recancel_order_count = {}

    __instance = None

    def __new__(cls, *args, **kwargs):
        if cls.__instance is None:
            # object.__new__ raises TypeError when handed extra arguments, so
            # they are accepted here for compatibility but not forwarded.
            cls.__instance = super(CancelOrderManager, cls).__new__(cls)
        return cls.__instance

    def start_cancel(self, code, order_ref, order_sys_id):
        """Register a cancel request for ``code``.

        The request is stored together with the current time in milliseconds,
        which ``run`` uses to decide when a re-cancel is due.
        """
        started_ms = int(time.time() * 1000)
        entry = json.dumps((order_ref, order_sys_id, started_ms))
        self.__canceling_order_dict.setdefault(code, set()).add(entry)

    def __cancel_finish(self, code, order_ref, order_sys_id):
        """Drop every pending request of ``code`` matching the ref OR the system id."""
        pending = self.__canceling_order_dict.get(code)
        if not pending:
            return
        # Iterate over a snapshot because matching entries are removed in the loop.
        for info in tuple(pending):
            pending_ref, pending_sys_id, _started_ms = json.loads(info)
            if pending_ref == order_ref or pending_sys_id == order_sys_id:
                pending.discard(info)

    def __add_recancel_count(self, code, order_ref, order_sys_id):
        """Record one more re-cancel attempt for the given order."""
        key = f"{code}_{order_ref}_{order_sys_id}"
        self.__recancel_order_count[key] = self.__recancel_order_count.get(key, 0) + 1

    def __can_recancel(self, code, order_ref, order_sys_id):
        """Return True while fewer than two re-cancel attempts were made for the order."""
        key = f"{code}_{order_ref}_{order_sys_id}"
        return self.__recancel_order_count.get(key, 0) < 2

    def cancel_success(self, code, order_ref, order_sys_id):
        """The order was cancelled: stop tracking its cancel request."""
        self.__cancel_finish(code, order_ref, order_sys_id)

    def buy_success(self, code, order_ref, order_sys_id):
        """The order was filled: stop tracking its cancel request."""
        self.__cancel_finish(code, order_ref, order_sys_id)

    def __recancel_pending(self, re_cancel_method):
        """Run one polling pass over all pending cancel requests.

        Requests whose retry budget is used up are dropped.  Requests older
        than 100 ms are logged, counted and re-sent through
        ``re_cancel_method``.  Younger requests are left alone.
        """
        # Snapshot the keys: the callback (or, presumably, another thread
        # calling start_cancel) may register new codes during the pass, which
        # would otherwise raise "dictionary changed size during iteration".
        for code in list(self.__canceling_order_dict):
            pending = self.__canceling_order_dict[code]
            for info in tuple(pending):
                order_ref, order_sys_id, started_ms = json.loads(info)
                if not self.__can_recancel(code, order_ref, order_sys_id):
                    pending.discard(info)
                    continue
                # Only re-send once the request has been pending for more than 100 ms.
                if time.time() * 1000 - started_ms > 100:
                    async_log_util.info(logger_trade, f"{code}触发重新撤单:{info}")
                    # Count before calling so a failing callback still uses up the budget.
                    self.__add_recancel_count(code, order_ref, order_sys_id)
                    re_cancel_method(1, code, order_sys_id, orderRef=order_ref, recancel=True)

    def run(self, re_cancel_method):
        """Poll forever (every 50 ms) and re-send stale cancel requests.

        @param re_cancel_method: callable invoked as
            ``re_cancel_method(1, code, order_sys_id, orderRef=order_ref, recancel=True)``.
        This method never returns; errors raised during a pass are logged and
        the loop continues.  ``KeyboardInterrupt`` / ``SystemExit`` are not
        intercepted, so the loop can still be stopped.
        """
        while True:
            try:
                self.__recancel_pending(re_cancel_method)
            except Exception as e:
                # Best effort: keep the loop alive, but leave a trace of the failure.
                async_log_util.exception(logger_trade, e)
            # Sleep outside the try block so a failing pass cannot turn into a busy loop.
            time.sleep(0.05)
|
|
|
class TradeResultProcessor:
    """Applies Huaxin order reports (buy and sell) to the local trading state.

    All state is kept on the class and every method is a classmethod, so the
    class is used directly without creating instances.
    """
    # Adds/removes order refs and system order ids (see order_success / cancel_order_success).
    __TradeOrderIdManager = TradeOrderIdManager()
    # "{insertDate}_{code}_{orderSysID}_{orderStatus}" keys of buy reports already handled.
    __processed_keys = set()
    # Only needed by the delayed re-validation in process_buy_order, which is currently disabled.
    __thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=50)
    # orderRef -> latest HuaxinOrderEntity of each sell order seen by process_sell_order.
    order_ref_dict = {}

    @classmethod
    def process_buy_order(cls, order: HuaxinOrderEntity):
        """Handle a buy-order report.

        @param order: the reported order
        @return: True when the order needs to be cancelled, which in this
            method only happens for a shadow order inserted before 09:15 whose
            status is TORA_TSTP_OST_Accepted; False in every other case.
        """

        def process_order_success(order_: HuaxinOrderEntity, delay_s=0.0):
            """Mark the order as placed and publish the verified place position, if any."""
            if delay_s > 0:
                time.sleep(delay_s)
            new_place_order_index = cls.order_success(order_)
            if new_place_order_index:
                order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(order_.code)
                cancel_buy_strategy.set_real_place_position(order_.code,
                                                            new_place_order_index,
                                                            order_begin_pos.buy_single_index, is_default=True)
                trade_record_log_util.add_real_place_order_position_log(order_.code,
                                                                        new_place_order_index,
                                                                        order_begin_pos.buy_single_index)
            return new_place_order_index

        # Only buy orders are handled here.
        if order.direction != str(huaxin_util.TORA_TSTP_D_Buy):
            return False

        # Shadow orders are not processed as regular orders.
        if order.is_shadow_order:
            # Shadow order inserted before 09:15 ...
            if order.insertTime and int(order.insertTime.replace(":", "")) < int("091500"):
                # ... that the exchange has already accepted must be cancelled.
                if order.orderStatus == huaxin_util.TORA_TSTP_OST_Accepted:
                    return True
            return False

        # The same order is only processed again when its status has changed.
        key = f"{order.insertDate}_{order.code}_{order.orderSysID}_{order.orderStatus}"
        if key in cls.__processed_keys:
            return False
        try:
            async_log_util.info(hx_logger_trade_debug, f"处理华鑫订单:{key}")
            cls.__processed_keys.add(key)
            if huaxin_util.is_can_cancel(order.orderStatus):
                # The order is live: record it as successfully placed.
                # A delayed second attempt (retry after 200 ms via the thread
                # pool when the L2 data has not arrived yet) is disabled for now.
                process_order_success(order)
            elif huaxin_util.is_canceled(order.orderStatus) or huaxin_util.is_deal(
                    order.orderStatus):
                # Cancelled or filled: drop the temporarily stored system order id.
                cls.__TradeOrderIdManager.remove_order_id(order.code,
                                                          order.accountID,
                                                          order.orderSysID)
                if huaxin_util.is_deal(order.orderStatus):
                    # Post-fill handling.
                    trade_manager.buy_success(order.code)
                    CancelOrderManager().buy_success(order.code, order.orderRef, order.orderSysID)
                elif huaxin_util.is_canceled(order.orderStatus):
                    CancelOrderManager().cancel_success(order.code, order.orderRef, order.orderSysID)
        except Exception as e:
            async_log_util.exception(hx_logger_trade_debug, e)
        return False

    @classmethod
    def process_sell_order(cls, order: HuaxinOrderEntity):
        """Handle a sell-order report.

        The order is remembered in ``order_ref_dict`` under its ``orderRef``.

        @param order: the reported order
        @return: True when the order should be monitored for automatic
            cancellation, otherwise False.
        """
        if order.direction != str(huaxin_util.TORA_TSTP_D_Sell):
            return False
        cls.order_ref_dict[order.orderRef] = order

        # Accepted orders inserted at or after 09:30 are candidates for auto-cancel
        # (original note: cancel when not filled within 2s; the timeout itself is
        # enforced elsewhere, not in this method).
        # NOTE(review): order.insertTime defaults to None on HuaxinOrderEntity and is
        # passed to tool.trade_time_sub unguarded — confirm callers always set it.
        if tool.trade_time_sub(order.insertTime, "09:30:00") >= 0 and str(
                order.orderStatus) == huaxin_util.TORA_TSTP_OST_Accepted:
            sell_mode = trade_manager.AutoCancelSellModeManager().get_mode()
            if sell_mode == trade_manager.AutoCancelSellModeManager.MODE_CANCEL_MECHINE and order.orderRef < 90000:
                # In this mode only machine orders are cancelled; refs below 90000
                # are presumably not machine orders — confirm against the order-ref allocator.
                return False
            return True
        return False

    @classmethod
    def get_huaxin_order_by_order_ref(cls, order_ref) -> HuaxinOrderEntity:
        """Return the sell order stored under ``order_ref``, or None when unknown."""
        return cls.order_ref_dict.get(order_ref)

    @classmethod
    def get_huaxin_sell_order_by_code(cls, code):
        """Return all stored sell orders of ``code`` as a list (possibly empty)."""
        # process_sell_order stores only entities whose direction equals
        # str(TORA_TSTP_D_Sell); normalise both sides the same way so the filter
        # cannot silently reject them when the constant is not a str.
        sell_direction = str(huaxin_util.TORA_TSTP_D_Sell)
        return [entity for entity in cls.order_ref_dict.values()
                if entity.code == code and str(entity.direction) == sell_direction]

    @classmethod
    def order_success(cls, order: HuaxinOrderEntity):
        """Record a successfully placed order and try to verify its place position.

        The system order id is stored and the temporary order ref is removed.
        Then, if an estimated place position exists and the exchange accept
        time is within (0, 4) seconds after that position's time, the L2 data
        after the estimate is searched for an entry with the same volume and a
        time equal to the accept time.

        @param order: the reported order
        @return: the corrected place index when a matching entry is found,
            otherwise None.
        """
        async_log_util.info(hx_logger_trade_debug, f"处理华鑫订单下单成功:{order.code}, {order.orderRef}, {order.orderSysID}")
        # Store the system order id.
        cls.__TradeOrderIdManager.add_order_id(order.code, order.accountID, order.orderSysID)
        # Remove the temporary order ref.
        cls.__TradeOrderIdManager.remove_order_ref(order.code, order.orderRef)
        # Use the accept time to check whether the estimated place position is right.
        try:
            place_index = huaxin_delegate_postion_manager.get_place_order_position(order.code)
            if place_index and order.acceptTime:
                # Rough check for the real place position.
                total_datas = l2_data_util.local_today_datas.get(order.code)
                if total_datas:
                    if 0 < tool.trade_time_sub(order.acceptTime, total_datas[place_index]["val"]["time"]) < 4:
                        # Only verified when the two times are less than 4s apart.
                        volume = total_datas[place_index]["val"]["num"]
                        for i in range(place_index + 1, len(total_datas)):
                            if total_datas[i]["val"]["num"] == volume and order.acceptTime == total_datas[i]["val"][
                                "time"]:
                                huaxin_delegate_postion_manager.set_place_order_position(order.code, i)
                                async_log_util.info(hx_logger_trade_debug, "{}校验真实下单成功,{}->{}", order.code, place_index,
                                                    i)
                                return i
                    else:
                        raise Exception(
                            f"不满足校验条件,真实下单时间:{order.acceptTime}  预估下单时间:{total_datas[place_index]['val']['time']}")
                else:
                    raise Exception("未获取到L2数据")
            else:
                raise Exception(f"尚未获取到数据(place_index-{place_index} acceptTime-{order.acceptTime})")

        except Exception as e:
            # Every failed precondition above ends up here as a warning; the order
            # itself has already been recorded.
            async_log_util.warning(hx_logger_trade_debug, "{}校验真实下单位置出错:{}", order.code, str(e))
        return None

    @classmethod
    def cancel_order_success(cls, code, accountId, orderSysID):
        """Remove the stored system order id of a cancelled order."""
        cls.__TradeOrderIdManager.remove_order_id(code, accountId, orderSysID)
|
|
|
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None,
                 recancel=False):
    """No-op stand-in for a cancel-order call.

    It accepts the argument shape that ``CancelOrderManager.run`` passes to its
    callback, ignores all of it, and returns None.  In this file it is only
    used by the ``__main__`` demo at the bottom.
    """
    return None
|
|
|
if __name__ == "__main__":
    # Manual demo: register one pending cancel and start the endless re-cancel
    # loop with the no-op cancel_order callback.
    manager = CancelOrderManager()
    manager.start_cancel("000333", 1, "123123")
    # To see a request being cleared before the loop starts, call
    # manager.cancel_success("000333", 1, "123123") or
    # manager.buy_success("000333", 1, "123123") here.
    manager.run(cancel_order)
|