Administrator
2023-09-05 bcc38a54b2ca8268d7233b1a0e8250d81ca4bd7d
为交易加锁
4个文件已修改
69 ■■■■■ 已修改文件
constant.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -114,10 +114,10 @@
L_CANCEL_MIN_WATCH_COUNT = 10
# 撤单比例
L_CANCEL_RATE = 0.6
# 大金额
L_CANCEL_BIG_MONEY = 100
# 小金额
L_CANCEL_MIN_MONEY = 50
# 华鑫L2的卡位数量
HUAXIN_L2_MAX_CODES_COUNT = 50
huaxin_client/trade_client.py
@@ -152,7 +152,8 @@
    def cancel_buy(self, code, order_sys_id, sinfo):
        if sinfo in self.__cancel_buy_sinfo_set:
            raise Exception(f'撤单请求已经提交:{sinfo}')
        logger_local_huaxin_trade_debug.info(f"进入撤单方法:code-{code} order_sys_id-{order_sys_id} sinfo-{sinfo}")
        async_log_util.info(logger_local_huaxin_trade_debug,
                            f"进入撤单方法:code-{code} order_sys_id-{order_sys_id} sinfo-{sinfo}")
        self.__cancel_buy_sinfo_set.add(sinfo)
        self.req_id += 1
        # 请求撤单
