import decimal
|
import json
|
import logging
|
import os
|
import random
|
import threading
|
import time as t
|
from datetime import datetime
|
|
import big_money_num_manager
|
import data_process
|
import global_util
|
import l2_data_util
|
|
import gpcode_manager
|
import l2_trade_factor
|
|
import redis_manager
|
import tool
|
import trade_manager
|
from log import logger_l2_trade, logger_l2_trade_cancel
|
from trade_data_manager import TradeBuyDataManager
|
|
_redisManager = redis_manager.RedisManager(1)

# L2 data management

# Most recently uploaded L2 data held in memory, keyed by stock code
local_latest_datas = {}

# Today's L2 data held in memory, keyed by stock code
local_today_datas = {}

# Temporary lookup built from lot count + operation type.
# Speeds up data processing by trading memory for time.
local_today_num_operate_map = {}
|
|
|
class L2DataException(Exception):
    """Error raised while validating incoming L2 data.

    Attributes:
        code: Error code supplied by the raiser (e.g. one of the ``CODE_*``
            constants below).
        msg: Human-readable description; also what ``str(exc)`` returns.
    """

    # Price does not match
    CODE_PRICE_ERROR = 1
    # No closing price
    CODE_NO_CLOSE_PRICE = 2

    def __init__(self, code, msg):
        # Hand the message (not the instance itself) to Exception so that
        # ``args`` carries something meaningful for repr()/pickling/logging.
        super().__init__(msg)
        self.code = code
        self.msg = msg

    def __str__(self):
        return self.msg

    def get_code(self):
        """Return the error code passed to the constructor."""
        return self.code
|
|
|
# Trade point manager: manages the buy point, the buy-cancel point, the net-buy
# data accumulated from the buy point and the cancel data accumulated from the
# buy-cancel point.
class TradePointManager:
    """Redis-backed storage for per-code buy / buy-cancel computation state.

    Buy state lives under ``buy_compute_index_info-<code>`` and cancel state
    under ``buy_cancel_compute_info-<code>``; both are stored as JSON arrays.
    """

    @staticmethod
    def __get_redis():
        return _redisManager.getRedis()

    @staticmethod
    def __buy_key(code):
        return "buy_compute_index_info-{}".format(code)

    @staticmethod
    def __cancel_key(code):
        return "buy_cancel_compute_info-{}".format(code)

    # Delete the buy point data
    @staticmethod
    def delete_buy_point(code):
        """Delete the stored buy-point record for ``code``."""
        TradePointManager.__get_redis().delete(TradePointManager.__buy_key(code))

    # Get the buy point information
    @staticmethod
    def get_buy_compute_start_data(code):
        """Read the stored buy-point record for ``code``.

        Returns:
            ``(buy_single_index, buy_exec_index, compute_index, nums)`` in the
            order written by ``set_buy_compute_start_data``, or
            ``(None, None, None, 0)`` when nothing is stored.
        """
        _data_json = TradePointManager.__get_redis().get(TradePointManager.__buy_key(code))
        if _data_json is None:
            return None, None, None, 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3]

    # Set the buy point values
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums):
        """Store the buy-point record for ``code``.

        Args:
            code: Stock code.
            buy_single_index: Buy signal position. When ``None`` the value
                already stored (if any) is kept.
            buy_exec_index: Buy execution position.
            compute_index: Position the computation has reached.
            nums: Accumulated net buy amount.
        """
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = TradePointManager.__buy_key(code)
        if buy_single_index is None:
            # Keep whatever signal position was stored previously.
            buy_single_index = TradePointManager.get_buy_compute_start_data(code)[0]
        redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums)))

    # Get the information from which buy-cancel computation starts
    @staticmethod
    def get_buy_cancel_compute_start_data(code):
        """Read the stored buy-cancel record for ``code``.

        Returns:
            ``(cancel_index, buy_num, computed_index)`` in the order written by
            ``set_buy_cancel_compute_start_data``, or ``(None, None, None)``
            when nothing is stored.
        """
        info = TradePointManager.__get_redis().get(TradePointManager.__cancel_key(code))
        if info is None:
            return None, None, None
        info = json.loads(info)
        return info[0], info[1], info[2]

    # Set the buy-cancel point information
    @classmethod
    def set_buy_cancel_compute_start_data(cls, code, buy_num, computed_index, index):
        """Store the buy-cancel record for ``code``.

        Args:
            code: Stock code.
            buy_num: Net buy amount.
            computed_index: Index the computation has reached.
            index: Start index of the buy-cancel signal.
        """
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex(cls.__cancel_key(code), expire, json.dumps((index, buy_num, computed_index)))

    # Increase the net buy amount tracked for buy-cancel
    @classmethod
    def add_buy_nums_for_cancel(cls, code, num_add, computed_index):
        """Add ``num_add`` to the stored cancel net-buy amount for ``code``.

        Raises:
            Exception: when no buy-cancel record exists for ``code``.
        """
        cancel_index, nums, _ = cls.get_buy_cancel_compute_start_data(code)
        if cancel_index is None:
            raise Exception("无撤买信号记录")
        nums += num_add
        # The setter requires the cancel signal index as its last argument;
        # re-use the stored one so the record keeps pointing at the same signal.
        cls.set_buy_cancel_compute_start_data(code, nums, computed_index, cancel_index)

    # Delete the buy-cancel point data
    @staticmethod
    def delete_buy_cancel_point(code):
        """Delete the stored buy-cancel record for ``code``."""
        TradePointManager.__get_redis().delete(TradePointManager.__cancel_key(code))
