Administrator
2025-05-28 e3f4cf3bacac6eda31ddea6aaf70ebb883788817
L2成交大单解析
3个文件已修改
71 ■■■■■ 已修改文件
data_parser/transaction_big_order_parser.py 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_parser.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_test.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_parser/transaction_big_order_parser.py
@@ -201,7 +201,7 @@
__combined_df_cache = {}
def extract_big_order_of_all(dir_path):
def extract_big_order_of_all(dir_path, process_count=4):
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
@@ -211,15 +211,25 @@
    for code in codes:
        extract_big_order_of_code(dir_path, code)
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    if os.path.exists(output_path):
        print_log("路径已存在:", output_path)
        return
    df = __combined_df_cache.get(combined_path, None)
    if df is None:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df
    args = [(code, df) for code in codes]
    # 新写法
    with Pool(processes=process_count) as pool:
        pool.map(__extract_big_order_of_code, args)
def extract_big_order_of_code(dir_path, code):
    """
    提取代码的大单
    @param dir_path: 数据目录
    @param code: 为空表示导出全部
    @return:
    """
def __extract_big_order_of_code(args):
    def first_last(group):
        """
            获取第一条数据与最后一条
@@ -237,18 +247,11 @@
            'StartPrice': group['StartPrice'].iloc[0]
        })
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print_log("拼接数据不存在")
        return
    dir_path, code, df = args[0], args[1], args[2]
    output_path = os.path.join(dir_path, f"big_buy_{code}.csv")
    if os.path.exists(output_path):
        print_log("路径已存在:", output_path)
        return
    df = __combined_df_cache.get(combined_path, None)
    if df is None:
        df = pd.read_csv(combined_path)
        __combined_df_cache[combined_path] = df
    df_copy = df.copy()
    if code:
        df_copy = df_copy[df_copy["SecurityID"] == int(code)]
@@ -264,6 +267,26 @@
    # 遍历内容
    grouped_result.to_csv(output_path, index=False)
    print_log(f"[{tool.get_now_time_str()}]保存成功,路径:{output_path}")
def extract_big_order_of_code(dir_path, code):
    """
    Extract the big orders for one code from the combined data file.

    Looks for 'combined.csv' inside ``dir_path``; when it is missing a log
    line is printed and nothing else happens. Otherwise the parsed frame is
    taken from the module-level cache (reading and caching it on first use)
    and handed to ``__extract_big_order_of_code`` together with ``dir_path``
    and ``code``.

    @param dir_path: data directory that holds 'combined.csv'
    @param code: security code; empty means export everything
    @return: None
    """
    csv_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(csv_path):
        print_log("拼接数据不存在")
        return

    # Parse the combined CSV only once per path; later calls reuse the cached frame.
    if __combined_df_cache.get(csv_path) is None:
        __combined_df_cache[csv_path] = pd.read_csv(csv_path)

    # The worker takes a single tuple argument: (dir_path, code, dataframe).
    __extract_big_order_of_code((dir_path, code, __combined_df_cache[csv_path]))
def extract_big_order_codes(dir_path):
@@ -289,9 +312,9 @@
if __name__ == "__main__":
    log(1,2,3)
    print_log(1, 2, 3)
    # pre_process_transactions("E:/测试数据/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/测试数据/NGTSTick_Test.csv")
    # concat_pre_transactions("E:/测试数据/Transaction_Test")
    # extract_big_order_codes("E:/测试数据/Transaction_Test")
    extract_big_order_of_code("E:/测试数据/Transaction_Test")
    extract_big_order_of_all("E:/测试数据/Transaction_Test")
l2_data_parser.py
@@ -407,8 +407,12 @@
            # 命令模式  /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
            # 根据code提取大单
            if not code:
                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick")
                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction")
                # indexed_data = [f"/home/userzjj/ftp/{day}/NGTSTick", f"/home/userzjj/ftp/{day}/Transaction"]
                # # 新写法
                # with Pool(processes=2) as pool:
                #     pool.map(transaction_big_order_parser.extract_big_order_of_all, indexed_data)
                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick", process_count=10)
                transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction", process_count=10)
            else:
                if tool.is_sh_code(code):
                    transaction_big_order_parser.extract_big_order_of_code(f"/home/userzjj/ftp/{day}/NGTSTick", code)
l2_test.py
@@ -124,8 +124,8 @@
            volume = zylt_volume_map.get(code)
            # 今日涨停价要突破昨日最高价
            k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
            if k_bars and 30e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
                # 自由流通市值在30亿-300亿以上
            if k_bars and 10e8 <= k_bars[0]["close"] * volume * tool.get_limit_up_rate(code) <= 300e8:
                # 自由流通市值(按涨停价计算)在10亿-300亿之间
                limit_up_price = round(tool.get_limit_up_rate(code) * k_bars[0]["close"], 2)
                if limit_up_price > k_bars[0]["high"]:
                    # 今日涨停价要突破昨日最高价