Administrator
8 分钟以前 2f2516749615da866e96d8d24e499b7ecbb63a3e
task/task_manager.py
@@ -4,9 +4,11 @@
import threading
import time
import constant
from db import redis_manager_delegate as redis_manager
from l2 import l2_log
from l2.huaxin import huaxin_target_codes_manager
from l2.subscript import l2_subscript_manager
from log_module import async_log_util
from log_module.log import logger_system, logger_l2_codes_subscript, logger_debug
from servers.huaxin_trade_server import TradeServerProcessor
@@ -24,7 +26,6 @@
                val = queue_l1_w_strategy_r.get()
                if val:
                    val = json.loads(val)
                    # print("收到来自L1的数据:", val["type"])
                    # 处理数据
                    type_ = val["type"]
                    timestamp = val.get("time")
@@ -52,10 +53,9 @@
                times = _datas[0]
                datas = _datas[1]
                request_id = _datas[2]
                logger_l2_codes_subscript.info("({})读取L2代码处理队列:数量-{}", request_id, len(datas))
                async_log_util.info(logger_l2_codes_subscript, "({})读取L2代码处理队列:数量-{}", request_id, len(datas))
                # 只处理20s内的数据
                if time.time() - times < 20:
                    # 获取涨停列表中的数据
                    # datas中的数据格式:(代码, 现价, 涨幅, 量, 时间)
                    if not datas:
                        # 没有数据需要处理
@@ -65,9 +65,19 @@
                    codes = [d[0] for d in datas]
                    for code in codes:
                        block_info.init_code(code)
                    root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                 "data": datas}
                    queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    if constant.IS_L2_NEW:
                        process_manager: l2_subscript_manager.TargetCodeProcessManager = l2_subscript_manager\
                            .process_manager
                        queue_codes_list = process_manager.set_codes(set(codes))
                        code_data_dict = {d[0]: d for d in datas}
                        for queue_codes in queue_codes_list:
                            root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                         "data": [code_data_dict.get(c) for c in queue_codes[1]]}
                            queue_codes[0].put_nowait(json.dumps(root_data))
                    else:
                        root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2,
                                     "data": datas}
                        queue_other_w_l2_r.put_nowait(json.dumps(root_data))
                    # 如果在9:25-9:29 需要加载板块
                    # if int("092500") < int(tool.get_now_time_str().replace(":", "")) < int("092900"):
                    #     for d in datas:
@@ -78,7 +88,7 @@
                    #                                                                                KPLLimitUpDataRecordManager.get_current_reasons()),
                    #                          daemon=True).start()
                    #         time.sleep(0.2)
                    logger_l2_codes_subscript.info("({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas))
                    async_log_util.info(logger_l2_codes_subscript, "({})发送到华鑫L2代码处理队列:数量-{}", request_id, len(datas))
        except Exception as e:
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)