From cc2c7a8514d3d6eea4f73b5875c2500883d165f0 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期一, 26 五月 2025 22:57:20 +0800
Subject: [PATCH] 增加自动加白接口

---
 data_parser/transaction_big_order_parser.py |  203 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 203 insertions(+), 0 deletions(-)

diff --git a/data_parser/transaction_big_order_parser.py b/data_parser/transaction_big_order_parser.py
new file mode 100644
index 0000000..48be723
--- /dev/null
+++ b/data_parser/transaction_big_order_parser.py
@@ -0,0 +1,203 @@
+"""
+Big-order (large trade) deal data parser.
+"""
+import os
+import re
+from multiprocessing import Pool
+
+import pandas as pd
+
+from utils import tool
+
+
class BigOrderDealParser:
    """Shared field-normalization helpers for big-order deal parsing."""

    @classmethod
    def str_to_float(cls, s):
        """Parse a price/amount string such as ``"12.34@<unit>"``.

        Strips the unit suffix after ``@`` and rounds to 2 decimals.
        Returns NaN for anything that cannot be parsed (non-string
        input, non-numeric payload), so downstream filters drop it.
        """
        try:
            return round(float(s.split("@")[0]), 2)
        except (AttributeError, ValueError, TypeError, IndexError):
            # Narrowed from a bare `except:` — same fallback, but no
            # longer swallows KeyboardInterrupt/SystemExit etc.
            return float("nan")

    @classmethod
    def code_format(cls, s):
        """Zero-pad a security code to 6 characters.

        Returns '' when the value does not support the format spec.
        """
        try:
            return "{0:0>6}".format(s)
        except (TypeError, ValueError):
            # Narrowed from a bare `except:`; e.g. None raises TypeError.
            return ''
