# coding=utf-8 from __future__ import print_function, absolute_import, unicode_literals import logging import multiprocessing import threading import time import constant # 引入掘金桥梁API import utils.juejin_api from db.redis_manager_delegate import RedisUtils from huaxin_client.l2_data_transform_protocol import L2DataCallBack from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_system, logger_debug # 引入开盘啦API模块 # 引入全局变量模块 # 引入定时器模块 # 引入历史K线方法模块 # 引入瞬时分时行情模块 # 引入账户管理模块【进行资金和仓位管理】 from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \ order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis, \ selling_strategy from huaxin_client import l2_market_client, l2_client, trade_client from log_module import async_log_util, log from strategy.order_methods import TodayBuyCodeManager from trade import huaxin_trade_data_update, huaxin_trade_api from utils import hx_qc_value_util, huaxin_util, juejin_api, tool # 引入行情订阅模块 # import subscribe_market # 全局日志配置 如果不想用就把 logging.ERROR 如果要打就=logging.INFO logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # 计算当前线程数量 def get_current_thread_count(): # print(f"threading.active_count()=={threading.active_count()}") return threading.active_count() # 初始化数据 函数 def init_data(): # logging.info("main初始化数据开始") logger_common.info(f"main初始化数据开始") ''' 资金管理初始化 仓位管理初始化 都在main的初始化数据函数里面完成 无需掘金考虑的线程或进程方式实现 ''' # 初始化账户仓位管理数据 account_management.finance_management() # 初始化账户仓位管理数据 account_management.position_management() # 初始化.实例化缓存中的全局数据 data_cache.DataCache() # 读取本地K线数据 并更新到data_cache # 初始化A股所有目标票标的信息 data_cache.all_stocks = utils.juejin_api.JueJinApi.get_target_codes() # 获取目标标的K线---初始化 all_K_line.k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().filtered_stocks) # 直接调用目标标的指标K线写入本地文件 # all_K_line.all_stocks_all_k_line_dict_write() local_data_management.read_local_K_line_data() # 读取本地个股所属板块数据 并更新到data_cache local_data_management.read_local_all_stocks_plate_data() # 初始化拉取当日买入代码 TodayBuyCodeManager() # 第一步:初始化context函数,并开启获取实时数据的线程 def init(): # 在初始化函数里面就调用全局化变量函数(即上文缩写的init_data函数) init_data() # 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】 threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start() # 获取实时大盘行情情绪综合强度 [分数] 线程 threading.Thread(target=lambda: market_sentiment_analysis.set_plan_position_quantity(), daemon=True).start() # 实时检测是否拉取K线线程 threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start() # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}") # 获取实时大盘指数行情线程 threading.Thread(target=lambda: instant_time_market.index_market_current(), daemon=True).start() # instant_time_market.index_market_trend() # 开盘啦的涨停概念的回调函数 def kpl_limit_up_process(datas): # print(f"回调成功==={datas}") if datas is not None and len(datas) > 0: data_cache.limit_up_block_names = datas else: data_cache.limit_up_block_names = [] # # 计算当前线程数量 # get_current_thread_count() # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程 # 涨停概念线程 # threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start() #该行代码为只运行单一线程不回调数据的方式 threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start() # # 开盘啦的板块强度下的个股强度回调函数 def get_market_sift_plate_its_stock_power_process(market_sift_plate_info): # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}") # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}") market_sift_plate_stock_dict, market_sift_plates = market_sift_plate_info[0], market_sift_plate_info[1] if market_sift_plate_stock_dict: data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict data_cache.market_sift_plates = market_sift_plates # 板块强度下个股强度线程 threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process, args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start() # 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> # threading.Thread(target=lambda: instant_time_market.get_current_data(), daemon=True).start() try: # 计算开盘啦昨日拉取的概念数据中为空的股票数量 plate_strength_analysis.get_have_no_plate_num() except Exception as e: logger_system.exception(e) # 获取历史涨停信息数据并整理 try: plate_strength_analysis.get_handling_limit_up_info() except Exception as e: logger_system.exception(e) try: # 检查K线昨日涨停是否正确 all_K_line.check_limit_up_attribute() except Exception as e: logger_system.exception(e) # 持仓代码的L2数据回调 class MyPositionsL2DataCallback(L2DataCallBack): __last_price_dict = {} __pre_close_price_dict = {} # 昨日收盘价 def OnL2Transaction(self, code, datas): """ 昨日持仓L2逐笔成交回调 :param code: :param datas: :return: """ if datas: start_time = time.time() # logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}") price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3]) try: # 获取最近的成交价 if code not in self.__pre_close_price_dict: # 获取收盘价格 results = juejin_api.JueJinApi.history_n(tool.get_symbol(code), "1d", 1, 1, "close") if results: self.__pre_close_price_dict[code] = results[0]["close"] logger_debug.debug(f"{code} - 获取到昨日收盘价:{results[0]['close']}") if self.__last_price_dict.get(code) == price: return limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code]) if code in self.__last_price_dict: if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price): # 处理炸板逻辑 # 监听了炸板了要做的函数 try: selling_strategy.explosion_strategy(code) # 炸板s finally: logger_debug.info(f"炸板:{code}-({price},{time_str})") except Exception as e: logger_debug.exception(e) finally: self.__last_price_dict[code] = price data_cache.latest_deal_price_dict[code] = price use_time = time.time() - start_time if use_time > 0.1: logger_debug.warning(f"L2逐笔成交处理耗时:{use_time} 最后一条数据:{datas[-1]}") def OnMarketData(self, code, datas): # logger_debug.info(f"收到L2Market数据:{datas}") for d in datas: code = d["securityID"] buy1 = d["buy"][0] # 实时L2买1成交量 def OnRealTimeBuy1Info(self, code, buy1_info): pass # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量] # 最终的买1为: 原始买1量+实时买1量 # async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}") # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real") l2_data_callbacks = [] # 订阅持仓L2数据 def __subscript_position_l2(): """ 订阅持仓L2数据 :return: """ position_result = huaxin_trade_api.get_position_list(blocking=True) logger_debug.info(f"获取持仓结果:{position_result}") if not position_result or position_result["code"] != 0 or not position_result["data"]: return positions = position_result["data"] subscript_codes = set() for p in positions: if p["prePosition"] > 0: subscript_codes.add(p["securityID"]) if not subscript_codes: return for i in range(len(subscript_codes)): l2_data_callbacks.append(MyPositionsL2DataCallback()) l2_client.run(subscript_codes, l2_data_callbacks) # 第三步:执行策略的初始设置 if __name__ == '__main__': log.close_print() class MyMarketDataCallback(l2_market_client.L2MarketDataCallback): def on_markets(self, datas): """ L1数据回调 :param datas: :return: """ data_cache.latest_code_market_info_dict = {x[0]: x for x in datas} if constant.is_windows(): instant_time_market.get_current_info() else: instant_time_market.set_current_info(datas) # 加载开盘啦板块日志数据 kpl_data_manager.KPLMarketsSiftPlateLogManager() kpl_data_manager.KPLMarketStockHeatLogManager() # 启动异步日志 threading.Thread(target=async_log_util.run_sync, daemon=True).start() # redis 数据同步 threading.Thread(target=RedisUtils.run_loop, daemon=True).start() # 策略与交易通信队列 queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue() # 不是模拟盘的时候启动交易 if not constant.IS_SIMULATED_TRADE: multiprocessing.Process(target=trade_client.run, args=( queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r,)).start() # 启动交易 order_methods.run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r) # 运行华鑫增值服务进程,用于获取K线与交易日历 threading.Thread(target=hx_qc_value_util.run, daemon=True).start() # 运行交易数据更新服务 huaxin_trade_data_update.run() # 等待5s,等其他线程/进程启动完毕 time.sleep(15) # 订阅持仓票 threading.Thread(target=__subscript_position_l2, daemon=True).start() try: # 初始化数据 init() except Exception as e: logger_system.exception(e) # 需要订阅的目标代码 target_codes = [x["sec_id"] for x in data_cache.DataCache().all_stocks] # 订阅L2 market行情 l2_market_client.run(target_codes, MyMarketDataCallback())