| | |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from l2.huaxin import huaxin_delegate_postion_manager |
| | | from l2.l2_sell_manager import L2MarketSellManager |
| | | from l2.l2_transaction_data_manager import HuaXinSellOrderStatisticManager |
| | | from log_module import log, log_export, async_log_util |
| | | from trade.huaxin import huaxin_trade_api |
| | | from utils import tool |
| | |
| | | l2.l2_data_util.local_today_datas[code] = total_datas |
| | | l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True) |
| | | |
| | | # @unittest.skip("跳过此单元测试") |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_block(self): |
| | | code = "603778" |
| | | # KPLCodeJXBlockManager().load_jx_blocks(code, 23.52,23.62, |
| | |
| | | kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reason_codes_dict()) |
| | | # l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first(code, None) |
| | | |
| | | def test_transaction_orders(self): |
| | | code = "603359" |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = l2.l2_data_util.local_today_datas.get(code) |
| | | total_datas = total_datas[:90] |
| | | l2.l2_data_util.local_today_datas[code] = total_datas |
| | | datas = [('603359', 5.85, 224224, 9320665, 6, 286791, 695423, 711926, 'N'), |
| | | ('603359', 5.85, 512600, 9320665, 6, 286792, 695424, 711926, 'N'), |
| | | ('603359', 5.85, 300, 9320665, 6, 286793, 695428, 711926, 'N'), |
| | | ('603359', 5.85, 142876, 9320665, 6, 286794, 695429, 711926, 'N')] |
| | | big_sell_order_info = HuaXinSellOrderStatisticManager.add_transaction_datas(code, datas) |
| | | order_begin_pos = l2.l2_data_manager.OrderBeginPosInfo(buy_single_index=4, buy_exec_index=11) |
| | | index = 77 |
| | | l2.cancel_buy_strategy.FCancelBigNumComputer().set_real_order_index(code, index, False) |
| | | TradeBuyQueue().set_traded_index(code, 4) |
| | | l2.cancel_buy_strategy.FCancelBigNumComputer().need_cancel_for_p(code, big_sell_order_info, order_begin_pos) |
| | | |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_transaction(self): |
| | | threading.Thread(target=async_log_util.run_sync, daemon=True).start() |