Administrator
2024-03-26 5a2a5f064101fe294a299e2d1543ed95bd2080a7
test/l2_trade_test.py
@@ -14,6 +14,7 @@
from db.redis_manager_delegate import RedisUtils
from l2.huaxin import huaxin_delegate_postion_manager
from l2.l2_sell_manager import L2MarketSellManager
from l2.l2_transaction_data_manager import HuaXinSellOrderStatisticManager
from log_module import log, log_export, async_log_util
from trade.huaxin import huaxin_trade_api
from utils import tool
@@ -230,7 +231,7 @@
        l2.l2_data_util.local_today_datas[code] = total_datas
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)
    # @unittest.skip("跳过此单元测试")
    @unittest.skip("跳过此单元测试")
    def test_block(self):
        code = "603778"
        # KPLCodeJXBlockManager().load_jx_blocks(code, 23.52,23.62,
@@ -250,6 +251,23 @@
                                                     kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reason_codes_dict())
        # l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first(code, None)
    def test_transaction_orders(self):
        code = "603359"
        l2.l2_data_util.load_l2_data(code)
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        total_datas = total_datas[:90]
        l2.l2_data_util.local_today_datas[code] = total_datas
        datas = [('603359', 5.85, 224224, 9320665, 6, 286791, 695423, 711926, 'N'),
                 ('603359', 5.85, 512600, 9320665, 6, 286792, 695424, 711926, 'N'),
                 ('603359', 5.85, 300, 9320665, 6, 286793, 695428, 711926, 'N'),
                 ('603359', 5.85, 142876, 9320665, 6, 286794, 695429, 711926, 'N')]
        big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas)
        order_begin_pos = l2.l2_data_manager.OrderBeginPosInfo(buy_single_index=4, buy_exec_index=11)
        index = 77
        l2.cancel_buy_strategy.FCancelBigNumComputer().set_real_order_index(code, index, False)
        TradeBuyQueue().set_traded_index(code, 4)
        l2.cancel_buy_strategy.FCancelBigNumComputer().need_cancel_for_p(code, big_sell_order_info, order_begin_pos)
    @unittest.skip("跳过此单元测试")
    def test_transaction(self):
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()