Administrator
2023-02-09 b74016d3ba3750cd27fee83675449da8f1da3926
l2_trade_test.py
@@ -1,6 +1,7 @@
# 交易测试
# 清除交易数据
import decimal
import itertools
import json
import logging
import random
@@ -41,9 +42,10 @@
    BuyL2SafeCountManager().clear_data(code)
class VirtualTrade(unittest.TestCase):
    def __process_buy_queue(code, buy_queue, time_):
    def __process_buy_queue(self,code, buy_queue, time_):
        if time_ == "09:32:37":
            print("进入调试")
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -56,7 +58,7 @@
                try:
                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                        decimal.Decimal("0.00"))
                    buy_progress_index = TradeBuyQueue().save_traded_index(code, buy_one_price_, buy_queue_result_list)
                    buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, buy_queue_result_list)
                    if buy_progress_index is not None:
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                                           l2.l2_data_util.local_today_datas.get(
@@ -69,70 +71,75 @@
                except Exception as e:
                    pass
    code = "002792"
    clear_trade_data(code)
    l2.l2_data_util.load_l2_data(code)
    total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
    if total_datas[0]["index"] > 0:
        # 拼接数据
        for i in range(0, total_datas[0]["index"]):
            data = total_datas[0].copy()
            data["index"] = i
            total_datas.insert(i, data)
    @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002328"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
        if total_datas[0]["index"] > 0:
            # 拼接数据
            for i in range(0, total_datas[0]["index"]):
                data = total_datas[0].copy()
                data["index"] = i
                total_datas.insert(i, data)
    pos_list = log.get_l2_process_position(code)
    if pos_list[0][0] > 0:
        pos_list.insert(0, (0, pos_list[0][0] - 1))
    del pos_list[-1]
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # 剩下的数据根据秒来分
        start_index = -1
        for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
            if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                if start_index < 0:
                    start_index = i
                else:
                    pos_list.append((start_index, i - 1))
                    start_index = i
    if pos_list[-1][1] < total_datas[-1]["index"]:
        pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
    l2.l2_data_util.local_today_datas[code].clear()
    print("id:", id(l2.l2_data_util.local_today_datas))
    # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
    # pos_list.insert(41,(225,306))
    # pos_list.insert(63, (345, 423))
    # pos_list.insert(66, (440, 447))
    # pos_list.insert(75, (472, 488))
    # pos_list.insert(84, (516, 532))
        pos_list = log.get_l2_process_position(code)
        pos_list.insert(108,(375,448))
        if pos_list[0][0] > 0:
            pos_list.insert(0, (0, pos_list[0][0] - 1))
        del pos_list[-1]
        if pos_list[-1][1] < total_datas[-1]["index"]:
            # 剩下的数据根据秒来分
            start_index = -1
            for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
                if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                    if start_index < 0:
                        start_index = i
                    else:
                        pos_list.append((start_index, i - 1))
                        start_index = i
        if pos_list[-1][1] < total_datas[-1]["index"]:
            pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
        l2.l2_data_util.local_today_datas[code].clear()
        l2.l2_data_util.local_today_num_operate_map[code].clear()
    # 获取交易进度
    trade_progress_list, buy_queues = log.get_trade_progress(code)
        print("id:", id(l2.l2_data_util.local_today_datas))
        # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
        # pos_list.insert(41,(225,306))
        # pos_list.insert(63, (345, 423))
        # pos_list.insert(66, (440, 447))
        # pos_list.insert(75, (472, 488))
        # pos_list.insert(84, (516, 532))
    for indexs in pos_list:
        l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
        # 设置封单额,获取买1量
        for i in range(0, 100):
            time_ = total_datas[indexs[0]]["val"]["time"]
            time_s = tool.get_time_as_second(time_) - i - 1
            volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            if volumn is not None:
                l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                              tool.time_seconds_format(time_s))
                break
        # 设置委买队列
        for i in range(0, len(buy_queues)):
            if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0:
                print("委买队列", buy_queues[i])
                try:
                    __process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1])
                except:
                    pass
                break
        # 获取交易进度
        trade_progress_list, buy_queues = log.get_trade_progress(code)
        print("----------------处理位置", indexs)
        if indexs[0] >= 224:
            print("进入调试")
        l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
        for indexs in pos_list:
            l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
            # 设置封单额,获取买1量
            for i in range(0, 100):
                time_ = total_datas[indexs[0]]["val"]["time"]
                time_s = tool.get_time_as_second(time_) - i - 1
                volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
                if volumn is not None:
                    l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                                  tool.time_seconds_format(time_s))
                    break
            # 设置委买队列
            for i in range(0, len(buy_queues)):
                if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0:
                    print("委买队列", buy_queues[i])
                    try:
                        self.__process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1])
                    except:
                        pass
                    break
            print("----------------处理位置", indexs)
            if indexs[0] >= 224:
                print("进入调试")
            l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
# class TestTrade(unittest.TestCase):
@@ -167,5 +174,25 @@
#     # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796])
#
class TestTradedProgress(unittest.TestCase):
    """Ad-hoc checks around the traded-progress computation.

    Both tests are currently disabled through ``unittest.skip``.
    """

    @unittest.skip("跳过此单元测试")
    def test_get_progress(self):
        """Call ``compute_traded_index`` with ``get_traded_index`` stubbed out.

        The stub always returns ``(10, False)``. No assertion is made on the
        result; the test only requires the call to complete without raising.
        """
        code = "002328"
        l2.l2_data_util.load_l2_data(code)
        # Scope the stub to this call only. Assigning the Mock directly onto
        # the class would leave it in place for every test that runs later
        # in the same process.
        with mock.patch.object(TradeBuyQueue, "get_traded_index",
                               new=mock.Mock(return_value=(10, False))):
            TradeBuyQueue().compute_traded_index(code, "6.94", [1270, 9999, 1973])

    @unittest.skip("跳过此单元测试")
    def test_sort(self):
        """Print every ordering of a three-element list."""
        # Named ``items`` so the builtin ``list`` is not shadowed.
        items = [1, 2, 3]
        result_list = itertools.permutations(items, 3)
        # Prints the repr of the permutations iterator itself, not its contents.
        print(result_list)
        for r in result_list:
            print(r)
# Run the unittest command-line runner when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()