| | |
| | | import json |
| | | import multiprocessing |
| | | import random |
| | | import threading |
| | | import time |
| | | import unittest |
| | | from copy import deepcopy |
| | |
| | | |
| | | from code_attribute import big_money_num_manager, gpcode_manager |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from log_module import log, log_export |
| | | from log_module import log, log_export, async_log_util |
| | | from trade.huaxin import huaxin_trade_api |
| | | from utils import tool |
| | | from db import redis_manager_delegate as redis_manager |
| | |
| | | |
| | | # @unittest.skip("跳过此单元测试") |
| | | def test_trade(self): |
| | | code = "002527" |
| | | threading.Thread(target=async_log_util.run_sync,daemon=True).start() |
| | | |
| | | code = "000678" |
| | | clear_trade_data(code) |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) |
| | |
| | | current_price_process_manager.set_trade_price(code, round(float(gpcode_manager.get_limit_up_price(code)), 2)) |
| | | |
| | | pss_server, pss_strategy = multiprocessing.Pipe() |
| | | huaxin_trade_api.run_pipe_trade(pss_server) |
| | | huaxin_trade_api.run_pipe_trade(pss_server, None) |
| | | |
| | | for indexs in pos_list: |
| | | l2_log.threadIds[code] = mock.Mock( |