From 638865a032eb403bdf61a5808acb579a25ec7be1 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Fri, 23 May 2025 00:30:55 +0800
Subject: [PATCH] Parse L2 transaction data

---
 l2_data_parser.py |   77 ++++++++++++++++++++++++++++++++++----
 1 file changed, 69 insertions(+), 8 deletions(-)

diff --git a/l2_data_parser.py b/l2_data_parser.py
index abdbe66..8018fc5 100644
--- a/l2_data_parser.py
+++ b/l2_data_parser.py
@@ -10,6 +10,7 @@
 from db import mysql_data_delegate as mysql_data
 from huaxin_client.l2_client_test import L2TransactionDataManager
 from log_module import log_export
+from utils import tool
 
 
 def __get_target_codes(day):
@@ -50,13 +51,6 @@
     fdatas = []
     df = pd.read_csv(f"{base_path}/Transaction.csv")
     category_revenue = df.groupby('BuyNo')['TradeVolume'].sum()
-    total_count = 0
-    for chunk in chunks:
-        total_count += len(chunk)
-    for chunk in chunks:
-        total_count += len(chunk)
-
-
 
     with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
         csv_reader = csv.reader(file)
@@ -231,9 +225,74 @@
                 writer.writerow(row)
 
 
+def test(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"):
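+    """Aggregate Transaction.csv in chunks: group trades by (SecurityID, BuyNo), sum the
+    traded amount and volume, record each group's first/last trade, and write one CSV per chunk."""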
+    def str_to_float(s):
+        try:
+            # Strip the unit after "@" and convert to a float rounded to 2 decimals
+            return round(float(s.split("@")[0]), 2)
+        except Exception:
+            return float("nan")
+
+    def code_format(s):
+        try:
+            code = "{0:0>6}".format(s)
+            return code
+        except Exception:
+            return ''
+
+    # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2,
+    #  Info3, TradeBSFlag, BizIndex, LocalTimeStamp]
+    # transaction_data = {
+    #     "SecurityID": ['300920', '300920', '300920', '300920'],
+    #     "TradeTime": [91500040, 91500041, 91500042, 92000040],
+    #     "TradePrice": [15.0, 16.2, 15.2, 16.3],
+    #     "TradeVolume": [100, 100, 200, 300],
+    #     "BuyNo": [0, 1, 1, 1]
+    # }
+    # Aggregation function: totals plus the first and last trade of each group
+    def first_last(group):
+        return pd.Series({
+            'TotalAmount': group['TradeAmount'].sum(),
+            'TotalVolume': group['TradeVolume'].sum(),
+            'StartTime': group['TradeTime'].iloc[0],
+            'StartPrice': group['TradePrice'].iloc[0],
+            'EndTime': group['TradeTime'].iloc[-1],
+            'EndPrice': group['TradePrice'].iloc[-1]
+        })
+
+    dtype = {
+        'SecurityID': 'category',  # low-cardinality categorical column (not currently passed to read_csv)
+    }
+    chunk_size = 100000
+    # Read the CSV in chunks so the full file never has to fit in memory
+    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+    index = 0
+    for df in chunks:
+        index += 1
+        print(f"澶勭悊绗瑊index}鎵规")
+        df["TradePrice"] = df["TradePrice"].apply(str_to_float)
+        df["SecurityID"] = df["SecurityID"].apply(code_format)
+        # Compute the trade amount (price * volume)
+        df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
+
+        # Group by SecurityID and BuyNo
+        grouped = df.groupby(['SecurityID', 'BuyNo'])
+
+        # Apply the aggregation function to each group
+        chunk_result = grouped.apply(first_last).reset_index()
+        child_path = csv_path.replace(".csv", f"_{index}.csv")
+        chunk_result.to_csv(child_path, index=False)
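+        # Note: a (SecurityID, BuyNo) group that spans chunk boundaries is aggregated separately in each chunk's output file.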
+    print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�")
+
+
+if __name__ == '__main__':
+    # df = pd.read_csv(f"E:/娴嬭瘯鏁版嵁/Transaction_Test.csv")
+    test()
+
 # Command-line mode:  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
 # Parse big orders: /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
-if __name__ == '__main__':
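+# NOTE: the CLI dispatcher below is disabled in this patch (its guard was renamed to
+# '__main__1') so that the test() entry point above runs instead.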
+if __name__ == '__main__1':
     if len(sys.argv) > 1:
         params = sys.argv[1:]
         print("鎺ユ敹鐨勫弬鏁�", params)
@@ -251,6 +310,8 @@
             parse_ngtstick(day)
         elif _type == 'MarketData':
             parse_market_data(day)
+        elif _type == 'Transaction_New':
+            test(f"/home/userzjj/ftp/{day}/Transaction.csv")
         elif _type == 'ExtractDealBigOrder':
             # Extract all executed big orders
             if len(params) > 2:

--
Gitblit v1.8.0