Administrator
2024-07-31 75304450cd47c427ade3ad40a7556fc8328a3f29
板块轮动统计/G撤比例修改
6个文件已修改
1个文件已添加
291 ■■■■■ 已修改文件
constant.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_block.py 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 207 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_limit_up_data_manager.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -159,7 +159,7 @@
F_CANCEL_CACEL_RATE = 0.69
# G撤单
G_CANCEL_RATE = 0.69
G_CANCEL_RATE = 0.79
# G撤加红
G_CANCEL_RATE_WITH_MUST_BUY = 0.9
servers/data_server.py
@@ -16,7 +16,7 @@
from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer, LCancelRateManager
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api, block_info
from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager
from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager, LatestLimitUpBlockManager
from third_data.history_k_data_util import HistoryKDatasUtils
from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, \
    KPLCodeLimitUpReasonManager
@@ -697,6 +697,10 @@
            msg_list.reverse()
            msg_list = [f"{msg.split('|')[0]}{msg.split('|')[-1].split('-')[1].strip()}" for msg in msg_list]
            response_data = json.dumps({"code": 0, "data": msg_list})
        elif url.path == "/statistic_latest_limit_up_block":
            # 统计最近的涨停板块
            datas = LatestLimitUpBlockManager().statistics_limit_up_block_infos()
            response_data = json.dumps({"code": 0, "data": datas})
        async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}")
        self.send_response(200)
test/l2_trade_test.py
@@ -233,6 +233,9 @@
        l2.l2_data_util.local_today_datas[code] = total_datas
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)
    # @unittest.skip("跳过此单元测试")
    def test_block(self):
        code = "001376"
test/test.py
@@ -1,4 +1,6 @@
from settings.trade_setting import TradeBlockBuyModeManager
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import tool
def test_active_buy():
@@ -18,11 +20,6 @@
if __name__ == "__main__":
    block_codes = {"000333"}
    print(block_codes | {"000222"})
    print(TradeBlockBuyModeManager().can_buy_unique_block(), bin(TradeBlockBuyModeManager().get_mode()))
    print(TradeBlockBuyModeManager().add_unique_block())
    print(TradeBlockBuyModeManager().can_buy_unique_block(), bin(TradeBlockBuyModeManager().get_mode()))
    print(TradeBlockBuyModeManager().remove_unique_block())
    print(TradeBlockBuyModeManager().can_buy_unique_block(), bin(TradeBlockBuyModeManager().get_mode()))
    __days = HistoryKDatasUtils.get_latest_trading_date_cache(6)
    __days.insert(0, tool.get_now_date_str())
    print(__days)
test/test_block.py
@@ -1,11 +1,21 @@
from db.redis_manager_delegate import RedisUtils
from third_data import code_plate_key_manager
from third_data import  kpl_data_manager, kpl_util
from third_data.code_plate_key_manager import LatestLimitUpBlockManager
from utils import tool
def block_run():
    current_datas = kpl_data_manager.KPLDataManager.get_data(kpl_util.KPLDataType.LIMIT_UP)
    kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(),current_datas)
    LatestLimitUpBlockManager().set_current_limit_up_data(tool.get_now_date_str(), current_datas)
    LatestLimitUpBlockManager().statistics_limit_up_block_infos()
if __name__ == "__main__":
    print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    code_plate_key_manager.ForbiddenBlockManager().add("测试2")
    code_plate_key_manager.ForbiddenBlockManager().add("测试3")
    print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    print( code_plate_key_manager.ForbiddenBlockManager().is_in("测试"))
    print(code_plate_key_manager.ForbiddenBlockManager().is_in("测试1"))
    RedisUtils.run_loop()
    block_run()
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # code_plate_key_manager.ForbiddenBlockManager().add("测试2")
    # code_plate_key_manager.ForbiddenBlockManager().add("测试3")
    # print(code_plate_key_manager.ForbiddenBlockManager().get_blocks())
    # print( code_plate_key_manager.ForbiddenBlockManager().is_in("测试"))
    # print(code_plate_key_manager.ForbiddenBlockManager().is_in("测试1"))
    # RedisUtils.run_loop()