@@ -536,14 +537,14 @@
                         pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        try:
            if pRspInfoField.ErrorID == 0:
                logger_local_huaxin_trade_debug.info('OnRspOrderAction: OK! [%d]' % nRequestID)
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: OK! [%d]' % nRequestID)
                threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                     {"sinfo": pInputOrderActionField.SInfo,
                                                                      "orderSysID": pInputOrderActionField.OrderSysID,
                                                                      "cancel": 1}), daemon=True).start()
            else:
                logger_local_huaxin_trade_debug.info('OnRspOrderAction: Error! [%d] [%d] [%s]'
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnRspOrderAction: Error! [%d] [%d] [%s]'
                                                     % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                threading.Thread(target=lambda: self.__data_callback(TYPE_CANCEL_ORDER, nRequestID,
                                                                     {"sinfo": pInputOrderActionField.SInfo,
@@ -559,7 +560,7 @@
                            pRspInfoField: "CTORATstpRspInfoField", nRequestID: "int") -> "void":
        try:
            if pInputOrderActionField and pRspInfoField:
                logger_local_huaxin_trade_debug.info('OnErrRtnOrderAction: Error! [%d] [%d] [%d] [%s]'
                async_log_util.info(logger_local_huaxin_trade_debug, 'OnErrRtnOrderAction: Error! [%d] [%d] [%d] [%s]'
                                                     % (nRequestID, pInputOrderActionField.OrderSysID,
                                                        pRspInfoField.ErrorID,
                                                        pRspInfoField.ErrorMsg))
@@ -591,9 +592,10 @@
    def OnRtnOrder(self, pOrderField: "CTORATstpOrderField") -> "void":
        try:
            logger_local_huaxin_trade_debug.info(
            async_log_util.info(logger_local_huaxin_trade_debug,
                '[%d] OnRtnOrder: SInfo[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] LimitPrice[%.2f] VolumeTotalOriginal[%d] OrderSysID[%s] OrderStatus[%s] InsertTime[%s]'
                % (round(time.time() * 1000), pOrderField.SInfo, pOrderField.InvestorID, pOrderField.SecurityID,
                                % (round(time.time() * 1000), pOrderField.SInfo, pOrderField.InvestorID,
                                   pOrderField.SecurityID,
                   pOrderField.OrderRef, pOrderField.OrderLocalID,
                   pOrderField.LimitPrice, pOrderField.VolumeTotalOriginal, pOrderField.OrderSysID,
                   pOrderField.OrderStatus, pOrderField.InsertTime))
@@ -620,10 +622,11 @@
    def OnRtnTrade(self, pTradeField: "CTORATstpTradeField") -> "void":
        try:
            logger_local_huaxin_trade_debug.info(
            async_log_util.info(logger_local_huaxin_trade_debug,
                'OnRtnTrade: TradeID[%s] InvestorID[%s] SecurityID[%s] OrderRef[%d] OrderLocalID[%s] Price[%.2f] Volume[%d]'
                % (pTradeField.TradeID, pTradeField.InvestorID, pTradeField.SecurityID,
                   pTradeField.OrderRef, pTradeField.OrderLocalID, pTradeField.Price, pTradeField.Volume))
                                   pTradeField.OrderRef, pTradeField.OrderLocalID, pTradeField.Price,
                                   pTradeField.Volume))
        except:
            pass
@@ -805,7 +808,8 @@
    def OnTrade(self, client_id, request_id, sk, type_, data):
        if type_ == 1:
            async_log_util.info(logger_local_huaxin_trade_debug,f"\n---------------------\n请求下单:client_id-{client_id} request_id-{request_id}")
            async_log_util.info(logger_local_huaxin_trade_debug,
                                f"\n---------------------\n请求下单:client_id-{client_id} request_id-{request_id}")
            # 下单
            # 1-买 2-卖
            direction = data["direction"]
@@ -835,7 +839,8 @@
                                  request_id)
        elif type_ == 2:
            async_log_util.info(logger_local_huaxin_trade_debug, f"\n---------------------\n请求撤单:client_id-{client_id} request_id-{request_id} data-{data}")
            async_log_util.info(logger_local_huaxin_trade_debug,
                                f"\n---------------------\n请求撤单:client_id-{client_id} request_id-{request_id} data-{data}")
            # 撤单
            direction = data["direction"]
            code = data["code"]
@@ -867,7 +872,7 @@
                                  request_id)
    def OnDealList(self, client_id, request_id, sk):
        logger_local_huaxin_trade_debug.info(f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求成交列表:client_id-{client_id} request_id-{request_id}")
        try:
            print("开始请求成交列表")
            req_id = self.__tradeSimpleApi.list_traded_orders()
@@ -877,7 +882,7 @@
            SendResponseSkManager.send_error_response("common", request_id, client_id, str(e))
    def OnDelegateList(self, client_id, request_id, sk, is_cancel):
        logger_local_huaxin_trade_debug.info(f"请求委托列表:client_id-{client_id} request_id-{request_id}")
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求委托列表:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_delegate_orders(is_cancel)
            req_rid_dict[req_id] = (client_id, request_id, sk)
@@ -886,7 +891,7 @@
                          request_id)
    def OnMoney(self, client_id, request_id, sk):
        logger_local_huaxin_trade_debug.info(f"请求账户:client_id-{client_id} request_id-{request_id}")
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求账户:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.get_money_account()
            req_rid_dict[req_id] = (client_id, request_id, sk)
@@ -895,7 +900,7 @@
                          request_id)
    def OnPositionList(self, client_id, request_id, sk):
        logger_local_huaxin_trade_debug.info(f"请求持仓:client_id-{client_id} request_id-{request_id}")
        async_log_util.info(logger_local_huaxin_trade_debug, f"请求持仓:client_id-{client_id} request_id-{request_id}")
        try:
            req_id = self.__tradeSimpleApi.list_positions()
            req_rid_dict[req_id] = (client_id, request_id, sk)
@@ -1009,7 +1014,8 @@
        if local_order_id:
            if local_order_id not in cls.local_order_id_map and orderSystemId:
                cls.local_order_id_map[local_order_id] = orderSystemId
                async_log_util.info( logger_local_huaxin_trade_debug, f"本地订单号与系统订单号映射,{code}:{local_order_id} {orderSystemId}")
                async_log_util.info(logger_local_huaxin_trade_debug,
                                    f"本地订单号与系统订单号映射,{code}:{local_order_id} {orderSystemId}")
            if local_order_id in cls.not_canceled_local_order_ids:
                async_log_util.info(logger_local_huaxin_trade_debug,f"执行等待撤单,{code}:{local_order_id} {orderSystemId}")
                # 执行撤单
@@ -1030,7 +1036,7 @@
# 交易反馈回调
def __traderapi_callback(type, req_id, data):
    logger_local_huaxin_trade_debug.info("回调:type-{} req_id-{}", type, req_id)
    async_log_util.info(logger_local_huaxin_trade_debug, "回调:type-{} req_id-{}", type, req_id)
    key = req_id
    if type == TYPE_ORDER or type == TYPE_CANCEL_ORDER:
        key = data["sinfo"]
@@ -1045,7 +1051,7 @@
                local_order_id = temp_params[3]
                data["localOrderId"] = local_order_id
            logger_local_huaxin_trade_debug.info("API回调 request_id-{}", request_id)
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调 request_id-{}", request_id)
            # 测试
            # send_response(
            #     json.dumps({"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
@@ -1055,10 +1061,10 @@
                {"type": "response", "data": {"code": 0, "data": data}, "client_id": client_id,
                 "request_id": request_id})
            logger_local_huaxin_trade_debug.info("API回调结束 req_id-{} request_id-{}", req_id, request_id)
            async_log_util.info(logger_local_huaxin_trade_debug, "API回调结束 req_id-{} request_id-{}", req_id, request_id)
            print("API回调结束")
        else:
            logger_local_huaxin_trade_debug.info("非API回调 req_id-{}", req_id)
            async_log_util.info(logger_local_huaxin_trade_debug, "非API回调 req_id-{}", req_id)
            print("非API回调")
            trade_response.OnTradeCallback({"type": "trade_callback", "data": {"code": 0, "data": data, "type": type}})
            # # 非API回调
l2/cancel_buy_strategy.py
@@ -1106,6 +1106,9 @@
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            # 小金额过滤
            if float(val['price'])*val['num'] < constant.L_CANCEL_MIN_MONEY * 100:
                continue
            # 判断当前订单是否已经撤单
            left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code, i, total_data,
l2/l2_data_manager_new.py
@@ -1,5 +1,6 @@
import io
import logging
import threading
import time as t
from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, industry_codes_sort, \
@@ -206,6 +207,7 @@
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    volume_rate_info = {}
    __trade_thread_lock_dict = {}
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    __thsBuy1VolumnManager = trade_queue_manager.THSBuy1VolumnManager()
@@ -595,8 +597,13 @@
            return False
        else:
            l2_log.debug(code, "可以下单,原因:{}", reason)
            if code not in cls.__trade_thread_lock_dict:
                cls.__trade_thread_lock_dict[code] = threading.RLock()
            cls.__trade_thread_lock_dict[code].acquire()
            try:
                l2_log.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
@@ -616,7 +623,7 @@
                pass
            finally:
                # l2_log.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
                pass
                cls.__trade_thread_lock_dict[code].release()
            return True
    # 是否可以取消
@@ -916,7 +923,11 @@
    @classmethod
    def __cancel_buy(cls, code):
        # 加锁
        if code not in cls.__trade_thread_lock_dict:
            cls.__trade_thread_lock_dict[code] = threading.RLock()
        try:
            cls.__trade_thread_lock_dict[code].acquire()
            l2_log.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            l2_log.debug(code, "执行撤单成功")
@@ -925,6 +936,9 @@
            logging.exception(e)
            l2_log.debug(code, "执行撤单异常:{}", str(e))
            return False
        finally:
            cls.__trade_thread_lock_dict[code].release()
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):