From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Wed, 18 Jun 2025 18:41:30 +0800
Subject: [PATCH] 异常保护

---
 l2_data_parser.py |  359 +++++++++++++++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 330 insertions(+), 29 deletions(-)

diff --git a/l2_data_parser.py b/l2_data_parser.py
index f2b2d32..3cab7b0 100644
--- a/l2_data_parser.py
+++ b/l2_data_parser.py
@@ -4,16 +4,28 @@
 import csv
 import os
 import sys
+import time
+from multiprocessing import Pool
+
+import pandas as pd
+
+from data_parser import transaction_big_order_parser
 from db import mysql_data_delegate as mysql_data
+from huaxin_client.l2_client_test import L2TransactionDataManager
+from log_module import log_export
+from utils import tool
 
 
 def __get_target_codes(day):
-    results = mysql_data.Mysqldb().select_all(f"select _code from kpl_limit_up_record where _day='{day}'")
-    codes = set([x[0] for x in results])
-    return codes
+    special_codes_dict = log_export.load_special_codes(day)
+    fcodes = set()
+    for plate in special_codes_dict:
+        codes = special_codes_dict[plate]
+        fcodes |= codes
+    return fcodes
 
 
-def parse_order_detail(day):
+def parse_order_detail(day, code):
     target_codes = __get_target_codes(day)
     base_path = f"/home/userzjj/ftp/{day}"
     with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file:
@@ -30,34 +42,53 @@
             for row in csv_reader:
                 if row[1] not in target_codes:
                     continue
-                # 将文件写入到文本
-                writer.writerow(row)
-
-
-def parse_transaction(day):
-    target_codes = __get_target_codes(day)
-    base_path = f"/home/userzjj/ftp/{day}"
-    with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
-        csv_reader = csv.reader(file)
-        # 获取表头: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
-        headers = next(csv_reader)
-        print("表头:", headers)
-        # 遍历数据行
-        _path = f"{base_path}/Transaction_filter.csv"
-        with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
-            # 创建一个 CSV 写入器对象
-            writer = csv.writer(csvfile)
-            # 逐行写入数据
-            for row in csv_reader:
-                if row[1] not in target_codes:
+                if code and code != row[1]:
                     continue
                 # 将文件写入到文本
                 writer.writerow(row)
 
 
-def parse_ngtstick(day):
+def parse_transaction(day, save=True, parse_all=False):
     target_codes = __get_target_codes(day)
     base_path = f"/home/userzjj/ftp/{day}"
+    fdatas = []
+    df = pd.read_csv(f"{base_path}/Transaction.csv")
+    category_revenue = df.groupby('BuyNo')['TradeVolume'].sum()
+
+    with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
+        csv_reader = csv.reader(file)
+        total_lines = csv_reader.line_num
+        print("总行数：", total_lines)
+        # 获取表头: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
+        headers = next(csv_reader)
+        print("表头:", headers)
+        # 遍历数据行
+        _path = f"{base_path}/Transaction_filter.csv"
+        percent = 0
+        with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
+            # 创建一个 CSV 写入器对象
+            writer = csv.writer(csvfile)
+            # 逐行写入数据
+            count = 0
+            for row in csv_reader:
+                count += 1
+                p = count * 100 // total_lines
+                if p != percent:
+                    percent = p
+                    print(f"**进度：{percent}%")
+                if row[1] not in target_codes and not parse_all:
+                    continue
+                # 将文件写入到文本
+                if save:
+                    writer.writerow(row)
+                fdatas.append(row)
+    return fdatas
+
+
+def parse_ngtstick(day, save=True):
+    target_codes = __get_target_codes(day)
+    base_path = f"/home/userzjj/ftp/{day}"
+    fdatas = []
     with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file:
         csv_reader = csv.reader(file)
         # 获取表头: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
@@ -72,8 +103,108 @@
             for row in csv_reader:
                 if row[1] not in target_codes:
                     continue
