# coding=utf-8 from __future__ import print_function, absolute_import, unicode_literals import logging import threading import time import constant # 引入掘金桥梁API import utils.juejin_api from db.redis_manager_delegate import RedisUtils from huaxin_client.l2_data_transform_protocol import L2DataCallBack from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_system, logger_debug # 引入开盘啦API模块 # 引入全局变量模块 # 引入定时器模块 # 引入历史K线方法模块 # 引入瞬时分时行情模块 # 引入账户管理模块【进行资金和仓位管理】 from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \ order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis from huaxin_client import l2_market_client, l2_client from log_module import async_log_util from trade import huaxin_trade_data_update, huaxin_trade_api from utils import hx_qc_value_util, huaxin_util # 引入行情订阅模块 # import subscribe_market # 全局日志配置 如果不想用就把 logging.ERROR 如果要打就=logging.INFO logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 计算当前线程数量 def get_current_thread_count(): # print(f"threading.active_count()=={threading.active_count()}") return threading.active_count() # 初始化数据 函数 def init_data(): # logging.info("main初始化数据开始") logger_common.info(f"main初始化数据开始") ''' 资金管理初始化 仓位管理初始化 都在main的初始化数据函数里面完成 无需掘金考虑的线程或进程方式实现 ''' # 初始化账户仓位管理数据 account_management.finance_management() # 初始化账户仓位管理数据 account_management.position_management() # 初始化.实例化缓存中的全局数据 data_cache.DataCache() # 读取本地K线数据 并更新到data_cache # 初始化A股所有目标票标的信息 data_cache.all_stocks = utils.juejin_api.JueJinApi.get_target_codes() # 获取目标标的K线---初始化 all_K_line.k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().filtered_stocks) # 直接调用目标标的指标K线写入本地文件 # all_K_line.all_stocks_all_k_line_dict_write() local_data_management.read_local_K_line_data() # 读取本地个股所属板块数据 并更新到data_cache local_data_management.read_local_all_stocks_plate_data() # todo 2025-03-25 测试无误即可删除下部注释 # # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据 # if os.path.exists(constant.K_BARS_PATH): # with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f: # data_cache.all_stocks_all_K_line_property_dict = json.load(f) # print( # f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}") # # 获取目标标的K线---初始化 # all_K_line.main_index_k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().main_index_stocks) # # 直接调用主要指数K线写入本地文件 # all_K_line.main_index_k_line_dict_write() # 第一步:初始化context函数,并开启获取实时数据的线程 def init(): # 在初始化函数里面就调用全局化变量函数(即上文缩写的init_data函数) init_data() # 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】 threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start() # 获取实时大盘行情情绪综合强度 [分数] 线程 threading.Thread(target=lambda: market_sentiment_analysis.get_real_time_market_strong(), daemon=True).start() # 实时检测是否拉取K线线程 threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start() # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}") # 获取实时大盘指数行情线程 threading.Thread(target=lambda: instant_time_market.index_market_current(), daemon=True).start() # instant_time_market.index_market_trend() # 开盘啦的涨停概念的回调函数 def kpl_limit_up_process(datas): # print(f"回调成功==={datas}") if datas is not None and len(datas) > 0: data_cache.limit_up_block_names = datas else: data_cache.limit_up_block_names = [] # # 计算当前线程数量 # get_current_thread_count() # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程 # 涨停概念线程 # threading.Thread(target=kpl_api.kpl_limit_up_process, daemon=True).start() #该行代码为只运行单一线程不回调数据的方式 threading.Thread(target=kpl_api.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start() # # 开盘啦的板块强度下的个股强度回调函数 def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict): # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}") # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}") data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict # 板块强度下个股强度线程 threading.Thread(target=kpl_api.get_market_sift_plate_its_stock_power_process, args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start() # 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> # threading.Thread(target=lambda: instant_time_market.get_current_data(), daemon=True).start() try: # 计算开盘啦昨日拉取的概念数据中为空的股票数量 kpl_api.get_have_no_plate_num() except Exception as e: logger_system.exception(e) # 获取历史涨停信息数据并整理 try: kpl_api.get_handling_limit_up_info() except Exception as e: logger_system.exception(e) try: # 检查K线昨日涨停是否正确 all_K_line.check_limit_up_attribute() except Exception as e: logger_system.exception(e) # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】 # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks) class MyPositionsL2DataCallback(L2DataCallBack): __last_price_dict = {} def OnL2Transaction(self, code, datas): if datas: # 获取最近的成交价 price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3]) # TODO 涨停价变为非涨停价才处理 self.__last_price_dict[code] = price def OnMarketData(self, code, datas): # logger_debug.info(f"收到L2Market数据:{datas}") for d in datas: code = d["securityID"] buy1 = d["buy"][0] def OnRealTimeBuy1Info(self, code, buy1_info): # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量] async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}") # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real") l2_data_callbacks = [] def __subscript_position_l2(): """ 订阅持仓L2数据 :return: """ position_result = huaxin_trade_api.get_position_list(blocking=True) logger_debug.info(f"获取持仓结果:{position_result}") if not position_result or position_result["code"] != 0 or not position_result["data"]: return positions = position_result["data"] subscript_codes = set() for p in positions: if p["prePosition"] > 0: subscript_codes.add(p["securityID"]) if not subscript_codes: return for i in range(len(subscript_codes)): l2_data_callbacks.append(MyPositionsL2DataCallback()) l2_client.run(subscript_codes, l2_data_callbacks) # 第三步:执行策略的初始设置 if __name__ == '__main__': class MyMarketDataCallback(l2_market_client.L2MarketDataCallback): def on_markets(self, datas): """ L1数据回调 :param datas: :return: """ data_cache.latest_code_market_info_dict = {x[0]: x for x in datas} if datas: print(datas[0]) if constant.is_windows(): instant_time_market.get_current_info() else: instant_time_market.set_current_info(datas) # 加载开盘啦板块日志数据 kpl_data_manager.KPLStockOfMarketsPlateLogManager() # 启动异步日志 threading.Thread(target=async_log_util.run_sync, daemon=True).start() # redis 数据同步 threading.Thread(target=RedisUtils.run_loop, daemon=True).start() # 启动交易 order_methods.run() # 运行华鑫增值服务进程,用于获取K线与交易日历 threading.Thread(target=hx_qc_value_util.run, daemon=True).start() # 运行交易数据更新服务 huaxin_trade_data_update.run() # 等待5s,等其他线程/进程启动完毕 time.sleep(15) # 订阅持仓票 threading.Thread(target=__subscript_position_l2, daemon=True).start() try: # 初始化数据 init() except Exception as e: logger_system.exception(e) # 需要订阅的目标代码 target_codes = [x["sec_id"] for x in data_cache.DataCache().all_stocks] # 订阅L2 market行情 l2_market_client.run(target_codes, MyMarketDataCallback())