|
|
|
def load_l2_data(code, force=False):
    """Fill the in-memory L2 caches for ``code`` from Redis.

    Args:
        code: Stock code.
        force: When true, reload even if the caches already hold data.

    Side effects:
        Updates ``local_latest_datas``, ``local_today_datas`` and (through
        ``l2_data_util.load_num_operate_map``) ``local_today_num_operate_map``.
    """
    redis = _redisManager.getRedis()

    # Latest uploaded snapshot
    if local_latest_datas.get(code) is None or force:
        latest_json = redis.get("l2-data-latest-{}".format(code))
        if latest_json is not None:
            local_latest_datas[code] = json.loads(latest_json)

    # Today's rows
    if local_today_datas.get(code) is None or force:
        rows = []
        for redis_key in redis.keys("l2-{}-*".format(code)):
            rows.append(l2_data_util.l2_data_key_2_obj(redis_key, redis.get(redis_key)))

        def _order(row):
            # Sort by time (HHMMSS as an int), then by row index
            return int(row['val']["time"].replace(":", "")), row['index']

        local_today_datas[code] = sorted(rows, key=_order)
        # Rebuild the lookup map from today's rows.
        # NOTE(review): the original indentation was lost; this call is assumed
        # to run only when today's rows were (re)loaded - confirm.
        l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
|
|
|
def saveL2Data(code, datas, msg=""):
    """Persist L2 rows for ``code`` to Redis under a per-code lock.

    Each row is stored under ``"l2-" + row["key"]`` as JSON holding its
    ``index`` and ``re`` (repeat count). Existing rows are rewritten only when
    their ``re`` differs.

    Args:
        code: Stock code.
        datas: Rows to store; each needs ``key``, ``index`` and ``re``.
        msg: Label included in the timing output.

    Returns:
        ``None`` when ``code`` is not in the monitored pool, otherwise
        ``datas``. When another writer holds the lock nothing is written and
        ``datas`` is still returned.
    """
    start_time = round(t.time() * 1000)
    # Only codes in the monitored pool are stored
    if not gpcode_manager.is_in_gp_pool(code):
        return None
    redis_instance = _redisManager.getRedis()
    lock_key = "l2-save-{}".format(code)
    # Safety TTL so a crashed holder cannot block saves for this code forever.
    lock_ttl_seconds = 60

    if redis_instance.setnx(lock_key, "1"):
        # Release the lock only when this call acquired it; deleting it
        # unconditionally would drop a lock held by another writer.
        try:
            redis_instance.expire(lock_key, lock_ttl_seconds)
            # How long the stored rows are kept
            expire = tool.get_expire()
            for _data in datas:
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # New row
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except Exception:
                        # Best effort per row: log (with traceback) and carry on
                        logging.exception("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
        finally:
            redis_instance.delete(lock_key)

    print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time))
    return datas
|
|
|
# TODO: fetch the L2 data
def get_l2_data_index(code, key):
    """Placeholder that is not implemented yet; always returns ``None``."""
    return None
|
|
|
def parseL2Data(str):
    """Parse one uploaded L2 JSON message.

    Args:
        str: JSON text with top-level ``data`` and ``client`` entries; ``data``
            carries ``code``, ``channel``, ``captureTime``, ``processTime`` and
            the raw row list under ``data``.

    Returns:
        ``(day, client, channel, code, capture_time, process_time, datas)``
        where ``day`` is today's date as ``YYYYMMDD`` and ``datas`` is the
        result of ``L2DataUtil.format_l2_data`` on the raw rows.
    """
    today = datetime.now().strftime("%Y%m%d")
    message = json.loads(str)
    body = message["data"]
    client = message["client"]
    code = body["code"]
    channel = body["channel"]
    capture_time = body["captureTime"]
    process_time = body["processTime"]
    raw_rows = body["data"]
    # The limit-up price is needed to flag limit-up rows while formatting
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    formatted = L2DataUtil.format_l2_data(raw_rows, code, limit_up_price)
    return today, client, channel, code, capture_time, process_time, formatted