-                # 将文件写入到文本
-                writer.writerow(row)
+                if save:
+                    # 将文件写入到文本
+                    writer.writerow(row)
+                fdatas.append(row)
+    return fdatas
+
+
+def parse_deal_big_orders(day, big_order_path, target_code=None):
+    """
+    提取所有成交大单
+    @param day:
+    @return:
+    """
+    print("*******开始处理成交大单数据", day, big_order_path, target_code)
+    l2_data_manager_dict = {}
+    # 解析数据
+    __start_time = time.time()
+    transaction_data = parse_transaction(day, parse_all=True)
+    print("*******Transaction 读取完毕", len(transaction_data), "耗时", int(time.time() - __start_time))
+    __start_time = time.time()
+    ngtstick_data = parse_ngtstick(day)
+    print("*******NGTSTick 读取完毕", len(ngtstick_data), "耗时", int(time.time() - __start_time))
+    __start_time = time.time()
+    big_order_list = []
+    for index, row in enumerate(transaction_data):
+        code = row[1]
+        if target_code and code != target_code:
+            continue
+        item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2),
+                "TradeVolume": int(row[4]),
+                "OrderTime": int(row[2]), "MainSeq": int(row[6]),
+                "SubSeq": int(row[7]), "BuyNo": int(row[8]),
+                "SellNo": int(row[9]),
+                "ExecType": int(row[5])}
+        if item["TradePrice"] <= 0:
+            continue
+        if code not in l2_data_manager_dict:
+            l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
+        l2_data_manager_dict[code].add_transaction_data(item, big_order_money_threshold=100e4)
+        if index % 100 == 0:
+            # 读取队列中的数据
+            l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+            while not l2_data_manager.big_accurate_buy_order_queue.empty():
+                data = l2_data_manager.big_accurate_buy_order_queue.get()
+                big_order_list.append((code, 0, data))
+
+            while not l2_data_manager.big_accurate_sell_order_queue.empty():
+                data = l2_data_manager.big_accurate_sell_order_queue.get()
+                big_order_list.append((code, 1, data))
+    print("********Transaction 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
+    __start_time = time.time()
+
+    for index, row in enumerate(ngtstick_data):
+        code = row[1]
+        if target_code and code != target_code:
+            continue
+        if row[5].strip() != 'T':
+            # 过滤非成交数据
+            continue
+
+        item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2),
+                "TradeVolume": int(row[9]),
+                "OrderTime": row[4], "MainSeq": row[2],
+                "SubSeq": row[3], "BuyNo": row[6],
+                "SellNo": row[7],
+                "ExecType": '1'}
+        if item["TradePrice"] <= 0:
+            continue
+
+        if code not in l2_data_manager_dict:
+            l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True)
+        l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item)
+        if index % 100 == 0:
+            # 读取队列中的数据
+            l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+            while not l2_data_manager.big_accurate_buy_order_queue.empty():
+                data = l2_data_manager.big_accurate_buy_order_queue.get()
+                big_order_list.append((code, 0, data))
+
+            while not l2_data_manager.big_accurate_sell_order_queue.empty():
+                data = l2_data_manager.big_accurate_sell_order_queue.get()
+                big_order_list.append((code, 1, data))
+    print("********NGTSTick 处理完毕", len(big_order_list), "耗时", int(time.time() - __start_time))
+    __start_time = time.time()
+    # 读取剩余的未读数据的代码
+    for code in l2_data_manager_dict:
+        l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+        while not l2_data_manager.big_accurate_buy_order_queue.empty():
+            data = l2_data_manager.big_accurate_buy_order_queue.get()
+            big_order_list.append((code, 0, data))
+
+        while not l2_data_manager.big_accurate_sell_order_queue.empty():
+            data = l2_data_manager.big_accurate_sell_order_queue.get()
+            big_order_list.append((code, 1, data))
+    print("********开始写入本地文件：", len(big_order_list), "耗时", int(time.time() - __start_time))
+    __start_time = time.time()
+    # 开始写入本地文件
+    with open(big_order_path, mode='w', encoding='utf-8') as f:
+        for order in big_order_list:
+            f.write(f"{order}\n")
+    print("*******写入本地文件结束：", "耗时", int(time.time() - __start_time))
+    __start_time = time.time()
 
 
 def parse_market_data(day):
@@ -97,18 +228,188 @@
                 writer.writerow(row)
 
 
