Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
trade/trade_data_manager.py
@@ -7,10 +7,11 @@
# 交易撤销数据管理器
import constant
from db.redis_manager import RedisUtils
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from utils import global_util, tool
import l2_data_util
from db import redis_manager
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_trade
@@ -52,77 +53,107 @@
class TradeBuyDataManager:
    __db = 0
    redisManager = redis_manager.RedisManager(0)
    buy_sure_position_dict = {}
    __buy_position_info_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradeBuyDataManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "buy_position_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__buy_position_info_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 设置买入点的信息
    # trade_time: 买入点截图时间与下单提交时间差值
    # capture_time: 买入点截图时间
    # last_data: 买入点最后一条数据
    @classmethod
    def set_buy_position_info(cls, code, capture_time, trade_time, last_data, last_data_index):
        redis = cls.redisManager.getRedis()
        RedisUtils.setex(redis, "buy_position_info-{}".format(code), tool.get_expire(),
                         json.dumps((capture_time, trade_time, last_data, last_data_index)))
    def set_buy_position_info(self, code, capture_time, trade_time, last_data, last_data_index):
        val = (capture_time, trade_time, last_data, last_data_index)
        tool.CodeDataCacheUtil.set_cache(self.__buy_position_info_cache, code, val)
        RedisUtils.setex_async(self.__db, "buy_position_info-{}".format(code), tool.get_expire(),
                               json.dumps(val))
    # 获取买入点信息
    @classmethod
    def get_buy_position_info(cls, code):
        redis = cls.redisManager.getRedis()
        val_str = RedisUtils.get(redis,"buy_position_info-{}".format(code))
    def get_buy_position_info(self, code):
        val_str = RedisUtils.get(self.redisManager.getRedis(), "buy_position_info-{}".format(code))
        if val_str is None:
            return None, None, None, None
        else:
            val = json.loads(val_str)
            return val[0], val[1], val[2], val[3]
    def get_buy_position_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_position_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None, None, None
    # 删除买入点信息
    @classmethod
    def remove_buy_position_info(cls, code):
        redis = cls.redisManager.getRedis()
        RedisUtils.delete( redis,"buy_position_info-{}".format(code))
    def remove_buy_position_info(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__buy_position_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_position_info-{}".format(code))
    # 设置买入确认点信息
    @classmethod
    def __set_buy_sure_position(cls, code, index, data):
    def __set_buy_sure_position(self, code, index, data):
        logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data)
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        RedisUtils.setex(redis, key, tool.get_expire(), json.dumps((index, data)))
        cls.buy_sure_position_dict[code] = (index, data)
        RedisUtils.setex(self.redisManager.getRedis(), key, tool.get_expire(), json.dumps((index, data)))
        self.buy_sure_position_dict[code] = (index, data)
        # 移除下单信号的详细信息
        cls.remove_buy_position_info(code)
        self.remove_buy_position_info(code)
    # 清除买入确认点信息
    @classmethod
    def __clear_buy_sure_position(cls, code):
        redis = cls.redisManager.getRedis()
    def __clear_buy_sure_position(self, code):
        key = "buy_sure_position-{}".format(code)
        RedisUtils.delete(redis, key)
        if code in cls.buy_sure_position_dict:
            cls.buy_sure_position_dict.pop(code)
        RedisUtils.delete(self.redisManager.getRedis(), key)
        if code in self.buy_sure_position_dict:
            self.buy_sure_position_dict.pop(code)
    # 获取买入确认点信息
    @classmethod
    def get_buy_sure_position(cls, code):
        temp = cls.buy_sure_position_dict.get(code)
    def get_buy_sure_position(self, code):
        temp = self.buy_sure_position_dict.get(code)
        if temp is not None:
            return temp[0], temp[1]
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        val = RedisUtils.get(redis, key)
        val = RedisUtils.get(self.redisManager.getRedis(), key)
        if val is None:
            return None, None
        else:
            val = json.loads(val)
            cls.buy_sure_position_dict[code] = (val[0], val[1])
            self.buy_sure_position_dict[code] = (val[0], val[1])
            return val[0], val[1]
    # 处理买入确认点信息
    @classmethod
    def process_buy_sure_position_info(cls, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas):
        buy_capture_time, trade_time, l2_data, l2_data_index = cls.get_buy_position_info(code)
    def process_buy_sure_position_info(self, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas):
        buy_capture_time, trade_time, l2_data, l2_data_index = self.get_buy_position_info_cache(code)
        if buy_capture_time is None:
            # 没有购买者信息
            return None
