"""Manual test harness that replays a transaction CSV through L2TransactionDataManager."""
import csv
import time
import unittest
from unittest import mock

from huaxin_client.l2_client_test import L2TransactionDataManager
from l2 import l2_data_util
from l2.huaxin.huaxin_delegate_postion_manager import RealDelegateOrderPositionManager
from l2.transaction_progress import TradeBuyQueue
from trade.huaxin import huaxin_trade_record_manager
from utils import tool

# NOTE: the test cases below are disabled; they are kept for reference only.
#
# class TestRealPlaceOrderIndex(unittest.TestCase):
#
#     @unittest.skip("跳过此单元测试")
#     def test_sort(self):
#         l2_data_util.load_l2_data_all()
#         code = "600797"
#         today_datas = l2_data_util.local_today_datas.get(code)
#         RealDelegateOrderPositionManager.place_order(code, [(600, 13.60, 80030), (800, 13.60, 80035)], 23900,
#                                                      today_datas[23900])
#         datas = [today_datas[23903:23930], today_datas[23930:23998], today_datas[814:826]]
#         for data in datas:
#             results = RealDelegateOrderPositionManager.compute_l2_place_order_position(code, data, strict_match=True)
#             print(results)
#
#     @unittest.skip("跳过此单元测试")
#     def test_recompute_real_place_order_index(self):
#         """
#         Recompute the real place-order position.
#         @return:
#         """
#         code = "600410"
#         l2_data_util.load_l2_data_all()
#         total_datas = l2_data_util.local_today_datas.get(code)
#         order_info = ([(2700, 7.94, 80160), (2900, 7.94, 80165)], total_datas[7723], time.time(), 7722)
#         TradeBuyQueue.get_traded_index = mock.Mock(return_value=(7722, False))
#
#         huaxin_trade_record_manager.DelegateRecordManager.list_current_delegates = mock.Mock(return_value=[{"acceptTime" : "10:41:55","volume":2700},{"acceptTime" : "10:41:55","volume":2900}])
#
#         result = RealDelegateOrderPositionManager().recompute_l2_place_order_position(code, order_info, 7744, 1)
#         print(result)

# class L2DataTest(unittest.TestCase):

# Default location of the transaction CSV replayed by parse_transaction().
DEFAULT_TRANSACTION_CSV_PATH = r"E:\测试数据\transaction_test.csv"

# Maps a SecurityID string to the L2TransactionDataManager created for it.
# Module-level, so entries accumulate across parse_transaction() calls.
l2_data_manager_dict = {}


def _row_to_item(row):
    """Convert one CSV data row into the dict passed to add_transaction_data.

    Column positions follow the header documented in parse_transaction().
    Raises ValueError / IndexError if a field is malformed or missing.
    """
    return {
        "SecurityID": row[1],
        # Only the text before the first "@" in the price column is parsed.
        "TradePrice": round(float(row[3].split("@")[0]), 2),
        "TradeVolume": int(row[4]),
        # The "OrderTime" key is filled from the TradeTime column (index 2).
        "OrderTime": int(row[2]),
        "MainSeq": int(row[6]),
        "SubSeq": int(row[7]),
        "BuyNo": int(row[8]),
        "SellNo": int(row[9]),
        "ExecType": int(row[5]),
    }


def parse_transaction(path_str=DEFAULT_TRANSACTION_CSV_PATH):
    """Replay a transaction CSV through per-security L2TransactionDataManager objects.

    The header row is printed, then every data row is converted to a dict and
    handed to the manager for its SecurityID (created on first use and stored
    in ``l2_data_manager_dict``). Rows whose trade price is not positive are
    skipped, as are blank rows. Afterwards, for every code whose big buy or
    big sell order queue is non-empty, both queue sizes are printed.

    Args:
        path_str: Path of the UTF-8 CSV file to read. Defaults to the
            original hard-coded test file.

    Raises:
        OSError: If the file cannot be opened.
        ValueError, IndexError: If a non-blank data row is malformed.
    """
    # newline='' lets the csv module do its own line-ending handling.
    with open(path_str, 'r', encoding='utf-8', newline='') as file:
        csv_reader = csv.reader(file)
        # Read the header row. Expected columns:
        # [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,
        #  BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
        headers = next(csv_reader, None)  # None instead of StopIteration on an empty file
        if headers is not None:
            print("表头:", headers)
            for row in csv_reader:
                if not row:
                    # csv.reader yields [] for blank lines (e.g. a trailing newline).
                    continue
                item = _row_to_item(row)
                if item["TradePrice"] <= 0:
                    continue
                security_id = item["SecurityID"]
                manager = l2_data_manager_dict.get(security_id)
                if manager is None:
                    manager = L2TransactionDataManager(security_id, True)
                    l2_data_manager_dict[security_id] = manager
                manager.add_transaction_data(item)

    for code, manager in l2_data_manager_dict.items():
        # Read each size once so the condition and the printed value agree.
        big_buy_count = manager.big_accurate_buy_order_queue.qsize()
        big_sell_count = manager.big_accurate_sell_order_queue.qsize()
        if big_buy_count or big_sell_count:
            print(code, "大买单数量", big_buy_count)
            print(code, "大卖单数量", big_sell_count)


if __name__ == "__main__":
    print(tool.trade_time_add_second(tool.get_now_time_str(), -3600).replace(":", ""))
    parse_transaction()