13个文件已修改
410 ■■■■ 已修改文件
api/outside_api_callback.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_market_client.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server/data_server.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/data_analyzer.py 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_manager.py 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_script_v6.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_variable.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/strategy_variable_factory.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/test.py 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
strategy/time_series_backtest.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 150 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_callback.py
@@ -144,6 +144,7 @@
            strategy_manager.low_suction_strtegy.load_data()
            return {"code": 0}
        except Exception as e:
            logging.exception(e)
            return {"code": 1, "msg": str(e)}
    def OnCommonRequest(self, client_id, request_id, data):
huaxin_client/l2_market_client.py
@@ -4,6 +4,7 @@
import logging
import multiprocessing
import os
import pickle
import queue
import time
import concurrent.futures
@@ -207,8 +208,8 @@
    # 上传数据
    type_ = "set_target_codes"
    request_id = f"sb_{int(time.time() * 1000)}"
    fdata = json.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)})
    fdata = pickle.dumps(
        {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)}, protocol=pickle.HIGHEST_PROTOCOL)
    if queue_l1_w_strategy_r is not None:
        queue_l1_w_strategy_r.put_nowait(fdata)
    # 记录新增加的代码
main.py
@@ -1,5 +1,6 @@
import json
import multiprocessing
import pickle
import threading
import time
@@ -26,6 +27,7 @@
        while True:
            try:
                data = queue_l1_w_strategy_r.get()
                data = pickle.loads(data)
                if data.get("type") == 'set_target_codes':
                    # [(代码, 时间戳, 价格, 总交易量, 总交易额, 买5, 卖5)]
                    market_data_list = data["data"]["data"]
server/data_server.py
@@ -40,26 +40,26 @@
        url = urlparse.urlparse(path)
        result_str = ""
        try:
            params = self.__parse_request()
            if type(params) == str:
                params = json.loads(params)
            if url.path == "/upload_big_order_datas":
                # 接收成交大单数据
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_big_orders(params)
                # logger_debug.info("upload_big_order_datas:{}", f"{params}")
                big_order_datas = params
                strategy_manager.low_suction_strtegy.add_big_orders(big_order_datas)
                RealTimeEnvInfo().big_order_update_time = tool.get_now_time_str()
                print("获取到大单", os.getpid())
                result_str = json.dumps({"code": 0})
            elif url.path == "/upload_block_in_datas":
                # 接收板块流入数据
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_block_in(params)
                # logger_debug.info("upload_block_in_datas:{}", f"{params}")
                RealTimeEnvInfo().block_in = (tool.get_now_time_str(), len(params))
                block_in_datas = params
                strategy_manager.low_suction_strtegy.add_block_in(block_in_datas)
                RealTimeEnvInfo().block_in = (tool.get_now_time_str(), len(block_in_datas))
                result_str = json.dumps({"code": 0})
            elif url.path == "/upload_limit_up_list":
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_limit_up_list(params)
                # logger_debug.info("upload_limit_up_list:{}", f"{params}")
                RealTimeEnvInfo().kpl_current_limit_up = (tool.get_now_time_str(), len(params))
                limit_up_list = params
                strategy_manager.low_suction_strtegy.add_limit_up_list(limit_up_list)
                RealTimeEnvInfo().kpl_current_limit_up = (tool.get_now_time_str(), len(limit_up_list))
                result_str = json.dumps({"code": 0})
            else:
                pass
