Administrator
2023-10-30 fb47d36048e94b9a506d5c153e3dd19a01e37df1
third_data/code_plate_key_manager.py
@@ -7,48 +7,96 @@
import constant
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util
from third_data import kpl_block_util, kpl_api
from utils import global_util, tool
from log_module import log
from log_module import log, async_log_util
from db import redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_limit_up, logger_kpl_block_can_buy, logger_kpl_debug
from log_module.log import logger_kpl_limit_up, logger_kpl_block_can_buy
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager
from trade import trade_manager, l2_trade_util
# 代码精选板块管理
class KPLCodeJXBlockManager:
    """Manager for the curated ("JingXuan") blocks of stock codes.

    Blocks are cached in class-level dictionaries and persisted to Redis under
    the keys ``kpl_jx_blocks-{code}`` (primary) and ``kpl_jx_blocks_by-{code}``
    (backup). ``__new__`` hands out a single shared instance.
    """
    # Redis database index; passed to RedisUtils.setex_async when saving
    __db = 3
    # Manager used to obtain the Redis handle for synchronous reads
    __redisManager = redis_manager.RedisManager(3)
    # In-memory cache: code -> primary curated blocks
    __code_blocks = {}
    # In-memory cache: code -> backup curated blocks
    __code_by_blocks = {}
    # The shared instance created lazily by __new__
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(KPLCodeJXBlockManager, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def __get_redis(self):
        return self.__redisManager.getRedis()
    # NOTE(review): diff residue - this span interleaves the pre-change and the
    # post-change version of the method (two `def` lines, two `if len(...)` lines),
    # so it is not valid Python as written. The `by=False` signature together with
    # the `if by:` / `else:` body appears to be the current version - confirm.
    def save_jx_blocks(self, code, blocks):
    def save_jx_blocks(self, code, blocks, by=False):
        # Nothing is stored when no blocks are supplied
        if blocks is None:
            return
        if len(blocks) >2:
        if len(blocks) > 2:
            blocks = blocks[:2]
        # Save the first 2 entries
        RedisUtils.setex(self.__get_redis(), f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(blocks))
        self.__code_blocks[code] = blocks
        if by:
            # Backup blocks: written to the "kpl_jx_blocks_by-" key and the backup cache
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_by-{code}", tool.get_expire(), json.dumps(blocks))
            self.__code_by_blocks[code] = blocks
        else:
            # Primary blocks: written to the "kpl_jx_blocks-" key and the primary cache
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(blocks))
            self.__code_blocks[code] = blocks
    # 获取精选板块
    # NOTE(review): diff residue - the lines of the old single-cache implementation
    # are interleaved with the new `by`-aware implementation, so this span is not
    # valid Python as written. The `by=False` signature appears to be the current
    # version - confirm before editing.
    def get_jx_blocks(self, code):
        if code in self.__code_blocks:
            return self.__code_blocks[code]
        val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
        if val is None:
            return None
    def get_jx_blocks(self, code, by=False):
        if by:
            # Backup blocks: in-memory cache first, then the "kpl_jx_blocks_by-" Redis key
            if code in self.__code_by_blocks:
                return self.__code_by_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks_by-{code}")
            if val is None:
                return None
            else:
                # Decode the stored JSON and remember it in the backup cache
                val = json.loads(val)
                self.__code_by_blocks[code] = val
            return self.__code_by_blocks[code]
        else:
            val = json.loads(val)
            self.__code_blocks[code] = val
        return self.__code_blocks[code]
            # Primary blocks: in-memory cache first, then the "kpl_jx_blocks-" Redis key
            if code in self.__code_blocks:
                return self.__code_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
            if val is None:
                return None
            else:
                # Decode the stored JSON and remember it in the primary cache
                val = json.loads(val)
                self.__code_blocks[code] = val
            return self.__code_blocks[code]
    # NOTE(review): diff residue - the first `def`/`return` pair is the pre-change
    # version; the `by=False` variant below appears to be the current one - confirm.
    # Cache-only lookup: reads the in-memory dictionaries and never touches Redis.
    def get_jx_blocks_cache(self, code):
        return self.__code_blocks.get(code)
    def get_jx_blocks_cache(self, code, by=False):
        if by:
            # Backup cache
            return self.__code_by_blocks.get(code)
        else:
            # Primary cache
            return self.__code_blocks.get(code)
    # 从网络上加载精选板块
    def load_jx_blocks(self, code, buy_1_price, limit_up_price):
        if limit_up_price and buy_1_price:
            # 处理买1,卖1信息
            pre_close_price = round(float(limit_up_price) / 1.1, 2)
            # 如果涨幅大于7%就读取板块
            price_rate = (buy_1_price - pre_close_price) / pre_close_price
            if price_rate > 0.07:
                if not self.get_jx_blocks_cache(code):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    self.save_jx_blocks(code, blocks)
                    async_log_util.info(logger_kpl_block_can_buy,f"{code}:获取到精选板块-{blocks}")
            elif price_rate > 0.03:
                # 添加备用板块
                if not self.get_jx_blocks_cache(code, by=True):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    self.save_jx_blocks(code, blocks, by=True)
                    async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(备用)-{blocks}")
