From b51b2ae184fad5aaf37a78903987e064f192d430 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Mon, 26 May 2025 11:35:20 +0800
Subject: [PATCH] Big-order parsing changes

---
 l2_data_parser.py |   39 ++++++++++++++++++++++++++++++---------
 1 file changed, 30 insertions(+), 9 deletions(-)

diff --git a/l2_data_parser.py b/l2_data_parser.py
index e96b4ed..0574cfb 100644
--- a/l2_data_parser.py
+++ b/l2_data_parser.py
@@ -5,8 +5,11 @@
 import os
 import sys
 import time
+from multiprocessing import Pool
+
 import pandas as pd
 
+from data_parser import transaction_big_order_parser
 from db import mysql_data_delegate as mysql_data
 from huaxin_client.l2_client_test import L2TransactionDataManager
 from log_module import log_export
@@ -224,6 +227,9 @@
                # write the row out to the text file
                 writer.writerow(row)
 
+def test1(args):
+    """Smoke-test worker: unpack an (index, chunk) pair and print the chunk index."""
+    index, df = args
+    print(index)
 
def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
     def str_to_float(s):
@@ -260,16 +266,24 @@
             'EndPrice': group['TradePrice'].iloc[-1]
         })
 
     dtype = {
         'SecurityID': 'category',  # low-cardinality categorical data
     }
-    chunk_size = 100000
+    chunk_size = 10000
     # create the chunked DataFrame reader
     chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+    indexed_data = list(enumerate(chunks))
+    # new approach: fan the chunks out to a pool of worker processes
+    with Pool(processes=4) as pool:
+        pool.map(test1, indexed_data)
+
+    # list(enumerate(chunks)) above exhausts the TextFileReader, so
+    # re-create it before the sequential pass below
+    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+
     result_list = []
-    index = 0
-    for df in chunks:
-        index += 1
+    for index, df in enumerate(chunks, start=1):
         child_path = csv_path.replace(".csv", f"_{index}.csv")
         if os.path.exists(child_path):
             continue
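
For reference, the chunk-level parallelism added above takes this shape. A minimal
self-contained sketch, assuming a hypothetical worker and CSV path; note that a
pandas TextFileReader can only be consumed once, which is why the re-created reader
is needed before the sequential pass:

    import pandas as pd
    from multiprocessing import Pool

    def count_rows(args):
        # hypothetical worker: receives a pickled (index, DataFrame) pair
        index, df = args
        return index, len(df)

    if __name__ == '__main__':  # guard so worker processes can re-import the module
        chunks = pd.read_csv("Transaction.csv", chunksize=10000)  # lazy reader
        with Pool(processes=4) as pool:
            # imap_unordered streams pairs to workers instead of materializing
            # the whole file in memory the way list(enumerate(chunks)) does
            for index, n in pool.imap_unordered(count_rows, enumerate(chunks)):
                print(index, n)
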
@@ -279,7 +293,6 @@
         df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
         # compute the trade amount
         df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
-
 
         # group by SecurityID and BuyNo
         grouped = df.groupby(['SecurityID', 'BuyNo'])
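
The grouping above feeds a per-buy-order aggregation (TotalAmount plus the
Start/End prices visible in the earlier hunk). A minimal sketch of that shape,
with made-up rows and a hypothetical big-order threshold:

    import pandas as pd

    df = pd.DataFrame({
        "SecurityID":  ["000555", "000555", "600000"],
        "BuyNo":       [1, 1, 7],
        "TradePrice":  [10.0, 10.1, 5.0],
        "TradeVolume": [200, 300, 100],
    })
    df["TradeAmount"] = df["TradePrice"] * df["TradeVolume"]

    # one row per (SecurityID, BuyNo): total amount plus first/last trade price
    agg = df.groupby(["SecurityID", "BuyNo"]).agg(
        TotalAmount=("TradeAmount", "sum"),
        StartPrice=("TradePrice", "first"),
        EndPrice=("TradePrice", "last"),
    ).reset_index()

    # keep only buy orders above some size cutoff (value is hypothetical)
    big = agg[agg["TotalAmount"] >= 500_000]
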
@@ -319,7 +332,7 @@
 
     # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
 
-    chunk_size = 200000
+    chunk_size = 10000
     # create the chunked DataFrame reader
     chunks = pd.read_csv(csv_path, chunksize=chunk_size)
     result_list = []
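
Both chunk_size reductions in this patch (100000 -> 10000 above, 200000 -> 10000
here) trade per-chunk overhead for a smaller resident set. One way to sanity-check
the new setting (a sketch; the path is hypothetical):

    import pandas as pd

    reader = pd.read_csv("NGTSTick.csv", chunksize=10000)
    first = next(reader)  # peek at a single chunk
    # deep=True counts object/string columns, which dominate tick data
    print(first.memory_usage(deep=True).sum() / 1024 ** 2, "MiB per chunk")
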
@@ -348,7 +361,7 @@
 
 if __name__ == '__main__1':
     # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
-    pre_process_ngtstick()
+    pre_process_transactions()
 
 # Command-line mode:  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
 # Extract big orders: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
@@ -371,9 +384,17 @@
         elif _type == 'MarketData':
             parse_market_data(day)
         elif _type == 'Transaction_New':
-            pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
+            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
+            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
         elif _type == 'NGTSTick_New':
-            pre_process_ngtstick(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
+            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
+            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
+        elif _type == 'Transaction_Concat':
+            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
+        elif _type == 'NGTSTick_Concat':
+            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
         elif _type == 'ExtractDealBigOrder':
             # extract all filled big orders
             if len(params) > 2:
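
With this dispatch in place, the new modes are invoked the same way as the command
examples documented above (dates are placeholders):

    # /home/userzjj/app/gp-server/l2_data_parser Transaction_New 2025-05-09
    # /home/userzjj/app/gp-server/l2_data_parser Transaction_Concat 2025-05-09
    # /home/userzjj/app/gp-server/l2_data_parser NGTSTick_New 2025-05-09
    # /home/userzjj/app/gp-server/l2_data_parser NGTSTick_Concat 2025-05-09
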

--
Gitblit v1.8.0