Administrator
2023-08-17 16db33c161f5dbbb423f2d84199b3f7723e9b2ad
test/l2_trade_test.py
@@ -2,6 +2,7 @@
# Clear trade data
import decimal
import json
import multiprocessing
import random
import time
import unittest
@@ -11,6 +12,7 @@
from code_attribute import big_money_num_manager, gpcode_manager
from db.redis_manager_delegate import RedisUtils
from log_module import log, log_export
from trade.huaxin import huaxin_trade_api
from utils import tool
from db import redis_manager_delegate as redis_manager
from l2 import l2_log, l2_data_manager, transaction_progress
@@ -19,7 +21,7 @@
from third_data import kpl_util
from third_data.code_plate_key_manager import RealTimeKplMarketData, LimitUpCodesPlateKeyManager
from third_data.kpl_data_manager import KPLDataManager
from trade import trade_data_manager
from trade import trade_data_manager, current_price_process_manager
from trade.trade_queue_manager import THSBuy1VolumnManager
import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy
@@ -90,7 +92,7 @@
    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        code = "002213"
        code = "000826"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -140,18 +142,23 @@
        LimitUpCodesPlateKeyManager().set_today_limit_up(
            KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str()))
        current_price_process_manager.set_trade_price(code, round(float(gpcode_manager.get_limit_up_price(code)), 2))
        pss_server, pss_strategy = multiprocessing.Pipe()
        huaxin_trade_api.set_pipe_trade(pss_server)
        for indexs in pos_list:
            l2_log.threadIds[code] = mock.Mock(
                return_value=random.randint(0, 100000))
            # Set the limit-up sealed-order amount: look up the most recent buy-1 volume
            for i in range(0, 100):
                time_ = total_datas[indexs[0]]["val"]["time"]
                time_s = tool.get_time_as_second(time_) - i - 1
                volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
                if volumn is not None:
                    l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn),
                                                                                    tool.time_seconds_format(time_s))
                    break
            # for i in range(0, 100):
            #     time_ = total_datas[indexs[0]]["val"]["time"]
            #     time_s = tool.get_time_as_second(time_) - i - 1
            #     volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            #     if volumn is not None:
            #         l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn),
            #                                                                         tool.time_seconds_format(time_s))
            #         break
            # Set up the buy-order queue
            for i in range(0, len(buy_queues)):
                if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0: