"""
|
安全笔数管理
|
"""
|
# 下单L2的安全笔数管理
|
import json
|
|
from db.redis_manager_delegate import RedisUtils
|
from l2 import l2_data_source_util
|
from trade import l2_trade_factor
|
from db import redis_manager_delegate as redis_manager
|
from utils import tool
|
from l2.l2_data_util import L2DataUtil, local_today_buyno_map
|
|
|
class BuyL2SafeCountManager(object):
    """Singleton that keeps the "safe count" bookkeeping for L2 buy orders.

    Two pieces of state are tracked per stock code, each held in a class-level
    in-memory cache and mirrored to redis:

    * the latest placed order, stored under ``latest_place_order_info-{code}``
      as ``(buy_single_index, buy_exec_index, cancel_index)``;
    * the compute progress of a buy signal, stored under
      ``safe_count_l2-{code}-{last_buy_single_index}`` as
      ``(last_buy_single_index, process_index, buy_num, cancel_num)``.

    The caches are filled from redis once, when the first instance is created.
    """

    # Passed to the asynchronous RedisUtils helpers (presumably the redis db index).
    __db = 0
    # Source of the connections used for synchronous reads and deletes.
    __redis_manager = redis_manager.RedisManager(0)
    # The single shared instance, created lazily by __new__.
    __instance = None
    # code -> (buy_single_index, buy_exec_index, cancel_index)
    latest_place_order_info_cache = {}
    # "{code}-{last_buy_single_index}" -> (last_buy_single_index, process_index, buy_num, cancel_num)
    safe_count_l2_cache = {}

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it and loading the caches on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so none are forwarded to it.
            cls.__instance = super(BuyL2SafeCountManager, cls).__new__(cls)
            cls.__load_datas()
        return cls.__instance

    def __init__(self):
        # NOTE: __init__ runs on every BuyL2SafeCountManager() call, so this
        # attribute is reset each time the singleton is requested.
        self.last_buy_queue_data = {}

    @classmethod
    def __get_redis(cls):
        """Return a redis connection from the class-level manager."""
        return cls.__redis_manager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill both in-memory caches from the entries currently stored in redis."""
        redis_conn = cls.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, "safe_count_l2-*"):
                parts = key.split("-")
                code, last_buy_single_index = parts[1], int(parts[2])
                raw = RedisUtils.get(redis_conn, key)
                if raw is None:
                    # The key expired between KEYS and GET; nothing to load.
                    continue
                tool.CodeDataCacheUtil.set_cache(cls.safe_count_l2_cache, f"{code}-{last_buy_single_index}",
                                                 json.loads(raw))
            for key in RedisUtils.keys(redis_conn, "latest_place_order_info-*"):
                code = key.split("-")[-1]
                raw = RedisUtils.get(redis_conn, key)
                if raw is None:
                    continue
                tool.CodeDataCacheUtil.set_cache(cls.latest_place_order_info_cache, code, json.loads(raw))
        finally:
            RedisUtils.realse(redis_conn)

    # Record the progress of each processing pass
    def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num):
        """Store the compute progress in the cache and write it to redis asynchronously."""
        progress = (last_buy_single_index, process_index, buy_num, cancel_num)
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}", progress)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps(progress))

    # Read the progress straight from redis
    def __get_compute_progress(self, code, last_buy_single_index):
        """Read the compute progress from redis.

        Returns ``(last_buy_single_index, process_index, buy_num, cancel_num)``,
        or ``(None, -1, 0, 0)`` when nothing is stored.
        """
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        redis_conn = self.__get_redis()
        try:
            raw = RedisUtils.get(redis_conn, key)
        finally:
            # Same acquire/release pairing as __load_datas.
            RedisUtils.realse(redis_conn)
        if raw is None:
            return None, -1, 0, 0
        val = json.loads(raw)
        return val[0], val[1], val[2], val[3]

    def __get_compute_progress_cache(self, code, last_buy_single_index):
        """Read the compute progress from the in-memory cache.

        Returns the cached value, or ``(None, -1, 0, 0)`` on a cache miss.
        """
        found, cached = tool.CodeDataCacheUtil.get_cache(self.safe_count_l2_cache,
                                                         f"{code}-{last_buy_single_index}")[:2]
        if found:
            return cached
        return None, -1, 0, 0

    # Save the most recent order placement
    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        """Store the latest order info in the cache and write it to redis asynchronously."""
        info = (buy_single_index, buy_exec_index, cancel_index)
        tool.CodeDataCacheUtil.set_cache(self.latest_place_order_info_cache, code, info)
        key = "latest_place_order_info-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps(info))

    def __get_latest_place_order_info(self, code):
        """Read ``(buy_single_index, buy_exec_index, cancel_index)`` from redis.

        Returns ``(None, None, None)`` when nothing is stored for ``code``.
        """
        key = "latest_place_order_info-{}".format(code)
        redis_conn = self.__get_redis()
        try:
            raw = RedisUtils.get(redis_conn, key)
        finally:
            RedisUtils.realse(redis_conn)
        if raw is None:
            return None, None, None
        val = json.loads(raw)
        return val[0], val[1], val[2]

    def __get_latest_place_order_info_cache(self, code):
        """Read the latest order info from the cache; ``(None, None, None)`` on a miss."""
        found, cached = tool.CodeDataCacheUtil.get_cache(self.latest_place_order_info_cache, code)[:2]
        if found:
            return cached
        return None, None, None

    def __get_all_compute_progress(self, code):
        """Return every compute-progress record stored in redis for ``code``."""
        vals = []
        # One connection for the whole scan instead of a new one per key.
        redis_conn = self.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, f"safe_count_l2-{code}-*"):
                raw = RedisUtils.get(redis_conn, key)
                if raw is None:
                    # The key expired between KEYS and GET.
                    continue
                vals.append(json.loads(raw))
        finally:
            RedisUtils.realse(redis_conn)
        return vals

    def clear_data(self, code):
        """Remove everything stored for ``code`` from redis and from both caches."""
        redis_conn = self.__get_redis()
        try:
            for key in RedisUtils.keys(redis_conn, f"safe_count_l2-{code}-*"):
                RedisUtils.delete(redis_conn, key)
        finally:
            RedisUtils.realse(redis_conn)

        # Drop the cached progress as well; otherwise compute_left_rate would keep
        # resuming from progress whose redis copy was just deleted.
        # Assumes CodeDataCacheUtil stores entries in the dict under the key it is
        # given - TODO confirm against tool.CodeDataCacheUtil.
        prefix = f"{code}-"
        stale_keys = [k for k in self.safe_count_l2_cache if str(k).startswith(prefix)]
        for cache_key in stale_keys:
            tool.CodeDataCacheUtil.clear_cache(self.safe_count_l2_cache, cache_key)

        tool.CodeDataCacheUtil.clear_cache(self.latest_place_order_info_cache, code)
        key = f"latest_place_order_info-{code}"
        RedisUtils.delete_async(self.__db, key)

    # Get the base safe count
    def __get_base_save_count(self, code, is_first):
        """Return what L2TradeFactorUtil.get_safe_buy_count yields for ``code``."""
        return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code, is_first)

    # Get the final safe count
    def get_safe_count(self, code, is_first_code, rate):
        """Return the base safe count scaled by ``(1 + rate)`` and rounded to an int.

        The minimum and maximum that come back with the base count are not
        applied to the result.
        """
        count, _min_count, _max_count = self.__get_base_save_count(code, is_first_code)
        return int(round(count * (1 + rate), 0))

    # Compute how much of the buy volume is left
    # last_buy_single_index: start position of the previous buy signal
    # cancel_index: position where the previous order was cancelled
    # start_index: first position of the data
    # end_index: last position of the data
    def compute_left_rate(self, code, start_index, end_index, total_datas,
                          local_today_num_operate_map):
        """Accumulate limit-up buy and buy-cancel volume for the latest placed order.

        Does nothing when no order info is cached for ``code``. Otherwise it
        resumes from the cached progress, walks ``total_datas`` up to
        ``end_index`` (cut short at the first record later than the cancel
        time), adds ``int(val["num"]) * data["re"]`` to ``buy_num`` for
        limit-up buys and to ``cancel_num`` for cancels whose buy lies at or
        after the buy signal, and saves the new progress.

        ``local_today_num_operate_map`` is accepted but not used.
        """
        last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info_cache(code)
        if last_buy_single_index is None:
            return
        cancel_time = None
        if cancel_index is not None:
            cancel_time = total_datas[cancel_index]["val"]["time"]
        # Progress reached by earlier calls for this buy signal
        _, process_index, buy_num, cancel_num = self.__get_compute_progress_cache(code, last_buy_single_index)

        # Without a cancel position everything is counted; with one, counting
        # must not go past the cancel time.
        if cancel_time:
            cancel_time_value = int(cancel_time.replace(":", ""))  # parsed once, not per record
            for i in range(start_index, end_index):
                if cancel_time_value < int(total_datas[i]["val"]["time"].replace(":", "")):
                    end_index = i - 1
                    break

        # Where counting starts: never re-count positions already processed
        start_compute_index = min(start_index, last_buy_single_index)
        if start_compute_index <= process_index:
            start_compute_index = process_index + 1

        # NOTE(review): end_index is excluded by range() here but treated as
        # inclusive in the buy_index comparison below - confirm which is intended.
        for i in range(start_compute_index, end_index):
            data = total_datas[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                # Limit-up buy
                buy_num += int(val["num"]) * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # Locate the buy this cancel belongs to
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(
                    data, local_today_buyno_map.get(code))
                if buy_index is not None and last_buy_single_index <= buy_index <= end_index:
                    cancel_num += int(val["num"]) * data["re"]

        # NOTE(review): this can move process_index backwards when end_index is
        # below the cached progress - confirm that is acceptable.
        process_index = end_index
        # Save the progress and the accumulated volumes
        self.__save_compute_progress(code, last_buy_single_index, process_index, buy_num, cancel_num)

    # Get the rate
    def __get_rate(self, code):
        """Return the product of the per-signal cancel ratios stored for ``code``.

        Each record contributes ``1 - round((buy_num - cancel_num) / buy_num, 4)``,
        capped at 1. Records with ``buy_num == 0`` are skipped.
        """
        rate = 1
        for val in self.__get_all_compute_progress(code):
            buy_num, cancel_num = val[2], val[3]
            if not buy_num:
                # No buy volume recorded: the ratio is undefined, and dividing
                # would raise ZeroDivisionError, so the record is left out.
                continue
            temp_rate = 1 - round((buy_num - cancel_num) / buy_num, 4)
            rate *= min(temp_rate, 1)
        return rate

    # Order placed successfully
    def save_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        """Record the positions of a successfully placed order for ``code``."""
        self.__save_latest_place_order_info(code, buy_single_index, buy_exec_index, cancel_index)
|