@@ -75,11 +75,9 @@
        self.wfile.write(data.encode())
    def __parse_request(self):
        params = {}
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        params = json.loads(_str)
        return params
        return json.loads(_str)
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
strategy/data_analyzer.py
@@ -281,7 +281,7 @@
        @return:
        """
        return abs(close - cls.calculate_upper_limit_price(code,
                                                        pre_close)) < 0.01
                                                           pre_close)) < 0.01
    @classmethod
    def get_third_limit_up_days(cls, k_data, days):
@@ -298,11 +298,18 @@
            if i + 3 >= len(k_data):
                continue
            # 判断连续三日涨停且第四日非涨停
            if cls.__is_limit_up(k_data[i]["sec_id"], k_data[i]['close'], k_data[i]["pre_close"]):
                if cls.__is_limit_up(k_data[i+1]["sec_id"], k_data[i+1]['close'], k_data[i+1]["pre_close"]):
                    if cls.__is_limit_up(k_data[i+2]["sec_id"], k_data[i+2]['close'], k_data[i+2]["pre_close"]):
                        if not cls.__is_limit_up(k_data[i+3]["sec_id"], k_data[i+3]['close'], k_data[i+3]["pre_close"]):
                            count += 1
            day_count = 3
            for n in range(day_count + 1):
                if n < day_count:
                    if not cls.__is_limit_up(k_data[i + n]["sec_id"], k_data[i + n]['close'],
                                             k_data[i + n]["pre_close"]):
                        # 非涨停
                        break
                else:
                    if not cls.__is_limit_up(k_data[i + n]["sec_id"], k_data[i + n]['close'],
                                             k_data[i + n]["pre_close"]):
                        count += 1
                        break
        return count
    @classmethod
@@ -414,6 +421,59 @@
                count += 1
        return count
    @classmethod
    def is_too_high_and_not_relase_volume(cls, k_data):
        """
        长得太高且没放量:30个交易日内,出现过最低价(最高价之前的交易日)到最高价之间的涨幅≥35%的票,且今日距离最高价那日无涨停/无炸板且>=3板且必须有2连板
        @param k_data: K线数据列表(近150个交易日,不包含当前交易日,时间倒序)
        @return: 四跌停及以上天数
        """
        k_data = k_data[:30]
        code = k_data[0]["sec_id"]
        # 获取最高价信息
        max_high_price_data = max(k_data, key=lambda x: x["high"])
        before_datas = [d for d in k_data if d['bob'] < max_high_price_data['bob']]
        after_datas = [d for d in k_data if d['bob'] >= max_high_price_data['bob']]
        if not before_datas:
            return False
        if len(before_datas) > 15:
            # 从最高价日期向前最多看15个交易日
            before_datas = before_datas[:15]
        min_close_price_data = min(before_datas, key=lambda x: x["close"])
        if (max_high_price_data['high'] - min_close_price_data['close']) / min_close_price_data['close'] < 0.35:
            # 涨幅小于35%
            return False
        before_k_datas = [d for d in k_data if min_close_price_data['bob'] <= d['bob'] <= max_high_price_data['bob']]
        before_k_datas.sort(key=lambda x: x['bob'])
        # [最低价-最高价]日期内有3个板且有两连扳
        continue_2_limit_up_date = None
        for i in range(len(before_k_datas) - 1):
            if cls.__is_limit_up(code, before_k_datas[i]["close"],
                                 before_k_datas[i]["pre_close"]) and cls.__is_limit_up(code,
                                                                                       before_k_datas[i + 1]["close"],
                                                                                       before_k_datas[i + 1][
                                                                                           "pre_close"]):
                continue_2_limit_up_date = before_k_datas[i + 1]['bob'][:10]
                break
        if not continue_2_limit_up_date:
            # 无两连板
            return False
        # 两连板之后是否有炸板/涨停
        # 取2连板之后的3个交易日
        temp_k_datas = [d for d in before_k_datas if d['bob'][:10] > continue_2_limit_up_date][:3]
        if len([d for d in temp_k_datas if cls.__is_limit_up(code, d["high"], d["pre_close"])]) < 1:
            # 两连板之后有个涨停/炸板且时间在2连板之后的3个交易日内
            return False
        k_data = [d for d in k_data if d['bob'] > max_high_price_data['bob']]
        # 判断是否涨停过
        if len([d for d in k_data if cls.__is_limit_up(code, d["high"], d["pre_close"])]) > 0 or len(after_datas) >= 10:
            # 最高价之后有过涨停或者是最高价后10个交易日
            return False
        return True, f"高价日期:{max_high_price_data['bob'][:10]},低价日期:{min_close_price_data['bob'][:10]},两连扳日期:{continue_2_limit_up_date}"
class K60SLineAnalyzer:
    """
