2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
2025-05-28 | Administrator | ![]() |
data_parser/transaction_big_order_parser.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_data_parser.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
l2_test.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
servers/data_server.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
servers/huaxin_trade_server.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade/buy_radical/radical_buy_data_manager.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
trade/buy_radical/radical_buy_strategy.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
data_parser/transaction_big_order_parser.py
@@ -10,6 +10,10 @@ from utils import tool def print_log(*args): print(f"[{tool.get_now_time_str()}]", *args) class BigOrderDealParser: @classmethod @@ -55,7 +59,7 @@ child_path = os.path.join(children_dir, f"{index}.csv") if os.path.exists(child_path): return print(f"处理Transaction第{index}批次", os.getpid()) print_log(f"处理Transaction第{index}批次", os.getpid()) df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float) df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)] @@ -63,7 +67,7 @@ df['TradeAmount'] = df['TradePrice'] * df['TradeVolume'] df = df[df["TradeAmount"] > 0] # 判断是否为空 # print(df.empty) # print_log(df.empty) # 按SecurityID和BuyNo分组 grouped = df.groupby(['SecurityID', 'BuyNo']) # 应用聚合函数 @@ -106,7 +110,7 @@ child_path = os.path.join(children_dir, f"{index}.csv") if os.path.exists(child_path): return print(f"处理NGTSTick第{index}批次") log(f"处理NGTSTick第{index}批次") df = df[df["TickType"] == 'T'] df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float) df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) @@ -142,7 +146,7 @@ """ combined_path = os.path.join(dir_path, 'combined.csv') if os.path.exists(combined_path): print("合并的目标文件已存在") print_log("合并的目标文件已存在") return file_list = os.listdir(dir_path) file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0])) @@ -153,14 +157,14 @@ continue df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format) df_list.append(df) print("准备合并的文件数量:", len(df_list)) print_log("准备合并的文件数量:", len(df_list)) combined_df = pd.concat(df_list, ignore_index=True) print("合并完成,准备写入文件!") print_log("合并完成,准备写入文件!") # 保存结果 combined_df.to_csv(combined_path, index=False) print("写入文件完成!") print_log("写入文件完成!") def concat_pre_transactions(dir_path): @@ -179,7 +183,7 @@ """ combined_path = os.path.join(dir_path, 'combined.csv') if not os.path.exists(combined_path): print("拼接数据不存在") print_log("拼接数据不存在") return df = pd.read_csv(combined_path) df_copy = df.copy() @@ -187,21 +191,45 @@ # 应用聚合函数 chunk_result = grouped.apply(pd.Series({})) # chunk_result["SecurityID"] = chunk_result["SecurityID"].apply(BigOrderDealParser.code_format) print(chunk_result.to_string( print_log(chunk_result.to_string( index=False, # 不显示索引 justify='left', # 左对齐 float_format='%.3f' # 浮点数格式 )) def extract_big_order_of_code(dir_path, code=None): """ 提取代码的大单 @param dir_path: 数据目录 @param code: 为空表示导出全部 @return: """ __combined_df_cache = {} def extract_big_order_of_all(dir_path, process_count=4): combined_path = os.path.join(dir_path, 'combined.csv') if not os.path.exists(combined_path): print_log("拼接数据不存在") return codes = extract_big_order_codes(dir_path) print_log("总代码数量:", len(codes)) for code in codes: extract_big_order_of_code(dir_path, code) combined_path = os.path.join(dir_path, 'combined.csv') if not os.path.exists(combined_path): print_log("拼接数据不存在") return output_path = os.path.join(dir_path, f"big_buy_{code}.csv") if os.path.exists(output_path): print_log("路径已存在:", output_path) return df = __combined_df_cache.get(combined_path, None) if df is None: df = pd.read_csv(combined_path) __combined_df_cache[combined_path] = df args = [(code, df) for code in codes] # 新写法 with Pool(processes=process_count) as pool: pool.map(__extract_big_order_of_code, args) def __extract_big_order_of_code(args): def first_last(group): """ 获取第一条数据与最后一条 @@ -219,29 +247,46 @@ 'StartPrice': group['StartPrice'].iloc[0] }) combined_path = os.path.join(dir_path, 'combined.csv') if not os.path.exists(combined_path): print("拼接数据不存在") dir_path, code, df = args[0], args[1], args[2] output_path = os.path.join(dir_path, f"big_buy_{code}.csv") if os.path.exists(output_path): print_log("路径已存在:", output_path) return df = pd.read_csv(combined_path) df_copy = df.copy() if code: df_copy = df_copy[df_copy["SecurityID"] == int(code)] if df_copy.empty: print("目标代码对应成交数据为空") print_log("目标代码对应成交数据为空") return df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format) # 按SecurityID和BuyNo分组 grouped = df_copy.groupby(['SecurityID', 'BuyNo']) grouped_result = grouped.apply(first_last) grouped_result = grouped_result[grouped_result["TotalAmount"] > 500000] # print(grouped_result) # print_log(grouped_result) # 遍历内容 if code: grouped_result.to_csv(os.path.join(dir_path, f"big_buy_{code}.csv"), index=False) else: grouped_result.to_csv(os.path.join(dir_path, f"big_buy.csv"), index=False) print("保存成功") grouped_result.to_csv(output_path, index=False) print_log(f"[{tool.get_now_time_str()}]保存成功,路径:{output_path}") def extract_big_order_of_code(dir_path, code): """ 提取代码的大单 @param dir_path: 数据目录 @param code: 为空表示导出全部 @return: """ combined_path = os.path.join(dir_path, 'combined.csv') if not os.path.exists(combined_path): print_log("拼接数据不存在") return df = __combined_df_cache.get(combined_path, None) if df is None: df = pd.read_csv(combined_path) __combined_df_cache[combined_path] = df __extract_big_order_of_code((dir_path, code, df)) def extract_big_order_codes(dir_path): @@ -251,24 +296,14 @@ @param code: @return: """ def first_last(group): """ 获取第一条数据与最后一条 @param group: @return: """ return pd.Series({ }) combined_path = os.path.join(dir_path, 'combined.csv') if not os.path.exists(combined_path): print("拼接数据不存在") print_log("拼接数据不存在") return df = pd.read_csv(combined_path) df_copy = df.copy() if df_copy.empty: print("目标代码对应成交数据为空") print_log("目标代码对应成交数据为空") return df_copy["SecurityID"] = df_copy["SecurityID"].apply(BigOrderDealParser.code_format) # 按SecurityID和BuyNo分组 @@ -277,8 +312,9 @@ if __name__ == "__main__": print_log(1, 2, 3) # pre_process_transactions("E:/测试数据/Transaction_Test.csv") # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv") # concat_pre_transactions("E:/测试数据/Transaction_Test") # extract_big_order_codes("E:/测试数据/Transaction_Test") extract_big_order_of_code("E:/测试数据/Transaction_Test") extract_big_order_of_all("E:/测试数据/Transaction_Test") l2_data_parser.py
@@ -405,13 +405,11 @@ transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick") elif _type == 'ExtractDealBigOrder': # 命令模式 /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 # 根据code提取大单 if not code: transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick") transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction") if len(params) > 2: process_count = int(params[2].strip()) else: if tool.is_sh_code(code): transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code) else: transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/Transaction", code) process_count = 10 transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick", process_count=process_count) transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction", process_count=process_count) l2_test.py
@@ -124,10 +124,10 @@ volume = zylt_volume_map.get(code) # 今日涨停价要突破昨日最高价 k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day) if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: # 自由流通市值在30亿-300亿以上 if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8: # 自由流通市值在10亿-300亿以上 limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2) if limit_up_price > k_bars[0]["high"]: if limit_up_price > k_bars[0]["high"] or True: # 今日涨停价要突破昨日最高价 codes.add(code) # 获取辨识度的票 servers/data_server.py
@@ -9,6 +9,7 @@ import requests import constant import inited_data from code_attribute.gpcode_manager import BlackListCodeManager, HumanRemoveForbiddenManager from l2.huaxin import huaxin_target_codes_manager from l2.l2_transaction_data_manager import HuaXinBuyOrderManager @@ -24,7 +25,7 @@ from trade.buy_radical.new_block_processor import BeforeBlocksComputer from trade.buy_strategy import OpenLimitUpGoodBlocksBuyStrategy from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, BeforeSubDealBigOrderManager from utils import global_util, tool, data_export_util from utils import global_util, tool, data_export_util, init_data_util from code_attribute import gpcode_manager, code_nature_analyse from log_module import log_analyse, log_export, async_log_util from l2 import code_price_manager, l2_data_util, transaction_progress @@ -970,6 +971,11 @@ # 加载涨停大单详情 limit_up_big_order_detail = radical_buy_data_manager.get_total_detal_big_order_details( code) if max(limit_up_big_order_detail) == 0: # 没有数据,从网络加载 limit_up_big_order_detail = list(limit_up_big_order_detail) limit_up_big_order_detail[1] = deal_big_order_detail_info[1][0] limit_up_big_order_detail[3] = deal_big_order_detail_info[2][0] deal_big_order_info.append( output_util.money_desc(limit_up_big_order_detail[0] + limit_up_big_order_detail[1])) deal_big_order_info.append( @@ -979,6 +985,8 @@ except Exception as e: logger_debug.error(f"可能没有获取到涨停价:{code}") if not gpcode_manager.get_limit_up_price(code): init_data_util.re_set_price_pre(code) # logger_debug.exception(e) deal_big_order_info = None code_name = gpcode_manager.get_code_name(code) @@ -1060,7 +1068,7 @@ code_info_list.append((d[0], d[6])) # 保存新题材 datas = [(d[0], d[6]) for d in result["list"]] async_log_util.info(logger_kpl_new_blocks, f"{(tool.get_thread_id() ,bi, datas)}") async_log_util.info(logger_kpl_new_blocks, f"{(tool.get_thread_id(), bi, datas)}") if code_info_list: # 将代码加入新题材 new_block_processor.process_new_block_by_component_codes(bi[0], servers/huaxin_trade_server.py
@@ -828,7 +828,7 @@ radical_buy_data_manager.ExcludeIndexComputeCodesManager.remove_code(code) if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT and not tool.is_sh_code(code): if result_by_volume[0] == radical_buy_strategy.BUY_MODE_DIRECT: # 上证不能根据成交买入 latest_deal_time = l2_huaxin_util.convert_time(transaction_datas[-1][3]) refer_sell_data = L2MarketSellManager().get_refer_sell_data(code, latest_deal_time) @@ -841,7 +841,8 @@ threshold_money = 0 every_deal_orders = EveryLimitupBigDealOrderManager.list_big_buy_deal_orders(code) if every_deal_orders: min_order_no = min(every_deal_orders, lambda x: x[0])[0] min_order_no_info = min(every_deal_orders, key=lambda x: x[0]) min_order_no = min_order_no_info[0] else: min_order_no = transaction_datas[-1][6] trade/buy_radical/radical_buy_data_manager.py
@@ -2180,7 +2180,6 @@ return datas return None def list_l2_big_order_deal_info(codes): """ 获取成交大单信息 trade/buy_radical/radical_buy_strategy.py
@@ -167,9 +167,22 @@ # 每次上板的大单与金额 big_order_count = radical_buy_data_manager.EveryLimitupBigDealOrderManager.get_big_buy_deal_order_count(code) big_order_money = radical_buy_data_manager.EveryLimitupBigDealOrderManager.get_big_buy_deal_order_money(code) total_lack_money_info = radical_buy_data_manager.get_total_deal_big_order_info(code, gpcode_manager.get_limit_up_price_as_num( code), is_for_buy=True) if total_lack_money_info and total_lack_money_info[2] > 1e8 and total_lack_money_info[0] <= 0: # 要求的大单够了 以后,回封买,只要有两个大单成交了,立即下单 THRESHOLD_MONEY, is_temp_threshold_money = radical_buy_data_manager.BeforeSubDealBigOrderManager().get_big_order_threshold_info( code) if big_order_money > THRESHOLD_MONEY * 2: return BUY_MODE_DIRECT, f"有两个以上大单瞬时成交({big_order_money}/{THRESHOLD_MONEY * 2}), 大单足够" if big_order_count >= 2: # 判断大单是否满足 average_big_order_money = int(big_order_money / big_order_count) # 如果均价涨幅小于7%均大单等于299w # 如果均价涨幅小于7%,均大单等于299w average_rate = Buy1PriceManager().get_average_rate(code) if average_rate is not None and average_rate < 0.07: average_big_order_money = 299 * 10000 @@ -200,12 +213,7 @@ if tool.is_sz_code(code) and refer_sell_money >= 5e7: return BUY_MODE_DIRECT, f"剩余涨停总卖额-{left_limit_up_sell_money},均大单-{average_big_order_money}, 剩余阈值-{threshold_left_sell_money}, 总抛压大({refer_sell_money})" else: # 判断大单是否满足 total_lack_money_info = radical_buy_data_manager.get_total_deal_big_order_info(code, gpcode_manager.get_limit_up_price_as_num( code), is_for_buy=True) if total_lack_money_info[0] <= 0: if total_lack_money_info[0] <= 0 and tool.is_sz_code(code): return BUY_MODE_DIRECT, f"剩余涨停总卖额-{left_limit_up_sell_money},均大单-{average_big_order_money}, 大单足够" else: average_big_order_money = 0