Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
trade/trade_queue_manager.py
@@ -2,54 +2,79 @@
import decimal
import json
import gpcode_manager
from db import redis_manager
import tool
from trade import trade_manager
from code_attribute import gpcode_manager
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from utils import tool
from trade import trade_manager, trade_constant
class THSBuy1VolumnManager:
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __last_data = {}
    __code_time_volumn_dict = {}
    __max_buy1_volumn_cache = {}
    __instance = None
    def __get_redis(self):
        return self.__redisManager.getRedis()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(THSBuy1VolumnManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "max_buy1_volumn-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__max_buy1_volumn_cache, code, int(val))
        finally:
            RedisUtils.realse(__redis)
    # 保存最大量
    def __save_max_buy1_volume(self, code, volume):
        tool.CodeDataCacheUtil.set_cache(self.__max_buy1_volumn_cache, code, volume)
        key = "max_buy1_volumn-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), volume)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), volume)
    def __get_max_buy1_volume(self, code):
        key = "max_buy1_volumn-{}".format(code)
        val = self.__get_redis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is not None:
            return int(val)
        return None
    def __del_max_buy1_volume(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__max_buy1_volumn_cache, code)
        key = "max_buy1_volumn-{}".format(code)
        val = self.__get_redis().delete(key)
        RedisUtils.delete_async(self.__db, key)
    def __save_recod(self, code, time_str, volumn):
        # 保存每一次的
        key = "buy1_volumn-{}-{}".format(code, time_str)
        self.__get_redis().setex(key, tool.get_expire(), volumn)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
        # 保存最近的
        key = "buy1_volumn_latest_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn)))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))
    # 保存上一次数据
    def __save_last_recod(self, code, time_str, volumn):
        # 保存最近的
        key = "buy1_volumn_last_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps((time_str, volumn)))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps((time_str, volumn)))
    def __get_last_record(self, code):
        key = "buy1_volumn_last_info-{}".format(code)
        val = self.__get_redis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
@@ -57,7 +82,7 @@
    def __get_latest_record(self, code):
        key = "buy1_volumn_latest_info-{}".format(code)
        val = self.__get_redis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
@@ -66,17 +91,17 @@
    # 添加记录
    def __add_recod(self, code):
        key = "buy1_volumn_codes"
        self.__get_redis().sadd(key, code)
        self.__get_redis().expire(key, 10)
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)
    # 获取当前正在监听的代码
    def get_current_codes(self):
        key = "buy1_volumn_codes"
        return self.__get_redis().smembers(key)
        return RedisUtils.smembers(self.__get_redis(), key)
    def get_buy_1_volumn(self, code, time_str):
        key = "buy1_volumn-{}-{}".format(code, time_str)
        return self.__get_redis().get(key)
        return RedisUtils.get(self.__get_redis(), key)
    # 返回是否需要更新数据,是否需要撤单,撤单原因
    def save(self, code, time_str, volumn, price):
@@ -125,8 +150,8 @@
        self.__save_recod(code, time_str, volumn)
        # 如果当前已挂单
        state = trade_manager.get_trade_state(code)
        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
        state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
        if state == trade_constant.TRADE_STATE_BUY_DELEGATED or state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER:
            # 判断本次与上一次的封单额是否小于5000w
            limit_up_price = gpcode_manager.get_limit_up_price(code)
            threshold_num = 50000000 // (limit_up_price * 100)
@@ -159,9 +184,14 @@
            return -1
        return val
    def get_max_buy1_volume_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__max_buy1_volumn_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1
    def clear_max_buy1_volume(self, code):
        self.__del_max_buy1_volume(code)
class JueJinBuy1VolumnManager:
@@ -174,13 +204,13 @@
    def __save_recod(self, code, time_str, volumn):
        # 保存每一次的
        key = "buy1_volumn_juejin-{}-{}".format(code, time_str)
        self.__get_redis().setex(key, tool.get_expire(), volumn)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), volumn)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
    def __get_latest_record(self, code):
        key = "buy1_volumn_juejin_latest_info-{}".format(code)
        val = self.__get_redis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None, None
        val = json.loads(val)
@@ -208,8 +238,6 @@
        return time_str, volumn
class thsl2tradequeuemanager:
    __redisManager = redis_manager.RedisManager(0)
    __filter_dict = {}
@@ -220,11 +248,11 @@
    def __save_latest_recod(self, code, info):
        # 保存每一次的
        key = "ths_l2_latest_trade_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps(info))
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), json.dumps(info))
    def __get_latest_record(self, code):
        key = "ths_l2_latest_trade_info-{}".format(code)
        val = self.__get_redis().get(key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None
        return json.loads(val)
@@ -233,14 +261,14 @@
    def __add_buy1_code(self, code):
        key = "buy1_volumn_codes"
        self.__get_redis().sadd(key, code)
        self.__get_redis().expire(key, 10)
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)
        # 获取当前正在监听的代码
    def get_current_codes(self):
        key = "buy1_volumn_codes"
        return self.__get_redis().smembers(key)
        return RedisUtils.smembers(self.__get_redis(), key)
    def save_recod(self, code, data):
        _str = json.dumps(data)