| | |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts, data_callback_distribute_manager:CodeDataCallbackDistributeManager): |
| | | market_data_queue: multiprocessing.Queue, order_ipc_hosts, |
| | | data_callback_distribute_manager: CodeDataCallbackDistributeManager): |
| | | |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | |
| | | if not self.data_callback_distribute_manager.get_distributed_callback(code): |
| | | self.data_callback_distribute_manager.distribute_callback(code) |
| | | |
| | | |
| | | if code not in self.temp_order_queue_dict: |
| | | self.temp_order_queue_dict[code] = collections.deque() |
| | | if code not in self.temp_transaction_queue_dict: |
| | |
| | | # 上传数据 |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | # self.__upload_l2_order_data(code, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, time.time()) |
| | | __start_time = time.time() |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Order(code, temp_list, |
| | | time.time()) |
| | | use_time = time.time() - __start_time |
| | | if use_time > 0.01: |
| | | # 记录10ms以上的数据 |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_error, f"耗时:{use_time}s 结束数据:{temp_list[-1]}") |
| | | |
| | | temp_list = [] |
| | | |
| | | else: |
| | |
| | | if temp_list: |
| | | # 上传数据 |
| | | # self.__upload_l2_data(code, upload_queue, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, temp_list) |
| | | self.data_callback_distribute_manager.get_distributed_callback(code).OnL2Transaction(code, |
| | | temp_list) |
| | | temp_list = [] |
| | | else: |
| | | if code not in self.temp_transaction_queue_dict: |
| | |
| | | import threading |
| | | |
| | | import constant |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \ |
| | | logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel, \ |
| | | logger_l2_g_cancel |
| | | |
| | | |
| | | # 日志队列分配管理器 |
| | | # 为每一个代码分配专门的管理器 |
# Log-queue distribution manager.
# Assigns each subscribed stock code a dedicated AsyncLogManager so that
# one code's log traffic is serialized on its own queue.
class CodeLogQueueDistributeManager:
    def __init__(self, l2_channel_count):
        """Create one AsyncLogManager per L2 channel.

        l2_channel_count: maximum number of codes that can hold a log
        manager at the same time.
        """
        self.async_log_managers = [async_log_util.AsyncLogManager() for _ in range(l2_channel_count)]
        # Index of the manager assigned to each code, e.g. {"000333": 2}.
        self.distributed_log_dict = {}

    def __get_available_log_manager(self):
        """Return the index of a free log manager; raise when all are taken."""
        used_indexes = set(self.distributed_log_dict.values())
        free_indexes = set(range(len(self.async_log_managers))) - used_indexes
        if not free_indexes:
            raise Exception("已分配完")
        return free_indexes.pop()

    def distribute_log_manager(self, code):
        """Assign a free log manager to *code* (no-op if already assigned).

        Raises when every manager is already distributed.
        """
        if code in self.distributed_log_dict:
            return
        # The helper either returns a valid index or raises, so no
        # None-check is needed here.
        self.distributed_log_dict[code] = self.__get_available_log_manager()

    def realase_log_manager(self, code):
        """Release the log manager held by *code*, if any.

        NOTE: the method name keeps its historical misspelling ("realase")
        because callers outside this file may rely on it.
        """
        self.distributed_log_dict.pop(code, None)

    # Synchronize assignments with the current L2 subscription list.
    def set_l2_subscript_codes(self, codes):
        """Release managers of dropped codes and assign managers to new ones."""
        codes = set(codes)
        now_codes = set(self.distributed_log_dict)
        for c in now_codes - codes:
            self.realase_log_manager(c)
        for c in codes - now_codes:
            self.distribute_log_manager(c)

    def get_log_manager(self, code):
        """Return the AsyncLogManager assigned to *code*, or None."""
        index = self.distributed_log_dict.get(code)
        return None if index is None else self.async_log_managers[index]

    # Start the synchronous drain threads.
    def run_async(self):
        """Spawn one daemon thread per manager to drain its queue."""
        for m in self.async_log_managers:
            threading.Thread(target=m.run_sync, daemon=True).start()
| | | |
| | | |
# Module-level singleton: one dedicated log queue per possible L2 subscription slot.
codeLogQueueDistributeManager = CodeLogQueueDistributeManager(constant.HUAXIN_L2_MAX_CODES_COUNT)

