15个文件已修改
3个文件已添加
750 ■■■■■ 已修改文件
api/outside_api_callback.py 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/data_server.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/local_data_server.py 168 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/back_test.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/data_analyzer.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/env_info.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/low_suction_strategy.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_manager.py 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_params_settings.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_script_v6.py 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_script_v7.py 167 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_variable_factory.py 43 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/test.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/time_series_backtest.py 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_block_manager.py 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_callback.py
@@ -1,13 +1,18 @@
import json
import logging
import os
import threading
import constant
from api.outside_api_command_manager import ActionCallback
from huaxin_client import l1_subscript_codes_manager
from huaxin_client.client_network import SendResponseSkManager
from strategy import strategy_params_settings, env_info
from strategy import strategy_params_settings, env_info, strategy_manager
from strategy.env_info import RealTimeEnvInfo
from strategy.strategy_variable_factory import DataLoader
from third_data.history_k_data_manager import TradeDateManager
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from utils import socket_util, middle_api_protocol, tool
@@ -69,6 +74,7 @@
        fdata["real_time_data"] = RealTimeEnvInfo().to_dict()
        # 历史数据
        fdata["history_data"] = {}
        print("获取环境", os.getpid())
        if need_hsitory_data:
            if tool.get_now_time_str() < '16:00:00':
                # 如果在16:00之前采用当前日期
@@ -87,6 +93,11 @@
                # 如果在16:00之后采用下一个交易日
                day = tool.get_now_date_str()
            fdata["history_data"]["k_bars_count"] = env_info.get_history_k_bars(day)
            day = tool.get_now_date_str()
            fdata["history_data"]["kpl_code_jx_blocks_count"] = env_info.get_kpl_code_jx_blocks(day)
        return {"code": 0, "data": fdata, "msg": "测试结果"}
    def __on_update_leading_limit_up_datas(self):
@@ -111,6 +122,30 @@
        threading.Thread(target=lambda: update(), daemon=True).start()
        return {"code": 0}
    def __on_update_kpl_code_jx_blocks_datas(self):
        """
        更新开盘啦精选板块数据
        @return:
        """
        def update():
            codes = set()
            codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
            codes |= set([x.decode() for x in codes_sh])
            codes |= set([x.decode() for x in codes_sz])
            KPLCodeJXBlocksManager(day, codes).start_download_blocks()
            # 如果在16:00之前采用当前日期
        day = tool.get_now_date_str()
        threading.Thread(target=lambda: update(), daemon=True).start()
        return {"code": 0}
    def __on_init_data(self):
        try:
            strategy_manager.low_suction_strtegy.load_data()
            return {"code": 0}
        except Exception as e:
            return {"code": 1, "msg": str(e)}
    def OnCommonRequest(self, client_id, request_id, data):
        ctype = data["ctype"]
        result_json = {}
@@ -120,7 +155,15 @@
            del data["ctype"]
            result_json = self.__on_set_settings(data)
        elif ctype == 'get_env':
            # 获取环境数据
            result_json = self.__on_get_env(data.get("history"))
        elif ctype == 'update_leading_limit_up_datas':
            # 更新领涨数据
            result_json = self.__on_update_leading_limit_up_datas()
        elif ctype == 'update_kpl_code_jx_blocks_datas':
            # 更新开盘啦精选数据
            result_json = self.__on_update_kpl_code_jx_blocks_datas()
        elif ctype == 'init_datas':
            # 初始化数据
            result_json = self.__on_init_data()
        self.send_response(result_json, client_id, request_id)
log_module/log.py
@@ -63,6 +63,9 @@
        logger.add(self.get_path("system", "system"), filter=lambda record: record["extra"].get("name") == "system",
                   rotation="00:00", compression="zip", enqueue=True)
        # 显示在控制台
        logger.add(sys.stdout,
                   filter=lambda record: record["extra"].get("name") == "system", enqueue=True)
        logger.add(self.get_path("ths", "buy_1_volumn"),
                   filter=lambda record: record["extra"].get("name") == "buy_1_volumn",
@@ -186,19 +189,20 @@
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        path_str = "{}/ls_logs/gp/{}/{}".format(constant.get_path_prefix(), dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
        path_str = "{}/ls_logs/gp/{}/{}".format(constant.get_path_prefix(), dir_name,
                                                log_name) + ".{time:YYYY-MM-DD}.log"
        # print(path_str)
        return path_str
    def get_hx_path(self, dir_name, log_name):
        path_str = "{}/ls_logs/huaxin/{}/{}".format(constant.get_path_prefix(), dir_name,
                                                 log_name) + ".{time:YYYY-MM-DD}.log"
                                                    log_name) + ".{time:YYYY-MM-DD}.log"
        # print(path_str)
        return path_str
    def get_local_huaxin_path(self, dir_name, log_name):
        path_str = "{}/ls_logs/huaxin_local/{}/{}".format(constant.get_path_prefix(), dir_name,
                                                       log_name) + ".{time:YYYY-MM-DD}.log"
                                                          log_name) + ".{time:YYYY-MM-DD}.log"
        # print(path_str)
        return path_str
main.py
@@ -4,17 +4,19 @@
import time
import requests
import schedule
from api import outside_api_callback
from api.outside_api_command_manager import ApiCommandManager
from db.redis_manager_delegate import RedisUtils
from huaxin_client import l2_market_client, trade_client
from huaxin_client import l2_market_client, trade_client, l1_subscript_codes_manager
from log_module import async_log_util
from log_module.log import logger_debug
from log_module.log import logger_debug, logger_system
from server import data_server
from strategy import strategy_manager
from strategy.env_info import RealTimeEnvInfo
from third_data import hx_qc_value_util
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from trade.huaxin import huaxin_trade_api
from utils import tool, middle_api_protocol
@@ -27,7 +29,7 @@
                if data.get("type") == 'set_target_codes':
                    # [(代码, 时间戳, 价格, 总交易量, 总交易额, 买5, 卖5)]
                    market_data_list = data["data"]["data"]
                    if  strategy_manager.low_suction_strtegy:
                    if strategy_manager.low_suction_strtegy:
                        strategy_manager.low_suction_strtegy.add_ticks(market_data_list)
                    RealTimeEnvInfo().ticks = (tool.get_now_time_str(), len(market_data_list))
            except Exception as e:
