# 交易测试
|
# 清除交易数据
|
import decimal
|
import json
|
import multiprocessing
|
import random
|
import threading
|
import time
|
import unittest
|
from copy import deepcopy
|
from unittest import mock
|
|
import constant
|
from cancel_strategy.s_l_h_cancel_strategy import SCancelBigNumComputer
|
from code_attribute import big_money_num_manager, gpcode_manager
|
from db.redis_manager_delegate import RedisUtils
|
from l2.l2_sell_manager import L2MarketSellManager
|
from l2.l2_transaction_data_manager import HuaXinSellOrderStatisticManager
|
from log_module import log, log_export, async_log_util
|
from trade.buy_radical.radical_buy_data_manager import RadicalBuyDataManager, RadicalBuyBlockManager
|
from utils import tool, init_data_util
|
from db import redis_manager_delegate as redis_manager
|
from l2 import l2_log, l2_data_manager, transaction_progress, l2_data_manager_new, l2_transaction_data_processor, \
|
cancel_buy_strategy
|
from l2.transaction_progress import TradeBuyQueue
|
from third_data import kpl_util, kpl_data_manager, block_info
|
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager
|
from third_data.kpl_data_manager import KPLDataManager
|
from trade import trade_data_manager, current_price_process_manager, l2_trade_util, trade_manager
|
import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy
|
|
|
def clear_trade_data(code):
    """Reset the cached trading state kept for ``code`` before a replay.

    In order, this:

    * deletes three per-code keys through the handle from ``RedisManager(1)``;
    * deletes the buy point and resets the big-money counters;
    * deletes the ``trade-state-<code>`` key through ``RedisManager(2)``;
    * clears the place-order count;
    * deletes every key matching ``*<code>*`` through ``RedisManager(0)``,
      except keys containing ``pre`` or ``zyltgb``, which are preserved;
    * resets the traded index to 0 and removes the code from the
      forbidden-trade list.

    Args:
        code: Stock code whose cached state is cleared, e.g. ``"000590"``.
    """
    l2_key_templates = (
        "buy1_volumn_latest_info-{}",
        "m_big_money_begin-{}",
        "m_big_money_process_index-{}",
    )
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    try:
        for template in l2_key_templates:
            RedisUtils.delete(redis_l2, template.format(code), auto_free=False)
    finally:
        # The handle is used with auto_free=False, so it is released here by
        # hand; `finally` makes sure that happens even if a delete raises.
        RedisUtils.realse(redis_l2)

    l2.l2_data_manager.TradePointManager().delete_buy_point(code)
    big_money_num_manager.reset(code)
    RedisUtils.delete(redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code))
    trade_data_manager.PlaceOrderCountManager().clear_place_order_count(code)

    redis_info = redis_manager.RedisManager(0).getRedis()
    try:
        for key in RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False):
            # str.find() returns -1 on a miss (never None), so a membership
            # test is the reliable way to recognise the keys to preserve.
            if "pre" in key or "zyltgb" in key:
                continue
            RedisUtils.delete(redis_info, key, auto_free=False)
    finally:
        RedisUtils.realse(redis_info)

    transaction_progress.TradeBuyQueue().set_traded_index(code, 0)
    l2_trade_util.remove_from_forbidden_trade_codes(code)