# 开盘啦禁止交易板块管理
@@ -263,11 +311,9 @@
    @classmethod
    def is_in_top(cls, keys):
        reasons = cls.get_can_buy_key_set()
        log.logger_kpl_debug.debug("市场流入前5:{}", reasons)
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        reasons = reasons - forbidden_plates
        temp_set = keys & reasons
        log.logger_kpl_debug.debug("市场流入前5匹配结果:{}", temp_set)
        if temp_set:
            return True, temp_set
        else:
@@ -289,7 +335,6 @@
        self.__history_limit_up_reason_dict[code] = set(reasons)
        RedisUtils.setex(self.__get_redis(), f"kpl_his_limit_up_reason-{code}", tool.get_expire(),
                         json.dumps(list(reasons)))
        logger_kpl_debug.debug(f"设置历史涨停原因:{code}-{reasons}")
    # 如果返回值不为None表示已经加载过历史原因了
    def get_history_limit_up_reason(self, code):
@@ -367,6 +412,8 @@
        k4 = set()
        jingxuan_blocks = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code)
        if not jingxuan_blocks:
            jingxuan_blocks = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True)
        if jingxuan_blocks:
            jingxuan_blocks = jingxuan_blocks[:2]
            k4 |= set([x[1] for x in jingxuan_blocks])
