admin
2025-04-08 9a79886657a5469905997090c69eecc461515120
持仓L2订阅
2个文件已修改
111 ■■■■ 已修改文件
huaxin_client/l2_client.py 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l2_client.py
@@ -17,7 +17,8 @@
from huaxin_client.l2_data_manager import L2DataUploadManager
from log_module import log, async_log_util
from log_module.async_log_util import huaxin_l2_log
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, printlog, logger_debug, logger_local_huaxin_l2_market
from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, printlog, \
    logger_debug, logger_local_huaxin_l2_market
from utils import tool
###B类###
@@ -398,31 +399,6 @@
__l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue):
    """
    Read messages from the given queue forever and dispatch L2 commands.

    Every non-empty message is decoded from UTF-8 when it arrives as bytes and
    parsed as JSON. Messages whose "type" field equals "l2_cmd" are logged and
    submitted to the module-level command thread pool; all other messages are
    ignored. An exception raised while handling a message is logged and the
    loop carries on with the next message.

    :param queue_trade_w_l2_r: queue the messages are read from (blocking get)
    :return: never returns
    """
    logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            value = queue_trade_w_l2_r.get()
            if not value:
                continue
            if isinstance(value, bytes):
                value = value.decode("utf-8")
            data = json.loads(value)
            if data["type"] != "l2_cmd":
                continue
            huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"订阅代码:{data}")
            __start_time = time.time()
            # Bind the current message as a default argument: a plain closure over
            # `data` would see whatever the loop has rebound it to by the time the
            # pooled task actually runs, so a queued task could process a later
            # message instead of its own. `l2CommandManager` is still looked up
            # when the task executes.
            __l2_cmd_thread_pool.submit(
                lambda cmd=data: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, cmd))
            # Only the time spent handing the task to the pool is measured here
            use_time = time.time() - __start_time
            if use_time > 0.005:
                huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s")
        except Exception as e:
            logging.exception(e)
pipe_strategy = None
@@ -469,14 +445,11 @@
            time.sleep(10)
def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None:
def run(codes, data_callbacks: list) -> None:
    logger_system.info("L2进程ID:{}", os.getpid())
    logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}")
    try:
        log.close_print()
        if queue_r is not None:
            t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True)
            t1.start()
        # log.close_print()
        # 初始化
        data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks)
@@ -489,6 +462,9 @@
        global l2CommandManager
        l2CommandManager = command_manager.L2CommandManager()
        l2CommandManager.init(MyL2ActionCallback())
        if codes:
            l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None,
                                             {"type": command_manager.CLIENT_TYPE_CMD_L2, "data": codes})
        logger_system.info("L2订阅服务启动成功")
    except Exception as e:
        logger_system.exception(e)
main.py
@@ -18,9 +18,9 @@
# 引入账户管理模块【进行资金和仓位管理】
from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \
    order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis
from huaxin_client import l2_market_client
from huaxin_client import l2_market_client, l2_client
from log_module import async_log_util
from trade import huaxin_trade_data_update
from trade import huaxin_trade_data_update, huaxin_trade_api
from utils import hx_qc_value_util, huaxin_util
# 引入行情订阅模块
@@ -147,6 +147,52 @@
    # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks)
class MyPositionsL2DataCallback(L2DataCallBack):
    """L2 data callback that remembers the latest transaction price per code."""

    # Class-level mapping: code -> price taken from the last entry of the most
    # recent transaction batch received for that code
    __last_price_dict = {}

    def OnL2Transaction(self, code, datas):
        """Store the price of the last entry in ``datas`` for ``code``; no-op when ``datas`` is empty."""
        if not datas:
            return
        latest = datas[-1]
        # Price and time of the most recent transaction
        price = latest[1]
        time_str = huaxin_util.convert_time(latest[3])
        # TODO: only handle the case where the price moves from limit-up to non-limit-up
        self.__last_price_dict[code] = price

    def OnMarketData(self, code, datas):
        """Walk the market data entries, reading each entry's security ID and first "buy" item."""
        # logger_debug.info(f"收到L2Market数据:{datas}")
        for entry in datas:
            code = entry["securityID"]
            buy1 = entry["buy"][0]

    def OnRealTimeBuy1Info(self, code, buy1_info):
        """Log the real-time buy-1 information asynchronously."""
        # buy1_info: [buy-1 time, buy-1 price, original buy-1 volume, real-time buy-1 volume]
        message = f"OnRealTimeBuy1Info:{code}-{buy1_info}"
        async_log_util.info(logger_debug, message)
        # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
l2_data_callbacks = []
def __subscript_position_l2():
    """
    Subscribe to L2 data for the codes currently held in positions.

    Fetches the position list (blocking), collects the security IDs of every
    position whose "historyPos" is greater than 0, appends one
    MyPositionsL2DataCallback per collected code to the module-level
    ``l2_data_callbacks`` list and then starts the L2 client with those codes.
    Returns without doing anything when the request yields no result, a
    non-zero result code, no data, or no matching position.

    :return: None
    """
    result = huaxin_trade_api.get_position_list(blocking=True)
    if not result:
        return
    if result["code"] != 0:
        return
    held_positions = result["data"]
    if not held_positions:
        return

    codes = {pos["securityID"] for pos in held_positions if pos["historyPos"] > 0}
    if not codes:
        return

    # One callback instance per subscribed code
    l2_data_callbacks.extend(MyPositionsL2DataCallback() for _ in codes)
    l2_client.run(codes, l2_data_callbacks)
# 第三步:执行策略的初始设置
if __name__ == '__main__':
    class MyMarketDataCallback(l2_market_client.L2MarketDataCallback):
@@ -163,26 +209,6 @@
                instant_time_market.get_current_info()
            else:
                instant_time_market.set_current_info(datas)
    class MyL2DataCallback(L2DataCallBack):
        def OnL2Transaction(self, code, datas):
            if datas:
                # 获取最近的成交价
                price, time_str = datas[-1][1],  huaxin_util.convert_time(datas[-1][3])
                pass
        def OnMarketData(self, code, datas):
            # logger_debug.info(f"收到L2Market数据:{datas}")
            for d in datas:
                code = d["securityID"]
                buy1 = d["buy"][0]
        def OnRealTimeBuy1Info(self, code, buy1_info):
            # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量]
            async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}")
            # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real")
    # 加载开盘啦板块日志数据
@@ -206,6 +232,9 @@
    # Wait 15s so that the other threads/processes have finished starting up
    time.sleep(15)
    # 订阅持仓票
    threading.Thread(target=__subscript_position_l2, daemon=True).start()
    try:
        # 初始化数据
        init()