| | |
| | | # Trade tests |
| | | # Clear trade data |
| | | import random |
| | | import unittest |
| | | from unittest import mock |
| | | |
| | | import big_money_num_manager |
| | | import l2_data_manager |
| | | import l2_data_manager_new |
| | | import l2_trade_factor |
| | | import log |
| | | import redis_manager |
| | | import tool |
| | | import trade_manager |
| | | from l2_data_manager import TradePointManager |
| | | |
| | | # from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer |
| | | # from trade_queue_manager import THSBuy1VolumnManager |
| | | |
| | | |
| | | def clear_trade_data(code):  # Clears trade data for `code` (per the file header comment); NOTE(review): most of the body is elided in this view.
| | |
| | | continue  # NOTE(review): skips the current item; the enclosing loop and its condition are not visible here.
| | | 
| | | redis_info.delete(k)  # Calls delete(k) on `redis_info` — presumably a Redis client and `k` a key name; confirm against the full source.
| | | |
| | | # |
| | | # class VirtualTrade(unittest.TestCase): |
| | | # code = "002419" |
| | | # clear_trade_data(code) |
| | | # l2_data_manager.load_l2_data(code) |
| | | # total_datas = l2_data_manager.local_today_datas[code] |
| | | # pos_list = log.get_l2_process_position(code) |
| | | # del pos_list[-1] |
| | | # if pos_list[-1][1] < total_datas[-1]["index"]: |
| | | # # Split the remaining data by second
| | | # start_index = -1 |
| | | # for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1): |
| | | # if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]: |
| | | # if start_index < 0: |
| | | # start_index = i |
| | | # else: |
| | | # pos_list.append((start_index, i - 1)) |
| | | # start_index = i |
| | | # if pos_list[-1][1] < total_datas[-1]["index"]: |
| | | # pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"])) |
| | | # l2_data_manager_new.local_today_datas = {code: []} |
| | | # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=30) |
| | | # for indexs in pos_list: |
| | | # L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000)) |
| | | # # Set the limit-up sealed-order amount; fetch the buy-1 volume
| | | # for i in range(0, 100): |
| | | # time_ = total_datas[indexs[0]]["val"]["time"] |
| | | # time_s = tool.get_time_as_second(time_) - i - 1 |
| | | # volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s)) |
| | | # if volumn is not None: |
| | | # l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn), |
| | | # tool.time_seconds_format(time_s)) |
| | | # break |
| | | # |
| | | # print("----------------处理位置", indexs) |
| | | # L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0) |
| | | |
| | | |
| | | # class TestTrade(unittest.TestCase): |
| | |
| | | # print(buy_single_index, buy_exec_index, compute_index, num, count) |
| | | |
| | | |
| | | # if __name__ == "__main__": |
| | | # unittest.main() |
| | | if __name__ == "__main__":  # Entry point: runs only when the file is executed directly, not when imported.
| | | unittest.main()  # Hands control to unittest's command-line runner; NOTE(review): every TestCase class visible in this file is commented out.