admin
3 天以前 0b0d0e790fec8c7edfdbcab5c31d625e0c2eadd6
main.py
@@ -1,9 +1,7 @@
# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
import logging
import json
import os.path
# from log import logger
import multiprocessing
import threading
import time
@@ -11,7 +9,8 @@
# 引入掘金桥梁API
import utils.juejin_api
from db.redis_manager_delegate import RedisUtils
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_system
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_system, logger_debug
# 引入开盘啦API模块
# 引入全局变量模块
# 引入定时器模块
@@ -19,11 +18,13 @@
# 引入瞬时分时行情模块
# 引入账户管理模块【进行资金和仓位管理】
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \
    order_methods, local_data_management, kpl_data_manager
from huaxin_client import l2_market_client
from log_module import async_log_util
from trade import huaxin_trade_data_update
from utils import hx_qc_value_util
    order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis, \
    selling_strategy
from huaxin_client import l2_market_client, l2_client, trade_client
from log_module import async_log_util, log
from strategy.order_methods import TodayBuyCodeManager
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
# 引入行情订阅模块
# import subscribe_market
@@ -67,19 +68,9 @@
    local_data_management.read_local_K_line_data()
    # 读取本地个股所属板块数据 并更新到data_cache
    local_data_management.read_local_all_stocks_plate_data()
    # 初始化拉取当日买入代码
    TodayBuyCodeManager()
    # todo 2025-03-25 测试无误即可删除下部注释
    # # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
    # if os.path.exists(constant.K_BARS_PATH):
    #     with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f:
    #         data_cache.all_stocks_all_K_line_property_dict = json.load(f)
    #         print(
    #             f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
    # # 获取目标标的K线---初始化
    # all_K_line.main_index_k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().main_index_stocks)
    # # 直接调用主要指数K线写入本地文件
    # all_K_line.main_index_k_line_dict_write()
# 第一步:初始化context函数,并开启获取实时数据的线程
@@ -89,19 +80,23 @@
    # 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】
    threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start()
    # 获取实时大盘行情情绪综合强度 [分数] 线程
    threading.Thread(target=lambda: kpl_api.get_real_time_market_strong(), daemon=True).start()
    threading.Thread(target=lambda: market_sentiment_analysis.set_plan_position_quantity(), daemon=True).start()
    # 实时检测是否拉取K线线程
    threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start()
    # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}")
    # 获取实时大盘指数行情线程
    threading.Thread(target=lambda: instant_time_market.index_market_current(), daemon=True).start()
    # instant_time_market.index_market_trend()
    #  开盘啦的涨停概念的回调函数
    def kpl_limit_up_process(datas):
        # print(f"回调成功==={datas}")
        now_time = tool.get_now_time_str()
        if datas is not None and len(datas) > 0:
            data_cache.limit_up_block_names = datas
            if data_cache.CLOSING_TIME < now_time < data_cache.AFTER_CLOSING_TIME:
                logger_common.info(f"收盘涨停概念列表更新==={now_time}=={datas}")
        else:
            data_cache.limit_up_block_names = []
