Administrator
2023-08-16 998dbbd175a47ee1821a04778317b9e053989c52
++++++++++++++++
bug修复-
6个文件已修改
246 ■■■■ 已修改文件
db/redis_manager_delegate.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_api/command_manager.py 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_api/trade_client.py 87 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_api.py 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 83 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager_delegate.py
@@ -187,7 +187,7 @@
                        result = method(*args)
                    else:
                        args = tuple([_redis, args])
                        result = method(args)
                        result = method(*args)
            except Exception as e1:
                logging.exception(e1)
huaxin_api/command_manager.py
@@ -72,29 +72,33 @@
    @classmethod
    def __process_command(cls, _type, client_id, result_json):
        data = result_json["data"]
        print("接收内容", result_json)
        request_id = result_json.get('request_id')
        if not socket_util.is_client_params_sign_right(result_json):
            print("签名错误")
            # 签名出错
            SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                      {"code": -1, "msg": "签名错误"})
            return
        try:
            data = result_json["data"]
            print("接收内容", result_json)
            request_id = result_json.get('request_id')
            if not socket_util.is_client_params_sign_right(result_json):
                print("签名错误")
                # 签名出错
                SendResponseSkManager.send_error_response(_type, request_id, client_id,
                                                          {"code": -1, "msg": "签名错误"})
                return
        if _type == CLIENT_TYPE_TRADE:
            # 交易
            ctype = data["trade_type"]
            cls.action_callback.OnTrade(client_id, request_id, ctype, data)
        elif _type == CLIENT_TYPE_MONEY:
            cls.action_callback.OnMoney(client_id, request_id)
        elif _type == CLIENT_TYPE_DEAL_LIST:
            cls.action_callback.OnDealList(client_id, request_id)
        elif _type == CLIENT_TYPE_DELEGATE_LIST:
            can_cancel = data["can_cancel"]
            cls.action_callback.OnDelegateList(client_id, request_id, can_cancel)
        elif _type == CLIENT_TYPE_POSITION_LIST:
            cls.action_callback.OnPositionList(client_id, request_id)
            if _type == CLIENT_TYPE_TRADE:
                # 交易
                ctype = data["trade_type"]
                cls.action_callback.OnTrade(client_id, request_id, ctype, data)
            elif _type == CLIENT_TYPE_MONEY:
                cls.action_callback.OnMoney(client_id, request_id)
            elif _type == CLIENT_TYPE_DEAL_LIST:
                cls.action_callback.OnDealList(client_id, request_id)
            elif _type == CLIENT_TYPE_DELEGATE_LIST:
                can_cancel = data["can_cancel"]
                cls.action_callback.OnDelegateList(client_id, request_id, can_cancel)
            elif _type == CLIENT_TYPE_POSITION_LIST:
                cls.action_callback.OnPositionList(client_id, request_id)
        except Exception as e:
            logging.exception(e)
            logging.error(result_json)
    @classmethod
    def run_process_command(cls, pipe_strategy):
