Administrator
2025-05-27 b96ce90271b2374850068e0f298c58310a2e0e24
l2_data_parser.py
@@ -227,9 +227,11 @@
                # 将文件写入到文本
                writer.writerow(row)
def test1(args):
    index, df = args
    print(index)
def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
    def str_to_float(s):
@@ -265,9 +267,6 @@
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })
    dtype = {
        'SecurityID': 'category',  # 低基数分类数据
@@ -384,10 +383,21 @@
        elif _type == 'MarketData':
            parse_market_data(day)
        elif _type == 'Transaction_New':
            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 4
            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv",
                                                                  process_count=process_count)
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
        elif _type == 'NGTSTick_New':
            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
            if len(params) > 2:
                process_count = int(params[2].strip())
            else:
                process_count = 4
            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv",
                                                               process_count=process_count)
            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
        elif _type == 'Transaction_Concat':
            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")