From cc2c7a8514d3d6eea4f73b5875c2500883d165f0 Mon Sep 17 00:00:00 2001
From: Administrator <admin@example.com>
Date: Mon, 26 May 2025 22:57:20 +0800
Subject: [PATCH] 增加自动加白接口
---
l2_data_parser.py | 409 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 398 insertions(+), 11 deletions(-)
diff --git a/l2_data_parser.py b/l2_data_parser.py
index 2578aae..0574cfb 100644
--- a/l2_data_parser.py
+++ b/l2_data_parser.py
@@ -2,21 +2,408 @@
L2鏁版嵁瑙f瀽鍣�
"""
import csv
-def parse_order_detail():
- with open('/home/userzjj/ftp/20250123/OrderDetail.csv', 'r', encoding='utf-8') as file:
+import os
+import sys
+import time
+from multiprocessing import Pool
+
+import pandas as pd
+
+from data_parser import transaction_big_order_parser
+from db import mysql_data_delegate as mysql_data
+from huaxin_client.l2_client_test import L2TransactionDataManager
+from log_module import log_export
+from utils import tool
+
+
+def __get_target_codes(day):
+ special_codes_dict = log_export.load_special_codes(day)
+ fcodes = set()
+ for plate in special_codes_dict:
+ codes = special_codes_dict[plate]
+ fcodes |= codes
+ return fcodes
+
+
+def parse_order_detail(day, code):
+ target_codes = __get_target_codes(day)
+ base_path = f"/home/userzjj/ftp/{day}"
+ with open(f"{base_path}/OrderDetail.csv", 'r', encoding='utf-8') as file:
csv_reader = csv.reader(file)
- # 鑾峰彇琛ㄥご
+ # 鑾峰彇琛ㄥご锛�: ['ExchangeID', 'SecurityID', 'OrderTime', 'Price', 'Volume', 'Side', 'OrderType', 'MainSeq', 'SubSeq', 'Info1', 'Info2', 'Info3', 'OrderNO', 'OrderStatus', 'BizIndex', 'LocalTimeStamp']
headers = next(csv_reader)
print("琛ㄥご:", headers)
# 閬嶅巻鏁版嵁琛�
- max_count = 10
- count = 0
- for row in csv_reader:
- print(row)
- count += 1
- if count > max_count:
- break
+ _path = f"{base_path}/OrderDetail_filter.csv"
+ with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
+ # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞�
+ writer = csv.writer(csvfile)
+ # 閫愯鍐欏叆鏁版嵁
+ for row in csv_reader:
+ if row[1] not in target_codes:
+ continue
+ if code and code != row[1]:
+ continue
+ # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰
+ writer.writerow(row)
+def parse_transaction(day, save=True, parse_all=False):
+ target_codes = __get_target_codes(day)
+ base_path = f"/home/userzjj/ftp/{day}"
+ fdatas = []
+ df = pd.read_csv(f"{base_path}/Transaction.csv")
+ category_revenue = df.groupby('BuyNo')['TradeVolume'].sum()
+
+ with open(f"{base_path}/Transaction.csv", 'r', encoding='utf-8') as file:
+ csv_reader = csv.reader(file)
+ total_lines = csv_reader.line_num
+ print("鎬昏鏁帮細", total_lines)
+ # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
+ headers = next(csv_reader)
+ print("琛ㄥご:", headers)
+ # 閬嶅巻鏁版嵁琛�
+ _path = f"{base_path}/Transaction_filter.csv"
+ percent = 0
+ with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
+ # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞�
+ writer = csv.writer(csvfile)
+ # 閫愯鍐欏叆鏁版嵁
+ count = 0
+ for row in csv_reader:
+ count += 1
+ p = count * 100 // total_lines
+ if p != percent:
+ percent = p
+ print(f"**杩涘害锛歿percent}%")
+ if row[1] not in target_codes and not parse_all:
+ continue
+ # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰
+ if save:
+ writer.writerow(row)
+ fdatas.append(row)
+ return fdatas
+
+
+def parse_ngtstick(day, save=True):
+ target_codes = __get_target_codes(day)
+ base_path = f"/home/userzjj/ftp/{day}"
+ fdatas = []
+ with open(f"{base_path}/NGTSTick.csv", 'r', encoding='utf-8') as file:
+ csv_reader = csv.reader(file)
+ # 鑾峰彇琛ㄥご锛�: [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
+ headers = next(csv_reader)
+ print("琛ㄥご:", headers)
+ # 閬嶅巻鏁版嵁琛�
+ _path = f"{base_path}/NGTSTick_filter.csv"
+ with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
+ # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞�
+ writer = csv.writer(csvfile)
+ # 閫愯鍐欏叆鏁版嵁
+ for row in csv_reader:
+ if row[1] not in target_codes:
+ continue
+ if save:
+ # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰
+ writer.writerow(row)
+ fdatas.append(row)
+ return fdatas
+
+
+def parse_deal_big_orders(day, big_order_path, target_code=None):
+ """
+ 鎻愬彇鎵�鏈夋垚浜ゅぇ鍗�
+ @param day:
+ @return:
+ """
+ print("*******寮�濮嬪鐞嗘垚浜ゅぇ鍗曟暟鎹�", day, big_order_path, target_code)
+ l2_data_manager_dict = {}
+ # 瑙f瀽鏁版嵁
+ __start_time = time.time()
+ transaction_data = parse_transaction(day, parse_all=True)
+ print("*******Transaction 璇诲彇瀹屾瘯", len(transaction_data), "鑰楁椂", int(time.time() - __start_time))
+ __start_time = time.time()
+ ngtstick_data = parse_ngtstick(day)
+ print("*******NGTSTick 璇诲彇瀹屾瘯", len(ngtstick_data), "鑰楁椂", int(time.time() - __start_time))
+ __start_time = time.time()
+ big_order_list = []
+ for index, row in enumerate(transaction_data):
+ code = row[1]
+ if target_code and code != target_code:
+ continue
+ item = {"SecurityID": row[1], "TradePrice": round(float(row[3].split("@")[0]), 2),
+ "TradeVolume": int(row[4]),
+ "OrderTime": int(row[2]), "MainSeq": int(row[6]),
+ "SubSeq": int(row[7]), "BuyNo": int(row[8]),
+ "SellNo": int(row[9]),
+ "ExecType": int(row[5])}
+ if item["TradePrice"] <= 0:
+ continue
+ if code not in l2_data_manager_dict:
+ l2_data_manager_dict[code] = L2TransactionDataManager(code, True)
+ l2_data_manager_dict[code].add_transaction_data(item, big_order_money_threshold=100e4)
+ if index % 100 == 0:
+ # 璇诲彇闃熷垪涓殑鏁版嵁
+ l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+ while not l2_data_manager.big_accurate_buy_order_queue.empty():
+ data = l2_data_manager.big_accurate_buy_order_queue.get()
+ big_order_list.append((code, 0, data))
+
+ while not l2_data_manager.big_accurate_sell_order_queue.empty():
+ data = l2_data_manager.big_accurate_sell_order_queue.get()
+ big_order_list.append((code, 1, data))
+ print("********Transaction 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time))
+ __start_time = time.time()
+
+ for index, row in enumerate(ngtstick_data):
+ code = row[1]
+ if target_code and code != target_code:
+ continue
+ if row[5].strip() != 'T':
+ # 杩囨护闈炴垚浜ゆ暟鎹�
+ continue
+
+ item = {"SecurityID": row[1], "TradePrice": round(float(row[8].split("@")[0]), 2),
+ "TradeVolume": int(row[9]),
+ "OrderTime": row[4], "MainSeq": row[2],
+ "SubSeq": row[3], "BuyNo": row[6],
+ "SellNo": row[7],
+ "ExecType": '1'}
+ if item["TradePrice"] <= 0:
+ continue
+
+ if code not in l2_data_manager_dict:
+ l2_data_manager_dict[item["SecurityID"]] = L2TransactionDataManager(code, True)
+ l2_data_manager_dict[item["SecurityID"]].add_transaction_data(item)
+ if index % 100 == 0:
+ # 璇诲彇闃熷垪涓殑鏁版嵁
+ l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+ while not l2_data_manager.big_accurate_buy_order_queue.empty():
+ data = l2_data_manager.big_accurate_buy_order_queue.get()
+ big_order_list.append((code, 0, data))
+
+ while not l2_data_manager.big_accurate_sell_order_queue.empty():
+ data = l2_data_manager.big_accurate_sell_order_queue.get()
+ big_order_list.append((code, 1, data))
+ print("********NGTSTick 澶勭悊瀹屾瘯", len(big_order_list), "鑰楁椂", int(time.time() - __start_time))
+ __start_time = time.time()
+ # 璇诲彇鍓╀綑鐨勬湭璇绘暟鎹殑浠g爜
+ for code in l2_data_manager_dict:
+ l2_data_manager: L2TransactionDataManager = l2_data_manager_dict[code]
+ while not l2_data_manager.big_accurate_buy_order_queue.empty():
+ data = l2_data_manager.big_accurate_buy_order_queue.get()
+ big_order_list.append((code, 0, data))
+
+ while not l2_data_manager.big_accurate_sell_order_queue.empty():
+ data = l2_data_manager.big_accurate_sell_order_queue.get()
+ big_order_list.append((code, 1, data))
+ print("********寮�濮嬪啓鍏ユ湰鍦版枃浠讹細", len(big_order_list), "鑰楁椂", int(time.time() - __start_time))
+ __start_time = time.time()
+ # 寮�濮嬪啓鍏ユ湰鍦版枃浠�
+ with open(big_order_path, mode='w', encoding='utf-8') as f:
+ for order in big_order_list:
+ f.write(f"{order}\n")
+ print("*******鍐欏叆鏈湴鏂囦欢缁撴潫锛�", "鑰楁椂", int(time.time() - __start_time))
+ __start_time = time.time()
+
+
+def parse_market_data(day):
+ target_codes = __get_target_codes(day)
+ base_path = f"/home/userzjj/ftp/{day}"
+ with open(f"{base_path}/MarketData.csv", 'r', encoding='utf-8') as file:
+ csv_reader = csv.reader(file)
+ # 鑾峰彇琛ㄥご锛�: [SecurityID,ExchangeID,DataTimeStamp,PreClosePrice,OpenPrice,NumTrades,TotalVolumeTrade,TotalValueTrade,TotalBidVolume,AvgBidPrice,TotalAskVolume,AvgAskPrice,HighestPrice,LowestPrice,LastPrice,BidPrice1,BidVolume1,AskPrice1,AskVolume1....]
+ headers = next(csv_reader)
+ print("琛ㄥご:", headers)
+ # 閬嶅巻鏁版嵁琛�
+ _path = f"{base_path}/MarketData_filter.csv"
+ with open(_path, 'w', newline='', encoding='utf-8') as csvfile:
+ # 鍒涘缓涓�涓� CSV 鍐欏叆鍣ㄥ璞�
+ writer = csv.writer(csvfile)
+ # 閫愯鍐欏叆鏁版嵁
+ for row in csv_reader:
+ if row[0] not in target_codes:
+ continue
+ # 灏嗘枃浠跺啓鍏ュ埌鏂囨湰
+ writer.writerow(row)
+
+def test1(args):
+ index, df = args
+ print(index)
+
+def pre_process_transactions(csv_path="E:/娴嬭瘯鏁版嵁/Transaction_Test.csv"):
+ def str_to_float(s):
+ try:
+ # 绉婚櫎鍗曚綅骞惰浆鎹�
+ return round(float(s.split("@")[0]), 2)
+ except:
+ return float("nan")
+
+ def code_format(s):
+ try:
+ code = "{0:0>6}".format(s)
+ return code
+ except:
+ return ''
+
+ # [ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType, MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2,
+ # Info3, TradeBSFlag, BizIndex, LocalTimeStamp]
+ # transaction_data = {
+ # "SecurityID": ['300920', '300920', '300920', '300920'],
+ # "TradeTime": [91500040, 91500041, 91500042, 92000040],
+ # "TradePrice": [15.0, 16.2, 15.2, 16.3],
+ # "TradeVolume": [100, 100, 200, 300],
+ # "BuyNo": [0, 1, 1, 1]
+ # }
+ # 瀹氫箟鑱氬悎鍑芥暟
+ def first_last(group):
+ return pd.Series({
+ 'TotalAmount': group['TradeAmount'].sum(),
+ 'TotalVolume': group['TradeVolume'].sum(),
+ 'StartTime': group['TradeTime'].iloc[0],
+ 'StartPrice': group['TradePrice'].iloc[0],
+ 'EndTime': group['TradeTime'].iloc[-1],
+ 'EndPrice': group['TradePrice'].iloc[-1]
+ })
+
+
+
+
+ dtype = {
+ 'SecurityID': 'category', # 浣庡熀鏁板垎绫绘暟鎹�
+ }
+ chunk_size = 10000
+ # 鍒涘缓DataFrame
+ chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+ indexed_data = list(enumerate(chunks))
+ # 鏂板啓娉�
+ with Pool(processes=4) as pool:
+ pool.map(test1, indexed_data)
+
+ result_list = []
+ for chunk_index, chunk in enumerate(chunks):
+ df = chunk.copy()
+ index = chunk_index + 1
+ child_path = csv_path.replace(".csv", f"_{index}.csv")
+ if os.path.exists(child_path):
+ continue
+ print(f"澶勭悊绗瑊index}鎵规")
+ df["TradePrice"] = df["TradePrice"].apply(str_to_float)
+ df["SecurityID"] = df["SecurityID"].apply(code_format)
+ df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
+ # 璁$畻鎴愪氦閲戦
+ df['TradeAmount'] = df['TradePrice'] * df['TradeVolume']
+
+ # 鎸塖ecurityID鍜孊uyNo鍒嗙粍
+ grouped = df.groupby(['SecurityID', 'BuyNo'])
+
+ # 搴旂敤鑱氬悎鍑芥暟
+ chunk_result = grouped.apply(first_last).reset_index()
+
+ chunk_result.to_csv(child_path, index=False)
+ print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�")
+
+
+def pre_process_ngtstick(csv_path="E:/娴嬭瘯鏁版嵁/NGTSTick_Test.csv"):
+ def str_to_float(s):
+ try:
+ # 绉婚櫎鍗曚綅骞惰浆鎹�
+ return round(float(s.split("@")[0]), 2)
+ except:
+ return float("nan")
+
+ def code_format(s):
+ try:
+ code = "{0:0>6}".format(s)
+ return code
+ except:
+ return ''
+
+ # 瀹氫箟鑱氬悎鍑芥暟
+ def first_last(group):
+ return pd.Series({
+ 'TotalAmount': group['TradeMoney'].sum(),
+ 'TotalVolume': group['Volume'].sum(),
+ 'StartTime': group['TickTime'].iloc[0],
+ 'StartPrice': group['Price'].iloc[0],
+ 'EndTime': group['TickTime'].iloc[-1],
+ 'EndPrice': group['Price'].iloc[-1]
+ })
+
+ # [ExchangeID,SecurityID,MainSeq,SubSeq,TickTime,TickType,BuyNo,SellNo,Price,Volume,TradeMoney,Side,TradeBSFlag,MDSecurityStat,Info1,Info2,Info3,LocalTimeStamp]
+
+ chunk_size = 10000
+ # 鍒涘缓DataFrame
+ chunks = pd.read_csv(csv_path, chunksize=chunk_size)
+ result_list = []
+ index = 0
+ for df in chunks:
+ index += 1
+ child_path = csv_path.replace(".csv", f"_{index}.csv")
+ if os.path.exists(child_path):
+ continue
+ print(f"澶勭悊绗瑊index}鎵规")
+ df = df[df["TickType"] == 'T']
+ df["Price"] = df["Price"].apply(str_to_float)
+ df["SecurityID"] = df["SecurityID"].apply(code_format)
+
+ df = df[df["SecurityID"].str.startswith(("30", "00", "60"), na=False)]
+
+ # 璁$畻鎴愪氦閲戦
+ df['TradeMoney'] = df["TradeMoney"].apply(str_to_float)
+ # 鎸塖ecurityID鍜孊uyNo鍒嗙粍
+ grouped = df.groupby(['SecurityID', 'BuyNo'])
+ # 搴旂敤鑱氬悎鍑芥暟
+ chunk_result = grouped.apply(first_last).reset_index()
+ chunk_result.to_csv(child_path, index=False)
+ print(f"澶勭悊瀹屾瘯锛屾�诲叡{index}鎵�")
+
+
+if __name__ == '__main__1':
+ # df = pd.read_csv(f"E:/娴嬭瘯鏁版嵁/Transaction_Test.csv")
+ pre_process_transactions()
+
+# 鍛戒护妯″紡 /home/userzjj/app/gp-server/l2_data_parser Transaction 2025-05-08
+# 瑙f瀽澶у崟锛� /home/userzjj/app/gp-server/l2_data_parser ExtractDealBigOrder 2025-05-09 /home/userzjj/鏈�缁堟垚浜ゆ暟鎹�20250509.txt 000555
if __name__ == '__main__':
- parse_order_detail()
+ if len(sys.argv) > 1:
+ params = sys.argv[1:]
+ print("鎺ユ敹鐨勫弬鏁�", params)
+ _type = params[0].strip()
+ day = params[1].strip()
+ if len(params) > 2:
+ code = params[2].strip()
+ else:
+ code = None
+ if _type == 'OrderDetail':
+ parse_order_detail(day, code)
+ elif _type == 'Transaction':
+ parse_transaction(day)
+ elif _type == 'NGTSTick':
+ parse_ngtstick(day)
+ elif _type == 'MarketData':
+ parse_market_data(day)
+ elif _type == 'Transaction_New':
+ transaction_big_order_parser.pre_process_transactions(f"/home/userzjj/ftp/{day}/Transaction.csv")
+ transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
+ elif _type == 'NGTSTick_New':
+ transaction_big_order_parser.pre_process_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick.csv")
+ transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
+ elif _type == 'Transaction_Concat':
+ transaction_big_order_parser.concat_pre_transactions(f"/home/userzjj/ftp/{day}/Transaction")
+ elif _type == 'NGTSTick_Concat':
+ transaction_big_order_parser.concat_pre_ngtsticks(f"/home/userzjj/ftp/{day}/NGTSTick")
+
+
+ elif _type == 'ExtractDealBigOrder':
+ # 鎻愬彇鎵�鏈夋垚浜ょ殑澶у崟
+ if len(params) > 2:
+ save_path = params[2].strip()
+ else:
+ save_path = None
+
+ if len(params) > 3:
+ target_code = params[3].strip()
+ else:
+ target_code = None
+ parse_deal_big_orders(day, save_path, target_code)
--
Gitblit v1.8.0