|
|
|
# Save L2 data
def save_l2_data(code, datas, add_datas):
    """Record the latest capture for ``code`` and persist newly added rows.

    Args:
        code: Stock code.
        datas: All rows of the latest capture; stored in Redis under
            ``l2-data-latest-<code>`` and in ``local_latest_datas``.
        add_datas: Rows that are new relative to the previous capture; passed
            to ``saveL2Data`` when non-empty.
    """
    # Store the latest capture in Redis
    _redisManager.getRedis().setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    # Mirror it in memory
    local_latest_datas[code] = datas
    __set_l2_data_latest_count(code, len(datas))
    if len(add_datas) > 0:
        saveL2Data(code, add_datas)
|
|
|
class L2DataUtil:
    """Helpers for formatting, diffing and classifying L2 rows."""

    @classmethod
    def is_same_time(cls, time1, time2):
        """Return True when two ``HH:MM:SS`` times are less than 3 seconds apart.

        Always returns True when ``global_util.TEST`` is set.
        """
        # TODO: testing switch
        if global_util.TEST:
            return True
        time1_second = cls.get_time_as_second(time1)
        time2_second = cls.get_time_as_second(time2)
        return abs(time2_second - time1_second) < 3

    # Get the incremental data
    @classmethod
    def get_add_data(cls, code, datas, _start_index):
        """Return the rows of ``datas`` that follow the previous capture.

        The last row key of ``local_latest_datas[code]`` is located in
        ``datas`` (searching from the end); everything after it is new. When
        there is no previous capture all rows are new; when the previous last
        key cannot be found nothing is returned.

        Args:
            code: Stock code.
            datas: Rows of the current capture (``None``/empty yields ``[]``).
            _start_index: ``index`` assigned to the first new row; following
                rows get consecutive values. The rows are modified in place.

        Returns:
            List of new rows (possibly empty).
        """
        # ``None`` used to slip past this guard and crash in the loop below.
        if datas is None or len(datas) < 1:
            return []
        last_key = ""
        previous = local_latest_datas.get(code)
        if previous is not None and len(previous) > 0:
            last_key = previous[-1]["key"]

        # Position of the last occurrence of last_key in datas, -1 if absent
        start_index = -1
        for pos in range(len(datas) - 1, -1, -1):
            if datas[pos]["key"] == last_key:
                start_index = pos
                break

        if len(last_key) > 0 and start_index < 0:
            # Previous capture exists but its tail is not in this capture
            _add_datas = []
        else:
            _add_datas = datas[start_index + 1:]
        # Number the new rows
        for offset, row in enumerate(_add_datas):
            row["index"] = _start_index + offset
        return _add_datas

    # Correct data: replace the "re" field with the larger value
    @classmethod
    def correct_data(cls, code, _datas):
        """Raise repeat counts of cached rows that the new capture shows higher.

        For every cached row (``local_latest_datas[code]``) with the same key
        as a new row but a smaller ``re``, the cached ``re`` is raised to the
        new value and the cached row is persisted through ``saveL2Data``.

        Returns:
            ``_datas`` (the same list object).
        """
        latest_data = local_latest_datas.get(code)
        if latest_data is None:
            latest_data = []
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                # Only store when the new capture has more repeats than the old
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    _ldata["re"] = data["re"]
                    # Persist the row whose repeat count changed
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
        # NOTE(review): original indentation was lost; assumed to run always.
        local_latest_datas[code] = latest_data
        return _datas

    # Process raw L2 data
    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        """Turn raw rows into ``{"key", "val", "re"}`` records.

        Identical rows (same code, operate type, time, num, price, limit flag,
        cancel time and unit) are merged into one record whose ``re`` counts
        the repeats. Rows priced at ``limit_up_price`` get ``limitPrice`` set
        to ``"1"`` in place. Also calls ``l2_data_util.save_big_data`` with
        the per-time row counts.

        Returns:
            List of records in first-seen order.
        """
        datas = []
        dataIndexs = {}
        same_time_num = {}
        for item in data:
            row_time = item["time"]
            same_time_num[row_time] = same_time_num.get(row_time, 0) + 1

            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            # Limit-up price
            if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
                limitPrice = 1
                # NOTE(review): original indentation was lost; the write-back is
                # assumed to happen only when the price matched.
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, row_time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in dataIndexs:
                # One more repeat of an already-seen row
                datas[dataIndexs[key]]['re'] += 1
            else:
                # First occurrence: repeat count starts at 1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs[key] = len(datas) - 1
        l2_data_util.save_big_data(code, same_time_num, data)
        return datas

    @classmethod
    def get_time_as_second(cls, time_str):
        """Convert an ``HH:MM:SS`` string to seconds since midnight."""
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])

    @staticmethod
    def __is_limit_up_row(val, operate_type):
        """Shared test: limit-price flag set, given operate type, value >= 50 * 10000."""
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != operate_type:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True

    # Whether the row is a buy at the limit-up price
    @classmethod
    def is_limit_up_price_buy(cls, val):
        """True when ``val`` has limitPrice 1, operateType 0 and price*num*100 >= 500000."""
        return cls.__is_limit_up_row(val, 0)

    # Whether the row is a buy-cancel at the limit-up price
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        """True when ``val`` has limitPrice 1, operateType 1 and price*num*100 >= 500000."""
        return cls.__is_limit_up_row(val, 1)

    @staticmethod
    def is_index_end(code, index):
        """True when ``index`` is at or past the last row of today's data for ``code``."""
        return index >= len(local_today_datas[code]) - 1
