# 交易测试 # 清除交易数据 import unittest import big_money_num_manager from db import redis_manager from trade import trade_data_manager from l2.l2_data_manager import TradePointManager # from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer def clear_trade_data(code): redis_l2 = redis_manager.RedisManager(1).getRedis() keys = ["buy1_volumn_latest_info-{}", "m_big_money_begin-{}", "m_big_money_process_index-{}"] for k in keys: redis_l2.delete(k.format(code)) TradePointManager.delete_buy_point(code) big_money_num_manager.reset(code) redis_trade = redis_manager.RedisManager(2).getRedis() redis_trade.delete("trade-state-{}".format(code)) trade_data_manager.placeordercountmanager.clear_place_order_count(code) redis_info = redis_manager.RedisManager(0).getRedis() keys = redis_info.keys("*{}*".format(code)) for k in keys: if k.find("pre") is not None: continue if k.find("zyltgb") is not None: continue redis_info.delete(k) # # class VirtualTrade(unittest.TestCase): # code = "000701" # clear_trade_data(code) # l2_data_manager.load_l2_data(code) # total_datas = l2_data_manager.local_today_datas[code] # if total_datas[0]["index"] > 0: # # 拼接数据 # for i in range(0, total_datas[0]["index"]): # data = total_datas[0].copy() # data["index"] = i # total_datas.insert(i, data) # # pos_list = log.get_l2_process_position(code) # if pos_list[0][0] > 0: # pos_list.insert(0, (0, pos_list[0][0] - 1)) # del pos_list[-1] # if pos_list[-1][1] < total_datas[-1]["index"]: # # 剩下的数据根据秒来分 # start_index = -1 # for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1): # if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]: # if start_index < 0: # start_index = i # else: # pos_list.append((start_index, i - 1)) # start_index = i # if pos_list[-1][1] < total_datas[-1]["index"]: # pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"])) # l2_data_manager_new.local_today_datas = {code: []} # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12) # for indexs in pos_list: # L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000)) # # 设置封单额,获取买1量 # for i in range(0, 100): # time_ = total_datas[indexs[0]]["val"]["time"] # time_s = tool.get_time_as_second(time_) - i - 1 # volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s)) # if volumn is not None: # l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn), # tool.time_seconds_format(time_s)) # break # # print("----------------处理位置", indexs) # L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0) # class TestTrade(unittest.TestCase): # processor = L2TradeDataProcessor() # code = "002094" # l2_data_manager.load_l2_data(code) # l2_data_manager.local_today_datas[code] = l2_data_manager.local_today_datas[code][0:520] # buy_single_index = 426 # buy_exec_index = 479 # processor.random_key[code] = mock.Mock(return_value=123123) # L2LimitUpMoneyStatisticUtil._L2LimitUpMoneyStatisticUtil__get_l2_latest_money_record = mock.Mock( # return_value=(0, -1)) # # AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index) # # L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, buy_exec_index, buy_single_index, # buy_exec_index, False) # # l2_data_manager.TradePointManager.get_buy_compute_start_data = mock.Mock(return_value=(426, 479, 479, 0, 100)) # buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager.get_buy_compute_start_data( # code) # processor.unreal_buy_dict[code] = mock.Mock(return_value=(479, 167234623)) # # # processor.process_order(code, 480, 519, 167234623, False) # print(buy_single_index, buy_exec_index, compute_index, num, count) class TestData(unittest.TestCase): code = "002103" # l2_data_manager.load_l2_data(code) # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796]) if __name__ == "__main__": unittest.main()