# Per-code id embedded into log lines for tracing; assigned by callers
# (the visible caller stores a random int per code — not an OS thread id).
threadIds = {}
| | | |
| | | |
def __add_async_log(logger_, code, content, *args):
    """Format *content* with thread/code context and enqueue it once.

    The message is routed to the code's dedicated AsyncLogManager when one
    has been distributed, otherwise to the shared async_log_util queue.
    Any formatting/queueing error is logged instead of propagating so the
    data path is never interrupted by logging failures.

    Fix: previously the message was also unconditionally enqueued via
    async_log_util.debug before the dispatch below, so every record without
    a dedicated manager was logged twice.
    """
    try:
        full_content = "thread-id={} code={} ".format(threadIds.get(code), code) + content
        if len(args) > 0:
            # Caller supplied str.format arguments for *content*.
            full_content = full_content.format(*args)
        async_log_manager = codeLogQueueDistributeManager.get_log_manager(code)
        if async_log_manager:
            async_log_manager.debug(logger_, full_content)
        else:
            async_log_util.debug(logger_, full_content)
    except Exception as e:
        logger_l2_error.exception(e)
| | | |
| | | |
def debug(code, content, *args):
    """Emit a debug line for *code* through the L2 trade logger."""
    __add_async_log(logger_l2_trade, code, content, *args)
| | | |
| | | |
def info(code, logger_, content, *args):
    """Emit a line for *code* to the caller-supplied logger.

    Routed through the same debug-queue helper as debug().
    """
    __add_async_log(logger_, code, content, *args)
| | | |
| | | |
| | | def buy_debug(code, content, *args): |
| | |
def d_cancel_debug(code, content, *args):
    """Emit a D-cancel debug line for *code*."""
    __add_async_log(logger_l2_d_cancel, code, content, *args)
| | | |
| | | |
def f_cancel_debug(code, content, *args):
    """Emit an F-cancel debug line for *code*."""
    __add_async_log(logger_l2_f_cancel, code, content, *args)
| | | |
| | | |
def g_cancel_debug(code, content, *args):
    """Emit a G-cancel debug line for *code*."""
    __add_async_log(logger_l2_g_cancel, code, content, *args)
| | | |
| | | |
# Trade record
def trade_record(code, type, content, *args):
    # Asynchronously append a typed trade-record line for *code*.
    # NOTE(review): *type* shadows the builtin but is part of the public
    # signature; *args appears unused here — confirm *content* arrives
    # pre-formatted. A line may have been elided in this excerpt.

    async_log_util.debug(logger_trade_record,
                         "thread-id={} code={} type={} data=".format(threadIds.get(code), code,
                                                                     type) + content)
| | | |
| | | |
if __name__ == "__main__":
    # Smoke test: fill every slot, then swap one code and re-sync.
    test_codes = [f"{i}" * 6 for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT)]
    codeLogQueueDistributeManager.set_l2_subscript_codes(test_codes)
    # Replace the first code; re-syncing releases the old one and
    # distributes the new one.
    test_codes[0] = "123456"
    codeLogQueueDistributeManager.set_l2_subscript_codes(test_codes)
    codeLogQueueDistributeManager.get_log_manager("000333")
| | |
| | | |
# Async log manager dedicated to Huaxin L2 logging.
huaxin_l2_log = AsyncLogManager()

# Module-wide record queue drained by run_sync(); each item is a tuple
# (logger, enqueue_time, logger_method_name, format_args).
log_queue = queue.Queue()
| | | |
| | | |
def __add_log(logger, time_out_log, method, *args):
    """Enqueue one record as (logger, enqueue_time, method_name, args).

    logger: target logger object.
    time_out_log: when True, report enqueues slower than 10ms.
    method: name of the logger method run_sync() will look up and call,
        e.g. "debug" / "info".
    *args: positional arguments forwarded to that logger method.
    """
    start_time = time.time()
    log_queue.put_nowait((logger, start_time, method, args))
    if time_out_log:
        sub_time = time.time() - start_time
        if sub_time > 0.01:
            # Record slow enqueues (>10ms). The level name must occupy the
            # *method* slot with the message as the first format arg.
            # (Previously the message was passed as *method*, so run_sync's
            # getattr(logger, cmd) raised and the warning was silently lost.)
            __add_log(logger_debug, False, "debug", f"保存到日志队列用时:{sub_time}s")
| | | __common_log = AsyncLogManager() |
| | | |
| | | |
def debug(logger, *args):
    # Queue a "debug" record on log_queue (timing the enqueue) and mirror it
    # into __common_log.
    # NOTE(review): each record is dispatched twice (log_queue + __common_log);
    # this looks like migration residue — confirm which path should remain.
    __add_log(logger, True, "debug", *args)
    __common_log.debug(logger, *args)
