main.py
@@ -22,6 +22,7 @@
    selling_strategy
from huaxin_client import l2_market_client, l2_client, trade_client
from log_module import async_log_util, log
from strategy.order_methods import TodayBuyCodeManager
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
@@ -67,19 +68,9 @@
    local_data_management.read_local_K_line_data()
    # 读取本地个股所属板块数据 并更新到data_cache
    local_data_management.read_local_all_stocks_plate_data()
    # 初始化拉取当日买入代码
    TodayBuyCodeManager()
    # todo 2025-03-25 测试无误即可删除下部注释
    # # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
    # if os.path.exists(constant.K_BARS_PATH):
    #     with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f:
    #         data_cache.all_stocks_all_K_line_property_dict = json.load(f)
    #         print(
    #             f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
    # # 获取目标标的K线---初始化
    # all_K_line.main_index_k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().main_index_stocks)
    # # 直接调用主要指数K线写入本地文件
    # all_K_line.main_index_k_line_dict_write()
# 第一步:初始化context函数,并开启获取实时数据的线程
@@ -101,8 +92,11 @@
    #  开盘啦的涨停概念的回调函数
    def kpl_limit_up_process(datas):
        # print(f"回调成功==={datas}")
        now_time = tool.get_now_time_str()
        if datas is not None and len(datas) > 0:
            data_cache.limit_up_block_names = datas
            if data_cache.CLOSING_TIME < now_time < data_cache.AFTER_CLOSING_TIME:
                logger_common.info(f"收盘涨停概念列表更新==={now_time}=={datas}")
        else:
            data_cache.limit_up_block_names = []
@@ -116,10 +110,15 @@
                     daemon=True).start()
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_info):
        # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
        data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
        if market_sift_plate_info is not None:
            return
        market_sift_plate_stock_dict,  market_sift_plates = market_sift_plate_info[0], market_sift_plate_info[1]
        if market_sift_plate_stock_dict:
            data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
            data_cache.market_sift_plates = market_sift_plates
    # 板块强度下个股强度线程
    threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process,
@@ -160,7 +159,8 @@
        :return:
        """
        if datas:
            logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}")
            start_time = time.time()
            # logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}")
            price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3])
            try:
                # 获取最近的成交价
@@ -175,7 +175,7 @@
                limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code])
                if code in self.__last_price_dict:
                    if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price):
                        # TODO 处理炸板逻辑
                        # 处理炸板逻辑
                        # 监听了炸板了要做的函数
                        try:
                            selling_strategy.explosion_strategy(code)
@@ -186,6 +186,11 @@
                logger_debug.exception(e)
            finally:
                self.__last_price_dict[code] = price
                data_cache.latest_deal_price_dict[code] = price
                use_time = time.time() - start_time
                if use_time > 0.1:
                    logger_debug.warning(f"L2逐笔成交处理耗时:{use_time} 最后一条数据:{datas[-1]}")
    def OnMarketData(self, code, datas):
        # logger_debug.info(f"收到L2Market数据:{datas}")
@@ -195,9 +200,10 @@
    # 实时L2买1成交量
    def OnRealTimeBuy1Info(self, code, buy1_info):
        pass
        # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
        # 最终的买1为: 原始买1量+实时买1量
        async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
@@ -230,8 +236,6 @@
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    log.close_print()
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
        def on_markets(self, datas):
            """
@@ -247,7 +251,8 @@
    # 加载开盘啦板块日志数据
    kpl_data_manager.KPLStockOfMarketsPlateLogManager()
    kpl_data_manager.KPLMarketsSiftPlateLogManager()
    kpl_data_manager.KPLMarketStockHeatLogManager()
    # 启动异步日志
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
@@ -261,8 +266,7 @@
    # 不是模拟盘的时候启动交易
    if not constant.IS_SIMULATED_TRADE:
        multiprocessing.Process(target=trade_client.run, args=(
            queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r,),
                                daemon=True).start()
            queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r,)).start()
    # 启动交易
    order_methods.run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r)