@@ -397,6 +444,62 @@
    __CodesTradeStateManager = trade_manager.CodesTradeStateManager()
    __can_buy_compute_result_dict = {}
    @classmethod
    def __remove_from_l2(cls, code, msg):
        # 根据身位移除代码
        # return
        # 下过单的代码不移除
        if trade_manager.CodesTradeStateManager().get_trade_state_cache(code) != trade_manager.TRADE_STATE_NOT_TRADE:
            # 只要下过单的就不移除
            return
        l2_trade_util.forbidden_trade(code, msg=msg)
        logger_kpl_block_can_buy.info(msg)
    @classmethod
    def __is_block_can_buy(cls, code, block, current_limit_up_datas, code_limit_up_reason_dict,
                           yesterday_current_limit_up_codes, limit_up_record_datas):
        # log.logger_kpl_debug.info(f"判断板块是否可买:{block}")
        # is_top_8_record, top_8_record = kpl_block_util.is_record_top_block(code, block, limit_up_record_datas,
        #                                                                    yesterday_current_limit_up_codes, 50)
        # is_top_4_current, top_4_current = kpl_block_util.is_current_top_block(code, block, current_limit_up_datas,
        #                                                                       yesterday_current_limit_up_codes, 50)
        # is_top_4 = is_top_8_record and is_top_4_current
        # msg_list.append(f"\n实时top10(涨停数量:{len(current_limit_up_datas)})")
        # msg_list.append(f"历史top20(涨停数量:{len(top_8_record)})")
        # 获取主板实时身位,剔除高位板
        current_shsz_rank, front_current_shsz_rank_codes = kpl_block_util.get_code_current_rank(code, block,
                                                                                                current_limit_up_datas,
                                                                                                code_limit_up_reason_dict,
                                                                                                yesterday_current_limit_up_codes,
                                                                                                shsz=True)
        record_shsz_rank, record_shsz_rank_codes = kpl_block_util.get_code_record_rank(code, block,
                                                                                       limit_up_record_datas,
                                                                                       code_limit_up_reason_dict,
                                                                                       yesterday_current_limit_up_codes,
                                                                                       shsz=True)
        # 获取主板历史身位
        open_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes(code, block, limit_up_record_datas,
                                                                          code_limit_up_reason_dict)
        if open_limit_up_codes:
            # 主板开1
            if current_shsz_rank < len(open_limit_up_codes) + 1 and record_shsz_rank < len(open_limit_up_codes) + 2:
                # 属于龙1,龙2
                return True, f"{tool.get_now_time_str()} {block}:top10涨停板块,主板开1({open_limit_up_codes}),属于主板前龙{len(open_limit_up_codes) + 1}(实时身位-{current_shsz_rank}/{len(current_limit_up_datas)})"
            else:
                if record_shsz_rank >= len(open_limit_up_codes) + 1:
                    cls.__remove_from_l2(code, f"{code}根据身位禁止买入:【{block}】历史身位{record_shsz_rank}")
                return False, f"板块-{block}: top4涨停板块,主板开1({open_limit_up_codes}),不为主板前龙{len(open_limit_up_codes) + 1}(实时身位-{current_shsz_rank}:{front_current_shsz_rank_codes},历史身位-{record_shsz_rank})"
        else:
            if current_shsz_rank == 0 and record_shsz_rank < 2:
                return True, f"{tool.get_now_time_str()} {block}:top4涨停板块,非主板开1,属于龙1,实时涨停列表数量({len(current_limit_up_datas)})"
            else:
                if record_shsz_rank >= 2:
                    cls.__remove_from_l2(code, f"{code}根据身位禁止买入:【{block}】历史身位{record_shsz_rank}")
                return False, f"板块-{block}: top4涨停板块,非主板开1,不为主板龙1(实时身位-{current_shsz_rank}:{front_current_shsz_rank_codes},历史身位-{record_shsz_rank})"
    # 获取可以买的板块
    # current_limit_up_datas: 今日实时涨停
    # latest_2_day_limit_up_datas:最近2天的实时涨停(不含今日)
@@ -415,16 +518,12 @@
                    code_limit_up_reason_dict[d[3]] = d[2]
            return code_limit_up_reason_dict
        now_time = int(tool.get_now_time_str().replace(":", ""))
        times = [100000, 103000, 110000, 133000, 150000]
        time_index = 0
        for i in range(len(times)):
            if now_time < times[i]:
                time_index = i
                break
        if current_limit_up_datas is None:
            current_limit_up_datas = []
        # 获取目标代码板块
        keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        log.logger_kpl_debug.info("{}关键词:今日-{},今日历史-{},历史-{},二级行业-{},代码板块-{}", code, k1, k11, k2, k3, k4)
        # log.logger_kpl_debug.info("{}关键词:今日-{},今日历史-{},历史-{},二级行业-{},代码板块-{}", code, k1, k11, k2, k3, k4)
        keys = set()
        if k1:
            for k in k1:
@@ -437,7 +536,7 @@
                keys |= k4
                keys = keys - constant.KPL_INVALID_BLOCKS
        log.logger_kpl_debug.info("{}最终关键词:{}", code, keys)
        # log.logger_kpl_debug.info("{}最终关键词:{}", code, keys)
        # 涨停列表中匹配关键词,返回(板块:代码集合),代码集合中已经排除自身
        if not keys:
@@ -448,63 +547,13 @@
        can_buy_blocks = []
        for block in keys:
            log.logger_kpl_debug.info(f"判断板块是否可买:{block}")
            # is_top_8_record, top_8_record = kpl_block_util.is_record_top_block(code, block, limit_up_record_datas,
            #                                                                    yesterday_current_limit_up_codes, 50)
            # is_top_4_current, top_4_current = kpl_block_util.is_current_top_block(code, block, current_limit_up_datas,
            #                                                                       yesterday_current_limit_up_codes, 50)
            # is_top_4 = is_top_8_record and is_top_4_current
            # msg_list.append(f"\n实时top10(涨停数量:{len(current_limit_up_datas)})")
            # msg_list.append(f"历史top20(涨停数量:{len(top_8_record)})")
            # 获取主板实时身位,剔除高位板
            current_shsz_rank = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas,
                                                                     code_limit_up_reason_dict,
                                                                     yesterday_current_limit_up_codes, shsz=True)
            record_shsz_rank = kpl_block_util.get_code_record_rank(code, block, limit_up_record_datas,
                                                                   code_limit_up_reason_dict,
                                                                   yesterday_current_limit_up_codes, shsz=True)
            # 获取主板历史身位
            if True:
                pen_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes(code, block, limit_up_record_datas,
                                                                                 code_limit_up_reason_dict)
                if pen_limit_up_codes:
                    # 主板开1
                    if current_shsz_rank < len(pen_limit_up_codes) + 1 and record_shsz_rank < len(
                            pen_limit_up_codes) + 1:
                        # 属于龙1,龙2
                        can_buy_blocks.append((block,
                                               f"{block}:top10涨停板块,主板开1({pen_limit_up_codes}),属于主板前龙{len(pen_limit_up_codes) + 1}(实时身位-{current_shsz_rank})"))
                        continue
                    else:
                        msg_list.append(
                            f"板块-{block}: top4涨停板块,主板开1({pen_limit_up_codes}),不为主板前龙{len(pen_limit_up_codes) + 1}(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})")
                        continue
                else:
                    if current_shsz_rank == 0 and record_shsz_rank < 2:
                        can_buy_blocks.append((block, f"{block}:top4涨停板块,非主板开1,属于龙1"))
                        continue
                    else:
                        msg_list.append(
                            f"板块-{block}: top4涨停板块,非主板开1,不为主板龙1(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})")
                        continue
            can_buy, msg = cls.__is_block_can_buy(code, block, current_limit_up_datas, code_limit_up_reason_dict,
                                                  yesterday_current_limit_up_codes, limit_up_record_datas)
            if can_buy:
                can_buy_blocks.append((block, msg))
            else:
                pass
                # # 是否满足行业精选流入要求
                # is_in_top_input = RealTimeKplMarketData.is_in_top(set([block]))[0]
                # if not is_in_top_input:
                #     msg_list.append(
                #         f"板块-{block}: 非top4涨停板块,不满足精选/行业流入要求")
                #     continue
                # else:
                #     # 是否为主板龙1(实时龙1,历史龙2以内)
                #     if current_shsz_rank == 0 and record_shsz_rank < 2:
                #         can_buy_blocks.append((block, f"{block}:不是top4涨停板块,满足精选/行业流入要求,满足主板龙1"))
                #         continue
                #     else:
                #         msg_list.append(
                #             f"板块-{block}: 不是top4涨停板块,满足精选/行业流入要求,不为主板龙1(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})")
                #         continue
                msg_list.append(msg)
        if len(can_buy_blocks) == len(keys):
            blocks = [x[0] for x in can_buy_blocks]
            blocks_msg = "\n".join([x[1] for x in can_buy_blocks])
