Administrator
2025-05-26 a846b46f15ad309a62fe400cf78dd7fc888155d7
l2/l2_data_manager.py
@@ -5,12 +5,61 @@
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from log_module.log import logger_debug
from utils import tool
from utils.tool import CodeDataCacheUtil
_db = 1
_redisManager = redis_manager.RedisManager(1)
# 下单临时信息
class OrderBeginPosInfo(object):
    MODE_NORMAL = 0  # 普通下单
    MODE_FAST = 1
    MODE_ACTIVE = 2  # 积极动买
    MODE_RADICAL = 3  # 扫入
    MODE_OPEN_LIMIT_UP = 4  # 排1
    # mode: 0-普通交易  1-快速交易
    def __init__(self, buy_single_index=None, buy_exec_index=-1, buy_compute_index=None, num=0, count=0,
                 max_num_set=None, buy_volume_rate=None, sell_info=None, threshold_money=None, mode=0, mode_desc=None,
                 at_limit_up=False, first_limit_up_buy=False, min_order_no = None):
        self.buy_single_index = buy_single_index
        self.buy_exec_index = buy_exec_index
        self.buy_compute_index = buy_compute_index
        self.num = num
        self.count = count
        self.threshold_money = threshold_money
        if max_num_set:
            self.max_num_set = list(max_num_set)
        else:
            self.max_num_set = []
        self.buy_volume_rate = buy_volume_rate
        self.sell_info = sell_info
        self.mode = mode
        self.mode_desc = mode_desc
        # 是否是板上买
        self.at_limit_up = at_limit_up
        # 是否为首封买
        self.first_limit_up_buy = first_limit_up_buy
        # 统计批次大单成交的最小订单号
        self.min_order_no = min_order_no
    def get_max_num_set(self):
        if self.max_num_set:
            return set(self.max_num_set)
        return None
    def to_json_str(self):
        return json.dumps(vars(self))
    def to_dict(self):
        return vars(self)
    @classmethod
    def to_object(cls, json_str: str):
        d = json.loads(json_str)
        return OrderBeginPosInfo(**d)
class L2DataException(Exception):
@@ -38,6 +87,8 @@
    __buy_compute_index_info_cache = {}
    __buy_cancel_single_pos_cache = {}
    __instance = None
    # 最近的下单模式
    __latest_place_order_mode_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -57,8 +108,8 @@
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            val = json.loads(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, val)
            order = OrderBeginPosInfo.to_object(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, order)
        keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*")
        for k in keys:
@@ -67,14 +118,12 @@
            CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val))
    # 删除买入点数据
    def delete_buy_point(self, code):
        CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
    def get_buy_compute_start_data(self, code):
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = RedisUtils.get(self.__get_redis(), _key)
@@ -83,11 +132,12 @@
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]
    def get_buy_compute_start_data_cache(self, code):
    def get_buy_compute_start_data_cache(self, code) -> OrderBeginPosInfo:
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None, None, 0, 0, [], 0
        return OrderBeginPosInfo()
    # 设置买入点的值
    # buy_single_index 买入信号位
@@ -95,51 +145,36 @@
    # compute_index 计算位置
    # nums 累计纯买额
    def set_buy_compute_start_data(self, code, buy_single_index, buy_exec_index, compute_index, nums, count,
                                   max_num_sets,
                                   volume_rate):
    def set_buy_compute_start_data_v2(self, code, order: OrderBeginPosInfo):
        if order.mode is not None:
            self.__latest_place_order_mode_cache[code] = order.mode
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        data_ = None
        if buy_single_index is not None:
            data_ = (buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
        if order.buy_single_index is not None:
            data_ = order
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = self.get_buy_compute_start_data_cache(
                code)
            data_ = (_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
            old_order = self.get_buy_compute_start_data_cache(code)
            order.buy_single_index = old_order.buy_single_index
            data_ = order
        CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(self.__db, _key, expire, json.dumps(data_))
        RedisUtils.setex_async(self.__db, _key, expire, data_.to_json_str())
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    # 是否已经下单
    @classmethod
    def is_placed_order(cls, order_begin_pos: OrderBeginPosInfo):
        return order_begin_pos and order_begin_pos.buy_exec_index is not None and order_begin_pos.buy_exec_index > -1
    def get_buy_cancel_single_pos(self, code):
        info = RedisUtils.get(self.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
            return int(info)
    def get_buy_cancel_single_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_cancel_single_pos_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # 设置买撤点信息
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    def set_buy_cancel_single_pos(self, code, index):
        tool.CodeDataCacheUtil.set_cache(self.__buy_cancel_single_pos_cache, code, index)
        expire = tool.get_expire()
        RedisUtils.setex_async(self.__db, "buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    def delete_buy_cancel_point(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__buy_cancel_single_pos_cache, code)
        RedisUtils.delete_async(self.__db, "buy_cancel_single_pos-{}".format(code))
    @classmethod
    def get_latest_place_order_mode(cls, code):
        """
        获取最近下单的模式
        @param code:
        @return:
        """
        return cls.__latest_place_order_mode_cache.get(code)
# 清除l2数据
@@ -192,4 +227,13 @@
if __name__ == "__main__":
    TradePointManager().get_buy_compute_start_data_cache("603912")
    code = "002886"
    TradePointManager().set_buy_compute_start_data_v2(code, OrderBeginPosInfo(buy_single_index=10,
                                                                              buy_exec_index=30,
                                                                              buy_compute_index=40,
                                                                              num=20000, count=10,
                                                                              buy_volume_rate=0.6,
                                                                              mode=OrderBeginPosInfo.MODE_NORMAL,
                                                                              ))
    print(TradePointManager().get_buy_compute_start_data_cache(code).max_num_set)
    RedisUtils.run_loop()