+
+
def __pre_process_transactions_detail(args):
    """Pre-process one chunk of the Transaction CSV and persist it.

    @param args: 2-tuple ``((chunk_index, chunk_dataframe), csv_path)``.
    Writes the aggregated chunk to ``<csv_path minus .csv>/<index>.csv``
    and returns None; skips all work if that file already exists.
    """

    def first_last(group):
        # Aggregate one (SecurityID, BuyNo) group: totals plus the
        # first/last trade time and price of the group.
        return pd.Series({
            'TotalAmount': group['TradeAmount'].sum(),
            'TotalVolume': group['TradeVolume'].sum(),
            'StartTime': group['TradeTime'].iloc[0],
            'StartPrice': group['TradePrice'].iloc[0],
            'EndTime': group['TradeTime'].iloc[-1],
            'EndPrice': group['TradePrice'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]
    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    # exist_ok=True: several pool workers may race to create this
    # directory; check-then-makedirs crashed with FileExistsError.
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    # Original print literal was mojibake-corrupted (lost its `{`,
    # which is a SyntaxError in an f-string); restored intent.
    print(f"Processing Transaction chunk {index}", os.getpid())
    df["TradePrice"] = df["TradePrice"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep codes starting 30/00/60 only (ChiNext / SZ main board / SH).
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Trade amount = price * volume; drop non-positive rows.
    df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
    df = df[df["TradeAmount"] > 0]
    # Group by security and buy-order number, then aggregate per order.
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    chunk_result = grouped.apply(first_last)
    if not df.empty:
        chunk_result = chunk_result.reset_index()
    chunk_result.to_csv(child_path, index=False)
+
+
def pre_process_transactions(csv_path, process_count=4):
    """Split the Transaction CSV into 200k-row chunks and pre-process
    them in parallel with a worker pool."""
    chunk_size = 200000
    # Lazily read the CSV chunk by chunk, then pair each numbered
    # chunk with the source path for the workers.
    reader = pd.read_csv(csv_path, chunksize=chunk_size)
    tasks = [(numbered_chunk, csv_path) for numbered_chunk in enumerate(reader)]
    with Pool(processes=process_count) as pool:
        pool.map(__pre_process_transactions_detail, tasks)
+
+
def __pre_process_ngtsticks(args):
    """Pre-process one chunk of the NGTSTick CSV and persist it.

    @param args: 2-tuple ``((chunk_index, chunk_dataframe), csv_path)``.
    Only rows with TickType == 'T' (trade ticks) are kept. Writes the
    aggregated chunk to ``<csv_path minus .csv>/<index>.csv``.
    """

    def first_last(group):
        # Aggregate one (SecurityID, BuyNo) group: totals plus the
        # first/last tick time and price of the group.
        return pd.Series({
            'TotalAmount': group['TradeMoney'].sum(),
            'TotalVolume': group['Volume'].sum(),
            'StartTime': group['TickTime'].iloc[0],
            'StartPrice': group['Price'].iloc[0],
            'EndTime': group['TickTime'].iloc[-1],
            'EndPrice': group['Price'].iloc[-1]
        })

    chunk_index, chunk_data = args[0]
    csv_path = args[1]

    df = chunk_data.copy()
    index = chunk_index + 1
    children_dir = csv_path.replace(".csv", "")
    # exist_ok=True: avoids the create-directory race between workers.
    os.makedirs(children_dir, exist_ok=True)
    child_path = os.path.join(children_dir, f"{index}.csv")
    if os.path.exists(child_path):
        return
    # Original print literal was mojibake-corrupted; restored intent.
    print(f"Processing NGTSTick chunk {index}")
    df = df[df["TickType"] == 'T']
    df["Price"] = df["Price"].apply(BigOrderDealParser.str_to_float)
    df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
    # Keep codes starting 30/00/60 only (ChiNext / SZ main board / SH).
    df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
    # Normalize trade money (strip unit suffix, round to 2 decimals).
    df['TradeMoney'] = df["TradeMoney"].apply(BigOrderDealParser.str_to_float)

    # Group by security and buy-order number, then aggregate per order.
    grouped = df.groupby(['SecurityID', 'BuyNo'])
    chunk_result = grouped.apply(first_last)
    if not df.empty:
        chunk_result = chunk_result.reset_index()
    chunk_result.to_csv(child_path, index=False)
+
+
def pre_process_ngtsticks(csv_path, process_count=4):
    """Split the NGTSTick CSV into 200k-row chunks and pre-process
    them in parallel with a worker pool."""
    chunk_size = 200000
    # Lazily read the CSV chunk by chunk, then pair each numbered
    # chunk with the source path for the workers.
    reader = pd.read_csv(csv_path, chunksize=chunk_size)
    tasks = [(numbered_chunk, csv_path) for numbered_chunk in enumerate(reader)]
    with Pool(processes=process_count) as pool:
        pool.map(__pre_process_ngtsticks, tasks)
+
+
def __concat_pre_datas(dir_path):
    """Concatenate the numbered chunk CSVs in dir_path into combined.csv.

    @param dir_path: directory holding the per-chunk output files.
    @return: None. Skips if combined.csv already exists; files are
    merged in ascending order of the first number in each file name.
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if os.path.exists(combined_path):
        print("Combined target file already exists")
        return
    # Only numbered files are chunk outputs; stray files previously
    # crashed int(re.findall(...)[0]) with an IndexError.
    file_list = [f for f in os.listdir(dir_path) if re.findall(r'\d+', f)]
    file_list.sort(key=lambda x: int(re.findall(r'\d+', x)[0]))
    df_list = []
    for file in file_list:
        df = pd.read_csv(os.path.join(dir_path, file))
        if df.empty:
            continue
        df["SecurityID"] = df["SecurityID"].apply(BigOrderDealParser.code_format)
        df_list.append(df)
    print("Number of files to merge:", len(df_list))
    if not df_list:
        # pd.concat raises ValueError on an empty list; nothing to do.
        print("No non-empty chunk files found, nothing to combine")
        return

    combined_df = pd.concat(df_list, ignore_index=True)

    print("Merge finished, writing output file")
    combined_df.to_csv(combined_path, index=False)
    print("Write finished")
+
+
def concat_pre_transactions(dir_path):
    """Merge the pre-processed Transaction chunk files into combined.csv."""
    return __concat_pre_datas(dir_path)
+
+
def concat_pre_ngtsticks(dir_path):
    """Merge the pre-processed NGTSTick chunk files into combined.csv."""
    return __concat_pre_datas(dir_path)
+
+
def process_combined_transaction(dir_path):
    """Aggregate combined.csv per security and print the result.

    @param dir_path: directory containing combined.csv (the columns
    produced by the pre-processing steps: TotalAmount, TotalVolume,
    StartTime/StartPrice, EndTime/EndPrice).
    @return: None.
    """
    combined_path = os.path.join(dir_path, 'combined.csv')
    if not os.path.exists(combined_path):
        print("Combined data does not exist")
        return
    df = pd.read_csv(combined_path)
    grouped = df.groupby(["SecurityID"])
    # NOTE(review): the original called grouped.apply(pd.Series({})),
    # which raises TypeError because a Series is not callable. Replaced
    # with a per-security aggregation of the per-order columns —
    # assumed intent based on the pipeline's schema; confirm with author.
    chunk_result = grouped.agg(
        TotalAmount=('TotalAmount', 'sum'),
        TotalVolume=('TotalVolume', 'sum'),
        StartTime=('StartTime', 'first'),
        StartPrice=('StartPrice', 'first'),
        EndTime=('EndTime', 'last'),
        EndPrice=('EndPrice', 'last'),
    ).reset_index()
    print(chunk_result.to_string(
        index=False,          # hide the index column
        justify='left',       # left-align headers
        float_format='%.3f'   # 3-decimal float format
    ))
+
+
+
+
if __name__ == "__main__":
    # Manual test driver: earlier pipeline stages are left commented out;
    # only the final per-security aggregation step runs.
    # NOTE(review): the path literals are mojibake-damaged (originally a
    # Chinese test-data directory); left byte-identical on purpose.
    # pre_process_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test.csv")
    # pre_process_ngtsticks("E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv")
    # concat_pre_transactions("E:/娴嬭瘯鏁版嵁/Transaction_Test")
    process_combined_transaction("E:/娴嬭瘯鏁版嵁/Transaction_Test")

--
Gitblit v1.8.0