Administrator
2025-03-21 2f4bf81b042d24d5fbbd2fa7ec3672a06b211264
根据板块成分股来获取新题材的成分股/成交数据处理优化
6个文件已修改
171 ■■■■ 已修改文件
l2/l2_transaction_data_manager.py 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
servers/data_server.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_constant.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py
@@ -467,6 +467,22 @@
        use_time = time.time() - __start_time
        __start_time = time.time()
        use_time_list.append(("处理涨停卖", use_time))
        latest_sell_order = cls.__latest_sell_order_dict.get(code)
        big_sell_order_ids = cls.__big_sell_order_ids_dict.get(code)
        if big_sell_order_ids is None:
            big_sell_order_ids = set()
        big_sell_order_info = cls.__big_sell_order_info_dict.get(code)
        if big_sell_order_info is None:
            big_sell_order_info = {}
        big_sell_order_info_list = cls.__big_sell_order_info_list_dict.get(code)
        if big_sell_order_info_list is None:
            big_sell_order_info_list = []
        latest_all_sell_orders = cls.__latest_all_sell_orders_dict.get(code)
        if latest_all_sell_orders is None:
            latest_all_sell_orders = []
        last_trade_data = cls.__last_trade_data_dict.get(code)
        for d in fdatas:
            # 获取当前是否为主动买
            if d[1]:
@@ -478,22 +494,20 @@
                    L2TradeSingleDataProcessor.add_active_limit_up_sell_data(d[0])
                # 判断是否是涨停被动变主动
                last_trade_data = cls.__last_trade_data_dict.get(code)
                if last_trade_data and last_trade_data[1] and last_trade_data[2]:
                    if d[2]:
                        # 涨停被动变主动
                        L2TradeSingleDataManager.set_sell_passive_to_active_datas(code, last_trade_data[0], d[0])
                # cls.__latest_sell_order_info_list_dict[code].append(d)
                if code not in cls.__latest_sell_order_dict:
                    cls.__latest_sell_order_dict[code] = [d[0][7], d[0][2], d[0][1], (d[0][3], d[0][6]),
                                                          (d[0][3], d[0][6])]
                if latest_sell_order is None:
                    latest_sell_order = [d[0][7], d[0][2], d[0][1], (d[0][3], d[0][6]), (d[0][3], d[0][6])]
                else:
                    if cls.__latest_sell_order_dict[code][0] == d[0][7]:
                        cls.__latest_sell_order_dict[code][1] += d[0][2]
                        cls.__latest_sell_order_dict[code][2] = d[0][1]
                        cls.__latest_sell_order_dict[code][4] = (d[0][3], d[0][6])
                    if latest_sell_order[0] == d[0][7]:
                        latest_sell_order[1] += d[0][2]
                        latest_sell_order[2] = d[0][1]
                        latest_sell_order[4] = (d[0][3], d[0][6])
                    else:
                        info = cls.__latest_sell_order_dict[code]
                        info = latest_sell_order
                        # 上个卖单成交完成
                        # 封存数据,计算新起点
@@ -501,22 +515,34 @@
                        # 大于50w加入卖单
                        money = info[1] * info[2]
                        if money >= 500000:
                            cls.__big_sell_order_ids_dict[code].add(info[0])
                            cls.__big_sell_order_info_dict[code][info[0]] = info
                            cls.__big_sell_order_info_list_dict[code].append(info)
                            big_sell_order_ids.add(info[0])
                            big_sell_order_info[info[0]] = info
                            big_sell_order_info_list.append(info)
                        # 只保留10w以上的单
                        if money > 100000:
                            cls.__latest_all_sell_orders_dict[code].append(info)
                            latest_all_sell_orders.append(info)
                            l2_log.info(code, hx_logger_l2_transaction_sell_order,
                                        f"{info}")
                        if limit_up_price == info[2]:
                            # 将涨停主动卖记入日志
                            l2_log.info(code, hx_logger_l2_active_sell, f"{info}")
                        cls.__latest_sell_order_dict[code] = [d[0][7], d[0][2], d[0][1], (d[0][3], d[0][6]),
                                                              (d[0][3], d[0][6])]
                        latest_sell_order = [d[0][7], d[0][2], d[0][1], (d[0][3], d[0][6]),
                                             (d[0][3], d[0][6])]
            finally:
                cls.__last_trade_data_dict[code] = d
                last_trade_data = d
        cls.__last_trade_data_dict[code] = last_trade_data
        if latest_sell_order:
            cls.__latest_sell_order_dict[code] = latest_sell_order
        if big_sell_order_ids:
            cls.__big_sell_order_ids_dict[code] = big_sell_order_ids
        if  big_sell_order_info:
            cls.__big_sell_order_info_dict[code] = big_sell_order_info
        if  big_sell_order_info_list:
            cls.__big_sell_order_info_list_dict[code] = big_sell_order_info_list
        if  latest_all_sell_orders:
            cls.__latest_all_sell_orders_dict[code] = latest_all_sell_orders
        use_time = time.time() - __start_time
        __start_time = time.time()
