New file |
| | |
| | | """ |
| | | 代码队列管理 |
| | | """ |
| | | |
| | | |
| | | class CodeQueueDistributeManager: |
| | | # queue_list |
| | | def __init__(self, queue_list: list): |
| | | flist = [] |
| | | for i in range(0, len(queue_list)): |
| | | flist.append((i, queue_list[i])) |
| | | self.queue_list = flist |
| | | self.distibuted_code_queue_dict = {} |
| | | |
| | | # 获取可用的队列 |
| | | def get_available_queue(self): |
| | | distibuted_queue_indexes = set() |
| | | for code in self.distibuted_code_queue_dict: |
| | | distibuted_queue_indexes.add(self.distibuted_code_queue_dict[code][0]) |
| | | for q_info in self.queue_list: |
| | | if q_info[0] not in distibuted_queue_indexes: |
| | | return q_info |
| | | return None |
| | | |
| | | # 为代码分配队列 |
| | | def distribute_queue(self, code): |
| | | if code in self.distibuted_code_queue_dict: |
| | | return self.distibuted_code_queue_dict.get(code) |
| | | q_info = self.get_available_queue() |
| | | if not q_info: |
| | | raise Exception("无可用的队列") |
| | | self.distibuted_code_queue_dict[code] = q_info |
| | | return q_info |
| | | |
| | | # 获取代码分配的队列 |
| | | def get_distributed_queue(self, code): |
| | | return self.distibuted_code_queue_dict.get(code) |
| | | |
| | | def release_distribute_queue(self, code): |
| | | if code in self.distibuted_code_queue_dict: |
| | | self.distibuted_code_queue_dict.pop(code) |
| | | |
| | | # 获取空闲的位置数量 |
| | | def get_free_queue_count(self): |
| | | return len(self.queue_list) - len(self.distibuted_code_queue_dict.keys()) |
| | |
| | | |
| | | class L2ActionCallback(object): |
| | | # 监听L2数据 |
| | | def OnSetL2Position(self, client_id, request_id, codes_data): |
| | | def OnSetL2Position(self, codes_data): |
| | | pass |
| | | |
| | | |
| | |
| | | |
| | | # L2指令管理 |
| | | class L2CommandManager: |
| | | action_callback = None |
| | | |
| | | @classmethod |
| | | def init(cls, l2_action_callback): |
| | |
| | | @classmethod |
| | | def process_command(cls, _type, client_id, result_json): |
| | | data = result_json["data"] |
| | | request_id = result_json["request_id"] |
| | | ctype = data["type"] |
| | | if not socket_util.is_client_params_sign_right(result_json): |
| | | # 签名出错 |
| | | SendResponseSkManager.send_error_response(_type, request_id, client_id, |
| | | {"code": -1, "msg": "签名错误"}) |
| | | return |
| | | codes_data = data["data"] |
| | | ctype = result_json["type"] |
| | | if ctype == CLIENT_TYPE_CMD_L2: |
| | | cls.action_callback.OnSetL2Position(client_id, request_id, codes_data) |
| | | cls.action_callback.OnSetL2Position(data) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import os |
| | | import threading |
| | | import time |
| | |
| | | __latest_subscript_codes = set() |
| | | |
| | | |
| | | def __upload_codes_info(pipe_l2, datas): |
| | | def __upload_codes_info(queue_l1_w_strategy_r: multiprocessing.Queue, datas): |
| | | if not tool.is_trade_time(): |
| | | return |
| | | # 上传数据 |
| | |
| | | request_id = f"sb_{int(time.time() * 1000)}" |
| | | fdata = json.dumps( |
| | | {"type": type_, "data": {"data": datas}, "request_id": request_id, "time": round(time.time() * 1000, 0)}) |
| | | if pipe_l2 is not None: |
| | | pipe_l2.send(fdata) |
| | | if queue_l1_w_strategy_r is not None: |
| | | queue_l1_w_strategy_r.put_nowait(fdata) |
| | | # 记录新增加的代码 |
| | | codes = set([x[0] for x in datas]) |
| | | add_codes = codes - __latest_subscript_codes |
| | |
| | | pass |
| | | |
| | | |
| | | def run(pipe_l2): |
| | | def run(queue_l1_w_strategy_r): |
| | | logger_local_huaxin_l1.info("运行l1订阅服务") |
| | | codes_sh = [] |
| | | codes_sz = [] |
| | |
| | | # 测试链路 |
| | | # level1_data_dict["000969"] = ( |
| | | # "000969", 9.46, 9.11, 771000*100, time.time()) |
| | | # level1_data_dict["000961"] = ( |
| | | # "000961",1.93, 10.29, 2638000 * 100, time.time()) |
| | | level1_data_dict["002292"] = ( |
| | | "002292", 8.06, 9.96, 969500 * 100, time.time()) |
| | | |
| | | # 等待程序结束 |
| | | while True: |
| | |
| | | codes = [x[0] for x in datas] |
| | | print("代码数量:", len(datas)) |
| | | logger_l2_codes_subscript.info("开始#华鑫L1上传代码:数量-{}", len(datas)) |
| | | __upload_codes_info(pipe_l2, datas) |
| | | __upload_codes_info(queue_l1_w_strategy_r, datas) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | |
| | | import queue |
| | | import threading |
| | | import time |
| | | from typing import List |
| | | |
| | | from huaxin_client import command_manager, l2_data_transform_protocol |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from huaxin_client.command_manager import L2ActionCallback |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from log_module import log, async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ |
| | | logger_local_huaxin_g_cancel, logger_l2_codes_subscript |
| | |
| | | SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", |
| | | b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] |
| | | SZ_Bond_Securities = [b"100303", b"109559", b"112617"] |
| | | spi = None |
| | | set_codes_data_queue = queue.Queue() |
| | | market_code_dict = {} |
| | | |
| | |
| | | |
| | | # 买入的大单订单号 |
| | | |
| | | def __init__(self, api): |
| | | def __init__(self, api, l2_data_upload_manager): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | | self.l2_data_upload_manager = l2_data_upload_manager |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | | for c in codes: |
| | | l2_data_manager.add_target_code(c) |
| | | try: |
| | | for c in del_codes: |
| | | self.l2_data_upload_manager.release_distributed_upload_queue(c) |
| | | l2_data_manager.del_target_code(c) |
| | | for c in add_codes: |
| | | l2_data_manager.run_upload_task(c, l2_data_callback) |
| | | for c in codes: |
| | | self.l2_data_upload_manager.distribute_upload_queue(c) |
| | | l2_data_manager.add_target_code(c) |
| | | except Exception as e: |
| | | logger_system.error(f"L2代码分配上传队列出错:{str(e)}") |
| | | logger_system.exception(e) |
| | | self.__subscribe(add_codes) |
| | | self.__unsubscribe(del_codes) |
| | | |
| | |
| | | (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) |
| | | ]} |
| | | market_code_dict[pDepthMarketData['SecurityID']] = time.time() |
| | | |
| | | l2_data_manager.add_market_data(d) |
| | | self.l2_data_upload_manager.add_market_data(d) |
| | | except: |
| | | pass |
| | | |
| | |
| | | min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # 输出逐笔成交数据 |
| | | if pTransaction['ExecType'] == b"2": |
| | | # G撤数据暂时注释 |
| | | # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code) |
| | | # if transaction_big_order_nos and pTransaction['BuyNo'] in transaction_big_order_nos: |
| | | # # 正在成交的订单撤单了 |
| | | # l2_data_manager.trading_order_canceled(code, pTransaction['BuyNo']) |
| | | # async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pTransaction['BuyNo']}") |
| | | if min_volume is None: |
| | | # 默认筛选50w |
| | | if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: |
| | |
| | | item["Side"] = "2" |
| | | # 深证撤单 |
| | | print("逐笔委托", item) |
| | | |
| | | l2_data_manager.add_l2_order_detail(item, 0, True) |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, 0, True) |
| | | else: |
| | | if abs(pTransaction['TradePrice'] - limit_up_price) < 0.201: |
| | | # 涨停价 |
| | |
| | | # return |
| | | # self.__last_transaction_keys_dict[code] = key |
| | | # print("逐笔成交", item) |
| | | l2_data_manager.add_transaction_detail(item) |
| | | self.l2_data_upload_manager.add_transaction_detail(item) |
| | | |
| | | def OnRtnOrderDetail(self, pOrderDetail): |
| | | can_listen = False |
| | |
| | | start_time = 0 |
| | | if code in self.special_code_volume_for_order_dict: |
| | | start_time = time.time() |
| | | if self.special_code_volume_for_order_dict[code][0] == pOrderDetail['Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: |
| | | if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ |
| | | 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: |
| | | # 监控目标订单与影子订单 |
| | | if self.special_code_volume_for_order_dict[code][1] > time.time(): |
| | | # 特殊量监听 |
| | |
| | | else: |
| | | self.special_code_volume_for_order_dict.pop(code) |
| | | if not can_listen: |
| | | # 暂时注释掉G撤相关数据产生 |
| | | # if pOrderDetail['OrderStatus'] == b'D': |
| | | # transaction_big_order_nos = l2_data_manager.get_latest_transaction_order_nos(code) |
| | | # if transaction_big_order_nos and pOrderDetail['OrderNO'] in transaction_big_order_nos: |
| | | # # 正在成交的订单撤单了 |
| | | # l2_data_manager.trading_order_canceled(code, pOrderDetail['OrderNO']) |
| | | # async_log_util.info(logger_local_huaxin_g_cancel, f"G撤撤单:{code} - {pOrderDetail['OrderNO']}") |
| | | min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | if min_volume is None: |
| | | # 默认筛选50w |
| | |
| | | "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], |
| | | "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], |
| | | "OrderStatus": pOrderDetail['OrderStatus'].decode()} |
| | | l2_data_manager.add_l2_order_detail(item, start_time) |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, start_time) |
| | | |
| | | def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, |
| | | FirstLevelSellOrderVolumes): |
| | |
| | | |
| | | class MyL2ActionCallback(L2ActionCallback): |
| | | |
| | | def OnSetL2Position(self, client_id, request_id, codes_data): |
| | | def OnSetL2Position(self, codes_data): |
| | | print("L2订阅数量:", len(codes_data)) |
| | | logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data)) |
| | | try: |
| | |
| | | logging.exception(e) |
| | | |
| | | |
| | | def __init_l2(): |
| | | def __init_l2(l2_data_upload_manager): |
| | | print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) |
| | | # case 1: Tcp方式 |
| | | # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP |
| | |
| | | # case 2非缓存模式 |
| | | api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api) |
| | | spi = Lev2MdSpi(api, l2_data_upload_manager) |
| | | api.RegisterSpi(spi) |
| | | # -------------------正式模式------------------------------------- |
| | | if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: |
| | |
| | | api.Init() |
| | | |
| | | |
| | | def __receive_from_pipe_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | value = queue_trade_w_l2_r.get() |
| | | if value: |
| | | if type(value) == bytes: |
| | | value = value.decode("utf-8") |
| | | data = json.loads(value) |
| | | if data["type"] == "listen_volume": |
| | | _type = data["type"] |
| | | if _type == "listen_volume": |
| | | volume = data["data"]["volume"] |
| | | code = data["data"]["code"] |
| | | spi.set_code_special_watch_volume(code, volume) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | def __receive_from_pipe_strategy(pipe_): |
| | | logger_system.info(f"l2_client __receive_from_pipe_strategy 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | # print("__receive_from_pipe_strategy") |
| | | try: |
| | | val = pipe_.recv() |
| | | if val: |
| | | print("L2客户端接受到数据") |
| | | data = json.loads(val) |
| | | if data["data"]["type"] == "l2_cmd": |
| | | elif _type == "l2_cmd": |
| | | l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def run(queue_trade_w_l2_r: multiprocessing.Queue, _pipe_strategy, |
| | | _l2_data_callback: l2_data_transform_protocol.L2DataCallBack) -> None: |
| | | def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | log.close_print() |
| | | if queue_trade_w_l2_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_pipe_trade(queue_trade_w_l2_r), daemon=True) |
| | | if queue_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) |
| | | t1.start() |
| | | if _pipe_strategy is not None: |
| | | global pipe_strategy |
| | | pipe_strategy = _pipe_strategy |
| | | t1 = threading.Thread(target=__receive_from_pipe_strategy, args=(_pipe_strategy,), daemon=True) |
| | | t1.start() |
| | | __init_l2() |
| | | global l2_data_callback |
| | | l2_data_callback = _l2_data_callback |
| | | l2_data_manager.run_upload_common(l2_data_callback) |
| | | l2_data_manager.run_upload_trading_canceled(l2_data_callback) |
| | | |
| | | # 初始化 |
| | | order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) |
| | | transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) |
| | | l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, |
| | | transaction_queue_distribute_manager, market_queue) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_log() |
| | | l2_data_manager.run_upload_daemon(l2_data_callback) |
| | | # l2_data_manager.run_test(l2_data_callback) |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | run(None, None, None) |
| | | # run(None, None, None) |
| | | # spi.set_codes_data([("000333", 12000)]) |
| | | input() |
| | |
| | | # -*- coding: utf-8 -*- |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import queue |
| | | import random |
| | | import threading |
| | | import time |
| | | from huaxin_client import socket_util, l2_data_transform_protocol |
| | | from huaxin_client import socket_util |
| | | |
| | | from huaxin_client.client_network import SendResponseSkManager |
| | | |
| | | # 活动时间 |
| | | from huaxin_client.l2_data_transform_protocol import L2DataCallBack |
| | | from log_module import log_export, async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_local_huaxin_l2_upload, logger_local_huaxin_l2_buy_no, \ |
| | | logger_local_huaxin_g_cancel, hx_logger_contact_debug, logger_system, logger_local_huaxin_l2_orderdetail |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_error, logger_system |
| | | from utils import tool |
| | | |
| | | order_detail_upload_active_time_dict = {} |
| | |
| | | target_codes = set() |
| | | target_codes_add_time = {} |
| | | common_queue = queue.Queue() |
| | | trading_canceled_queue = queue.Queue() |
| | | log_buy_no_queue = queue.Queue() |
| | | # 买入订单号的字典 |
| | | buy_order_nos_dict = {} |
| | | # 最近的大单成交单号 |
| | | latest_big_order_transaction_orders_dict = {} |
| | | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | def __init__(self, order_queue_distribute_manager: CodeQueueDistributeManager, |
| | | transaction_queue_distribute_manager: CodeQueueDistributeManager, |
| | | market_data_queue: multiprocessing.Queue): |
| | | self.order_queue_distribute_manager = order_queue_distribute_manager |
| | | self.transaction_queue_distribute_manager = transaction_queue_distribute_manager |
| | | self.market_data_queue = market_data_queue |
| | | |
| | | # 添加委托详情 |
| | | def add_l2_order_detail(self, data, start_time, istransaction=False): |
| | | code = data["SecurityID"] |
| | | queue_info = self.order_queue_distribute_manager.get_distributed_queue(code) |
| | | if not queue_info: |
| | | return |
| | | queue_info[1].put_nowait( |
| | | (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | | |
| | | # 添加逐笔成交 |
| | | def add_transaction_detail(self, data): |
| | | code = data["SecurityID"] |
| | | queue_info = self.transaction_queue_distribute_manager.get_distributed_queue(code) |
| | | if not queue_info: |
| | | return |
| | | # 判断是否为大单成交 |
| | | queue_info[1].put_nowait((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | | data['SellNo'], data['ExecType'])) |
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | | self.market_data_queue.put_nowait(data) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code): |
| | | self.order_queue_distribute_manager.distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.distribute_queue(code) |
| | | |
| | | # 释放已经分配的队列 |
| | | def release_distributed_upload_queue(self, code): |
| | | self.order_queue_distribute_manager.release_distribute_queue(code) |
| | | self.transaction_queue_distribute_manager.release_distribute_queue(code) |
| | | |
| | | |
| | | def add_target_code(code): |
| | |
| | | target_codes.discard(code) |
| | | if code in target_codes_add_time: |
| | | target_codes_add_time.pop(code) |
| | | |
| | | |
| | | # 获取最近的大单成交订单号 |
| | | def get_latest_transaction_order_nos(code): |
| | | return latest_big_order_transaction_orders_dict.get(code) |
| | | |
| | | |
| | | # 正在成交的订单撤单了 |
| | | def trading_order_canceled(code_, order_no): |
| | | trading_canceled_queue.put((code_, order_no)) |
| | | |
| | | |
| | | # 添加委托详情 |
| | | def add_l2_order_detail(data, start_time, istransaction=False): |
| | | code = data["SecurityID"] |
| | | # 异步日志记录 |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | # 原来的格式 |
| | | # {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], |
| | | # "Volume": pOrderDetail['Volume'], |
| | | # "Side": pOrderDetail['Side'].decode(), "OrderType": pOrderDetail['OrderType'].decode(), |
| | | # "OrderTime": pOrderDetail['OrderTime'], "MainSeq": pOrderDetail['MainSeq'], |
| | | # "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], |
| | | # "OrderStatus": pOrderDetail['OrderStatus'].decode()} |
| | | |
| | | # 用于G撤的数据,暂时注释 |
| | | # if data['Side'] == "1": |
| | | # # 记录所有买入的订单号 |
| | | # if data['SecurityID'] not in buy_order_nos_dict: |
| | | # buy_order_nos_dict[data['SecurityID']] = set() |
| | | # buy_order_nos_dict[data['SecurityID']].add(data['OrderNO']) |
| | | # # 买入订单号需要记录日志 |
| | | # async_log_util.huaxin_l2_log.info(logger_local_huaxin_l2_buy_no, f"{data['SecurityID']}#{data['OrderNO']}") |
| | | |
| | | tmep_order_detail_queue_dict[code].put( |
| | | (data['SecurityID'], data['Price'], data['Volume'], data['Side'], data['OrderType'], data['OrderTime'], |
| | | data['MainSeq'], data['SubSeq'], data['OrderNO'], data['OrderStatus'], time.time(), start_time)) |
| | | |
| | | |
| | | # 添加逐笔成交 |
| | | def add_transaction_detail(data): |
| | | code = data["SecurityID"] |
| | | if code not in tmep_transaction_queue_dict: |
| | | tmep_transaction_queue_dict[code] = queue.Queue() |
| | | # 原来的格式 |
| | | # item = {"SecurityID": pTransaction['SecurityID'], "TradePrice": pTransaction['TradePrice'], |
| | | # "TradeVolume": pTransaction['TradeVolume'], |
| | | # "OrderTime": pTransaction['TradeTime'], "MainSeq": pTransaction['MainSeq'], |
| | | # "SubSeq": pTransaction['SubSeq'], "BuyNo": pTransaction['BuyNo'], "SellNo": pTransaction['SellNo'], |
| | | # "ExecType": pTransaction['ExecType'].decode()} |
| | | |
| | | # 判断是否为大单成交 |
| | | code = data['SecurityID'] |
| | | # G撤相关数据操作暂时注释 |
| | | # if code in buy_order_nos_dict: |
| | | # if data['BuyNo'] in buy_order_nos_dict[code]: |
| | | # try: |
| | | # temp_list = latest_big_order_transaction_orders_dict.get(code) |
| | | # if not temp_list: |
| | | # temp_list = [] |
| | | # if temp_list: |
| | | # if temp_list[-1] != data['BuyNo']: |
| | | # # 不加入重复订单号 |
| | | # temp_list.append(data['BuyNo']) |
| | | # if len(temp_list) > 10: |
| | | # # 最多加10个订单号 |
| | | # temp_list = temp_list[-10:] |
| | | # else: |
| | | # temp_list.append(data['BuyNo']) |
| | | # latest_big_order_transaction_orders_dict[code] = temp_list |
| | | # except: |
| | | # pass |
| | | tmep_transaction_queue_dict[code].put((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | | data['SellNo'], data['ExecType'])) |
| | | |
| | | |
| | | def add_market_data(data): |
| | | code = data['securityID'] |
| | | # 加入上传队列 |
| | | common_queue.put((code, "l2_market_data", data)) |
| | | |
| | | |
| | | def add_subscript_codes(codes): |
| | | print("add_subscript_codes", codes) |
| | |
| | | |
| | | # 上传数据 |
| | | def upload_data(code, _type, datas, new_sk=False): |
| | | uid = random.randint(0, 100000) |
| | | key = f"{_type}_{code}" |
| | | fdata = json.dumps( |
| | | {"type": _type, "data": {"code": code, "data": datas, "time": round(time.time() * 1000)}}) |
| | |
| | | logging.exception(e) |
| | | finally: |
| | | pass |
| | | # print("请求结束", uid, result) |
| | | # logger_local_huaxin_l2_upload.info( |
| | | # f"{code} 上传数据耗时-{_type}: {round((time.time() - start_time) * 1000, 1)} 数据量:{len(datas)}") |
| | | # print("上传结果", result) |
| | | |
| | | |
| | | # 循环读取上传数据 |
| | | def __run_upload_order(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | if True: |
| | | while True: |
| | | # print("order task") |
| | | try: |
| | | if code not in target_codes: |
| | | break |
| | | order_detail_upload_active_time_dict[code] = time.time() |
| | | udatas = [] |
| | | while not tmep_order_detail_queue_dict[code].empty(): |
| | | temp = tmep_order_detail_queue_dict[code].get() |
| | | udatas.append(temp) |
| | | if udatas: |
| | | # start_time = time.time() |
| | | # upload_data(code, "l2_order", udatas) |
| | | l2_data_callback.OnL2Order(code, udatas, int(time.time() * 1000)) |
| | | # l2_data_transaction_protocol.send_l2_order_detail(pipe, _mmap, code, udatas) |
| | | # use_time = int((time.time() - start_time) * 1000) |
| | | # if use_time > 10: |
| | | # async_log_util.info(logger_local_huaxin_l2_upload, f"{code}-上传代码耗时:{use_time}ms") |
| | | else: |
| | | # 没有数据的时候需等待,有数据时不需等待 |
| | | time.sleep(0.001) |
| | | except Exception as e: |
| | | hx_logger_contact_debug.exception(e) |
| | | logger_local_huaxin_l2_error.error(f"上传订单数据出错:{str(e)}") |
| | | pass |
| | | |
| | | |
| | | def __run_upload_trans(code, l2_data_callback: L2DataCallBack): |
| | | if code not in tmep_transaction_queue_dict: |
| | | tmep_transaction_queue_dict[code] = queue.Queue() |
| | | while True: |
| | | # print("trans task") |
| | | try: |
| | | if code not in target_codes: |
| | | break |
| | | transaction_upload_active_time_dict[code] = time.time() |
| | | udatas = [] |
| | | while not tmep_transaction_queue_dict[code].empty(): |
| | | temp = tmep_transaction_queue_dict[code].get() |
| | | udatas.append(temp) |
| | | if udatas: |
| | | # upload_data(code, "l2_trans", udatas) |
| | | l2_data_callback.OnL2Transaction(code, udatas) |
| | | time.sleep(0.01) |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.error(f"上传成交数据出错:{str(e)}") |
| | | |
| | | |
| | | def __run_upload_common(l2_data_callback: L2DataCallBack): |
| | | def __run_upload_common(): |
| | | print("__run_upload_common") |
| | | logger_system.info(f"l2_client __run_upload_common 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | while not common_queue.empty(): |
| | | temp = common_queue.get() |
| | | if temp[1] == "l2_market_data": |
| | | l2_data_callback.OnMarketData(temp[0], temp[2]) |
| | | else: |
| | | upload_data(temp[0], temp[1], temp[2]) |
| | | |
| | | except Exception as e: |
| | |
| | | time.sleep(0.01) |
| | | |
| | | |
| | | def __run_upload_trading_canceled(l2_data_callback: L2DataCallBack): |
| | | print("__run_upload_trading_canceled") |
| | | logger_system.info(f"l2_client __run_upload_trading_canceled 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | temp = trading_canceled_queue.get() |
| | | if temp: |
| | | logger_local_huaxin_g_cancel.info(f"准备上报:{temp}") |
| | | # upload_data(temp[0], "trading_order_canceled", temp[1], new_sk=True) |
| | | l2_data_callback.OnTradingOrderCancel(temp[0], temp[1]) |
| | | logger_local_huaxin_g_cancel.info(f"上报成功:{temp}") |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_error.exception(e) |
| | | logger_local_huaxin_l2_error.error(f"上传普通数据出错:{str(e)}") |
| | | |
| | | |
| | | def __run_log(): |
| | | print("__run_log") |
| | | logger_system.info(f"l2_client __run_log 线程ID:{tool.get_thread_id()}") |
| | | async_log_util.huaxin_l2_log.run_sync() |
| | | |
| | | |
| | | __upload_order_threads = {} |
| | | __upload_trans_threads = {} |
| | | |
| | | |
| | | # 运行上传任务 |
| | | def run_upload_task(code: str, l2_data_callback: L2DataCallBack) -> None: |
| | | try: |
| | | # 如果代码没有在目标代码中就不需要运行 |
| | | if code not in target_codes: |
| | | return |
| | | # 如果最近的活动时间小于2s就不需要运行 |
| | | if code not in order_detail_upload_active_time_dict or time.time() - order_detail_upload_active_time_dict[ |
| | | code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, l2_data_callback), daemon=True) |
| | | t.start() |
| | | __upload_order_threads[code] = t |
| | | |
| | | if code not in transaction_upload_active_time_dict or time.time() - transaction_upload_active_time_dict[ |
| | | code] > 2: |
| | | t = threading.Thread(target=lambda: __run_upload_trans(code, l2_data_callback), daemon=True) |
| | | t.start() |
| | | __upload_trans_threads[code] = t |
| | | finally: |
| | | pass |
| | | |
| | | |
| | | def run_upload_common(l2_data_callback: L2DataCallBack): |
| | | t = threading.Thread(target=lambda: __run_upload_common(l2_data_callback), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def run_upload_trading_canceled(l2_data_callback: L2DataCallBack): |
| | | t = threading.Thread(target=lambda: __run_upload_trading_canceled(l2_data_callback), daemon=True) |
| | | # 采用socket传输数据 |
| | | def run_upload_common(): |
| | | t = threading.Thread(target=lambda: __run_upload_common(), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def run_log(): |
| | | # G撤相关数据,暂时注释 |
| | | # fdatas = log_export.load_huaxin_local_buy_no() |
| | | # global buy_order_nos_dict |
| | | # buy_order_nos_dict = fdatas |
| | | t = threading.Thread(target=lambda: __run_log(), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | # 运行守护线程 |
| | | def run_upload_daemon(_l2_data_callback): |
| | | def upload_daemon(): |
| | | logger_system.info(f"l2_client upload_daemon 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | | try: |
| | | for code in target_codes_add_time: |
| | | # 目标代码加入2s之后启动守护 |
| | | if time.time() - target_codes_add_time[code] > 2: |
| | | if code not in __upload_order_threads or not __upload_order_threads[code].is_alive(): |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), |
| | | daemon=True) |
| | | t.start() |
| | | __upload_order_threads[code] = t |
| | | logger_local_huaxin_l2_upload.info(f"重新创建L2订单上传线程:{code}") |
| | | if code not in __upload_trans_threads or not __upload_trans_threads[code].is_alive(): |
| | | t = threading.Thread(target=lambda: __run_upload_trans(code, _l2_data_callback), |
| | | daemon=True) |
| | | t.start() |
| | | __upload_trans_threads[code] = t |
| | | logger_local_huaxin_l2_upload.info(f"重新创建L2成交上传线程:{code}") |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(3) |
| | | |
| | | t = threading.Thread(target=lambda: upload_daemon(), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | | def __test(_l2_data_callback): |
| | | def __test(): |
| | | code = "002073" |
| | | if code not in tmep_order_detail_queue_dict: |
| | | tmep_order_detail_queue_dict[code] = queue.Queue() |
| | | target_codes.add(code) |
| | | t = threading.Thread(target=lambda: __run_upload_order(code, _l2_data_callback), daemon=True) |
| | | t.start() |
| | | while True: |
| | | try: |
| | | tmep_order_detail_queue_dict[code].put_nowait( |
| | | ['002073', 0.0, 88100, '1', '2', 103831240, 2011, 18190761, 18069131, 'D', 1693276711224]) |
| | | time.sleep(5) |
| | | except: |
| | | pass |
| | | |
| | | |
| | | def run_test(_l2_data_callback): |
| | | t = threading.Thread(target=lambda: __test(_l2_data_callback), daemon=True) |
| | | def run_test(): |
| | | t = threading.Thread(target=lambda: __test(), daemon=True) |
| | | t.start() |
| | | |
| | | |
| | |
| | | if ret != 0: |
| | | raise Exception('ReqOrderInsert fail, ret[%d]' % ret) |
| | | |
| | | if queue_trade_w_l2_r is not None: |
| | | queue_trade_w_l2_r.put_nowait( |
| | | if queue_other_w_l2_r is not None: |
| | | queue_other_w_l2_r.put_nowait( |
| | | json.dumps({"type": "listen_volume", "data": {"code": code, |
| | | "volume": count}}).encode( |
| | | 'utf-8')) |
| | |
| | | addr, port = constant.SERVER_IP, constant.SERVER_PORT |
| | | |
| | | |
| | | def run(trade_response_: TradeResponse = None, queue_trade_w_l2_r_: multiprocessing.Queue = None, |
| | | def run(trade_response_: TradeResponse = None, queue_other_w_l2_r_: multiprocessing.Queue = None, |
| | | queue_strategy_trade_write_=None, |
| | | queue_strategy_trade_read=None): |
| | | try: |
| | | logger_system.info("交易进程ID:{}", os.getpid()) |
| | | logger_system.info(f"trade 线程ID:{tool.get_thread_id()}") |
| | | __init_trade_data_server() |
| | | global queue_trade_w_l2_r |
| | | queue_trade_w_l2_r = queue_trade_w_l2_r_ |
| | | global queue_other_w_l2_r |
| | | queue_other_w_l2_r = queue_other_w_l2_r_ |
| | | |
| | | global queue_strategy_trade_write |
| | | queue_strategy_trade_write = queue_strategy_trade_write_ |
New file |
| | |
| | | """ |
| | | L2数据监听 |
| | | """ |
| | | import multiprocessing |
| | | import threading |
| | | import time |
| | | |
| | | from log_module import async_log_util |
| | | from log_module.log import logger_debug |
| | | |
| | | __l2_order_active_time_dict = {} |
| | | __l2_transaction_active_time_dict = {} |
| | | |
| | | |
| | | class L2DataListenManager: |
| | | TYPE_ORDER = "order" |
| | | TYPE_TRANSACTION = "transaction" |
| | | TYPE_MARKET = "market" |
| | | |
| | | def __init__(self, l2_data_callback): |
| | | self.my_l2_data_callback = l2_data_callback |
| | | self.__l2_order_active_time_dict = {} |
| | | self.__l2_transaction_active_time_dict = {} |
| | | self.__l2_market_active_time_dict = {} |
| | | |
| | | # 接收L2逐笔委托数据 |
| | | def __recive_l2_orders(self, q: multiprocessing.Queue): |
| | | __id = id(q) |
| | | count = 0 |
| | | while True: |
| | | datas_dict = {} |
| | | try: |
| | | while not q.empty(): |
| | | item = q.get() |
| | | if item[0] not in datas_dict: |
| | | datas_dict[item[0]] = [] |
| | | datas_dict[item[0]].append(item) |
| | | if datas_dict: |
| | | for c in datas_dict: |
| | | self.my_l2_data_callback.OnL2Order(c, datas_dict[c], datas_dict[c][0][10]) |
| | | else: |
| | | time.sleep(0.002) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_debug, e) |
| | | finally: |
| | | datas_dict.clear() |
| | | count += 1 |
| | | if count > 100: |
| | | count = 0 |
| | | # 记录活跃时间,每100次记录一次 |
| | | self.__l2_order_active_time_dict[__id] = time.time() |
| | | |
| | | # 接收L2逐笔成交数据 |
| | | def __recive_transaction_orders(self, q: multiprocessing.Queue): |
| | | __id = id(q) |
| | | datas_dict = {} |
| | | count = 0 |
| | | while True: |
| | | try: |
| | | while not q.empty(): |
| | | item = q.get() |
| | | if item[0] not in datas_dict: |
| | | datas_dict[item[0]] = [] |
| | | datas_dict[item[0]].append(item) |
| | | if datas_dict: |
| | | for c in datas_dict: |
| | | self.my_l2_data_callback.OnL2Transaction(c, datas_dict[c]) |
| | | else: |
| | | time.sleep(0.01) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_debug, e) |
| | | finally: |
| | | datas_dict.clear() |
| | | count += 1 |
| | | if count > 50: |
| | | count = 0 |
| | | # 记录活跃时间,每100次记录一次 |
| | | self.__l2_transaction_active_time_dict[__id] = time.time() |
| | | |
| | | def __recive_l2_markets(self, q: multiprocessing.Queue): |
| | | __id = id(q) |
| | | while True: |
| | | try: |
| | | if not q.empty(): |
| | | item = q.get() |
| | | self.my_l2_data_callback.OnMarketData(item['securityID'], item) |
| | | else: |
| | | time.sleep(0.002) |
| | | except Exception as e: |
| | | async_log_util.exception(logger_debug, e) |
| | | finally: |
| | | self.__l2_market_active_time_dict[__id] = time.time() |
| | | |
| | | # 接收L2数据 |
| | | def receive_l2_data(self, order_queues, transaction_queues, market_queue): |
| | | for q in order_queues: |
| | | t1 = threading.Thread(target=lambda: self.__recive_l2_orders(q), daemon=True) |
| | | t1.start() |
| | | for q in transaction_queues: |
| | | t2 = threading.Thread(target=lambda: self.__recive_transaction_orders(q), daemon=True) |
| | | t2.start() |
| | | t3 = threading.Thread(target=lambda: self.__recive_l2_markets(market_queue), daemon=True) |
| | | t3.start() |
| | | |
| | | def get_active_count(self, type_): |
| | | expire_time = time.time() - 5 |
| | | active_count = 0 |
| | | if type_ == self.TYPE_ORDER: |
| | | for _id in self.__l2_order_active_time_dict: |
| | | if self.__l2_order_active_time_dict[_id] > expire_time: |
| | | active_count += 1 |
| | | elif type_ == self.TYPE_TRANSACTION: |
| | | for _id in self.__l2_transaction_active_time_dict: |
| | | if self.__l2_transaction_active_time_dict[_id] > expire_time: |
| | | active_count += 1 |
| | | elif type_ == self.TYPE_MARKET: |
| | | for _id in self.__l2_market_active_time_dict: |
| | | if self.__l2_market_active_time_dict[_id] > expire_time: |
| | | active_count += 1 |
| | | return active_count |
| | |
| | | # from huaxin_api import trade_client, l2_client, l1_client |
| | | |
| | | |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w: multiprocessing.Queue, pipe_l1, pipe_l2, queue_trade_w_l2_r: multiprocessing.Queue, psl2_l2, queue_strategy_w_trade_r: multiprocessing.Queue): |
| | | def createTradeServer(pipe_server, queue_strategy_r_trade_w_: multiprocessing.Queue, |
| | | queue_l1_w_strategy_r_: multiprocessing.Queue, |
| | | queue_strategy_w_trade_r_: multiprocessing.Queue, order_queues_, transaction_queues_, market_queue_): |
| | | logger_system.info("策略进程ID:{}", os.getpid()) |
| | | log.close_print() |
| | | # 初始化参数 |
| | |
| | | t1.start() |
| | | # |
| | | # 交易接口服务 |
| | | t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", args=(pipe_server, pipe_l2), |
| | | t1 = threading.Thread(target=huaxin_trade_api_server.run, name="trade_api_server", |
| | | args=(pipe_server, queue_other_w_l2_r), |
| | | daemon=True) |
| | | t1.start() |
| | | # |
| | |
| | | t1 = threading.Thread(target=redis_manager.RedisUtils.run_loop, name="redis", daemon=True) |
| | | t1.start() |
| | | # |
| | | # 启动L2订阅服务 |
| | | t1 = threading.Thread(target=huaxin_client.l2_client.run, name="l2_client", |
| | | args=(queue_trade_w_l2_r, psl2_l2, huaxin_trade_server.my_l2_data_callback), |
| | | daemon=True) |
| | | t1.start() |
| | | # |
| | | # 启动华鑫交易服务 |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r) |
| | | huaxin_trade_server.run(queue_strategy_r_trade_w_, queue_l1_w_strategy_r_, queue_strategy_w_trade_r_, order_queues_, |
| | | transaction_queues_, market_queue_) |
| | | |
| | | |
| | | # 主服务 |
| | |
| | | # 策略与server间的通信 |
| | | pss_server, pss_strategy = multiprocessing.Pipe() |
| | | |
| | | # 交易写L2读 |
| | | queue_trade_w_l2_r = multiprocessing.Queue() |
| | | # 策略与l2之间的通信 |
| | | psl2_strategy, psl2_l2 = multiprocessing.Pipe() |
| | | |
| | | # l1与策略间的通信 |
| | | pl1t_l1, pl1t_strategy = multiprocessing.Pipe() |
| | | # L2读其他写 |
| | | queue_other_w_l2_r = multiprocessing.Queue() |
| | | # |
| | | queue_l1_w_strategy_r = multiprocessing.Queue() |
| | | |
| | | # 交易读策略写 |
| | | queue_strategy_w_trade_r = multiprocessing.Queue() |
| | |
| | | logger_system.info("主进程ID:{}", os.getpid()) |
| | | |
| | | # L1订阅数据 |
| | | l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(pl1t_l1,)) |
| | | l1Process = multiprocessing.Process(target=huaxin_client.l1_client.run, args=(queue_l1_w_strategy_r,)) |
| | | l1Process.start() |
| | | |
| | | # 交易进程 |
| | | tradeProcess = multiprocessing.Process( |
| | | target=lambda: huaxin_client.trade_client.run(None, queue_trade_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r)) |
| | | target=huaxin_client.trade_client.run, |
| | | args=(None, queue_other_w_l2_r, queue_strategy_r_trade_w, queue_strategy_w_trade_r,)) |
| | | tradeProcess.start() |
| | | |
| | | # 创建L2通信队列 |
| | | order_queues = [] |
| | | transaction_queues = [] |
| | | market_queue = multiprocessing.Queue() |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | order_queues.append(multiprocessing.Queue()) |
| | | for i in range(constant.HUAXIN_L2_MAX_CODES_COUNT): |
| | | transaction_queues.append(multiprocessing.Queue()) |
| | | |
| | | # L2 |
| | | l2Process = multiprocessing.Process( |
| | | target=huaxin_client.l2_client.run, |
| | | args=(queue_other_w_l2_r, order_queues, transaction_queues, market_queue)) |
| | | l2Process.start() |
| | | |
| | | # 主进程 |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, pl1t_strategy, psl2_strategy, queue_trade_w_l2_r, psl2_l2, queue_strategy_w_trade_r) |
| | | createTradeServer(pss_strategy, queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, |
| | | order_queues, transaction_queues, market_queue) |
| | | |
| | | # 将tradeServer作为主进程 |
| | | l1Process.join() |
| | | l2Process.join() |
| | | tradeProcess.join() |
| | | except Exception as e: |
| | | logging.exception(e) |
| | |
| | | API_TYPE_SYNC_L1_TARGET_CODES = "sync_l1_subscript_codes" # 同步L1需要订阅的代码 |
| | | API_TYPE_SYSTEM_LOG = "system_log" # 系统日志 |
| | | API_TYPE_GET_FROM_DATA_SERVER = "get_from_data_server" # 从数据服务器拉取数据 |
| | | API_TYPE_CODE_TRADE_INFO = "code_trade_info" |
| | | API_TYPE_CODE_TRADE_INFO = "code_trade_info" # 代码交易信息 |
| | | API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT = "l2_listen_active_count" # L2有效监听数量 |
| | | |
| | | |
| | | class ActionCallback(object): |
| | | # 交易 |
| | |
| | | |
| | | # 代码的交易信息 |
| | | def OnGetCodeTradeInfo(self, client_id, request_id, data): |
| | | pass |
| | | |
| | | def OnGetActiveListenCount(self, client_id, request_id): |
| | | pass |
| | | |
| | | |
| | |
| | | cls.action_callback.OnGetFromDataServer(client_id, request_id, data) |
| | | elif content_type == API_TYPE_CODE_TRADE_INFO: |
| | | cls.action_callback.OnGetCodeTradeInfo(client_id, request_id, data) |
| | | elif content_type == API_TYPE_CODE_L2_LISTEN_ACTIVE_COUNT: |
| | | cls.action_callback.OnGetActiveListenCount(client_id, request_id) |
| | | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | pass |
New file |
| | |
| | | import logging |
| | | import multiprocessing |
| | | import time |
| | | |
| | | |
| | | def run_process1(queue: multiprocessing.Queue): |
| | | while True: |
| | | try: |
| | | queue.put_nowait("process1") |
| | | time.sleep(1) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | def run_process2(queue: multiprocessing.Queue): |
| | | while True: |
| | | try: |
| | | queue.put_nowait("process2") |
| | | time.sleep(1) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | def run_process3(queue: multiprocessing.Queue): |
| | | while True: |
| | | try: |
| | | print(queue.get()) |
| | | time.sleep(0.001) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | q = multiprocessing.Queue() |
| | | p1 = multiprocessing.Process(target=run_process1, args=(q,)) |
| | | p2 = multiprocessing.Process(target=run_process2, args=(q,)) |
| | | p3 = multiprocessing.Process(target=run_process3, args=(q,)) |
| | | p1.start() |
| | | p2.start() |
| | | p3.start() |
| | | |
| | | while True: |
| | | time.sleep(0.1) |
| | |
| | | import hashlib |
| | | import json |
| | | import logging |
| | | import multiprocessing |
| | | import socket |
| | | import socketserver |
| | | import threading |
| | |
| | | super().finish() |
| | | |
| | | |
| | | def __set_target_codes(pipe_l2): |
| | | def __set_target_codes(queue_other_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info("启动读取L2订阅代码队列") |
| | | while True: |
| | | try: |
| | |
| | | codes = [d[0] for d in datas] |
| | | for code in codes: |
| | | block_info.init_code(code) |
| | | root_data = {"data": {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, |
| | | "data": datas}, |
| | | "request_id": f"{ClientSocketManager.CLIENT_TYPE_CMD_L2}_{round(time.time() * 1000)}"} |
| | | root_data = socket_util.encryp_client_params_sign(root_data) |
| | | pipe_l2.send(json.dumps(root_data)) |
| | | root_data = {"type": ClientSocketManager.CLIENT_TYPE_CMD_L2, |
| | | "data": datas} |
| | | queue_other_w_l2_r.put_nowait(json.dumps(root_data)) |
| | | print("设置L2代码结束") |
| | | # 如果在9:24-9:30 需要加载板块 |
| | | if int("092400") < int(tool.get_now_time_str().replace(":", "")) < int("093000"): |
| | |
| | | time.sleep(1) |
| | | |
| | | |
| | | def run(pipe_server, pipe_l2): |
| | | def run(pipe_server, queue_other_w_l2_r): |
| | | logger_system.info("create TradeApiServer") |
| | | logger_system.info(f"trade_api_server 线程ID:{tool.get_thread_id()}") |
| | | # 拉取交易信息 |
| | | huaxin_trade_data_update.run() |
| | | # |
| | | t1 = threading.Thread(target=lambda: __set_target_codes(pipe_l2), daemon=True) |
| | | t1 = threading.Thread(target=lambda: __set_target_codes(queue_other_w_l2_r), daemon=True) |
| | | t1.start() |
| | | |
| | | t1 = threading.Thread(target=lambda: __read_sync_task(pipe_server), daemon=True) |
| | |
| | | GCancelBigNumComputer, SecondCancelBigNumComputer, LCancelRateManager, LatestCancelIndexManager |
| | | from l2.huaxin import huaxin_target_codes_manager |
| | | from l2.huaxin.huaxin_target_codes_manager import HuaXinL1TargetCodesManager |
| | | from l2.l2_data_listen_manager import L2DataListenManager |
| | | from l2.l2_data_util import L2DataUtil |
| | | from l2.l2_sell_manager import L2MarketSellManager |
| | | from l2.l2_transaction_data_manager import HuaXinTransactionDatasProcessor |
| | |
| | | def l2_order(cls, code, _datas, timestamp): |
| | | now_timestamp = int(time.time() * 1000) |
| | | async_log_util.info(hx_logger_l2_orderdetail, |
| | | f"{code}#耗时:{int(time.time() * 1000) - timestamp}-{now_timestamp}#{_datas}") |
| | | f"{code}#耗时:{int((time.time() - timestamp)*1000)}-{now_timestamp}#{_datas}") |
| | | thread_id = random.randint(0, 100000) |
| | | l2_log.threadIds[code] = thread_id |
| | | l2_data_log.l2_time_log(code, "开始处理L2逐笔委托") |
| | |
| | | time.sleep(2) |
| | | |
| | | |
| | | def __recv_pipe_l1(pipe_l1): |
| | | def __recv_pipe_l1(queue_l1_w_strategy_r: multiprocessing.Queue): |
| | | logger_system.info(f"trade_server __recv_pipe_l1 线程ID:{tool.get_thread_id()}") |
| | | if pipe_l1 is not None: |
| | | if queue_l1_w_strategy_r is not None: |
| | | while True: |
| | | try: |
| | | val = pipe_l1.recv() |
| | | val = queue_l1_w_strategy_r.get() |
| | | if val: |
| | | val = json.loads(val) |
| | | print("收到来自L1的数据:", val["type"]) |
| | |
| | | logging.exception(e) |
| | | self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id) |
| | | |
| | | def OnGetActiveListenCount(self, client_id, request_id): |
| | | try: |
| | | order = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_ORDER) |
| | | transaction = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_TRANSACTION) |
| | | market = l2DataListenManager.get_active_count(L2DataListenManager.TYPE_MARKET) |
| | | result = {"code": 0, "data": {"order": order, "transaction": transaction, "market": market}} |
| | | self.send_response(result, client_id, request_id) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | self.send_response(json.dumps({"code": 1, "msg": f"数据处理出错:{e}"}), client_id, request_id) |
| | | |
| | | |
| | | class MyL2DataCallback(l2_data_transform_protocol.L2DataCallBack): |
| | | def OnL2Order(self, code, datas, timestamp): |
| | |
| | | def OnL2Transaction(self, code, datas): |
| | | TradeServerProcessor.l2_transaction(code, datas) |
| | | |
| | | def OnMarketData(self, code, datas): |
| | | TradeServerProcessor.l2_market_data(code, datas) |
| | | def OnMarketData(self, code, data): |
| | | TradeServerProcessor.l2_market_data(code, data) |
| | | |
| | | def OnTradingOrderCancel(self, code, buy_no): |
| | | TradeServerProcessor.trading_order_canceled(code, buy_no) |
| | |
| | | # 回调 |
| | | my_l2_data_callback = MyL2DataCallback() |
| | | my_trade_response = MyTradeResponse() |
| | | l2DataListenManager: L2DataListenManager = None |
| | | |
| | | |
| | | def run(queue_strategy_r_trade_w, pipe_l1, pipe_l2, queue_strategy_w_trade_r): |
| | | def run(queue_strategy_r_trade_w, queue_l1_w_strategy_r, queue_strategy_w_trade_r, order_queues, transaction_queues, |
| | | market_queue): |
| | | logger_system.info(f"trade_server 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | | # 执行一些初始化数据 |
| | |
| | | OutsideApiCommandCallback()) |
| | | manager.run(blocking=False) |
| | | |
| | | # 监听L2数据 |
| | | global l2DataListenManager |
| | | l2DataListenManager = L2DataListenManager(my_l2_data_callback) |
| | | l2DataListenManager.receive_l2_data(order_queues, transaction_queues, market_queue) |
| | | |
| | | # 启动交易服务 |
| | | huaxin_trade_api.run_pipe_trade(queue_strategy_r_trade_w, queue_strategy_w_trade_r) |
| | | |
| | | # 监听l1那边传过来的代码 |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(pipe_l1), daemon=True) |
| | | t1 = threading.Thread(target=lambda: __recv_pipe_l1(queue_l1_w_strategy_r), daemon=True) |
| | | t1.start() |
| | | |
| | | # 同步异步日志 |