|
|
|
# L2 trade data processor.
# Common concepts:
#   buy_single_index - buy signal position (first row where an order signal appears)
#   buy_exec_index   - buy execution position (last row that satisfied the order signal)
#   compute_index    - computation position (how far the overall computation has got)
class L2TradeDataProcessor:
    """Drives buy / buy-cancel decisions from incoming L2 captures.

    ``process`` is the entry point. State between captures is kept through
    ``TradePointManager``; ``unreal_buy_dict`` only lives for the duration of
    one ``process`` call.
    """

    # code -> (index of the row that triggered the buy, capture time) for a
    # "virtual" order decided in this pass but not yet sent
    unreal_buy_dict = {}
    # code -> random number chosen at the start of process(); logged as thread-id
    random_key = {}

    @classmethod
    def debug(cls, code, content, *args):
        """Log ``content.format(*args)`` to the trade logger with a code prefix."""
        # .get() so that logging can never raise KeyError for an unseen code
        logger_l2_trade.debug(
            ("thread-id={} code={} ".format(cls.random_key.get(code), code) + content).format(*args))

    @classmethod
    def cancel_debug(cls, code, content, *args):
        """Log ``content.format(*args)`` to the trade-cancel logger with a code prefix."""
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={} ".format(cls.random_key.get(code), code) + content).format(*args))

    # Data processing entry point
    @classmethod
    def process(cls, code, datas, capture_timestamp):
        """Process one capture of L2 rows for ``code``.

        Args:
            code: Stock code.
            datas: Rows of this capture (output of ``L2DataUtil.format_l2_data``).
            capture_timestamp: Capture timestamp, passed on to the trade calls.

        Raises:
            L2DataException: ``CODE_PRICE_ERROR`` when the first row's price is
                rejected by ``data_process.is_same_code_with_price``.
        """
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.now().strftime("%H:%M:%S")
        try:
            if len(datas) > 0:
                # Check that the price belongs to this code
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # Load history
                load_l2_data(code)
                # Correct repeat counts
                datas = L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # Append the new rows
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                # Handle the buy confirmation position
                TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
                                                                   total_datas[-1],
                                                                   add_datas)
                if len(add_datas) > 0:
                    # Count big orders
                    cls.__compute_big_money_data(code, add_datas)

                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # Only act when the data is close enough to the current time
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
                        # Has an order already been placed?
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # Order placed
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3, capture_timestamp)
                        else:
                            # No order placed
                            cls.__process_not_order(code, add_datas, capture_timestamp)
                # Save the data.
                # NOTE(review): original indentation was lost; the save is assumed
                # to run for every non-empty capture (save_l2_data itself checks
                # whether add_datas is empty) - confirm.
                save_l2_data(code, datas, add_datas)
        finally:
            # A virtual order never outlives the pass that created it
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)

    @classmethod
    def __compute_big_money_data(cls, code, add_datas):
        """Add the net count of big-money rows (buys minus cancels, weighted by ``re``)."""
        num = 0
        for data in add_datas:
            if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
                operate_type = int(data["val"]["operateType"])
                if operate_type == 0:
                    num += data["re"]
                elif operate_type == 1:
                    num -= data["re"]
        big_money_num_manager.add_num(code, num)

    # Handle the "no order placed" case
    @classmethod
    def __process_not_order(cls, code, add_datas, capture_time):
        """Start buy computation at the first of the newly added rows."""
        # Threshold
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, len(local_today_datas[code]) - len(add_datas), threshold_money, capture_time)

    # Handle the "order placed" case
    @classmethod
    def __process_order(cls, code, start_index, capture_time):
        """Look for / continue a buy-cancel signal starting near ``start_index``."""
        if start_index < 0:
            start_index = 0
        # Previously recorded cancel signal, if any
        cancel_index, buy_num_for_cancel, computed_index = cls.__has_order_cancel_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        if cancel_index is None:
            # No recorded cancel signal start: search for one
            cancel_index = cls.__compute_order_cancel_begin_single(code, max(start_index - 3, 0), 3)
            buy_num_for_cancel = buy_num
            computed_index = buy_single_index
            if cancel_index is not None:
                cls.debug(code, "找到撤单信号,数据处理起始点:{} 数据:{}", start_index, local_today_datas[code][start_index])
        if cancel_index is not None:
            # Cancel signal present: accumulate the cancel net-buy amount
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_cancel(code, cancel_index, max(computed_index, buy_exec_index + 1),
                                       buy_num_for_cancel,
                                       threshold_money,
                                       capture_time)
        else:
            # No cancel signal: place the real order if a virtual one is pending
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入")
                # unreal_buy_info is (index of the triggering row, capture time)
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])

    # Start computing the cancel signal
    @classmethod
    def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
        """Accumulate cancel data and either cancel, or record progress and buy."""
        # sure_type: 0 - virtual order position, 1 - real order position
        computed_index, buy_num_for_cancel, _sure_type = cls.__sum_buy_num_for_cancel_order(
            code, compute_start_index, origin_num, threshold_money)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money,
                      total_datas[computed_index])
            # Cancel signal fired
            if cls.unreal_buy_dict.get(code) is not None:
                # Only a virtual order exists: drop it
                cls.debug(code, "之前有虚拟下单,执行虚拟撤买")
                cls.unreal_buy_dict.pop(code)
                # Remove the stored buy point
                TradePointManager.delete_buy_point(code)
            else:
                # A real order exists: cancel it
                cls.debug(code, "之前无虚拟下单,执行真实撤单")
                cls.__cancel_buy(code)

            if computed_index < len(local_today_datas[code]) - 1:
                # Rows remain: go back to looking for a buy signal
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time)
        else:
            cls.debug(code, "未获取到撤单执行信号,计算开始位置:{}, 纯买额:{}", compute_start_index, buy_num_for_cancel)
            # No cancel needed: remember the cancel signal and progress
            TradePointManager.set_buy_cancel_compute_start_data(code, buy_num_for_cancel, len(total_datas) - 1,
                                                                cancel_index)
            # Place the real order if a virtual one is pending
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                # unreal_buy_info is (index of the triggering row, capture time)
                cls.debug(code, "无撤单执行信号,有虚拟下单,执行真实下单")
                cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                          unreal_buy_info[0])

    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        """Place the buy through ``trade_manager``; failures are logged, not raised."""
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))

    @classmethod
    def __cancel_buy(cls, code):
        """Cancel the buy through ``trade_manager``; failures are logged, not raised."""
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # Clear the stored buy / cancel markers
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))

    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, threshold_money, capture_time):
        """Find a buy signal and accumulate net buys until the threshold is hit."""
        total_datas = local_today_datas[code]
        # Stored buy signal / progress, if any
        buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code)
        if buy_single_index is None:
            # No stored signal: search for one
            has_single, _index = cls.__compute_order_begin_pos(code, max(compute_start_index - 3, 0), 3)
            buy_single_index = _index
            if has_single:
                num = 0
                cls.debug(code, "获取到买入信号起始点:{} 数据:{}", buy_single_index, total_datas[buy_single_index])

        if buy_single_index is None:
            # No buy signal: nothing more to do
            return None

        # Accumulate the net buy amount
        compute_index, buy_nums = cls.__sum_buy_num_for_order(code, max(buy_single_index, compute_start_index), num,
                                                              threshold_money)
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 数据:{}", compute_index, threshold_money, total_datas[compute_index])
            # Record the buy signal position
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums)
            # Virtual order
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # Drop any earlier cancel signal
            TradePointManager.delete_buy_cancel_point(code)
            if L2DataUtil.is_index_end(code, compute_index):
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # All rows consumed: place the order
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # Rows remain: continue with cancel processing
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                cls.__process_order(code, compute_index + 1, capture_time)
        else:
            # Threshold not reached: store the accumulated amount and progress
            cls.__save_order_begin_data(code, buy_single_index, -1, len(total_datas) - 1, buy_nums)

    # Get the stored order start signal
    @classmethod
    def __get_order_begin_pos(cls, code):
        """Return ``(buy_single_index, buy_exec_index, compute_index, num)`` from storage."""
        return TradePointManager.get_buy_compute_start_data(code)

    @classmethod
    def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num):
        """Store the buy signal position and progress."""
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num)

    # Get the stored cancel start position
    @classmethod
    def __has_order_cancel_begin_pos(cls, code):
        """Return ``(cancel_index, buy_num_for_cancel, computed_index)`` from storage.

        cancel_index: start of the cancel signal; buy_num_for_cancel: net buy
        amount counted from the order position; computed_index: last row computed.
        """
        return TradePointManager.get_buy_cancel_compute_start_data(code)

    @staticmethod
    def __next_two_match(datas, i, predicate):
        """True when rows ``i + 1`` and ``i + 2`` exist and both satisfy ``predicate``."""
        return (i + 2 < len(datas)
                and predicate(datas[i + 1]["val"])
                and predicate(datas[i + 2]["val"]))

    # Compute the order start signal
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count):
        """Search today's rows from ``start_index`` for a buy signal.

        A signal is either three physically adjacent limit-up buy rows, or four
        limit-up buy rows carrying the same time (other rows may sit between
        them, but a buy-cancel row resets the count). Rows before 09:30:00 are
        ignored.

        Returns:
            ``(True, index)`` with the index of the first row of the signal,
            or ``(False, None)``.
        """
        datas = local_today_datas[code]
        total = len(datas)
        if total - start_index < continue_count:
            return False, None
        same_second_time = None
        same_second_count = 0
        same_second_start = -1

        for i in range(start_index, total - (continue_count - 1)):
            _val = datas[i]["val"]
            # Time must be >= 09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue

            is_buy = L2DataUtil.is_limit_up_price_buy(_val)
            # Three adjacent limit-up buys mark the start point
            if is_buy and cls.__next_two_match(datas, i, L2DataUtil.is_limit_up_price_buy):
                logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                return True, i

            # Four limit-up buys within the same second mark the start point.
            # The tracked time must be the same variable that is compared and
            # reset; the previous code tested a different, always-None name, so
            # the count could never exceed 1.
            if is_buy:
                if same_second_time is not None and same_second_time == _val["time"]:
                    same_second_count += 1
                else:
                    same_second_time = _val["time"]
                    same_second_count = 1
                    same_second_start = i
            elif int(_val["operateType"]) == 1:
                # Buy-cancel: start over
                same_second_time = None
                same_second_count = 0
                same_second_start = -1

            if same_second_count >= 4 and same_second_start > -1:
                logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, same_second_start, datas[i]))
                return True, same_second_start

        return False, None

    # Whether there is a cancel signal
    @classmethod
    def __compute_order_cancel_begin_single(cls, code, start_index, continue_count):
        """Return the index of the first of three adjacent limit-up buy-cancel rows.

        Rows before 09:30:00 are ignored. Returns ``None`` when not found.
        """
        datas = local_today_datas[code]
        total = len(datas)
        if total - start_index < continue_count:
            return None
        for i in range(start_index, total - (continue_count - 1)):
            _val = datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            # Three adjacent buy-cancels
            if (L2DataUtil.is_limit_up_price_buy_cancel(_val)
                    and cls.__next_two_match(datas, i, L2DataUtil.is_limit_up_price_buy_cancel)):
                logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                return i
        return None

    # Whether an order can be placed (not implemented)
    def __is_can_order(self):
        pass

    # Virtual order (not implemented)
    def __unreal_order(self):
        pass

    @classmethod
    def __get_threshmoney(cls, code):
        """Return the threshold from ``L2TradeFactorUtil.compute_m_value``."""
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)

    # Get the estimated order position
    @classmethod
    def __get_sure_order_pos(cls, code):
        """Return ``(sure_type, index, row)`` for the estimated order position.

        ``sure_type`` is 1 with the position from ``TradeBuyDataManager`` when
        one is known, otherwise 0 with the last row of today's data.
        """
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        return 1, index, data

    # Sum the net buy volume for placing an order
    @classmethod
    def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        """Accumulate limit-up buys minus buy-cancels from ``compute_start_index``.

        Returns:
            ``(index, buy_nums)`` at the first buy row where the running total
            reaches ``threshold_money / (limit_up_price * 100)``, otherwise
            ``(None, buy_nums)``.

        Raises:
            Exception: when the limit-up price is unavailable.
        """
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        for i in range(compute_start_index, len(total_datas)):
            row = total_datas[i]
            _val = row["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # Limit-up buy
                buy_nums += int(_val["num"]) * int(row["re"])
                if buy_nums >= threshold_num:
                    return i, buy_nums
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Limit-up buy-cancel
                buy_nums -= int(_val["num"]) * int(row["re"])
        return None, buy_nums

    # Probability used for buys in the same second
    @classmethod
    def __get_same_time_property(cls, code):
        """Return ``1 - industry rate`` from the sector hot number, or 0.5 when unknown."""
        # Sector heat
        industry = global_util.code_industry_map.get(code)
        if industry is not None:
            hot_num = global_util.industry_hot_num.get(industry)
            if hot_num is not None:
                return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num)
        return 0.5

    # Sum the net buy volume for cancelling
    @classmethod
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
        """Accumulate net buys for the cancel decision from ``start_index``.

        Buys before the estimated order position count fully; buys at or after
        it in the same second as that position are weighted by the same-time
        probability. Cancels are attributed by where their originating buy sits.

        Returns:
            ``(index, total, sure_type)`` at the first row where the total is
            at or below ``threshold_money / (limit_up_price * 100)``, otherwise
            ``(None, total, sure_type)``.

        Raises:
            Exception: when the limit-up price is unavailable.
        """
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        # Estimated order position; sure_type: 0 virtual, 1 real
        sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code)
        same_time_property = cls.__get_same_time_property(code)
        # Sum of same-second rows at or after the estimated order position
        property_buy_num_count = 0
        cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{}", start_index, len(total_datas) - 1, sure_pos)
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # Limit-up buy
                if i < sure_pos:
                    buy_nums += int(_val["num"]) * int(data["re"])
                elif sure_data["val"]["time"] == _val["time"]:
                    # Same second, at or after the estimated position
                    property_buy_num_count += int(_val["num"]) * int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Limit-up buy-cancel: locate the buy it cancels
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(
                    data, local_today_num_operate_map.get(code))
                if buy_index is not None:
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买额:{}", i, buy_nums * limit_up_price)
                    else:
                        cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index)
                        if sure_data["val"]["time"] == buy_data["val"]["time"]:
                            # Same second, at or after the estimated position:
                            # weighted by probability
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # The originating buy was not found
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)

            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{}", property_buy_num, i, buy_nums + property_buy_num)
            if buy_nums + property_buy_num <= threshold_num:
                return i, buy_nums + property_buy_num, sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property), sure_type

    @classmethod
    def test(cls):
        """Ad-hoc manual check: search code 002336 for a buy signal and print it."""
        code = "002336"
        cls.random_key[code] = random.randint(0, 100000)
        load_l2_data(code)
        try:
            has_single, _index = cls.__compute_order_begin_pos(code, max(9, 0), 3)
            print(has_single, _index)
        except Exception as e:
            logging.exception(e)