strategy/strategy_manager.py
@@ -8,7 +8,7 @@
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util
from log_module.log import logger_trade
from log_module.log import logger_trade, logger_debug
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
from strategy.low_suction_strategy import LowSuctionOriginDataExportManager
from strategy.strategy_params_settings import StrategyParamsSettingsManager
@@ -16,7 +16,7 @@
from strategy.strategy_variable_factory import DataLoader, StrategyVariableFactory
import constant
from third_data import kpl_util
from trade.trade_manager import DealCodesManager
from trade.trade_manager import DealCodesManager, PlatePlaceOrderManager
from utils import huaxin_util, tool
@@ -105,7 +105,7 @@
    """
    def __init__(self, day, script_name="strategy_script_v6.py",
                 settings=StrategyParamsSettingsManager().get_settings(), need_load_data = False):
                 settings=StrategyParamsSettingsManager().get_settings(), need_load_data=False):
        self.now_day = day
        # 买大单:{代码:[大单数据]}
        self.big_order_buy = {}
@@ -116,7 +116,8 @@
        # 历史日K数据
        self.kline_data = {}
        # 历史涨停数据
        self.limit_up_record_data = {}
        self.limit_up_record_data_dict = {}
        self.limit_up_record_data_list = {}
        # 历史数据
        self.timeline_data = {}
        # 今日数据
@@ -170,13 +171,17 @@
        trade_days = self.data_loader.trade_days
        # 加载历史数据
        self.kline_data = self.data_loader.load_kline_data()
        self.limit_up_record_data = self.data_loader.load_limit_up_data()
        self.limit_up_record_data_list = self.data_loader.load_limit_up_data()
        for d in self.limit_up_record_data_list:
            if d[0] not in self.limit_up_record_data_dict:
                self.limit_up_record_data_dict[d[0]] = []
            self.limit_up_record_data_dict[d[0]].append(d)
        self.next_trade_day = self.data_loader.load_next_trade_day()
        if not trade_days:
            raise Exception("交易日历获取失败")
        if not self.kline_data:
            raise Exception("历史日K获取失败")
        if not self.limit_up_record_data:
        if not self.limit_up_record_data_list:
            raise Exception("历史涨停获取失败")
    def __load_current_date_data_by_timeline(self):
@@ -233,7 +238,7 @@
            return
        stock_variables = StrategyVariableFactory.create_from_history_data(
            self.kline_data.get(code_), None,
            self.limit_up_record_data.get(code_), self.data_loader.trade_days)
            self.limit_up_record_data_dict.get(code_), self.data_loader.trade_days)
        # 加载今日涨停价
        pre_close = self.kline_data.get(code_)[0]["close"]
@@ -249,10 +254,10 @@
            days = self.data_loader.trade_days[:day]
            stock_variables.__setattr__(f"日出现的板块_{day}",
                                        KPLLimitUpDataAnalyzer.get_limit_up_reasons(
                                            self.limit_up_record_data, min_day=days[-1],
                                            self.limit_up_record_data_list, min_day=days[-1],
                                            max_day=days[0]))
        stock_variables.连续老题材 = KPLLimitUpDataAnalyzer.get_continuous_limit_up_reasons(
            self.limit_up_record_data, self.data_loader.trade_days[:2])
            self.limit_up_record_data_list, self.data_loader.trade_days[:2])
        # 加载Tick信息
        open_price_info = TickSummaryDataManager().open_price_info_dict.get(code_)
@@ -274,21 +279,31 @@
        @param big_orders: [(代码, 买/卖, [订单号,量,金额,最后时间戳,最后价格, 初始时间戳, 初始价格])] 如:[ ('002741', 0, [475820, 91600, 1610328, 92500000, 17.58, 92500000, 17.58])]
        @return:
        """
        big_orders = [x for x in big_orders if x[0] in self.fcodes]
        codes = []
        for d in big_orders:
            code = d[0]
            if d[1] == 0:
                # 买单
                if code not in self.big_order_buy:
                    self.big_order_buy[code] = []
                self.big_order_buy[code].append(d[2])
            else:
                # 卖单
                if code not in self.big_order_sell:
                    self.big_order_sell[code] = []
                self.big_order_sell[code].append(d[2])
                if code not in codes:
                    codes.append(code)
            try:
                code = d[0]
                # 只计算200w以上的买单
                if d[2][2] < 200e4:
                    continue
                if d[1] == 0:
                    # 买单
                    if code not in self.big_order_buy:
                        self.big_order_buy[code] = []
                    self.big_order_buy[code].append(d[2])
                else:
                    # 卖单
                    if code not in self.big_order_sell:
                        self.big_order_sell[code] = []
                    self.big_order_sell[code].append(d[2])
                    if code not in codes:
                        codes.append(code)
                # 设置现价
                if code in self.stock_variables_dict:
                    self.stock_variables_dict[code].当前价 = d[2][4]
            except Exception as e:
                logger_debug.error(f"{d}")
            # 驱动下单
        for code in codes:
            self.__run(code, self.stock_variables_dict.get(code))
