| | |
| | | import queue |
| | | import threading |
| | | import time |
| | | from typing import List |
| | | import concurrent.futures |
| | | |
| | | from huaxin_client import command_manager, l2_data_transform_protocol |
| | | from huaxin_client import command_manager |
| | | from huaxin_client import constant |
| | | from huaxin_client import l2_data_manager |
| | | import lev2mdapi |
| | | from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager |
| | | from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager |
| | | from huaxin_client.command_manager import L2ActionCallback |
| | | from huaxin_client.l2_data_manager import L2DataUploadManager |
| | | from log_module import log, async_log_util |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ |
| | | logger_local_huaxin_g_cancel, logger_l2_codes_subscript |
| | | from log_module.async_log_util import huaxin_l2_log |
| | | from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug |
| | | from utils import tool |
| | | |
| | | ###B类### |
| | | Front_Address = "tcp://10.0.1.101:6900" |
| | | Multicast_Address = "udp://224.224.2.19:7889" |
| | | Multicast_Address2 = "udp://224.224.224.234:7890" |
| | | Local_Interface_Address = "192.168.84.75" |
| | | Local_Interface_Address = constant.LOCAL_IP |
| | | |
| | | ###A类### |
| | | if constant.IS_A: |
| | |
| | | SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", |
| | | b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] |
| | | SZ_Bond_Securities = [b"100303", b"109559", b"112617"] |
| | | set_codes_data_queue = queue.Queue() |
| | | market_code_dict = {} |
| | | set_codes_data_queue = queue.Queue(maxsize=1000) |
| | | |
| | | ENABLE_NGST = True |
| | | |
| | | |
| | | class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): |
| | |
| | | szse_codes = [] |
| | | sse_codes = [] |
| | | for code in codes: |
| | | if code.find("00") == 0: |
| | | market_type = tool.get_market_type(code) |
| | | if market_type == tool.MARKET_TYPE_SZSE: |
| | | szse_codes.append(code.encode()) |
| | | elif code.find("60") == 0: |
| | | elif market_type == tool.MARKET_TYPE_SSE: |
| | | sse_codes.append(code.encode()) |
| | | return sse_codes, szse_codes |
| | | |
| | |
| | | logger_local_huaxin_l2_subscript.info(f"取消订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"取消订阅深证:{sz}") |
| | | if sh: |
| | | # 取消订阅逐笔委托 |
| | | self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | # 取消订阅逐笔成交 |
| | | self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | if ENABLE_NGST: |
| | | result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") |
| | | else: |
| | | # 取消订阅逐笔委托 |
| | | self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | # 取消订阅逐笔成交 |
| | | self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | if sz: |
| | | self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | |
| | | def subscribe_codes(self, _codes): |
| | | self.__subscribe(_codes) |
| | | |
| | | def __subscribe(self, _codes): |
| | | sh, sz = self.__split_codes(_codes) |
| | | logger_local_huaxin_l2_subscript.info(f"订阅上证:{sh}") |
| | | logger_local_huaxin_l2_subscript.info(f"订阅深证:{sz}") |
| | | if sh: |
| | | # 订阅逐笔委托 |
| | | result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}") |
| | | # 订阅逐笔成交 |
| | | result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") |
| | | if ENABLE_NGST: |
| | | result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔NGTS订阅结果sh:{result}") |
| | | else: |
| | | # 订阅逐笔委托 |
| | | result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔委托订阅结果sh:{result}") |
| | | # 订阅逐笔成交 |
| | | result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"逐笔成交订阅结果sh:{result}") |
| | | |
| | | result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sh:{result}") |
| | |
| | | result = self.__api.SubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) |
| | | logger_local_huaxin_l2_subscript.info(f"市场订阅结果sz:{result}") |
| | | |
| | | def __process_codes_data(self, codes_data, from_cache=False, delay=0): |
| | | def __process_codes_data(self, codes_data, from_cache=False, delay=0.0): |
| | | |
| | | if from_cache and self.codes_volume_and_price_dict: |
| | | return |
| | | |
| | | if not self.is_login and not constant.TEST: |
| | | raise Exception("L2尚未登录") |
| | |
| | | for d in codes_data: |
| | | code = d[0] |
| | | codes.add(code) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], float(d[2])) |
| | | self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5]) |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5]) |
| | | logger_l2_codes_subscript.info("华鑫L2订阅总数:{}", len(codes)) |
| | | add_codes = codes - self.subscripted_codes |
| | | del_codes = self.subscripted_codes - codes |
| | | print("add del codes", add_codes, del_codes) |
| | |
| | | self.l2_data_upload_manager.release_distributed_upload_queue(c) |
| | | l2_data_manager.del_target_code(c) |
| | | for c in codes: |
| | | self.l2_data_upload_manager.distribute_upload_queue(c) |
| | | self.l2_data_upload_manager.distribute_upload_queue(c, codes) |
| | | l2_data_manager.add_target_code(c) |
| | | except Exception as e: |
| | | except Exception as e: # TODO 清除原来还没释放掉的数据 |
| | | logger_system.error(f"L2代码分配上传队列出错:{str(e)}") |
| | | logger_system.exception(e) |
| | | self.__subscribe(add_codes) |
| | |
| | | |
| | | if add_codes: |
| | | logger_system.info(f"新增L2订阅代码数量({'缓存' if from_cache else ''}):{len(add_codes)}") |
| | | for c in add_codes: |
| | | logger_l2_codes_subscript.info(f"l2委托数据过滤条件:{c} - {self.codes_volume_and_price_dict.get(c)}") |
| | | |
| | | logger_l2_codes_subscript.info("华鑫L2订阅结束,add-{} del-{}", len(add_codes), len(del_codes)) |
| | | |
| | |
| | | return data_json[1] |
| | | return [] |
| | | |
| | | def set_code_special_watch_volume(self, code, volume): |
| | | # 有效期为3s |
| | | # self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3) |
| | | d = self.codes_volume_and_price_dict.get(code) |
| | | if d: |
| | | min_volume, limit_up_price = d[0], d[1] |
| | | self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price, |
| | | {volume, constant.SHADOW_ORDER_VOLUME}, |
| | | time.time() + 3) |
| | | |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, f"设置下单量监听:{code}-{volume}") |
| | | |
| | | def OnFrontConnected(self): |
| | | print("OnFrontConnected") |
| | | logger_system.info(f"l2_client OnFrontConnected 线程ID:{tool.get_thread_id()}") |
| | |
| | | if pRspInfo['ErrorID'] == 0: |
| | | print("----L2行情登录成功----") |
| | | self.is_login = True |
| | | logger_system.info(f"L2行情登录成功") |
| | | # 初始设置值 |
| | | t1 = threading.Thread( |
| | | target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6), |
| | | daemon=True) |
| | | # 后台运行 |
| | | t1.start() |
| | | if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0: |
| | | threading.Thread( |
| | | target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=60), |
| | | daemon=True).start() |
| | | |
| | | def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubMarketData") |
| | |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | | # 初始化 |
| | | SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) |
| | | if bIsLast == 1: |
| | | print("订阅响应结束", self.subscripted_codes) |
| | | l2_data_manager.add_subscript_codes(self.subscripted_codes) |
| | |
| | | print("OnRspUnSubOrderDetail", bIsLast) |
| | | try: |
| | | code = pSpecificSecurity['SecurityID'] |
| | | self.subscripted_codes.discard(code) |
| | | if bIsLast == 1: |
| | | print("取消订阅响应结束", self.subscripted_codes) |
| | | l2_data_manager.add_subscript_codes(self.subscripted_codes) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | | def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | async_log_util.info(logger_local_huaxin_l2_subscript, |
| | | f"NGTS订阅结果:{pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") |
| | | if pRspInfo["ErrorID"] == 0: |
| | | print("订阅成功") |
| | | self.subscripted_codes.add(pSpecificSecurity['SecurityID']) |
| | | # 初始化 |
| | | SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) |
| | | if bIsLast == 1: |
| | | print("订阅响应结束", self.subscripted_codes) |
| | | l2_data_manager.add_subscript_codes(self.subscripted_codes) |
| | | |
| | | def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | try: |
| | | code = pSpecificSecurity['SecurityID'] |
| | | logger_local_huaxin_l2_subscript.info(f"NGTS取消订阅:{code}") |
| | | self.subscripted_codes.discard(code) |
| | | if bIsLast == 1: |
| | | print("取消订阅响应结束", self.subscripted_codes) |
| | |
| | | def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubXTSTick") |
| | | |
| | | # 4.0.5版本接口 |
| | | def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): |
| | | print("OnRspSubNGTSTick") |
| | | |
| | | def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, |
| | | FirstLevelSellOrderVolumes): |
| | | # 传入:时间,现价,成交总量,买1,买2,买3,买4,买5,卖1,卖2,卖3,卖4,卖5 |
| | | try: |
| | | buys = [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), |
| | | (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), |
| | | (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), |
| | | (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), |
| | | (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])] |
| | | for i in range(6, 11): |
| | | if not pDepthMarketData[f"BidVolume{i}"]: |
| | | break |
| | | buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}'])) |
| | | |
| | | sells = [ |
| | | (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), |
| | | (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), |
| | | (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), |
| | | (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), |
| | | (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) |
| | | ] |
| | | for i in range(6, 11): |
| | | if not pDepthMarketData[f"AskVolume{i}"]: |
| | | break |
| | | sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}'])) |
| | | |
| | | d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'], |
| | | "lastPrice": pDepthMarketData['LastPrice'], |
| | | "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'], |
| | | "totalValueTrade": pDepthMarketData['TotalValueTrade'], |
| | | "totalAskVolume": pDepthMarketData['TotalAskVolume'], |
| | | "avgAskPrice": pDepthMarketData["AvgAskPrice"], |
| | | "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), |
| | | (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), |
| | | (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), |
| | | (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), |
| | | (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])], |
| | | "sell": [ |
| | | (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), |
| | | (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), |
| | | (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), |
| | | (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), |
| | | (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) |
| | | ]} |
| | | market_code_dict[pDepthMarketData['SecurityID']] = time.time() |
| | | "buy": buys, |
| | | "sell": sells} |
| | | self.l2_data_upload_manager.add_market_data(d) |
| | | SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID']) |
| | | except: |
| | | pass |
| | | |
| | |
| | | # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # 输出逐笔成交数据 |
| | | if pTransaction['ExecType'] == b"2": |
| | | # if min_volume is None: |
| | | # # 默认筛选50w |
| | | # if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: |
| | | # return |
| | | # elif pTransaction['TradeVolume'] < min_volume: |
| | | # return |
| | | # 撤单 |
| | | item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'], |
| | | "Volume": pTransaction['TradeVolume'], |
| | |
| | | self.l2_data_upload_manager.add_transaction_detail(item) |
| | | |
| | | def OnRtnOrderDetail(self, pOrderDetail): |
| | | # can_listen = False |
| | | # code = str(pOrderDetail['SecurityID']) |
| | | # start_time = 0 |
| | | # if code in self.special_code_volume_for_order_dict: |
| | | # start_time = time.time() |
| | | # if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ |
| | | # 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: |
| | | # # 监控目标订单与影子订单 |
| | | # if self.special_code_volume_for_order_dict[code][1] > time.time(): |
| | | # # 特殊量监听 |
| | | # can_listen = True |
| | | # else: |
| | | # self.special_code_volume_for_order_dict.pop(code) |
| | | # if not can_listen: |
| | | # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) |
| | | # if min_volume is None: |
| | | # # 默认筛选50w |
| | | # if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000: |
| | | # return |
| | | # elif pOrderDetail['Volume'] < min_volume: |
| | | # return |
| | | # 输出逐笔委托数据 |
| | | # 上证OrderStatus=b"D"表示撤单 |
| | | item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], |
| | | "Volume": pOrderDetail['Volume'], |
| | |
| | | "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], |
| | | "OrderStatus": pOrderDetail['OrderStatus'].decode()} |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, 0) |
| | | |
| | | def OnRtnNGTSTick(self, pTick): |
| | | """ |
| | | 上证股票的逐笔委托与成交 |
| | | @param pTick: |
| | | @return: |
| | | """ |
| | | try: |
| | | if pTick['TickType'] == b'T': |
| | | # 成交 |
| | | item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'], |
| | | "TradeVolume": pTick['Volume'], |
| | | "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], |
| | | "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'], |
| | | "SellNo": pTick['SellNo'], |
| | | "ExecType": '1'} |
| | | self.l2_data_upload_manager.add_transaction_detail(item) |
| | | elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D': |
| | | # 撤单 |
| | | item = {"SecurityID": pTick['SecurityID'], "Price": pTick['Price'], |
| | | "Volume": pTick['Volume'], |
| | | "Side": pTick['Side'].decode(), "OrderType": pTick['TickType'].decode(), |
| | | "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], |
| | | "SubSeq": pTick['SubSeq'], "OrderNO": '', |
| | | "OrderStatus": pTick['TickType'].decode()} |
| | | if pTick['Side'] == b'1': |
| | | item['OrderNO'] = pTick['BuyNo'] |
| | | elif pTick['Side'] == b'2': |
| | | item['OrderNO'] = pTick['SellNo'] |
| | | self.l2_data_upload_manager.add_l2_order_detail(item, 0) |
| | | except Exception as e: |
| | | logger_local_huaxin_l2_subscript.exception(e) |
| | | |
| | | def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, |
| | | FirstLevelSellOrderVolumes): |
| | |
| | | for sell_index in range(0, FirstLevelSellNum): |
| | | print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) |
| | | |
| | | def OnRtnXTSTick(self, pTick): |
| | | # 输出上海债券逐笔数据’ |
| | | print( |
| | | "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( |
| | | pTick['TickType'], |
| | | pTick['SecurityID'], |
| | | pTick['Price'], |
| | | pTick['Volume'], |
| | | pTick['TickTime'], |
| | | pTick['MainSeq'], |
| | | pTick['SubSeq'], |
| | | pTick['BuyNo'], |
| | | pTick['SellNo'])) |
| | | |
| | | def OnRtnNGTSTick(self, pTick): |
| | | # 输出上海股基逐笔数据’ |
| | | print( |
| | | "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( |
| | | pTick['TickType'], |
| | | pTick['SecurityID'], |
| | | pTick['Price'], |
| | | pTick['Volume'], |
| | | pTick['TickTime'], |
| | | pTick['MainSeq'], |
| | | pTick['SubSeq'], |
| | | pTick['BuyNo'], |
| | | pTick['SellNo'])) |
| | | class SubscriptDefend: |
| | | """ |
| | | 订阅守护 |
| | | 定义:当订阅的代码超过一定时间没有回调数据时重新订阅 |
| | | """ |
| | | __l2_market_update_time = {} |
| | | |
| | | @classmethod |
| | | def set_l2_market_update(cls, code): |
| | | cls.__l2_market_update_time[code] = time.time() |
| | | |
| | | @classmethod |
| | | def run(cls): |
| | | while True: |
| | | try: |
| | | now_time = tool.get_now_time_as_int() |
| | | if now_time < int("093015"): |
| | | continue |
| | | if int("112945") < now_time < int("130015"): |
| | | continue |
| | | if int("145645") < now_time: |
| | | continue |
| | | if spi.subscripted_codes: |
| | | codes = [] |
| | | for code in spi.subscripted_codes: |
| | | # 获取上次更新时间 |
| | | update_time = cls.__l2_market_update_time.get(code) |
| | | if update_time and time.time() - update_time > 15: |
| | | # 需要重新订阅 |
| | | codes.append(code) |
| | | if codes: |
| | | logger_debug.info(f"重新订阅:{codes}") |
| | | spi.subscribe_codes(codes) |
| | | except: |
| | | pass |
| | | finally: |
| | | time.sleep(15) |
| | | |
| | | |
| | | class MyL2ActionCallback(L2ActionCallback): |
| | | |
| | | def OnSetL2Position(self, codes_data): |
| | | print("L2订阅数量:", len(codes_data)) |
| | | logger_l2_codes_subscript.info("华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data)) |
| | | huaxin_l2_log.info(logger_l2_codes_subscript, "华鑫L2代码处理队列获取到数据:数量-{}", len(codes_data)) |
| | | try: |
| | | spi.set_codes_data(codes_data) |
| | | except Exception as e: |
| | |
| | | g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST |
| | | |
| | | # case 1缓存模式 |
| | | # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) |
| | | api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) |
| | | # case 2非缓存模式 |
| | | api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) |
| | | global spi |
| | | spi = Lev2MdSpi(api, l2_data_upload_manager) |
| | | api.RegisterSpi(spi) |
| | |
| | | api.Init() |
| | | |
| | | |
| | | __l2_cmd_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=3) |
| | | |
| | | |
| | | def __receive_from_queue_trade(queue_trade_w_l2_r: multiprocessing.Queue): |
| | | logger_system.info(f"l2_client __receive_from_pipe_trade 线程ID:{tool.get_thread_id()}") |
| | | while True: |
| | |
| | | value = value.decode("utf-8") |
| | | data = json.loads(value) |
| | | _type = data["type"] |
| | | if _type == "listen_volume": |
| | | volume = data["data"]["volume"] |
| | | code = data["data"]["code"] |
| | | spi.set_code_special_watch_volume(code, volume) |
| | | elif _type == "l2_cmd": |
| | | l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | if _type == "l2_cmd": |
| | | __start_time = time.time() |
| | | # 线程池 |
| | | __l2_cmd_thread_pool.submit( |
| | | lambda: l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data)) |
| | | use_time = time.time() - __start_time |
| | | if use_time > 0.005: |
| | | huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"l2_cmd耗时:{use_time}s") |
| | | |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | |
| | | pipe_strategy = None |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], |
| | | transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: |
| | | # def test_add_codes(): |
| | | # time.sleep(5) |
| | | # # if value: |
| | | # # if type(value) == bytes: |
| | | # # value = value.decode("utf-8") |
| | | # # data = json.loads(value) |
| | | # # _type = data["type"] |
| | | # # if _type == "listen_volume": |
| | | # # volume = data["data"]["volume"] |
| | | # # code = data["data"]["code"] |
| | | # # spi.set_code_special_watch_volume(code, volume) |
| | | # # elif _type == "l2_cmd": |
| | | # # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | # |
| | | # demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59), |
| | | # ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)] |
| | | # |
| | | # queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) |
| | | # time.sleep(1) |
| | | # |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | # time.sleep(0.1) |
| | | # spi.l2_data_upload_manager.add_l2_order_detail( |
| | | # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', |
| | | # 'OrderTime': '13000015', |
| | | # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | def test_add_codes(queue_r): |
| | | time.sleep(10) |
| | | # if value: |
| | | # if type(value) == bytes: |
| | | # value = value.decode("utf-8") |
| | | # data = json.loads(value) |
| | | # _type = data["type"] |
| | | # if _type == "listen_volume": |
| | | # volume = data["data"]["volume"] |
| | | # code = data["data"]["code"] |
| | | # spi.set_code_special_watch_volume(code, volume) |
| | | # elif _type == "l2_cmd": |
| | | # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) |
| | | |
| | | demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200), |
| | | ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200), |
| | | ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200), |
| | | ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)] |
| | | |
| | | queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) |
| | | time.sleep(10) |
| | | while True: |
| | | try: |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) |
| | | time.sleep(0.1) |
| | | spi.l2_data_upload_manager.add_l2_order_detail( |
| | | {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | finally: |
| | | time.sleep(10) |
| | | |
| | | |
| | | def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None: |
| | | logger_system.info("L2进程ID:{}", os.getpid()) |
| | | logger_system.info(f"l2_client 线程ID:{tool.get_thread_id()}") |
| | | try: |
| | |
| | | if queue_r is not None: |
| | | t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) |
| | | t1.start() |
| | | |
| | | # 订阅守护 |
| | | threading.Thread(target=SubscriptDefend.run, daemon=True).start() |
| | | # 初始化 |
| | | order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) |
| | | transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) |
| | | l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, |
| | | transaction_queue_distribute_manager, market_queue) |
| | | data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) |
| | | l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager) |
| | | __init_l2(l2_data_upload_manager) |
| | | l2_data_manager.run_upload_common() |
| | | l2_data_manager.run_log() |
| | | # 测试 |
| | | # threading.Thread(target=lambda: test_add_codes(),daemon=True).start() |
| | | # TODO 测试 |
| | | # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start() |
| | | global l2CommandManager |
| | | l2CommandManager = command_manager.L2CommandManager() |
| | | l2CommandManager.init(MyL2ActionCallback()) |
| | |
| | | 'OrderTime': '13000015', |
| | | 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) |
| | | |
| | | queue_r = multiprocessing.Queue() |
| | | queue_r = multiprocessing.Queue(maxsize=1024) |
| | | order_queues = [] |
| | | transaction_queues = [] |
| | | market_queue = multiprocessing.Queue() |
| | | market_queue = multiprocessing.Queue(maxsize=1024) |
| | | for i in range(20): |
| | | order_queues.append(multiprocessing.Queue()) |
| | | transaction_queues.append(multiprocessing.Queue()) |
| | | order_queues.append(multiprocessing.Queue(maxsize=1024)) |
| | | transaction_queues.append(multiprocessing.Queue(maxsize=1024)) |
| | | threading.Thread(target=test_add_codes).start() |
| | | |
| | | run(queue_r, order_queues, transaction_queues, market_queue) |