@@ -39,6 +41,32 @@
                                              args=(queue_l1_w_strategy_r,))
    l2MarketProcess.start()
    read_results()
def __init():
    """
    初始化
    @return:
    """
    # 定时更新代码精选板块
    def run_pending():
        # 更新今日代码精选板块
        codes = set()
        codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
        codes |= set([x.decode() for x in codes_sh])
        codes |= set([x.decode() for x in codes_sz])
        day = tool.get_now_date_str()
        schedule.every().day.at("08:05:00").do(lambda: KPLCodeJXBlocksManager(day, codes).start_download_blocks())
        while True:
            try:
                schedule.run_pending()
            except:
                pass
            finally:
                time.sleep(1)
    threading.Thread(target=run_pending, daemon=True).start()
def test():
@@ -66,9 +94,9 @@
    threading.Thread(target=lambda: RedisUtils.run_loop(), daemon=True).start()
    # --------启动本地API接口----------
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT, outside_api_callback.MyAPICallback())
    manager = ApiCommandManager(middle_api_protocol.SERVER_HOST, middle_api_protocol.SERVER_PORT,
                                outside_api_callback.MyAPICallback())
    manager.run(blocking=False)
    # --------启动交易----------
    huaxin_trade_api.run()
@@ -76,9 +104,18 @@
    threading.Thread(target=test, daemon=True).start()
    # test()
    # ----初始化------------
    __init()
    # 初始化数据
    strategy_manager.low_suction_strtegy = strategy_manager.LowSuctionStrategy(tool.get_now_date_str())
    logger_system.info("初始化策略对象成功")
    try:
        strategy_manager.low_suction_strtegy.load_data()
        logger_system.info("加载策略数据成功")
    except Exception as e:
        logger_system.error(f"加载策略数据失败:{str(e)}")
        logger_system.exception(e)
    # -------启动L2 market订阅------
    __run_l2_market_subscript()
    print("启动完成")
    logger_system.info("系统结束")
server/data_server.py
@@ -1,11 +1,13 @@
import http
import json
import logging
import os
import socketserver
from http.server import BaseHTTPRequestHandler
from log_module.log import logger_system, logger_debug, logger_request_api
from strategy import strategy_manager
from strategy.env_info import RealTimeEnvInfo
from utils import tool
from log_module import async_log_util
@@ -43,16 +45,21 @@
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_big_orders(params)
                # logger_debug.info("upload_big_order_datas:{}", f"{params}")
                RealTimeEnvInfo().big_order_update_time = tool.get_now_time_str()
                print("获取到大单", os.getpid())
                result_str = json.dumps({"code": 0})
            elif url.path == "/upload_block_in_datas":
                # 接收板块流入数据
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_block_in(params)
                # logger_debug.info("upload_block_in_datas:{}", f"{params}")
                RealTimeEnvInfo().block_in = (tool.get_now_time_str(), len(params))
                result_str = json.dumps({"code": 0})
            elif url.path == "/upload_limit_up_list":
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_limit_up_list(params)
                # logger_debug.info("upload_limit_up_list:{}", f"{params}")
                RealTimeEnvInfo().kpl_current_limit_up = (tool.get_now_time_str(), len(params))
                result_str = json.dumps({"code": 0})
            else:
                pass