|
|
|
def __get_time_second(time_str):
    """Convert an ``HH:MM:SS`` string to seconds since midnight."""
    parts = time_str.split(":")
    hours = int(parts[0])
    minutes = int(parts[1])
    seconds = int(parts[2])
    return hours * 3600 + minutes * 60 + seconds
|
|
|
# 09:30:00 expressed as seconds since midnight
second_930 = 9 * 3600 + 30 * 60 + 0
|
|
|
# Whether the row is a buy at the limit-up price
def __is_limit_up_price_buy(val):
    """True when ``val`` has limitPrice 1, operateType 0 and price*num*100 >= 50 * 10000."""
    if int(val["limitPrice"]) != 1 or int(val["operateType"]) != 0:
        return False
    amount = float(val["price"]) * int(val["num"]) * 100
    return not (amount < 50 * 10000)
|
|
|
def __is_limit_up_price_buy_cancel(val):
    """True when ``val`` has limitPrice 1, operateType 1 and price*num*100 >= 50 * 10000."""
    if int(val["limitPrice"]) != 1 or int(val["operateType"]) != 1:
        return False
    amount = float(val["price"]) * int(val["num"]) * 100
    return not (amount < 50 * 10000)
|
|
|
# Find the start point of limit-up buying
def __get_limit_up_buy_start(code, data_count, __continue_count):
    """Search the last ``data_count`` rows of today's data for a buy signal.

    A signal is either three physically adjacent limit-up buy rows, or four
    limit-up buy rows carrying the same time (other rows may sit between them,
    but a buy-cancel row resets the count). Rows before 09:30:00 are ignored.

    Args:
        code: Stock code (key into ``local_today_datas``).
        data_count: How many trailing rows to search.
        __continue_count: Minimum number of rows required; also shortens the
            scanned range by ``__continue_count - 1`` rows at the end.

    Returns:
        Index of the first row of the signal, or ``None``.
    """
    datas = local_today_datas[code]
    __len = len(datas)
    if __len < __continue_count:
        return None
    # Start data_count rows from the end (or at 0 when there are fewer rows)
    start_index = __len - min(data_count, __len)

    same_second_time = None
    same_second_count = 0
    same_second_start = -1

    for i in range(start_index, __len - (__continue_count - 1)):
        _val = datas[i]["val"]
        # Time must be >= 09:30:00
        if __get_time_second(_val["time"]) < second_930:
            continue

        is_buy = __is_limit_up_price_buy(_val)
        # Three adjacent limit-up buys mark the start point
        if (is_buy and i + 2 < __len
                and __is_limit_up_price_buy(datas[i + 1]["val"])
                and __is_limit_up_price_buy(datas[i + 2]["val"])):
            logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
            return i

        # Four limit-up buys within the same second mark the start point.
        # The tracked time must be the same variable that is compared and
        # reset; the previous code tested a different, always-None name, so the
        # count could never exceed 1.
        if is_buy:
            if same_second_time is not None and same_second_time == _val["time"]:
                same_second_count += 1
            else:
                same_second_time = _val["time"]
                same_second_count = 1
                same_second_start = i
        elif int(_val["operateType"]) == 1:
            # Buy-cancel: start over
            same_second_time = None
            same_second_count = 0
            same_second_start = -1

        if same_second_count >= 4 and same_second_start > -1:
            logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, same_second_start, datas[i]))
            return same_second_start

    return None
