From afc88de23208d8af901d7c25145ec9420a58ce62 Mon Sep 17 00:00:00 2001 From: admin <admin@example.com> Date: 星期四, 28 八月 2025 09:17:18 +0800 Subject: [PATCH] 1.BUG --- strategy/instant_time_market.py | 85 +++++++++++++++++++++++++++++++++++------- 1 files changed, 70 insertions(+), 15 deletions(-) diff --git a/strategy/instant_time_market.py b/strategy/instant_time_market.py index 0cc2834..19e8a91 100644 --- a/strategy/instant_time_market.py +++ b/strategy/instant_time_market.py @@ -8,6 +8,7 @@ import dask +import constant import utils from log_module import async_log_util from log_module.log import logger_common, logger_debug @@ -79,6 +80,17 @@ pass # 鑾峰彇褰撳墠鏃堕棿 now_time = tool.get_now_time_str() + if constant.IS_FOR_BACKTEST or True: + if data_cache.OPEN_BIDDING_TIME <= now_time < "09:30:10": + for current_info in current_infos: + # 妫�鏌ヨ偂绁ㄦ槸鍚﹀凡缁忓湪data_cache涓� + # if current_info[0] not in data_cache.all_stocks_current_open: + symbol = basic_methods.format_stock_symbol(current_info[0]) + if symbol in data_cache.all_stocks_current_open: + continue + data_cache.all_stocks_current_open[symbol] = { + 'current_open': current_info[2] if current_info[2] > 0 else current_info[5][0][0]} + # 濡傛灉褰撳墠鏃堕棿澶т簬09:25:06鎵嶈繍琛屾渶楂樹环鍜屾渶浣庝环鐨勮繍绠� if data_cache.LATER_OPEN_BIDDING_TIME < now_time: # if data_cache.AFTER_CLOSING_TIME < now_time: @@ -92,6 +104,7 @@ # if current_info[0] not in data_cache.all_stocks_current_open: symbol = basic_methods.format_stock_symbol(current_info[0]) data_cache.all_stocks_current_open[symbol] = {'current_open': current_info[2]} + logger.info(f"{symbol}鐨勫紑鐩樹环涓�==銆媨current_info[2]}") # print(f"data_cache.all_stocks_current_open=={data_cache.all_stocks_current_open}") # json_data = data_cache.all_stocks_current_open # 灏嗚浆鎹㈠悗鐨凧SON瀛楃涓插啓鍏ユ枃浠�(鐩墠鑰冭檻鍙栨秷鏁版嵁瀛樺偍鏈湴) @@ -107,12 +120,15 @@ data_cache.record_current_open_execution = True current_datas = utils.juejin_api.JueJinApi.get_codes_open(data_cache.DataCache().min_stocks, fields='symbol,open') + if current_datas is not None: + logger_common.info(f"鎺橀噾鎷夊彇寮�鐩樹环锛歿len(current_datas)}") # print(f"current_datas=={current_datas}") for current_data in current_datas: # print(f"current_data=={current_data}") # 妫�鏌ヨ偂绁ㄦ槸鍚﹀凡缁忓湪data_cache涓� # if current_data[0] not in data_cache.all_stocks_current_open: data_cache.all_stocks_current_open[current_data['symbol']] = {'current_open': current_data['open']} + logger.info(f"{current_data['symbol']}鐨勬帢閲戝紑鐩樹环涓�==銆媨current_data['open']}") # 灏嗚浆鎹㈠悗鐨凧SON瀛楃涓插啓鍏ユ枃浠�(鐩墠鍙栨秷鏁版嵁瀛樺偍鏈湴锛屽闇�瀛樺偍鏈湴涔熻鏀惧湪D:鐩樿矾寰�) # with open('local_storage_data/all_stocks_current_open.json', 'w', encoding='utf-8') as f: # # 灏嗗瓧鍏歌浆鎹负 JSON 鏍煎紡鐨勫瓧绗︿覆 @@ -172,6 +188,8 @@ :param current_infos: :return: """ + if not current_infos: + return # 鑾峰彇褰撳墠鏃堕棿 now_time = tool.get_now_time_str() # 濡傛灉褰撳墠鏃堕棿澶т簬09:25:06鎵嶈繍琛屾渶楂樹环鍜屾渶浣庝环鐨勮繍绠� @@ -219,6 +237,20 @@ 'current_low': get_current_high_or_low.current_low} else: logger.info(f"銆愭病鏈夈�戝紑鐩樺墠鍚姩锛岄噰鐢ㄣ�愭帢閲戞暟鎹�戝垵濮嬪寲 鏈�楂樹环鏈�浣庝环锛岄噰鐢ㄣ�愬崕閼暟鎹�戞洿鏂� 鏈�楂樻渶浣庝环") + + # 寮�鐩樺墠鍔犺浇寮�鐩樹环/鏈�楂樹环/鏈�浣庝环 + if constant.IS_FOR_BACKTEST: + # 寮�鐩樹箣鍓嶅紑鐩樹环/鏈�楂樹环/鏈�浣庝环涓庡紑鐩樹环鐩稿悓 + for current_info in current_infos: + # print(f"寮�濮嬪惊鐜痗urrent_infos") + symbol = basic_methods.format_stock_symbol(current_info[0]) + if symbol not in data_cache.DataCache().min_stocks: + continue + if symbol in __current_high_or_low_dict: + continue + open, high, low = current_info[2], current_info[2], current_info[2] + __current_high_or_low_dict[symbol] = PriceTracker(0) + __current_high_or_low_dict[symbol].set_high_and_low_price(high, low) # print(f"褰撳墠鏃堕棿鏇存柊鏃堕棿锛歿now_time}") if not __current_high_or_low_dict: # 杩樻病鍒濆鍖� @@ -259,6 +291,7 @@ data_cache.all_stocks_current_high_and_low[symbol] = { 'current_high': price_track_manage.current_high, 'current_low': price_track_manage.current_low} + # print(f"data_cache.all_stocks_current_high_and_low[symbol]==={data_cache.all_stocks_current_high_and_low[symbol]}") @@ -294,6 +327,11 @@ # print(f"i===={i}") get_all_stocks_current_open(current_infos) get_all_stocks_current_high_and_low(current_infos) + + # 淇濆瓨鐜颁环 + for current_info in current_infos: + data_cache.current_l1_dict[current_info[0]] = current_info + for current_info in current_infos: try: if current_info is not None: @@ -313,23 +351,33 @@ # 鎶奵urrent_infos鐏屽叆鐩稿簲鐨勭嚎绋� def set_current_info(current_infos): - @dask.delayed + # @dask.delayed def process_current_infos(current_info_list): + __start_time = time.time() + use_time_list = [] for current_info in current_info_list: try: if current_info is not None: + _start_time = time.time() strategic_thread_manager(current_info) + use_time_list.append((time.time() - _start_time, current_info[0])) except Exception as error: logging.exception(error) # print("寮傚父锛�", current_info) logger_debug.exception(error) logger_debug.error(f"L1澶勭悊鍑洪敊锛歿current_info}") + use_time = time.time() - __start_time + if use_time > 0.5: + # 璁板綍瓒呰繃1s鐨勬暟鎹� + async_log_util.info(logger_debug, "L1鏁版嵁澶勭悊鏃堕棿缁熻锛歵hread-{} 鎬昏鐢ㄦ椂-{} 骞冲潎鑰楁椂-{} 鏈�澶ц�楁椂-{}", + tool.get_thread_id(), use_time, sum([x[0] for x in use_time_list]) / len(use_time_list), + max(use_time_list, key=lambda e: e[0])) - @dask.delayed - def batch_process_current_infos(fs): - return fs + # @dask.delayed + # def batch_process_current_infos(fs): + # return fs - logging.info(f"set_current_info杩涘叆") + # async_log_util.info(logger_common, f"set_current_info杩涘叆: {len(current_infos)}") now_start = time.time() try: now_time = tool.get_now_time_str() @@ -341,16 +389,21 @@ get_all_stocks_current_open(current_infos) get_all_stocks_current_high_and_low(current_infos) if current_infos: + # 淇濆瓨鐜颁环 + for current_info in current_infos: + data_cache.current_l1_dict[current_info[0]] = current_info + # 鍒嗘壒澶勭悊鏁版嵁 - ds = [] - total_count = len(current_infos) - page = 15 - page_size = total_count // page + 1 - for p in range(page): - temp_list = current_infos[p * page_size:(p + 1) * page_size] - ds.append(process_current_infos(temp_list)) - dask_result = batch_process_current_infos(ds) - dask_result.compute() + # ds = [] + # total_count = len(current_infos) + # page = 2 + # page_size = total_count // page + 1 + # for p in range(page): + # temp_list = current_infos[p * page_size:(p + 1) * page_size] + # ds.append(process_current_infos(temp_list)) + # dask_result = batch_process_current_infos(ds) + # dask_result.compute() + process_current_infos(current_infos) now_end: float = time.time() start_to_end = now_end - now_start print(f"杩愯涓�=={round(start_to_end, 2)} 绉�") @@ -358,7 +411,9 @@ except Exception as error: logging.exception(error) finally: - async_log_util.info(logger_debug, f"L1澶勭悊鏃堕棿锛歿time.time() - now_start}") + use_time = time.time() - now_start + if use_time > 1: + async_log_util.info(logger_debug, f"L1澶勭悊鏃堕棿锛歿use_time}") # 浠呬粎鐢ㄤ簬娴嬭瘯鏁版嵁杩涘叆绛栫暐鍚庣殑鏁版嵁鎯呭喌 # get_current_info() -- Gitblit v1.8.0