# 交易测试 # 清除交易数据 import decimal import itertools import json import logging import random import time import unittest from copy import deepcopy from unittest import mock import big_money_num_manager import gpcode_manager import log import tool from db import redis_manager from l2 import l2_log, l2_data_manager, transaction_progress, safe_count_manager from l2.safe_count_manager import BuyL2SafeCountManager from l2.transaction_progress import TradeBuyQueue from trade import trade_data_manager, l2_trade_factor from trade.trade_queue_manager import THSBuy1VolumnManager import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy def clear_trade_data(code): redis_l2 = redis_manager.RedisManager(1).getRedis() keys = ["buy1_volumn_latest_info-{}", "m_big_money_begin-{}", "m_big_money_process_index-{}"] for k in keys: redis_l2.delete(k.format(code)) l2.l2_data_manager.TradePointManager.delete_buy_point(code) big_money_num_manager.reset(code) redis_trade = redis_manager.RedisManager(2).getRedis() redis_trade.delete("trade-state-{}".format(code)) trade_data_manager.placeordercountmanager.clear_place_order_count(code) redis_info = redis_manager.RedisManager(0).getRedis() keys = redis_info.keys("*{}*".format(code)) for k in keys: if k.find("pre") is not None: continue if k.find("zyltgb") is not None: continue redis_info.delete(k) BuyL2SafeCountManager().clear_data(code) transaction_progress.TradeBuyQueue().set_traded_index(code, 0) class VirtualTrade(unittest.TestCase): def __process_buy_queue(self, code, buy_queue, time_): if time_ == "10:25:14": print("进入调试") limit_up_price = gpcode_manager.get_limit_up_price(code) buy_one_price = limit_up_price if limit_up_price is not None: buy_queue_result_list = TradeBuyQueue().save(code, limit_up_price, limit_up_price, time_, buy_queue) if buy_queue_result_list: # 有数据 try: buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize( decimal.Decimal("0.00")) # 获取执行位时间 exec_time = None buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,volume_rate = l2_data_manager.TradePointManager.get_buy_compute_start_data( code) if buy_exec_index: try: exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"]["time"] except: pass buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_, buy_queue_result_list, exec_time) if buy_progress_index is not None: l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, time_, buy_exec_index, buy_progress_index, l2.l2_data_util.local_today_datas.get( code), l2.l2_data_util.local_today_num_operate_map.get( code)) log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code, buy_progress_index, json.dumps(buy_queue_result_list)) except Exception as e: pass # @unittest.skip("跳过此单元测试") def test_trade(self): code = "000892" clear_trade_data(code) l2.l2_data_util.load_l2_data(code) total_datas = deepcopy(l2.l2_data_util.local_today_datas[code]) if total_datas[0]["index"] > 0: # 拼接数据 for i in range(0, total_datas[0]["index"]): data = total_datas[0].copy() data["index"] = i total_datas.insert(i, data) pos_list = log.get_l2_process_position(code) # pos_list.insert(108,(375,448)) if pos_list[0][0] > 0: pos_list.insert(0, (0, pos_list[0][0] - 1)) del pos_list[-1] if pos_list[-1][1] < total_datas[-1]["index"]: # 剩下的数据根据秒来分 start_index = -1 for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1): if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]: if start_index < 0: start_index = i else: pos_list.append((start_index, i - 1)) start_index = i if pos_list[-1][1] < total_datas[-1]["index"]: pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"])) l2.l2_data_util.local_today_datas[code].clear() l2.l2_data_util.local_today_num_operate_map[code].clear() print("id:", id(l2.l2_data_util.local_today_datas)) safe_count_manager.BuyL2SafeCountManager.get_safe_count = mock.Mock(return_value=16) l2_trade_factor.L2TradeFactorUtil.compute_m_value = mock.Mock(return_value=(14699952, "")) # pos_list.insert(41,(225,306)) # pos_list.insert(63, (345, 423)) # pos_list.insert(66, (440, 447)) # pos_list.insert(75, (472, 488)) # pos_list.insert(84, (516, 532)) # 获取交易进度 trade_progress_list, buy_queues = log.get_trade_progress(code) for indexs in pos_list: l2_log.threadIds[code] = mock.Mock( return_value=random.randint(0, 100000)) # 设置封单额,获取买1量 for i in range(0, 100): time_ = total_datas[indexs[0]]["val"]["time"] time_s = tool.get_time_as_second(time_) - i - 1 volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s)) if volumn is not None: l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn), tool.time_seconds_format(time_s)) break # 设置委买队列 for i in range(0, len(buy_queues)): if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0: print("委买队列", buy_queues[i]) try: self.__process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1]) except: pass break print("----------------处理位置", indexs) if indexs[0] >= 661: print("进入调试") l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0) @unittest.skip("跳过此单元测试") def test_h_cancel(self): code = "002870" l2.l2_data_util.load_l2_data(code) total_datas = l2.l2_data_util.local_today_datas.get(code) total_datas = total_datas[:899] l2.l2_data_util.local_today_datas[code] = total_datas l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True) buy_progress_index = 523 l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index, l2.l2_data_util.local_today_datas.get( code), l2.l2_data_util.local_today_num_operate_map.get( code)) # class TestTrade(unittest.TestCase): # processor = L2TradeDataProcessor() # code = "002094" # l2_data_manager.load_l2_data(code) # l2_data_manager.local_today_datas[code] = l2_data_manager.local_today_datas[code][0:520] # buy_single_index = 426 # buy_exec_index = 479 # processor.random_key[code] = mock.Mock(return_value=123123) # L2LimitUpMoneyStatisticUtil._L2LimitUpMoneyStatisticUtil__get_l2_latest_money_record = mock.Mock( # return_value=(0, -1)) # # AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index) # # L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, buy_exec_index, buy_single_index, # buy_exec_index, False) # # l2_data_manager.TradePointManager.get_buy_compute_start_data = mock.Mock(return_value=(426, 479, 479, 0, 100)) # buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager.get_buy_compute_start_data( # code) # processor.unreal_buy_dict[code] = mock.Mock(return_value=(479, 167234623)) # # # processor.process_order(code, 480, 519, 167234623, False) # print(buy_single_index, buy_exec_index, compute_index, num, count) # class TestData(unittest.TestCase): # code = "002103" # # l2_data_manager.load_l2_data(code) # # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796]) # class TestTradedProgress(unittest.TestCase): @unittest.skip("跳过此单元测试") def test_get_progress(self): code = "000925" l2.l2_data_util.load_l2_data(code) # l2.l2_data_util.local_today_datas[code] = l2.l2_data_util.local_today_datas[code][:898] TradeBuyQueue.get_traded_index = mock.Mock(return_value=(10, False)) buy_progress_index = TradeBuyQueue().compute_traded_index(code, "9.76", [9999, 1506], "09:32:45") print("获取到交易进度:", buy_progress_index) @unittest.skip("跳过此单元测试") def test_sort(self): _start = round(time.time() * 1000) count = 0 for i in range(0, 100000): # if i > 30: count += 1 print("耗时", round(time.time() * 1000) - _start) if __name__ == "__main__": unittest.main()