Administrator
2023-08-28 49c42567a7689b68ce8806189117d307520537aa
增加L2订阅日志
6个文件已修改
44 ■■■■ 已修改文件
huaxin_client/l1_client.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_target_codes_manager.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_client.py
@@ -9,7 +9,7 @@
import xmdapi
from huaxin_client import tool
from huaxin_client.client_network import SendResponseSkManager
from log_module.log import logger_system, logger_local_huaxin_l1
from log_module.log import logger_system, logger_local_huaxin_l1, logger_l2_codes_subscript
level1_data_dict = {
@@ -178,6 +178,9 @@
    api.Init()
    logger_system.info("L1订阅服务启动成功")
    # TODO 测试链路
    level1_data_dict["000969"] = (
        "000969", 9.46, 9.11, 771000*100, time.time())
    # 等待程序结束
    while True:
@@ -198,6 +201,7 @@
            datas = flist[:100]
            codes = [x[0] for x in datas]
            print("代码数量:", len(datas))
            logger_l2_codes_subscript.info("华鑫L1上传代码:数量-{}",len(datas))
            __upload_codes_info(pipe_l2, datas)
        except Exception as e:
            logging.exception(e)
huaxin_client/l2_client.py
@@ -13,7 +13,7 @@
from huaxin_client.command_manager import L2ActionCallback
from log_module import log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \
    logger_local_huaxin_g_cancel
    logger_local_huaxin_g_cancel, logger_l2_codes_subscript
Front_Address = "tcp://10.0.1.101:6900"
Multicast_Address = "udp://224.224.2.19:7889"
@@ -134,12 +134,17 @@
        if add_codes:
            logger_system.info(f"新增L2订阅代码数量:{len(add_codes)}")
        logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes))
        # 设置最近的代码列表
        self.latest_codes_set = codes
    # 订阅代码,[(代码,最低手数,涨停价)]
    def set_codes_data(self, codes_data):
        self.__process_codes_data(codes_data)
        try:
            self.__process_codes_data(codes_data)
        except Exception as e:
            logger_l2_codes_subscript.exception(e)
    def set_code_special_watch_volume(self, code, volume):
        # 有效期为3s
@@ -496,6 +501,7 @@
    def OnSetL2Position(self, client_id, request_id, codes_data):
        print("L2订阅数量:", len(codes_data))
        logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data))
        try:
            spi.set_codes_data(codes_data)
        except Exception as e:
l2/huaxin/huaxin_target_codes_manager.py
@@ -11,6 +11,7 @@
from code_attribute import global_data_loader, code_volumn_manager, first_target_code_data_processor
from code_attribute.code_data_util import ZYLTGBUtil
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_l2_codes_subscript
from third_data import kpl_data_manager, kpl_api
from trade import current_price_process_manager
from utils import tool, global_util, socket_util
@@ -37,6 +38,7 @@
    @classmethod
    def push(cls, datas):
        l2_codes_queue.put_nowait((int(time.time()), datas))
        logger_l2_codes_subscript.info("加入L2代码处理队列:数量-{}", len(datas))
        # cls.__get_redis().lpush(cls.__L2_CODE_KEY, json.dumps())
    @classmethod
@@ -96,7 +98,7 @@
                     "zyltgb": zyltgb // 10000, "zyltgbUnit": 1}
            flist.append(fitem)
        code_volumn_manager.set_today_volumns(temp_volumns)
        print("首板代码数量", len(flist))
        logger_l2_codes_subscript.info("首板代码数量:{}", len(flist))
        try:
            tick_datas = first_target_code_data_processor.process_first_codes_datas(flist)
            current_price_process_manager.accept_prices(tick_datas)
log_module/log.py
@@ -92,6 +92,10 @@
                   filter=lambda record: record["extra"].get("name") == "l2_real_place_order_position",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_subscript"),
                   filter=lambda record: record["extra"].get("name") == "l2_codes_subscript",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("juejin", "juejin_tick"),
                   filter=lambda record: record["extra"].get("name") == "juejin_tick",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -290,6 +294,9 @@
logger_l2_trade_buy_queue = __mylogger.get_logger("l2_trade_buy_queue")
logger_l2_trade_buy_progress = __mylogger.get_logger("l2_trade_buy_progress")
logger_real_place_order_position = __mylogger.get_logger("l2_real_place_order_position")
# 代码订阅日志
logger_l2_codes_subscript = __mylogger.get_logger("l2_codes_subscript")
logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
trade/current_price_process_manager.py
@@ -6,6 +6,7 @@
import logging
from l2.huaxin import huaxin_target_codes_manager
from log_module.log import logger_l2_codes_subscript
from ths import client_manager
import constant
from code_attribute import gpcode_manager
@@ -28,12 +29,11 @@
    print("总价格代码数量:", len(prices))
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对
    if len(gpcode_manager.get_gp_list()) - len(prices) > 10:
        print("采集到的代码数量不正确:", len(prices))
        return
    # 采集的代码数量不对, 暂时不需要
    # if len(gpcode_manager.get_gp_list()) - len(prices) > 10:
    #     logger_l2_codes_subscript.info("采集到的代码数量不正确:{}", len(prices))
    #     return
    now_str = tool.get_now_time_str()
    now_strs = now_str.split(":")
    # 获取想买单
    want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache()
    if True:
@@ -68,6 +68,7 @@
                                                                  decimal.Decimal(d["price"])))
                except Exception as e:
                    logging.exception(e)
                    logger_l2_codes_subscript.exception(e)
        gpcode_manager.set_prices(temp_prices)
        __actualPriceProcessor.process_rates(temp_rates, now_str)
        # -------------------------------处理交易位置分配---------------------------------
trade/huaxin/trade_api_server.py
@@ -14,7 +14,7 @@
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_manager_new
from l2.huaxin import huaxin_target_codes_manager
from log_module.log import logger_system
from log_module.log import logger_system, logger_l2_codes_subscript
from third_data import block_info
from third_data.history_k_data_util import HistoryKDatasUtils, JueJinApi
from third_data.kpl_data_manager import KPLDataManager
@@ -426,8 +426,10 @@
        try:
            _datas = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.pop()
            if _datas:
                timestamp = _datas[0]
                datas = _datas[1]
                logger_l2_codes_subscript.info("读取L2代码处理队列:数量-{}", len(datas))
                print("时间戳:", timestamp)
                print("内容:", datas)
                # 只处理20s内的数据
@@ -441,8 +443,10 @@
                    root_data = socket_util.encryp_client_params_sign(root_data)
                    pipe_l2.send(json.dumps(root_data))
                    print("设置L2代码结束")
                    logger_l2_codes_subscript.info("发送到华鑫L2代码处理队列:数量-{}", len(datas))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            time.sleep(1)