import json
|
|
from db import redis_manager_delegate as redis_manager
|
from db.redis_manager_delegate import RedisUtils
|
from log_module import async_log_util
|
from log_module.log import logger_debug
|
from utils import tool
|
|
|
class DealAndDelegateWithBuyModeDataManager:
    """
    Singleton that tracks deals and delegates grouped by buy mode.

    Deal data is persisted: every mode's deal list is written to Redis under
    the key ``deal_delegate_by_mode-<mode>`` and read back when the singleton
    is first created. Delegate data is only held in memory.
    """

    __instance = None
    # Redis database index used for both reading and writing.
    __db = 2
    __redisManager = redis_manager.RedisManager(__db)
    # Codes that already have a recorded deal, across all modes.
    __deal_codes = set()
    # mode -> list of (code, deal_time) records.
    # NOTE: records restored from Redis are JSON arrays (lists), while records
    # added at runtime are tuples.
    __deal_mode_list_dict = {}
    # mode -> list of (code, delegate_time) records.
    __delegates_mode_list_dict = {}

    def __new__(cls, *args, **kwargs):
        """
        Return the shared instance, creating it and loading persisted deal
        data on the first call.

        Constructor arguments are accepted but ignored.
        """
        if cls.__instance is None:
            # object.__new__ raises TypeError when given extra arguments, so
            # none are forwarded.
            cls.__instance = super().__new__(cls)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return the Redis handle provided by the class-level RedisManager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """
        Populate the in-memory deal caches from every
        ``deal_delegate_by_mode-*`` key found in Redis.
        """
        redis = cls.__get_redis()
        for key in RedisUtils.keys(redis, "deal_delegate_by_mode-*"):
            mode = int(key.split("-")[1])
            val = RedisUtils.get(redis, key)
            if val is None:
                # The key disappeared (e.g. expired) between listing and
                # reading; json.loads(None) would raise TypeError.
                continue
            cls.__deal_mode_list_dict[mode] = json.loads(val)
        for records in cls.__deal_mode_list_dict.values():
            cls.__deal_codes.update(record[0] for record in records)

    def add_deal_code(self, code, deal_time, mode):
        """
        Record a deal for ``code`` under the given buy mode.

        A code is recorded only once: if it already has a deal (under any
        mode) the call just logs and returns.

        @param code: traded code
        @param deal_time: time of the deal
        @param mode: buy mode
        @return: None
        """
        async_log_util.info(logger_debug, f"下单模式统计成交:{code}-{deal_time}-{mode}")
        if code in self.__deal_codes:
            return
        self.__deal_codes.add(code)
        records = self.__deal_mode_list_dict.setdefault(mode, [])
        records.append((code, deal_time))
        # Persist the whole list for this mode; the expiry comes from
        # tool.get_expire().
        RedisUtils.setex_async(self.__db, f"deal_delegate_by_mode-{mode}", tool.get_expire(),
                               json.dumps(records))

    def get_deal_codes_info(self, mode):
        """
        Return the deal records stored for a buy mode.

        @param mode: buy mode
        @return: list of (code, deal_time) records, or None if the mode has
                 no recorded deals
        """
        return self.__deal_mode_list_dict.get(mode)

    def get_deal_codes(self):
        """
        Return the set of codes that have a recorded deal.

        @return: the internal set itself (not a copy)
        """
        return self.__deal_codes

    def set_delegates_info(self, delegates):
        """
        Replace the stored delegate information.

        @param delegates: [(code, delegate_time, delegate_mode)]
        @return: None
        """
        grouped = {}
        for d in delegates:
            grouped.setdefault(d[2], []).append((d[0], d[1]))
        self.__delegates_mode_list_dict = grouped

    def get_delegates_codes_info(self, mode):
        """
        Return the delegate records stored for a buy mode.

        @param mode: buy mode
        @return: list of (code, delegate_time) tuples, or None if the mode
                 has no delegates
        """
        return self.__delegates_mode_list_dict.get(mode)
|