|
|
|
# Find the start point of limit-up buy-cancels
def __get_limit_up_buy_cancel_start(code, data_count, __continue_count):
    """Return the index of the first of three adjacent limit-up buy-cancel rows.

    Only the last ``data_count`` rows of ``local_today_datas[code]`` are
    searched and rows before 09:30:00 are ignored. Returns ``None`` when there
    are fewer than ``__continue_count`` rows or nothing is found.
    """
    rows = local_today_datas[code]
    row_total = len(rows)
    if row_total < __continue_count:
        return None
    # Start data_count rows from the end (or at 0 when there are fewer rows)
    first = row_total - min(data_count, row_total)
    for i in range(first, row_total - (__continue_count - 1)):
        current = rows[i]["val"]
        if __get_time_second(current["time"]) < second_930:
            continue
        if not __is_limit_up_price_buy_cancel(current):
            continue
        # Three adjacent buy-cancels: the two rows right after i must match too
        if (i + 2 < row_total
                and __is_limit_up_price_buy_cancel(rows[i + 1]["val"])
                and __is_limit_up_price_buy_cancel(rows[i + 2]["val"])):
            logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(rows[i])))
            return i
    return None
|
|
|
# Whether the data shows a "trading forbidden" pattern
def __is_have_forbidden_feature(code, data_count, __continue_count):
    """Check for a run of cancels starting with the first 09:30:00 row.

    Scans the last ``data_count`` rows of ``local_today_datas[code]``. The
    first row timed "09:30:00" whose predecessor has a different time must
    have ``operateType == 1``, otherwise the result is False. Each directly
    following row with ``operateType == 1`` increments a counter (the first
    09:30:00 row itself is not counted); the result is True once the counter
    reaches ``__continue_count`` and False as soon as the run is broken.

    Returns:
        ``None`` when there are fewer than ``__continue_count`` rows, otherwise
        a bool as described above.

    NOTE(review): the original indentation was lost; the nesting below is the
    most plausible reading - confirm. ``operateType`` is compared to the int 1
    without conversion, unlike other checks in this file.
    """
    # The number of consecutive cancels at 09:30:00 exceeds a given value
    # Search the trailing rows
    datas = local_today_datas[code]
    __len = len(datas)
    if __len < __continue_count:
        return None
    start_index = 0
    if data_count > __len:
        data_count = __len
    if __len > data_count:
        start_index = __len - data_count
    # Index of the most recent row in the cancel run, -1 while no run is open
    cancel_start = -1
    cancel_count = 0
    for i in range(start_index, __len):
        _val = datas[i]["val"]
        if _val["time"] == "09:30:00" and i - 1 >= 0 and datas[i - 1]["val"]["time"] != "09:30:00":
            # First row at 09:30
            if _val["operateType"] == 1:
                cancel_start = i
            else:
                return False
        elif cancel_start > -1:
            # Consecutive cancels
            if _val["operateType"] == 1 and i - cancel_start == 1:
                cancel_start = i
                cancel_count += 1
                if cancel_count >= __continue_count:
                    return True
            else:
                return False
    return False
