""" 该模块为全局化变量的模块 不要试图把所有数据都全局化了 """ import datetime import json import logging import os import constant # 引入掘金API import utils.juejin_api from log_module.log import logger_common # from logging_config import get_logger from utils import tool, hx_qc_value_util # 获取logger实例 logger = logger_common # 全局日志配置 如果不想用就把 logging.ERROR 如果要打就=logging.INFO logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') @tool.singleton class DataCache: """ 数据缓存 """ # ---定义变量--- today_date = None pre_trading_day = None double_pre_trading_day = None next_trading_day = None all_stocks = None filtered_stocks = None min_instruments = None min_stocks = None code_name_dict = {} def __init__(self): logging.info("全局初始化数据 开始》》》") ''' 设定基本日期 ''' # 今日时间初始化 now = datetime.datetime.now() # 获取本机时间 print(f"本机时间::{now}") # 本机启动时间初始化 Local_startup_time = datetime.datetime.now().strftime("%H:%M:%S") print(f"本机启动时间==={Local_startup_time}") logger.info(f"本机启动时间==={Local_startup_time}") # 全局化 【今日日期 上一个交易日期 上上个交易日期 下一个交易日期】 self.today_date = now.strftime('%Y-%m-%d') # 获取今日日期参数 print(f"今日日期{self.today_date}") self.pre_trading_day = hx_qc_value_util.get_previous_trading_date(self.today_date) # 获取前一个交易日 print(f"上一个交易日{self.pre_trading_day}") self.double_pre_trading_day = hx_qc_value_util.get_previous_trading_date(self.pre_trading_day) # 获取上上一个交易日 print(f"上上一个交易日{self.double_pre_trading_day}") self.next_trading_day = hx_qc_value_util.get_next_trading_date(self.today_date) # 获取下一个交易日 print(f"下一个交易日{self.next_trading_day}") ''' 设定基本时间点 ''' # 获取A股市场(包含沪深两市)的股票列表跳过停牌,跳过ST 上交所 SHSE.600000 深交所 SZSE.000000 target = ['SHSE.603839', 'SZSE.002855'] self.all_stocks = utils.juejin_api.JueJinApi.get_target_codes() # self.all_stocks = [{'sec_level': 1, 'symbol': 'SZSE.301633','pre_close': 78.72000122070312, 'is_suspended': 0, 'sec_name': '港迪技术', 'listed_date': datetime.datetime(2024, 11, 7, 0, 0,tzinfo=datetime.timezone(datetime.timedelta(seconds=28800))), 'sec_type': 1, 'sec_id': '301633'}] # self.all_stocks = self.all_stocks[:10] # 缓存代码的名称 self.code_name_dict = {x['symbol']: x['sec_name'] for x in self.all_stocks} print("A股所有代码数量:", len(self.all_stocks)) # 只要上证A股和深证A股 # self.filtered_stocks = [stock for stock in self.all_stocks if # stock.startswith('SHSE.60') or (stock.startswith('SZSE.00'))] self.filtered_stocks = [stock['symbol'] for stock in self.all_stocks if isinstance(stock.get('symbol'), str) and ( stock['symbol'].startswith('SHSE.60') or stock['symbol'].startswith( 'SZSE.00'))] # self.filtered_stocks = self.filtered_stocks[:10] print(f"过滤后上证A股和深证A股数量filtered_stocks:{len(self.filtered_stocks)}") # 声明一下需要拉取K线的列表 self.main_index_stocks = ['SHSE.000001', 'SZSE.399001', 'SZSE.399006', 'SHSE.000300'] # 获取上证A股和深证A股 基本信息 instruments = [stock for stock in self.all_stocks if stock['symbol'] in self.filtered_stocks] # 将获取到的上证A股和深证A股 基本信息 汇编为一个字典 basic_info_symbols_dict = {} # 初始化没有K线的股票 no_k_line_stocks = [] for i in instruments: basic_info_symbols_dict[i["symbol"]] = i # print(f"basic_info_symbols_dict ====={basic_info_symbols_dict}") # basic_info_symbols_dict ===== 'SHSE.600611': {'symbol': 'SHSE.600611', 'sec_name': '大众交通', 'pre_close': 2.809999942779541}, # if ['sec_name'] # 如果公司名称中不包含["ST", "退市", "退", "XD", "XR", "DR", "N"] 则继续逻辑判断 if any(keyword in i['sec_name'] for keyword in ["ST", "退市", "退", "N", "C"]): # print(f"非ST, 退市, 退, XD, N==={basic_info['sec_name']} 继续判断") no_k_line_stocks.append(i['symbol']) # 将列表推导式移出循环 过滤出昨日收盘价在2-30元的股票 min_instruments = [stock for stock in instruments if 3 < stock['pre_close'] < 30] # print(f"min_instruments 数量==={len(min_instruments)}") # 使用 get 方法安全地获取 symbol 字段的值 self.min_stocks = [stock.get('symbol') for stock in min_instruments if stock.get('symbol') is not None] print(f"min_stocks 数量 ==={len(self.min_stocks)}") # logger.info(f"min_stocks==={self.min_stocks}") logging.info(f"全局初始化数据 完成《《《 - {os.getpid()}") # 获取当前进程的PID ''' 设定常用当前时间函数方法,为什么这里不是一个函数方法???? ''' now_time = datetime.datetime.now().strftime("%H:%M:%S") # 定义并实时获取 当前时间 ''' 设定常用时间点【常量】 字符串格式=="09:25:12" ''' SERVER_RESTART_TIME = "09:00:00" # 服务器重启时间 L1_DATA_START_TIME = "09:15:00" # L1数据开始时间 BEFORE_OPEN_BIDDING_TIME = "09:20:00" # 【盘前】集合竞价开始前时间 OPEN_BIDDING_TIME = "09:25:00" # 【盘前】集合竞价开始时间 LATER_OPEN_BIDDING_TIME = "09:25:06" # 【盘前】集合竞价开始后瞬间 AFTER_OPEN_BIDDING_TIME = "09:25:12" # 【盘前】集合竞价开始后一会 OPENING_TIME = "09:30:00" # 上午开盘时间 MORN_MARKET_TIME = "09:35:00" # 早盘黄金五分钟 MORN_MARKET_CLOSING_TIME = "11:30:00" # 上午收盘时间 NOON_MARKET_OPENING_TIME = "13:00:00" # 下午开盘时间 NOON_MARKET_TIME = "13:05:00" # 午盘黄金五分钟 CLOSE_POSITION_TIME = "14:55:00" # 尾盘平仓时间 WATCH_DISK_END_TIME = "14:56:00" # 板上盯结束时间 CLOSE_BIDDING_TIME = "14:57:00" # 【盘后】集合竞价开始 CLOSING_TIME = "15:00:00" # 定义 收盘时间 AFTER_CLOSING_TIME = "15:01:00" # 下午收盘后时间 CHECKING_DATA_TIME = "17:00:00" # 检查数据时间 UPDATE_DATA_TIME = "18:31:00" # 更新数据时间 PROGRAM_SLEEP_TIME = "22:00:00" # 程序休眠时间【不能在23点之后仍运行,获取到的数据可能有谬误】 # 初始化当日当时最高价 high_price = 0 # 初始化当日当时最低价 low_price = 0 # 初始化每日涨停信息 daily_limit_up_info = [] # 初始化开盘啦涨停昨日信息涨停代码 yesterday_limit_up_code_list = [] # 初始化开盘啦昨日炸板但通过K线数据计算涨停的股票代码列表 yesterday_frying_plate_last_minute_list = [] # 初始化每日涨停信息方法 和 获取所有个股的板块概念 是否被执行 execution = False # 初始化 写入带属性指数K线的方法 是否被执行 index_K_line_write_execution = False # 初始化实时涨停概念板块 limit_up_block_names = [] # 初始化板块强度下的个股强度 market_sift_plate_stock_dict = {} # 初始化实时大盘行情市场情绪综合强度【完整】字典 rise_and_fall_statistics_dirt = {} # 初始化实时大盘行情情绪综合强度[分数] real_time_market_strong = 0 # 初始化实时大盘行情市场情绪 涨跌统计 字典 real_time_market_sentiment_dirt = {} # 为所有个股的带属性K线 字典初始化 all_stocks_all_K_line_property_dict = {} # 为指数的带属性K线 字典初始化 all_index_k_line_property_dict = {} # 为所有个股的所属板块概念字典初始化 all_stocks_plate_dict = {} # 声明K线属性中所有涨停类型 limit_up_type = ['one_line_limit_up', 'limit_down_then_limit_up', 'limit_up_then_limit_down_then_limit_up', 'limit_up'] # 声明K线属性中所有跌停类型 limit_down_type = ['limit_up_then_limit_down', 'one_line_limit_down', 'limit_down', 'touch_limit_down'] # 声明K线属性中所有炸板类型 frying_plate_type = ['frying_plate', 'first_frying_plate', 'one_line_limit_up_then_frying_plate', 'one_line_limit_up_then_frying_plate_then_limit_down', 'not_first_frying_plate'] # 定义一个当日分时内所有个股的开盘价字典 all_stocks_current_open = {} # 定义一个当日分时内所有个股的实时最高价和最低价字典 all_stocks_current_high_and_low = {} # 定义一个当日分时内所有个股的上一个实时总成交量字典 all_stocks_last_volume = {} # 定义一个当前最新价初始化缓存 current_new_price = 0 # 为当前账户全部【资金信息】创建字典 account_finance_dict = {} # 为当前账户可用资金创建一个变量 usefulMoney = 0 # 为当前账户全部【持仓信息】创建列表 account_positions_dict = {} # 下面三个集合是为了分类整理持仓字典数据,将各类持仓情况的数据整理为需要的集合 # 为持仓代码创建一个初始集合 position_symbols_set = set() # 为当日可用持仓代码创建一个初始集合 available_symbols_set = set() # 为当日新增加持仓代码创建一个初始集合 买入策略中会用到 判断是否成功买了三支票 addition_position_symbols_set = set() # 定义一个记录开盘价(开盘时最新价)是否记录 record_current_open_execution = False # 定义一个实时计算最高价和最低价的函数是否在规定时间内启动 record_high_and_low_execution = False # 定义一个集合竞价时间段条件分支逻辑的初始执行次数 execution_times = 0 # 定义一个有概念逻辑分支买入进入执行的初始化次数 have_plate_buy_times = 0 # 定义一个有强度逻辑分支买入进入执行的初始化次数 have_strength_buy_times = 0 # 定义一个有小量换大涨幅逻辑分支买入进入执行的初始化次数 have_small_turn_large_buy_times = 0 # 创建一个已买板块列表 bought_plate = [] # 持仓金额自动管理开关 position_automatic_management_switch = True # GUI手动设置的每次买入的金额 BUY_MONEY_PER_CODE = -1 # 初始化有概念买入金额 have_plate_buy_money = 3000 # 初始化有强度买入金额 have_strength_buy_money = 3000 # 开仓策略计算金额 opening_strategy_results = 0 # 今日第一次下单的金额 today_first_planned_order_amount = 0 # 今日计划下单金额 today_planned_order_amount = 0 # 最新的市场行情数据 # {"code":(代码,昨日收盘价,最新价,总成交量,总成交额,买五档(价格,成交额),卖五档(价格,成交额),更新时间)} latest_code_market_info_dict = {} # 主要指数 掘金代码 列表:[上证指数,深证成指,创业扳指,沪深300] INDEX_SYMBOL_LIST = ['SHSE.000001', 'SZSE.399001', 'SZSE.399006', 'SHSE.000300'] # 股票指数字典 例如:{"000001":(指数, 量, 额, 昨日收盘价)} stock_index_dict = {} # 上证指数 瞬时涨幅 Shanghai_tick_growth = 0 # 上证指数 当日涨幅 Shanghai_today_growth = 0 # 上证指数 开盘涨幅 Shanghai_open_growth = 0 # 深证指数 瞬时涨幅 Shenzhen_tick_growth = 0 # 深证指数 当日涨幅 Shenzhen_today_growth = 0 # 深证指数 开盘涨幅 Shenzhen_open_growth = 0 # 创业板指 瞬时涨幅 TSXV_tick_growth = 0 # 创业板指 当日涨幅 TSXV_today_growth = 0 # 创业板指 开盘涨幅 TSXV_open_growth = 0 # 大盘指数情绪预期分数 index_trend_expectation_score = 0 # 理想交易行情比率分 满分=1 ideal_trading_market_score = 1 # 可以板上盯卖的代码 LIMIT_UP_SELL_CODES = set() # L2炸板数据 L2_explosion_data = {} # 最新的L1数据: {代码: L1数据} current_l1_dict = {} # 最新成交价格 latest_deal_price_dict = {} # 大单成交数据: {"代码":[大单数据1,大单数据2,...]} big_order_deal_dict = {} # 有意购买的股票名称列表 willing_buy_list = [] # 已成交股票详情列表 purchased_stocks_details_list = [] logging.info(f"全局初始化数据 完成《《《 - {os.getpid()}")