admin
12 小时以前 bc1913abfdc364d97facb3edcd2d54130c8e1ce2
strategy/instant_time_market.py
@@ -294,6 +294,11 @@
            #         print(f"i===={i}")
            get_all_stocks_current_open(current_infos)
            get_all_stocks_current_high_and_low(current_infos)
            # 保存现价
            for current_info in current_infos:
                data_cache.current_l1_dict[current_info[0]] = current_info
            for current_info in current_infos:
                try:
                    if current_info is not None:
@@ -313,21 +318,31 @@
# 把current_infos灌入相应的线程
def set_current_info(current_infos):
    @dask.delayed
    # @dask.delayed
    def process_current_infos(current_info_list):
        __start_time = time.time()
        use_time_list = []
        for current_info in current_info_list:
            try:
                if current_info is not None:
                    _start_time = time.time()
                    strategic_thread_manager(current_info)
                    use_time_list.append((time.time() - _start_time, current_info[0]))
            except Exception as error:
                logging.exception(error)
                # print("异常:", current_info)
                logger_debug.exception(error)
                logger_debug.error(f"L1处理出错:{current_info}")
        use_time = time.time() - __start_time
        if use_time > 0.5:
            # 记录超过1s的数据
            async_log_util.info(logger_debug, "L1数据处理时间统计:thread-{} 总计用时-{} 平均耗时-{} 最大耗时-{}",
                                tool.get_thread_id(), use_time, sum([x[0] for x in use_time_list]) / len(use_time_list),
                                max(use_time_list, key=lambda e: e[0]))
    @dask.delayed
    def batch_process_current_infos(fs):
        """Return ``fs`` unchanged.

        In this chunk it is called with the list of per-batch results and
        ``.compute()`` is invoked on what it returns, so it appears to act as
        a single node that gathers all batches -- NOTE(review): confirm
        against the ``dask.delayed`` decorator applied above.
        """
        return fs
    # @dask.delayed
    # def batch_process_current_infos(fs):
    #     return fs
    logging.info(f"set_current_info进入")
    now_start = time.time()
@@ -341,16 +356,21 @@
        get_all_stocks_current_open(current_infos)
        get_all_stocks_current_high_and_low(current_infos)
        if current_infos:
            # 保存现价
            for current_info in current_infos:
                data_cache.current_l1_dict[current_info[0]] = current_info
            # 分批处理数据
            ds = []
            total_count = len(current_infos)
            page = 15
            page_size = total_count // page + 1
            for p in range(page):
                temp_list = current_infos[p * page_size:(p + 1) * page_size]
                ds.append(process_current_infos(temp_list))
            dask_result = batch_process_current_infos(ds)
            dask_result.compute()
            # ds = []
            # total_count = len(current_infos)
            # page = 2
            # page_size = total_count // page + 1
            # for p in range(page):
            #     temp_list = current_infos[p * page_size:(p + 1) * page_size]
            #     ds.append(process_current_infos(temp_list))
            # dask_result = batch_process_current_infos(ds)
            # dask_result.compute()
            process_current_infos(current_infos)
        now_end: float = time.time()
        start_to_end = now_end - now_start
        print(f"运行中=={round(start_to_end, 2)} 秒")