server/local_data_server.py
New file
@@ -0,0 +1,168 @@
"""
本地http服务器
"""
import json
from http.server import SimpleHTTPRequestHandler, HTTPServer
import urllib.parse as urlparse
from urllib.parse import parse_qs
import constant
from db.mysql_data_delegate import Mysqldb
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_variable_factory import DataLoader
from third_data import kpl_util
from utils import tool, output_util, huaxin_util
class CORSRequestHandler(SimpleHTTPRequestHandler):
    def end_headers(self):
        # 添加 CORS 头
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
        super().end_headers()
    # 处理 OPTIONS 预检请求
    def do_OPTIONS(self):
        self.send_response(200)
        self.end_headers()
    def do_GET(self) -> None:
        path = self.path
        url = urlparse.urlparse(path)
        ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
        response_data = ""
        if url.path == "/get_limit_up_plate_list":
            date = ps_dict.get("date")
            time_str = ps_dict.get("time")
            __LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(date)
            # [(代码, 代码名称, 涨停原因, 涨停时间, 高度信息, 自由流通市值,是否炸板)]
            results = __LowSuctionOriginDataExportManager.export_current_limit_up_records()
            results = [x for x in results if tool.to_time_str(int(x[3])) <= time_str]
            code_plates = __LowSuctionOriginDataExportManager.export_code_plates()
            # 最终涨停的代码
            limit_up_codes = [x[0] for x in results if not x[6]]
            if date == tool.get_now_date_str():
                limit_up_list = __LowSuctionOriginDataExportManager.export_limit_up_list()
                if limit_up_list:
                    limit_up_list = [x for x in limit_up_list if x[0][:8] <= time_str]
                    limit_up_list = limit_up_list[-1][1]
                    limit_up_codes = [x[0] for x in limit_up_list]
            code_infos = {x[0]: x for x in results}
            plate_codes = {}
            for code in code_infos.keys():
                plates = code_plates.get(code)
                if not plates:
                    plates = {kpl_util.filter_block(code_infos[code][2])}
                plates -= constant.KPL_INVALID_BLOCKS
                for p in plates:
                    if p not in plate_codes:
                        plate_codes[p] = set()
                    plate_codes[p].add(code)
            # (板块名称, 涨停数,炸板数)
            data = [(p, len(plate_codes[p]), len([code for code in plate_codes[p] if code not in limit_up_codes])) for p
                    in
                    plate_codes]
            data.sort(key=lambda x: x[1] - x[2], reverse=True)
            response_data = json.dumps({"code": 0, "data": data})
        elif url.path == "/get_plate_codes":
            date = ps_dict.get("date")
            plate = ps_dict.get("plate")
            time_str = ps_dict.get("time")
            __LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(date)
            results = __LowSuctionOriginDataExportManager.export_current_limit_up_records()
            for r in results:
                r[3] = tool.to_time_str(int(r[3]))
                r[5] = output_util.money_desc(r[5])
            results = [x for x in results if x[3] <= time_str]
            # 最终涨停的代码
            limit_up_codes = [x[0] for x in results if not x[6]]
            if date == tool.get_now_date_str():
                limit_up_list = __LowSuctionOriginDataExportManager.export_limit_up_list()
                if limit_up_list:
                    limit_up_list = [x for x in limit_up_list if x[0][:8] <= time_str]
                    limit_up_list = limit_up_list[-1][1]
                    limit_up_codes = [x[0] for x in limit_up_list]
                for x in results:
                    x[6] = x[0] not in limit_up_codes
            # [(代码, 代码名称, 涨停原因, 涨停时间, 高度信息, 自由流通市值,是否炸板)]
            code_plates = __LowSuctionOriginDataExportManager.export_code_plates()
            code_infos = {x[0]: x for x in results}
            plate_codes = {}
            for code in code_infos.keys():
                plates = code_plates.get(code)
                if not plates:
                    plates = {kpl_util.filter_block(code_infos[code][2])}
                plates -= constant.KPL_INVALID_BLOCKS
                for p in plates:
                    if p not in plate_codes:
                        plate_codes[p] = set()
                    plate_codes[p].add(code)
            codes = plate_codes.get(plate)
            datas = [code_infos[code] for code in codes]
            datas.sort(key=lambda x: x[3])
            response_data = json.dumps({"code": 0, "data": datas})
        elif url.path == "/get_big_order_list":
            date = ps_dict.get("date")
            code = ps_dict.get("code")
            __LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(date)
            big_orders_dict = __LowSuctionOriginDataExportManager.export_all_big_order_deal(200e4)
            datas = big_orders_dict.get(code, [])
            for x in datas:
                x[2][3] = huaxin_util.convert_time(x[2][3], with_ms=False)
                if len(x[2]) > 5:
                    x[2][5] = huaxin_util.convert_time(x[2][5], with_ms=False)
            order_ids = set()
            datas.reverse()
            fdatas = []
            for d in datas:
                if d[2][0] in order_ids:
                    continue
                fdatas.append(d)
                order_ids.add(d[2][0])
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/get_block_in_datas":
            date = ps_dict.get("date")
            time_str = ps_dict.get("time")
            __LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(date)
            block_in_datas = __LowSuctionOriginDataExportManager.export_block_in_datas()
            fdatas = []
            for d in reversed(block_in_datas):
                if d[0] <= time_str:
                    fdatas = d[1]
                    break
            fdatas = [(x[0], output_util.money_desc(x[1])) for x in fdatas]
            response_data = json.dumps({"code": 0, "data": fdatas})
        elif url.path == "/get_codes_by_jx_plates":
            # 根据精选板块获取代码
            plates = ps_dict.get("plates")
            date = ps_dict.get("date")
            plates = set(json.loads(plates))
            sql = f" select code, jx_blocks  from kpl_code_blocks where  day = '{date}' and " + " and ".join(
                [f"jx_blocks like '%{p}%'" for p in plates])
            datas = Mysqldb().select_all(sql)
            fdatas = [(x[0], "、".join([ f"<red>{dd}</red>" if dd in plates else dd for dd in json.loads(x[1])])) for x in datas if len(set(json.loads(x[1])) & plates) == len(plates)]
            response_data = json.dumps({"code": 0, "data": fdatas})
        print("GET请求")
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(response_data.encode())
    def do_POST(self) -> None:
        print("POST请求")
if __name__ == '__main__':
    server_address = ('', 8000)
    httpd = HTTPServer(server_address, CORSRequestHandler)
    print("Server running on http://localhost:8000")
    httpd.serve_forever()
strategy/back_test.py
@@ -9,6 +9,7 @@
from code_attribute import global_data_loader
from log_module.log import logger_debug
from strategy.data_analyzer import KTickLineAnalyzer
from strategy.data_downloader import DataDownloader
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
@@ -368,4 +369,6 @@
if __name__ == "__main__":
    __back_test2()
    data_loader = DataLoader("2025-06-10")
    kline_1d_dict = data_loader.load_kline_data()
    KTickLineAnalyzer.get_third_limit_up_days(kline_1d_dict.get("002907"), 10)
strategy/data_analyzer.py
@@ -272,6 +272,18 @@
        return count
    @classmethod
    def __is_limit_up(cls, code, close, pre_close):
        """
        是否涨停
        @param code:
        @param close:
        @param pre_close:
        @return:
        """
        return abs(close - cls.calculate_upper_limit_price(code,
                                                        pre_close)) < 0.01
    @classmethod
    def get_third_limit_up_days(cls, k_data, days):
        """
        获取近几个交易日的三板天数
@@ -280,18 +292,17 @@
        @return: 三板天数
        """
        count = 0
        k_data = k_data[:days]
        k_data = k_data[::-1]
        for i in range(days):
            if i + 3 >= len(k_data):
                continue
            # 判断连续三日涨停且第四日非涨停
            if (k_data[i]['close'] >= cls.calculate_upper_limit_price(k_data[i]["sec_id"], k_data[i]["pre_close"])) and \
                    (k_data[i + 1]['close'] >= cls.calculate_upper_limit_price(k_data[i + 1]["sec_id"],
                                                                               k_data[i + 1]["pre_close"])) and \
                    (k_data[i + 2]['close'] >= cls.calculate_upper_limit_price(k_data[i + 2]["sec_id"],
                                                                               k_data[i + 2]["pre_close"])) and \
                    (k_data[i + 3]['close'] < cls.calculate_upper_limit_price(k_data[i + 3]["sec_id"],
                                                                              k_data[i + 3]["pre_close"])):
                count += 1
            if cls.__is_limit_up(k_data[i]["sec_id"], k_data[i]['close'], k_data[i]["pre_close"]):
                if cls.__is_limit_up(k_data[i+1]["sec_id"], k_data[i+1]['close'], k_data[i+1]["pre_close"]):
                    if cls.__is_limit_up(k_data[i+2]["sec_id"], k_data[i+2]['close'], k_data[i+2]["pre_close"]):
                        if not cls.__is_limit_up(k_data[i+3]["sec_id"], k_data[i+3]['close'], k_data[i+3]["pre_close"]):
                            count += 1
        return count
    @classmethod