third_data/code_plate_key_manager.py
@@ -10,9 +10,11 @@
import constant
from code_attribute import code_nature_analyse
from db.redis_manager_delegate import RedisUtils
from third_data import kpl_block_util, kpl_api, kpl_util
from third_data import kpl_block_util, kpl_api, kpl_util, kpl_limit_up_data_manager
from settings.trade_setting import MarketSituationManager
from utils import global_util, tool, buy_condition_util
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.history_k_data_util import HistoryKDatasUtils
from utils import global_util, tool, buy_condition_util, init_data_util
from log_module import log, async_log_util
from db import redis_manager_delegate as redis_manager
@@ -1016,5 +1018,206 @@
            can_buy_blocks, unique, msg, can_buy_strong_blocks, keys, active_buy_blocks)
class LatestLimitUpBlockManager:
    """
    最近涨停的板块管理
    """
    # 看最近7天
    __LATEST_DAY_COUNT = 7
    __days = []
    # 目前涨停
    __current_limit_up_day_datas = {}
    # 曾涨停
    __history_limit_up_day_datas = {}
    # K线数据
    __k_datas = {}
    # 代码的最高涨幅
    __k_max_rate = {}
    __code_name_dict = {}
    # 统计板块数据:{"day":{"板块":[(涨停数,破板数, 代码集合)]}}
    __block_day_datas = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LatestLimitUpBlockManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __load_datas(cls):
        # 加载最近7天的数据
        __days = HistoryKDatasUtils.get_latest_trading_date_cache(cls.__LATEST_DAY_COUNT - 1)
        now_day = tool.get_now_date_str()
        if __days[0] != now_day:
            __days.insert(0, now_day)
        cls.__days = __days
        # 加载之前6天的涨停,曾涨停,曾涨停代码的最近6天的K线
        for day in __days:
            if day == now_day:
                continue
            limit_up_records = kpl_limit_up_data_manager.get_history_limit_up_datas(day)
            cls.__history_limit_up_day_datas[day] = limit_up_records
            current_limit_up_datas = kpl_limit_up_data_manager.get_current_limit_up_datas(day)
            cls.__current_limit_up_day_datas[day] = current_limit_up_datas
        # 获取代码的k线
        __total_codes = set()
        for d in cls.__current_limit_up_day_datas:
            __total_codes |= set([dd[3] for dd in cls.__history_limit_up_day_datas[d]])
        # 获取最近7天的k线情况
        for code in __total_codes:
            cls.__get_bars(code)
        # 统计前6天的板块信息
        for day in __days:
            if day == now_day:
                continue
            cls.__block_day_datas[day] = cls.__statistics_limit_up_block_infos_by_day(day)
    def set_current_limit_up_data(self, day, datas):
        self.__current_limit_up_day_datas[day] = datas
        self.__history_limit_up_day_datas[day] = kpl_limit_up_data_manager.get_today_history_limit_up_datas_cache()
        # 加载代码K线数据
        __total_codes = set([d[0] for d in datas])
        __total_codes |= set([d[3] for d in self.__history_limit_up_day_datas[day]])
        for code in __total_codes:
            self.__get_bars(code)
    @classmethod
    def __statistics_limit_up_block_infos_by_day(cls, day):
        """
        统计涨停代码信息
        @return:
        """
        # 统计板块的
        current_code_dict = {d[0]: d for d in cls.__current_limit_up_day_datas[day]}
        # history_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas[day]}
        block_codes_dict = {}
        for h in cls.__history_limit_up_day_datas[day]:
            cls.__code_name_dict[h[3]] = h[4]
            if h[2] not in block_codes_dict:
                block_codes_dict[h[2]] = set()
            block_codes_dict[h[2]].add(h[3])
        fdata = {}
        for b in block_codes_dict:
            limit_up_count = 0
            open_limit_up_count = 0
            for code in block_codes_dict[b]:
                if code in current_code_dict:
                    limit_up_count += 1
                else:
                    open_limit_up_count += 1
            fdata[b] = (limit_up_count, open_limit_up_count, block_codes_dict[b])
        return fdata
    def statistics_limit_up_block_infos(self):
        """
        统计涨停板块数据
        @return:
        """
        # 只统计今天的数据
        now_day = tool.get_now_date_str()
        block_dict = self.__statistics_limit_up_block_infos_by_day(now_day)
        self.__block_day_datas[now_day] = block_dict
        # 板块出现的天数
        block_count_dict = {}
        # 板块出现的代码
        block_codes_dict = {}
        for day in self.__block_day_datas:
            for b in self.__block_day_datas[day]:
                if b not in block_count_dict:
                    block_count_dict[b] = set()
                if b not in block_codes_dict:
                    block_codes_dict[b] = set()
                block_count_dict[b].add(day)
                block_codes_dict[b] |= self.__block_day_datas[day][b][2]
        block_count_list = [(k, block_count_dict[k]) for k in block_count_dict]
        block_count_list.sort(key=lambda x: x[1], reverse=True)
        block_count_list = block_count_list[:20]
        # [(涨停原因,累计涨停次数,连续次数)]
        fdatas = []
        today_records_code_dict = {d[3]: d for d in self.__history_limit_up_day_datas.get(now_day)}
        for d in block_count_list:
            b = d[0]
            fdata = [d[0], len(d[1])]
            temp = []
            max_continue_count = 0
            for day in self.__days:
                if d[0] in self.__block_day_datas[day]:
                    temp.append(day)
                else:
                    c = len(temp)
                    if c > max_continue_count:
                        max_continue_count = c
                    temp.clear()
            c = len(temp)
            if c > max_continue_count:
                max_continue_count = c
            temp.clear()
            # 最大连续次数
            fdata.append(max_continue_count)
            # 最高板
            max_rate_info = None
            for code in block_codes_dict[d[0]]:
                if max_rate_info is None:
                    max_rate_info = (code, self.__k_max_rate.get(code),  self.__code_name_dict.get(code))
                if max_rate_info[1] < self.__k_max_rate.get(code):
                    max_rate_info = (code, self.__k_max_rate.get(code), self.__code_name_dict.get(code))
            fdata.append(max_rate_info)
            # 统计今天这个板块中大于二板的代码数量
            limit_up_counts = 0
            for code in block_codes_dict[d[0]]:
                if code in today_records_code_dict and today_records_code_dict[code][12] != '首板':
                    limit_up_counts += 1
            fdata.append(limit_up_counts)
            # 获取每天的数量
            days_datas = []
            for day in self.__days:
                binfo = self.__block_day_datas[day].get(b)
                if not binfo:
                    days_datas.append((0, 0))
                else:
                    days_datas.append((binfo[0], binfo[1]))
            fdata.append(days_datas)
            fdatas.append(fdata)
        return fdatas
    @classmethod
    def __get_bars(cls, code):
        """
        获取K线
        @param code:
        @return:
        """
        if code in cls.__k_datas:
            return cls.__k_datas[code]
        volumes_data = None
        if cls.__days:
            volumes_data = HistoryKDataManager().get_history_bars(code, cls.__days[1])
            if volumes_data:
                volumes_data = volumes_data[:cls.__LATEST_DAY_COUNT - 1]
                cls.__k_datas[code] = volumes_data
        if not volumes_data:
            volumes_data = init_data_util.get_volumns_by_code(code, cls.__LATEST_DAY_COUNT - 1)
            if volumes_data:
                cls.__k_datas[code] = volumes_data
        # 获取最大涨幅
        min_price = volumes_data[-1]["low"]
        for d in volumes_data:
            if min_price > d["low"]:
                min_price = d["low"]
        rate = int((volumes_data[0]["close"] - min_price) * 100 / min_price)
        cls.__k_max_rate[code] = rate
        return cls.__k_datas.get(code)
if __name__ == "__main__":
    pass
third_data/kpl_limit_up_data_manager.py
New file
@@ -0,0 +1,32 @@
"""
开盘啦涨停数据管理
"""
from third_data import kpl_util, kpl_data_manager
def get_current_limit_up_datas(day):
    """
    获取当时涨停的数据
    @param day:
    @return:
    """
    datas = kpl_data_manager.KPLDataManager.get_from_file_cache(kpl_util.KPLDataType.LIMIT_UP, day)
    return datas
def get_history_limit_up_datas(day):
    """
    获取曾涨停的代码数据
    @param day:
    @return:
    """
    limit_up_records = kpl_data_manager.KPLLimitUpDataRecordManager.list_all(day)
    return limit_up_records
def get_today_history_limit_up_datas_cache():
    """
    获取今天的历史涨停数据
    @return:
    """
    return kpl_data_manager.KPLLimitUpDataRecordManager.total_datas