import datetime
import logging
import random
import time as t

import big_money_num_manager
import data_process
import global_util
import gpcode_manager
import l2_data_log
import l2_data_manager
import l2_data_util
import l2_trade_factor
import l2_trade_test
import limit_up_time_manager
import log
import redis_manager
import ths_industry_util
import tool
import trade_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map, L2LimitUpMoneyStatisticUtil
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process


# TODO: L2 data management (only ``format_data`` is implemented so far).
class L2DataManager:
    """Placeholder manager for L2 data.

    ``format_data`` is the only method with behaviour; the remaining methods
    are stubs that return ``None``.
    """

    def format_data(self, datas):
        """Wrap every raw item in the ``{"val": item, "re": 1}`` record shape.

        :param datas: iterable of raw items.
        :return: a new list with one record per input item, in input order.
        """
        return [{"val": data, "re": 1} for data in datas]

    def get_add_datas(self, format_datas):
        """Get the newly added data (not implemented; returns ``None``)."""
        pass

    def load_data(self, code=None, force=False):
        """Load data from the database (not implemented; returns ``None``)."""
        pass

    def save_datas(self, add_datas, datas):
        """Save data (not implemented; returns ``None``)."""
        pass


# Big-order accounting for the m value.
class L2BigNumForMProcessor:
    """Accumulate a weighted count of limit-up buy / buy-cancel records.

    Two integers are kept in redis per stock code:

    * ``m_big_money_begin-<code>``: index from which accounting started;
      written once by :meth:`set_begin_pos`.
    * ``m_big_money_process_index-<code>``: last index (inclusive) already
      folded into the running total by :meth:`process`.
    """

    def __init__(self):
        self._redis_manager = redis_manager.RedisManager(1)

    def __get_redis(self):
        return self._redis_manager.getRedis()

    def __get_int(self, key):
        """Return the integer stored under ``key``, or ``None`` when absent."""
        val = self.__get_redis().get(key)
        return None if val is None else int(val)

    def set_begin_pos(self, code, index):
        """Store the accounting start index for ``code`` unless one is already stored."""
        if self.__get_begin_pos(code) is None:
            key = "m_big_money_begin-{}".format(code)
            self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_begin_pos(self, code):
        """Return the stored accounting start index, or ``None``."""
        return self.__get_int("m_big_money_begin-{}".format(code))

    def clear_processed_end_index(self, code):
        """Forget how far ``code`` has been processed."""
        self.__get_redis().delete("m_big_money_process_index-{}".format(code))

    def __set_processed_end_index(self, code, index):
        """Record ``index`` as the last (inclusive) processed position."""
        key = "m_big_money_process_index-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), index)

    def __get_processed_end_index(self, code):
        """Return the last processed index, or ``None`` when nothing was processed."""
        return self.__get_int("m_big_money_process_index-{}".format(code))

    @staticmethod
    def __weight_for_num(num, num_splites):
        """Map an order size onto its weight using the four ascending thresholds."""
        if num < num_splites[0]:
            return 0
        if num < num_splites[1]:
            return 1
        if num < num_splites[2]:
            return round(4 / 3, 3)
        if num < num_splites[3]:
            return 2
        return 4

    def process(self, code, start_index, end_index, limit_up_price):
        """Fold records ``start_index..end_index`` (inclusive) into the big-order total.

        Does nothing when no start position is stored for ``code`` or when the
        range has already been processed.  Otherwise the summed, signed count
        is handed to ``big_money_num_manager.add_num`` and ``end_index`` is
        stored as the new processed position.

        :param code: stock code.
        :param start_index: first index of ``local_today_datas[code]`` to consider.
        :param end_index: last index (inclusive) to consider.
        :param limit_up_price: numeric limit-up price; the size thresholds are
            5000/10000/20000/30000 divided by it.  ``None`` or ``0`` raises
            (``TypeError`` / ``ZeroDivisionError``), as before.
        """
        begin_pos = self.__get_begin_pos(code)
        if begin_pos is None:
            # No buy-start signal has been recorded yet.
            return

        processed_index = self.__get_processed_end_index(code)
        if processed_index is None:
            first_index = max(start_index, 0)
        elif processed_index >= end_index:
            return
        else:
            # The stored index is inclusive, so resume one past it.  Starting
            # at ``processed_index`` itself counted that record twice whenever
            # the requested range overlapped the processed one.
            first_index = max(start_index, processed_index + 1)

        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        num_splites = [round(5000 / limit_up_price), round(10000 / limit_up_price), round(20000 / limit_up_price),
                       round(30000 / limit_up_price)]

        total_num = 0
        for i in range(first_index, end_index + 1):
            data = total_datas[i]
            val = data["val"]
            is_cancel = L2DataUtil.is_limit_up_price_buy_cancel(val)
            if not is_cancel and not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if is_cancel:
                # Skip a cancel whose matching buy sits before the accounting start.
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None and buy_index < begin_pos:
                    continue

            weight = self.__weight_for_num(int(val["num"]), num_splites)
            count = int(weight * data["re"] * 1000)
            # Cancels subtract, buys add.
            total_num += -count if is_cancel else count

        self.__set_processed_end_index(code, end_index)
        big_money_num_manager.add_num(code, total_num)

        print("m值大单计算范围:{}-{} 时间:{}".format(first_index, end_index,
                                            round(t.time() * 1000) - start_time))


