Administrator
2023-09-04 acd3f8f4d658f8261f3a592d7943c065e0b6fc71
订阅代码增加跟踪日志
8个文件已修改
75 ■■■■■ 已修改文件
code_attribute/first_target_code_data_processor.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/trade_client.py 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py
@@ -22,8 +22,8 @@
__CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
def process_first_codes_datas(dataList):
    logger_l2_codes_subscript.info("加载l2代码相关数据")
def process_first_codes_datas(dataList, request_id=None):
    logger_l2_codes_subscript.info(f"{request_id}加载l2代码相关数据")
    print("首板代码数量:", len(dataList))
    limit_up_price_dict = {}
    temp_codes = []
@@ -209,5 +209,5 @@
                    trade_data_manager.PlaceOrderCountManager().place_order(code)
    gpcode_first_screen_manager.process_ticks(prices)
    logger_l2_codes_subscript.info("l2代码相关数据加载完成")
    logger_l2_codes_subscript.info(f"{request_id}l2代码相关数据加载完成")
    return tick_datas
huaxin_client/l1_client.py
@@ -125,8 +125,9 @@
        return
    # 上传数据
    type_ = "set_target_codes"
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}})
        {"type": type_, "data": {"data": datas}, "request_id": request_id})
    if pipe_l2 is not None:
        pipe_l2.send(fdata)
    # 记录新增加的代码
@@ -136,7 +137,7 @@
    for c in codes:
        __latest_subscript_codes.add(c)
    if add_codes:
        logger_local_huaxin_l1.info(f"新增加订阅的代码:{add_codes}")
        logger_local_huaxin_l1.info(f"({request_id})新增加订阅的代码:{add_codes}")
def run(pipe_l2):
huaxin_client/trade_client.py
@@ -69,6 +69,8 @@
TYPE_LIST_TRADED = 3
TYPE_LIST_POSITION = 4
TYPE_LIST_MONEY = 5
# 成交
TYPE_DEAL = 6
ENABLE_ORDER = True
@@ -521,7 +523,8 @@
                logger.info('OnRspOrderInsert: Error! [%d] [%d] [%s]'
                            % (nRequestID, pRspInfoField.ErrorID, pRspInfoField.ErrorMsg))
                threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, nRequestID,
                                                                     {"sinfo": pInputOrderField.SInfo, "orderStatus": -1,
                                                                     {"sinfo": pInputOrderField.SInfo,
                                                                      "orderStatus": -1,
                                                                      "orderStatusMsg": pRspInfoField.ErrorMsg}),
                                 daemon=True).start()
        except:
