| | |
| | | |
| | | from code_attribute import big_money_num_manager, gpcode_manager |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from l2.huaxin import huaxin_delegate_postion_manager |
| | | from log_module import log, log_export, async_log_util |
| | | from trade.huaxin import huaxin_trade_api |
| | | from utils import tool |
| | |
| | | from l2 import l2_log, l2_data_manager, transaction_progress |
| | | from l2.transaction_progress import TradeBuyQueue |
| | | from third_data import kpl_util, kpl_data_manager |
| | | from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager |
| | | from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager |
| | | from third_data.kpl_data_manager import KPLDataManager |
| | | from trade import trade_data_manager, current_price_process_manager, l2_trade_util |
| | | from trade.trade_queue_manager import THSBuy1VolumnManager |
| | |
| | | except Exception as e: |
| | | pass |
| | | |
| | | # @unittest.skip("跳过此单元测试") |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_trade(self): |
| | | threading.Thread(target=async_log_util.run_sync,daemon=True).start() |
| | | threading.Thread(target=async_log_util.run_sync, daemon=True).start() |
| | | code = "603178" |
| | | clear_trade_data(code) |
| | | l2.l2_data_util.load_l2_data(code) |
| | |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, |
| | | 0) |
| | | |
| | | def test_place_order(self): |
| | | code = "002241" |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) |
| | | huaxin_delegate_postion_manager.place_order(code, 17.36, 100, total_datas[753]) |
| | | huaxin_delegate_postion_manager.get_l2_place_order_position(code,total_datas[755:1038]) |
| | | |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_h_cancel(self): |
| | | code = "600540" |