@@ -528,5 +539,5 @@
                        if reason not in block_days:
                            block_days[reason] = set()
                        block_days[reason].add(date)
            return set([b for b in block_days if len(block_days[b])==len(days_list)])
            return set([b for b in block_days if len(block_days[b]) == len(days_list)])
        return set()
strategy/env_info.py
@@ -5,6 +5,7 @@
from strategy.strategy_variable_factory import DataLoader
from third_data.history_k_data_manager import HistoryKDataManager
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
from utils import tool
@@ -58,5 +59,16 @@
    return count
def get_kpl_code_jx_blocks(day):
    """
    获取开盘啦代码精选板块数量
    @param day:
    @return:
    """
    count = KPLCodeJXBlocksManager(day, set()).get_all_code_blocks_count()
    return count
if __name__ == "__main__":
    print(get_history_k_bars("2025-06-04"))
    RealTimeEnvInfo().kpl_current_limit_up = tool.get_now_time_str()
    print(RealTimeEnvInfo().to_dict())
strategy/low_suction_strategy.py
@@ -5,6 +5,7 @@
import os
import constant
from db.mysql_data_delegate import Mysqldb
from strategy import strategy_variable
@@ -191,6 +192,25 @@
                fdatas[data[0]].append(data[2])
        return fdatas
    def export_all_big_order_deal(self, min_money=299e4):
        """
        所有大单成交,包含买,卖
        @return: {"代码":[代码,卖/买, (买单号, 量, 金额, 时间, 最终成交价)]}
        """
        fdatas = {}
        lines = self.__export_logs(f"logs/huaxin_local/l2/transaction_accurate_big_order.{self.day}.log")
        if lines:
            for line in lines:
                line = line[line.find(" - ") + 3:].strip()
                data = eval(line)
                if data[2][2] < min_money:
                    continue
                if data[0] not in fdatas:
                    fdatas[data[0]] = []
                fdatas[data[0]].append(data)
        return fdatas
    def export_big_sell_order_deal(self, min_money=299e4):
        """
        大单成交
@@ -267,6 +287,15 @@
                data = eval(line)
        return data
    def export_current_limit_up_records(self):
        """
        导出当日历史涨停
        @return: [(代码, 代码名称, 涨停原因, 涨停时间, 高度信息, 自由流通市值,是否炸板)]
        """
        results = Mysqldb().select_all(
            f"select r.`_code`, r.`_code_name`, r.`_hot_block_name`, r.`_limit_up_time`, r.`_limit_up_high_info`, r.`_zylt_val`, r.`_open`  from kpl_limit_up_record r where r._day = '{self.day}'")
        return results
class LowSuctionDataManager:
    """
strategy/strategy_manager.py
@@ -7,6 +7,8 @@
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_trade
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_params_settings import StrategyParamsSettingsManager
@@ -101,7 +103,9 @@
    """
    低吸策略
    """
    def __init__(self, day, script_name="strategy_script_v6.py", settings=StrategyParamsSettingsManager().get_settings()):
    def __init__(self, day, script_name="strategy_script_v6.py",
                 settings=StrategyParamsSettingsManager().get_settings(), need_load_data = False):
        self.now_day = day
        # 买大单:{代码:[大单数据]}
        self.big_order_buy = {}
@@ -135,7 +139,8 @@
        self.current_block_in_datas = []
        # 加载策略脚本文件
        with open(script_name if constant.is_windows() else f'{constant.get_path_prefix()}/{script_name}', mode='r', encoding='utf-8') as f:
        with open(script_name if constant.is_windows() else f'{constant.get_path_prefix()}/{script_name}', mode='r',
                  encoding='utf-8') as f:
            lines = f.readlines()
            scripts = "\n".join(lines)
            # 注释掉里面的import与变量
@@ -146,9 +151,10 @@
            self.scripts = scripts
        self.settings = settings
        self.data_loader = DataLoader(self.now_day, cache_path=f"{constant.get_path_prefix()}/datas")
        self.data_loader = DataLoader(self.now_day)
        self.__LowSuctionOriginDataExportManager = LowSuctionOriginDataExportManager(self.now_day)
        self.load_data()
        if need_load_data:
            self.load_data()
    def load_data(self):
        # 加载历史数据
@@ -225,7 +231,6 @@
        """
        if code_ in self.stock_variables_dict:
            return
        stock_variables = StrategyVariableFactory.create_from_history_data(
            self.kline_data.get(code_), None,
            self.limit_up_record_data.get(code_), self.data_loader.trade_days)
@@ -269,6 +274,7 @@
        @param big_orders: [(代码, 买/卖, [订单号,量,金额,最后时间戳,最后价格, 初始时间戳, 初始价格])] 如:[ ('002741', 0, [475820, 91600, 1610328, 92500000, 17.58, 92500000, 17.58])]
        @return:
        """
        codes = []
        for d in big_orders:
            code = d[0]
            if d[1] == 0:
