| | |
| | | |
| | | # 买入的大单订单号 |
| | | |
| | | def __init__(self, api, l2_data_upload_manager: L2DataUploadManager): |
| | | def __init__(self, api, l2_data_upload_manager: L2DataUploadManager, processor_index): |
| | | lev2mdapi.CTORATstpLev2MdSpi.__init__(self) |
| | | self.__api = api |
| | | self.is_login = False |
| | | self.l2_data_upload_manager = l2_data_upload_manager |
| | | self.codes_volume_and_price_dict = {} |
| | | self.processor_index = processor_index |
| | | |
| | | def __split_codes(self, codes): |
| | | szse_codes = [] |
| | |
| | | |
| | | def __subscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证({self.processor_index}):{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅深证({self.processor_index}):{sz}") |
| | | if sh: |
| | | if ENABLE_NGST: |
| | | result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh({self.processor_index}):{result}") |
| | | else: |
| | | # 订阅逐笔委托 |
| | | result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh({self.processor_index}):{result}") |
| | | # 订阅逐笔成交 |
| | | result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh({self.processor_index}):{result}") |
| | | |
| | | result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh({self.processor_index}):{result}") |
| | | if sz: |
| | | result = self.__api.SubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sz({self.processor_index}):{result}") |
| | | result = self.__api.SubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sz({self.processor_index}):{result}") |
| | | result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}") |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz({self.processor_index}):{result}") |
| | | |
| | | def __process_codes_data(self, codes_data, from_cache=False, delay=0.0): |
| | | |
| | |
| | | codes.add(code) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5]) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes)) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅总数({}):{}", self.processor_index, len(codes)) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | |
| | | self.__unsubscribe(del_codes) |
| | | |
| | | if add_codes: |
| | | logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}") |
| | | logger_system.info(f"新增L2订阅代码数量({self.processor_index}) ({'缓存' if from_cache else ''}):{len(add_codes)}") |
| | | for c in add_codes: |
| | | logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}") |
| | | logger_l2_codes_subscript.info( |
| | | f"l2委托数据过滤条件({self.processor_index}):{c} - {self.codes_volume_and_price_dict.get(c)}") |
| | | |
| | | logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes)) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅结束({}),add-{} del-{}", self.processor_index, len(add_codes), |
| | | len(del_codes)) |
| | | |
| | | # 设置最近的代码列表 |
| | | self.latest_codes_set = codes |
| | |
| | | # 保存一份最新的数据 |
| | | self.__set_latest_datas(codes_data) |
| | | |
| | | @classmethod |
| | | def __set_latest_datas(cls, codes_data): |
| | | def __set_latest_datas(self, codes_data): |
| | | path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}" |
| | | data_str = json.dumps([tool.get_now_date_str(), codes_data]) |
| | | with open(constant.L2_CODES_INFO_PATH, mode='w') as f: |
| | | with open(path_str, mode='w') as f: |
| | | f.write(data_str) |
| | | |
| | | @classmethod |
| | | def __get_latest_datas(cls): |
| | | if os.path.exists(constant.L2_CODES_INFO_PATH): |
| | | with open(constant.L2_CODES_INFO_PATH, mode='r') as f: |
| | | def __get_latest_datas(self): |
| | | path_str = f"{constant.L2_CODES_INFO_PATH}_{self.processor_index}" |
| | | if os.path.exists(path_str): |
| | | with open(path_str, mode='r') as f: |
| | | str_ = f.readline() |
| | | data_json = json.loads(str_) |
| | | if data_json[0] == tool.get_now_date_str(): |
| | |
| | | if pRspInfo['ErrorID'] == 0: |
| | | print("----L2行情登录成功----") |
| | | self.is_login = True |
| | | logger_system.info(f"L2行情登录成功") |
| | | logger_system.info(f"L2行情登录成功({self.processor_index})") |
| | | # 初始设置值 |
| | | if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0: |
| | | threading.Thread( |
| | |
| | | def OnRspSubOrderDetail(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubOrderDetail", pRspInfo) |
| | | # try: |
| | | print("订阅结果:", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], pRspInfo["ErrorID"], |
| | | print(f"订阅结果({self.processor_index}):", pSpecificSecurity["ExchangeID"], pSpecificSecurity["SecurityID"], |
| | | pRspInfo["ErrorID"], |
| | | pRspInfo["ErrorMsg"]) |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, |
| | | f"订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | f"订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | |
| | | |
| | | def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, |
| | | f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | f"NGTS订阅结果({self.processor_index}):{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | |
| | | def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | try: |
| | | code = pSpecificSecurity['SecurityID'] |
| | | logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}") |
| | | logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅({self.processor_index}):{code}") |
| | | self.subscripted_codes.discard(code) |
| | | if bIsLast == 1: |
| | | print("取消订阅响应结束", self.subscripted_codes) |
| | |
| | | logging.exception(e) |
| | | |
| | | |
| | | def __init_l2(l2_data_upload_manager): |
| | | def __init_l2(l2_data_upload_manager, processor_index): |
| | | print(lev2mdapi.CTORATstpLev2MdApi_GetApiVersion()) |
| | | # case 1: Tcp方式 |
| | | # g_SubMode=lev2mdapi.TORA_TSTP_MST_TCP |
| | |
| | | # case 2非缓存模式 |
| | | # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api, l2_data_upload_manager) |
| | | spi = Lev2MdSpi(api, l2_data_upload_manager, processor_index) |
| | | api.RegisterSpi(spi) |
| | | # -------------------正式模式------------------------------------- |
| | | if g_SubMode != lev2mdapi.TORA_TSTP_MST_MCAST: |
| | |
| | | time.sleep(10) |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list) -> None: |
| | | def run(queue_r: multiprocessing.Queue, queue_data_callback: multiprocessing.Queue, channel_list: list, |
| | | processor_index) -> None: |
| | | """ |
| | | 运行 |
| | | @param queue_r: |
| | | @param queue_data_callback: 低频数据回调队列 |
| | | @param channel_list: [((编号,multiprocessing.Array, zmq_address),(编号, multiprocessing.Array, zmq_address))] |
| | | @param processor_index:处理器索引 |
| | | @return: |
| | | """ |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | |
| | | threading.Thread(target=SubscriptDefend.run, daemon=True).start() |
| | | # 初始化 |
| | | data_channel_distribute_manager = CodeDataChannelDistributeManager(channel_list) |
| | | l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_upload_manager = L2DataUploadManager(data_channel_distribute_manager, queue_data_callback) |
| | | __init_l2(l2_data_upload_manager, processor_index) |
| | | l2_data_manager_v2.run_upload_common() |
| | | l2_data_manager_v2.run_log() |
| | | # TODO 测试 |
| | |
| | | import json |
| | | import logging |
| | | import marshal |
| | | import multiprocessing |
| | | import queue |
| | | import threading |
| | | import time |
| | |
| | | |
| | | # L2上传数据管理器 |
| | | class L2DataUploadManager: |
| | | """ |
| | | L2逐笔委托/L2逐笔成交:通过共享内存+ZMQ上传数据 |
| | | L2市场行情: 通过普通数据上传队列写入 |
| | | L2订阅的代码: 通过普通数据上传队列写入 |
| | | """ |
| | | |
| | | TYPE_DELEGATE = 1 |
| | | TYPE_TRANSACTION = 2 |
| | | TYPE_MARKET = 3 |
| | | |
| | | def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager): |
| | | def __init__(self, data_channel_distribute_manager: CodeDataChannelDistributeManager, |
| | | common_data_upload_queue: multiprocessing.Queue): |
| | | self.data_channel_distribute_manager = data_channel_distribute_manager |
| | | self.common_data_upload_queue = common_data_upload_queue |
| | | # 代码分配的对象 |
| | | self.temp_order_queue_dict = {} |
| | | self.temp_transaction_queue_dict = {} |
| | |
| | | data['SellNo'], data['ExecType'], time.time())) |
| | | |
| | | def add_market_data(self, data): |
| | | # 加入上传队列 |
| | | # self.market_data_queue.put_nowait(data) |
| | | code = data['securityID'] |
| | | # TODO 改为zmq发送 |
| | | callback = self.data_channel_distribute_manager.get_distributed_channel(code) |
| | | if callback: |
| | | callback.OnMarketData(code, data) |
| | | # 改为队列回调发送 |
| | | self.common_data_upload_queue.put_nowait({"type": "l2_market", "data": (code, data)}) |
| | | |
| | | # 分配上传队列 |
| | | def distribute_upload_queue(self, code, _target_codes=None): |
| | |
| | | |
| | | if code in self.upload_l2_data_task_dict: |
| | | self.upload_l2_data_task_dict.pop(code) |
| | | |
| | | def __upload_l2_data(self, code, _queue, datas): |
| | | _queue.put_nowait(marshal.dumps([code, datas, time.time()])) |
| | | |
| | | # 处理订单数据并上传 |
| | | def __run_upload_order_task(self, code): |
| | |
| | | (code, temp_list)) |
| | | # 通知获取数据 |
| | | _socket = self.data_channel_distribute_manager.get_zmq_socket(zmq_host) |
| | | _socket.send(msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}})) |
| | | _socket.send( |
| | | msgpack.packb({"type": self.TYPE_TRANSACTION, "data": {"memery_number": shared_memery_number}})) |
| | | _socket.recv_string() |
| | | temp_list = [] |
| | | else: |
| | |
| | | L2成交数据处理器 |
| | | """ |
| | | import json |
| | | import time |
| | | |
| | | import l2_data_util |
| | | from db import redis_manager_delegate as redis_manager |
| | |
| | | |
| | | from log_module import async_log_util, log_export |
| | | from log_module.log import hx_logger_l2_transaction_desc, hx_logger_l2_transaction_sell_order, hx_logger_l2_active_sell, \ |
| | | hx_logger_l2_transaction_big_buy_order, hx_logger_l2_transaction_big_sell_order |
| | | hx_logger_l2_transaction_big_buy_order, hx_logger_l2_transaction_big_sell_order, hx_logger_l2_upload |
| | | |
| | | from utils import tool |
| | | |
| | |
def is_active_sell(sell_no, buy_no):
    """
    Tell whether a trade counts as an active sell.

    @param sell_no: sell-side number of the trade
    @param buy_no: buy-side number of the trade
    @return: True when sell_no is greater than buy_no, otherwise False
    """
    return buy_no < sell_no
| | | |
| | | f_start_time = time.time() |
| | | use_time_list = [] |
| | | # q.append((data['SecurityID'], data['TradePrice'], data['TradeVolume'], |
| | | # data['OrderTime'], data['MainSeq'], data['SubSeq'], data['BuyNo'], |
| | | # data['SellNo'], data['ExecType'])) |
| | |
| | | if code not in cls.__latest_all_sell_orders_dict: |
| | | cls.__latest_all_sell_orders_dict[code] = [] |
| | | |
| | | sell_no_map = local_today_sellno_map.get(code) |
| | | total_datas = local_today_datas.get(code) |
| | | if not sell_no_map: |
| | | sell_no_map = {} |
| | | |
| | | # 保存最近的成交价格:(价格,成交时间) |
| | | cls.__latest_trade_price_dict[code] = (datas[-1][1], datas[-1][3]) |
| | | |
| | | __start_time = time.time() |
| | | # 是否还有涨停卖剩下 |
| | | no_left_limit_up_sell = L2TradeSingleDataProcessor.process_passive_limit_up_sell_data(code, datas, limit_up_price) |
| | | use_time = time.time() - __start_time |
| | | __start_time = time.time() |
| | | use_time_list.append(("处理涨停卖", use_time)) |
| | | |
| | | async_log_util.info(hx_logger_l2_upload, |
| | | f"{code}处理涨停卖:{use_time} 数据数量:{len(datas)} 详情:{use_time_list}") |
| | | |
| | | |
| | | |
| | | for d in datas: |
| | | # 获取当前是否为主动买 |
| | |
| | | finally: |
| | | cls.__last_trade_data_dict[code] = d |
| | | |
| | | use_time = time.time() - __start_time |
| | | __start_time = time.time() |
| | | use_time_list.append(("大单统计", use_time)) |
| | | |
| | | latest_time = l2_huaxin_util.convert_time(datas[-1][3], with_ms=True) |
| | | min_time = tool.trade_time_add_millionsecond(latest_time, -1000) |
| | | min_time_int = int(min_time.replace(":", "").replace(".", "")) |
| | |
| | | total_sell_info[0] += int(latest_sell_order_info[1] * latest_sell_order_info[2]) |
| | | big_sell_orders.reverse() |
| | | total_sell_info[1] = big_sell_orders |
| | | |
| | | use_time = time.time() - __start_time |
| | | __start_time = time.time() |
| | | use_time_list.append(("最近大单统计", use_time)) |
| | | |
| | | # ----------------统计涨停主动买----------------- |
| | | try: |
| | | limit_up_active_buy_datas = [] |
| | |
| | | # 有涨停主动买 |
| | | limit_up_active_buy_datas.append(d) |
| | | L2TradeSingleDataManager.set_limit_up_active_buy(code, limit_up_active_buy_datas, no_left_limit_up_sell) |
| | | |
| | | use_time = time.time() - __start_time |
| | | __start_time = time.time() |
| | | use_time_list.append(("涨停主动买成交", use_time)) |
| | | except: |
| | | pass |
| | | |
| | | use_time = time.time() - f_start_time |
| | | if use_time > 0.01: |
| | | async_log_util.info(hx_logger_l2_upload, |
| | | f"{code}处理成交详细用时:{use_time} 数据数量:{len(datas)} 详情:{use_time_list}") |
| | | return total_sell_info |
| | | |
| | | # 获取最近成交数据 |
| | |
| | | import math |
| | | import multiprocessing |
| | | import random |
| | | import threading |
| | | |
| | | import msgpack |
| | | import zmq |
| | | |
| | | from huaxin_client import l2_data_transform_protocol |
| | | from utils import shared_memery_util |
| | | |
| | | process_manager = None |
| | | |
| | | |
| | | class TargetCodeProcessManager: |
| | |
| | | # 代码所在队列ID |
| | | self.__code_queue_dict = {} |
| | | |
| | | def add_codes(self, codes: set): |
| | | def set_codes(self, codes: set): |
| | | """ |
| | | 设置订阅代码 |
| | | @param codes: |
| | | @return: 返回队列与对应分配的代码:[(队列对象, {"代码1","代码2"}),...] |
| | | """ |
| | | add_codes = codes - self.__code_queue_dict.keys() |
| | | del_codes = self.__code_queue_dict.keys() - codes |
| | | # 删除代码 |
| | |
| | | for code in add_codes: |
| | | # 寻找未满的队列 |
| | | for queue_id in self.__queue_codes: |
| | | count_per_process = min(self.__max_code_count_per_queue_dict.get(queue_id), math.ceil(len(codes) / len(self.__com_queues))) |
| | | count_per_process = self.__max_code_count_per_queue_dict.get(queue_id) |
| | | if len(self.__queue_codes[queue_id]) >= count_per_process: |
| | | # 队列已满 |
| | | continue |
| | |
| | | self.__queue_codes[queue_id].add(code) |
| | | self.__code_queue_dict[code] = queue_id |
| | | break |
| | | return [(self.__com_queue_id_object_dict.get(queue_id), self.__queue_codes[queue_id]) for queue_id in |
| | | self.__queue_codes] |
| | | |
| | | def get_queues_with_codes(self): |
| | | """ |
| | |
| | | return results |
| | | |
| | | |
class L2DataListener:
    """
    Listens for L2 data notifications.

    Every channel is a pair of descriptors (order, transaction); each
    descriptor is a tuple of (shared-memory number, shared-memory array,
    zmq address). A notification received on a zmq address names the
    shared-memory number whose content is then read with
    shared_memery_util.read_datas and handed to the callback.
    """

    # Values of the "type" field in a notification message.
    __TYPE_ORDER = 1
    __TYPE_TRANSACTION = 2

    def __init__(self, channel_list):
        """
        Build the lookup from shared-memory number to shared-memory array.

        @param channel_list: [((shared-memory number, order shared-memory array, zmq address),
                               (shared-memory number, transaction shared-memory array, zmq address))]
        """
        self.channel_list = channel_list
        # Map each shared-memory number to its shared-memory array
        self.shared_memery_num_object_dict = {}
        for channel in self.channel_list:
            self.shared_memery_num_object_dict[channel[0][0]] = channel[0][1]
            self.shared_memery_num_object_dict[channel[1][0]] = channel[1][1]

    def create_data_listener(self, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Start one daemon server thread per zmq address (two per channel).

        @param l2_data_callback: object whose OnL2Order / OnL2Transaction
                                 methods receive the decoded data
        @return: None
        """
        for channel in self.channel_list:
            # Order descriptor first, then the transaction descriptor
            for descriptor in (channel[0], channel[1]):
                threading.Thread(target=self.__create_l2_zmq_server, args=(descriptor[2], l2_data_callback,),
                                 daemon=True).start()

    def __create_l2_zmq_server(self, ipc_addr, l2_data_callback: l2_data_transform_protocol.L2DataCallBack):
        """
        Run a zmq REP server for L2 order/transaction notifications; never returns.

        @param ipc_addr: address the REP socket binds to
        @param l2_data_callback: object whose OnL2Order / OnL2Transaction
                                 methods receive the decoded data
        @return: None
        """
        # Local import: the module-level import list is not known to include logging
        import logging

        context = zmq.Context()
        socket = context.socket(zmq.REP)
        socket.bind(ipc_addr)
        while True:
            data = socket.recv()
            try:
                # Decode the notification and read the shared memory it points to
                data = msgpack.unpackb(data)
                shared_memery_id = data["data"]["memery_number"]
                datas = shared_memery_util.read_datas(self.shared_memery_num_object_dict.get(shared_memery_id))
                if data["type"] == self.__TYPE_ORDER:
                    # Order data: (code, data list, timestamp)
                    code, data_list, timestamp = datas[0], datas[1], datas[2]
                    l2_data_callback.OnL2Order(code, data_list, timestamp)
                elif data["type"] == self.__TYPE_TRANSACTION:
                    # Transaction data: only code and data list are forwarded
                    code, data_list = datas[0], datas[1]
                    l2_data_callback.OnL2Transaction(code, data_list)
            except Exception:
                # Keep serving, but leave a traceback instead of hiding the failure
                logging.exception("failed to handle L2 notification on %s", ipc_addr)
            finally:
                # Always answer: a REP socket must reply before it can receive again
                socket.send_string("SUCCESS")
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | queues = [multiprocessing.Queue() for i in range(7)] |
| | | manager = TargetCodeProcessManager(queues, 10) |
| | |
| | | |
| | | import constant |
| | | from code_attribute import gpcode_manager |
| | | from l2.subscript import l2_subscript_manager |
| | | from log_module import log |
| | | from log_module.log import logger_l2_trade, logger_system |
| | | import logging |
| | |
| | | # ======分组====== |
| | | # 记录每个分组的数量 |
| | | channel_count_list = [] |
| | | # 数据回调队列 |
| | | data_callback_queue_list = [] |
| | | # 消息传递队列 |
| | | sub_single_queue_list = [] |
| | | |
| | | for i in range(l2_process_count): |
| | | channel_count = base_channel_count + (1 if i < left_count else 0) |
| | | channel_count_list.append(channel_count) |
| | |
| | | index += channel_count |
| | | # 订阅信号队列, 数据回调队列(回调频次小的数据通过这种回调) |
| | | sub_single_queue, data_callback_queue = multiprocessing.Queue(), multiprocessing.Queue() |
| | | sub_single_queue_list.append(sub_single_queue) |
| | | data_callback_queue_list.append(data_callback_queue) |
| | | l2_process = multiprocessing.Process(target=l2_client_v2.run, |
| | | args=(sub_single_queue, data_callback_queue, channels,)) |
| | | args=(sub_single_queue, data_callback_queue, channels, i, )) |
| | | l2_process.start() |
| | | |
| | | l2_subscript_manager.process_manager = l2_subscript_manager.TargetCodeProcessManager(sub_single_queue_list, channel_count_list) |
| | | # 监听L2市场行情数据 |
| | | huaxin_trade_server.run_l2_market_info_reciever(data_callback_queue_list) |
| | | # 启动ZMQserver,针对委托队列与成交队列进行监听 |
| | | l2_subscript_manager.L2DataListener(channel_list).create_data_listener(huaxin_trade_server.my_l2_data_callback) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | # 可绑定16-31之间的核 |
| | |
| | | my_trade_response = MyTradeResponse() |
| | | |
| | | |
def run_l2_market_info_reciever(queues: list):
    """
    Start one daemon thread per queue that forwards L2 market data to
    my_l2_data_callback.OnMarketData.

    @param queues: queues delivering dicts shaped like
                   {"type": "l2_market", "data": (code, data)}
    @return: None
    """
    # Local import: the module-level import list is not known to include logging
    import logging

    def recieve_data(queue):
        while True:
            try:
                d = queue.get()
                # {"type": "l2_market", "data": (code, data)}
                if d["type"] == "l2_market":
                    code, market_data = d["data"]
                    my_l2_data_callback.OnMarketData(code, market_data)
            except Exception:
                # One bad item must not stop the receiver, but it should be visible
                logging.exception("failed to handle L2 market data")

    for q in queues:
        threading.Thread(target=recieve_data, args=(q,), daemon=True).start()
| | | |
| | | |
| | | # 预埋单 |
| | | def __test_pre_place_order(): |
| | | codes = gpcode_manager.BuyOpenLimitUpCodeManager().get_codes() |