@@ -376,10 +391,14 @@
        # 注入板块流入信息
        if self.current_block_in_datas:
            sv.资金流入板块 = self.current_block_in_datas
        # 注入已成交代码
        place_order_plate_codes = DealCodesManager().get_place_order_plate_codes()
        # 注入已成交代码,成交代码以委托数据来计算
        place_order_plate_codes = PlatePlaceOrderManager().get_plate_codes()
        sv.板块成交代码 = place_order_plate_codes
        sv.成交代码 = DealCodesManager().get_deal_codes()
        code_sets = [set(lst) for lst in place_order_plate_codes.values()]
        # 2. 使用 set.union() 求并集
        union_code_sets = set().union(*code_sets)
        sv.成交代码 = union_code_sets
        global_dict = {
            "sv": sv,
            "target_code": code,
@@ -392,8 +411,11 @@
                return
            # 可以下单
            # 判断是否可以买
            order_ref = huaxin_util.create_order_ref()
            price = tool.get_buy_max_price(sv.当前价)
            volume = 100
            DealCodesManager().place_order(set(compute_result[3]), code, order_ref, price, volume)
            for b in compute_result[3]:
                DealCodesManager().place_order(b, code)
                async_log_util.info(logger_trade, f"{code}下单,板块:{compute_result[3]}")
strategy/strategy_script_v6.py
@@ -94,6 +94,9 @@
    if sv.日三板个数_10 >= 1:
        return False, f"10个交易日有>=3连板"
    if sv.涨得高未放量:
        return False, f"涨得高未放量"
    # if sv.当前价 > sv.昨日最低价 * 1.1:
    #     return False, f"买入时的价格必须≤昨日最低价*110%"
strategy/strategy_variable.py
@@ -213,6 +213,7 @@
        self.辨识度代码 = set()
        self.领涨板块信息 = set()
        self.连续老题材 = set()
        self.涨得高未放量 = False
    def replace_variables(self, expression):
        """
strategy/strategy_variable_factory.py
@@ -602,6 +602,12 @@
            kline_data_60s = kline_data_60s_dict.get(trade_days[0])
            fdata = K60SLineAnalyzer.get_close_price_of_max_volume(kline_data_60s)
            instance.__setattr__(f"昨日分时最高量价", fdata)
        if KTickLineAnalyzer.is_too_high_and_not_relase_volume(kline_data_1d):
            instance.涨得高未放量 = True
        else:
            instance.涨得高未放量 = False
        return instance
strategy/test.py
@@ -1,13 +1,15 @@
from huaxin_client import l1_subscript_codes_manager
from strategy import strategy_manager
from strategy import strategy_manager, data_analyzer
from strategy.strategy_variable import StockVariables
# 统计当日的平均溢价率
from strategy.strategy_variable_factory import DataLoader
from third_data.kpl_block_manager import KPLCodeJXBlocksManager
def statistic_average(path):
    rate_list = []
    yjl_list = []
    with open(path, mode='r', encoding='utf-8') as f:
        lines = f.readlines()
        for line in lines:
@@ -17,22 +19,39 @@
                continue
            r = round(float(line.split("当日盈亏:")[1].split(",")[0].replace("%", "")), 2)
            rate_list.append(r)
    print("平均利润率:", round(sum(rate_list) / len(rate_list), 2))
    print("总利润率:", round(sum(rate_list), 2), "总买票数量:", len(rate_list))
            r = line.split("溢价率:")[1].split(",")[0].replace("%", "")
            if r.find("未知") < 0:
                yjl_list.append(round(float(r), 2))
    print("当日平均利润率:", round(sum(rate_list) / len(rate_list), 2))
    print("当日总利润率:", round(sum(rate_list), 2), "总买票数量:", len(rate_list))
    print("次日开盘平均利润率:", round(sum(yjl_list) / len(yjl_list), 2))
    print("次日开盘总利润率:", round(sum(yjl_list), 2), "总买票数量:", len(yjl_list))
if __name__ == "__main__":
    print("======3个票涨停之后买_不买长得太高未放量")
    statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买_不买长得太高未放量.txt")
    print("======3个票涨停之后买")
    statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买.txt")
    # print("======3个票涨停之后买+不限开盘涨幅+3个涨停之后大单打折")
    # statistic_average(r"C:\Users\Administrator\Desktop\3个票涨停之后买_不限开盘涨幅.txt")
    codes = set()
    codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    codes |= set([x.decode() for x in codes_sh])
    codes |= set([x.decode() for x in codes_sz])
    KPLCodeJXBlocksManager('2025-06-17', codes).start_download_blocks()
    # codes = set()
    # codes_sh, codes_sz = l1_subscript_codes_manager.get_codes()
    # codes |= set([x.decode() for x in codes_sh])
    # codes |= set([x.decode() for x in codes_sz])
    # KPLCodeJXBlocksManager('2025-06-17', codes).start_download_blocks()
    # target_block = {"石油石化", "天然气", "化工"}
    # for code in code_blocks:
    #     blocks = code_blocks.get(code)
    #     if len(blocks & target_block) == len(target_block):
    #         print(code, blocks)
    __DataLoader = DataLoader("2025-06-18")
    kline_datas = __DataLoader.load_kline_data()
    codes = []
    for code in kline_datas:
        # if code !='003010':
        #     continue
        result = data_analyzer.KTickLineAnalyzer.is_too_high_and_not_relase_volume(kline_datas[code])
        if result:
            print("未放量", code, result[1])
            codes.append(code)
    print(len(codes))
strategy/time_series_backtest.py
@@ -1,3 +1,5 @@
import logging
import constant
from code_attribute import gpcode_manager, code_nature_analyse
from strategy.data_analyzer import KPLLimitUpDataAnalyzer
@@ -715,11 +717,14 @@
                    if block_in_datas:
                        stock_variables.资金流入板块 = block_in_datas
                    stock_variables.当前价 =  big_order[1][4]
                    compute_result = self.__run_backtest(code, stock_variables)
                    # print(compute_result)
                    self.__process_test_result(code, stock_variables, next_trade_day, big_order[1][4],
                                               huaxin_util.convert_time(big_order[1][3]), compute_result)
                    stock_variables.当前价 = big_order[1][4]
                    try:
                        compute_result = self.__run_backtest(code, stock_variables)
                        # print(compute_result)
                        self.__process_test_result(code, stock_variables, next_trade_day, big_order[1][4],
                                                   huaxin_util.convert_time(big_order[1][3]), compute_result)
                    except Exception as e:
                        logging.exception(e)
        print("可买题材:", all_new_plates)
@@ -802,13 +807,11 @@
if __name__ == "__main__":
    back_test_dict = {}
    days = ["2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",
            "2025-05-21", "2025-05-22", "2025-05-23", "2025-05-26", "2025-05-27", "2025-05-28", "2025-05-29",
            "2025-05-30", "2025-06-03"]
    # days = ["2025-05-12", "2025-05-13", "2025-05-14", "2025-05-15", "2025-05-16", "2025-05-19", "2025-05-20",
    #         "2025-05-21", "2025-05-22", "2025-05-23", "2025-05-26", "2025-05-27", "2025-05-28", "2025-05-29",
    #         "2025-05-30", "2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
    #         "2025-06-11", "2025-06-12", "2025-06-13", "2025-06-16", "2025-06-17"]
    #         "2025-05-30", "2025-06-03"]
    days = ["2025-06-03", "2025-06-04", "2025-06-05", "2025-06-06", "2025-06-09", "2025-06-10",
            "2025-06-11", "2025-06-12", "2025-06-13", "2025-06-16", "2025-06-17", "2025-06-18"]
    # days = ["2025-05-23"]
trade/trade_manager.py
@@ -176,14 +176,11 @@
    def __init__(self):
        self.musql = Mysqldb()
        # 成交得订单信息
        self.__deal_code_orders_info = {}
        self.redis_manager = redis_manager.RedisManager(12)
        # 下过单的板块代码
        self.__place_order_plate_codes_info = {}
        # 委托得订单信息:{code#order_ref:}
        self.__delegate_code_orders = {}
        self.__load_data()
    def __get_redis(self):
        return self.redis_manager.getRedis()
    def __load_data(self):
        # 不算打板的数据
@@ -193,13 +190,9 @@
            for r in results:
                self.add_deal_order(r[1], r[4], round(float(r[3]), 2), r[0], r[2])
        val = RedisUtils.get(self.__get_redis(), "place_order_plate_codes_info")
        if val:
            self.__place_order_plate_codes_info = json.loads(val)
    def add_deal_order(self, code, volume, price, trade_id, order_sys_id):
        """
        添加成交大单
        添加成交订单
        @param code:
        @param volume:
        @param price:
@@ -217,31 +210,148 @@
            return
        self.__deal_code_orders_info[code][trade_id] = (volume, price, order_sys_id)
    def set_order_status(self, code, order_ref, order_sys_id, price, volume, status):
        """
        设置订单状态
        @param code:
        @param order_ref:
        @param order_sys_id:
        @param price:
        @param volume:
        @param status:
        @return:
        """
        k = f"{code}#{order_ref}"
        if k not in self.__delegate_code_orders:
            return
        # [代码,订单索引,订单号,价格,量,状态,板块集合]
        data = self.__delegate_code_orders[k]
        data[2] = order_sys_id
        data[5] = status
        data[3] = price
        data[4] = volume
        # 如果订单已经取消就需要删除
        if status == huaxin_util.TORA_TSTP_OST_AllCanceled or status == huaxin_util.TORA_TSTP_OST_Rejected:
            data = self.__delegate_code_orders.pop(k)
            if data:
                PlatePlaceOrderManager().remove_plates_code(data[6], code)
    def get_deal_codes(self):
        if not self.__deal_code_orders_info:
            return set()
        return set(self.__deal_code_orders_info.keys())
    def place_order(self, plate, code):
    def place_order(self, plates, code, order_ref, price, volume):
        """
        下单
        @param plate:
        @param plates:
        @param code:
        @param order_ref:
        @param price:
        @param volume:
        @return:
        """
        # 初始化委托数据 [代码,订单索引,订单号,价格,量,状态,板块集合]
        data = [code, order_ref, '', price, volume, huaxin_util.TORA_TSTP_OST_Unknown, plates]
        k = f"{code}#{order_ref}"
        if k not in self.__delegate_code_orders:
            self.__delegate_code_orders[k] = data
        PlatePlaceOrderManager().add_plates_code(plates, code)
    def place_order_fail(self, code, order_ref):
        """
        下单失败了
        @param code:
        @param order_ref:
        @return:
        """
        k = f"{code}#{order_ref}"
        if k in self.__delegate_code_orders:
            data = self.__delegate_code_orders.pop(k)
            if data:
                PlatePlaceOrderManager().remove_plates_code(data[6], code)
    def get_deal_or_delegated_codes(self):
        """
        获取已经成交或者委托的代码
        @return:
        """
        codes = set()
        if self.__delegate_code_orders:
            for k in self.__delegate_code_orders:
                codes.add(self.__delegate_code_orders[k][0])
        if self.__deal_code_orders_info:
            codes |= set(self.__deal_code_orders_info.keys())
        return codes
@tool.singleton
class PlatePlaceOrderManager:
    """
    板块下单管理
    """
    def __init__(self):
        self.__db = 12
        self.redis_manager = redis_manager.RedisManager(self.__db)
        # 下过单的板块代码
        self.__place_order_plate_codes_info = {}
        self.__load_data()
    def __get_redis(self):
        return self.redis_manager.getRedis()
    def __load_data(self):
        val = RedisUtils.get(self.__get_redis(), "place_order_plate_codes_info")
        if val:
            self.__place_order_plate_codes_info = json.loads(val)
    def add_plates_code(self, plates, code):
        """
        添加板块下单
        @param plates:
        @param code:
        @return:
        """
        if plate not in self.__place_order_plate_codes_info:
            self.__place_order_plate_codes_info[plate] = []
        if code not in self.__place_order_plate_codes_info[plate]:
            self.__place_order_plate_codes_info[plate].append(code)
        for plate in plates:
            if plate not in self.__place_order_plate_codes_info:
                self.__place_order_plate_codes_info[plate] = []
            if code not in self.__place_order_plate_codes_info[plate]:
                self.__place_order_plate_codes_info[plate].append(code)
        self.__sync_plate_place_order_info()
    def __sync_plate_place_order_info(self):
        """
        同步板块下单信息
        @return:
        """
        RedisUtils.setex_async(self.__db, "place_order_plate_codes_info", tool.get_expire(),
                               json.dumps(self.__place_order_plate_codes_info))
    def get_place_order_plate_codes(self):
    def remove_plates_code(self, plates, code):
        """
        移除板块下单
        @param plates:
        @param code:
        @return:
        """
        for plate in plates:
            if plate in self.__place_order_plate_codes_info:
                if code in self.__place_order_plate_codes_info[plate]:
                    self.__place_order_plate_codes_info[plate].remove(code)
        self.__sync_plate_place_order_info()
    def get_plate_codes(self):
        return self.__place_order_plate_codes_info
__CodesTradeStateManager = CodesTradeStateManager()
if __name__ == "__main__":
    codes = DealCodesManager().get_codes()
    print(codes)
    PlatePlaceOrderManager().add_plates_code({"通信","计算机"}, "000333")
    place_order_plate_codes = PlatePlaceOrderManager().get_plate_codes()
    code_sets = [set(lst) for lst in place_order_plate_codes.values()]
    # 2. 使用 set.union() 求并集
    union_code_sets = set().union(*code_sets)
    print(union_code_sets)
utils/tool.py
@@ -318,6 +318,14 @@
    return max(price1, price2)
# 获取买入价格笼子的最高价
def get_buy_max_price(price):
    price1 = price * (1 + 0.02)
    price1 = math.ceil(price1 * 100) / 100
    price2 = price + 0.1
    return max(price1, price2)
# 获取买入价格笼子的最低价
def get_shadow_price(price):
    # fprice = round((100 - random.randint(2, 10)) * price / 100, 2)