Administrator
2023-09-13 8ea6200de76e128b54ad51e75014b5fc051b55ca
撤单机制修改
6个文件已修改
207 ■■■■■ 已修改文件
huaxin_client/trade_client.py 91 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_record_manager.py 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/huaxin_util.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py
@@ -83,15 +83,20 @@
    __cancel_buy_sinfo_set = set()
    __cancel_sell_sinfo_set = set()
    @classmethod
    def set_login_info(cls, session_id, front_id):
        cls.__session_id = session_id
        cls.__front_id = front_id
    # sinfo char(32)
    def buy(self, code, count, price, sinfo):
    def buy(self, code, count, price, sinfo, order_ref):
        if not ENABLE_ORDER:
            return
        if sinfo in self.__buy_sinfo_set:
            raise Exception(f'下单请求已经提交:{sinfo}')
        async_log_util.info(logger_trade, f"{code}华鑫本地真实下单开始")
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"进入买入方法:code-{code} sinfo-{sinfo}")
                            f"进入买入方法:code-{code} sinfo-{sinfo} order_ref-{order_ref}")
        self.__buy_sinfo_set.add(sinfo)
        self.req_id += 1
        # 请求报单
@@ -113,6 +118,7 @@
        req_field.Direction = traderapi.TORA_TSTP_D_Buy
        req_field.VolumeTotalOriginal = count
        req_field.SInfo = sinfo
        req_field.OrderRef = order_ref
        '''
        上交所支持限价指令和最优五档剩撤、最优五档剩转限两种市价指令,对于科创板额外支持本方最优和对手方最优两种市价指令和盘后固定价格申报指令
@@ -153,11 +159,11 @@
        return
    # 撤买
    def cancel_buy(self, code, order_sys_id, sinfo):
    def cancel_buy(self, code, sinfo, order_sys_id=None, order_ref=None):
        if sinfo in self.__cancel_buy_sinfo_set:
            raise Exception(f'撤单请求已经提交:{sinfo}')
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"进入撤单方法:code-{code} order_sys_id-{order_sys_id} sinfo-{sinfo}")
                            f"进入撤单方法:code-{code} order_sys_id-{order_sys_id}  order_ref-{order_ref} sinfo-{sinfo}")
        self.__cancel_buy_sinfo_set.add(sinfo)
        self.req_id += 1
        # 请求撤单
@@ -175,7 +181,12 @@
        # req_field.SessionID = self.__session_id
        # req_field.OrderRef = 1
        # (2)系统报单编号方式
        req_field.OrderSysID = order_sys_id
        if order_sys_id:
            req_field.OrderSysID = order_sys_id
        elif order_ref is not None:
            req_field.OrderRef = order_ref
            req_field.SessionID = self.__session_id
            req_field.FrontID = self.__front_id
        # OrderActionRef报单操作引用,用法同报单引用,可根据需要选填
@@ -489,6 +500,7 @@
            logger.info('Login success! [%d]' % nRequestID)
            self.__front_id = pRspUserLoginField.FrontID
            self.__session_id = pRspUserLoginField.SessionID
            TradeSimpleApi.set_login_info(self.__session_id, self.__front_id)
            if 0:
                # 查询股东账号
@@ -603,8 +615,6 @@
                                   pOrderField.OrderRef, pOrderField.OrderLocalID,
                                   pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
                                   pOrderField.OrderStatus, pOrderField.InsertTime))
            OrderIDManager.set_system_order_id(pOrderField.SecurityID, pOrderField.SInfo, pOrderField.OrderSysID)
            if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown:
                order_data = {"sinfo": pOrderField.SInfo, "securityID": pOrderField.SecurityID,
                              "orderLocalID": pOrderField.OrderLocalID,
@@ -820,14 +830,14 @@
            volume = data["volume"]
            price = data["price"]
            sinfo = data["sinfo"]
            local_order_id = data.get("local_order_id")
            order_ref = data.get("order_ref")
            if direction == 1:
                async_log_util.info(logger_trade, f"{code}华鑫本地开始下单")
                # 买
                try:
                    req_rid_dict[sinfo] = (client_id, request_id, sk, local_order_id)
                    threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo),
                    req_rid_dict[sinfo] = (client_id, request_id, sk, order_ref)
                    threading.Thread(target=lambda: self.__tradeSimpleApi.buy(code, volume, price, sinfo, order_ref),
                                     daemon=True).start()
                    async_log_util.info(logger_trade, f"{code}华鑫本地下单线程结束")
                except Exception as e:
@@ -853,19 +863,15 @@
            direction = data["direction"]
            code = data["code"]
            orderSysID = data.get("orderSysID")
            localOrderID = data.get("localOrderID")
            orderRef = data.get("orderRef")
            sinfo = data["sinfo"]
            if direction == 1:
                if not orderSysID and localOrderID:
                    orderSysID = OrderIDManager.get_system_id_by_local_id(localOrderID)
                # 撤买
                try:
                    if not orderSysID:
                        if localOrderID:
                            OrderIDManager.add_need_cancel_local_order_id(localOrderID)
                        raise Exception("没有找到系统订单号")
                    if not orderSysID and orderRef is None:
                        raise Exception("没有找到系统订单号或者报单引用")
                    req_rid_dict[sinfo] = (client_id, request_id, sk)
                    self.__tradeSimpleApi.cancel_buy(code, orderSysID, sinfo)
                    self.__tradeSimpleApi.cancel_buy(code, sinfo, order_sys_id=orderSysID, order_ref=orderRef)
                    async_log_util.info(logger_local_huaxin_trade_debug,
                                        f"撤单结束:code-{code} order_sys_id-{orderSysID} sinfo-{sinfo}")
                except Exception as e:
@@ -999,51 +1005,6 @@
    else:
        strategy_pipe.send(data)
# 订单号管理
class OrderIDManager:
    # 尚未撤单的本地订单号
    not_canceled_local_order_ids = set()
    local_order_id_map = {}
    __TradeSimpleApi = TradeSimpleApi()
    @classmethod
    def get_system_id_by_local_id(cls, local_order_id):
        return cls.local_order_id_map.get(local_order_id)
    # 设置系统订单ID
    @classmethod
    def set_system_order_id(cls, code, sinfo, orderSystemId):
        # 获取本地订单ID
        local_order_id = None
        if req_rid_dict and sinfo in req_rid_dict:
            temp_params = req_rid_dict.get(sinfo)
            if len(temp_params) > 3:
                local_order_id = temp_params[3]
        if local_order_id:
            if local_order_id not in cls.local_order_id_map and orderSystemId:
                cls.local_order_id_map[local_order_id] = orderSystemId
                async_log_util.info(logger_local_huaxin_trade_debug,
                                    f"本地订单号与系统订单号映射,{code}:{local_order_id} {orderSystemId}")
            if local_order_id in cls.not_canceled_local_order_ids:
                async_log_util.info(logger_local_huaxin_trade_debug, f"执行等待撤单,{code}:{local_order_id} {orderSystemId}")
                # 执行撤单
                cls.not_canceled_local_order_ids.discard(local_order_id)
                for i in range(3):
                    try:
                        cls.__TradeSimpleApi.cancel_buy(code, orderSystemId,
                                                        f"lcb-{code}-{round(time.time() * 1000)}")
                        break
                    except Exception as e:
                        logger_local_huaxin_trade_debug.exception(e)
                        time.sleep(0.01)
    @classmethod
    def add_need_cancel_local_order_id(cls, local_order_id):
        cls.not_canceled_local_order_ids.add(local_order_id)
# 交易反馈回调
def __traderapi_callback(type, req_id, data):
    async_log_util.info(logger_local_huaxin_trade_debug, "回调:type-{} req_id-{}", type, req_id)
@@ -1056,8 +1017,8 @@
            client_id, request_id = temp_params[0], temp_params[1]
            # 本地订单号-系统订单号映射
            if len(temp_params) >= 4 and type == TYPE_ORDER:
                local_order_id = temp_params[3]
                data["localOrderId"] = local_order_id
                order_ref = temp_params[3]
                data["orderRef"] = order_ref
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调 request_id-{}", request_id)
            # 测试
trade/huaxin/huaxin_trade_api.py
@@ -298,15 +298,15 @@
                data = response['data']
                # 处理下单
                if data.get('orderStatus') == huaxin_util.TORA_TSTP_OST_Accepted:
                    localOrderId = data.get('localOrderId')
                    orderRef = data.get('orderRef')
                    orderSysID = data.get('orderSysID')
                    accountID = data.get('accountID')
                    insertTime = data.get('insertTime')
                    code = data.get('securityId')
                    if localOrderId and orderSysID:
                    if orderRef and orderSysID:
                        # 移除本地单号,添加系统单号
                        __TradeOrderIdManager.add_order_id(code, accountID, orderSysID)
                        __TradeOrderIdManager.remove_local_order_id(code, localOrderId)
                        __TradeOrderIdManager.remove_order_ref(code, orderRef)
        except:
            pass
    else:
@@ -321,17 +321,17 @@
# price:价格(如果是卖时不传价格就按照5挡价卖)
# blocking是否阻塞进程
def order(direction, code, volume, price, price_type=2, blocking=False, sinfo=None, request_id=None,
          local_order_id=None):
          order_ref=None):
    timestamp = round(time.time() * 1000)
    if not sinfo:
        sinfo = f"b_{code}_{timestamp}"
    if not local_order_id:
        local_order_id = f"l_{code}_{direction}_{volume}_{timestamp}_{random.randint(0, 999)}"
    if not order_ref:
        order_ref = huaxin_util.create_order_ref()
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
                            "direction": direction,
                            "code": code,
                            "local_order_id": local_order_id,
                            "order_ref": order_ref,
                            "volume": volume,
                            "price_type": price_type,
                            "price": price, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
@@ -340,20 +340,20 @@
        if blocking:
            return __read_response(request_id, blocking)
        else:
            return {"local_order_id": local_order_id}
            return {"order_ref": order_ref}
    finally:
        huaxin_trade_data_update.add_delegate_list("下单", delay=0.2)
        huaxin_trade_data_update.add_money_list()
def cancel_order(direction, code, orderSysID, localOrderID=None, blocking=False, sinfo=None, request_id=None):
def cancel_order(direction, code, orderSysID, orderRef=None, blocking=False, sinfo=None, request_id=None):
    if not sinfo:
        sinfo = f"cb_{code}_{round(time.time() * 1000)}_{random.randint(0, 10000)}"
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
                            "direction": direction,
                            "code": code,
                            "localOrderID": localOrderID,
                            "orderRef": orderRef,
                            "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id, blocking=blocking,
                           is_pipe=is_pipe_channel_normal())
    try:
trade/huaxin/huaxin_trade_record_manager.py
@@ -309,7 +309,7 @@
    __redisManager = RedisManager(2)
    __instance = None
    __huaxin_order_id_cache = {}
    __huaxin_local_order_id_cache = {}
    __huaxin_order_ref_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -330,37 +330,37 @@
                code = k.split("-")[-1]
                vals = RedisUtils.smembers(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__huaxin_order_id_cache, code, vals)
            keys = RedisUtils.keys(__redis, "huaxin_local_order_id-*")
            keys = RedisUtils.keys(__redis, "huaxin_order_ref-*")
            for k in keys:
                code = k.split("-")[-1]
                vals = RedisUtils.smembers(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__huaxin_local_order_id_cache, code, vals)
                tool.CodeDataCacheUtil.set_cache(cls.__huaxin_order_ref_cache, code, vals)
        finally:
            RedisUtils.realse(__redis)
        # 添加订单ID
    def add_local_order_id(self, code, local_order_id):
        val = local_order_id
        if code not in self.__huaxin_local_order_id_cache:
            self.__huaxin_local_order_id_cache[code] = set()
        self.__huaxin_local_order_id_cache[code].add(val)
        RedisUtils.sadd_async(self.__db, f"huaxin_local_order_id-{code}", val)
        RedisUtils.expire_async(self.__db, f"huaxin_local_order_id-{code}", tool.get_expire())
    def add_order_ref(self, code, order_ref):
        val = order_ref
        if code not in self.__huaxin_order_ref_cache:
            self.__huaxin_order_ref_cache[code] = set()
        self.__huaxin_order_ref_cache[code].add(val)
        RedisUtils.sadd_async(self.__db, f"huaxin_order_ref-{code}", val)
        RedisUtils.expire_async(self.__db, f"huaxin_order_ref-{code}", tool.get_expire())
    # 删除订单ID
    def remove_local_order_id(self, code, local_order_id):
        val = local_order_id
        if code in self.__huaxin_local_order_id_cache:
            self.__huaxin_local_order_id_cache[code].discard(val)
        RedisUtils.srem_async(self.__db, f"huaxin_local_order_id-{code}", val)
    def remove_order_ref(self, code, order_ref):
        val = order_ref
        if code in self.__huaxin_order_ref_cache:
            self.__huaxin_order_ref_cache[code].discard(val)
        RedisUtils.srem_async(self.__db, f"huaxin_order_ref-{code}", val)
    # 查询所有的订单号
    def list_local_order_ids(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_local_order_id-{code}")
    def list_order_refs(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_ref-{code}")
    def list_local_order_ids_cache(self, code):
        if code in self.__huaxin_local_order_id_cache:
            return self.__huaxin_local_order_id_cache[code]
    def list_order_refs_cache(self, code):
        if code in self.__huaxin_order_ref_cache:
            return self.__huaxin_order_ref_cache[code]
        return set()
    # 添加订单ID
trade/trade_huaxin.py
@@ -33,7 +33,7 @@
# 通过量下单,返回(代码,账号ID,订单号)
def order_volume(code, price, count, last_data_index, local_order_id=None):
def order_volume(code, price, count, last_data_index, order_ref=None):
    async_log_util.info(logger_trade, f"{code}下单方法开始")
    price = round(float(price), 2)
    if code.find("00") != 0 and code.find("60") != 0:
@@ -46,7 +46,7 @@
    blocking = False
    try:
        async_log_util.info(logger_trade, f"{code}下单开始")
        result = huaxin_trade_api.order(1, code, count, price, blocking=blocking, local_order_id=local_order_id)
        result = huaxin_trade_api.order(1, code, count, price, blocking=blocking, order_ref=order_ref)
        async_log_util.info(logger_trade, f"{code}下单结束")
    except Exception as e:
        if str(e).find("超时") >= 0:
@@ -69,10 +69,10 @@
            else:
                raise Exception(result['msg'])
        else:
            local_order_id = result["local_order_id"]
            __TradeOrderIdManager.add_local_order_id(code, local_order_id)
            async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 localOrderId:{local_order_id}")
            return code, "local", local_order_id
            order_ref = result["order_ref"]
            __TradeOrderIdManager.add_order_ref(code, order_ref)
            async_log_util.info(hx_logger_trade_debug, f"{code}:下单成功 orderRef:{order_ref}")
            return code, "local", order_ref
    else:
        raise Exception("下单失败,无返回")
@@ -127,14 +127,14 @@
            async_log_util.info(logger_trade, f"{code}:华鑫撤单结束,撤单数量:{len(orders)}")
        # 查询是否有本地订单号
    local_orders_info = __TradeOrderIdManager.list_local_order_ids_cache(code)
    order_refs_info = __TradeOrderIdManager.list_order_refs_cache(code)
    if local_orders_info:
        local_orders_info = copy.deepcopy(local_orders_info)
        for order_id in local_orders_info:
    if order_refs_info:
        order_refs_info = copy.deepcopy(order_refs_info)
        for order_ref in order_refs_info:
            async_log_util.info(logger_trade, f"{code}:华鑫开始执行撤单 {msg}")
            huaxin_trade_api.cancel_order(1, code, '', localOrderID=order_id)
            __TradeOrderIdManager.remove_local_order_id(code, order_id)
            huaxin_trade_api.cancel_order(1, code, '', orderRef=order_ref)
            __TradeOrderIdManager.remove_order_ref(code, order_ref)
            async_log_util.info(logger_trade, f"{code}:华鑫执行撤单结束")
trade/trade_manager.py
@@ -23,7 +23,7 @@
from log_module.log import *
from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager
from utils import import_util, tool
from utils import import_util, tool, huaxin_util
trade_gui = import_util.import_lib("trade.trade_gui")
@@ -464,9 +464,9 @@
            if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN:
                trade_juejin.order_volume(code, price, count)
            elif constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN:
                local_order_id = create_local_order_id(code)
                TradeOrderIdManager().add_local_order_id(code, local_order_id)
                trade_huaxin.order_volume(code, price, count, last_data_index, local_order_id=local_order_id)
                order_ref = huaxin_util.create_order_ref()
                TradeOrderIdManager().add_order_ref(code, order_ref)
                trade_huaxin.order_volume(code, price, count, last_data_index, order_ref=order_ref)
        else:
            guiTrade.buy(code, price)
        __place_order_success(code, capture_timestamp, last_data, last_data_index)
@@ -666,10 +666,7 @@
        RedisUtils.realse(redis_l2)
# 生成本地订单号
def create_local_order_id(code):
    local_order_id = f"l_{code}_{1}_{round(time.time() * 1000)}_{random.randint(0, 999)}"
    return local_order_id
if __name__ == "__main__":
utils/huaxin_util.py
@@ -1,4 +1,8 @@
# 报单状态
import threading
from utils import tool
TORA_TSTP_OST_Cached = '0'  # 预埋
TORA_TSTP_OST_Unknown = '1'  # 未知
TORA_TSTP_OST_Accepted = '2'  # 交易所已接收
@@ -48,3 +52,18 @@
    if state == TORA_TSTP_OST_PartTraded or state == TORA_TSTP_OST_AllTraded or state == TORA_TSTP_OST_PartTradeCanceled:
        return True
    return False
__order_ref_lock = threading.RLock()
__public_order_ref = int(tool.get_now_time_str().replace(":", ""))
# 创建订单引用
def create_order_ref():
    __order_ref_lock.acquire()
    try:
        global __public_order_ref
        __public_order_ref += 1
        return __public_order_ref
    finally:
        __order_ref_lock.release()