| | |
| | | from trade.huaxin import huaxin_trade_api |
| | | from utils import tool |
| | | from db import redis_manager_delegate as redis_manager |
| | | from l2 import l2_log, l2_data_manager, transaction_progress, l2_data_manager_new, l2_transaction_data_processor |
| | | from l2 import l2_log, l2_data_manager, transaction_progress, l2_data_manager_new, l2_transaction_data_processor, \ |
| | | cancel_buy_strategy |
| | | from l2.transaction_progress import TradeBuyQueue |
| | | from third_data import kpl_util, kpl_data_manager, block_info |
| | | from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodePlateKeyBuyManager |
| | |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, |
| | | 0) |
| | | |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_s_cancel(self): |
| | | code = "603363" |
| | | l2.l2_data_util.load_l2_data(code) |
| | |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reason_codes_dict()) |
| | | # l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first(code, None) |
| | | |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_add_transaction_orders(self): |
| | | code = "603359" |
| | | date = "2024-03-14" |
| | |
| | | for d in datas: |
| | | big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, d) |
| | | |
| | | def test_b_cancel(self): |
| | | code = "603032" |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = l2.l2_data_util.local_today_datas.get(code) |
| | | TradeBuyQueue.get_traded_index = mock.Mock(return_value=(1000, False)) |
| | | cancel_buy_strategy.GCancelBigNumComputer().set_real_place_order_index(code, 1015, 10000, False) |
| | | cancel_buy_strategy.GCancelBigNumComputer().set_trade_progress(code, 1000, 1000) |
| | | result = cancel_buy_strategy.GCancelBigNumComputer().need_cancel_for_b(code, 1020, 1077) |
| | | print(result) |
| | | |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_transaction(self): |
| | | threading.Thread(target=async_log_util.run_sync, daemon=True).start() |