@@ -516,8 +565,11 @@
    # 返回:是否可以下单,消息,板块类型
    @classmethod
    def can_buy(cls, code):
        # if constant.TEST:
        #     return True, cls.BLOCK_TYPE_NONE
        if constant.TEST:
            return True, cls.BLOCK_TYPE_NONE
        # if True:
        #     # 测试
        #     return True, "不判断板块身位"
        return cls.__can_buy_compute_result_dict.get(code)
    @classmethod
@@ -529,7 +581,6 @@
                                                  before_blocks_dict)
        if not blocks:
            return False, block_msg
        log.logger_kpl_debug.info(f"{code}:获取委托/买入代码")
        codes_delegate = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
            {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER}))
        codes_success = set(cls.__CodesTradeStateManager.get_codes_by_trade_states_cache(
@@ -541,13 +592,11 @@
        trade_codes_blocks_dict = {}
        # 已经成交的板块
        trade_success_blocks_count = {}
        log.logger_kpl_debug.info(f"{code}:获取代码板块")
        for c in codes:
            keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c)
            # 实时涨停原因
            trade_codes_blocks_dict[c] = k1_ | k4_
        # 统计板块中的代码
        log.logger_kpl_debug.info(f"{code}:统计板块中的代码")
        trade_block_codes_dict = {}
        for c in trade_codes_blocks_dict:
            for b in trade_codes_blocks_dict[c]:
@@ -560,7 +609,6 @@
                trade_block_codes_dict[b].add(c)
        # ---------------------------------加载已经下单/成交的代码信息------------end-------------
        log.logger_kpl_debug.info(f"{code}:开始计算是否可以买")
        msg_list = []
        for key in blocks:
            # 板块中已经有成交的就不下单了
@@ -574,6 +622,7 @@
        return False, ",".join(msg_list)
    # 更新代码板块判断是否可以买的结果
    @classmethod
    def update_can_buy_blocks(cls, code, current_limit_up_datas, limit_up_record_datas,
                              yesterday_current_limit_up_codes,
@@ -584,6 +633,43 @@
        # 保存板块计算结果
        cls.__can_buy_compute_result_dict[code] = (can_buy, msg)
    # 判断是否为真老大
    @classmethod
    def __is_real_first_limit_up(cls, code, block, current_limit_up_datas, limit_up_record_datas,
                                 yesterday_current_limit_up_codes,
                                 before_blocks_dict):
        # 加载涨停代码的目标板块
        def load_code_block():
            if limit_up_record_datas:
                for d in limit_up_record_datas:
                    if d[2] in constant.KPL_INVALID_BLOCKS and d[3] in before_blocks_dict:
                        code_limit_up_reason_dict[d[3]] = list(before_blocks_dict.get(d[3]))[0]
                    else:
                        code_limit_up_reason_dict[d[3]] = d[2]
            return code_limit_up_reason_dict
        if current_limit_up_datas is None:
            current_limit_up_datas = []
        if limit_up_record_datas is None:
            limit_up_record_datas = []
        code_limit_up_reason_dict = {}
        load_code_block()
        can_buy, msg = cls.__is_block_can_buy(code, block, current_limit_up_datas, code_limit_up_reason_dict,
                                              yesterday_current_limit_up_codes, limit_up_record_datas)
        return can_buy, msg
    @classmethod
    def is_need_cancel(cls, code, limit_up_reason, current_limit_up_datas, limit_up_record_datas,
                       yesterday_current_limit_up_codes,
                       before_blocks_dict):
        can_buy, msg = cls.__is_real_first_limit_up(code, limit_up_reason, current_limit_up_datas,
                                                    limit_up_record_datas,
                                                    yesterday_current_limit_up_codes,
                                                    before_blocks_dict)
        if not can_buy:
            logger_kpl_block_can_buy.warning(f"{code} 根据涨停原因({limit_up_reason})匹配不能买")
        return not can_buy
# Script entry point: running the module directly currently does nothing.
if __name__ == "__main__":
    pass