Administrator
6 小时以前 2f2516749615da866e96d8d24e499b7ecbb63a3e
test/test_l2.py
@@ -1,10 +1,80 @@
import csv
import time
import unittest
from unittest import mock
from huaxin_client.l2_client_test import L2TransactionDataManager
from l2 import l2_data_util
from l2.l2_data_manager_new import L2TradeDataProcessor
from l2.l2_sell_manager import L2MarketSellManager
from l2.huaxin.huaxin_delegate_postion_manager import RealDelegateOrderPositionManager
from l2.transaction_progress import TradeBuyQueue
from trade.huaxin import huaxin_trade_record_manager
# class TestRealPlaceOrderIndex(unittest.TestCase):
#
#     @unittest.skip("跳过此单元测试")
#     def test_sort(self):
#         l2_data_util.load_l2_data_all()
#         code = "600797"
#         today_datas = l2_data_util.local_today_datas.get(code)
#         RealDelegateOrderPositionManager.place_order(code, [(600, 13.60, 80030), (800, 13.60, 80035)], 23900,
#                                                      today_datas[23900])
#         datas = [today_datas[23903:23930], today_datas[23930:23998], today_datas[814:826]]
#         for data in datas:
#             results = RealDelegateOrderPositionManager.compute_l2_place_order_position(code, data, strict_match=True)
#             print(results)
#
#     @unittest.skip("跳过此单元测试")
#     def test_recompute_real_place_order_index(self):
#         """
#         重新计算真实下单位置
#         @return:
#         """
#         code = "600410"
#         l2_data_util.load_l2_data_all()
#         total_datas = l2_data_util.local_today_datas.get(code)
#         order_info = ([(2700, 7.94, 80160), (2900, 7.94, 80165)], total_datas[7723], time.time(), 7722)
#         TradeBuyQueue.get_traded_index = mock.Mock(return_value=(7722, False))
#
#         huaxin_trade_record_manager.DelegateRecordManager.list_current_delegates = mock.Mock(return_value=[{"acceptTime" : "10:41:55","volume":2700},{"acceptTime" : "10:41:55","volume":2900}])
#
#         result = RealDelegateOrderPositionManager().recompute_l2_place_order_position(code, order_info, 7744, 1)
#         print(result)
# class L2DataTest(unittest.TestCase):
from utils import tool
# Cache of per-security transaction managers, keyed by SecurityID string.
# Entries are created lazily by parse_transaction() the first time a code appears.
l2_data_manager_dict = {}
def parse_transaction(path_str=r"E:\测试数据\transaction_test.csv"):
    """Replay a CSV dump of transaction records through L2TransactionDataManager.

    Every data row is converted to a dict and handed to the manager stored in
    the module-level ``l2_data_manager_dict`` under the row's SecurityID (the
    manager is created the first time a SecurityID is seen). Rows whose parsed
    price is not positive are skipped. Once the file is consumed, the sizes of
    the big buy/sell order queues are printed for each security that has at
    least one queued item.

    Column layout, as given by the header of the sample file:
    ExchangeID, SecurityID, TradeTime, TradePrice, TradeVolume, ExecType,
    MainSeq, SubSeq, BuyNo, SellNo, Info1, Info2, Info3, TradeBSFlag,
    BizIndex, LocalTimeStamp

    Args:
        path_str: Path of the CSV file to read. Defaults to the local test
            fixture so existing no-argument callers keep working.
    """
    # newline='' is what the csv module requires for file objects so that
    # embedded/foreign line endings are interpreted by the reader itself.
    with open(path_str, 'r', encoding='utf-8', newline='') as file:
        csv_reader = csv.reader(file)
        # Read the header row; an empty file yields None instead of raising StopIteration.
        headers = next(csv_reader, None)
        print("表头:", headers)
        for row in csv_reader:
            # csv.reader yields [] for blank lines (e.g. trailing newlines); skip them.
            if not row:
                continue
            security_id = row[1]
            item = {
                "SecurityID": security_id,
                # Only the part before "@" is used as the price.
                "TradePrice": round(float(row[3].split("@")[0]), 2),
                "TradeVolume": int(row[4]),
                "OrderTime": int(row[2]),
                "MainSeq": int(row[6]),
                "SubSeq": int(row[7]),
                "BuyNo": int(row[8]),
                "SellNo": int(row[9]),
                "ExecType": int(row[5]),
            }
            if item["TradePrice"] <= 0:
                continue
            manager = l2_data_manager_dict.get(security_id)
            if manager is None:
                manager = L2TransactionDataManager(security_id, True)
                l2_data_manager_dict[security_id] = manager
            manager.add_transaction_data(item)

    # Report only the securities that accumulated at least one big order.
    for code, manager in l2_data_manager_dict.items():
        buy_count = manager.big_accurate_buy_order_queue.qsize()
        sell_count = manager.big_accurate_sell_order_queue.qsize()
        if buy_count or sell_count:
            print(code, "大买单数量", buy_count)
            print(code, "大卖单数量", sell_count)
if __name__ == "__main__":
    # Manual debugging entry point: runs only when this file is executed directly.
    codes = ["002194"]
    for code in codes:
        # Load the L2 data for the code before exercising the processor.
        l2_data_util.load_l2_data(code)
        sell_manager = L2MarketSellManager()
        sell_manager.set_current_total_sell_data(
            code, "10:20:54", 1500000, 45612, (8.94, 1200)
        )
        L2TradeDataProcessor.test__compute_active_order_begin_pos(
            code, 1, 5886, 5896
        )
    # Shift the current time string by -3600 seconds and print it without colons.
    now_str = tool.get_now_time_str()
    shifted = tool.trade_time_add_second(now_str, -3600)
    print(shifted.replace(":", ""))
    # Finally replay the CSV transaction fixture.
    parse_transaction()