| | |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from log_module import log, async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, printlog, logger_debug, logger_local_huaxin_l2_market |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, printlog, \ |
| | | logger_debug, logger_local_huaxin_l2_market |
| | | from utils import tool |
| | | |
| | | ###B类### |
| | |
| | | |
| | | __l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3) |
| | | |
| | | |
| | | def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | value = queue_trade_w_l2_r.get() |
| | | if value: |
| | | if type(value) == bytes: |
| | | value = value.decode("utf-8") |
| | | data = json.loads(value) |
| | | _type = data["type"] |
| | | if _type == "l2_cmd": |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"订阅代码:{data}") |
| | | __start_time = time.time() |
| | | # 线程池 |
| | | __l2_cmd_thread_pool.submit( |
| | | lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)) |
| | | use_time = time.time() - __start_time |
| | | if use_time > 0.005: |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s") |
| | | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | pipe_strategy = None |
| | | |
| | | |
| | |
| | | time.sleep(10) |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None: |
| | | def run(codes, data_callbacks: list) -> None: |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | log.close_print() |
| | | if queue_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) |
| | | t1.start() |
| | | # log.close_print() |
| | | |
| | | # 初始化 |
| | | data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) |
| | |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | | if codes: |
| | | l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, |
| | | {"type": command_manager.CLIENT_TYPE_CMD_L2, "data": codes}) |
| | | logger_system.info("L2订阅服务启动成功") |
| | | except Exception as e: |
| | | logger_system.exception(e) |
| | |
| | | # 引入账户管理模块【进行资金和仓位管理】 |
| | | from strategy import kpl_api, data_cache, check_timer, all_K_line, instant_time_market, account_management, \ |
| | | order_methods, local_data_management, kpl_data_manager, market_sentiment_analysis |
| | | from huaxin_client import l2_market_client |
| | | from huaxin_client import l2_market_client, l2_client |
| | | from log_module import async_log_util |
| | | from trade import huaxin_trade_data_update |
| | | from trade import huaxin_trade_data_update, huaxin_trade_api |
| | | from utils import hx_qc_value_util, huaxin_util |
| | | |
| | | # 引入行情订阅模块 |
| | |
| | | # kpl_api.get_all_stocks_plate_dict(data_cache.min_stocks) |
| | | |
| | | |
| | | class MyPositionsL2DataCallback(L2DataCallBack): |
| | | __last_price_dict = {} |
| | | |
| | | def OnL2Transaction(self, code, datas): |
| | | if datas: |
| | | # 获取最近的成交价 |
| | | price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3]) |
| | | # TODO 涨停价变为非涨停价才处理 |
| | | self.__last_price_dict[code] = price |
| | | |
| | | def OnMarketData(self, code, datas): |
| | | # logger_debug.info(f"收到L2Market数据:{datas}") |
| | | for d in datas: |
| | | code = d["securityID"] |
| | | buy1 = d["buy"][0] |
| | | |
| | | def OnRealTimeBuy1Info(self, code, buy1_info): |
| | | # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量] |
| | | async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}") |
| | | # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real") |
| | | |
| | | |
| | | l2_data_callbacks = [] |
| | | |
| | | |
| | | def __subscript_position_l2(): |
| | | """ |
| | | 订阅持仓L2数据 |
| | | :return: |
| | | """ |
| | | position_result = huaxin_trade_api.get_position_list(blocking=True) |
| | | if not position_result or position_result["code"] != 0 or not position_result["data"]: |
| | | return |
| | | positions = position_result["data"] |
| | | subscript_codes = set() |
| | | for p in positions: |
| | | if p["historyPos"] > 0: |
| | | subscript_codes.add(p["securityID"]) |
| | | if not subscript_codes: |
| | | return |
| | | |
| | | for i in range(len(subscript_codes)): |
| | | l2_data_callbacks.append(MyPositionsL2DataCallback()) |
| | | l2_client.run(subscript_codes, l2_data_callbacks) |
| | | |
| | | |
| | | # 第三步:执行策略的初始设置 |
| | | if __name__ == '__main__': |
| | | class MyMarketDataCallback(l2_market_client.L2MarketDataCallback): |
| | |
| | | instant_time_market.get_current_info() |
| | | else: |
| | | instant_time_market.set_current_info(datas) |
| | | |
| | | |
| | | class MyL2DataCallback(L2DataCallBack): |
| | | |
| | | def OnL2Transaction(self, code, datas): |
| | | if datas: |
| | | # 获取最近的成交价 |
| | | price, time_str = datas[-1][1], huaxin_util.convert_time(datas[-1][3]) |
| | | pass |
| | | |
| | | def OnMarketData(self, code, datas): |
| | | # logger_debug.info(f"收到L2Market数据:{datas}") |
| | | for d in datas: |
| | | code = d["securityID"] |
| | | buy1 = d["buy"][0] |
| | | |
| | | def OnRealTimeBuy1Info(self, code, buy1_info): |
| | | # buy1_info: [买1时间,买1价格, 原始买1量, 实时买1量] |
| | | async_log_util.info(logger_debug, f"OnRealTimeBuy1Info:{code}-{buy1_info}") |
| | | # L1DataProcessor.excute_sell_rule(code, buy1_info[3], buy1_info[1], "l2-real") |
| | | |
| | | |
| | | # 加载开盘啦板块日志数据 |
| | |
| | | # 等待5s,等其他线程/进程启动完毕 |
| | | time.sleep(15) |
| | | |
| | | # 订阅持仓票 |
| | | threading.Thread(target=__subscript_position_l2, daemon=True).start() |
| | | |
| | | try: |
| | | # 初始化数据 |
| | | init() |