Administrator
2025-03-04 5e66b4bb954b2cc3650d50e60be86c328fcdc158
板块拉黑重新定义
6个文件已修改
180 ■■■■ 已修改文件
main.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/huaxin_trade_server.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py
@@ -13,6 +13,7 @@
from task import task_manager
from third_data import hx_qc_value_util
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
logger_system.info("程序启动Pre:{}", os.getpid())
@@ -97,6 +98,14 @@
        logger_system.info("主进程ID:{}", os.getpid())
        fix_codes = set()
        open_buy_codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes()
        if open_buy_codes:
            fix_codes |= set(open_buy_codes)
        # 要监控的高标
        high_codes = KPLPlateForbiddenManager().get_watch_high_codes()
        if high_codes:
            fix_codes |= set(high_codes)
        # L1订阅数据
        l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run,
                                            args=(queue_l1_w_strategy_r, queue_l1_r_strategy_w,
servers/data_server.py
@@ -593,7 +593,11 @@
                        code_info[0]) != trade_constant.TRADE_STATE_NOT_TRADE:
                    code_info[5] = 1
            response_data = json.dumps({"code": 0, "data": codes_info})
            # 涨停数据
            fdatas = {"limit_up_list": codes_info}
            # 辨识度票
            fdatas["speical_codes"] =  [(x, gpcode_manager.get_code_name(x)) for x in special_codes]
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/kpl/get_open_limit_up_count_rank":
            # 获取炸板次数排行
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
servers/huaxin_trade_server.py
@@ -357,6 +357,10 @@
        datas = data["data"]
        cls.__save_l1_current_price(datas)
        cls.__process_buy_open_limit_up_datas(datas)
        # 根据高标的实时涨幅计算拉黑板块
        rate_dict = {d[0]: d[2] for d in datas}
        cls.__process_l1_data_thread_pool.submit(
            lambda: KPLPlateForbiddenManager().compute(rate_dict))
        # 9:30之前采用非线程
        if int(tool.get_now_time_str().replace(":", "")) < int("093000") or True:
            HuaXinL1TargetCodesManager.set_level_1_codes_datas(datas, request_id=request_id)
@@ -999,17 +1003,17 @@
    # 加载自由流通量
    global_data_loader.load_zyltgb_volume_from_db()
    # 获取最近7天涨停数最多的板块
    try:
        if not KPLPlateForbiddenManager().list_all_cache() and tool.get_now_time_as_int() > int("070000"):
            # 没有添加过的时候需要重新添加
            datas_ = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
            if datas_:
                for data_ in datas_:
                    # 连续2天的板块就不买
                    if data_[2] >= 2:
                        KPLPlateForbiddenManager().save_plate(data_[0])
    except:
        pass
    # try:
    #     if not KPLPlateForbiddenManager().list_all_cache() and tool.get_now_time_as_int() > int("070000"):
    #         # 没有添加过的时候需要重新添加
    #         datas_ = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
    #         if datas_:
    #             for data_ in datas_:
    #                 # 连续2天的板块就不买
    #                 if data_[2] >= 2:
    #                     KPLPlateForbiddenManager().save_plate(data_[0])
    # except:
    #     pass
def run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, trade_ipc_addr):
third_data/code_plate_key_manager.py
@@ -5,18 +5,21 @@
# 涨停代码关键词板块管理
import copy
import datetime
import itertools
import json
import time
import constant
from code_attribute import gpcode_manager
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util, kpl_api, kpl_util
from settings.trade_setting import MarketSituationManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, ContainsLimitupCodesBlocksManager
from third_data.third_blocks_manager import BlockMapManager
from utils import global_util, tool, buy_condition_util
from log_module import async_log_util
from db import redis_manager_delegate as redis_manager
from db import redis_manager_delegate as redis_manager, mysql_data_delegate as mysql_data
from log_module.log import logger_kpl_block_can_buy, logger_kpl_jx_out, logger_kpl_jx_in, logger_debug
from third_data.kpl_util import KPLPlatManager
@@ -230,6 +233,11 @@
    # 已经删除了的板块
    __deleted_kpl_forbidden_plates_cache = set()
    # 监控的高标板块代码字典:{"板块":{"代码1","代码2"}}
    __watch_block_high_codes = {}
    # 高标代码
    __watch_high_codes = set()
    __instance = None
    def __new__(cls, *args, **kwargs):
@@ -246,6 +254,7 @@
            cls.__deleted_kpl_forbidden_plates_cache = RedisUtils.smembers(__redis, "deleted_kpl_forbidden_plates")
        finally:
            RedisUtils.realse(__redis)
        cls.__load_latest_gb()
    @classmethod
    def __get_redis(cls):
