| | |
| | | # 交易测试 |
| | | # 清除交易数据 |
| | | import decimal |
| | | import itertools |
| | | import json |
| | | import logging |
| | | import random |
| | |
| | | BuyL2SafeCountManager().clear_data(code) |
| | | |
| | | |
| | | |
| | | class VirtualTrade(unittest.TestCase): |
| | | |
| | | def __process_buy_queue(code, buy_queue, time_): |
| | | def __process_buy_queue(self,code, buy_queue, time_): |
| | | if time_ == "09:32:37": |
| | | print("进入调试") |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | |
| | | try: |
| | | buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( |
| | | decimal.Decimal("0.00")) |
| | | buy_progress_index = TradeBuyQueue().save_traded_index(code, buy_one_price_, buy_queue_result_list) |
| | | buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, buy_queue_result_list) |
| | | if buy_progress_index is not None: |
| | | l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, |
| | | l2.l2_data_util.local_today_datas.get( |
| | |
| | | except Exception as e: |
| | | pass |
| | | |
| | | code = "002792" |
| | | clear_trade_data(code) |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) |
| | | if total_datas[0]["index"] > 0: |
| | | # 拼接数据 |
| | | for i in range(0, total_datas[0]["index"]): |
| | | data = total_datas[0].copy() |
| | | data["index"] = i |
| | | total_datas.insert(i, data) |
| | | @unittest.skip("跳过此单元测试") |
| | | def test_trade(self): |
| | | code = "002328" |
| | | clear_trade_data(code) |
| | | l2.l2_data_util.load_l2_data(code) |
| | | total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) |
| | | if total_datas[0]["index"] > 0: |
| | | # 拼接数据 |
| | | for i in range(0, total_datas[0]["index"]): |
| | | data = total_datas[0].copy() |
| | | data["index"] = i |
| | | total_datas.insert(i, data) |
| | | |
| | | pos_list = log.get_l2_process_position(code) |
| | | if pos_list[0][0] > 0: |
| | | pos_list.insert(0, (0, pos_list[0][0] - 1)) |
| | | del pos_list[-1] |
| | | if pos_list[-1][1] < total_datas[-1]["index"]: |
| | | # 剩下的数据根据秒来分 |
| | | start_index = -1 |
| | | for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1): |
| | | if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]: |
| | | if start_index < 0: |
| | | start_index = i |
| | | else: |
| | | pos_list.append((start_index, i - 1)) |
| | | start_index = i |
| | | if pos_list[-1][1] < total_datas[-1]["index"]: |
| | | pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"])) |
| | | l2.l2_data_util.local_today_datas[code].clear() |
| | | print("id:", id(l2.l2_data_util.local_today_datas)) |
| | | # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12) |
| | | # pos_list.insert(41,(225,306)) |
| | | # pos_list.insert(63, (345, 423)) |
| | | # pos_list.insert(66, (440, 447)) |
| | | # pos_list.insert(75, (472, 488)) |
| | | # pos_list.insert(84, (516, 532)) |
| | | pos_list = log.get_l2_process_position(code) |
| | | pos_list.insert(108,(375,448)) |
| | | if pos_list[0][0] > 0: |
| | | pos_list.insert(0, (0, pos_list[0][0] - 1)) |
| | | del pos_list[-1] |
| | | if pos_list[-1][1] < total_datas[-1]["index"]: |
| | | # 剩下的数据根据秒来分 |
| | | start_index = -1 |
| | | for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1): |
| | | if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]: |
| | | if start_index < 0: |
| | | start_index = i |
| | | else: |
| | | pos_list.append((start_index, i - 1)) |
| | | start_index = i |
| | | if pos_list[-1][1] < total_datas[-1]["index"]: |
| | | pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"])) |
| | | l2.l2_data_util.local_today_datas[code].clear() |
| | | l2.l2_data_util.local_today_num_operate_map[code].clear() |
| | | |
| | | # 获取交易进度 |
| | | trade_progress_list, buy_queues = log.get_trade_progress(code) |
| | | print("id:", id(l2.l2_data_util.local_today_datas)) |
| | | # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12) |
| | | # pos_list.insert(41,(225,306)) |
| | | # pos_list.insert(63, (345, 423)) |
| | | # pos_list.insert(66, (440, 447)) |
| | | # pos_list.insert(75, (472, 488)) |
| | | # pos_list.insert(84, (516, 532)) |
| | | |
| | | for indexs in pos_list: |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000)) |
| | | # 设置封单额,获取买1量 |
| | | for i in range(0, 100): |
| | | time_ = total_datas[indexs[0]]["val"]["time"] |
| | | time_s = tool.get_time_as_second(time_) - i - 1 |
| | | volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s)) |
| | | if volumn is not None: |
| | | l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn), |
| | | tool.time_seconds_format(time_s)) |
| | | break |
| | | # 设置委买队列 |
| | | for i in range(0, len(buy_queues)): |
| | | if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0: |
| | | print("委买队列", buy_queues[i]) |
| | | try: |
| | | __process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1]) |
| | | except: |
| | | pass |
| | | break |
| | | # 获取交易进度 |
| | | trade_progress_list, buy_queues = log.get_trade_progress(code) |
| | | |
| | | print("----------------处理位置", indexs) |
| | | if indexs[0] >= 224: |
| | | print("进入调试") |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0) |
| | | for indexs in pos_list: |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000)) |
| | | # 设置封单额,获取买1量 |
| | | for i in range(0, 100): |
| | | time_ = total_datas[indexs[0]]["val"]["time"] |
| | | time_s = tool.get_time_as_second(time_) - i - 1 |
| | | volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s)) |
| | | if volumn is not None: |
| | | l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn), |
| | | tool.time_seconds_format(time_s)) |
| | | break |
| | | # 设置委买队列 |
| | | for i in range(0, len(buy_queues)): |
| | | if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0: |
| | | print("委买队列", buy_queues[i]) |
| | | try: |
| | | self.__process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1]) |
| | | except: |
| | | pass |
| | | break |
| | | |
| | | print("----------------处理位置", indexs) |
| | | if indexs[0] >= 224: |
| | | print("进入调试") |
| | | l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0) |
| | | |
| | | |
| | | # class TestTrade(unittest.TestCase): |
| | |
| | | # # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796]) |
| | | |
| | | |
| | | # |
| | | |
class TestTradedProgress(unittest.TestCase):
    """Ad-hoc checks for traded-progress computation.

    Both test methods carry ``@unittest.skip`` and therefore do not run
    unless the decorator is removed.
    """

    @unittest.skip("跳过此单元测试")
    def test_get_progress(self):
        """Call ``TradeBuyQueue.compute_traded_index`` with ``get_traded_index`` stubbed.

        Makes no assertion on the result; it only exercises the call.
        """
        code = "002328"
        l2.l2_data_util.load_l2_data(code)

        # Scope the stub to this call. Assigning the Mock straight onto the
        # class would leave TradeBuyQueue.get_traded_index replaced for every
        # test that runs afterwards in the same process.
        with mock.patch.object(
            TradeBuyQueue,
            "get_traded_index",
            new=mock.Mock(return_value=(10, False)),
        ):
            TradeBuyQueue().compute_traded_index(code, "6.94", [1270, 9999, 1973])

    @unittest.skip("跳过此单元测试")
    def test_sort(self):
        """Print each 3-element permutation of ``[1, 2, 3]``."""
        # Named ``values`` rather than ``list`` so the builtin is not shadowed.
        values = [1, 2, 3]
        result_list = itertools.permutations(values, 3)
        # Prints the iterator object itself, then each permutation tuple.
        print(result_list)
        for r in result_list:
            print(r)
| | | |
| | | |
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()