Administrator
2023-08-08 ba2f7b998e5e3f6223c11e804c7922a8070426a0
单例+缓存优化
5个文件已修改
228 ■■■■■ 已修改文件
l2/l2_data_manager_new.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py
@@ -69,11 +69,33 @@
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(L2BigNumForMProcessor, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls._redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        _redis = cls._redis_manager.getRedis()
        try:
            keys = RedisUtils.keys(_redis, "m_big_money_begin-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(_redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.m_big_money_begin_cache, code, int(val))
            keys = RedisUtils.keys(_redis, "m_big_money_process_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(_redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.m_big_money_process_index_cache, code, int(val))
        finally:
            RedisUtils.realse(_redis)
    # 保存计算开始位置
    def set_begin_pos(self, code, index):
@@ -95,9 +117,7 @@
        cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_begin_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_begin_pos(code)
        tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, val)
        return val
        return None
    # 清除已经处理的数据
    def clear_processed_end_index(self, code):
@@ -123,9 +143,7 @@
        cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_process_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_processed_end_index(code)
        tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, val)
        return val
        return None
    # 处理大单
    def process(self, code, start_index, end_index, limit_up_price):
@@ -1184,8 +1202,9 @@
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
                                volume_rate):
        TradePointManager().set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num, count,
                                                     max_num_set, volume_rate)
        TradePointManager().set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num,
                                                       count,
                                                       max_num_set, volume_rate)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
l2/safe_count_manager.py
@@ -11,24 +11,52 @@
from utils import tool
from l2.l2_data_util import L2DataUtil
latest_place_order_info_cache = {}
safe_count_l2_cache = {}
class BuyL2SafeCountManager(object):
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __instance = None
    latest_place_order_info_cache = {}
    safe_count_l2_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(BuyL2SafeCountManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    def __init__(self):
        self.last_buy_queue_data = {}
    def __getRedis(self):
        return self.__redis_manager.getRedis()
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "safe_count_l2-*")
            for k in keys:
                ks = k.split("-")
                code, last_buy_single_index = ks[1], int(ks[2])
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.safe_count_l2_cache, f"{code}-{last_buy_single_index}", val)
            keys = RedisUtils.keys(__redis, "latest_place_order_info-*")
            for k in keys:
                ks = k.split("-")
                code = ks[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.latest_place_order_info_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 记录每一次的处理进度
    def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}",
        tool.CodeDataCacheUtil.set_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}",
                                         (last_buy_single_index, process_index, buy_num, cancel_num))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
