Administrator
6 天以前 365491c1fcf523994035e4bd28d8b5872dd6ec98
l2/safe_count_manager.py
@@ -4,31 +4,59 @@
# 下单L2的安全笔数管理
import json
from db.redis_manager import RedisUtils
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_source_util
from trade import l2_trade_factor
from db import redis_manager
from db import redis_manager_delegate as redis_manager
from utils import tool
from l2.l2_data_util import L2DataUtil
latest_place_order_info_cache = {}
safe_count_l2_cache = {}
from l2.l2_data_util import L2DataUtil, local_today_buyno_map
class BuyL2SafeCountManager(object):
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __instance = None
    latest_place_order_info_cache = {}
    safe_count_l2_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(BuyL2SafeCountManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    def __init__(self):
        self.last_buy_queue_data = {}
    def __getRedis(self):
        return self.__redis_manager.getRedis()
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "safe_count_l2-*")
            for k in keys:
                ks = k.split("-")
                code, last_buy_single_index = ks[1], int(ks[2])
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.safe_count_l2_cache, f"{code}-{last_buy_single_index}", val)
            keys = RedisUtils.keys(__redis, "latest_place_order_info-*")
            for k in keys:
                ks = k.split("-")
                code = ks[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.latest_place_order_info_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 记录每一次的处理进度
    def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}",
        tool.CodeDataCacheUtil.set_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}",
                                         (last_buy_single_index, process_index, buy_num, cancel_num))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
@@ -36,61 +64,57 @@
    # 返回数据与更新时间
    def __get_compute_progress(self, code, last_buy_single_index):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        val = RedisUtils.get(self.__getRedis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    def __get_compute_progress_cache(self, code, last_buy_single_index):
        cache_result = tool.CodeDataCacheUtil.get_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}")
        cache_result = tool.CodeDataCacheUtil.get_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}")
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_compute_progress(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}", val)
        return val
        return None, -1, 0, 0
    # 保存最近的下单信息
    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        tool.CodeDataCacheUtil.set_cache(latest_place_order_info_cache, code,
        tool.CodeDataCacheUtil.set_cache(self.latest_place_order_info_cache, code,
                                         (buy_single_index, buy_exec_index, cancel_index))
        key = "latest_place_order_info-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(),
                         json.dumps((buy_single_index, buy_exec_index, cancel_index)))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((buy_single_index, buy_exec_index, cancel_index)))
    def __get_latest_place_order_info(self, code):
        key = "latest_place_order_info-{}".format(code)
        val = RedisUtils.get(self.__getRedis(), key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
    def __get_latest_place_order_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(latest_place_order_info_cache, code)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.latest_place_order_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_latest_place_order_info(code)
        tool.CodeDataCacheUtil.set_cache(latest_place_order_info_cache, code, val)
        return val
        return None, None, None
    def __get_all_compute_progress(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = RedisUtils.keys(self.__getRedis(), key_regex)
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        vals = []
        for k in keys:
            val = RedisUtils.get(self.__getRedis(), k)
            val = RedisUtils.get(self.__get_redis(), k)
            val = json.loads(val)
            vals.append(val)
        return vals
    def clear_data(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = RedisUtils.keys(self.__getRedis(), key_regex)
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        for k in keys:
            RedisUtils.delete(self.__getRedis(), k)
            RedisUtils.delete(self.__get_redis(), k)
        tool.CodeDataCacheUtil.clear_cache(latest_place_order_info_cache, code)
        tool.CodeDataCacheUtil.clear_cache(self.latest_place_order_info_cache, code)
        key = f"latest_place_order_info-{code}"
        RedisUtils.delete_async(self.__db, key)
@@ -158,8 +182,7 @@
                buy_num += int(val["num"]) * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 获取买入信息
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                 local_today_num_operate_map)
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get(code))
                if buy_index is not None:
                    if last_buy_single_index <= buy_index <= end_index:
                        cancel_num += int(val["num"]) * data["re"]