Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
l2/safe_count_manager.py
@@ -4,87 +4,141 @@
# 下单L2的安全笔数管理
import json
from db.redis_manager_delegate import RedisUtils
from l2 import l2_data_source_util
from trade import l2_trade_factor
from db import redis_manager
import tool
from l2.l2_data_util import L2DataUtil
import l2_data_util
from db import redis_manager_delegate as redis_manager
from utils import tool
from l2.l2_data_util import L2DataUtil, local_today_buyno_map
class BuyL2SafeCountManager(object):
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __instance = None
    latest_place_order_info_cache = {}
    safe_count_l2_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(BuyL2SafeCountManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    def __init__(self):
        self.last_buy_queue_data = {}
    def __getRedis(self):
        return self.__redis_manager.getRedis()
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "safe_count_l2-*")
            for k in keys:
                ks = k.split("-")
                code, last_buy_single_index = ks[1], int(ks[2])
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.safe_count_l2_cache, f"{code}-{last_buy_single_index}", val)
            keys = RedisUtils.keys(__redis, "latest_place_order_info-*")
            for k in keys:
                ks = k.split("-")
                code = ks[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.latest_place_order_info_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 记录每一次的处理进度
    def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        self.__getRedis().setex(key, tool.get_expire(),
                                json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
        tool.CodeDataCacheUtil.set_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}",
                                         (last_buy_single_index, process_index, buy_num, cancel_num))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
    # 返回数据与更新时间
    def __get_compute_progress(self, code, last_buy_single_index):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        val = self.__getRedis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, -1, 0, 0
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    def __get_compute_progress_cache(self, code, last_buy_single_index):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.safe_count_l2_cache, f"{code}-{last_buy_single_index}")
        if cache_result[0]:
            return cache_result[1]
        return None, -1, 0, 0
    # 保存最近的下单信息
    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        tool.CodeDataCacheUtil.set_cache(self.latest_place_order_info_cache, code,
                                         (buy_single_index, buy_exec_index, cancel_index))
        key = "latest_place_order_info-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((buy_single_index, buy_exec_index, cancel_index)))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((buy_single_index, buy_exec_index, cancel_index)))
    def __get_latest_place_order_info(self, code):
        key = "latest_place_order_info-{}".format(code)
        val = self.__getRedis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
    def __get_latest_place_order_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.latest_place_order_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None, None
    def __get_all_compute_progress(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = self.__getRedis().keys(key_regex)
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        vals = []
        for k in keys:
            val = self.__getRedis().get(k)
            val = RedisUtils.get(self.__get_redis(), k)
            val = json.loads(val)
            vals.append(val)
        return vals
    def clear_data(self, code):
        key_regex = f"safe_count_l2-{code}-*"
        keys = self.__getRedis().keys(key_regex)
        keys = RedisUtils.keys(self.__get_redis(), key_regex)
        for k in keys:
            self.__getRedis().delete(k)
            RedisUtils.delete(self.__get_redis(), k)
        tool.CodeDataCacheUtil.clear_cache(self.latest_place_order_info_cache, code)
        key = f"latest_place_order_info-{code}"
        self.__getRedis().delete(key)
        RedisUtils.delete_async(self.__db, key)
    # 获取基础的安全笔数
    def __get_base_save_count(self, code):
        return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code)
    def __get_base_save_count(self, code, is_first):
        return l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count(code, is_first)
    # 获取最后的安全笔数
    def get_safe_count(self, code):
        rate = self.__get_rate(code)
        print("--------------------------------")
        print("安全笔数比例:", rate)
        print("--------------------------------")
        count = self.__get_base_save_count(code)
        count = round(count * rate)
        if count < 8:
            count = 8
        if count > 21:
            count = 21
        return count
    def get_safe_count(self, code, is_first_code, rate):
        # rate = self.__get_rate(code)
        count, min_count, max_count = self.__get_base_save_count(code, is_first_code)
        # 第4次下单按第一次算
        # if place_order_count and place_order_count >= 3:
        #     rate = 1
        # print("--------------------------------")
        # print("安全笔数比例:", rate)
        # print("--------------------------------")
        # count, min_count, max_count = self.__get_base_save_count(code, is_first_code)
        # count = round(count * rate)
        # if count < min_count:
        #     count = min_count
        # if count > max_count:
        #     count = max_count
        return int(round(count * (1 + rate), 0))
    # 计算留下来的比例
    # last_buy_single_index 上一次下单信号起始位置
@@ -93,15 +147,15 @@
    # end_index 数据结束位置
    def compute_left_rate(self, code, start_index, end_index, total_datas,
                          local_today_num_operate_map):
        last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info(code)
        last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info_cache(code)
        if last_buy_single_index is None:
            return
        cancel_time = None
        if cancel_index is not None:
            cancel_time = total_datas[cancel_index]["val"]["time"]
        # 获取处理的进度
        last_buy_single_index_, process_index, buy_num, cancel_num = self.__get_compute_progress(code,
                                                                                                 last_buy_single_index)
        last_buy_single_index_, process_index, buy_num, cancel_num = self.__get_compute_progress_cache(code,
                                                                                                       last_buy_single_index)
        break_index = -1
        for i in range(start_index, end_index):
@@ -128,8 +182,7 @@
                buy_num += int(val["num"]) * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 获取买入信息
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, data,
                                                                                                 local_today_num_operate_map)
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(data, local_today_buyno_map.get(code))
                if buy_index is not None:
                    if last_buy_single_index <= buy_index <= end_index:
                        cancel_num += int(val["num"]) * data["re"]