| | |
| | | # print(f"i===={i}") |
| | | get_all_stocks_current_open(current_infos) |
| | | get_all_stocks_current_high_and_low(current_infos) |
| | | |
| | | # 保存现价 |
| | | for current_info in current_infos: |
| | | data_cache.current_l1_dict[current_info[0]] = current_info |
| | | |
| | | for current_info in current_infos: |
| | | try: |
| | | if current_info is not None: |
| | |
| | | |
| | | # 把current_infos灌入相应的线程 |
| | | def set_current_info(current_infos): |
| | | @dask.delayed |
| | | # @dask.delayed |
| | | def process_current_infos(current_info_list): |
| | | __start_time = time.time() |
| | | use_time_list = [] |
| | | for current_info in current_info_list: |
| | | try: |
| | | if current_info is not None: |
| | | _start_time = time.time() |
| | | strategic_thread_manager(current_info) |
| | | use_time_list.append((time.time() - _start_time, current_info[0])) |
| | | except Exception as error: |
| | | logging.exception(error) |
| | | # print("异常:", current_info) |
| | | logger_debug.exception(error) |
| | | logger_debug.error(f"L1处理出错:{current_info}") |
| | | use_time = time.time() - __start_time |
| | | if use_time > 0.5: |
| | | # 记录超过1s的数据 |
| | | async_log_util.info(logger_debug, "L1数据处理时间统计:thread-{} 总计用时-{} 平均耗时-{} 最大耗时-{}", |
| | | tool.get_thread_id(), use_time, sum([x[0] for x in use_time_list]) / len(use_time_list), |
| | | max(use_time_list, key=lambda e: e[0])) |
| | | |
| | | @dask.delayed |
| | | def batch_process_current_infos(fs): |
| | | return fs |
| | | # @dask.delayed |
| | | # def batch_process_current_infos(fs): |
| | | # return fs |
| | | |
| | | logging.info(f"set_current_info进入") |
| | | now_start = time.time() |
| | |
| | | get_all_stocks_current_open(current_infos) |
| | | get_all_stocks_current_high_and_low(current_infos) |
| | | if current_infos: |
| | | # 保存现价 |
| | | for current_info in current_infos: |
| | | data_cache.current_l1_dict[current_info[0]] = current_info |
| | | |
| | | # 分批处理数据 |
| | | ds = [] |
| | | total_count = len(current_infos) |
| | | page = 15 |
| | | page_size = total_count // page + 1 |
| | | for p in range(page): |
| | | temp_list = current_infos[p * page_size:(p + 1) * page_size] |
| | | ds.append(process_current_infos(temp_list)) |
| | | dask_result = batch_process_current_infos(ds) |
| | | dask_result.compute() |
| | | # ds = [] |
| | | # total_count = len(current_infos) |
| | | # page = 2 |
| | | # page_size = total_count // page + 1 |
| | | # for p in range(page): |
| | | # temp_list = current_infos[p * page_size:(p + 1) * page_size] |
| | | # ds.append(process_current_infos(temp_list)) |
| | | # dask_result = batch_process_current_infos(ds) |
| | | # dask_result.compute() |
| | | process_current_infos(current_infos) |
| | | now_end: float = time.time() |
| | | start_to_end = now_end - now_start |
| | | print(f"运行中=={round(start_to_end, 2)} 秒") |