# coding=utf-8
|
from __future__ import print_function, absolute_import, unicode_literals
|
import logging
|
import threading
|
import time
|
|
import constant
|
# 引入掘金桥梁API
|
import utils.juejin_api
|
from db.redis_manager_delegate import RedisUtils
|
from huaxin_client.l2_data_transform_protocol import L2DataCallBack
|
from log_module.log import logger_common, logger_kpl_jingxuan_in, logger_system, logger_debug
|
# 引入开盘啦API模块
|
# 引入全局变量模块
|
# 引入定时器模块
|
# 引入历史K线方法模块
|
# 引入瞬时分时行情模块
|
# 引入账户管理模块【进行资金和仓位管理】
|
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \
|
order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis, plate_strength_analysis
|
from huaxin_client import l2_market_client, l2_client
|
from log_module import async_log_util
|
from trade import huaxin_trade_data_update, huaxin_trade_api
|
from utils import hx_qc_value_util, huaxin_util, juejin_api, tool
|
|
# 引入行情订阅模块
|
# import subscribe_market
|
|
# 全局日志配置 如果不想用就把 logging.ERROR 如果要打就=logging.INFO
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
|
# 计算当前线程数量
|
def get_current_thread_count():
|
# print(f"threading.active_count()=={threading.active_count()}")
|
return threading.active_count()
|
|
|
# 初始化数据 函数
|
def init_data():
|
# logging.info("main初始化数据开始")
|
logger_common.info(f"main初始化数据开始")
|
'''
|
资金管理初始化
|
仓位管理初始化
|
都在main的初始化数据函数里面完成
|
无需掘金考虑的线程或进程方式实现
|
'''
|
# 初始化账户仓位管理数据
|
account_management.finance_management()
|
# 初始化账户仓位管理数据
|
account_management.position_management()
|
# 初始化.实例化缓存中的全局数据
|
data_cache.DataCache()
|
# 读取本地K线数据 并更新到data_cache
|
|
# 初始化A股所有目标票标的信息
|
data_cache.all_stocks = utils.juejin_api.JueJinApi.get_target_codes()
|
# 获取目标标的K线---初始化
|
all_K_line.k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day,
|
data_cache.DataCache().filtered_stocks)
|
|
# 直接调用目标标的指标K线写入本地文件
|
# all_K_line.all_stocks_all_k_line_dict_write()
|
local_data_management.read_local_K_line_data()
|
# 读取本地个股所属板块数据 并更新到data_cache
|
local_data_management.read_local_all_stocks_plate_data()
|
|
# todo 2025-03-25 测试无误即可删除下部注释
|
# # 先使用json.load()直接从文件中读取【已经存储在本地的K线指标属性字典】并解析JSON数据
|
# if os.path.exists(constant.K_BARS_PATH):
|
# with open(constant.K_BARS_PATH, 'r', encoding='utf-8') as f:
|
# data_cache.all_stocks_all_K_line_property_dict = json.load(f)
|
# print(
|
# f"data_cache.all_stocks_all_K_line_property_dict的个数==={len(data_cache.all_stocks_all_K_line_property_dict)}")
|
|
# # 获取目标标的K线---初始化
|
# all_K_line.main_index_k_line_history.init(data_cache.DataCache().today_date, data_cache.DataCache().next_trading_day, data_cache.DataCache().main_index_stocks)
|
# # 直接调用主要指数K线写入本地文件
|
# all_K_line.main_index_k_line_dict_write()
|
|
|
# 第一步:初始化context函数,并开启获取实时数据的线程
|
def init():
|
# 在初始化函数里面就调用全局化变量函数(即上文缩写的init_data函数)
|
init_data()
|
# 实时运行定时器线程【定时器函数目前 只管理 15:00 后运行一次 整理当日涨停信息 和 获取所有个股的板块概念】
|
threading.Thread(target=lambda: check_timer.check_time(), daemon=True).start()
|
# 获取实时大盘行情情绪综合强度 [分数] 线程
|
threading.Thread(target=lambda: market_sentiment_analysis.set_plan_position_quantity(), daemon=True).start()
|
# 实时检测是否拉取K线线程
|
threading.Thread(target=lambda: all_K_line.check_time_and_data_date(), daemon=True).start()
|
# print(f"all_stocks_all_K_line_property_dict== {type(data_cache.all_stocks_all_K_line_property_dict)}")
|
# 获取实时大盘指数行情线程
|
threading.Thread(target=lambda: instant_time_market.index_market_current(), daemon=True).start()
|
|
# instant_time_market.index_market_trend()
|
|
# 开盘啦的涨停概念的回调函数
|
def kpl_limit_up_process(datas):
|
# print(f"回调成功==={datas}")
|
if datas is not None and len(datas) > 0:
|
data_cache.limit_up_block_names = datas
|
else:
|
data_cache.limit_up_block_names = []
|
|
# # 计算当前线程数量
|
# get_current_thread_count()
|
|
# 开启开盘啦 涨停列表 和 全盘个股概念板块 接口线程
|
# 涨停概念线程
|
# threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, daemon=True).start() #该行代码为只运行单一线程不回调数据的方式
|
threading.Thread(target=plate_strength_analysis.kpl_limit_up_process, args=(kpl_limit_up_process,), daemon=True).start()
|
|
# # 开盘啦的板块强度下的个股强度回调函数
|
def get_market_sift_plate_its_stock_power_process(market_sift_plate_stock_dict):
|
# print(f"回调成功===精选板块股票强度数据更新==={market_sift_plate_stock_dict}")
|
# logger_kpl_jingxuan_in.info(f"{market_sift_plate_stock_dict}")
|
data_cache.market_sift_plate_stock_dict = market_sift_plate_stock_dict
|
|
# 板块强度下个股强度线程
|
threading.Thread(target=plate_strength_analysis.get_market_sift_plate_its_stock_power_process,
|
args=(get_market_sift_plate_its_stock_power_process,), daemon=True).start()
|
|
# 初始化get_current_data方法函数,下单买逻辑才会运行中。。。【核心主线程,随时考虑其启动顺序】>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
|
# threading.Thread(target=lambda: instant_time_market.get_current_data(), daemon=True).start()
|
|
try:
|
# 计算开盘啦昨日拉取的概念数据中为空的股票数量
|
plate_strength_analysis.get_have_no_plate_num()
|
except Exception as e:
|
logger_system.exception(e)
|
|
# 获取历史涨停信息数据并整理
|
try:
|
plate_strength_analysis.get_handling_limit_up_info()
|
except Exception as e:
|
logger_system.exception(e)
|
|
try:
|
# 检查K线昨日涨停是否正确
|
all_K_line.check_limit_up_attribute()
|
except Exception as e:
|
logger_system.exception(e)
|
|
|
# 持仓代码的L2数据回调
|
class MyPositionsL2DataCallback(L2DataCallBack):
|
__last_price_dict = {}
|
__pre_close_price_dict = {} # 昨日收盘价
|
|
def OnL2Transaction(self, code, datas):
|
"""
|
昨日持仓L2逐笔成交回调
|
:param code:
|
:param datas:
|
:return:
|
"""
|
if datas:
|
price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3])
|
try:
|
# 获取最近的成交价
|
if code not in self.__pre_close_price_dict:
|
# 获取收盘价格
|
results = juejin_api.JueJinApi.history_n(tool.get_symbol(code), "1d", 1, 1, "close")
|
if results:
|
self.__pre_close_price_dict[code] = results[0]["close"]
|
if self.__last_price_dict.get(code) == price:
|
return
|
limit_up_price = tool.get_limit_up_price(code, self.__pre_close_price_dict[code])
|
if code in self.__last_price_dict:
|
if abs(limit_up_price - self.__last_price_dict[code]) < 0.0001 < abs(limit_up_price - price):
|
# TODO 处理炸板逻辑
|
|
# 炸板
|
logger_debug.info(f"炸板:{code}-({price},{time_str})")
|
finally:
|
self.__last_price_dict[code] = price
|
|
def OnMarketData(self, code, datas):
|
# logger_debug.info(f"收到L2Market数据:{datas}")
|
for d in datas:
|
code = d["securityID"]
|
buy1 = d["buy"][0]
|
|
# 实时L2买1成交量
|
def OnRealTimeBuy1Info(self, code, buy1_info):
|
# buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
|
async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
|
# L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
|
|
|
l2_data_callbacks = []
|
|
|
def __subscript_position_l2():
|
"""
|
订阅持仓L2数据
|
:return:
|
"""
|
position_result = huaxin_trade_api.get_position_list(blocking=True)
|
logger_debug.info(f"获取持仓结果:{position_result}")
|
if not position_result or position_result["code"] != 0 or not position_result["data"]:
|
return
|
positions = position_result["data"]
|
subscript_codes = set()
|
for p in positions:
|
if p["prePosition"] > 0:
|
subscript_codes.add(p["securityID"])
|
if not subscript_codes:
|
return
|
|
for i in range(len(subscript_codes)):
|
l2_data_callbacks.append(MyPositionsL2DataCallback())
|
l2_client.run(subscript_codes, l2_data_callbacks)
|
|
|
# 第三步:执行策略的初始设置
|
if __name__ == '__main__':
|
class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
|
def on_markets(self, datas):
|
"""
|
L1数据回调
|
:param datas:
|
:return:
|
"""
|
data_cache.latest_code_market_info_dict = {x[0]: x for x in datas}
|
if datas:
|
print(datas[0])
|
if constant.is_windows():
|
instant_time_market.get_current_info()
|
else:
|
instant_time_market.set_current_info(datas)
|
|
|
# 加载开盘啦板块日志数据
|
kpl_data_manager.KPLStockOfMarketsPlateLogManager()
|
|
# 启动异步日志
|
threading.Thread(target=async_log_util.run_sync, daemon=True).start()
|
|
# redis 数据同步
|
threading.Thread(target=RedisUtils.run_loop, daemon=True).start()
|
|
# 启动交易
|
order_methods.run()
|
|
# 运行华鑫增值服务进程,用于获取K线与交易日历
|
threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
|
|
# 运行交易数据更新服务
|
huaxin_trade_data_update.run()
|
|
# 等待5s,等其他线程/进程启动完毕
|
time.sleep(15)
|
|
# 订阅持仓票
|
threading.Thread(target=__subscript_position_l2, daemon=True).start()
|
|
try:
|
# 初始化数据
|
init()
|
except Exception as e:
|
logger_system.exception(e)
|
|
# 需要订阅的目标代码
|
target_codes = [x["sec_id"] for x in data_cache.DataCache().all_stocks]
|
|
# 订阅L2 market行情
|
l2_market_client.run(target_codes, MyMarketDataCallback())
|