servers/data_server.py
@@ -62,6 +62,8 @@
    __industry_cache_dict = {}
    __latest_limit_up_codes_set = set()
    __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    # 新题材请求
    __new_blocks_codes_request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    # 代码的涨幅
    __code_limit_rate_dict = {}
@@ -1017,7 +1019,37 @@
            self.__send_response(result_str)
    def __process_kpl_data(self, data_origin):
        def do_limit_up(result_list_):
            def request_new_blocks_codes(blocks_info):
                """
                请求新板块的代码
                @param blocks_info:[(板块名称,板块代码)]
                @return:
                """
                yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes()
                for bi in blocks_info:
                    result = kpl_api.getCodesByPlate(bi[1])
                    result = json.loads(result)
                    code_info_list = []
                    for d in result["list"]:
                        if d[0] in yesterday_codes:
                            continue
                        # 涨幅要大于5%
                        rate = d[6] / int(round((tool.get_limit_up_rate(d[0]) - 1) * 10))
                        if rate < 5:
                            continue
                        # 格式:(代码,涨幅)
                        code_info_list.append((d[0], d[6]))
                    if code_info_list:
                        # 将代码加入新题材
                        for x in code_info_list:
                            add_result = LimitUpCodesBlockRecordManager().add_new_blocks(x[0], bi[0])
                            if add_result:
                                # 增加新题材是否成功, 临时将票加入辨识度
                                BlockSpecialCodesManager().add_code_block_for_temp(x[0], bi[0])
            try:
                if result_list_:
                    # 保存涨停时间
@@ -1094,8 +1126,11 @@
                        records = KPLLimitUpDataRecordManager.total_datas
                        # 计算今日新增的题材概念
                        block_codes = {}
                        # 统计板块的代码
                        block_plate_code_dict = {}
                        for x in records:
                            bs = {kpl_util.filter_block(x[2])}
                            block_plate_code_dict[kpl_util.filter_block(x[2])] = x[15]
                            if x[6]:
                                bs |= set(x[6].split("、"))
                            for b in bs:
@@ -1106,6 +1141,8 @@
                        reasons -= constant.KPL_INVALID_BLOCKS
                        reasons -= LimitUpCodesBlockRecordManager().get_total_before_blocks()
                        if reasons:
                            # 新板块
                            update_new_block_plates = []
                            for r in reasons:
                                for c in block_codes[r]:
                                    add_result = LimitUpCodesBlockRecordManager().add_new_blocks(c, r)
@@ -1113,6 +1150,13 @@
                                        # 增加新题材是否成功, 临时将票加入辨识度
                                        BlockSpecialCodesManager().add_code_block_for_temp(c, r)
                            for r in reasons:
                                if r in block_plate_code_dict:
                                    update_new_block_plates.append((r, block_plate_code_dict[r]))
                            if update_new_block_plates:
                                # 需要获取板块下的代码
                                self.__new_blocks_codes_request_thread_pool.submit(
                                    lambda: request_new_blocks_codes(update_new_block_plates))
                    except:
                        pass
                    self.__kplDataManager.save_data(type_, result_list_)
third_data/kpl_api.py
@@ -5,7 +5,7 @@
import constant
from third_data import kpl_util
from utils import middle_api_protocol
from utils import middle_api_protocol, tool
# 竞价
DABAN_TYPE_BIDDING = 8
@@ -299,6 +299,37 @@
    return int(data["info"]["strong"])
def request_new_blocks_codes(blocks_info):
    """
    Request the constituent codes of new plates and print the qualifying ones.

    A code qualifies when its scaled rise is at least 5.
    @param blocks_info: [(plate name, plate code)]
    @return:
    """
    # Codes to skip; this set is left empty here, so the check below never filters
    excluded_codes = set()
    for bi in blocks_info:
        plate_data = json.loads(getCodesByPlate(bi[1]))
        # Collect every qualifying (code, rise) pair of this plate first
        qualified = []
        for row in plate_data["list"]:
            code = row[0]
            if code in excluded_codes:
                continue
            rise = row[6]
            # Divide the rise by int(round((limit_up_rate - 1) * 10)) so the
            # threshold below is applied on a common scale
            # (presumably a 10%-limit basis - TODO confirm)
            scaled_rise = rise / int(round((tool.get_limit_up_rate(code) - 1) * 10))
            if scaled_rise < 5:
                continue
            qualified.append((code, rise))
        # Report the codes that would be added to the new theme
        for item in qualified:
            print("添加", item)