huaxin_api/trade_client.py
@@ -512,24 +512,29 @@
        else:
            logger.info('OnRspOrderInsert: Error! [%d] [%d] [%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            self.__data_callback(TYPE_ORDER, nRequestID, {"sinfo": pInputOrderField.SInfo, "orderStatus": -1,
                                                          "orderStatusMsg": pRspInfoField.ErrorMsg})
            threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, nRequestID,
                                                                 {"sinfo": pInputOrderField.SInfo, "orderStatus": -1,
                                                                  "orderStatusMsg": pRspInfoField.ErrorMsg}),
                             daemon=True).start()
    def OnRspOrderAction(self, pInputOrderActionField: "CTORATstpInputOrderActionField",
                         pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        if pRspInfoField.ErrorID == 0:
            logger.info('OnRspOrderAction: OK! [%d]' % nRequestID)
            self.__data_callback(TYPE_CANCEL_ORDER, nRequestID, {"sinfo": pInputOrderActionField.SInfo,
                                                                 "orderSysID": pInputOrderActionField.OrderSysID,
                                                                 "cancel": 1})
            threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                 {"sinfo": pInputOrderActionField.SInfo,
                                                                  "orderSysID": pInputOrderActionField.OrderSysID,
                                                                  "cancel": 1}), daemon=True).start()
        else:
            logger.info('OnRspOrderAction: Error! [%d] [%d] [%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            self.__data_callback(TYPE_CANCEL_ORDER, nRequestID, {"sinfo": pInputOrderActionField.SInfo,
                                                                 "orderSysID": pInputOrderActionField.OrderSysID,
                                                                 "cancel": 0, "errorID": pRspInfoField.ErrorID,
                                                                 "errorMsg": pRspInfoField.ErrorMsg})
            threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                 {"sinfo": pInputOrderActionField.SInfo,
                                                                  "orderSysID": pInputOrderActionField.OrderSysID,
                                                                  "cancel": 0, "errorID": pRspInfoField.ErrorID,
                                                                  "errorMsg": pRspInfoField.ErrorMsg}),
                             daemon=True).start()
    def OnRspInquiryJZFund(self, pRspInquiryJZFundField: "CTORATstpRspInquiryJZFundField",
                           pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
@@ -556,12 +561,15 @@
               pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
               pOrderField.OrderStatus))
        if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown:
            self.__data_callback(TYPE_ORDER, 0, {"sinfo": pOrderField.SInfo, "securityId": pOrderField.SecurityID,
                                                 "orderLocalId": pOrderField.OrderLocalID,
                                                 "orderStatus": pOrderField.OrderStatus,
                                                 "statusMsg": pOrderField.StatusMsg,
                                                 "orderSysID": pOrderField.OrderSysID,
                                                 "accountID": pOrderField.AccountID})
            set_system_order_id(pOrderField.SInfo, pOrderField.OrderSysID)
            threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, 0, {"sinfo": pOrderField.SInfo,
                                                                                 "securityId": pOrderField.SecurityID,
                                                                                 "orderLocalId": pOrderField.OrderLocalID,
                                                                                 "orderStatus": pOrderField.OrderStatus,
                                                                                 "statusMsg": pOrderField.StatusMsg,
                                                                                 "orderSysID": pOrderField.OrderSysID,
                                                                                 "accountID": pOrderField.AccountID}),
                             daemon=True).start()
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        logger_trade_debug.info(
@@ -634,7 +642,8 @@
                   pTradingAccountField.UsefulMoney, pTradingAccountField.FetchLimit))
        else:
            results = self.__temp_money_account_list_dict.pop(nRequestID)
            self.__data_callback(TYPE_LIST_MONEY, nRequestID, results)
            threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_MONEY, nRequestID, results),
                             daemon=True).start()
            logger.info('查询资金账号结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
@@ -664,7 +673,9 @@
        else:
            logger.info('查询报单结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            self.__data_callback(TYPE_LIST_DELEGATE, nRequestID, self.__temp_order_list_dict[nRequestID])
            threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_DELEGATE, nRequestID,
                                                                 self.__temp_order_list_dict[nRequestID]),
                             daemon=True).start()
            self.__temp_order_list_dict.pop(nRequestID)
    def OnRspQryPosition(self, pPositionField: "CTORATstpPositionField", pRspInfoField: "CTORATstpRspInfoField",
@@ -687,7 +698,9 @@
        else:
            logger.info('查询持仓结束[%d] ErrorID[%d] ErrorMsg[%s]'
                        % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
            self.__data_callback(TYPE_LIST_POSITION, nRequestID, self.__temp_position_list_dict[nRequestID])
            threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_POSITION, nRequestID,
                                                                 self.__temp_position_list_dict[nRequestID]),
                             daemon=True).start()
            self.__temp_position_list_dict.pop(nRequestID)
    # 成交回报,参数pTradeField是一个CTORATstpTradeField类对象
@@ -709,7 +722,9 @@
                 "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate, "tradingDay": pTradeField.TradingDay,
                 "pbuID": pTradeField.PbuID, "accountID": pTradeField.AccountID})
        else:
            self.__data_callback(TYPE_LIST_TRADED, nRequestID, self.__temp_order_list_dict[nRequestID])
            threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_TRADED, nRequestID,
                                                                 self.__temp_order_list_dict[nRequestID]),
                             daemon=True).start()
            self.__temp_order_list_dict.pop(nRequestID)