@@ -281,6 +290,103 @@
        if self.__kpl_forbidden_plates_cache and plate in self.__kpl_forbidden_plates_cache:
            return True
        return False
    @classmethod
    def __load_latest_gb(cls):
        """
        加载最近的市场高标
        @return:
        """
        # 获取最近10个交易日涨停的涨停数据
        dates = HistoryKDatasUtils.get_latest_trading_date_cache(10)
        if not dates:
            return
        min_date = dates[-1]
        sql = f"SELECT r.`_code`, r.`_hot_block_name`, r.`_day`, r.`_open` FROM `kpl_limit_up_record` r WHERE r.`_day`>='{min_date}'"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        code_days_map = {}
        # 每炸板
        f_code_days_map = {}
        for r in results:
            if r[0] not in code_days_map:
                code_days_map[r[0]] = set()
            code_days_map[r[0]].add(r[2])
            if not r[3]:
                if r[0] not in f_code_days_map:
                    f_code_days_map[r[0]] = set()
                f_code_days_map[r[0]].add(r[2])
        # 过滤涨停次数>=3次的数据
        target_codes = set()
        for code in code_days_map:
            if f_code_days_map.get(code) and (len(f_code_days_map.get(code)) >= 4 or (
                    tool.is_ge_code(code) and len(f_code_days_map.get(code)) >= 2)):
                # 且有3天属于连续涨停
                day_list = list(code_days_map[code])
                day_list.sort(reverse=True)
                step = 3
                has_continue = False
                for i in range(0, len(day_list) - step + 1):
                    item_list = day_list[i:i + step]
                    # 是否属于连续涨停
                    is_sub = False
                    for j in range(0, len(dates) - step):
                        if f"{dates[j:j + step]}" == f"{item_list}":
                            is_sub = True
                            break
                    if is_sub:
                        has_continue = True
                        break
                if not has_continue:
                    continue
                target_codes.add(code)
        # 代码对应的板块
        code_blocks = {}
        for r in results:
            if r[0] not in target_codes:
                continue
            if r[0] not in code_blocks:
                code_blocks[r[0]] = set()
            code_blocks[r[0]].add(kpl_util.filter_block(r[1]))
        # 所有板块对应的代码集合
        block_codes = {}
        for code in code_blocks:
            for b in code_blocks[code]:
                if b in constant.KPL_INVALID_BLOCKS:
                    continue
                if b not in block_codes:
                    block_codes[b] = set()
                block_codes[b].add(code)
        print(block_codes)
        cls.__watch_block_high_codes = block_codes
        cls.__watch_high_codes.clear()
        for b in block_codes:
            cls.__watch_high_codes |= block_codes[b]
        for k in block_codes:
            print(k, [(x, gpcode_manager.get_code_name(x)) for x in block_codes[k]])
    def get_watch_high_codes(self):
        return self.__watch_high_codes
    def compute(self, code_rate_dict: dict):
        """
        根据比例计算需要拉黑的代码
        @param code_rate_dict: 涨幅百分数
        @return:
        """
        if self.__watch_block_high_codes:
            forbidden_blocks = set()
            for b in self.__watch_block_high_codes:
                total_rate = 0
                for code in self.__watch_block_high_codes[b]:
                    if code in code_rate_dict:
                        total_rate += code_rate_dict.get(code)
                average_rate = total_rate / len(self.__watch_block_high_codes[b])
                if average_rate < 1:
                    forbidden_blocks.add(b)
            self.__kpl_forbidden_plates_cache = forbidden_blocks
class LimitUpCodesPlateKeyManager:
@@ -1230,5 +1336,4 @@
if __name__ == "__main__":
    RealTimeKplMarketData.set_market_strong(120)
    print(RealTimeKplMarketData.get_jingxuan_in_block_threshold_count())
    KPLPlateForbiddenManager().compute()
third_data/history_k_data_util.py
@@ -379,16 +379,4 @@
if __name__ == "__main__":
    print(HistoryKDatasUtils.get_previous_trading_date("2024-12-31"))
    print(HistoryKDatasUtils.get_history_tick_n("000095", 10))
    # now_day = tool.get_now_date_str()
    # results = JueJinApi.get_history_instruments(JueJinApi.get_juejin_code_list_with_prefix(["600265"]),
    #                                             tool.date_sub(now_day, 30), tool.date_sub(now_day, 1))
    # results = results[-5:]
    # normal = True
    # for r in results:
    #     if r["sec_level"] != 1:
    #         normal = False
    #         break
    # print(normal)
    pass
trade/buy_radical/radical_buy_data_manager.py
@@ -1200,13 +1200,13 @@
        #         return False, f"处于首板老{history_index + 1},量比({volume_rate})<0.8"
        # 前排最多允许1个炸板
        limit_up_timestamp = cls.__get_limit_up_timestamp(code)
        # 获取当前的板块, 不要忽略开1的数据
        current_index, current_before_codes_info = cls.__get_current_index(code, block, yesterday_limit_up_codes,
                                                                           limit_up_time=limit_up_timestamp,
                                                                           ignore_open_limit_up=False)
        if history_index - current_index > 1:
            return False, f"前排只允许一个炸板:炸板个数-{history_index - current_index}"
        # limit_up_timestamp = cls.__get_limit_up_timestamp(code)
        # # 获取当前的板块, 不要忽略开1的数据
        # current_index, current_before_codes_info = cls.__get_current_index(code, block, yesterday_limit_up_codes,
        #                                                                    limit_up_time=limit_up_timestamp,
        #                                                                    ignore_open_limit_up=False)
        # if history_index - current_index > 1:
        #     return False, f"前排只允许一个炸板:炸板个数-{history_index - current_index}"
        return True, f"处于首板老{history_index + 1}"
    @classmethod