Administrator
2023-08-28 ba52d7ac92a36f413eacaa686f8535e859664ec6
l2/l2_data_manager.py
@@ -3,10 +3,12 @@
"""
import json
from db import redis_manager
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
from logs.log import logger_l2_trade_buy
from utils.tool import CodeDataCacheUtil
# Redis database index; the same literal is passed to RedisManager on the next line.
_db = 1
# Module-level manager from which the l2-fixed-codes helpers below obtain connections.
_redisManager = redis_manager.RedisManager(1)
@@ -30,168 +32,163 @@
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
class TradePointManager:
    @staticmethod
    def __get_redis():
        return _redisManager.getRedis()
    # Database index passed to the RedisUtils *_async helpers in this class.
    __db = 1
    # Class-level manager; __get_redis() obtains connections from it.
    __redisManager = redis_manager.RedisManager(1)
    # Cache handed to CodeDataCacheUtil for buy-compute records (stored per code).
    __buy_compute_index_info_cache = {}
    # Cache handed to CodeDataCacheUtil for buy-cancel signal positions (stored per code).
    __buy_cancel_single_pos_cache = {}
    # The single shared instance, created on first construction by __new__.
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradePointManager, cls).__new__(cls, *args, **kwargs)
            cls.load_data()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def load_data(cls):
        redis_ = cls.__get_redis()
        keys = RedisUtils.keys(redis_, "buy_compute_index_info-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            val = json.loads(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, val)
        keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val))
    # 删除买入点数据
    @staticmethod
    def delete_buy_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_compute_index_info-{}".format(code))
    def delete_buy_point(self, code):
        CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))
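    # Read the stored buy-point record.
    # Returns 7 values: buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets, volume_rate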
    @staticmethod
    def get_buy_compute_start_data(code):
        redis = TradePointManager.__get_redis()
    def get_buy_compute_start_data(self, code):
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        _data_json = RedisUtils.get(self.__get_redis(), _key)
        if _data_json is None:
            return None, None, None, 0, 0, [],0
            return None, None, None, 0, 0, [], 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]
    def get_buy_compute_start_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None, None, 0, 0, [], 0
    # 设置买入点的值
    # buy_single_index 买入信号位
    # buy_exec_index 买入执行位
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets, volume_rate):
        redis = TradePointManager.__get_redis()
    def set_buy_compute_start_data(self, code, buy_single_index, buy_exec_index, compute_index, nums, count,
                                   max_num_sets,
                                   volume_rate):
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        data_ = None
        if buy_single_index is not None:
            redis.setex(_key, expire,
                        json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),volume_rate)))
            data_ = (buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index,_volume_rate = TradePointManager.get_buy_compute_start_data(code)
            redis.setex(_key, expire,
                        json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),volume_rate)))
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = self.get_buy_compute_start_data_cache(
                code)
            data_ = (_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
        CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(self.__db, _key, expire, json.dumps(data_))
    # Read the position at which buy-cancel computation starts.
    # Returns the stored index as an int, or None when nothing is stored
    @staticmethod
    def get_buy_cancel_single_pos(code):
        redis = TradePointManager.__get_redis()
        info = redis.get("buy_cancel_single_pos-{}".format(code))
    def get_buy_cancel_single_pos(self, code):
        info = RedisUtils.get(self.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
            return int(info)
    def get_buy_cancel_single_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_cancel_single_pos_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # Store the buy-cancel point.
    # index: starting position of the buy-cancel signal
    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
        redis = TradePointManager.__get_redis()
    def set_buy_cancel_single_pos(self, code, index):
        tool.CodeDataCacheUtil.set_cache(self.__buy_cancel_single_pos_cache, code, index)
        expire = tool.get_expire()
        redis.setex("buy_cancel_single_pos-{}".format(code), expire, index)
        RedisUtils.setex_async(self.__db, "buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
            info = json.loads(info)
            return info[0], info[1]
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # Read the (index, buy_count, cancel_count) counters stored for cancel-buy
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
            info = json.loads(info)
            return info[0], info[1], info[2]
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("count_info_for_cancel_buy-{}".format(code))
    def delete_buy_cancel_point(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__buy_cancel_single_pos_cache, code)
        RedisUtils.delete_async(self.__db, "buy_cancel_single_pos-{}".format(code))
# 清除l2数据
def clear_l2_data(code):
    """Delete every ``l2-<code>-*`` key and the ``l2-data-latest-<code>`` key.

    All calls share one connection, so each RedisUtils call is made with
    ``auto_free=False`` and the connection is handed to ``RedisUtils.realse``
    once at the end, even if a call fails.
    """
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    try:
        keys = RedisUtils.keys(redis_l2, "l2-{}-*".format(code), auto_free=False)
        for k in keys:
            RedisUtils.delete(redis_l2, k, auto_free=False)
        RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code), auto_free=False)
    finally:
        RedisUtils.realse(redis_l2)
# 09:30:00 expressed as seconds (9 h * 3600 + 30 min * 60 + 0 s).
second_930 = 9 * 3600 + 30 * 60 + 0
#  初始化l2固定代码库
def init_l2_fixed_codes():
    """Reset the ``l2-fixed-codes`` set so that it holds only the entry "000000".

    An existing non-empty set is deleted first; the fresh set is given the
    expiry returned by ``tool.get_expire()``. All calls share one connection,
    which is handed to ``RedisUtils.realse`` once at the end.
    """
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    try:
        count = RedisUtils.scard(redis, key, auto_free=False)
        if count > 0:
            RedisUtils.delete(redis, key, auto_free=False)
        RedisUtils.sadd(redis, key, "000000", auto_free=False)
        RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False)
    finally:
        RedisUtils.realse(redis)
# 移除l2固定监控代码
def remove_from_l2_fixed_codes(code):
    """Remove ``code`` from the ``l2-fixed-codes`` set."""
    key = "l2-fixed-codes"
    RedisUtils.srem(_redisManager.getRedis(), key, code)
# 添加代码到L2固定监控
def add_to_l2_fixed_codes(code):
    """Add ``code`` to the ``l2-fixed-codes`` set and refresh the set's expiry.

    Each RedisUtils call is given its own connection from the module-level
    manager.
    """
    key = "l2-fixed-codes"
    RedisUtils.sadd(_redisManager.getRedis(), key, code)
    RedisUtils.expire(_redisManager.getRedis(), key, tool.get_expire())
# 是否在l2固定监控代码中
def is_in_l2_fixed_codes(code):
    """Return the result of the ``l2-fixed-codes`` membership check for ``code``.

    The check goes through ``RedisUtils.sismember`` so that it is handled the
    same way as the other l2-fixed-codes helpers in this module.
    """
    key = "l2-fixed-codes"
    return RedisUtils.sismember(_redisManager.getRedis(), key, code)
if __name__ == "__main__":
    # Manual run against the hard-coded code "603912".
    # NOTE(review): clear_l2_data deletes that code's l2-* keys - confirm this is
    # intended before running the module directly.
    clear_l2_data("603912")
    TradePointManager().get_buy_compute_start_data_cache("603912")