Administrator
2023-02-05 1252c9489b631905fbce608109260760537b224f
l2_trade_test.py
@@ -15,8 +15,7 @@
import trade_manager
from l2_data_manager import TradePointManager
from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer, \
    SecondAverageBigNumComputer
# from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer
from trade_queue_manager import THSBuy1VolumnManager
@@ -40,51 +39,51 @@
        redis_info.delete(k)
class VirtualTrade(unittest.TestCase):
    """Replay the loaded L2 data of one stock code through the trade processor.

    There are no test methods here: every statement below lives directly in
    the class body, so the whole replay runs once, when the class is defined
    (i.e. when the module is imported), and every name assigned below ends up
    as a class attribute.

    Steps performed:
      1. Clear trade data for ``code`` and load its L2 data.
      2. Pad the front of the data so that an entry exists for every index
         starting at 0.
      3. Build the list of ``(start, end)`` index ranges to replay, taken from
         ``log.get_l2_process_position`` and extended by a time-based split of
         whatever data lies beyond the last logged range.
      4. For each range, try to seed the buy-1 volume and then feed the slice
         to ``L2TradeDataProcessor.process_add_datas``.
    """

    # Stock code whose data is replayed.
    code = "001236"
    clear_trade_data(code)
    l2_data_manager.load_l2_data(code)
    total_datas = l2_data_manager.local_today_datas[code]
    if total_datas[0]["index"] > 0:
        # Pad the data: the first record does not start at index 0, so insert
        # copies of the (current) first record carrying indexes 0..k-1 in front
        # of it. After this, list position i holds the record with "index" == i
        # for the padded part, which the index-based lookups below rely on.
        for i in range(0, total_datas[0]["index"]):
            data = total_datas[0].copy()
            data["index"] = i
            total_datas.insert(i, data)

    # Index ranges to replay; used below as (start_index, end_index) tuples,
    # both ends inclusive.
    pos_list = log.get_l2_process_position(code)
    if pos_list[0][0] > 0:
        # Cover the records that precede the first logged range.
        pos_list.insert(0, (0, pos_list[0][0] - 1))
    # Drop the last logged range.
    # NOTE(review): the reason is not visible here (possibly the last entry is
    # incomplete) - confirm. This also raises IndexError below if the list
    # becomes empty.
    del pos_list[-1]
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # Split the remaining data by second: each change of the "time" value
        # closes the previous range and opens a new one.
        # NOTE(review): if the first remaining record has the same time as the
        # record before it and at least two time changes follow, the records
        # before the first change are not put into any range - confirm whether
        # that gap is intended.
        start_index = -1
        for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
            if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                if start_index < 0:
                    start_index = i
                else:
                    pos_list.append((start_index, i - 1))
                    start_index = i
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # Whatever is still uncovered after the last range becomes one final range.
        pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
    # Start the new data manager with an empty record list for this code.
    l2_data_manager_new.local_today_datas = {code: []}
    # Replace get_safe_buy_count with a mock that always returns 21. This is a
    # plain attribute assignment, so it is not undone afterwards.
    l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=21)
    for indexs in pos_list:
        # Note: the value stored is a Mock object (whose call returns a random
        # integer), not the integer itself.
        L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
        # Set the limit-up order amount: fetch the buy-1 volume.
        # Look backwards from the time of the range's first record, one step at
        # a time for up to 100 steps (presumably seconds - confirm against
        # tool.get_time_as_second), and use the first volume that is found.
        for i in range(0, 100):
            time_ = total_datas[indexs[0]]["val"]["time"]
            time_s = tool.get_time_as_second(time_) - i - 1
            volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            if volumn is not None:
                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                           tool.time_seconds_format(time_s))
                break

        print("----------------处理位置", indexs)
        # Feed this range (end index inclusive) to the processor.
        # NOTE(review): the meaning of the trailing 0, 0 arguments is not
        # visible in this file.
        L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
#
# class VirtualTrade(unittest.TestCase):
#     code = "000701"
#     clear_trade_data(code)
#     l2_data_manager.load_l2_data(code)
#     total_datas = l2_data_manager.local_today_datas[code]
#     if total_datas[0]["index"] > 0:
#         # 拼接数据
#         for i in range(0, total_datas[0]["index"]):
#             data = total_datas[0].copy()
#             data["index"] = i
#             total_datas.insert(i, data)
#
#     pos_list = log.get_l2_process_position(code)
#     if pos_list[0][0] > 0:
#         pos_list.insert(0, (0, pos_list[0][0] - 1))
#     del pos_list[-1]
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         # 剩下的数据根据秒来分
#         start_index = -1
#         for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
#             if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
#                 if start_index < 0:
#                     start_index = i
#                 else:
#                     pos_list.append((start_index, i - 1))
#                     start_index = i
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
#     l2_data_manager_new.local_today_datas = {code: []}
#     l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
#     for indexs in pos_list:
#         L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
#         # 设置封单额,获取买1量
#         for i in range(0, 100):
#             time_ = total_datas[indexs[0]]["val"]["time"]
#             time_s = tool.get_time_as_second(time_) - i - 1
#             volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
#             if volumn is not None:
#                 l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
#                                                                            tool.time_seconds_format(time_s))
#                 break
#
#         print("----------------处理位置", indexs)
#         L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
# class TestTrade(unittest.TestCase):