@@ -281,7 +287,11 @@
                if code not in self.big_order_sell:
                    self.big_order_sell[code] = []
                self.big_order_sell[code].append(d[2])
        # 驱动下单
                if code not in codes:
                    codes.append(code)
            # 驱动下单
        for code in codes:
            self.__run(code, self.stock_variables_dict.get(code))
    def add_ticks(self, ticks):
        """
@@ -355,6 +365,8 @@
        self.current_block_in_datas = _block_in_datas
    def __run(self, code, sv: StockVariables):
        if not sv:
            return
        # 运行代码
        # 注入大单
        sv.今日大单数据 = self.big_order_buy.get(code)
@@ -382,6 +394,8 @@
            # 判断是否可以买
            for b in compute_result[3]:
                DealCodesManager().place_order(b, code)
                async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}")
# 当前的低吸策略对象
low_suction_strtegy = None
low_suction_strtegy = None
strategy/strategy_params_settings.py
@@ -31,7 +31,7 @@
        # 昨日不能跌停
        self.cant_yesterday_limit_down = True
        # 昨日不能炸板
        self.cant_yesterday_open_limit_up = False
        self.cant_yesterday_open_limit_up = True
        # 有涨停的交易日数量
        self.has_limit_up_days = 10
        # xx个交易日内不能有xx次涨停
strategy/strategy_script_v6.py
@@ -91,6 +91,9 @@
    if sv.六个交易日涨幅过高:
        return False, f"6个交易日涨幅过高"
    if sv.日三板个数_10 >= 1:
        return False, f"10个交易日有>=3连板"
    # if sv.当前价 > sv.昨日最低价 * 1.1:
    #     return False, f"买入时的价格必须≤昨日最低价*110%"
@@ -112,8 +115,8 @@
    # 目标票板块涨停个数>=2
    # 板块只能买入一个代码
    # if sv.板块成交代码:
    #     can_buy_plates -= set(sv.板块成交代码.keys())
    if sv.板块成交代码:
        can_buy_plates -= set(sv.板块成交代码.keys())
    if not can_buy_plates:
        return False, f"没有涨停的板块: {[(plate, sv.开盘啦最正板块涨停.get(plate)) for plate in sv.代码板块 if sv.开盘啦最正板块涨停]}  连续老题材:{sv.连续老题材}"
@@ -138,7 +141,7 @@
    if sv.今日大单数据:
        # print(sv.今日大单数据[-1][3], format_time(sv.今日大单数据[-1][3]))
        # filter_orders = [(o[0], o[2]) for o in sv.今日大单数据 if format_time(o[3]) >= sv.今日量够信息[0]]
        filter_orders = [(o[0], o[2]) for o in sv.今日大单数据 if o[2] >= 200e4]
        filter_orders = [(o[0], o[2]) for o in sv.今日大单数据]
        filter_orders.reverse()
        orderids = set()
        for o in filter_orders:
@@ -146,34 +149,20 @@
                continue
            orderids.add(o[0])
            big_order_money += o[1]
    big_sell_order_money = 0
    if sv.今日卖大单数据:
        filter_orders = [(o[0], o[2]) for o in sv.今日卖大单数据 if o[2] >= 200e4]
        filter_orders.reverse()
        orderids = set()
        for o in filter_orders:
            if o[0] in orderids:
                continue
            orderids.add(o[0])
            big_sell_order_money += o[1]
    threshold_money = max(sv.自由流通市值 // 1000, 200e4)
    threshold_money = max(1.5 * sv.自由流通市值 // 1000, 200e4)
    limit_up_codes_count = max([(p, len(sv.开盘啦最正板块涨停.get(p, []))) for p in can_buy_plates], key=lambda x: x[1])[1]
    # threshold_money *= max(min(10 - limit_up_codes_count + 1, 10), 5) / 10
    threshold_money *= 0.5
    # threshold_money *= max(10 - limit_up_codes_count + 3, 5) / 10
    if big_order_money < threshold_money:
        return False, f"({big_order_money}/{threshold_money})大单金额不足"
    # print(target_code, sv.自由流通市值, threshold_money, limit_up_codes_count)
    big_sell_order_money = 0
    # threshold_money = 200e4  # int(sv.昨日成交量 * 0.2 * sv.今日涨停价 * 0.05)
    if big_order_money < threshold_money:
        return False, f"({round(big_order_money / 1e4, 2)}万/{round(threshold_money / 1e4, 2)}万)大单金额不足"
    final_big_order_money = big_order_money - big_sell_order_money
    if final_big_order_money < threshold_money:
        return False, f"({round(final_big_order_money / 1e4, 2)}万/{round(threshold_money / 1e4, 2)}万)大单金额不足"
    return True, f" \n\t大单信息:{round(final_big_order_money / 1e4, 2)}万(买:{round(big_order_money / 1e4, 2)}万 卖:{round(big_sell_order_money / 1e4, 2)}万)/{round(threshold_money / 1e4, 2)}万  \n\t量够信息:{sv.今日量够信息}\n\t今日最高价:{sv.今日最高价信息} \n\t5日最高价:{sv.日最高价_5}", f"\n\t板块信息:{[(p, sv.开盘啦最正板块涨停.get(p)) for p in can_buy_plates]}", can_buy_plates
    return True, f" \n\t大单信息:{round(big_order_money / 1e4, 2)}万(买:{round(big_order_money / 1e4, 2)}万 卖:{round(big_sell_order_money / 1e4, 2)}万)/{round(threshold_money / 1e4, 2)}万  \n\t量够信息:{sv.今日量够信息}\n\t今日最高价:{sv.今日最高价信息} \n\t5日最高价:{sv.日最高价_5}", f"\n\t板块信息:{[(p, sv.开盘啦最正板块涨停.get(p)) for p in can_buy_plates]}", can_buy_plates
compute_result = can_buy()
strategy/strategy_script_v7.py
New file
@@ -0,0 +1,167 @@
import logging
from strategy.strategy_params_settings import StrategyParamsSettings
from strategy.strategy_variable import StockVariables
sv = StockVariables()
settings = StrategyParamsSettings()
target_code = ''
def format_time(huaxin_timestamp):
    huaxin_timestamp = str(huaxin_timestamp)
    if huaxin_timestamp.find("9") == 0:
        return f"0{huaxin_timestamp[0]}:{huaxin_timestamp[1: 3]}:{huaxin_timestamp[3: 5]}"
    return f"{huaxin_timestamp[0:2]}:{huaxin_timestamp[2: 4]}:{huaxin_timestamp[4: 6]}"
def can_buy():
    """
    @return: 是否可买, 不能买的原因/可买的板块, 是否量够
    """
    # print(f"{target_code}:执行策略")
    if not settings.can_buy_ge_code:
        if target_code.find("60") != 0 and target_code.find("00") != 0:
            return False, "创业板/科创板的票不买"
    else:
        if target_code.find("60") != 0 and target_code.find("00") != 0 and target_code.find("30") != 0:
            return False, "科创板的票不买"
    if sv.板块成交代码:
        deal_codes = set()
        for p in sv.板块成交代码:
            deal_codes |= set(sv.板块成交代码[p])
        if len(deal_codes) >= settings.max_buy_codes_count:
            return False, f"买入代码数超限({len(deal_codes)}/{settings.max_buy_codes_count})"
    # 目标票板块涨停个数>=2
    can_buy_plates = set()
    for plate in sv.代码板块:
        if not sv.资金流入板块 or plate not in sv.资金流入板块:
            continue
        if plate in sv.连续老题材:
            continue
        if plate in sv.日出现的板块_2:
            # 老题材
            threshold_count = settings.limit_up_count_of_old_plate
        else:
            # 新题材
            threshold_count = settings.limit_up_count_of_new_plate
        if len(sv.开盘啦最正板块涨停.get(plate, [])) >= threshold_count:
            can_buy_plates.add(plate)
    if not sv.当前价:
        return False, "无当前价"
    # if getattr(sv, f"涨停数_{settings.has_limit_up_days}") < 1 and getattr(sv, f"炸板数_{settings.has_limit_up_days}") < 1:
    #     return False, f"近{settings.has_limit_up_days}个交易日无涨停/无炸板"
    if settings.cant_yesterday_limit_down and not sv.昨日非跌停:
        return False, "昨日跌停"
    if settings.cant_yesterday_limit_up and not sv.昨日非涨停:
        return False, "昨日涨停"
    if settings.cant_yesterday_open_limit_up and not sv.昨日非炸板:
        return False, "昨日炸板"
    if sv.今日涨停价 > settings.price_range[1] or sv.今日涨停价 < settings.price_range[0]:
        return False, f"今日涨停价高于{settings.price_range[1]}/低于{settings.price_range[0]}"
    if sv.自由流通市值 > settings.zyltgb_range[1] * 1e8 or sv.自由流通市值 < settings.zyltgb_range[0] * 1e8:
        return False, f"自由市值({sv.自由流通市值})不满足要求"
    if sv.六个交易日涨幅过高:
        return False, f"6个交易日涨幅过高"
    if sv.日三板个数_10 >= 1:
        return False, f"10个交易日有>=3连板"
    # if sv.当前价 > sv.昨日最低价 * 1.1:
    #     return False, f"买入时的价格必须≤昨日最低价*110%"
    if (sv.当前价 - sv.今日最低价) / sv.昨日收盘价 > 0.03:
        return False, f"买入时的价格不能高于今日最低价的3%"
    # if abs((sv.当前价 - round(sv.今日成交额 / sv.今日成交量, 2)) / sv.昨日收盘价) >= settings.max_rate_than_average_price:
    #     return False, f"买入价高于均价{settings.max_rate_than_average_price}({abs((sv.当前价 - round(sv.今日成交额 / sv.今日成交量, 2)) / sv.昨日收盘价)})"
    #
    # if (sv.今日最高价信息[0] - sv.当前价) / sv.昨日收盘价 > settings.min_rate_of_highest_and_price:
    #     return False, f"低于分时高价{settings.min_rate_of_highest_and_price}"
    if not settings.can_buy_limited_up and abs(sv.今日最高价信息[0] - sv.今日涨停价) <= 0.001:
        return False, f"今日有涨停"
    # if sv.涨停数_30 <= 0 and not sv.日放倍量日期_15:
    #     return False, "30个交易日无涨停且15个交易日无倍量"
    # 目标票板块涨停个数>=2
    # 板块只能买入一个代码
    # if sv.板块成交代码:
    #     can_buy_plates -= set(sv.板块成交代码.keys())
    if not can_buy_plates:
        return False, f"没有涨停的板块: {[(plate, sv.开盘啦最正板块涨停.get(plate)) for plate in sv.代码板块 if sv.开盘啦最正板块涨停]}  连续老题材:{sv.连续老题材}"
    # new_plates = set(can_buy_plates) - sv.日出现的板块_2
    # if new_plates:
    #     # 有新题材,判断是否过昨日前高
    #     if sv.今日最高价信息[0] - sv.昨日最高价 < 0.02:
    #         return False, "今日最高价需大于昨日最高价"
    # else:
    #     if sv.今日最高价信息[0] - sv.日最高价_5 < 0.02:
    #         return False, "今日最高价需大于5日最高价"
    if settings.trade_days_count_of_limit_up_price_over_high and sv.今日涨停价 <= getattr(sv,
                                                                                     f"日最高价_{settings.trade_days_count_of_limit_up_price_over_high}"):
        return False, f"今日涨停价要突破{settings.trade_days_count_of_limit_up_price_over_high}日最高价"
    # if sv.今日成交量 < sv.昨日成交量 * 0.8:
    #     return False, f"实时成交量必须≥80%昨日总成交量"
    # =======成交大单=====
    big_order_money = 0
    if sv.今日大单数据:
        # print(sv.今日大单数据[-1][3], format_time(sv.今日大单数据[-1][3]))
        # filter_orders = [(o[0], o[2]) for o in sv.今日大单数据 if format_time(o[3]) >= sv.今日量够信息[0]]
        filter_orders = [(o[0], o[2]) for o in sv.今日大单数据 if o[2] >= 299e4 and o[4] > sv.昨日收盘价]
        if not filter_orders:
            return False, "无涨幅大于0%的大单"
        filter_orders = [(o[0], o[2]) for o in sv.今日大单数据 if o[2] >= 299e4]
        filter_orders.reverse()
        orderids = set()
        for o in filter_orders:
            if o[0] in orderids:
                continue
            orderids.add(o[0])
            big_order_money += o[1]
    big_sell_order_money = 0
    if sv.今日卖大单数据:
        filter_orders = [(o[0], o[2]) for o in sv.今日卖大单数据 if o[2] >= 100e4]
        filter_orders.reverse()
        orderids = set()
        for o in filter_orders:
            if o[0] in orderids:
                continue
            orderids.add(o[0])
            big_sell_order_money += o[1]
    threshold_money = max(sv.自由流通市值 // 1000, 200e4)
    limit_up_codes_count = max([(p, len(sv.开盘啦最正板块涨停.get(p, []))) for p in can_buy_plates], key=lambda x: x[1])[1]
    # threshold_money *= max(min(10 - limit_up_codes_count + 1, 10), 5) / 10
    threshold_money *= 0.5
    # print(target_code, sv.自由流通市值, threshold_money, limit_up_codes_count)
    # threshold_money = 200e4  # int(sv.昨日成交量 * 0.2 * sv.今日涨停价 * 0.05)
    final_big_order_money = big_order_money - big_sell_order_money
    if final_big_order_money <= 200e4:
        # 大单净额要大于200w
        return False, f"({round(final_big_order_money / 1e4, 2)}万/{round(threshold_money / 1e4, 2)}万)大单金额不足"
    return True, f" \n\t大单信息:{round(final_big_order_money / 1e4, 2)}万(买:{round(big_order_money / 1e4, 2)}万 卖:{round(big_sell_order_money / 1e4, 2)}万)/{round(threshold_money / 1e4, 2)}万  \n\t量够信息:{sv.今日量够信息}\n\t今日最高价:{sv.今日最高价信息} \n\t5日最高价:{sv.日最高价_5}", f"\n\t板块信息:{[(p, sv.开盘啦最正板块涨停.get(p)) for p in can_buy_plates]}", can_buy_plates
compute_result = can_buy()
strategy/strategy_variable_factory.py
@@ -7,7 +7,7 @@
import re
import constant
from code_attribute import global_data_loader
from code_attribute import global_data_loader, gpcode_manager
from db import mysql_data_delegate
from strategy.data_analyzer import KTickLineAnalyzer, KPLLimitUpDataAnalyzer, K60SLineAnalyzer
from strategy.strategy_variable import StockVariables
@@ -22,7 +22,7 @@
    数据加载器类,用于集中管理策略变量所需的各类数据加载逻辑
    """
    def __init__(self, now_day, cache_path="D:/datas"):
    def __init__(self, now_day, cache_path=f"{constant.get_path_prefix()}/datas"):
        """
        初始化数据加载器
        :param now_day: 当前日期,格式为"2025-01-01"
@@ -235,8 +235,8 @@
                results = [x for x in results if
                           (tool.is_can_buy_code(x[0]) and x[0] in valid_codes and x[0] not in exclude_codes)]
                # 取前1/3且涨停数是前10
                max_count = len(results) // 2 if len(results) % 2 == 0 else len(results) // 2 + 1
                results = results[:max_count]
                max_count = len(results) // 3 if len(results) % 3 == 0 else len(results) // 3 + 1
                # results = results[:max_count]
                # 取前10
                results = results[:10]
                codes = [x[0] for x in results]
@@ -636,8 +636,39 @@
            print(code, code_blocks[code])
def __load_target_codes_v1():
    """
    50亿以下的
    @return:
    """
    def get_zylt(code):
        zylt_volume_map = global_util.zylt_volume_map
        last_trade_day = __DataLoader.trade_days[0]
        volume = zylt_volume_map.get(code)
        # 今日涨停价要突破昨日最高价
        k_bars = HistoryKDataManager().get_history_bars(code, last_trade_day)
        return k_bars[0]["close"] * volume * tool.get_limit_up_rate(code)
    __DataLoader = DataLoader('2025-06-13')
    global_data_loader.load_zyltgb_volume_from_db()
    results = __DataLoader.load_target_plate_and_codes()
    # for k in results:
    #     print(k, results[k])
    plates = ["天然气", "军工"]
    print("==========新题材=======")
    for p in plates:
        codes = [x for x in results.get(p)]  # if get_zylt(x) < 31e8
        print("======", p)
        for code in codes:
            print("\t\t", code, gpcode_manager.CodesNameManager().get_code_name(code))
if __name__ == "__main__":
    __DataLoader = DataLoader("2025-06-12")
    # __load_target_codes_v1()
    __DataLoader = DataLoader("2025-06-17")
    # __test_jx_blocks(__DataLoader)
    # instance = StockVariables()
@@ -655,7 +686,7 @@
    results = __DataLoader.load_target_plate_and_codes()
    # for k in results:
    #     print(k, results[k])
    plates = ["汽车零部件", "稀土永磁", "化工", "医药", "光伏"]
    plates = ["脑机接口"]
    print("==========新题材=======")
    for p in plates:
        print(p, results.get(p))
strategy/test.py
@@ -1,8 +1,11 @@
from huaxin_client import l1_subscript_codes_manager
from strategy import strategy_manager
from strategy.strategy_variable import StockVariables
# 统计当日的平均溢价率
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
def statistic_average(path):
    rate_list = []
    with open(path, mode='r', encoding='utf-8') as f:
@@ -19,10 +22,17 @@
if __name__ == "__main__":
    # print("======3个票涨停之后买+开盘价>=-3")
    # statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买.txt")
    print("======3个票涨停之后买")
    statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买.txt")
    # print("======3个票涨停之后买+不限开盘涨幅+3个涨停之后大单打折")
    # statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买_不限开盘涨幅.txt")
    strategy_manager.low_suction_strtegy
    codes = set()
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes |= set([x.decode() for x in codes_sh])
    codes |= set([x.decode() for x in codes_sz])
    KPLCodeJXBlocksManager('2025-06-17', codes).start_download_blocks()
    # target_block = {"石油石化", "天然气", "化工"}
    # for code in code_blocks:
    #     blocks = code_blocks.get(code)
    #     if len(blocks & target_block) == len(target_block):
    #         print(code, blocks)
strategy/time_series_backtest.py
@@ -298,6 +298,9 @@
        if code_ in self.stock_variables_dict:
            return
        if code_ == '002907':
            print("")
        stock_variables = StrategyVariableFactory.create_from_history_data(
            timeline_data["kline_data"].get(code_), timeline_data["minute_data"].get(code_),
            timeline_data["limit_up_record_data"].get(code_), timeline_data["trade_days"])
@@ -582,6 +585,21 @@
                            if p not in most_real_kpl_plate_limit_up_codes_info:
                                most_real_kpl_plate_limit_up_codes_info[p] = []
                            most_real_kpl_plate_limit_up_codes_info[p].append(code)
                # print(time_str, "涨停数大于3个", [p for p in most_real_kpl_plate_limit_up_codes_info if
                #                             len(most_real_kpl_plate_limit_up_codes_info[p]) >= 3])
                # ---------测试--------
                # test_plate = "化工"
                # if len(most_real_kpl_plate_limit_up_codes_info.get(test_plate, [])) >= 3:
                #     print("测试开始=========")
                #     code_plates_for_buy = self.current_data["code_plates_for_buy"]
                #     plate_codes = [c for c in code_plates_for_buy if test_plate in code_plates_for_buy[c]]
                #     print(f"{test_plate}满足", time_str, plate_codes)
                #     for c in plate_codes:
                #         sv: StockVariables = self.stock_variables_dict.get(c)
                #         if sv and sv.当前价 > sv.昨日收盘价:
                #             print(c)
                #     print("测试完毕=========")
            if ticks:
                for tick in ticks:
@@ -650,6 +668,7 @@
                        stock_variables.今日最低价 = tick["price"]
                    if most_real_kpl_plate_limit_up_codes_info:
                        stock_variables.开盘啦最正板块涨停 = most_real_kpl_plate_limit_up_codes_info
                    # if time_str >= '09:30:00':
                    #     if stock_variables.今日大单数据 and stock_variables.开盘啦最正板块涨停 and max(
                    #             [len(stock_variables.开盘啦最正板块涨停.get(x, [])) for x in stock_variables.代码板块]) >= 3:
@@ -695,6 +714,8 @@
                    if block_in_datas:
                        stock_variables.资金流入板块 = block_in_datas
                    stock_variables.当前价 =  big_order[1][4]
                    compute_result = self.__run_backtest(code, stock_variables)
                    # print(compute_result)
                    self.__process_test_result(code, stock_variables, next_trade_day, big_order[1][4],
@@ -768,7 +789,7 @@
            stock_variables.板块成交代码 = self.deal_block_codes
# DEBUG_CODES =  ['603040', '603758', '603286', '603586', '605255', '002048', '605208', '002806', '603266', '603788']
# DEBUG_CODES = ['603579', '300884']
DEBUG_CODES = []
VOLUME_LOG_ENABLE = False
@@ -777,18 +798,19 @@
DEBUG_BLOCKS = []
BIG_ORDER_MONEY_THRESHOLD = 100e4
BIG_ORDER_MONEY_THRESHOLD = 200e4
if __name__ == "__main__":
    back_test_dict = {}
    # days = ["2025-05-06", "2025-05-07", "2025-05-08", "2025-05-09", "2025-05-12", "2025-05-13", "2025-05-14",
    #         "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",  "2025-05-21", "2025-05-22"]
    days = ["2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",
            "2025-05-21", "2025-05-22", "2025-05-23", "2025-05-26", "2025-05-27", "2025-05-28", "2025-05-29",
            "2025-05-30", "2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
            "2025-06-11",  "2025-06-12"]
            "2025-05-30", "2025-06-03"]
    # days = ["2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",
    #         "2025-05-21", "2025-05-22", "2025-05-23", "2025-05-26", "2025-05-27", "2025-05-28", "2025-05-29",
    #         "2025-05-30", "2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
    #         "2025-06-11", "2025-06-12", "2025-06-13", "2025-06-16", "2025-06-17"]
    # days = ["2025-06-09"]
    # days = ["2025-05-23"]
    days.reverse()
    for day in days:
third_data/kpl_api.py
@@ -361,7 +361,7 @@
    #     print(d)
    #
    # print(result)
    print(getCodeJingXuanBlocks("002519"))
    print(getCodeJingXuanBlocks("002878"))
    # results = getHistoryCodesByPlateOrderByLZCS("801199", "2025-05-16", "0930", 3)
    # results = json.loads(results)["list"]
    # for result in results:
third_data/kpl_block_manager.py
New file
@@ -0,0 +1,55 @@
"""
开盘啦板块管理
"""
import json
import constant
from db.mysql_data_delegate import Mysqldb
from third_data import kpl_api, kpl_util
from utils import tool
class KPLCodeJXBlocksManager:
    """
    开盘啦精选板块管理
    """
    def __init__(self, day, target_codes):
        self.day = day
        self.mysql_db = Mysqldb()
        self.target_codes = target_codes
    def __download_blocks(self, code):
        datas = kpl_api.getCodeJingXuanBlocks(code)
        blocks = json.dumps([kpl_util.filter_block(x[1]) for x in datas], ensure_ascii=False)
        block_ids = json.dumps([x[0] for x in datas], ensure_ascii=False)
        block_details = json.dumps(datas, ensure_ascii=False)
        id = f"{self.day}_{code}"
        self.mysql_db.execute(
            f"insert into kpl_code_blocks(id,code, day,jx_blocks, jx_block_ids, jx_blocks_detail, create_time) values('{id}','{code}','{self.day}','{blocks}', '{block_ids}','{block_details}', now())")
    def start_download_blocks(self):
        codes = self.mysql_db.select_all(f"select code from kpl_code_blocks where day='{self.day}'")
        codes = set([x[0] for x in codes])
        need_update_codes = set(self.target_codes) - codes
        for code in need_update_codes:
            self.__download_blocks(code)
    def get_all_code_blocks(self):
        """
        获取所有代码的板块
        @return:
        """
        sql = f"select code, jx_blocks from kpl_code_blocks where day = '{self.day}'"
        results = self.mysql_db.select_all(sql)
        return {x[0]: set(json.loads(x[1])) - constant.KPL_INVALID_BLOCKS for x in results}
    def get_all_code_blocks_count(self):
        """
        获取所有代码的板块的数量
        @return:
        """
        sql = f"select count(*) from kpl_code_blocks where day = '{self.day}'"
        results = self.mysql_db.select_one(sql)
        return int(results[0])