admin
5 小时以前 aa5800658e452c0aa8c9a822395acb989e848f9f
main.py
@@ -4,6 +4,7 @@
import multiprocessing
import threading
import time
import schedule
import constant
# 引入掘金桥梁API
@@ -22,6 +23,9 @@
    selling_strategy
from huaxin_client import l2_market_client, l2_client, trade_client
from log_module import async_log_util, log
from strategy.order_methods import TodayBuyCodeManager
from strategy.trade_setting import BuyMoneyPerCodeManager, OpeningQuantityManager
from strategy.trading_dates_manager import TradingDatesManager
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
@@ -49,9 +53,15 @@
    无需掘金考虑的线程或进程方式实现
    '''
    # 初始化账户仓位管理数据
    account_management.finance_management()
    try:
        account_management.finance_management()
    except:
        pass
    # 初始化账户仓位管理数据
    account_management.position_management()
    try:
        account_management.position_management()
    except:
        pass
    # 初始化.实例化缓存中的全局数据
    data_cache.DataCache()
    # 读取本地K线数据 并更新到data_cache
@@ -67,19 +77,50 @@
    local_data_management.read_local_K_line_data()
    # 读取本地个股所属板块数据 并更新到data_cache
    local_data_management.read_local_all_stocks_plate_data()
    # 初始化拉取当日买入代码
    TodayBuyCodeManager()
    # 开仓金额初始化
    BuyMoneyPerCodeManager()
    # 开仓数量
    OpeningQuantityManager()
    # todo 2025-03-25 测试无误即可删除下部注释
    # # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
    # if os.path.exists(constant.K_BARS_PATH):
    #     with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f:
    #         data_cache.all_stocks_all_K_line_property_dict = json.load(f)
    #         print(
    #             f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
    # # 获取目标标的K线---初始化
    # all_K_line.main_index_k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().main_index_stocks)
    # # 直接调用主要指数K线写入本地文件
    # all_K_line.main_index_k_line_dict_write()
def __run_pending():
    def update_k_lines():
        logger_debug.info("开始更新K线")
        all_K_line.all_stocks_all_k_line_dict_write()
        local_data_management.read_local_K_line_data()
        logger_debug.info("结束更新K线")
    def update_target_codes():
        data_cache.target_codes_manager.update_today_codes_info()
        time.sleep(2)
        data_cache.target_codes_manager.load_data()
    def update_all_stocks_plate():
        plate_strength_analysis.get_all_stocks_plate_dict(data_cache.DataCache().filtered_stocks)
        local_data_management.read_local_all_stocks_plate_data()
    def update_trading_dates():
        dates = TradingDatesManager().update_trading_dates()
        TradingDatesManager().load_data()
    schedule.every().day.at("17:00:00").do(lambda: threading.Thread(target=update_k_lines, daemon=True).start())
    schedule.every().day.at("09:05:00").do(lambda: threading.Thread(target=update_k_lines, daemon=True).start())
    schedule.every().day.at("09:05:00").do(lambda: huaxin_trade_data_update.add_money_list())
    schedule.every().day.at("09:05:00").do(lambda: huaxin_trade_data_update.add_position_list())
    schedule.every().day.at("17:10:00").do(lambda: threading.Thread(target=update_target_codes, daemon=True).start())
    schedule.every().day.at("17:00:00").do(
        lambda: threading.Thread(target=update_all_stocks_plate, daemon=True).start())
    schedule.every().day.at("17:00:00").do(lambda: threading.Thread(target=update_trading_dates, daemon=True).start())
    while True:
        try:
            schedule.run_pending()
        except:
            pass
        finally:
            time.sleep(1)
# 第一步:初始化context函数,并开启获取实时数据的线程
@@ -95,6 +136,8 @@
    # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}")
    # 获取实时大盘指数行情线程
    threading.Thread(target=lambda: instant_time_market.index_market_current(), daemon=True).start()
    # 定时任务
    threading.Thread(target=lambda: __run_pending(), daemon=True).start()
    # instant_time_market.index_market_trend()
@@ -116,11 +159,15 @@
                     daemon=True).start()
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_info):
        # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
        if market_sift_plate_info is None:
            return
        market_sift_plate_stock_dict, market_sift_plates = market_sift_plate_info[0], market_sift_plate_info[1]
        if market_sift_plate_stock_dict:
            data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
            data_cache.market_sift_plates = market_sift_plates
    # 板块强度下个股强度线程
    threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process,
@@ -161,6 +208,7 @@
        :return:
        """
        if datas:
            start_time = time.time()
            # logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}")
            price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3])
            try:
@@ -176,7 +224,7 @@
                limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code])
                if code in self.__last_price_dict:
                    if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price):
                        # TODO 处理炸板逻辑
                        # 处理炸板逻辑
                        # 监听了炸板了要做的函数
                        try:
                            selling_strategy.explosion_strategy(code)
@@ -187,6 +235,11 @@
                logger_debug.exception(e)
            finally:
                self.__last_price_dict[code] = price
                data_cache.latest_deal_price_dict[code] = price
                data_cache.latest_l2_transaction_info_dict[code] = (price, time_str)
                use_time = time.time() - start_time
                if use_time > 0.1:
                    logger_debug.warning(f"L2逐笔成交处理耗时:{use_time} 最后一条数据:{datas[-1]}")
    def OnMarketData(self, code, datas):
        # logger_debug.info(f"收到L2Market数据:{datas}")
@@ -196,9 +249,10 @@
    # 实时L2买1成交量
    def OnRealTimeBuy1Info(self, code, buy1_info):
        pass
        # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
        # 最终的买1为: 原始买1量+实时买1量
        async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
@@ -231,14 +285,39 @@
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    log.close_print()
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
        def __init__(self):
            def push_ticks_of_position_codes():
                """
                推送持仓代码的tick数据
                :return:
                """
                while True:
                    try:
                        if data_cache.OPEN_BIDDING_TIME <= tool.get_now_time_str() <= data_cache.OPENING_TIME:
                            logger_debug.info(f"09:25-09:30主动推送数据")
                            datas = [v for k, v in data_cache.current_l1_dict.items()]
                            instant_time_market.set_current_info(datas)
                        elif tool.get_now_time_str() > data_cache.OPENING_TIME:
                            break
                    except Exception as e:
                        logger_debug.exception(e)
                    finally:
                        time.sleep(3)
            threading.Thread(target=push_ticks_of_position_codes, daemon=True).start()
        def on_markets(self, datas):
            """
            L1数据回调
            :param datas:
            :return:
            """
            data_cache.latest_code_market_info_dict = {x[0]: x for x in datas}
            for x in datas:
                data_cache.latest_code_market_info_dict[x[0]] = x
            if constant.is_windows():
                instant_time_market.get_current_info()
            else:
@@ -246,7 +325,8 @@
    # 加载开盘啦板块日志数据
    kpl_data_manager.KPLStockOfMarketsPlateLogManager()
    kpl_data_manager.KPLMarketsSiftPlateLogManager()
    kpl_data_manager.KPLMarketStockHeatLogManager()
    # 启动异步日志
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
@@ -269,7 +349,7 @@
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
    # 运行交易数据更新服务
    huaxin_trade_data_update.run()
    huaxin_trade_data_update.run(order_methods.trade_callback)
    # 等待5s,等其他线程/进程启动完毕
    time.sleep(15)
@@ -284,7 +364,11 @@
        logger_system.exception(e)
    # 需要订阅的目标代码
    target_codes = [x["sec_id"] for x in data_cache.DataCache().all_stocks]
    target_codes = [x[-6:] for x in data_cache.DataCache().min_stocks]
    position_codes = [x[-6:] for x in data_cache.position_symbols_set]
    logger_debug.info(f"今日持仓代码:{position_codes}")
    target_codes = set(target_codes) | set(position_codes)
    logger_debug.info(f"今日要订阅的代码:{len(target_codes)}-{target_codes}")
    # 订阅L2 market行情
    l2_market_client.run(target_codes, MyMarketDataCallback())