admin
2025-05-21 275a72f4237a79cd0aa270eb2dc81ebd5172830f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
import logging
import multiprocessing
import threading
import time
 
import constant
# 引入掘金桥梁API
import utils.juejin_api
from db.redis_manager_delegate import RedisUtils
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_system, logger_debug
# 引入开盘啦API模块
# 引入全局变量模块
# 引入定时器模块
# 引入历史K线方法模块
# 引入瞬时分时行情模块
# 引入账户管理模块【进行资金和仓位管理】
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \
    order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis, \
    selling_strategy
from huaxin_client import l2_market_client, l2_client, trade_client
from log_module import async_log_util, log
from strategy.order_methods import TodayBuyCodeManager
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
 
# 引入行情订阅模块
# import subscribe_market
 
# 全局日志配置   如果不想用就把 logging.ERROR  如果要打就=logging.INFO
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
 
# 计算当前线程数量
def get_current_thread_count():
    # print(f"threading.active_count()=={threading.active_count()}")
    return threading.active_count()
 
 
#  初始化数据 函数
def init_data():
    # logging.info("main初始化数据开始")
    logger_common.info(f"main初始化数据开始")
    '''
    资金管理初始化
    仓位管理初始化
    都在main的初始化数据函数里面完成
    无需掘金考虑的线程或进程方式实现
    '''
    # 初始化账户仓位管理数据
    account_management.finance_management()
    # 初始化账户仓位管理数据
    account_management.position_management()
    # 初始化.实例化缓存中的全局数据
    data_cache.DataCache()
    # 读取本地K线数据 并更新到data_cache
 
    # 初始化A股所有目标票标的信息
    data_cache.all_stocks = utils.juejin_api.JueJinApi.get_target_codes()
    # 获取目标标的K线---初始化
    all_K_line.k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day,
                                   data_cache.DataCache().filtered_stocks)
 
    # 直接调用目标标的指标K线写入本地文件
    # all_K_line.all_stocks_all_k_line_dict_write()
    local_data_management.read_local_K_line_data()
    # 读取本地个股所属板块数据 并更新到data_cache
    local_data_management.read_local_all_stocks_plate_data()
    # 初始化拉取当日买入代码
    TodayBuyCodeManager()
 
    # todo 2025-03-25 测试无误即可删除下部注释
    # # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
    # if os.path.exists(constant.K_BARS_PATH):
    #     with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f:
    #         data_cache.all_stocks_all_K_line_property_dict = json.load(f)
    #         print(
    #             f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
 
    # # 获取目标标的K线---初始化
    # all_K_line.main_index_k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().main_index_stocks)
    # # 直接调用主要指数K线写入本地文件
    # all_K_line.main_index_k_line_dict_write()
 
 
# 第一步:初始化context函数,并开启获取实时数据的线程
def init():
    # 在初始化函数里面就调用全局化变量函数(即上文缩写的init_data函数)
    init_data()
    # 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】
    threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start()
    # 获取实时大盘行情情绪综合强度 [分数] 线程
    threading.Thread(target=lambda: market_sentiment_analysis.set_plan_position_quantity(), daemon=True).start()
    # 实时检测是否拉取K线线程
    threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start()
    # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}")
    # 获取实时大盘指数行情线程
    threading.Thread(target=lambda: instant_time_market.index_market_current(), daemon=True).start()
 
    # instant_time_market.index_market_trend()
 
    #  开盘啦的涨停概念的回调函数
    def kpl_limit_up_process(datas):
        # print(f"回调成功==={datas}")
        if datas is not None and len(datas) > 0:
            data_cache.limit_up_block_names = datas
        else:
            data_cache.limit_up_block_names = []
 
    # # 计算当前线程数量
    # get_current_thread_count()
 
    # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
    # 涨停概念线程
    # threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,),
                     daemon=True).start()
 
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
        # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        # logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
        if market_sift_plate_stock_dict:
            data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
 
    # 板块强度下个股强度线程
    threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process,
                     args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start()
 
    # 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    # threading.Thread(target=lambda: instant_time_market.get_current_data(), daemon=True).start()
 
    try:
        # 计算开盘啦昨日拉取的概念数据中为空的股票数量
        plate_strength_analysis.get_have_no_plate_num()
    except Exception as e:
        logger_system.exception(e)
 
    # 获取历史涨停信息数据并整理
    try:
        plate_strength_analysis.get_handling_limit_up_info()
    except Exception as e:
        logger_system.exception(e)
 
    try:
        # 检查K线昨日涨停是否正确
        all_K_line.check_limit_up_attribute()
    except Exception as e:
        logger_system.exception(e)
 
 
