admin
5 天以前 cea6eee0edb661e0215a90a61642bf31afb3e502
main.py
@@ -1,6 +1,7 @@
# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
import logging
import multiprocessing
import threading
import time
@@ -19,8 +20,9 @@
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \
    order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis, \
    selling_strategy
from huaxin_client import l2_market_client, l2_client
from log_module import async_log_util
from huaxin_client import l2_market_client, l2_client, trade_client
from log_module import async_log_util, log
from strategy.order_methods import TodayBuyCodeManager
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
@@ -66,19 +68,9 @@
    local_data_management.read_local_K_line_data()
    # 读取本地个股所属板块数据 并更新到data_cache
    local_data_management.read_local_all_stocks_plate_data()
    # 初始化拉取当日买入代码
    TodayBuyCodeManager()
    # todo 2025-03-25 测试无误即可删除下部注释
    # # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
    # if os.path.exists(constant.K_BARS_PATH):
    #     with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f:
    #         data_cache.all_stocks_all_K_line_property_dict = json.load(f)
    #         print(
    #             f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
    # # 获取目标标的K线---初始化
    # all_K_line.main_index_k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().main_index_stocks)
    # # 直接调用主要指数K线写入本地文件
    # all_K_line.main_index_k_line_dict_write()
# 第一步:初始化context函数,并开启获取实时数据的线程
@@ -111,13 +103,17 @@
    # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
    # 涨停概念线程
    # threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
    threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,),
                     daemon=True).start()
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_info):
        # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
        data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
        market_sift_plate_stock_dict,  market_sift_plates = market_sift_plate_info[0], market_sift_plate_info[1]
        if market_sift_plate_stock_dict:
            data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
            data_cache.market_sift_plates = market_sift_plates
    # 板块强度下个股强度线程
    threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process,
@@ -158,7 +154,8 @@
        :return:
        """
        if datas:
            logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}")
            start_time = time.time()
            # logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}")
            price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3])
            try:
                # 获取最近的成交价
@@ -173,7 +170,7 @@
                limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code])
                if code in self.__last_price_dict:
                    if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price):
                        # TODO 处理炸板逻辑
                        # 处理炸板逻辑
                        # 监听了炸板了要做的函数
                        try:
                            selling_strategy.explosion_strategy(code)
@@ -184,6 +181,11 @@
                logger_debug.exception(e)
            finally:
                self.__last_price_dict[code] = price
                data_cache.latest_deal_price_dict[code] = price
                use_time = time.time() - start_time
                if use_time > 0.1:
                    logger_debug.warning(f"L2逐笔成交处理耗时:{use_time} 最后一条数据:{datas[-1]}")
    def OnMarketData(self, code, datas):
        # logger_debug.info(f"收到L2Market数据:{datas}")
@@ -193,12 +195,15 @@
    # 实时L2买1成交量
    def OnRealTimeBuy1Info(self, code, buy1_info):
        pass
        # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
        async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # 最终的买1为: 原始买1量+实时买1量
        # async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
l2_data_callbacks = []
# 订阅持仓L2数据
def __subscript_position_l2():
@@ -225,6 +230,7 @@
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    log.close_print()
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
        def on_markets(self, datas):
            """
@@ -233,8 +239,6 @@
            :return:
            """
            data_cache.latest_code_market_info_dict = {x[0]: x for x in datas}
            if datas:
                print(datas[0])
            if constant.is_windows():
                instant_time_market.get_current_info()
            else:
@@ -242,7 +246,8 @@
    # Load the KPL (Kaipanla) plate log data by instantiating each log manager
    kpl_data_manager.KPLStockOfMarketsPlateLogManager()
    kpl_data_manager.KPLMarketsSiftPlateLogManager()
    kpl_data_manager.KPLMarketStockHeatLogManager()
    # Start asynchronous logging (daemon thread running async_log_util.run_sync)
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
@@ -250,8 +255,16 @@
    # Redis data synchronisation (daemon thread running RedisUtils.run_loop)
    threading.Thread(target=RedisUtils.run_loop, daemon=True).start()
    # Communication queues between the strategy and the trade side
    queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
    # Launch the trade client process only when NOT running simulated trading
    if not constant.IS_SIMULATED_TRADE:
        multiprocessing.Process(target=trade_client.run, args=(
            queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r,)).start()
    # Start trading
    # NOTE(review): order_methods.run is invoked twice below, first without
    # arguments and then with the three queues. The argument-less call looks
    # like a leftover from before the queues were introduced - confirm against
    # order_methods.run's signature and drop it if so.
    order_methods.run()
    order_methods.run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r)
    # Run the Huaxin value-added service (here as a daemon thread), used to
    # fetch K-lines and the trading calendar
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()