Administrator
2023-08-08 8c7519b0dc79d32a216765a1b46e736d53e3d786
l2/l2_data_manager.py
@@ -6,7 +6,6 @@
from db import redis_manager
from db.redis_manager import RedisUtils
from utils import tool
from log_module.log import logger_l2_trade_buy
from utils.tool import CodeDataCacheUtil
_db = 1
@@ -33,45 +32,70 @@
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
class TradePointManager:
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __buy_compute_index_info_cache = {}
    __buy_cancel_single_pos_cache = {}
    __instance = None
    @staticmethod
    def __get_redis():
        return _redisManager.getRedis()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradePointManager, cls).__new__(cls, *args, **kwargs)
            cls.load_data()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def load_data(cls):
        redis_ = cls.__get_redis()
        keys = RedisUtils.keys(redis_, "buy_compute_index_info-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            val = json.loads(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, val)
        keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val))
    # 删除买入点数据
    @staticmethod
    def delete_buy_point(code):
        CodeDataCacheUtil.clear_cache(TradePointManager.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(_db, "buy_compute_index_info-{}".format(code))
    def delete_buy_point(self, code):
        CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))
    # Get the buy point information.
    # Original note: the returned data is the buy point, the cumulative net buy amount and the
    # index of the data already computed. The code below returns seven values.
    # NOTE(review): a static and an instance signature appear back to back below and the Redis
    # read is issued twice; this looks like the old and new lines of a diff merged together -
    # confirm which version is intended.
    @staticmethod
    def get_buy_compute_start_data(code):
    def get_buy_compute_start_data(self, code):
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = RedisUtils.get(TradePointManager.__get_redis(), _key)
        _data_json = RedisUtils.get(self.__get_redis(), _key)
        if _data_json is None:
            # Nothing stored under this key: return the default 7-tuple
            return None, None, None, 0, 0, [], 0
        _data = json.loads(_data_json)
        # Unpack the first seven elements of the decoded JSON value
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]
    # Cache-backed lookup of the buy-compute info for a code.
    # NOTE(review): two signatures and two cache reads appear below (static and instance
    # variants); this looks like old and new diff lines merged together - confirm which
    # version is intended. As written, the final return statement is unreachable.
    @staticmethod
    def get_buy_compute_start_data_cache(code):
        cache_result = CodeDataCacheUtil.get_cache(TradePointManager.__buy_compute_index_info_cache, code)
    def get_buy_compute_start_data_cache(self, code):
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
        # presumably cache_result is a (hit, value) pair - verify against CodeDataCacheUtil
        if cache_result[0]:
            return cache_result[1]
        # Static variant: read through to Redis and remember the result
        val = TradePointManager.get_buy_compute_start_data(code)
        CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, val)
        return val
        # Instance variant: default 7-tuple when nothing is cached
        return None, None, None, 0, 0, [], 0
    # 设置买入点的值
    # buy_single_index 买入信号位
    # buy_exec_index 买入执行位
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets,
    def set_buy_compute_start_data(self, code, buy_single_index, buy_exec_index, compute_index, nums, count,
                                   max_num_sets,
                                   volume_rate):
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
@@ -81,80 +105,41 @@
                     volume_rate)
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = TradePointManager.get_buy_compute_start_data_cache(
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = self.get_buy_compute_start_data_cache(
                code)
            data_ = (_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
        CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(
            _db, _key, expire,
            json.dumps(data_))
        CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(self.__db, _key, expire, json.dumps(data_))
    # Get the information at which the buy-cancel computation starts.
    # Original note: the returned content is the cancel point index, the buy-cancel net buy
    # amount and the computed data index. The code below returns a single int, or None.
    # NOTE(review): a static and an instance signature appear back to back below and the Redis
    # read is issued twice; this looks like old and new diff lines merged together - confirm
    # which version is intended.
    @staticmethod
    def get_buy_cancel_single_pos(code):
        info = RedisUtils.get(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code))
    def get_buy_cancel_single_pos(self, code):
        info = RedisUtils.get(self.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        if info is None:
            # No value stored under this key
            return None
        else:
            return int(info)
    def get_buy_cancel_single_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_cancel_single_pos_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # Set the buy-cancel point information.
    # Original note: buy_num is the net buy amount, computed_index is the index computed up to,
    # index is the start of the buy-cancel signal. Only `code` and `index` are parameters here.
    # NOTE(review): a classmethod and an instance signature appear back to back below, and the
    # value is written with both setex and setex_async; this looks like old and new diff lines
    # merged together - confirm which version is intended.
    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
    def set_buy_cancel_single_pos(self, code, index):
        # Remember the position in the in-memory cache
        tool.CodeDataCacheUtil.set_cache(self.__buy_cancel_single_pos_cache, code, index)
        expire = tool.get_expire()
        RedisUtils.setex(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code), expire, index)
        RedisUtils.setex_async(self.__db, "buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        RedisUtils.delete_async(_db, "buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        expire = tool.get_expire()
        RedisUtils.setex(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        info = RedisUtils.get(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
            info = json.loads(info)
            return info[0], info[1]
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        RedisUtils.delete_async(_db, "compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        expire = tool.get_expire()
        RedisUtils.setex(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code), expire,
                         json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # Get the stored count info for cancel-buy: index, buy count and cancel count
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        info = RedisUtils.get(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
            info = json.loads(info)
            return info[0], info[1], info[2]
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        RedisUtils.delete_async(_db, "count_info_for_cancel_buy-{}".format(code))
    def delete_buy_cancel_point(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__buy_cancel_single_pos_cache, code)
        RedisUtils.delete_async(self.__db, "buy_cancel_single_pos-{}".format(code))
# 清除l2数据
@@ -207,4 +192,4 @@
# Manual smoke check: look up the cached buy-compute info for one code.
# NOTE(review): the lookup appears twice, once on the class and once on an instance; this
# looks like the old and new lines of a diff merged together - confirm which call is intended.
if __name__ == "__main__":
    TradePointManager.get_buy_compute_start_data_cache("603912")
    TradePointManager().get_buy_compute_start_data_cache("603912")