Administrator
2025-05-26 a846b46f15ad309a62fe400cf78dd7fc888155d7
l2/l2_data_manager.py
@@ -3,11 +3,63 @@
"""
import json
from db import redis_manager
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
from log_module.log import logger_l2_trade_buy
from utils.tool import CodeDataCacheUtil
# Value handed to RedisManager below (presumably the Redis database index - confirm).
_db = 1
# Module-level manager used by the l2-fixed-codes helper functions in this file.
_redisManager = redis_manager.RedisManager(_db)
# 下单临时信息
class OrderBeginPosInfo(object):
    """Temporary bookkeeping for an order that is being (or has been) placed.

    Instances round-trip through JSON: ``to_json_str()`` dumps every instance
    attribute under its own name and ``to_object()`` feeds that mapping back
    into the constructor as keyword arguments.
    """

    # Order placement modes (values accepted by the ``mode`` argument).
    MODE_NORMAL = 0  # normal order placement
    MODE_FAST = 1
    MODE_ACTIVE = 2  # aggressive buy
    MODE_RADICAL = 3  # sweep-in
    MODE_OPEN_LIMIT_UP = 4  # queue position 1

    def __init__(self, buy_single_index=None, buy_exec_index=-1, buy_compute_index=None, num=0, count=0,
                 max_num_set=None, buy_volume_rate=None, sell_info=None, threshold_money=None, mode=0, mode_desc=None,
                 at_limit_up=False, first_limit_up_buy=False, min_order_no=None):
        """Store the given values as attributes of the same name.

        @param buy_single_index: buy signal position
        @param buy_exec_index: buy execution position (-1 by default)
        @param buy_compute_index: position up to which data has been computed
        @param num: accumulated net buy amount
        @param count: accumulated count
        @param max_num_set: iterable of entries; stored as a list so the
            object stays JSON serializable (empty list when falsy)
        @param buy_volume_rate: volume rate recorded with the order
        @param sell_info: sell information recorded with the order
        @param threshold_money: threshold money recorded with the order
        @param mode: one of the MODE_* constants
        @param mode_desc: free-form description of the mode
        @param at_limit_up: whether the buy happens at the limit-up price
        @param first_limit_up_buy: whether this is a first-seal buy
        @param min_order_no: smallest order number among the big-order deals
            counted for the batch
        """
        self.buy_single_index = buy_single_index
        self.buy_exec_index = buy_exec_index
        self.buy_compute_index = buy_compute_index
        self.num = num
        self.count = count
        self.threshold_money = threshold_money
        # Sets are not JSON serializable, so keep a list internally.
        if max_num_set:
            self.max_num_set = list(max_num_set)
        else:
            self.max_num_set = []
        self.buy_volume_rate = buy_volume_rate
        self.sell_info = sell_info
        self.mode = mode
        self.mode_desc = mode_desc
        # Whether the buy happens at the limit-up price.
        self.at_limit_up = at_limit_up
        # Whether this is a first-seal buy.
        self.first_limit_up_buy = first_limit_up_buy
        # Smallest order number among the big-order deals counted for the batch.
        self.min_order_no = min_order_no

    def get_max_num_set(self):
        """Return ``max_num_set`` as a set, or None when it is empty."""
        if self.max_num_set:
            return set(self.max_num_set)
        return None

    def to_json_str(self):
        """Serialize all instance attributes to a JSON object string."""
        return json.dumps(vars(self))

    def to_dict(self):
        """Return the instance attribute dict (the live ``__dict__``, not a copy)."""
        return vars(self)

    @classmethod
    def to_object(cls, json_str: str):
        """Rebuild an instance from a string produced by ``to_json_str()``.

        Instantiates ``cls`` rather than a hard-coded class, so calling this
        on a subclass yields an instance of that subclass.
        """
        d = json.loads(json_str)
        return cls(**d)
class L2DataException(Exception):
@@ -30,168 +82,158 @@
# 交易点管理器,用于管理买入点;买撤点;距离买入点的净买入数据;距离买撤点的买撤数据
class TradePointManager:
    """Singleton that manages per-code trade points stored in Redis.

    Covers the buy point, the buy-cancel point, and the net-buy / count
    statistics accumulated from those points. Buy-point records are also
    mirrored in a class-level cache that is filled from Redis when the
    singleton is first created.
    """

    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    # code -> OrderBeginPosInfo
    __buy_compute_index_info_cache = {}
    # code -> buy-cancel signal index (filled by load_data)
    __buy_cancel_single_pos_cache = {}
    __instance = None
    # code -> mode of the most recently stored order
    __latest_place_order_mode_cache = {}

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it and loading the caches on first use."""
        if cls.__instance is None:
            # object.__new__ rejects extra arguments, so they are not forwarded.
            cls.__instance = super(TradePointManager, cls).__new__(cls)
            cls.load_data()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        """Return a Redis handle from the class-level manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def load_data(cls):
        """Fill the class-level caches from the matching Redis keys.

        The code is taken from the part of each key after the last "-".
        """
        redis_ = cls.__get_redis()
        keys = RedisUtils.keys(redis_, "buy_compute_index_info-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            order = OrderBeginPosInfo.to_object(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, order)
        keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*")
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val))

    def delete_buy_point(self, code):
        """Delete the buy-point record of ``code`` from the cache and from Redis."""
        CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))

    def get_buy_compute_start_data(self, code):
        """Read the buy-point record of ``code`` from Redis as a 7-tuple.

        @return: (buy signal index, buy execution index, computed index,
            accumulated net buy amount, count, max-num list, volume rate);
            (None, None, None, 0, 0, [], 0) when nothing is stored.
        """
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = RedisUtils.get(self.__get_redis(), _key)
        if _data_json is None:
            return None, None, None, 0, 0, [], 0
        _data = json.loads(_data_json)
        if isinstance(_data, dict):
            # Records written by set_buy_compute_start_data_v2 are JSON objects
            # keyed by OrderBeginPosInfo attribute names, not positional lists.
            return (_data.get("buy_single_index"), _data.get("buy_exec_index"), _data.get("buy_compute_index"),
                    _data.get("num"), _data.get("count"), _data.get("max_num_set"), _data.get("buy_volume_rate"))
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]

    def get_buy_compute_start_data_cache(self, code) -> OrderBeginPosInfo:
        """Return the cached buy-point record of ``code``, or a default OrderBeginPosInfo."""
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return OrderBeginPosInfo()

    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets,
                                   volume_rate):
        """Legacy positional form of set_buy_compute_start_data_v2.

        Wraps the arguments in an OrderBeginPosInfo and delegates to the v2
        method on the singleton.

        @param buy_single_index: buy signal position
        @param buy_exec_index: buy execution position
        @param compute_index: computed position
        @param nums: accumulated net buy amount
        """
        TradePointManager().set_buy_compute_start_data_v2(
            code,
            OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=buy_exec_index,
                              buy_compute_index=compute_index, num=nums, count=count,
                              max_num_set=max_num_sets, buy_volume_rate=volume_rate))

    def set_buy_compute_start_data_v2(self, code, order: OrderBeginPosInfo):
        """Store ``order`` as the buy-point record of ``code`` (cache now, Redis asynchronously).

        When ``order.buy_single_index`` is None the previously cached signal
        index is carried over onto ``order`` before it is stored.
        """
        if order.mode is not None:
            self.__latest_place_order_mode_cache[code] = order.mode
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if order.buy_single_index is None:
            old_order = self.get_buy_compute_start_data_cache(code)
            order.buy_single_index = old_order.buy_single_index
        CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, order)
        RedisUtils.setex_async(self.__db, _key, expire, order.to_json_str())

    @staticmethod
    def get_buy_cancel_single_pos(code):
        """Return the stored buy-cancel signal index of ``code`` as an int, or None."""
        redis = TradePointManager.__get_redis()
        info = redis.get("buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        return int(info)

    @classmethod
    def is_placed_order(cls, order_begin_pos: OrderBeginPosInfo):
        """Tell whether an order was placed: truthy when ``buy_exec_index`` is set and greater than -1."""
        return order_begin_pos and order_begin_pos.buy_exec_index is not None and order_begin_pos.buy_exec_index > -1

    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
        """Store ``index`` as the buy-cancel signal start of ``code``."""
        redis = cls.__get_redis()
        expire = tool.get_expire()
        redis.setex("buy_cancel_single_pos-{}".format(code), expire, index)

    @classmethod
    def delete_buy_cancel_point(cls, code):
        """Delete the buy-cancel point of ``code``."""
        redis = cls.__get_redis()
        redis.delete("buy_cancel_single_pos-{}".format(code))

    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        """Store the buy-cancel net buy amount of ``code`` as the JSON pair (index, nums)."""
        redis = cls.__get_redis()
        expire = tool.get_expire()
        redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)

    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        """Return the stored (index, nums) pair of ``code``, or (None, 0) when absent."""
        redis = cls.__get_redis()
        info = redis.get("compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        info = json.loads(info)
        return info[0], info[1]

    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        """Delete the buy-cancel net buy amount record of ``code``."""
        redis = cls.__get_redis()
        redis.delete("compute_info_for_cancel_buy-{}".format(code))

    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        """Store the limit-up buy and limit-up cancel counts of ``code``, counted from the buy signal."""
        redis = cls.__get_redis()
        expire = tool.get_expire()
        redis.setex("count_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, buy_count, cancel_count)))
        # One placeholder per argument, so cancel_count is no longer dropped from the message.
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{},{}", code, index, buy_count, cancel_count)

    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        """Return the stored (index, buy_count, cancel_count) of ``code``, or (None, 0, 0) when absent."""
        redis = cls.__get_redis()
        info = redis.get("count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        info = json.loads(info)
        return info[0], info[1], info[2]

    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        """Delete the buy/cancel count record of ``code``."""
        redis = cls.__get_redis()
        redis.delete("count_info_for_cancel_buy-{}".format(code))

    @classmethod
    def get_latest_place_order_mode(cls, code):
        """
        Get the mode of the most recently stored order.
        @param code: stock code
        @return: the mode recorded by set_buy_compute_start_data_v2, or None
        """
        return cls.__latest_place_order_mode_cache.get(code)
# 清除l2数据
def clear_l2_data(code):
    """Delete every "l2-{code}-*" key and the "l2-data-latest-{code}" key from Redis.

    All commands reuse one connection (auto_free=False), which is released in
    the ``finally`` clause even when a command fails.
    """
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    try:
        keys = RedisUtils.keys(redis_l2, "l2-{}-*".format(code), auto_free=False)
        for k in keys:
            RedisUtils.delete(redis_l2, k, auto_free=False)
        RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code), auto_free=False)
    finally:
        RedisUtils.realse(redis_l2)
# Number of seconds from midnight to 09:30:00.
second_930 = (9 * 60 + 30) * 60
#  初始化l2固定代码库
def init_l2_fixed_codes():
    """Reset the "l2-fixed-codes" set so that it contains only the placeholder "000000".

    An existing set is deleted first; the fresh set gets the expiry returned
    by ``tool.get_expire()``. All commands share one connection
    (auto_free=False) that is released in the ``finally`` clause.
    """
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    try:
        count = RedisUtils.scard(redis, key, auto_free=False)
        if count > 0:
            RedisUtils.delete(redis, key, auto_free=False)
        RedisUtils.sadd(redis, key, "000000", auto_free=False)
        RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False)
    finally:
        RedisUtils.realse(redis)
# 移除l2固定监控代码
def remove_from_l2_fixed_codes(code):
    """Remove ``code`` from the "l2-fixed-codes" set."""
    key = "l2-fixed-codes"
    # A single command through RedisUtils; the member is no longer removed twice
    # over two separate connections.
    RedisUtils.srem(_redisManager.getRedis(), key, code)
# 添加代码到L2固定监控
def add_to_l2_fixed_codes(code):
    """Add ``code`` to the "l2-fixed-codes" set and refresh the set's expiry."""
    key = "l2-fixed-codes"
    # Each command goes through RedisUtils exactly once, on its own connection.
    RedisUtils.sadd(_redisManager.getRedis(), key, code)
    RedisUtils.expire(_redisManager.getRedis(), key, tool.get_expire())
# 是否在l2固定监控代码中
def is_in_l2_fixed_codes(code):
    """Return whether ``code`` is a member of the "l2-fixed-codes" set."""
    key = "l2-fixed-codes"
    # Single return through RedisUtils, matching the other l2-fixed-codes helpers;
    # the earlier raw-connection return made this statement unreachable.
    return RedisUtils.sismember(_redisManager.getRedis(), key, code)
if __name__ == "__main__":
    # Manual smoke run: clear the L2 data of one code, store a sample buy-point
    # record for another, print its cached max_num_set, then enter the RedisUtils loop.
    clear_l2_data("603912")
    code = "002886"
    manager = TradePointManager()
    sample_order = OrderBeginPosInfo(
        buy_single_index=10,
        buy_exec_index=30,
        buy_compute_index=40,
        num=20000,
        count=10,
        buy_volume_rate=0.6,
        mode=OrderBeginPosInfo.MODE_NORMAL,
    )
    manager.set_buy_compute_start_data_v2(code, sample_order)
    cached_order = manager.get_buy_compute_start_data_cache(code)
    print(cached_order.max_num_set)
    RedisUtils.run_loop()