Administrator
2024-11-22 47f3ef30ebc8ee4cf714f6cfd33fcdc3331dd6cc
bug修复
3个文件已修改
38 ■■■■■ 已修改文件
code_attribute/first_target_code_data_processor.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/async_log_util.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
task/task_manager.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py
@@ -42,7 +42,7 @@
def process_first_codes_datas(dataList, request_id=None):
    logger_l2_codes_subscript.info(f"{request_id}加载l2代码相关数据")
    async_log_util.info(logger_l2_codes_subscript, f"{request_id}加载l2代码相关数据")
    # 获取最近5天的交易日期,为后面的数据计算做准备
    dates = HistoryKDatasUtils.get_latest_trading_date_cache(5)
    latest_trading_date = None
log_module/async_log_util.py
@@ -13,13 +13,19 @@
class AsyncLogManager:
    def __init__(self):
        self.__log_queue = queue.Queue()
        self.__log_queue = queue.Queue(maxsize=10240)
    def __add_log(self, logger, method, *args):
        self.__log_queue.put_nowait((logger, time.time(), method, args))
        try:
            self.__log_queue.put_nowait((logger, time.time(), method, args))
        except Exception:
            pass
    def add_log(self, data):
        self.__log_queue.put_nowait(data)
        try:
            self.__log_queue.put_nowait(data)
        except Exception:
            pass
    def debug(self, logger, *args):
        self.__add_log(logger, "debug", *args)
@@ -35,6 +41,13 @@
    def exception(self, logger, *args):
        self.__add_log(logger, "exception", *args)
    def get_queue_size(self):
        """
        获取队列大小
        @return:
        """
        self.__log_queue.qsize()
    # 运行同步日志
    def run_sync(self, add_to_common_log=False):
@@ -91,10 +104,7 @@
if __name__ == "__main__":
    # info(logger_debug, "*-{}", "test")
    asyncLogManager = AsyncLogManager()
    asyncLogManager.info(logger_debug, "测试123")
    threading.Thread(target=lambda: asyncLogManager.run_sync(), daemon=True).start()
    time.sleep(1)
    # info(logger_debug, "002375")
    run_sync()
    _queue = queue.Queue(maxsize=102400)
    for i in range(200):
        _queue.put_nowait("1")
task/task_manager.py
@@ -24,7 +24,6 @@
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
@@ -52,10 +51,9 @@
                times = _datas[0]
                datas = _datas[1]
                request_id = _datas[2]
                logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}", request_id, len(datas))
                async_log_util.info(logger_l2_codes_subscript, "({})读取L2代码处理队列:数量-{}", request_id, len(datas))
                # 只处理20s内的数据
                if time.time() - times < 20:
                    # 获取涨停列表中的数据
                    # datas中的数据格式:(代码, 现价, 涨幅, 量, 时间)
                    if not datas:
                        # 没有数据需要处理
@@ -78,7 +76,7 @@
                    #                                                                                KPLLimitUpDataRecordManager.get_current_reasons()),
                    #                          daemon=True).start()
                    #         time.sleep(0.2)
                    logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas))
                    async_log_util.info(logger_l2_codes_subscript, "({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)