From f3651b3d5b750c26b3f5a46e0a4abfc5401782d0 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: 星期五, 09 五月 2025 17:21:33 +0800
Subject: [PATCH] 大单解析器

---
 l2_data_parser.py |  126 ++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 120 insertions(+), 6 deletions(-)

diff --git a/l2_data_parser.py b/l2_data_parser.py
index b93a0e2..0f2b715 100644
--- a/l2_data_parser.py
+++ b/l2_data_parser.py
@@ -5,6 +5,7 @@
 import os
 import sys
 from db import mysql_data_delegate as mysql_data
+from huaxin_client.l2_client_test import L2TransactionDataManager
 from log_module import log_export
 
 
@@ -14,7 +15,7 @@
     for plate in special_codes_dict:
         codes = special_codes_dict[plate]
         fcodes |= codes
-    fcodes
+    return fcodes
 
 
 def parse_order_detail(day, code):
@@ -40,9 +41,10 @@
                 writer.writerow(row)
 
 
-def parse_transaction(day):
+def parse_transaction(day, save=True):
     target_codes = __get_target_codes(day)
     base_path = f"/home/userzjj/ftp/{day}"
+    fdatas = []
     with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
         csv_reader = csv.reader(file)
         # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
@@ -58,12 +60,16 @@
                 if row[1] not in target_codes:
                     continue
                 # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰
-                writer.writerow(row)
+                if save:
+                    writer.writerow(row)
+                fdatas.append(row)
+    return fdatas
 
 
-def parse_ngtstick(day):
+def parse_ngtstick(day, save=True):
     target_codes = __get_target_codes(day)
     base_path = f"/home/userzjj/ftp/{day}"
+    fdatas = []
     with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file:
         csv_reader = csv.reader(file)
         # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
@@ -78,8 +84,102 @@
             for row in csv_reader:
                 if row[1] not in target_codes:
                     continue
-                # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰
-                writer.writerow(row)
+                if save:
+                    # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰
+                    writer.writerow(row)
+                fdatas.append(row)
+    return fdatas
+
+
+def parse_deal_big_orders(day, big_order_path, target_code=None):
+    """
+    鎻愬彇鎵�鏈夋垚浜ゅぇ鍗�
+    @param day:
+    @return:
+    """
+    print("寮�濮嬪鐞嗘暟鎹�", day, big_order_path, target_code)
+    l2_data_manager_dict = {}
+    # 瑙f瀽鏁版嵁
+    transaction_data = parse_transaction(day)
+    print("Transaction 璇诲彇瀹屾瘯", len(transaction_data))
+    ngtstick_data = parse_ngtstick(day)
+    print("NGTSTick 璇诲彇瀹屾瘯", len(ngtstick_data))
+    big_order_list = []
+    for index, row in enumerate(transaction_data):
+        code = row[1]
+        if target_code and code != target_code:
+            continue
+        item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2),
+                "TradeVolume": int(row[4]),
+                "OrderTime": int(row[2]), "MainSeq": int(row[6]),
+                "SubSeq": int(row[7]), "BuyNo": int(row[8]),
+                "SellNo": int(row[9]),
+                "ExecType": int(row[5])}
+        if item["TradePrice"] <= 0:
+            continue
+        if code not in l2_data_manager_dict:
+            l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
+        l2_data_manager_dict[code].add_transaction_data(item)
+        if index % 100 == 0:
+            # 璇诲彇闃熷垪涓殑鏁版嵁
+            l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+            while not l2_data_manager.big_accurate_buy_order_queue.empty():
+                data = l2_data_manager.big_accurate_buy_order_queue.get()
+                big_order_list.append((code, 0, data))
+
+            while not l2_data_manager.big_accurate_sell_order_queue.empty():
+                data = l2_data_manager.big_accurate_sell_order_queue.get()
+                big_order_list.append((code, 1, data))
+    print("Transaction 澶勭悊瀹屾瘯", len(big_order_list))
+
+    for index, row in enumerate(ngtstick_data):
+        code = row[1]
+        if target_code and code != target_code:
+            continue
+        if row[5].strip() != 'T':
+            # 杩囨护闈炴垚浜ゆ暟鎹�
+            continue
+
+        item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2),
+                "TradeVolume": int(row[9]),
+                "OrderTime": row[4], "MainSeq": row[2],
+                "SubSeq": row[3], "BuyNo": row[6],
+                "SellNo": row[7],
+                "ExecType": '1'}
+        if item["TradePrice"] <= 0:
+            continue
+
+        if code not in l2_data_manager_dict:
+            l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True)
+        l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item)
+        if index % 100 == 0:
+            # 璇诲彇闃熷垪涓殑鏁版嵁
+            l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+            while not l2_data_manager.big_accurate_buy_order_queue.empty():
+                data = l2_data_manager.big_accurate_buy_order_queue.get()
+                big_order_list.append((code, 0, data))
+
+            while not l2_data_manager.big_accurate_sell_order_queue.empty():
+                data = l2_data_manager.big_accurate_sell_order_queue.get()
+                big_order_list.append((code, 1, data))
+    print("NGTSTick 澶勭悊瀹屾瘯", len(big_order_list))
+    # 璇诲彇鍓╀綑鐨勬湭璇绘暟鎹殑浠g爜
+    for code in l2_data_manager_dict:
+        l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+        while not l2_data_manager.big_accurate_buy_order_queue.empty():
+            data = l2_data_manager.big_accurate_buy_order_queue.get()
+            big_order_list.append((code, 0, data))
+
+        while not l2_data_manager.big_accurate_sell_order_queue.empty():
+            data = l2_data_manager.big_accurate_sell_order_queue.get()
+            big_order_list.append((code, 1, data))
+    print("寮�濮嬪啓鍏ユ湰鍦版枃浠讹細", len(big_order_list))
+    # 寮�濮嬪啓鍏ユ湰鍦版枃浠�
+    with open(big_order_path, mode='w', encoding='utf-8') as f:
+        for order in big_order_list:
+            f.write(f"{order}")
+    print("鍐欏叆鏈湴鏂囦欢缁撴潫锛�")
+
 
 
 def parse_market_data(day):
@@ -103,6 +203,7 @@
                 writer.writerow(row)
 
 
+# 鍛戒护妯″紡  /home/userzjj/app/gp-server/l2_data_parser Transaction  2025-05-08
 if __name__ == '__main__':
     if len(sys.argv) > 1:
         params = sys.argv[1:]
@@ -121,3 +222,16 @@
             parse_ngtstick(day)
         elif _type == 'MarketData':
             parse_market_data(day)
+        elif _type == 'ExtractDealBigOrder':
+            # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟
+            # TODO
+            if len(params) > 2:
+                save_path = params[2].strip()
+            else:
+                save_path = None
+
+            if len(params) > 3:
+                target_code = params[3].strip()
+            else:
+                target_code = None
+            parse_deal_big_orders(day, save_path, target_code)

--
Gitblit v1.8.0