|
|
|
class VirtualTrade(unittest.TestCase):
    """Replay-style tests that push recorded data through the trading logic.

    NOTE(review): every test uses a hard-coded stock code and reads recorded
    data through project loaders (``load_l2_data``, ``log_export`` ...), so
    they presumably need local fixtures and Redis to be available — confirm
    before running.
    """

    def __process_buy_queue(self, code, buy_queue, time_):
        """Save one buy-queue snapshot and recompute the traded index from it.

        Nothing happens when the code has no limit-up price or when saving the
        snapshot yields no data. Failures are reported on stdout instead of
        being raised, so a bad snapshot does not stop the replay.

        Args:
            code: Stock code being replayed.
            buy_queue: Snapshot handed to ``TradeBuyQueue.save``.
            time_: Snapshot time as an ``HH:MM:SS`` string.
        """
        if time_ == "10:25:14":
            # Anchor for putting a breakpoint on one specific snapshot.
            print("进入调试")
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            return
        buy_queue_result_list = TradeBuyQueue().save(code, limit_up_price, limit_up_price, time_,
                                                     buy_queue)
        if not buy_queue_result_list:
            return
        # There is queue data to work with.
        try:
            buy_one_price_ = decimal.Decimal(round(float(limit_up_price), 2)).quantize(
                decimal.Decimal("0.00"))
            # Look up the time of the buy-execution position, if there is one.
            exec_time = None
            # Only the execution index is used; the 7-slot unpack is kept so a
            # change in the returned shape still surfaces as an error.
            (_, buy_exec_index, _, _, _, _, _) = \
                l2_data_manager.TradePointManager().get_buy_compute_start_data(code)
            if buy_exec_index:
                try:
                    exec_time = l2.l2_data_util.local_today_datas.get(code)[buy_exec_index]["val"]["time"]
                except (IndexError, KeyError, TypeError):
                    # Data for the code/index is missing: continue without a time.
                    pass
            buy_progress_index = TradeBuyQueue().compute_traded_index(code, buy_one_price_,
                                                                       buy_queue_result_list, exec_time)
            if buy_progress_index is not None:
                log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{} 数据-{}", code,
                                                   buy_progress_index,
                                                   json.dumps(buy_queue_result_list))
        except Exception as e:
            # Best effort: keep the replay going, but do not hide the failure.
            print("process buy queue failed:", code, time_, repr(e))

    # @unittest.skip("跳过此单元测试")
    def test_trade(self):
        """Replay the recorded L2 data of one code, chunk by chunk.

        Before each chunk is handed to ``L2TradeDataProcessor.process_add_datas``
        the buy queue, the total-sell market data and the traded index are set
        from the recorded entries that relate to the chunk's start time.
        """
        constant.TEST = True
        trade_manager.TradeStateManager().open_buy()
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "000590"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
        l2.l2_data_util.local_today_num_operate_map.get(code).clear()
        l2.l2_data_util.local_today_buyno_map.get(code).clear()
        l2.l2_data_util.local_today_canceled_buyno_map.get(code).clear()

        if total_datas[0]["index"] > 0:
            # Pad the front with copies of the first record so that the list
            # position of every record equals its "index" field.
            for i in range(0, total_datas[0]["index"]):
                data = total_datas[0].copy()
                data["index"] = i
                total_datas.insert(i, data)
        l2_market_datas = log_export.load_l2_market_data()
        l2_market_datas = l2_market_datas.get(code)
        if l2_market_datas:
            l2_market_datas.reverse()

        pos_list = log_export.get_l2_process_position(code)
        # pos_list.insert(108,(375,448))
        if pos_list[0][0] > 0:
            pos_list.insert(0, (0, pos_list[0][0] - 1))
        del pos_list[-1]
        last_data_index = total_datas[-1]["index"]
        if pos_list[-1][1] < last_data_index:
            # Split the data after the last recorded position by second.
            start_index = -1
            for i in range(pos_list[-1][1] + 1, last_data_index + 1):
                if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                    if start_index < 0:
                        start_index = i
                    else:
                        pos_list.append((start_index, i - 1))
                        start_index = i
        if pos_list[-1][1] < last_data_index:
            # Whatever is still uncovered becomes the final chunk.
            pos_list.append((pos_list[-1][1] + 1, last_data_index))
        l2.l2_data_util.local_today_datas[code].clear()
        l2.l2_data_util.local_today_num_operate_map[code].clear()

        print("id:", id(l2.l2_data_util.local_today_datas))
        # l2_trade_factor.L2TradeFactorUtil.compute_m_value = mock.Mock(return_value=(14699952, ""))
        # pos_list.insert(41,(225,306))
        # pos_list.insert(63, (345, 423))
        # pos_list.insert(66, (440, 447))
        # pos_list.insert(75, (472, 488))
        # pos_list.insert(84, (516, 532))

        # hook
        # tool.get_now_time_str = mock.Mock(return_value="09:35:00")

        # Load the recorded trade progress and buy queues.
        trade_progress_list, buy_queues = log_export.get_trade_progress(code)
        trade_progress_list.reverse()
        # jingxuan_ranks = KPLDataManager().get_from_file(kpl_util.KPLDataType.JINGXUAN_RANK, tool.get_now_date_str())
        # industry_ranks = KPLDataManager().get_from_file(kpl_util.KPLDataType.INDUSTRY_RANK, tool.get_now_date_str())
        # RealTimeKplMarketData().set_top_5_reasons(jingxuan_ranks)
        # RealTimeKplMarketData().set_top_5_industry(industry_ranks)

        LimitUpCodesPlateKeyManager().set_today_limit_up(
            KPLDataManager().get_from_file(kpl_util.KPLDataType.LIMIT_UP, tool.get_now_date_str()))

        kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            init_data_util.re_set_price_pre(code)
        current_price_process_manager.set_trade_price(code, gpcode_manager.get_limit_up_price_as_num(code))

        # The pipe ends are only referenced by the commented-out hook below.
        pss_server, pss_strategy = multiprocessing.Pipe()
        # huaxin_trade_api.run_pipe_trade(pss_server, None, None)

        for indexs in pos_list:
            l2_log.threadIds[code] = mock.Mock(
                return_value=random.randint(0, 100000))
            # Time of the first record of this chunk, as a string and as HHMMSS.
            start_time = total_datas[indexs[0]]["val"]["time"]
            start_time_num = int(start_time.replace(":", ""))
            # Set the sealed-order amount from the buy-1 volume (disabled).
            # for i in range(0, 100):
            #     time_ = total_datas[indexs[0]]["val"]["time"]
            #     time_s = tool.get_time_as_second(time_) - i - 1
            #     volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            #     if volumn is not None:
            #         l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil().verify_num(code, int(volumn),
            #                                                                     tool.time_seconds_format(time_s))
            #         break
            # Set the buy queue.
            for i in range(len(buy_queues)):
                if tool.trade_time_sub(buy_queues[i][1], start_time) > 0:
                    print("委买队列", buy_queues[i])
                    # Use the snapshot just before the first later one. With
                    # i == 0 there is no earlier snapshot, and index -1 would
                    # wrap around to the last element of the list.
                    if i > 0:
                        try:
                            self.__process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1])
                        except Exception as e:
                            print("process buy queue failed:", code, repr(e))
                    break

            print("----------------处理位置", indexs)
            if l2_market_datas:
                for l2_m in l2_market_datas:
                    # Dropping the last three digits leaves HHMMSS.
                    if int(str(l2_m["dataTimeStamp"])[:-3]) < start_time_num:
                        time_str = f"{l2_m['dataTimeStamp']}"
                        if time_str.startswith("9"):
                            # Left-pad 9 o'clock stamps to a two-digit hour.
                            time_str = "0" + time_str
                        time_str = time_str[:6]
                        time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
                        L2MarketSellManager().set_current_total_sell_data(code, time_str,
                                                                          l2_m["totalAskVolume"] * l2_m["avgAskPrice"],
                                                                          l2_m["totalAskVolume"])
                        break

            for tp in trade_progress_list:
                if int(tp[1].replace(":", "")) < start_time_num:
                    TradeBuyQueue().set_traded_index(code, tp[0])
                    break

            l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0,
                                                                          0)

    # @unittest.skip("跳过此单元测试")
    def test_s_cancel(self):
        """Feed big sell orders to ``SCancelBigNumComputer`` for a fixed order position."""
        code = "002288"
        l2.l2_data_util.load_l2_data(code)
        l2.l2_data_util.local_today_datas[code] = l2.l2_data_util.local_today_datas[code][:222]
        SCancelBigNumComputer().set_real_place_order_index(code, 153, False)

        # Patch inside a context manager so the stub is removed afterwards and
        # cannot leak into the other tests of this module.
        with mock.patch.object(TradeBuyQueue, "get_traded_index",
                               mock.Mock(return_value=(117, False))):
            order_begin_pos = l2_data_manager.OrderBeginPosInfo(buy_single_index=117, buy_exec_index=122)
            sell_order_info = [1517999, [[2996226, 300000, 5.06, (10223471, 4876395), (10223471, 4876395)]]]
            SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code, sell_order_info,
                                                                       order_begin_pos)

            sell_order_info = [508507, [[5564445, 100100, 5.08, (10372994, 4876545), (10372994, 4876546)]]]
            # The second order is deliberately submitted twice.
            SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code, sell_order_info,
                                                                       order_begin_pos)
            SCancelBigNumComputer().set_big_sell_order_info_for_cancel(code, sell_order_info,
                                                                       order_begin_pos)

    @unittest.skip("跳过此单元测试")
    def test_h_cancel(self):
        """Load L2 data, keep the first 899 records and rebuild the num-operate map."""
        code = "600540"
        l2.l2_data_util.load_l2_data(code)
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        total_datas = total_datas[:899]
        l2.l2_data_util.local_today_datas[code] = total_datas
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)

    # @unittest.skip("跳过此单元测试")
    def test_block(self):
        """Print, for each listed code, the radical-buy verdict and whether it can be bought."""
        codes_str = "300390"
        codes = codes_str.split(",")  # ["002889", "300337", "001298", "002771"]
        kpl_data_manager.KPLLimitUpDataRecordManager.load_total_datas()
        kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(),
                                                                 kpl_data_manager.KPLDataManager.get_data(
                                                                     kpl_util.KPLDataType.LIMIT_UP))
        for code in codes:
            # KPLCodeJXBlockManager().load_jx_blocks(code, 23.52,23.62,
            #                                        kpl_data_manager.KPLLimitUpDataRecordManager.get_current_reasons())
            #
            block_info.init_code(code)
            # latest_current_limit_up_records = kpl_data_manager.get_latest_current_limit_up_records()

            init_data_util.re_set_price_pre(code, True)

            limit_up_data = kpl_data_manager.KPLLimitUpDataRecordManager.record_code_dict.get(code)
            if limit_up_data:
                # NOTE(review): limit_up_time is not read anywhere below.
                limit_up_time = tool.to_time_str(limit_up_data[2])
            RadicalBuyBlockManager.set_current_limit_up_datas(
                kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas)
            # CodePlateKeyBuyManager.update_can_buy_blocks(code,
            #                                              kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas,
            #                                              kpl_data_manager.KPLLimitUpDataRecordManager.total_datas,
            #                                              latest_current_limit_up_records,
            #                                              block_info.get_before_blocks_dict(),
            #                                              kpl_data_manager.KPLLimitUpDataRecordManager.get_current_limit_up_reason_codes_dict())
            # can_buy_result = CodePlateKeyBuyManager.can_buy(code)
            # print(can_buy_result)
            # if can_buy_result:
            #     if can_buy_result[0]:
            #         blocks = ",".join([f"{x[0]}-{x[1] + 1}({x[2]}&{x[3] - x[2]})" for x in can_buy_result[0]])
            #         print(blocks)

            yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
            if yesterday_codes is None:
                yesterday_codes = set()
            result = RadicalBuyBlockManager.is_radical_buy(code, yesterday_codes)
            print(code, result)
            if result[0]:
                can_buy_result = RadicalBuyDataManager.is_code_can_buy(code)
                if can_buy_result[0]:
                    print("可以买", code, result)
                else:
                    print("不可以买", code, can_buy_result)

            # l2.l2_data_manager_new.L2TradeDataProcessor.can_buy_first(code, None)

    @unittest.skip("跳过此单元测试")
    def test_add_transaction_orders(self):
        """Feed every recorded transaction of one day to the sell-order statistics."""
        date = "2024-03-14"
        data_map = log_export.load_huaxin_transaction_map(date=date)
        for code in data_map:
            for d in data_map[code]:
                HuaXinSellOrderStatisticManager.add_transaction_datas(code, d)

    @unittest.skip("跳过此单元测试")
    def test_b_cancel(self):
        """Print the ``need_cancel_for_b`` result for a fixed order position."""
        code = "603032"
        l2.l2_data_util.load_l2_data(code)
        # Scoped patch: the stub is removed again when the block exits.
        with mock.patch.object(TradeBuyQueue, "get_traded_index",
                               mock.Mock(return_value=(1000, False))):
            cancel_buy_strategy.NewGCancelBigNumComputer().set_real_place_order_index(code, 1015, 10000, False)
            cancel_buy_strategy.GCancelBigNumComputer().set_trade_progress(code, 1000, 1000)
            result = cancel_buy_strategy.GCancelBigNumComputer().need_cancel_for_b(code, 1020, 1077)
        print(result)

    @unittest.skip("跳过此单元测试")
    def test_transaction(self):
        """Run the recorded transaction data of one code through the transaction processor."""
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "003019"
        l2.l2_data_util.load_l2_data(code)
        # Read the recorded transaction data.
        transaction_map = log_export.load_huaxin_transaction_map()
        fdatas = transaction_map.get(code)
        for d in fdatas:
            l2_transaction_data_processor.HuaXinTransactionDatasProcessor().process_huaxin_transaction_datas(code, d)

    def test_cancel(self):
        """Call ``RDCancelBigNumComputer.need_cancel`` with a preset watch-index cache."""
        code = "603300"
        l2.l2_data_util.load_l2_data(code)
        watch_cache = {code: {3686, 3691, 3693, 3694, 3699, 3700}}
        rd_computer_cls = l2.cancel_buy_strategy.RDCancelBigNumComputer
        # Both class attributes are patched in context managers so their
        # previous values come back once the test is done. create=True keeps
        # the patch working even if the cache attribute is not defined yet.
        with mock.patch.object(TradeBuyQueue, "get_traded_index",
                               mock.Mock(return_value=(3742, False))):
            with mock.patch.object(rd_computer_cls, "_RDCancelBigNumComputer__watch_indexes_cache",
                                   watch_cache, create=True):
                rd_computer_cls().need_cancel(code, 4370, 4372)