+def test1(args):
+    index, df = args
+    print(index)
+
+
+def pre_process_transactions(csv_path="E:/测试数据/Transaction_Test.csv"):
+    def str_to_float(s):
+        try:
+            # 移除单位并转换
+            return round(float(s.split("@")[0]), 2)
+        except:
+            return float("nan")
+
+    def code_format(s):
+        try:
+            code = "{0:0>6}".format(s)
+            return code
+        except:
+            return ''
+
+    # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2,
+    #  Info3, TradeBSFlag, BizIndex, LocalTimeStamp]
+    # transaction_data = {
+    #     "SecurityID": ['300920', '300920', '300920', '300920'],
+    #     "TradeTime": [91500040, 91500041, 91500042, 92000040],
+    #     "TradePrice": [15.0, 16.2, 15.2, 16.3],
+    #     "TradeVolume": [100, 100, 200, 300],
+    #     "BuyNo": [0, 1, 1, 1]
+    # }
+    # 定义聚合函数
+    def first_last(group):
+        return pd.Series({
+            'TotalAmount': group['TradeAmount'].sum(),
+            'TotalVolume': group['TradeVolume'].sum(),
+            'StartTime': group['TradeTime'].iloc[0],
+            'StartPrice': group['TradePrice'].iloc[0],
+            'EndTime': group['TradeTime'].iloc[-1],
+            'EndPrice': group['TradePrice'].iloc[-1]
+        })
+
+    dtype = {
+        'SecurityID': 'category',  # 低基数分类数据
+    }
+    chunk_size = 10000
+    # 创建DataFrame
+    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+    indexed_data = list(enumerate(chunks))
+    # 新写法
+    with Pool(processes=4) as pool:
+        pool.map(test1, indexed_data)
+
+    result_list = []
+    for chunk_index, chunk in enumerate(chunks):
+        df = chunk.copy()
+        index = chunk_index + 1
+        child_path = csv_path.replace(".csv", f"_{index}.csv")
+        if os.path.exists(child_path):
+            continue
+        print(f"处理第{index}批次")
+        df["TradePrice"] = df["TradePrice"].apply(str_to_float)
+        df["SecurityID"] = df["SecurityID"].apply(code_format)
+        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
+        # 计算成交金额
+        df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
+
+        # 按SecurityID和BuyNo分组
+        grouped = df.groupby(['SecurityID', 'BuyNo'])
+
+        # 应用聚合函数
+        chunk_result = grouped.apply(first_last).reset_index()
+
+        chunk_result.to_csv(child_path, index=False)
+    print(f"处理完毕，总共{index}批")
+
+
+def pre_process_ngtstick(csv_path="E:/测试数据/NGTSTick_Test.csv"):
+    def str_to_float(s):
+        try:
+            # 移除单位并转换
+            return round(float(s.split("@")[0]), 2)
+        except:
+            return float("nan")
+
+    def code_format(s):
+        try:
+            code = "{0:0>6}".format(s)
+            return code
+        except:
+            return ''
+
+    # 定义聚合函数
+    def first_last(group):
+        return pd.Series({
+            'TotalAmount': group['TradeMoney'].sum(),
+            'TotalVolume': group['Volume'].sum(),
+            'StartTime': group['TickTime'].iloc[0],
+            'StartPrice': group['Price'].iloc[0],
+            'EndTime': group['TickTime'].iloc[-1],
+            'EndPrice': group['Price'].iloc[-1]
+        })
+
+    # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
+
+    chunk_size = 10000
+    # 创建DataFrame
+    chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+    result_list = []
+    index = 0
+    for df in chunks:
+        index += 1
+        child_path = csv_path.replace(".csv", f"_{index}.csv")
+        if os.path.exists(child_path):
+            continue
+        print(f"处理第{index}批次")
+        df = df[df["TickType"] == 'T']
+        df["Price"] = df["Price"].apply(str_to_float)
+        df["SecurityID"] = df["SecurityID"].apply(code_format)
+
+        df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
+
+        # 计算成交金额
+        df['TradeMoney'] = df["TradeMoney"].apply(str_to_float)
+        # 按SecurityID和BuyNo分组
+        grouped = df.groupby(['SecurityID', 'BuyNo'])
+        # 应用聚合函数
+        chunk_result = grouped.apply(first_last).reset_index()
+        chunk_result.to_csv(child_path, index=False)
+    print(f"处理完毕，总共{index}批")
+
+
+if __name__ == '__main__1':
+    # df = pd.read_csv(f"E:/测试数据/Transaction_Test.csv")
+    pre_process_transactions()
+
+# 命令模式  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
+# 解析大单： /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/最终成交数据20250509.txt 000555
 if __name__ == '__main__':
     if len(sys.argv) > 1:
         params = sys.argv[1:]
         print("接收的参数", params)
         _type = params[0].strip()
         day = params[1].strip()
+        if len(params) > 2:
+            code = params[2].strip()
+        else:
+            code = None
         if _type == 'OrderDetail':
-            parse_order_detail(day)
+            parse_order_detail(day, code)
         elif _type == 'Transaction':
             parse_transaction(day)
-        elif _type == 'XTSTick':
+        elif _type == 'NGTSTick':
             parse_ngtstick(day)
         elif _type == 'MarketData':
             parse_market_data(day)
+        elif _type == 'Transaction_New':
+            if len(params) > 2:
+                process_count = int(params[2].strip())
+            else:
+                process_count = 4
 
+            transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv",
+                                                                  process_count=process_count)
+            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
+        elif _type == 'NGTSTick_New':
+            if len(params) > 2:
+                process_count = int(params[2].strip())
+            else:
+                process_count = 4
+            transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv",
+                                                               process_count=process_count)
+            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
+        elif _type == 'Transaction_Concat':
+            transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
+        elif _type == 'NGTSTick_Concat':
+            transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
+        elif _type == 'ExtractDealBigOrder':
+            # 命令模式  /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09
+            if len(params) > 2:
+                process_count = int(params[2].strip())
+            else:
+                process_count = 10
+            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/NGTSTick",
+                                                                  process_count=process_count)
+            transaction_big_order_parser.extract_big_order_of_all(f"/home/userzjj/ftp/{day}/Transaction",
+                                                                  process_count=process_count)

--
Gitblit v1.8.0