| | |
| | | |
| | | # 把current_infos灌入相应的线程 |
| | | def set_current_info(current_infos): |
| | | @dask.delayed |
| | | # @dask.delayed |
| | | def process_current_infos(current_info_list): |
| | | __start_time = time.time() |
| | | use_time_list = [] |
| | |
| | | tool.get_thread_id(), use_time, sum(use_time_list) / len(use_time_list), |
| | | max(use_time_list)) |
| | | |
| | | @dask.delayed |
| | | def batch_process_current_infos(fs): |
| | | return fs |
| | | # @dask.delayed |
| | | # def batch_process_current_infos(fs): |
| | | # return fs |
| | | |
| | | logging.info(f"set_current_info进入") |
| | | now_start = time.time() |
| | |
| | | data_cache.current_l1_dict[current_info[0]] = current_info |
| | | |
| | | # 分批处理数据 |
| | | ds = [] |
| | | total_count = len(current_infos) |
| | | page = 2 |
| | | page_size = total_count // page + 1 |
| | | for p in range(page): |
| | | temp_list = current_infos[p * page_size:(p + 1) * page_size] |
| | | ds.append(process_current_infos(temp_list)) |
| | | dask_result = batch_process_current_infos(ds) |
| | | dask_result.compute() |
| | | # ds = [] |
| | | # total_count = len(current_infos) |
| | | # page = 2 |
| | | # page_size = total_count // page + 1 |
| | | # for p in range(page): |
| | | # temp_list = current_infos[p * page_size:(p + 1) * page_size] |
| | | # ds.append(process_current_infos(temp_list)) |
| | | # dask_result = batch_process_current_infos(ds) |
| | | # dask_result.compute() |
| | | process_current_infos(current_infos) |
| | | now_end: float = time.time() |
| | | start_to_end = now_end - now_start |
| | | print(f"运行中=={round(start_to_end, 2)} 秒") |