|
|
|
class TestTradedProgress(unittest.TestCase):
    """Manual checks around the traded-index computation; both tests are skipped."""

    @unittest.skip("跳过此单元测试")
    def test_get_progress(self):
        """Print the traded index computed for a fixed price, queue and time."""
        code = "000925"
        l2.l2_data_util.load_l2_data(code)
        # l2.l2_data_util.local_today_datas[code] = l2.l2_data_util.local_today_datas[code][:898]

        # Patch inside a context manager so the stub is removed afterwards and
        # cannot leak into other tests of this module.
        with mock.patch.object(TradeBuyQueue, "get_traded_index",
                               mock.Mock(return_value=(10, False))):
            buy_progress_index = TradeBuyQueue().compute_traded_index(code, "9.76",
                                                                       [9999, 1506], "09:32:45")
        print("获取到交易进度:", buy_progress_index)

    @unittest.skip("跳过此单元测试")
    def test_sort(self):
        """Print the elapsed milliseconds of a 100000-iteration counting loop."""
        _start = round(time.time() * 1000)
        count = 0
        for _ in range(0, 100000):
            # if i > 30:
            count += 1
        print("耗时", round(time.time() * 1000) - _start)
|
|
|
if __name__ == "__main__":
    # Script entry point: let unittest collect and run this module's tests.
    unittest.main()
|