# 持仓代码的L2数据回调
class MyPositionsL2DataCallback(L2DataCallBack):
    __last_price_dict = {}
    __pre_close_price_dict = {}  # 昨日收盘价
 
    def OnL2Transaction(self, code, datas):
        """
        昨日持仓L2逐笔成交回调
        :param code:
        :param datas:
        :return:
        """
        if datas:
            start_time = time.time()
            # logger_debug.debug(f"{code} - L2逐笔成交上报:{len(datas)}")
            price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3])
            try:
                # 获取最近的成交价
                if code not in self.__pre_close_price_dict:
                    # 获取收盘价格
                    results = juejin_api.JueJinApi.history_n(tool.get_symbol(code), "1d", 1, 1, "close")
                    if results:
                        self.__pre_close_price_dict[code] = results[0]["close"]
                        logger_debug.debug(f"{code} - 获取到昨日收盘价:{results[0]['close']}")
                if self.__last_price_dict.get(code) == price:
                    return
                limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code])
                if code in self.__last_price_dict:
                    if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price):
                        # TODO 处理炸板逻辑
                        # 监听了炸板了要做的函数
                        try:
                            selling_strategy.explosion_strategy(code)
                            # 炸板s
                        finally:
                            logger_debug.info(f"炸板:{code}-({price},{time_str})")
            except Exception as e:
                logger_debug.exception(e)
            finally:
                self.__last_price_dict[code] = price
                data_cache.latest_deal_price_dict[code] = price
                use_time = time.time() - start_time
                if use_time > 0.1:
                    logger_debug.warning(f"L2逐笔成交处理耗时:{use_time} 最后一条数据:{datas[-1]}")
 
 
    def OnMarketData(self, code, datas):
        # logger_debug.info(f"收到L2Market数据:{datas}")
        for d in datas:
            code = d["securityID"]
            buy1 = d["buy"][0]
 
    # 实时L2买1成交量
    def OnRealTimeBuy1Info(self, code, buy1_info):
        # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
        # 最终的买1为: 原始买1量+实时买1量
        async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
        # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
 
 
l2_data_callbacks = []
 
 
# 订阅持仓L2数据
def __subscript_position_l2():
    """
    订阅持仓L2数据
    :return:
    """
    position_result = huaxin_trade_api.get_position_list(blocking=True)
    logger_debug.info(f"获取持仓结果:{position_result}")
    if not position_result or position_result["code"] != 0 or not position_result["data"]:
        return
    positions = position_result["data"]
    subscript_codes = set()
    for p in positions:
        if p["prePosition"] > 0:
            subscript_codes.add(p["securityID"])
    if not subscript_codes:
        return
 
    for i in range(len(subscript_codes)):
        l2_data_callbacks.append(MyPositionsL2DataCallback())
    l2_client.run(subscript_codes, l2_data_callbacks)
 
 
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    log.close_print()
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
        def on_markets(self, datas):
            """
            L1数据回调
            :param datas:
            :return:
            """
            data_cache.latest_code_market_info_dict = {x[0]: x for x in datas}
            if constant.is_windows():
                instant_time_market.get_current_info()
            else:
                instant_time_market.set_current_info(datas)
 
 
    # 加载开盘啦板块日志数据
    kpl_data_manager.KPLStockOfMarketsPlateLogManager()
 
    # 启动异步日志
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
 
    # redis 数据同步
    threading.Thread(target=RedisUtils.run_loop, daemon=True).start()
 
    # 策略与交易通信队列
    queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r = multiprocessing.Queue(), multiprocessing.Queue(), multiprocessing.Queue()
 
    # 不是模拟盘的时候启动交易
    if not constant.IS_SIMULATED_TRADE:
        multiprocessing.Process(target=trade_client.run, args=(
            queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r,)).start()
 
    # 启动交易
    order_methods.run(queue_strategy_r_trade_w, queue_strategy_w_trade_r, queue_strategy_w_trade_for_query_r)
 
    # 运行华鑫增值服务进程,用于获取K线与交易日历
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
 
    # 运行交易数据更新服务
    huaxin_trade_data_update.run()
 
    # 等待5s,等其他线程/进程启动完毕
    time.sleep(15)
 
    # 订阅持仓票
    threading.Thread(target=__subscript_position_l2, daemon=True).start()
 
    try:
        # 初始化数据
        init()
    except Exception as e:
        logger_system.exception(e)
 
    # 需要订阅的目标代码
    target_codes = [x["sec_id"] for x in data_cache.DataCache().all_stocks]
 
    # 订阅L2 market行情
    l2_market_client.run(target_codes, MyMarketDataCallback())