|
|
|
# Record the row count of the latest L2 capture
def __set_l2_data_latest_count(code, count):
    """Store ``count`` under ``latest-l2-count-<code>`` with a 2-second expiry."""
    _redisManager.getRedis().setex("latest-l2-count-{}".format(code), 2, count)
|
|
|
# Get the row count of the latest L2 capture for a code
def get_l2_data_latest_count(code):
    """Return the value stored under ``latest-l2-count-<code>`` as an int.

    Returns 0 when ``code`` is ``None``/empty or nothing is stored.
    """
    if code is None or len(code) < 1:
        return 0
    stored = _redisManager.getRedis().get("latest-l2-count-{}".format(code))
    return 0 if stored is None else int(stored)
|
|
|
# Initialise the set of fixed L2 codes
def init_l2_fixed_codes():
    """Reset the ``l2-fixed-codes`` Redis set to the single member ``"000000"``.

    An existing non-empty set is deleted first; the expiry comes from
    ``tool.get_expire()``.
    """
    set_key = "l2-fixed-codes"
    client = _redisManager.getRedis()
    if client.scard(set_key) > 0:
        client.delete(set_key)
    # NOTE(review): original indentation was lost; the add/expire are assumed
    # to run whether or not the set existed.
    client.sadd(set_key, "000000")
    client.expire(set_key, tool.get_expire())
|
|
|
# Remove a code from fixed L2 monitoring
def remove_from_l2_fixed_codes(code):
    """Remove ``code`` from the ``l2-fixed-codes`` Redis set."""
    _redisManager.getRedis().srem("l2-fixed-codes", code)
|
|
|
# Add a code to fixed L2 monitoring
def add_to_l2_fixed_codes(code):
    """Add ``code`` to the ``l2-fixed-codes`` Redis set and refresh its expiry."""
    set_key = "l2-fixed-codes"
    client = _redisManager.getRedis()
    client.sadd(set_key, code)
    client.expire(set_key, tool.get_expire())
|
|
|
# Whether a code is under fixed L2 monitoring
def is_in_l2_fixed_codes(code):
    """Return the result of ``SISMEMBER l2-fixed-codes <code>``."""
    return _redisManager.getRedis().sismember("l2-fixed-codes", code)
|
|
|
# When run as a script, execute the processor's ad-hoc test routine
if __name__ == "__main__":
    L2TradeDataProcessor.test()
|