Administrator
2023-11-29 49f952cfa1dbc5bedaa8a2e136a662bae50aecc1
third_data/code_plate_key_manager.py
@@ -3,7 +3,9 @@
"""
# 涨停代码关键词板块管理
import copy
import json
import time
import constant
from db.redis_manager_delegate import RedisUtils
@@ -35,19 +37,28 @@
    def __get_redis(self):
        return self.__redisManager.getRedis()
    def save_jx_blocks(self, code, blocks, by=False):
        if blocks is None:
    def save_jx_blocks(self, code, blocks: list, by=False):
        if not blocks:
            return
        final_blocks = copy.deepcopy(blocks)
        if len(blocks) > 2:
            blocks = blocks[:2]
            final_blocks.clear()
            # 根据涨幅排序
            blocks.sort(key=lambda x: x[2])
            blocks.reverse()
            for b in blocks:
                if b[2] > 0 and b[1] not in constant.KPL_INVALID_BLOCKS:
                    final_blocks.append(b)
            if len(final_blocks) < 2:
                final_blocks = blocks
        # 保存前2条数据
        if by:
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_by-{code}", tool.get_expire(), json.dumps(blocks))
            self.__code_by_blocks[code] = blocks
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_by-{code}", tool.get_expire(), json.dumps(final_blocks))
            self.__code_by_blocks[code] = (final_blocks, time.time())
        else:
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(blocks))
            self.__code_blocks[code] = blocks
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(final_blocks))
            self.__code_blocks[code] = (final_blocks, time.time())
    # 获取精选板块
    def get_jx_blocks(self, code, by=False):
@@ -80,22 +91,47 @@
    # 从网络上加载精选板块
    def load_jx_blocks(self, code, buy_1_price, limit_up_price):
        if limit_up_price and buy_1_price:
            # 处理买1,卖1信息
            pre_close_price = round(float(limit_up_price) / 1.1, 2)
            # 如果涨幅大于7%就读取板块
            price_rate = (buy_1_price - pre_close_price) / pre_close_price
            if price_rate > 0.07:
                if not self.get_jx_blocks_cache(code):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    self.save_jx_blocks(code, blocks)
                    async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块-{blocks}")
            elif price_rate > 0.03:
                # 添加备用板块
                if not self.get_jx_blocks_cache(code, by=True):
                    blocks = kpl_api.getCodeJingXuanBlocks(code)
                    self.save_jx_blocks(code, blocks, by=True)
                    async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(备用)-{blocks}")
        try:
            logger_kpl_block_can_buy.info(f"准备更新精选板块:{code}-{buy_1_price}-{limit_up_price}")
            if limit_up_price and buy_1_price:
                # 处理买1,卖1信息
                pre_close_price = round(float(limit_up_price) / 1.1, 2)
                # 如果涨幅大于7%就读取板块
                price_rate = (buy_1_price - pre_close_price) / pre_close_price
                if price_rate > 0.07:
                    jx_blocks_info = self.get_jx_blocks_cache(code)
                    if not jx_blocks_info:
                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                        self.save_jx_blocks(code, blocks)
                        async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块-{blocks}")
                    else:
                        # 还没涨停的需要更新精选板块 更新精选板块
                        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001 or True:
                            # 非涨停状态
                            UPDATE_TIME_SPACE = 5 * 60
                            time_diff = tool.trade_time_sub(tool.get_now_time_str(), "09:30:00")
                            if time_diff < 0:
                                UPDATE_TIME_SPACE = 60 * 60
                            else:
                                UPDATE_TIME_SPACE = int(time_diff / 30) + 60
                                if UPDATE_TIME_SPACE > 5 * 60:
                                    UPDATE_TIME_SPACE = 5 * 60
                            if time.time() - jx_blocks_info[1] > UPDATE_TIME_SPACE:
                                # 距离上次更新时间过去了5分钟
                                blocks = kpl_api.getCodeJingXuanBlocks(code)
                                self.save_jx_blocks(code, blocks)
                                async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(更新)-{blocks}")
                elif price_rate > 0.03:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
                        blocks = kpl_api.getCodeJingXuanBlocks(code)
                        self.save_jx_blocks(code, blocks, by=True)
                        async_log_util.info(logger_kpl_block_can_buy, f"{code}:获取到精选板块(备用)-{blocks}")
        except Exception as e:
            logger_kpl_block_can_buy.error(f"{code} 获取板块出错")
            logger_kpl_block_can_buy.exception(e)
# 开盘啦禁止交易板块管理
@@ -410,11 +446,11 @@
            k3 = {industry}
        k4 = set()
        jingxuan_blocks = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code)
        if not jingxuan_blocks:
            jingxuan_blocks = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True)
        if jingxuan_blocks:
            jingxuan_blocks = jingxuan_blocks[:2]
        jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code)
        if not jingxuan_block_info:
            jingxuan_block_info = self.__KPLCodeJXBlockManager.get_jx_blocks_cache(code, by=True)
        if jingxuan_block_info:
            jingxuan_blocks = jingxuan_block_info[0]
            k4 |= set([x[1] for x in jingxuan_blocks])
        for k in [k1, k11, k2, k3, k4]:
            keys |= k