@@ -733,10 +748,11 @@
            volume = data["volume"]
            price = data["price"]
            sinfo = data["sinfo"]
            local_order_id = data.get("local_order_id")
            if direction == 1:
                # 买
                try:
                    req_rid_dict[sinfo] = (client_id, request_id)
                    req_rid_dict[sinfo] = (client_id, request_id, local_order_id)
                    self.__tradeSimpleApi.buy(code, volume, price, sinfo)
                except Exception as e:
                    send_response(json.dumps({"code": 1, "msg": str(e)}), TYPE_ORDER, client_id,
@@ -759,11 +775,16 @@
            # 撤单
            direction = data["direction"]
            code = data["code"]
            orderSysID = data["orderSysID"]
            orderSysID = data.get("orderSysID")
            localOrderID = data.get("localOrderID")
            sinfo = data["sinfo"]
            if direction == 1:
                if not orderSysID and localOrderID:
                    orderSysID = local_order_id_map.get(localOrderID)
                # 撤买
                try:
                    if not orderSysID:
                        raise Exception("没有找到系统订单号")
                    req_rid_dict[sinfo] = (client_id, request_id)
                    self.__tradeSimpleApi.cancel_buy(code, orderSysID, sinfo)
                except Exception as e:
@@ -887,6 +908,21 @@
    strategy_pipe.send(data)
local_order_id_map = {}
# 设置系统订单ID
def set_system_order_id(sinfo, orderSystemId):
    # 获取本地订单ID
    local_order_id = None
    if req_rid_dict and sinfo in req_rid_dict:
        temp_params = req_rid_dict.get(sinfo)
        if len(temp_params) > 2:
            local_order_id = temp_params[2]
    if local_order_id and local_order_id not in local_order_id_map and orderSystemId:
        local_order_id_map[local_order_id] = orderSystemId
# 交易反馈回调
def __traderapi_callback(type, req_id, data):
    logger_trade_debug.info("回调:type-{} req_id-{}", type, req_id)
@@ -897,7 +933,13 @@
        print("traderapi_callback", req_rid_dict)
        if req_rid_dict and key in req_rid_dict:
            print("API回调")
            client_id, request_id = req_rid_dict.pop(key)
            temp_params = req_rid_dict.pop(key)
            client_id, request_id = temp_params[0], temp_params[1]
            # 本地订单号-系统订单号映射
            if len(temp_params) >= 3 and type == TYPE_ORDER:
                local_order_id = temp_params[2]
                data["localOrderId"] = local_order_id
            logger_trade_debug.info("API回调 request_id-{}", request_id)
            # 测试
            send_response(
@@ -935,7 +977,6 @@
    global strategy_pipe
    strategy_pipe = pipe_strategy
    global tradeCommandManager
    tradeCommandManager = command_manager.TradeCommandManager()
log_module/log.py
@@ -294,6 +294,8 @@
def close_print():
    if True:
        return
    logging.basicConfig(level=logging.ERROR)
    if not constant.is_windows():
        os.close(1)
trade/huaxin/huaxin_trade_api.py
@@ -248,32 +248,38 @@
# price:价格(如果是卖时不传价格就按照5挡价卖)
# blocking是否阻塞进程
def order(direction, code, volume, price, price_type=2, blocking=True, sinfo=None, request_id=None):
    timestamp = round(time.time() * 1000)
    if not sinfo:
        sinfo = f"b_{code}_{round(time.time() * 1000)}"
        sinfo = f"b_{code}_{timestamp}"
    print("客户端", ClientSocketManager.socket_client_dict)
    local_order_id = f"l_{code}_{direction}_{volume}_{timestamp}_{random.randint(0, 999)}"
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 1,
                            "direction": direction,
                            "code": code,
                            "local_order_id": local_order_id,
                            "volume": volume,
                            "price_type": price_type,
                            "price": price, "sinfo": sinfo}, request_id=request_id)
    try:
        return __read_response(request_id, blocking)
    finally:
        huaxin_trade_data_update.add_delegate_list()
        huaxin_trade_data_update.add_money_list()
    if blocking:
        try:
            return __read_response(request_id, blocking)
        finally:
            huaxin_trade_data_update.add_delegate_list()
            huaxin_trade_data_update.add_money_list()
    else:
        return {"local_order_id": local_order_id}
def cancel_order(direction, code, orderSysID, blocking=True, sinfo=None, request_id=None):
def cancel_order(direction, code, orderSysID, localOrderID=None, blocking=True, sinfo=None, request_id=None):
    if not sinfo:
        sinfo = f"cb_{code}_{round(time.time() * 1000)}"
    request_id = __request(ClientSocketManager.CLIENT_TYPE_TRADE,
                           {"type": ClientSocketManager.CLIENT_TYPE_TRADE, "trade_type": 2,
                            "direction": direction,
                            "code": code,
                            "orderSysID": orderSysID, "sinfo": sinfo},request_id=request_id)
                            "localOrderID": localOrderID,
                            "orderSysID": orderSysID, "sinfo": sinfo}, request_id=request_id)
    try:
        return __read_response(request_id, blocking)
    finally:
