"""
|
成交大单管理
|
"""
|
import json
|
|
from db.redis_manager_delegate import RedisUtils
|
from utils import tool
|
from db import redis_manager_delegate as redis_manager
|
from l2 import l2_data_util, l2_data_source_util
|
|
|
class DealComputeProgressManager:
|
__db = 2
|
__redisManager = redis_manager.RedisManager(2)
|
__deal_compute_progress_cache = {}
|
__last_progress = {}
|
__instance = None
|
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(DealComputeProgressManager, cls).__new__(cls, *args, **kwargs)
|
cls.__load_datas()
|
return cls.__instance
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.__redisManager.getRedis()
|
|
@classmethod
|
def __load_datas(cls):
|
__redis = cls.__get_redis()
|
try:
|
keys = RedisUtils.keys(__redis, "deal_compute_info-*")
|
for k in keys:
|
code = k.split("-")[-1]
|
val = RedisUtils.get(__redis, k)
|
val = json.loads(val)
|
tool.CodeDataCacheUtil.set_cache(cls.__deal_compute_progress_cache, code, val)
|
|
finally:
|
RedisUtils.realse(__redis)
|
|
# 获取成交计算进度
|
def __get_deal_compute_progress(self, code):
|
val = RedisUtils.get(self.__get_redis(), f"deal_compute_info-{code}")
|
if val is None:
|
return -1, 0
|
val = json.loads(val)
|
return val[0], val[1]
|
|
# 获取成交计算进度
|
def get_deal_compute_progress_cache(self, code):
|
cache_result = tool.CodeDataCacheUtil.get_cache(self.__deal_compute_progress_cache, code)
|
if cache_result[0]:
|
return cache_result[1]
|
return -1, 0
|
|
# 设置成交进度
|
def __set_deal_compute_progress(self, code, index, money):
|
tool.CodeDataCacheUtil.set_cache(self.__deal_compute_progress_cache, code, (index, money))
|
RedisUtils.setex_async(self.__db, f"deal_compute_info-{code}", tool.get_expire(), json.dumps((index, money)))
|
|
# 设置成交进度
|
def set_trade_progress(self, code, progress, total_data, local_today_num_operate_map):
|
if self.__last_progress.get(code) == progress:
|
return
|
self.__last_progress[code] = progress
|
# 计算从开始位置到成交位置
|
c_index, deal_num = self.get_deal_compute_progress_cache(code)
|
process_index = c_index
|
for i in range(c_index + 1, progress):
|
data = total_data[i]
|
val = data['val']
|
process_index = i
|
# 是否有大单
|
if not l2_data_util.is_big_money(val):
|
continue
|
if l2_data_util.L2DataUtil.is_limit_up_price_buy(val):
|
# 是否已经取消
|
cancel_data = self.__get_cancel_data(code, data, local_today_num_operate_map)
|
if cancel_data is None:
|
deal_num += val["num"] * data["re"]
|
self.__save_traded_index(code, data["index"])
|
|
self.__set_deal_compute_progress(code, process_index, deal_num)
|
|
def get_deal_big_money_num(self, code):
|
if code in self.__deal_compute_progress_cache:
|
return self.__deal_compute_progress_cache.get(code)[1]
|
compute_index, num = self.get_deal_compute_progress_cache(code)
|
return num
|
|
def __get_cancel_data(self, code, buy_data, local_today_num_operate_map):
|
val = buy_data['val']
|
cancel_datas = local_today_num_operate_map.get(
|
"{}-{}-{}".format(val["num"], "1", val["price"]))
|
if cancel_datas:
|
for cancel_data in cancel_datas:
|
buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
|
local_today_num_operate_map)
|
if buy_index == buy_data["index"]:
|
return cancel_data
|
return None
|
|
def __save_traded_index(self, code, index):
|
RedisUtils.sadd(self.__get_redis(), f"deal_indexes-{code}", index)
|
RedisUtils.expire(self.__get_redis(), f"deal_indexes-{code}", tool.get_expire())
|
|
def __get_traded_indexes(self, code):
|
return RedisUtils.smembers(self.__get_redis(), f"deal_indexes-{code}")
|
|
# 获取成交的索引
|
def get_traded_indexes(self, code):
|
return self.__get_traded_indexes(code)
|
|
|
def get_deal_big_money_num(code):
|
val = DealComputeProgressManager().get_deal_compute_progress_cache(code)
|
return val[1]
|