| | |
| | | time.sleep(10) |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue, data_callbacks:list) -> None: |
| | | |
| | | def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None: |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | |
| | | t1.start() |
| | | |
| | | # 初始化 |
| | | order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) |
| | | transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) |
| | | data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) |
| | | l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, |
| | | transaction_queue_distribute_manager, market_queue, data_callback_distribute_manager) |
| | | l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_log() |
| | |
| | | import json |
| | | import logging |
| | | import marshal |
| | | import multiprocessing |
| | | import queue |
| | | import threading |
| | | import time |
| | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager, CodeDataCallbackDistributeManager |
| | | from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager |
| | | from log_module import async_log_util |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system, logger_local_huaxin_l2_subscript, \ |
| | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue, |
| | | data_callback_distribute_manager: CodeDataCallbackDistributeManager): |
| | | |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | | def __init__(self, data_callback_distribute_manager: CodeDataCallbackDistributeManager): |
| | | self.data_callback_distribute_manager = data_callback_distribute_manager |
| | | # 代码分配的对象 |
| | | self.temp_order_queue_dict = {} |
| | |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code): |
| | | if not self.order_queue_distribute_manager.get_distributed_queue(code): |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
| | | if not self.transaction_queue_distribute_manager.get_distributed_queue(code): |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | if not self.data_callback_distribute_manager.get_distributed_callback(code): |
| | | self.data_callback_distribute_manager.distribute_callback(code) |
| | | |
| | |
| | | # 释放已经分配的队列 |
| | | |
| | | def release_distributed_upload_queue(self, code): |
| | | self.order_queue_distribute_manager.release_distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code) |
| | | self.data_callback_distribute_manager.release_distribute_callback(code) |
| | | if code in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code].clear() |
| | |
| | | def __run_upload_order_task(self, code): |
| | | q: collections.deque = self.temp_order_queue_dict.get(code) |
| | | temp_list = [] |
| | | queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) |
| | | upload_queue = queue_info[1] |
| | | while True: |
| | | try: |
| | | while len(q) > 0: |
| | |
| | | # 处理成交数据并上传 |
| | | def __run_upload_transaction_task(self, code): |
| | | q: collections.deque = self.temp_transaction_queue_dict.get(code) |
| | | queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) |
| | | upload_queue = queue_info[1] |
| | | temp_list = [] |
| | | while True: |
| | | try: |
| | |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, order_queues_, transaction_queues_, |
| | | market_queue_, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_): |
| | | queue_strategy_w_trade_r_for_read_: multiprocessing.Queue, queue_l1_trade_r_strategy_w_, queue_l1_trade_w_strategy_r_): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | # |
| | | # 启动华鑫交易服务 |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, |
| | | queue_strategy_w_trade_r_for_read_, order_queues_, |
| | | transaction_queues_, market_queue_, |
| | | queue_strategy_w_trade_r_for_read_, |
| | | queue_l1_trade_w_strategy_r_) |
| | | |
| | | |
| | |
| | | args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read)) |
| | | tradeProcess.start() |
| | | |
| | | # 创建L2通信队列 |
| | | order_queues = [] |
| | | transaction_queues = [] |
| | | market_queue = multiprocessing.Queue() |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | order_queues.append(multiprocessing.Queue()) |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | transaction_queues.append(multiprocessing.Queue()) |
| | | |
| | | # 此处将L2的进程与策略进程合并 |
| | | # L2 |
| | | # l2Process = multiprocessing.Process( |
| | |
| | | # l2Process.start() |
| | | # 将L2的进程改为进程执行 |
| | | threading.Thread(target=huaxin_client.l2_client.run, args=( |
| | | queue_other_w_l2_r, order_queues, transaction_queues, market_queue, |
| | | huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() |
| | | queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start() |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | queue_strategy_w_trade_r_for_read, |
| | | order_queues, transaction_queues, market_queue, queue_l1_trade_r_strategy_w, |
| | | queue_strategy_w_trade_r_for_read, queue_l1_trade_r_strategy_w, |
| | | queue_l1_trade_w_strategy_r) |
| | | |
| | | # 将tradeServer作为主进程 |
| | |
| | | |
| | | def get_limit_up(): |
| | | while True: |
| | | if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")) or True: |
| | | if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")): |
| | | try: |
| | | results = kpl_api.getLimitUpInfoNew() |
| | | result = json.loads(results) |
| | |
| | | from l2.code_price_manager import Buy1PriceManager |
| | | from l2.huaxin import huaxin_target_codes_manager |
| | | from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager |
| | | from l2.l2_data_listen_manager import L2DataListenManager |
| | | from l2.l2_data_manager import TradePointManager |
| | | from l2.l2_data_util import L2DataUtil |
| | | from l2.l2_sell_manager import L2MarketSellManager |
| | |
| | | |
| | | def OnGetActiveListenCount(self, client_id, request_id): |
| | | try: |
| | | order = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER) |
| | | transaction = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION) |
| | | market = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET) |
| | | order = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER) |
| | | transaction = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION) |
| | | market = 0#l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET) |
| | | result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}} |
| | | self.send_response(result, client_id, request_id) |
| | | except Exception as e: |
| | |
| | | my_l2_data_callback = MyL2DataCallback() |
| | | my_l2_data_callbacks = [MyL2DataCallback() for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT)] |
| | | my_trade_response = MyTradeResponse() |
| | | l2DataListenManager: L2DataListenManager = None |
| | | |
| | | |
| | | # 做一些初始化的操作 |
| | |
| | | threading.Thread(target=run_pending, daemon=True).start() |
| | | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, |
| | | order_queues, transaction_queues, |
| | | market_queue, queue_l1_trade_w_strategy_r): |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, queue_l1_trade_w_strategy_r): |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | # 执行一些初始化数据 |
| | |
| | | middle_api_protocol.SERVER_PORT, |
| | | OutsideApiCommandCallback(), common_client_count=50) |
| | | manager.run(blocking=False) |
| | | |
| | | # 监听L2数据 |
| | | global l2DataListenManager |
| | | l2DataListenManager = L2DataListenManager(my_l2_data_callback) |
| | | l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue) |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r, |