admin
2025-01-15 f84dcf456dbfa318f490d6cf878be5d5d5262718
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# coding=utf-8
from __future__ import print_function, absolute_import, unicode_literals
import logging
import json
import multiprocessing
# import multiprocessing
# from log import logger
import threading
import time
# 引入掘金桥梁API
import utils.juejin_api
# 引入开盘啦API模块
# 引入全局变量模块
# 引入定时器模块
# 引入历史K线方法模块
# 引入瞬时分时行情模块
# 引入账户管理模块【进行资金和仓位管理】
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management
from huaxin_client import l2_market_client
from log_module import async_log_util
# 引入日志模块
from strategy.logging_config import get_logger
from strategy import order_methods
from trade import huaxin_trade_data_update
from utils import hx_qc_value_util
 
# 获取logger实例
logger = get_logger()
# 引入行情订阅模块
# import subscribe_market
 
# 全局日志配置   如果不想用就把 logging.ERROR  如果要打就=logging.INFO
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
 
# 计算当前线程数量
def get_current_thread_count():
    # print(f"threading.active_count()=={threading.active_count()}")
    return threading.active_count()
 
 
#  初始化数据 函数
def init_data():
    # logging.info("main初始化数据开始")
    logger.info(f"main初始化数据开始")
    # 初始化所有目标票标的信息
    data_cache.all_stocks = utils.juejin_api.JueJinApi.get_target_codes()
    # 获取K线
    data_cache.DataCache()
 
    all_K_line.k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day,
                                   data_cache.DataCache().filtered_stocks)
 
    # 调用指标K线写入本地文件
    all_K_line.all_stocks_all_k_line_dict_write()
 
    # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
    with open('strategy/local_storage_data/all_stocks_all_K_line_property_dict.json', 'r', encoding='utf-8') as f:
        data_cache.all_stocks_all_K_line_property_dict = json.load(f)
        print(
            f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
 
    # current_data_info = current(symbols='SHSE.603839', fields='open')
    # print(f"current_data_info==={current_data_info}")
 
 
# 第一步:初始化context函数,并开启获取实时数据的线程
def init():
    # 在初始化函数里面就调用全局化变量函数(即上文缩写的init_data函数)
    init_data()
    # 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】
    threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start()
    # 获取实时大盘行情情绪综合强度 [分数] 线程
    threading.Thread(target=lambda: kpl_api.get_real_time_market_strong(), daemon=True).start()
    # 实时检测是否拉取K线线程
    threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start()
    # print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}")
 
    '''
    目前设想,买只需要进行账户管理,卖才需要仓位管理,而卖在通过行情订阅实现
    并且行情订阅在另一个进程里面,所以目前不需要同时调用
    # 初始化资金管理,下单买逻辑才会有数据
    account_management.position_management(context)
    # 初始化仓位管理,下单卖逻辑才会有数据
    account_management.position_management(context)
    '''
 
    # TODO 此处注释,要替换context
    # threading.Thread(target=account_management.finance_management_process, args=(context,), daemon=True).start()
    # threading.Thread(target=account_management.position_management_process, args=(context,), daemon=True).start()
 
    #  开盘啦的涨停概念的回调函数
    def kpl_limit_up_process(datas):
        # print(f"回调成功==={datas}")
        data_cache.limit_up_block_names = datas
 
    # # 计算当前线程数量
    # get_current_thread_count()
 
    # 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
    # 涨停概念线程
    # threading.Thread(target=kpl_api.kpl_limit_up_process, daemon=True).start()    #该行代码为只运行单一线程不回调数据的方式
    threading.Thread(target=kpl_api.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
 
    # # 开盘啦的板块强度下的个股强度回调函数
    def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
        # print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
        data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
 
    # 板块强度下个股强度线程
    threading.Thread(target=kpl_api.get_market_sift_plate_its_stock_power_process,
                     args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start()
 
    # 行情订阅,沪深300指数【在on_tick函数中去打印tick 数据】
    # subscribe('SHSE.000300', frequency='tick', count=1, unsubscribe_previous=False)
 
    # 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    # threading.Thread(target=lambda: instant_time_market.get_current_data(), daemon=True).start()
 
    # 计算开盘啦昨日拉取的概念数据中为空的股票数量
    kpl_api.get_have_no_plate_num()
 
    # 获取历史涨停信息数据并整理
    kpl_api.get_handling_limit_up_info()
 
    # 检查K线昨日涨停是否正确
    all_K_line.check_limit_up_attribute()
 
    # # 获取所有个股的板块概念并写入文件【耗时较长应该放在 核心主线程 和 仓位管理 后面】
    # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks)
 
 
# 委托状态更新事件
def on_order_status(context, order):
    # 可在后面执行其他处理逻辑
    logger.info(f"context====main————{context}")
    logger.info(f"order====main————{order}")
 
 
# 委托执行回报事件
def on_execution_report(context, execrpt):
    logger.info(f"execrpt>context===={context}")
    logger.info(f"execrpt===={execrpt}")
 
 
# 交易账户状态更新事件 (交易账户状态对象,仅响应 已连接,已登录,已断开 和 错误 事件。)
def on_account_status(context, account):
    logger.info(f"account>context===={context}")
    logger.info(f"account===={account}")
    logger.info(account)
 
 
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
        def on_markets(self, datas):
            """
            L1数据回调
            :param datas:
            :return:
            """
            data_cache.latest_code_market_info_dict = {x[0]: x for x in datas}
            instant_time_market.process_current_infos(datas)
 
 
    # 启动异步日志
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
 
    # 启动交易
    order_methods.run()
 
    # 运行华鑫增值服务进程,用于获取K线与交易日历
    threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
 
    # 运行交易数据更新服务
    huaxin_trade_data_update.run()
 
    # 等待5s,等其他线程/进程启动完毕
    time.sleep(5)
 
    # 初始化数据
    init()
 
    # 需要订阅的目标代码
    target_codes = [x["sec_id"] for x in data_cache.DataCache().all_stocks]
 
    # 订阅L2 market行情
    l2_market_client.run(target_codes, MyMarketDataCallback())