@@ -144,15 +175,15 @@
                    if l2_data_util.get_time_as_seconds(_time) - old_time_int >= 2:
                        index = i - 1
                        data = l2_today_datas[index]
                        cls.__set_buy_sure_position(code, index, data)
                        self.__set_buy_sure_position(code, index, data)
                        break
            else:
                cls.__set_buy_sure_position(code, l2_data_index, l2_data)
                self.__set_buy_sure_position(code, l2_data_index, l2_data)
        elif new_time_int - old_time_int >= 0:
            # 间隔2s内表示数据正常,将其位置设置为新增数据的中间位置
            index = len(l2_today_datas) - 1 - (len(l2_add_datas)) // 2
            data = l2_today_datas[index]
            cls.__set_buy_sure_position(code, index, data)
            self.__set_buy_sure_position(code, index, data)
        else:
            # 间隔时间小于0 ,一般产生原因是数据回溯产生,故不做处理
            logger_trade.warning("预估委托位置错误:数据间隔时间小于0 code-{}", code)
@@ -161,59 +192,65 @@
# 代码实时价格管理器
class CodeActualPriceProcessor:
    __code_current_rate_cache = {}
    __code_current_rate_latest = {}
    __db = 0
    __redisManager = redis_manager.RedisManager(0)
    __instance = None
    def __get_redis(self):
        return self.__redisManager.getRedis()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodeActualPriceProcessor, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    # 保存跌价的时间
    def __save_down_price_time(self, code, time_str):
        key = "under_water_last_time-{}".format(code)
        RedisUtils.setex(self.__get_redis(),key, tool.get_expire(), time_str)
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    def __remove_down_price_time(self, code):
        key = "under_water_last_time-{}".format(code)
        RedisUtils.delete( self.__get_redis(), key)
    def __get_last_down_price_time(self, code):
        key = "under_water_last_time-{}".format(code)
        return RedisUtils.get(self.__get_redis(), key)
    def __increment_down_price_time(self, code, seconds):
        key = "under_water_seconds-{}".format(code)
        RedisUtils.incrby(
            self.__get_redis(),key, seconds)
        # 设置个失效时间
        RedisUtils.expire(self.__get_redis(), key, tool.get_expire())
    def __get_down_price_time_as_seconds(self, code):
        key = "under_water_seconds-{}".format(code)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None
        else:
            return int(val)
    # 清除所有的水下捞数据
    def clear_under_water_data(self):
        key = "under_water_*"
        keys = RedisUtils.keys(self.__get_redis(), key)
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "code_current_rate-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__code_current_rate_cache, code, float(val))
        except Exception as e:
            pass
        finally:
            RedisUtils.realse(__redis)
    def __save_current_price_codes_count(self, count):
        key = "current_price_codes_count"
        RedisUtils.setex(self.__get_redis(),key, 10, count)
        RedisUtils.setex(self.__get_redis(), key, 10, count)
    def __get_current_price_codes_count(self):
        key = "current_price_codes_count"
        count = RedisUtils.get(self.__get_redis(),key)
        count = RedisUtils.get(self.__get_redis(), key)
        return 0 if count is None else count
    # 保存当前涨幅
    def __save_current_rate(self, code, rate):
        # 变化之后才会持久化
        if self.__code_current_rate_latest.get(code) == rate:
            return
        self.__code_current_rate_latest[code] = rate
        tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, code, rate)
        key = "code_current_rate-{}".format(code)
        RedisUtils.setex(self.__get_redis(),key, tool.get_expire(), rate)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), rate)
    # 批量保存
    def __save_current_rates(self, datas):
        # 变化之后才会持久化
        for d in datas:
            if self.__code_current_rate_latest.get(d[0]) == d[1]:
                continue
            self.__code_current_rate_latest[d[0]] = d[1]
            tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, d[0], d[1])
            key = "code_current_rate-{}".format(d[0])
            RedisUtils.setex_async(self.__db, key, tool.get_expire(), d[1])
    # 获取当前涨幅
    def __get_current_rate(self, code):
@@ -224,29 +261,10 @@
        return None
    def get_current_rate(self, code):
        return self.__get_current_rate(code)
    def process_rate(self, code, rate, time_str):
        # 保存目前的代码涨幅
        self.__save_current_rate(code, rate)
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
            return
        # now_str = tool.get_now_time_str()
        if rate >= 0:
            down_start_time = self.__get_last_down_price_time(code)
            if down_start_time is None:
                return
            else:
                # 累计增加时间
                time_second = tool.trade_time_sub(time_str, down_start_time)
                self.__increment_down_price_time(code, time_second)
                # 删除起始时间
                self.__remove_down_price_time(code)
        else:
            # 记录开始值
            if self.__get_last_down_price_time(code) is None:
                self.__save_down_price_time(code, time_str)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__code_current_rate_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # 保存现价
    def save_current_price(self, code, price, is_limit_up):