@@ -36,61 +64,57 @@
    # 返回数据与更新时间
    def __get_compute_progress(self, code, last_buy_single_index):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        val = RedisUtils.get(self.__getRedis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    def __get_compute_progress_cache(self, code, last_buy_single_index):
        cache_result = tool.CodeDataCacheUtil.get_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}")
        cache_result = tool.CodeDataCacheUtil.get_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}")
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_compute_progress(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}", val)
        return val
        return None, -1, 0, 0
    # 保存最近的下单信息
    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        tool.CodeDataCacheUtil.set_cache(latest_place_order_info_cache, code,
        tool.CodeDataCacheUtil.set_cache(self.latest_place_order_info_cache, code,
                                         (buy_single_index, buy_exec_index, cancel_index))
        key = "latest_place_order_info-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(),
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(),
                         json.dumps((buy_single_index, buy_exec_index, cancel_index)))
    def __get_latest_place_order_info(self, code):
        key = "latest_place_order_info-{}".format(code)
        val = RedisUtils.get(self.__getRedis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
    def __get_latest_place_order_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(latest_place_order_info_cache, code)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.latest_place_order_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_latest_place_order_info(code)
        tool.CodeDataCacheUtil.set_cache(latest_place_order_info_cache, code, val)
        return val
        return None, None, None
    def __get_all_compute_progress(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = RedisUtils.keys(self.__getRedis(), key_regex)
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        vals = []
        for k in keys:
            val = RedisUtils.get(self.__getRedis(), k)
            val = RedisUtils.get(self.__get_redis(), k)
            val = json.loads(val)
            vals.append(val)
        return vals
    def clear_data(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = RedisUtils.keys(self.__getRedis(), key_regex)
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        for k in keys:
            RedisUtils.delete(self.__getRedis(), k)
            RedisUtils.delete(self.__get_redis(), k)
        tool.CodeDataCacheUtil.clear_cache(latest_place_order_info_cache, code)
        tool.CodeDataCacheUtil.clear_cache(self.latest_place_order_info_cache, code)
        key = f"latest_place_order_info-{code}"
        RedisUtils.delete_async(self.__db, key)
l2/transaction_progress.py
@@ -13,63 +13,83 @@
import l2.l2_data_util
from log_module.log import logger_l2_trade_buy_queue, logger_l2_trade_buy_progress
buy_progress_index_cache = {}
class TradeBuyQueue:
    buy_progress_index_cache = {}
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradeBuyQueue, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    def __init__(self):
        self.last_buy_queue_data = {}
    def __getRedis(self):
        return self.__redis_manager.getRedis()
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "trade_buy_progress_index-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.buy_progress_index_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    def __save_buy_queue_data(self, code, num_list):
        key = "trade_buy_queue_data-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str())))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((num_list, tool.get_now_time_str())))
    # 返回数据与更新时间
    def __get_buy_queue_data(self, code):
        key = "trade_buy_queue_data-{}".format(code)
        val = RedisUtils.get(self.__getRedis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], [1]
    def __save_buy_progress_index(self, code, index, is_default):
        tool.CodeDataCacheUtil.set_cache(buy_progress_index_cache, code, (index, is_default))
        tool.CodeDataCacheUtil.set_cache(self.buy_progress_index_cache, code, (index, is_default))
        key = "trade_buy_progress_index-{}".format(code)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), json.dumps((index, is_default)))
        # 返回数据与更新时间
    def __get_buy_progress_index(self, code):
        key = "trade_buy_progress_index-{}".format(code)
        val = RedisUtils.get(self.__getRedis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, True
        val = json.loads(val)
        return int(val[0]), bool(val[1])
    def __get_buy_progress_index_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(buy_progress_index_cache, code)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.buy_progress_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_buy_progress_index(code)
        tool.CodeDataCacheUtil.set_cache(buy_progress_index_cache, code, val)
        return val
        return None, True
    # 最近的非涨停买1的时间
    def __save_latest_not_limit_up_time(self, code, time_str):
        key = "latest_not_limit_up_time-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(), time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), time_str)
    def __get_latest_not_limit_up_time(self, code):
        key = "latest_not_limit_up_time-{}".format(code)
        if not constant.TEST:
            return RedisUtils.get(self.__getRedis(), key)
            return RedisUtils.get(self.__get_redis(), key)
        return None
    # 保存数据,返回保存数据的条数
trade/trade_data_manager.py
@@ -155,22 +155,40 @@
# 代码实时价格管理器
class CodeActualPriceProcessor:
    __under_water_last_time_cache = {}
    __code_current_rate_cache = {}
    __code_current_rate_latest = {}
    __db = 0
    __redisManager = redis_manager.RedisManager(0)
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodeActualPriceProcessor, cls).__new__(cls, *args, **kwargs)
            # 初始化设置
            # 获取交易窗口的锁
            cls.__instance.__redisManager = redis_manager.RedisManager(0)
            cls.__instance.__under_water_last_time_cache = {}
            cls.__instance.__code_current_rate_cache = {}
            cls.__instance.__code_current_rate_latest = {}
            cls.__load_datas()
        return cls.__instance
    def __get_redis(self):
        return self.__redisManager.getRedis()
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "under_water_last_time-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__under_water_last_time_cache, code, val)
            keys = RedisUtils.keys(__redis, "code_current_rate-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__code_current_rate_cache, code, float(val))
        finally:
            RedisUtils.realse(__redis)
    # 保存跌价的时间
    def __save_down_price_time(self, code, time_str):
@@ -191,9 +209,7 @@
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__under_water_last_time_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_last_down_price_time(code)
        tool.CodeDataCacheUtil.set_cache(self.__under_water_last_time_cache, code, val)
        return val
        return None
    def __increment_down_price_time(self, code, seconds):
        key = "under_water_seconds-{}".format(code)
@@ -234,7 +250,7 @@
        self.__code_current_rate_latest[code] = rate
        tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, code, rate)
        key = "code_current_rate-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), rate)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), rate)
    # 批量保存
    def __save_current_rates(self, datas):
@@ -261,9 +277,7 @@
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__code_current_rate_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_current_rate(code)
        tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, code, val)
        return val
        return None
    def process_rate(self, code, rate, time_str):
        # 保存目前的代码涨幅
@@ -372,7 +386,7 @@
class PlaceOrderCountManager:
    __db = 0
    __redisManager = redis_manager.RedisManager(0)
    __place_order_count_cache={}
    __place_order_count_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
@@ -414,7 +428,7 @@
        return 0
    def __get_place_order_count_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__place_order_count_cache,code)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__place_order_count_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return 0
trade/trade_manager.py
@@ -183,19 +183,26 @@
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesTradeStateManager, cls).__new__(cls, *args, **kwargs)
            __redis = cls.__get_redis()
            # 初始化数据
            keys = RedisUtils.keys(__redis, "trade-state-*", auto_free=False)
            if keys:
                for key in keys:
                    code = key.replace("trade-state-", '')
                    cls.__trade_state_cache[code] = int(RedisUtils.get(__redis, key, auto_free=False))
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            # 初始化数据
            keys = RedisUtils.keys(__redis, "trade-state-*", auto_free=False)
            if keys:
                for key in keys:
                    code = key.replace("trade-state-", '')
                    cls.__trade_state_cache[code] = int(RedisUtils.get(__redis, key, auto_free=False))
        finally:
            RedisUtils.realse(__redis)
    # 获取交易状态
    def get_trade_state(self, code):
@@ -208,9 +215,7 @@
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__trade_state_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.get_trade_state(code)
        tool.CodeDataCacheUtil.set_cache(self.__trade_state_cache, code, val)
        return val
        return TRADE_STATE_NOT_TRADE
    # 设置交易状态
    def set_trade_state(self, code, state):