if __name__ == "__main__":
    result =getHistoryLimitUpInfo("2024-02-19")
    print(result)
    request_new_blocks_codes([("机器人", "801159")])
    # result = getCodesByPlate("801159")  # getHistoryLimitUpInfo("2024-02-19")
    # result = json.loads(result)
    # for d in result["list"]:
    #     print(d)
    #
    # print(result)
third_data/kpl_data_constant.py
@@ -8,7 +8,7 @@
from third_data.third_blocks_manager import BlockMapManager
from trade import trade_record_log_util
from utils import tool
from utils import tool, global_util
from utils.kpl_data_db_util import KPLLimitUpDataUtil
# 用于计算激进买开1的板数:{"代码":(几版,{板块})}
@@ -217,6 +217,11 @@
        @param block:
        @return: 返回增加新题材是否成功
        """
        # 自由流通股本要大于50亿
        zyltgb = global_util.zyltgb_map.get(code)
        if not zyltgb or zyltgb < 50e8:
            return False
        if block in constant.KPL_INVALID_BLOCKS:
            return False
        old_blocks = self.__radical_buy_reasons_dict.get(code)
@@ -245,6 +250,17 @@
        """
        return self.__new_blocks
    def is_new_block(self, block):
        """
        是否是新题材
        @param block:
        @return:
        """
        if self.__new_blocks and block in self.__new_blocks:
             return True
        return False
class TodayLimitUpReasonChangeManager:
    """
third_data/kpl_data_manager.py
@@ -148,7 +148,7 @@
            result = mysqldb.select_one("select * from kpl_limit_up_record where _id='{}'".format(_id))
            if not result:
                mysqldb.execute(
                    f"insert into kpl_limit_up_record(_id,_day,_hot_block_name,_code,_code_name,_limit_up_time,_blocks,_latest_limit_up_time,_update_time,_create_time,_hot_block_code_count,_limit_up_high_info,_zylt_val) values('{_id}','{day}','{d[5]}','{d[0]}','{d[1]}','{d[2]}','{d[6]}','{d[3]}',now(),now(),{d[10]},'{d[4]}',{d[7]})")
                    f"insert into kpl_limit_up_record(_id,_day,_hot_block_name,_code,_code_name,_limit_up_time,_blocks,_latest_limit_up_time,_update_time,_create_time,_hot_block_code_count,_limit_up_high_info,_zylt_val,_hot_block_code) values('{_id}','{day}','{d[5]}','{d[0]}','{d[1]}','{d[2]}','{d[6]}','{d[3]}',now(),now(),{d[10]},'{d[4]}',{d[7]},{d[9]})")
                cls.__load_hist_and_blocks(code)
            else:
                if _id in cls.latest_datas and json.dumps(cls.latest_datas.get(_id)) != json.dumps(d):
trade/buy_radical/radical_buy_data_manager.py
@@ -805,8 +805,7 @@
        @return:
        """
        new_blocks = LimitUpCodesBlockRecordManager().get_new_blocks()
        if not new_blocks or block not in new_blocks:
        if not LimitUpCodesBlockRecordManager().is_new_block(block):
            return False, "非新题材"
        # 9:45点之前涨停的才能买入
@@ -992,8 +991,7 @@
        @return:
        """
        new_blocks = LimitUpCodesBlockRecordManager().get_new_blocks()
        if not new_blocks or block not in new_blocks:
        if not LimitUpCodesBlockRecordManager().is_new_block(block):
            return False, "非新题材"
        # 开始买的身位 2:从老三开始买  1: 从老二开始买
@@ -1123,8 +1121,8 @@
        @param block:
        @return:
        """
        new_blocks = LimitUpCodesBlockRecordManager().get_new_blocks()
        if not new_blocks or block not in new_blocks:
        if not LimitUpCodesBlockRecordManager().is_new_block(block):
            return False, "非新题材"
        # 获取身位
@@ -1253,7 +1251,7 @@
        if jx_in_blocks and block in jx_in_blocks:
            return True, False
        if block in LimitUpCodesBlockRecordManager().get_new_blocks():
        if block in LimitUpCodesBlockRecordManager().is_new_block(block):
            # 今日新板块不考虑净流入
            return True, False