@@ -110,17 +105,21 @@
    # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
    # 涨停概念线程
    # threading.Thread(target=kpl_api.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=kpl_api.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
    # threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,),
                     daemon=True).start()
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_info):
        # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
        data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
        market_sift_plate_stock_dict,  market_sift_plates = market_sift_plate_info[0], market_sift_plate_info[1]
        if market_sift_plate_stock_dict:
            data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
            data_cache.market_sift_plates = market_sift_plates
    # 板块强度下个股强度线程
    threading.Thread(target=kpl_api.get_market_sift_plate_its_stock_power_process,
    threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process,
                     args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start()
    # 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
@@ -128,13 +127,13 @@
    try:
        # 计算开盘啦昨日拉取的概念数据中为空的股票数量
        kpl_api.get_have_no_plate_num()
        plate_strength_analysis.get_have_no_plate_num()
    except Exception as e:
        logger_system.exception(e)
    # 获取历史涨停信息数据并整理
    try:
        kpl_api.get_handling_limit_up_info()
        plate_strength_analysis.get_handling_limit_up_info()
    except Exception as e:
        logger_system.exception(e)
@@ -144,12 +143,97 @@
    except Exception as e:
        logger_system.exception(e)
    # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】
    # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks)
# 持仓代码的L2数据回调
class MyPositionsL2DataCallback(L2DataCallBack):
    __last_price_dict = {}
    __pre_close_price_dict = {}  # 昨日收盘价
    def OnL2Transaction(self, code, datas):
        """
        昨日持仓L2逐笔成交回调
        :param code:
        :param datas:
        :return:
        """
        if datas:
            start_time = time.time()
            # logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}")
            price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3])
            try:
                # 获取最近的成交价
                if code not in self.__pre_close_price_dict:
                    # 获取收盘价格
                    results = juejin_api.JueJinApi.history_n(tool.get_symbol(code), "1d", 1, 1, "close")
                    if results:
                        self.__pre_close_price_dict[code] = results[0]["close"]
                        logger_debug.debug(f"{code} - 获取到昨日收盘价:{results[0]['close']}")
                if self.__last_price_dict.get(code) == price:
                    return
                limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code])
                if code in self.__last_price_dict:
                    if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price):
                        # 处理炸板逻辑
                        # 监听了炸板了要做的函数
                        try:
                            selling_strategy.explosion_strategy(code)
                            # 炸板s
                        finally:
                            logger_debug.info(f"炸板:{code}-({price},{time_str})")
            except Exception as e:
                logger_debug.exception(e)
            finally:
                self.__last_price_dict[code] = price
                data_cache.latest_deal_price_dict[code] = price
                use_time = time.time() - start_time
                if use_time > 0.1:
                    logger_debug.warning(f"L2逐笔成交处理耗时:{use_time} 最后一条数据:{datas[-1]}")
    def OnMarketData(self, code, datas):
        # logger_debug.info(f"收到L2Market数据:{datas}")
        for d in datas:
            code = d["securityID"]
            buy1 = d["buy"][0]
    # 实时L2买1成交量
    def OnRealTimeBuy1Info(self, code, buy1_info):
        pass
        # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
        # 最终的买1为: 原始买1量+实时买1量
        # async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
l2_data_callbacks = []
# 订阅持仓L2数据
def __subscript_position_l2():
    """
    订阅持仓L2数据
    :return:
    """
    position_result = huaxin_trade_api.get_position_list(blocking=True)
    logger_debug.info(f"获取持仓结果:{position_result}")
    if not position_result or position_result["code"] != 0 or not position_result["data"]:
        return
    positions = position_result["data"]
    subscript_codes = set()
    for p in positions:
        if p["prePosition"] > 0:
            subscript_codes.add(p["securityID"])
    if not subscript_codes:
        return
    for i in range(len(subscript_codes)):
        l2_data_callbacks.append(MyPositionsL2DataCallback())
    l2_client.run(subscript_codes, l2_data_callbacks)
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    log.close_print()
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
        def on_markets(self, datas):
            """
@@ -158,16 +242,15 @@
            :return:
            """
            data_cache.latest_code_market_info_dict = {x[0]: x for x in datas}
            if datas:
                print(datas[0])
            if constant.is_windows():
                instant_time_market.get_current_info()
            else:
                instant_time_market.set_current_info(datas)
    # 加载开盘啦板块日志数据
    kpl_data_manager.KPLStockOfMarketsPlateLogManager()
    # 加载开盘啦板块日志数据
    kpl_data_manager.KPLMarketsSiftPlateLogManager()
    kpl_data_manager.KPLMarketStockHeatLogManager()
    # 启动异步日志
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
@@ -175,8 +258,16 @@
    # redis 数据同步
    threading.Thread(target=RedisUtils.run_loop, daemon=True).start()
    # 策略与交易通信队列
    queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
    # 不是模拟盘的时候启动交易
    if not constant.IS_SIMULATED_TRADE:
        multiprocessing.Process(target=trade_client.run, args=(
            queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r,)).start()
    # 启动交易
    order_methods.run()
    order_methods.run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r)
    # 运行华鑫增值服务进程,用于获取K线与交易日历
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
@@ -187,6 +278,9 @@
    # 等待5s,等其他线程/进程启动完毕
    time.sleep(15)
    # 订阅持仓票
    threading.Thread(target=__subscript_position_l2, daemon=True).start()
    try:
        # 初始化数据
        init()