import csv
|
import time
|
import unittest
|
from unittest import mock
|
|
from huaxin_client.l2_client_test import L2TransactionDataManager
|
from l2 import l2_data_util
|
from l2.huaxin.huaxin_delegate_postion_manager import RealDelegateOrderPositionManager
|
from l2.transaction_progress import TradeBuyQueue
|
from trade.huaxin import huaxin_trade_record_manager
|
|
# class TestRealPlaceOrderIndex(unittest.TestCase):
|
#
|
# @unittest.skip("跳过此单元测试")
|
# def test_sort(self):
|
# l2_data_util.load_l2_data_all()
|
# code = "600797"
|
# today_datas = l2_data_util.local_today_datas.get(code)
|
# RealDelegateOrderPositionManager.place_order(code, [(600, 13.60, 80030), (800, 13.60, 80035)], 23900,
|
# today_datas[23900])
|
# datas = [today_datas[23903:23930], today_datas[23930:23998], today_datas[814:826]]
|
# for data in datas:
|
# results = RealDelegateOrderPositionManager.compute_l2_place_order_position(code, data, strict_match=True)
|
# print(results)
|
#
|
# @unittest.skip("跳过此单元测试")
|
# def test_recompute_real_place_order_index(self):
|
# """
|
# 重新计算真实下单位置
|
# @return:
|
# """
|
# code = "600410"
|
# l2_data_util.load_l2_data_all()
|
# total_datas = l2_data_util.local_today_datas.get(code)
|
# order_info = ([(2700, 7.94, 80160), (2900, 7.94, 80165)], total_datas[7723], time.time(), 7722)
|
# TradeBuyQueue.get_traded_index = mock.Mock(return_value=(7722, False))
|
#
|
# huaxin_trade_record_manager.DelegateRecordManager.list_current_delegates = mock.Mock(return_value=[{"acceptTime" : "10:41:55","volume":2700},{"acceptTime" : "10:41:55","volume":2900}])
|
#
|
# result = RealDelegateOrderPositionManager().recompute_l2_place_order_position(code, order_info, 7744, 1)
|
# print(result)
|
|
|
# class L2DataTest(unittest.TestCase):
|
from utils import tool
|
|
# Module-level registry of L2TransactionDataManager instances keyed by
# SecurityID; parse_transaction() creates an entry the first time it sees a code.
l2_data_manager_dict = {}
|
|
|
def parse_transaction(path_str=r"E:\测试数据\transaction_test.csv"):
    """Feed a transaction CSV file into per-security L2TransactionDataManager objects.

    Each data row is converted to a dict and handed to the manager registered
    for its SecurityID in the module-level ``l2_data_manager_dict`` (a manager
    is created on first sight of a code). Rows whose trade price is not
    positive are skipped. After the file has been consumed, the sizes of the
    big buy / big sell order queues are printed for every manager that has at
    least one queued entry.

    Args:
        path_str: Path of the UTF-8 encoded CSV file to read. Defaults to the
            original hard-coded test-data location.

    Returns:
        None. Results are accumulated in ``l2_data_manager_dict`` and printed.

    Raises:
        OSError: If the file cannot be opened.
        ValueError / IndexError: If a non-blank row does not have the expected
            numeric columns.
    """
    # newline='' is what the csv module expects so that it can handle line
    # endings (including newlines embedded in quoted fields) itself.
    with open(path_str, 'r', encoding='utf-8', newline='') as file:
        csv_reader = csv.reader(file)
        # Header columns: [ExchangeID,SecurityID,TradeTime,TradePrice,TradeVolume,ExecType,MainSeq,SubSeq,BuyNo,SellNo,Info1,Info2,Info3,TradeBSFlag,BizIndex,LocalTimeStamp]
        headers = next(csv_reader, None)
        if headers is None:
            # Completely empty file: nothing to parse (previously StopIteration).
            return
        print("表头:", headers)
        for row in csv_reader:
            if not row:
                # csv.reader yields [] for blank lines; they carry no trade.
                continue
            security_id = row[1]
            item = {
                "SecurityID": security_id,
                # The price column may carry an "@..." suffix; only the part
                # before it is the numeric price.
                "TradePrice": round(float(row[3].split("@")[0]), 2),
                "TradeVolume": int(row[4]),
                "OrderTime": int(row[2]),
                "MainSeq": int(row[6]),
                "SubSeq": int(row[7]),
                "BuyNo": int(row[8]),
                "SellNo": int(row[9]),
                "ExecType": int(row[5]),
            }
            if item["TradePrice"] <= 0:
                continue
            manager = l2_data_manager_dict.get(security_id)
            if manager is None:
                manager = L2TransactionDataManager(security_id, True)
                l2_data_manager_dict[security_id] = manager
            manager.add_transaction_data(item)

    # Report only the securities that ended up with queued big orders.
    for code, manager in l2_data_manager_dict.items():
        buy_count = manager.big_accurate_buy_order_queue.qsize()
        sell_count = manager.big_accurate_sell_order_queue.qsize()
        if buy_count or sell_count:
            print(code, "大买单数量", buy_count)
            print(code, "大卖单数量", sell_count)
|
|
|
if __name__ == "__main__":
    # Show the current time shifted by -3600 seconds with the colons removed
    # (presumably "one hour ago" in compact form — depends on the tool helpers).
    current_time_str = tool.get_now_time_str()
    shifted_time_str = tool.trade_time_add_second(current_time_str, -3600)
    print(shifted_time_str.replace(":", ""))

    # Replay the default transaction CSV file.
    parse_transaction()
|