@@ -595,6 +598,16 @@
                   pOrderField.OrderStatus, pOrderField.InsertTime))
            OrderIDManager.set_system_order_id(pOrderField.SecurityID, pOrderField.SInfo, pOrderField.OrderSysID)
            # 上报订单状态
            if pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_AllTraded or pOrderField.OrderStatus == traderapi.TORA_TSTP_OST_PartTradeCanceled:
                threading.Thread(target=lambda: self.__data_callback(TYPE_DEAL, 0, {"sinfo": pOrderField.SInfo,
                                                                                    "securityId": pOrderField.SecurityID,
                                                                                    "orderLocalId": pOrderField.OrderLocalID,
                                                                                    "orderStatus": pOrderField.OrderStatus,
                                                                                    "statusMsg": pOrderField.StatusMsg,
                                                                                    "orderSysID": pOrderField.OrderSysID,
                                                                                    "accountID": pOrderField.AccountID}),
                                 daemon=True).start()
            if pOrderField.OrderStatus != traderapi.TORA_TSTP_OST_Unknown:
                threading.Thread(target=lambda: self.__data_callback(TYPE_ORDER, 0, {"sinfo": pOrderField.SInfo,
@@ -770,9 +783,11 @@
                self.__temp_order_list_dict[nRequestID].append(
                    {"tradeID": pTradeField.TradeID, "securityID": pTradeField.SecurityID,
                     "orderLocalID": pTradeField.OrderLocalID,
                     "direction": pTradeField.Direction, "orderSysID": pTradeField.OrderSysID, "price": pTradeField.Price,
                     "direction": pTradeField.Direction, "orderSysID": pTradeField.OrderSysID,
                     "price": pTradeField.Price,
                     "tradeTime": pTradeField.TradeTime,
                     "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate, "tradingDay": pTradeField.TradingDay,
                     "volume": pTradeField.Volume, "tradeDate": pTradeField.TradeDate,
                     "tradingDay": pTradeField.TradingDay,
                     "pbuID": pTradeField.PbuID, "accountID": pTradeField.AccountID})
            else:
                threading.Thread(target=lambda: self.__data_callback(TYPE_LIST_TRADED, nRequestID,
@@ -781,7 +796,6 @@
                self.__temp_order_list_dict.pop(nRequestID)
        except:
            pass
# 获取响应发送socket
@@ -1005,8 +1019,14 @@
                logger_local_huaxin_trade_debug.info(f"执行等待撤单,{code}:{local_order_id} {orderSystemId}")
                # 执行撤单
                cls.not_canceled_local_order_ids.discard(local_order_id)
                for i in range(3):
                    try:
                cls.__TradeSimpleApi.cancel_buy(code, orderSystemId,
                                                f"local_cancel_buy-{code}-{round(time.time() * 1000)}")
                                                        f"lcb-{code}-{round(time.time() * 1000)}")
                        break
                    except Exception as e:
                        logger_local_huaxin_trade_debug.exception(e)
                        time.sleep(0.01)
    @classmethod
    def add_need_cancel_local_order_id(cls, local_order_id):
l2/cancel_buy_strategy.py
@@ -1085,7 +1085,9 @@
        watch_indexes = set()
        # 小金额
        watch_indexes_small_money = set()
        for i in range(index + 1, total_data[-1]["index"] + 1):
        start_index = index + 1
        end_index = total_data[-1]["index"]
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
@@ -1110,7 +1112,7 @@
                if len(watch_indexes) >= constant.L_CANCEL_MAX_WATCH_COUNT:
                    break
        l2_log.l_cancel_debug(code, f"设置监听范围,成交进度-{index} 监听范围-{watch_indexes}")
        l2_log.l_cancel_debug(code, f"设置监听范围,成交进度-{index} , 数据范围:{start_index}-{end_index} 监听范围-{watch_indexes}")
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
l2/huaxin/huaxin_target_codes_manager.py
@@ -36,8 +36,8 @@
        l2_codes_queue.clear()
    @classmethod
    def push(cls, datas):
        l2_codes_queue.put_nowait((int(time.time()), datas))
    def push(cls, datas,request_id=None):
        l2_codes_queue.put_nowait((int(time.time()), datas,request_id))
        logger_l2_codes_subscript.info("加入L2代码处理队列:数量-{}", len(datas))
        # cls.__get_redis().lpush(cls.__L2_CODE_KEY, json.dumps())
@@ -62,8 +62,8 @@
class HuaXinL1TargetCodesManager:
    @classmethod
    def set_level_1_codes_datas(cls, datas):
        logger_l2_codes_subscript.info("接受到L1的数据,开始预处理")
    def set_level_1_codes_datas(cls, datas, request_id=None):
        logger_l2_codes_subscript.info(f"({request_id})接受到L1的数据,开始预处理")
        yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
        # 订阅的代码
        flist = []
@@ -99,10 +99,10 @@
                     "zyltgb": zyltgb // 10000, "zyltgbUnit": 1}
            flist.append(fitem)
        code_volumn_manager.set_today_volumns(temp_volumns)
        logger_l2_codes_subscript.info("接受到L1的数据,预处理完成")
        logger_l2_codes_subscript.info(f"{request_id}接受到L1的数据,预处理完成")
        try:
            tick_datas = first_target_code_data_processor.process_first_codes_datas(flist)
            current_price_process_manager.accept_prices(tick_datas)
            tick_datas = first_target_code_data_processor.process_first_codes_datas(flist, request_id)
            current_price_process_manager.accept_prices(tick_datas,request_id)
        except Exception as e:
            logging.exception(e)
trade/current_price_process_manager.py
@@ -22,7 +22,7 @@
latest_add_codes = set()
def accept_prices(prices):
def accept_prices(prices, request_id=None):
    # 获取首板代码
    first_codes = gpcode_manager.FirstGPCodesManager().get_first_gp_codes_cache()
@@ -135,7 +135,7 @@
                if True:
                    print("设置L2代码数量:", len(add_code_set))
                    global latest_add_codes
                    logger_l2_codes_subscript.info(f"预处理新增订阅代码:{add_code_set - latest_add_codes}")
                    logger_l2_codes_subscript.info(f"({request_id})预处理新增订阅代码:{add_code_set - latest_add_codes}")
                    latest_add_codes = add_code_set
                    add_datas = []
                    for d in add_code_list:
@@ -143,7 +143,7 @@
                        limit_up_price = round(float(limit_up_price), 2)
                        min_volume = int(round(50 * 10000 / limit_up_price))
                        add_datas.append((d, min_volume, limit_up_price))
                    huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas)
                    huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.push(add_datas, request_id)
            except Exception as e:
                logging.exception(e)
        else:
trade/huaxin/trade_api_server.py
@@ -431,7 +431,8 @@
                times = _datas[0]
                datas = _datas[1]
                logger_l2_codes_subscript.info("读取L2代码处理队列:数量-{}", len(datas))
                request_id = _datas[2]
                logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}",request_id, len(datas))
                print("时间戳:", times)
                print("内容:", datas)
                # 只处理20s内的数据
@@ -445,7 +446,7 @@
                    root_data = socket_util.encryp_client_params_sign(root_data)
                    pipe_l2.send(json.dumps(root_data))
                    print("设置L2代码结束")
                    logger_l2_codes_subscript.info("发送到华鑫L2代码处理队列:数量-{}", len(datas))
                    logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}",request_id, len(datas))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
trade/huaxin/trade_server.py
@@ -276,8 +276,9 @@
    @classmethod
    def set_target_codes(cls, data_json):
        data = data_json["data"]
        request_id = data_json["request_id"]
        datas = data["data"]
        HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas)
        HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas,request_id = request_id)
    @classmethod
    def l2_order(cls, code, _datas, timestamp):