trade/trade_huaxin.py
@@ -20,6 +20,7 @@
    __redisManager = RedisManager(2)
    __instance = None
    __huaxin_order_id_cache = {}
    __huaxin_local_order_id_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -40,12 +41,44 @@
                code = k.split("-")[-1]
                vals = RedisUtils.smembers(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__huaxin_order_id_cache, code, vals)
            keys = RedisUtils.keys(__redis, "huaxin_local_order_id-*")
            for k in keys:
                code = k.split("-")[-1]
                vals = RedisUtils.smembers(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__huaxin_local_order_id_cache, code, vals)
        finally:
            RedisUtils.realse(__redis)
        # 添加订单ID
    def add_local_order_id(self, code, local_order_id):
        val = local_order_id
        if code not in self.__huaxin_local_order_id_cache:
            self.__huaxin_local_order_id_cache[code] = set()
        self.__huaxin_local_order_id_cache[code].add(val)
        RedisUtils.sadd_async(self.__db, f"huaxin_local_order_id-{code}", val)
        RedisUtils.expire_async(self.__db, f"huaxin_local_order_id-{code}", tool.get_expire())
        # 删除订单ID
    def remove_local_order_id(self, code, local_order_id):
        val = local_order_id
        if code in self.__huaxin_local_order_id_cache:
            self.__huaxin_local_order_id_cache[code].discard(val)
        RedisUtils.srem_async(self.__get_redis(), f"huaxin_local_order_id-{code}", val)
        # 查询所有的订单号
    def list_local_order_ids(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_local_order_id-{code}")
    def list_local_order_ids_cache(self, code):
        if code in self.__huaxin_local_order_id_cache:
            return self.__huaxin_local_order_id_cache[code]
        return set()
    # 添加订单ID
    def add_order_id(self, code, account_id, order_id):
        val = json.dumps((account_id, order_id))
    def add_order_id(self, code, account_id, sys_order_id):
        val = json.dumps((account_id, sys_order_id))
        if code not in self.__huaxin_order_id_cache:
            self.__huaxin_order_id_cache[code] = set()
        self.__huaxin_order_id_cache[code].add(val)
@@ -94,8 +127,9 @@
    if not constant.TRADE_ENABLE:
        return
    result = None
    blocking = False
    try:
        result = huaxin_trade_api.order(1, code, count, price)
        result = huaxin_trade_api.order(1, code, count, price, blocking=blocking)
        print("华鑫下单耗时", time.time() - start_time)
        hx_logger_trade_debug.info(f"{code}:下单耗时{round(time.time() - start_time, 3)}s")
    except Exception as e:
@@ -107,17 +141,23 @@
    if result:
        print("下单结果", result)
        if result['code'] == 0:
            result = result["data"]
            if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected:
                logger_juejin_trade.info(f"{code}:下单失败:{result.get('statusMsg')}")
                raise Exception(result.get('statusMsg'))
        if blocking:
            if result['code'] == 0:
                result = result["data"]
                if result["orderStatus"] == huaxin_util.TORA_TSTP_OST_Rejected:
                    logger_juejin_trade.info(f"{code}:下单失败:{result.get('statusMsg')}")
                    raise Exception(result.get('statusMsg'))
                else:
                    TradeOrderIdManager().add_order_id(code, result["accountID"], result["orderSysID"])
                    logger_juejin_trade.info(f"{code}:下单成功 orderSysID:{result['orderSysID']}")
                    return result["securityId"], result["accountID"], result["orderSysID"]
            else:
                TradeOrderIdManager().add_order_id(code, result["accountID"], result["orderSysID"])
                logger_juejin_trade.info(f"{code}:下单成功 orderSysID:{result['orderSysID']}")
                return result["securityId"], result["accountID"], result["orderSysID"]
                raise Exception(result['msg'])
        else:
            raise Exception(result['msg'])
            local_order_id = result["local_order_id"]
            TradeOrderIdManager().add_local_order_id(code, local_order_id)
            logger_juejin_trade.info(f"{code}:下单成功 localOrderId:{local_order_id}")
            return code, "local", local_order_id
    else:
        raise Exception("下单失败,无返回")
@@ -138,12 +178,19 @@
        for order in orders_info:
            order_info = json.loads(order)
            orders.append({'orderSysID': order_info[1], 'accountId': order_info[0]})
    if orders:
        logger_juejin_trade.info(f"{code}:开始执行撤单")
        logger_juejin_trade.info(f"{code}:撤单成功,撤单数量:{len(orders)}")
        for order in orders:
            huaxin_trade_api.cancel_order(1, code, order["orderSysID"])
            TradeOrderIdManager().remove_order_id(code, order["accountId"], order["orderSysID"])
        if orders:
            logger_juejin_trade.info(f"{code}:开始执行撤单")
            logger_juejin_trade.info(f"{code}:撤单成功,撤单数量:{len(orders)}")
            for order in orders:
                huaxin_trade_api.cancel_order(1, code, order["orderSysID"])
                TradeOrderIdManager().remove_order_id(code, order["accountId"], order["orderSysID"])
    else:
        # 查询是否有本地订单号
        orders_info = TradeOrderIdManager().list_local_order_ids_cache(code)
        if orders_info:
            for order_id in orders_info:
                huaxin_trade_api.cancel_order(1, code, '', localOrderID=order_id)
                TradeOrderIdManager().remove_local_order_id(code, order_id)
if __name__ == "__main__":