| | | |
| | | |
def info(logger, *args):
    # Queue an "info" record on log_queue and mirror it into __common_log.
    # NOTE(review): double dispatch (log_queue + __common_log) — likely
    # migration residue; confirm the intended single path.
    __add_log(logger, True, "info", *args)
    __common_log.info(logger, *args)
| | | |
| | | |
def warning(logger, *args):
    # Queue a "warning" record on log_queue and mirror it into __common_log.
    # NOTE(review): double dispatch (log_queue + __common_log) — likely
    # migration residue; confirm the intended single path.
    __add_log(logger, True, "warning", *args)
    __common_log.warning(logger, *args)
| | | |
| | | |
def error(logger, *args):
    # Queue an "error" record on log_queue and mirror it into __common_log.
    # NOTE(review): double dispatch (log_queue + __common_log) — likely
    # migration residue; confirm the intended single path.
    __add_log(logger, True, "error", *args)
    __common_log.error(logger, *args)
| | | |
| | | |
def exception(logger, *args):
    # Queue an "exception" record on log_queue and mirror it into __common_log.
    # NOTE(review): double dispatch (log_queue + __common_log) — likely
    # migration residue; confirm the intended single path.
    __add_log(logger, True, "exception", *args)
    __common_log.exception(logger, *args)
| | | |
| | | |
# Run the synchronous log drain loop (blocks forever).
def run_sync():
    """Drain log_queue forever, prefixing each message with its enqueue time."""
    logger_system.info(f"async_log 线程ID:{tool.get_thread_id()}")
    while True:
        try:
            val = log_queue.get()  # blocks until a record is available
            time_s = val[1]        # enqueue timestamp recorded by __add_log
            cmd = val[2]           # logger method name, e.g. "debug"
            method = getattr(val[0], cmd)
            d = list(val[3])
            # Prefix the first format argument with "[time.micros] " built
            # from the enqueue timestamp.
            d[0] = f"[{tool.to_time_str(int(time_s))}.{str(time_s).split('.')[1][:6]}] " + d[0]
            d = tuple(d)
            method(*d)
        except:
            # Best-effort: a malformed record must not kill the drain thread.
            # NOTE(review): bare except also swallows SystemExit/KeyboardInterrupt.
            pass
    # NOTE(review): unreachable — the loop above never exits. Likely
    # migration residue; confirm whether this call should replace the loop.
    __common_log.run_sync()
| | | |
| | | |
| | | if __name__ == "__main__": |
| | |
| | | huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.save_subscript_codes(datas) |
| | | # 上传数据 |
| | | codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes() |
| | | l2_log.codeLogQueueDistributeManager.set_l2_subscript_codes(codes) |
| | | |
| | | fresults = [] |
| | | if codes: |
| | | for code in codes: |
| | |
| | | use_time = int((time.time() - timestamp) * 1000) |
| | | thread_id = random.randint(0, 100000) |
| | | l2_log.threadIds[code] = thread_id |
| | | async_log_util.info(hx_logger_l2_orderdetail, |
| | | f"{code}#耗时:{use_time}-{thread_id}#{_datas}") |
| | | l2_log.info(code, hx_logger_l2_orderdetail, |
| | | f"{code}#耗时:{use_time}-{thread_id}#数量:{len(_datas)}#{_datas[-1]}") |
| | | |
| | | # l2_data_log.l2_time_log(code, "开始处理L2逐笔委托") |
| | | try: |
| | |
| | | trade_rules_count = len(TradeRuleManager().list_can_excut_rules_cache()) |
| | | |
| | | fdata = {"code": code, "total": 0, "available": 0, "sell_orders": [], "sell_rules_count": trade_rules_count, |
| | | "cost_price": 0, "cost_price_rate":0, |
| | | "cost_price": 0, "cost_price_rate": 0, |
| | | "code_info": (code, code_name), "desc": "".join(desc_list)} |
| | | if positions: |
| | | for d in positions: |
| | |
| | | |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, queue_strategy_w_trade_r_for_read, |
| | | order_queues, transaction_queues, |
| | | market_queue, queue_l1_trade_w_strategy_r,order_ipc_hosts): |
| | | market_queue, queue_l1_trade_w_strategy_r, order_ipc_hosts): |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | # 执行一些初始化数据 |
| | |
| | | t1 = threading.Thread(target=lambda: async_log_util.run_sync(), daemon=True) |
| | | t1.start() |
| | | |
| | | # 同步L2的异步日志 |
| | | l2_log.codeLogQueueDistributeManager.run_async() |
| | | |
| | | t1 = threading.Thread(target=lambda: async_log_util.l2_data_log.run_sync(), daemon=True) |
| | | t1.start() |
| | | |