From e3f4cf3bacac6eda31ddea6aaf70ebb883788817 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 28 五月 2025 17:38:07 +0800 Subject: [PATCH] L2成交大单解析 --- l2_data_parser.py | 8 +++- l2_test.py | 4 +- data_parser/transaction_big_order_parser.py | 59 ++++++++++++++++++++--------- 3 files changed, 49 insertions(+), 22 deletions(-) diff --git a/data_parser/transaction_big_order_parser.py b/data_parser/transaction_big_order_parser.py index aad957a..84dae46 100644 --- a/data_parser/transaction_big_order_parser.py +++ b/data_parser/transaction_big_order_parser.py @@ -201,7 +201,7 @@ __combined_df_cache = {} -def extract_big_order_of_all(dir_path): +def extract_big_order_of_all(dir_path, process_count=4): combined_path = os.path.join(dir_path, 'combined.csv') if not os.path.exists(combined_path): print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") @@ -211,15 +211,25 @@ for code in codes: extract_big_order_of_code(dir_path, code) + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + output_path = os.path.join(dir_path, f"big_buy_{code}.csv") + if os.path.exists(output_path): + print_log("璺緞宸插瓨鍦�:", output_path) + return + df = __combined_df_cache.get(combined_path, None) + if df is None: + df = pd.read_csv(combined_path) + __combined_df_cache[combined_path] = df + args = [(code, df) for code in codes] + # 鏂板啓娉� + with Pool(processes=process_count) as pool: + pool.map(__extract_big_order_of_code, args) -def extract_big_order_of_code(dir_path, code): - """ - 鎻愬彇浠g爜鐨勫ぇ鍗� - @param dir_path: 鏁版嵁鐩綍 - @param code: 涓虹┖琛ㄧず瀵煎嚭鍏ㄩ儴 - @return: - """ +def __extract_big_order_of_code(args): def first_last(group): """ 鑾峰彇绗竴鏉℃暟鎹笌鏈�鍚庝竴鏉� @@ -237,18 +247,11 @@ 'StartPrice': group['StartPrice'].iloc[0] }) - combined_path = os.path.join(dir_path, 'combined.csv') - if not os.path.exists(combined_path): - print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") - return + dir_path, code, df = args[0], args[1], args[2] output_path = os.path.join(dir_path, f"big_buy_{code}.csv") if os.path.exists(output_path): print_log("璺緞宸插瓨鍦�:", output_path) return - df = __combined_df_cache.get(combined_path, None) - if df is None: - df = pd.read_csv(combined_path) - __combined_df_cache[combined_path] = df df_copy = df.copy() if code: df_copy = df_copy[df_copy["SecurityID"] == int(code)] @@ -264,6 +267,26 @@ # 閬嶅巻鍐呭 grouped_result.to_csv(output_path, index=False) print_log(f"[{tool.get_now_time_str()}]淇濆瓨鎴愬姛锛岃矾寰勶細{output_path}") + + +def extract_big_order_of_code(dir_path, code): + """ + 鎻愬彇浠g爜鐨勫ぇ鍗� + @param dir_path: 鏁版嵁鐩綍 + @param code: 涓虹┖琛ㄧず瀵煎嚭鍏ㄩ儴 + @return: + """ + + combined_path = os.path.join(dir_path, 'combined.csv') + if not os.path.exists(combined_path): + print_log("鎷兼帴鏁版嵁涓嶅瓨鍦�") + return + + df = __combined_df_cache.get(combined_path, None) + if df is None: + df = pd.read_csv(combined_path) + __combined_df_cache[combined_path] = df + __extract_big_order_of_code((dir_path, code, df)) def extract_big_order_codes(dir_path): @@ -289,9 +312,9 @@ if __name__ == "__main__": - log(1,2,3) + print_log(1, 2, 3) # pre_process_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test.csv") # pre_process_ngtsticks("E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv") # concat_pre_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test") # extract_big_order_codes("E:/娴嬭瘯鏁版嵁/Transaction_Test") - extract_big_order_of_code("E:/娴嬭瘯鏁版嵁/Transaction_Test") + extract_big_order_of_all("E:/娴嬭瘯鏁版嵁/Transaction_Test") diff --git a/l2_data_parser.py b/l2_data_parser.py index 3d72c80..5433d19 100644 --- a/l2_data_parser.py +++ b/l2_data_parser.py @@ -407,8 +407,12 @@ # 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 # 鏍规嵁code鎻愬彇澶у崟 if not code: - transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick") - transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction") + # indexed_data = [f"/home/userzjj/ftp/{day}/NGTSTick", f"/home/userzjj/ftp/{day}/Transaction"] + # # 鏂板啓娉� + # with Pool(processes=2) as pool: + # pool.map(transaction_big_order_parser.extract_big_order_of_all, indexed_data) + transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick", process_count=10) + transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction", process_count=10) else: if tool.is_sh_code(code): transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code) diff --git a/l2_test.py b/l2_test.py index 8db7571..161d834 100644 --- a/l2_test.py +++ b/l2_test.py @@ -124,8 +124,8 @@ volume = zylt_volume_map.get(code) # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环 k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) - if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: - # 鑷敱娴侀�氬競鍊煎湪30浜�-300浜夸互涓� + if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: + # 鑷敱娴侀�氬競鍊煎湪10浜�-300浜夸互涓� limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2) if limit_up_price > k_bars[0]["high"]: # 浠婃棩娑ㄥ仠浠疯绐佺牬鏄ㄦ棩鏈�楂樹环 -- Gitblit v1.8.0