@@ -264,20 +282,6 @@
    def get_current_price_codes_count(self):
        return self.__get_current_price_codes_count()
    # 是否为水下捞
    def is_under_water(self, code, now_time=None):
        time_seconds = self.__get_down_price_time_as_seconds(code)
        if time_seconds is None:
            return False
        else:
            if time_seconds >= constant.UNDER_WATER_PRICE_TIME_AS_SECONDS:
                if now_time is None:
                    now_time = tool.get_now_time_str()
                space = tool.trade_time_sub(now_time, "09:30:00")
                if space > 0 and time_seconds / space >= 0.2:
                    return True
            return False
    # 当前代码是否涨停
    def current_is_limit_up(self, code):
        data = self.get_current_price(code)
@@ -292,7 +296,7 @@
        infos = []
        for k in keys:
            code = k.split("-")[1]
            rate = self.__get_current_rate(code)
            rate = self.get_current_rate(code)
            infos.append((code, rate))
        # 排序信息
        sorted_infos = sorted(infos, key=lambda tup: tup[1], reverse=True)
@@ -304,41 +308,222 @@
# 涨停次数管理
class placeordercountmanager:
class PlaceOrderCountManager:
    __db = 0
    __redisManager = redis_manager.RedisManager(0)
    __place_order_count_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(PlaceOrderCountManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def __incre_place_order_count(cls, code):
        key = "place_order_count-{}".format(code)
        RedisUtils.incrby(cls.__get_redis(), key, 1)
        RedisUtils.expire(cls.__get_redis(), key, tool.get_expire())
    def __load_datas(cls):
        redis_ = cls.__get_redis()
        try:
            keys = RedisUtils.keys(redis_, "place_order_count-*")
            for k in keys:
                code = k.split("-")[-1]
                count = RedisUtils.get(redis_, k)
                cls.__place_order_count_cache[code] = int(count)
        finally:
            RedisUtils.realse(redis_)
    @classmethod
    def __get_place_order_count(cls, code):
    def __incre_place_order_count(self, code):
        if code not in self.__place_order_count_cache:
            self.__place_order_count_cache[code] = 0
        self.__place_order_count_cache[code] += 1
        key = "place_order_count-{}".format(code)
        count = RedisUtils.get(cls.__get_redis(), key)
        RedisUtils.incrby_async(self.__db, key, 1)
        RedisUtils.expire_async(self.__db, key, tool.get_expire())
    def __get_place_order_count(self, code):
        key = "place_order_count-{}".format(code)
        count = RedisUtils.get(self.__get_redis(), key)
        if count is not None:
            return int(count)
        return 0
    @classmethod
    def place_order(cls, code):
        cls.__incre_place_order_count(code)
    def __get_place_order_count_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__place_order_count_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return 0
    @classmethod
    def get_place_order_count(cls, code):
        return cls.__get_place_order_count(code)
    def place_order(self, code):
        self.__incre_place_order_count(code)
    @classmethod
    def clear_place_order_count(cls, code):
    def get_place_order_count(self, code):
        return self.__get_place_order_count_cache(code)
    def clear_place_order_count(self, code):
        self.__place_order_count_cache[code] = 0
        key = "place_order_count-{}".format(code)
        RedisUtils.delete(cls.__get_redis(), key)
        RedisUtils.delete_async(self.__db, key)
    def clear(self):
        self.__place_order_count_cache.clear()
        keys = RedisUtils.keys(self.__get_redis(), "place_order_count-*")
        for k in keys:
            RedisUtils.delete(self.__get_redis(), k)
# 账户可用资金管理
class AccountMoneyManager:
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __available_money_cache = None
    __commission_cache = None
    __instance = None
    __mysqldb = Mysqldb()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(AccountMoneyManager, cls).__new__(cls, *args, **kwargs)
            __redis = cls.__get_redis()
            result = RedisUtils.get(cls.__get_redis(), "trade-account-canuse-money")
            if result:
                cls.__available_money_cache = round(float(result), 2)
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def set_available_money(self, client_id, money):
        self.__available_money_cache = round(float(money), 2)
        RedisUtils.set(self.__get_redis(), "trade-account-canuse-money", money)
    def set_commission(self, commission):
        self.__commission_cache = commission
    # 获取交易账户的可用金额
    def get_available_money(self):
        result = RedisUtils.get(self.__get_redis(), "trade-account-canuse-money")
        if result is None:
            return None
        return round(float(result), 2)
    def get_available_money_cache(self):
        return self.__available_money_cache
    def get_commission_cache(self):
        return self.__commission_cache
    def get_delegated_count_info(self, from_date=None, to_date=None):
        """
        获取委托数量信息
        @return:
        """
        if not from_date:
            from_date = tool.get_now_date_str("%Y%m%d")
        if not to_date:
            to_date = tool.get_now_date_str("%Y%m%d")
        sql = f"SELECT * FROM (SELECT '挂买', COUNT(*) AS '数量'  FROM `hx_trade_delegate_record` r WHERE r.`direction`=0 AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'"
        sql += " UNION ALL "
        sql += f"SELECT '撤挂买',COUNT(*) AS '数量' FROM `hx_trade_delegate_record` r WHERE r.`direction`=0 AND r.`cancelTime`!='' AND r.`cancelTime`IS NOT NULL AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'"
        sql += " UNION ALL "
        sql += f"SELECT '撤挂卖', COUNT(*) AS '数量'  FROM `hx_trade_delegate_record` r WHERE r.`direction`=1 AND r.`cancelTime`!='' AND r.`cancelTime`IS NOT NULL AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'"
        sql += " UNION ALL "
        sql += f"SELECT '挂卖' ,COUNT(*) AS  '数量' FROM `hx_trade_delegate_record` r WHERE r.`direction`=1 AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'"
        sql += ") a"
        return self.__mysqldb.select_all(sql)
    def get_deal_count_info(self, from_date=None, to_date=None):
        if not from_date:
            from_date = tool.get_now_date_str("%Y%m%d")
        if not to_date:
            to_date = tool.get_now_date_str("%Y%m%d")
        sql = "SELECT * FROM ( "
        sql += f"SELECT '股票', COUNT(*), sum(a.price*a.volume) as '金额' FROM (SELECT * FROM `hx_trade_deal_record` r WHERE r.`tradeDate` >='{from_date}' and  r.`tradeDate` <='{to_date}' AND (r.`securityID` LIKE '30%' OR r.`securityID` LIKE '60%'  OR r.`securityID` LIKE '68%' OR r.`securityID` LIKE '00%')  GROUP BY r.`orderSysID`) a"
        sql += " UNION ALL "
        sql += f"SELECT '上证基金', COUNT(*) AS '数量', sum(a.price*a.volume) as '金额' FROM (SELECT * FROM `hx_trade_deal_record` r WHERE r.`tradeDate` >='{from_date}' and  r.`tradeDate` <='{to_date}' AND (r.`securityID` LIKE '11%')  GROUP BY r.`orderSysID`) a"
        sql += " UNION ALL "
        sql += f"SELECT '深证基金', COUNT(*) AS '数量', sum(a.price*a.volume) as '金额' FROM (SELECT * FROM `hx_trade_deal_record` r WHERE r.`tradeDate` >='{from_date}' and  r.`tradeDate` <='{to_date}' AND (r.`securityID` LIKE '12%')  GROUP BY r.`orderSysID`) a"
        sql += ") a"
        return self.__mysqldb.select_all(sql)
# 激进买成交代码
class RadicalBuyDealCodesManager:
    """
    激进买成交代码管理
    """
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __deal_codes_cache = set()
    __instance = None
    # 根据L2数据来激进买入的有效时间:{"code":(有效截至时间, 买单号, 扫入的板块, 最近成交时间, 买入板块净流入情况, 是否是板上放量买入)}
    buy_by_l2_delegate_expire_time_dict = {}
    # 仅仅买的板块
    __radical_buy_blocks_dict = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(RadicalBuyDealCodesManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    @classmethod
    def __load_data(cls):
        result = RedisUtils.smembers(cls.__get_redis(), "radical_buy_deal_codes")
        if result:
            cls.__deal_codes_cache = set(result)
        keys = RedisUtils.keys(cls.__get_redis(), "radical_buy_blocks-*")
        if keys:
            for k in keys:
                code = k.split("-")[1]
                val = RedisUtils.get(cls.__get_redis(), k)
                val = json.loads(val)
                cls.__radical_buy_blocks_dict[code] = set(val)
            cls.__deal_codes_cache = set(result)
    def set_code_blocks(self, code, blocks):
        self.__radical_buy_blocks_dict[code] = set(blocks)
        RedisUtils.setex_async(self.__db, f"radical_buy_blocks-{code}", tool.get_expire(), json.dumps(list(blocks)))
    def get_code_blocks(self, code):
        return self.__radical_buy_blocks_dict.get(code)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def add_deal_code(self, code):
        """
        添加已成交的代码
        @param code:
        @return:
        """
        self.__deal_codes_cache.add(code)
        RedisUtils.sadd_async(self.__db, "radical_buy_deal_codes", code)
        RedisUtils.expire_async(self.__db, "radical_buy_deal_codes", tool.get_expire())
    def get_deal_codes(self):
        """
        获取已经成交的代码
        @return:
        """
        if self.__deal_codes_cache:
            return self.__deal_codes_cache
        return set()
if __name__ == "__main__":
    processor = CodeActualPriceProcessor()
    print(processor.get_top_rate_codes(30))
    pass