class L2TradeDataProcessor:
    """Drive the buy / cancel decision flow over incoming L2 records.

    All state is class-level and every method is a classmethod.
    """

    # code -> (index of the buy-execution record, capture time) of a pending
    # "virtual" order that has not been sent to the broker yet.
    unreal_buy_dict = {}
    # code -> random id that tags the log lines of the current processing pass.
    random_key = {}
    l2BigNumForMProcessor = L2BigNumForMProcessor()

    @classmethod
    def __format_log(cls, code, content, *args):
        """Build a log line: pass-id/code prefix + ``content`` formatted with ``args``.

        ``random_key`` is read with ``.get`` so that logging from an entry
        point that never set it (e.g. :meth:`cancel_buy` called directly) no
        longer raises ``KeyError``.
        """
        prefix = "thread-id={} code={} ".format(cls.random_key.get(code), code)
        return (prefix + content).format(*args)

    @classmethod
    def debug(cls, code, content, *args):
        """Write a debug line to the trade logger."""
        logger_l2_trade.debug(cls.__format_log(code, content, *args))

    @classmethod
    def cancel_debug(cls, code, content, *args):
        """Write a debug line to the trade-cancel logger."""
        logger_l2_trade_cancel.debug(cls.__format_log(code, content, *args))

    @classmethod
    def buy_debug(cls, code, content, *args):
        """Write a debug line to the trade-buy logger."""
        logger_l2_trade_buy.debug(cls.__format_log(code, content, *args))

    @classmethod
    def __dispatch_by_trade_state(cls, code, start_index, end_index, capture_timestamp):
        """Route a record range to the "order placed" or "no order" handler."""
        state = trade_manager.get_trade_state(code)
        if state in (trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER):
            # An order is already placed.
            cls.__process_order(code, start_index, end_index, capture_timestamp)
        else:
            # No order placed yet.
            cls.__process_not_order(code, start_index, end_index, capture_timestamp)

    @classmethod
    def process(cls, code, datas, capture_timestamp):
        """Entry point for one captured batch of L2 records.

        :param code: stock code.
        :param datas: records of this capture (list of dicts with a ``"val"`` entry).
        :param capture_timestamp: timestamp of the capture.
        :raises L2DataException: when the first record's price does not match ``code``.
        """
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
        stage_start = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # Reject data whose price cannot belong to this code.
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # Load history, then correct the incoming batch.
                l2_data_manager.load_l2_data(code)
                datas = l2_data_manager.L2DataUtil.correct_data(code, datas)

                next_index = 0
                known_datas = local_today_datas.get(code)
                if known_datas is not None and len(known_datas) > 0:
                    next_index = known_datas[-1]["index"] + 1
                add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, next_index)
                if len(add_datas) > 0:
                    # Append the new records to the in-memory day data.
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                stage_start = l2_data_log.l2_time(code, round(t.time() * 1000) - stage_start, "l2数据预处理时间")

                if len(add_datas) > 0:
                    latest_time = add_datas[-1]["val"]["time"]
                    # Only act when the newest record is close enough to the wall clock.
                    if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time):
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        cls.__dispatch_by_trade_state(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - stage_start)

                # NOTE(review): the original indentation was lost; timing and saving
                # are assumed to run for every non-empty capture, not only when new
                # records were added. Confirm against the upstream file.
                stage_start = l2_data_log.l2_time(code, round(t.time() * 1000) - stage_start, "l2数据处理时间")
                l2_data_manager.save_l2_data(code, datas, add_datas)
                l2_data_log.l2_time(code, round(t.time() * 1000) - stage_start, "保存数据时间")
        finally:
            # A virtual order never outlives the pass that created it.
            cls.unreal_buy_dict.pop(code, None)

    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        """Handle a record range while no order is placed."""
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)

    @classmethod
    def __statistic_count_l2_data_for_cancel(cls, code, start_index, end_index, has_cancel_single=False):
        """Accumulate buy / buy-cancel counts and report when 70%+ of buys are cancelled.

        NOTE(review): ``__count_l2_data_for_cancel`` is not defined in the
        visible source, so calling this method raises ``AttributeError``; no
        caller is visible either.

        :return: ``(index, True)`` at the first index where fewer than 30% of
            the buys remain and ``has_cancel_single`` is set, otherwise
            ``(end_index, False)`` after persisting the counters.
        """
        _, old_buy_count, old_cancel_count = l2_data_manager.TradePointManager.get_count_info_for_cancel_buy(code)
        for i in range(start_index, end_index + 1):
            buy_count, buy_cancel_count = cls.__count_l2_data_for_cancel(code, i, i)
            old_buy_count += buy_count
            old_cancel_count += buy_cancel_count
            if old_buy_count > 0 and (old_buy_count - old_cancel_count) / old_buy_count < 0.3 and has_cancel_single:
                return i, True
        l2_data_manager.TradePointManager.set_count_info_for_cancel_buy(code, end_index, old_buy_count,
                                                                        old_cancel_count)
        return end_index, False

    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        """Handle a record range while an order is placed (or virtually placed).

        Runs the cancel check; on a cancel the order is withdrawn and buy
        computation restarts after the cancel position, otherwise a pending
        virtual order is turned into a real one.
        """
        if start_index < 0:
            start_index = 0
        if end_index < start_index:
            return

        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # Cancel computation.
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index)

        # m-value big orders; without a stored buy signal max(None, int) would raise.
        m_start_index = start_index if buy_single_index is None else max(buy_single_index, start_index)
        cls.l2BigNumForMProcessor.process(code, m_start_index, end_index,
                                          gpcode_manager.get_limit_up_price(code))

        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            cls.cancel_buy(code)
            # Keep looking for a new buy point after the cancel position.
            cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
            return

        # No cancel signal: a pending virtual order becomes a real order.
        unreal_buy_info = cls.unreal_buy_dict.get(code)
        if unreal_buy_info is not None:
            cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,执行位置:{},截图时间:{}", unreal_buy_info[0], capture_time)
            # unreal_buy_info is (index that triggered the buy, capture time).
            cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                      unreal_buy_info[0])

    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        """Place the real order if :meth:`__can_buy` allows it.

        Failures of ``trade_manager.start_buy`` are logged, not raised.
        """
        can, reason = cls.__can_buy(code)
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        cls.debug(code, "可以下单,原因:{}", reason)

        # The virtual order is consumed by the real one.
        cls.unreal_buy_dict.pop(code, None)
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
        finally:
            cls.debug(code, "m值影响因子:{}", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))

    @classmethod
    def __can_buy(cls, code):
        """Decide whether ``code`` may be bought.

        :return: ``(allowed, reason)``; ``reason`` is ``None`` when every rule passed.
        """
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_manager.L2DataUtil.get_time_as_second("14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)

        # Within one industry only the first two codes by limit-up time may be bought.
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        # Guard once: the original checked ``codes_index is not None`` here but
        # dereferenced it unguarded further down.
        rank = codes_index.get(code) if codes_index is not None else None
        if rank is not None and rank > 1:
            return False, "同一板块中老三,老四,...不能买"

        hot_num = global_util.industry_hot_num.get(industry)
        # Limit-up at or after 13:00 needs at least 29 limit-up codes in the industry.
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and hot_num is not None:
                if hot_num < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # NOTE(review): indentation was lost; this check is assumed to be
        # independent of the 13:00 rule above.
        # The second code (rank 1) needs the same industry heat.
        if rank is not None and rank == 1 and hot_num is not None:
            if hot_num < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        return True, None

    @classmethod
    def __clear_buy_state(cls, code):
        """Delete every stored buy / cancel marker of ``code``."""
        l2_data_manager.TradePointManager.delete_buy_point(code)
        l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
        l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
        l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
        # Drop the big orders recorded for the batch-cancel event.
        l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)

    @classmethod
    def __cancel_buy(cls, code):
        """Cancel the real order and clear its markers; failures are only logged."""
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            cls.__clear_buy_state(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))

    @classmethod
    def cancel_buy(cls, code):
        """Withdraw the order for ``code``.

        A pending virtual order is simply discarded (no broker call); otherwise
        the real order is cancelled through ``trade_manager``.
        """
        l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
        l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)

        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            cls.__clear_buy_state(code)
        else:
            cls.__cancel_buy(code)

        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)

    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        """Look for the buy signal and the buy-execution position in a record range.

        :param threshold_money: accepted for compatibility; the value is
            recomputed through :meth:`__get_threshmoney` before use.
        :param new_add: when true, the signal search starts two records before
            ``compute_start_index`` (not below 0).
        """
        if compute_end_index < compute_start_index:
            return

        total_datas = local_today_datas[code]
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)

        if buy_single_index is None:
            # No stored signal yet: try to find one in this range.
            search_start = max(compute_start_index - 2 if new_add else compute_start_index, 0)
            has_single, buy_single_index = cls.__compute_order_begin_pos(code, search_start, 3, compute_end_index)
            if has_single:
                num = 0
                count = 0
                cls.debug(code, "获取到买入信号起始点:{} 数据:{}", buy_single_index, total_datas[buy_single_index])
                # Only the first signal of the day sets the big-order start.
                cls.l2BigNumForMProcessor.set_begin_pos(code, buy_single_index)

        if buy_single_index is None:
            # Still no buy signal: nothing more to do.
            return None

        # m-value big orders.
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, compute_start_index), compute_end_index,
                                          gpcode_manager.get_limit_up_price(code))
        threshold_money = cls.__get_threshmoney(code)
        # Net buy statistics.
        compute_index, buy_nums, buy_count, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(
            code, max(buy_single_index, compute_start_index), compute_end_index, num, count, threshold_money,
            buy_single_index, capture_time)

        if rebegin_buy_pos is not None:
            # The signal went stale; restart the search from the returned position.
            cls.__start_compute_buy(code, rebegin_buy_pos, compute_end_index, threshold_money, capture_time, False)
            return

        if compute_index is None:
            # Threshold not reached: persist the running totals.
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count)
            return

        cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                  buy_count,
                  total_datas[compute_index])
        # Record the buy signal / execution position.
        cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums, buy_count)
        # Limit-up time := time of the buy-execution record.
        limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
        # Virtual order.
        cls.unreal_buy_dict[code] = (compute_index, capture_time)
        # Drop every earlier cancel signal.
        l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
        # Limit-up sealed-amount statistics.
        L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, compute_index, buy_single_index, False)

        if compute_index >= compute_end_index:
            cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
            # All data consumed without a cancel: place the real order.
            cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
        else:
            cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
            # Remaining records go through the cancel check.
            cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)

    @classmethod
    def __get_order_begin_pos(cls, code):
        """Return ``(buy_single_index, buy_exec_index, compute_index, num, count)`` as stored."""
        buy_single_index, buy_exec_index, compute_index, num, count = \
            l2_data_manager.TradePointManager.get_buy_compute_start_data(code)
        return buy_single_index, buy_exec_index, compute_index, num, count

    @classmethod
    def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num, count):
        """Persist the buy signal position together with the running totals."""
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count)

    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count, end_index):
        """Find the start of a run of limit-up buys within one timestamp.

        Records before 09:30:00 are skipped; sell and sell-cancel records do
        not break a run, any other non-buy record resets it.

        :param continue_count: summed ``"re"`` of the run required for a signal.
        :return: ``(True, start_index_of_run)`` or ``(False, None)``.
        """
        second_930 = 9 * 3600 + 30 * 60 + 0
        datas = local_today_datas[code]
        if end_index - start_index + 1 < continue_count:
            return False, None

        last_index = None
        count = 0
        start = None

        for i in range(start_index, end_index + 1):
            _val = datas[i]["val"]
            # Time must be >= 09:30:00.
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue

            if L2DataUtil.is_limit_up_price_buy(_val):
                if last_index is None or (datas[last_index]["val"]["time"] == _val["time"]):
                    if start is None:
                        start = i
                    last_index = i
                    count += datas[i]["re"]
                    if count >= continue_count:
                        return True, start
                else:
                    # Timestamp changed: this record starts a new run.
                    last_index = i
                    count = datas[i]["re"]
                    start = i
            elif not L2DataUtil.is_sell(_val) and not L2DataUtil.is_sell_cancel(_val):
                # Anything except sell / sell-cancel breaks the run.
                last_index = None
                count = 0
                start = None

        return False, None

    @classmethod
    def __get_threshmoney(cls, code):
        """Return the m value (money threshold) for ``code``."""
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)

    @classmethod
    def __sum_buy_num_for_order_3(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                  threshold_money, buy_single_index, capture_time):
        """Sum net limit-up buy volume; cancels of buys placed before the signal are ignored.

        :return: ``(exec_index, buy_nums, buy_count, rebegin_pos)``.
            ``exec_index`` is the record at which both thresholds were met (or
            ``None``); ``rebegin_pos`` is set when the signal is more than one
            second old and the search has to restart from that index.
        :raises Exception: when the limit-up price is unavailable.
        """
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # Target volume (money / (price * 100)).
        threshold_num = threshold_money / (limit_up_price * 100)
        # Target number of orders.
        threshold_count = l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)

        buy_single_time = total_datas[buy_single_index]["val"]["time"]
        buy_single_time_seconds = L2DataUtil.get_time_as_second(buy_single_time)

        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = data["val"]

            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 1:
                # Signal is stale: drop it and tell the caller where to restart.
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # Nothing left to restart on.
                    return None, buy_nums, buy_count, None
                # The new search must not start within the signal's own second.
                for ii in range(buy_single_index + 1, compute_end_index + 1):
                    if buy_single_time != total_datas[ii]["val"]["time"]:
                        return None, buy_nums, buy_count, ii

            if L2DataUtil.is_limit_up_price_buy(_val):
                # Limit-up buy.
                buy_nums += int(_val["num"]) * int(data["re"])
                buy_count += int(data["re"])
                if buy_nums >= threshold_num and buy_count >= threshold_count:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}", code, i, buy_nums,
                                             threshold_num, buy_count, threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Limit-up buy cancel: find the buy it belongs to.
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if buy_single_time == buy_data["val"]["time"]:
                            # Same second as the signal: treat it as after the signal.
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # Matching buy not found: deduct anyway.
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(data["re"])
                    buy_count -= int(data["re"])

            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            if buy_nums >= threshold_num and buy_count >= threshold_count:
                return i, buy_nums, buy_count, None

        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}", compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, threshold_count)
        return None, buy_nums, buy_count, None

    @classmethod
    def __replay_by_second(cls, code):
        """Replay the loaded day data one timestamp batch at a time.

        This is the logic that sat unreachable after ``return`` in the original
        ``test``.  Unlike that code, the final batch is replayed too.
        """
        _start = t.time()
        total_datas = local_today_datas[code]
        if not total_datas:
            return

        def replay(start, end):
            cls.random_key[code] = random.randint(0, 100000)
            print("处理进度:{},{}".format(start, end))
            try:
                cls.__dispatch_by_trade_state(code, start, end, 1999999999)
            except Exception as e:
                logging.exception(e)

        batch_start = 0
        batch_time = total_datas[0]["val"]["time"]
        for i, item in enumerate(total_datas):
            if item["val"]["time"] == batch_time:
                continue
            replay(batch_start, i - 1)
            batch_start = i
            batch_time = item["val"]["time"]
        replay(batch_start, len(total_datas) - 1)

        print("时间花费:", round((t.time() - _start) * 1000))

    @classmethod
    def test(cls, replay_by_second=False):
        """Manual debugging harness for code ``002898``.

        By default processes records 0..140 in one pass (the only reachable
        path of the original).  With ``replay_by_second=True`` the loaded data
        is replayed batch by batch instead.

        NOTE(review): the original ``return`` is assumed to close the
        ``if True:`` block; indentation was lost.
        """
        code = "002898"
        l2_trade_test.clear_trade_data(code)
        load_l2_data(code, True)

        if replay_by_second:
            cls.__replay_by_second(code)
            return

        cls.random_key[code] = random.randint(0, 100000)
        try:
            cls.__dispatch_by_trade_state(code, 0, 140, 1999988888)
        except Exception as e:
            logging.exception(e)


# Script entry: run the manual debugging harness.
if __name__ == "__main__":
    L2TradeDataProcessor.test()