From 48fb7a00951f91bdc707e5dd2d196e5bccb752c3 Mon Sep 17 00:00:00 2001 From: Administrator <admin@example.com> Date: 星期三, 18 六月 2025 18:41:30 +0800 Subject: [PATCH] 异常保护 --- huaxin_client/l2_client.py | 384 +++++++++++++++++++++++++++++++----------------------- 1 files changed, 221 insertions(+), 163 deletions(-) diff --git a/huaxin_client/l2_client.py b/huaxin_client/l2_client.py index 4672fcd..5876188 100644 --- a/huaxin_client/l2_client.py +++ b/huaxin_client/l2_client.py @@ -7,26 +7,24 @@ import threading import time import concurrent.futures -from typing import List -from huaxin_client import command_manager, l2_data_transform_protocol +from huaxin_client import command_manager from huaxin_client import constant from huaxin_client import l2_data_manager import lev2mdapi -from huaxin_client.code_queue_distribute_manager import CodeQueueDistributeManager +from huaxin_client.code_queue_distribute_manager import CodeDataCallbackDistributeManager from huaxin_client.command_manager import L2ActionCallback from huaxin_client.l2_data_manager import L2DataUploadManager from log_module import log, async_log_util from log_module.async_log_util import huaxin_l2_log -from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_local_huaxin_l2_transaction, \ - logger_local_huaxin_g_cancel, logger_l2_codes_subscript +from log_module.log import logger_local_huaxin_l2_subscript, logger_system, logger_l2_codes_subscript, logger_debug from utils import tool ###B绫�### Front_Address = "tcp://10.0.1.101:6900" Multicast_Address = "udp://224.224.2.19:7889" Multicast_Address2 = "udp://224.224.224.234:7890" -Local_Interface_Address = "192.168.84.75" +Local_Interface_Address = constant.LOCAL_IP ###A绫�### if constant.IS_A: @@ -52,8 +50,9 @@ SZ_Securities = [b"002456", b"002849", b"002281", b"002336", b"000936", b"000920", b"000757", b"002896", b"002725", b"000952", b"000526", b"000753", b"000681", b"002088", b"002436"] SZ_Bond_Securities = [b"100303", b"109559", b"112617"] -set_codes_data_queue = queue.Queue() -market_code_dict = {} +set_codes_data_queue = queue.Queue(maxsize=1000) + +ENABLE_NGST = True class Lev2MdSpi(lev2mdapi.CTORATstpLev2MdSpi): @@ -78,9 +77,10 @@ szse_codes = [] sse_codes = [] for code in codes: - if code.find("00") == 0: + market_type = tool.get_market_type(code) + if market_type == tool.MARKET_TYPE_SZSE: szse_codes.append(code.encode()) - elif code.find("60") == 0: + elif market_type == tool.MARKET_TYPE_SSE: sse_codes.append(code.encode()) return sse_codes, szse_codes @@ -92,27 +92,38 @@ logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄涓婅瘉锛歿sh}") logger_local_huaxin_l2_subscript.info(f"鍙栨秷璁㈤槄娣辫瘉锛歿sz}") if sh: - # 鍙栨秷璁㈤槄閫愮瑪濮旀墭 - self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦 - self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + if ENABLE_NGST: + result = self.__api.UnSubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}") + else: + # 鍙栨秷璁㈤槄閫愮瑪濮旀墭 + self.__api.UnSubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + # 鍙栨秷璁㈤槄閫愮瑪鎴愪氦 + self.__api.UnSubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) self.__api.UnSubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) if sz: self.__api.UnSubscribeOrderDetail(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeTransaction(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) self.__api.UnSubscribeMarketData(sz, lev2mdapi.TORA_TSTP_EXD_SZSE) + def subscribe_codes(self, _codes): + self.__subscribe(_codes) + def __subscribe(self, _codes): sh, sz = self.__split_codes(_codes) logger_local_huaxin_l2_subscript.info(f"璁㈤槄涓婅瘉锛歿sh}") logger_local_huaxin_l2_subscript.info(f"璁㈤槄娣辫瘉锛歿sz}") if sh: - # 璁㈤槄閫愮瑪濮旀墭 - result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}") - # 璁㈤槄閫愮瑪鎴愪氦 - result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) - logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}") + if ENABLE_NGST: + result = self.__api.SubscribeNGTSTick(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪NGTS璁㈤槄缁撴灉sh锛歿result}") + else: + # 璁㈤槄閫愮瑪濮旀墭 + result = self.__api.SubscribeOrderDetail(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪濮旀墭璁㈤槄缁撴灉sh锛歿result}") + # 璁㈤槄閫愮瑪鎴愪氦 + result = self.__api.SubscribeTransaction(sh, lev2mdapi.TORA_TSTP_EXD_SSE) + logger_local_huaxin_l2_subscript.info(f"閫愮瑪鎴愪氦璁㈤槄缁撴灉sh锛歿result}") result = self.__api.SubscribeMarketData(sh, lev2mdapi.TORA_TSTP_EXD_SSE) logger_local_huaxin_l2_subscript.info(f"甯傚満璁㈤槄缁撴灉sh锛歿result}") @@ -126,6 +137,9 @@ def __process_codes_data(self, codes_data, from_cache=False, delay=0.0): + if from_cache and self.codes_volume_and_price_dict: + return + if not self.is_login and not constant.TEST: raise Exception("L2灏氭湭鐧诲綍") if delay > 0: @@ -134,8 +148,9 @@ for d in codes_data: code = d[0] codes.add(code) - self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3]) - self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], float(d[2]), d[3]) + self.codes_volume_and_price_dict[code] = (d[1], d[2], d[3], d[4], d[5]) + self.l2_data_upload_manager.set_order_fileter_condition(code, d[1], round(float(d[2]), 2), d[3], d[4], d[5]) + logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄鎬绘暟锛歿}", len(codes)) add_codes = codes - self.subscripted_codes del_codes = self.subscripted_codes - codes print("add del codes", add_codes, del_codes) @@ -144,9 +159,9 @@ self.l2_data_upload_manager.release_distributed_upload_queue(c) l2_data_manager.del_target_code(c) for c in codes: - self.l2_data_upload_manager.distribute_upload_queue(c) + self.l2_data_upload_manager.distribute_upload_queue(c, codes) l2_data_manager.add_target_code(c) - except Exception as e: + except Exception as e: # TODO 娓呴櫎鍘熸潵杩樻病閲婃斁鎺夌殑鏁版嵁 logger_system.error(f"L2浠g爜鍒嗛厤涓婁紶闃熷垪鍑洪敊:{str(e)}") logger_system.exception(e) self.__subscribe(add_codes) @@ -154,6 +169,8 @@ if add_codes: logger_system.info(f"鏂板L2璁㈤槄浠g爜鏁伴噺({'缂撳瓨' if from_cache else ''}):{len(add_codes)}") + for c in add_codes: + logger_l2_codes_subscript.info(f"l2濮旀墭鏁版嵁杩囨护鏉′欢锛歿c} - {self.codes_volume_and_price_dict.get(c)}") logger_l2_codes_subscript.info("鍗庨懌L2璁㈤槄缁撴潫锛宎dd-{} del-{}", len(add_codes), len(del_codes)) @@ -186,17 +203,6 @@ return data_json[1] return [] - def set_code_special_watch_volume(self, code, volume): - # 鏈夋晥鏈熶负3s - # self.special_code_volume_for_order_dict[code] = (volume, time.time() + 3) - d = self.codes_volume_and_price_dict.get(code) - if d: - min_volume, limit_up_price, special_price = d[0], d[1], d[2] - self.l2_data_upload_manager.set_order_fileter_condition(code, min_volume, limit_up_price,special_price, - {volume, constant.SHADOW_ORDER_VOLUME}, - time.time() + 3) - huaxin_l2_log.info(logger_local_huaxin_l2_subscript, f"璁剧疆涓嬪崟閲忕洃鍚細{code}-{volume}") - def OnFrontConnected(self): print("OnFrontConnected") logger_system.info(f"l2_client OnFrontConnected 绾跨▼ID:{tool.get_thread_id()}") @@ -213,10 +219,12 @@ if pRspInfo['ErrorID'] == 0: print("----L2琛屾儏鐧诲綍鎴愬姛----") self.is_login = True + logger_system.info(f"L2琛屾儏鐧诲綍鎴愬姛") # 鍒濆璁剧疆鍊� - threading.Thread( - target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=6.0), - daemon=True).start() + if tool.trade_time_sub(tool.get_now_time_str(), "09:20:00") > 0: + threading.Thread( + target=lambda: self.__process_codes_data(self.__get_latest_datas(), from_cache=True, delay=60), + daemon=True).start() def OnRspSubMarketData(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubMarketData") @@ -237,6 +245,8 @@ if pRspInfo["ErrorID"] == 0: print("璁㈤槄鎴愬姛") self.subscripted_codes.add(pSpecificSecurity['SecurityID']) + # 鍒濆鍖� + SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) if bIsLast == 1: print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) l2_data_manager.add_subscript_codes(self.subscripted_codes) @@ -245,6 +255,29 @@ print("OnRspUnSubOrderDetail", bIsLast) try: code = pSpecificSecurity['SecurityID'] + self.subscripted_codes.discard(code) + if bIsLast == 1: + print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) + l2_data_manager.add_subscript_codes(self.subscripted_codes) + except Exception as e: + logging.exception(e) + + def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): + async_log_util.info(logger_local_huaxin_l2_subscript, + f"NGTS璁㈤槄缁撴灉锛歿pSpecificSecurity['SecurityID']} {pRspInfo['ErrorID']} {pRspInfo['ErrorMsg']}") + if pRspInfo["ErrorID"] == 0: + print("璁㈤槄鎴愬姛") + self.subscripted_codes.add(pSpecificSecurity['SecurityID']) + # 鍒濆鍖� + SubscriptDefend.set_l2_market_update(pSpecificSecurity['SecurityID']) + if bIsLast == 1: + print("璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) + l2_data_manager.add_subscript_codes(self.subscripted_codes) + + def OnRspUnSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): + try: + code = pSpecificSecurity['SecurityID'] + logger_local_huaxin_l2_subscript.info(f"NGTS鍙栨秷璁㈤槄锛歿code}") self.subscripted_codes.discard(code) if bIsLast == 1: print("鍙栨秷璁㈤槄鍝嶅簲缁撴潫", self.subscripted_codes) @@ -267,34 +300,42 @@ def OnRspSubXTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): print("OnRspSubXTSTick") - # 4.0.5鐗堟湰鎺ュ彛 - def OnRspSubNGTSTick(self, pSpecificSecurity, pRspInfo, nRequestID, bIsLast): - print("OnRspSubNGTSTick") - def OnRtnMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): # 浼犲叆锛氭椂闂达紝鐜颁环,鎴愪氦鎬婚噺,涔�1锛屼拱2锛屼拱3,涔�4,涔�5,鍗�1,鍗�2,鍗�3,鍗�4,鍗�5 try: + buys = [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), + (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), + (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), + (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), + (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])] + for i in range(6, 11): + if not pDepthMarketData[f"BidVolume{i}"]: + break + buys.append((pDepthMarketData[f'BidPrice{i}'], pDepthMarketData[f'BidVolume{i}'])) + + sells = [ + (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), + (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), + (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), + (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), + (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) + ] + for i in range(6, 11): + if not pDepthMarketData[f"AskVolume{i}"]: + break + sells.append((pDepthMarketData[f'AskPrice{i}'], pDepthMarketData[f'AskVolume{i}'])) + d = {"dataTimeStamp": pDepthMarketData['DataTimeStamp'], "securityID": pDepthMarketData['SecurityID'], "lastPrice": pDepthMarketData['LastPrice'], "totalVolumeTrade": pDepthMarketData['TotalVolumeTrade'], "totalValueTrade": pDepthMarketData['TotalValueTrade'], "totalAskVolume": pDepthMarketData['TotalAskVolume'], "avgAskPrice": pDepthMarketData["AvgAskPrice"], - "buy": [(pDepthMarketData['BidPrice1'], pDepthMarketData['BidVolume1']), - (pDepthMarketData['BidPrice2'], pDepthMarketData['BidVolume2']), - (pDepthMarketData['BidPrice3'], pDepthMarketData['BidVolume3']), - (pDepthMarketData['BidPrice4'], pDepthMarketData['BidVolume4']), - (pDepthMarketData['BidPrice5'], pDepthMarketData['BidVolume5'])], - "sell": [ - (pDepthMarketData['AskPrice1'], pDepthMarketData['AskVolume1']), - (pDepthMarketData['AskPrice2'], pDepthMarketData['AskVolume2']), - (pDepthMarketData['AskPrice3'], pDepthMarketData['AskVolume3']), - (pDepthMarketData['AskPrice4'], pDepthMarketData['AskVolume4']), - (pDepthMarketData['AskPrice5'], pDepthMarketData['AskVolume5']) - ]} - market_code_dict[pDepthMarketData['SecurityID']] = time.time() + "buy": buys, + "sell": sells} self.l2_data_upload_manager.add_market_data(d) + SubscriptDefend.set_l2_market_update(pDepthMarketData['SecurityID']) except: pass @@ -314,12 +355,6 @@ # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) # 杈撳嚭閫愮瑪鎴愪氦鏁版嵁 if pTransaction['ExecType'] == b"2": - # if min_volume is None: - # # 榛樿绛涢��50w - # if pTransaction['TradePrice'] * pTransaction['Volume'] < 500000: - # return - # elif pTransaction['TradeVolume'] < min_volume: - # return # 鎾ゅ崟 item = {"SecurityID": pTransaction['SecurityID'], "Price": pTransaction['TradePrice'], "Volume": pTransaction['TradeVolume'], @@ -357,28 +392,6 @@ self.l2_data_upload_manager.add_transaction_detail(item) def OnRtnOrderDetail(self, pOrderDetail): - # can_listen = False - # code = str(pOrderDetail['SecurityID']) - # start_time = 0 - # if code in self.special_code_volume_for_order_dict: - # start_time = time.time() - # if self.special_code_volume_for_order_dict[code][0] == pOrderDetail[ - # 'Volume'] or constant.SHADOW_ORDER_VOLUME == pOrderDetail['Volume']: - # # 鐩戞帶鐩爣璁㈠崟涓庡奖瀛愯鍗� - # if self.special_code_volume_for_order_dict[code][1] > time.time(): - # # 鐗规畩閲忕洃鍚� - # can_listen = True - # else: - # self.special_code_volume_for_order_dict.pop(code) - # if not can_listen: - # min_volume, limit_up_price = self.codes_volume_and_price_dict.get(code) - # if min_volume is None: - # # 榛樿绛涢��50w - # if pOrderDetail['Price'] * pOrderDetail['Volume'] < 500000: - # return - # elif pOrderDetail['Volume'] < min_volume: - # return - # 杈撳嚭閫愮瑪濮旀墭鏁版嵁 # 涓婅瘉OrderStatus=b"D"琛ㄧず鎾ゅ崟 item = {"SecurityID": pOrderDetail['SecurityID'], "Price": pOrderDetail['Price'], "Volume": pOrderDetail['Volume'], @@ -387,6 +400,38 @@ "SubSeq": pOrderDetail['SubSeq'], "OrderNO": pOrderDetail['OrderNO'], "OrderStatus": pOrderDetail['OrderStatus'].decode()} self.l2_data_upload_manager.add_l2_order_detail(item, 0) + + def OnRtnNGTSTick(self, pTick): + """ + 涓婅瘉鑲$エ鐨勯�愮瑪濮旀墭涓庢垚浜� + @param pTick: + @return: + """ + try: + if pTick['TickType'] == b'T': + # 鎴愪氦 + item = {"SecurityID": pTick['SecurityID'], "TradePrice": pTick['Price'], + "TradeVolume": pTick['Volume'], + "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], + "SubSeq": pTick['SubSeq'], "BuyNo": pTick['BuyNo'], + "SellNo": pTick['SellNo'], + "ExecType": '1'} + self.l2_data_upload_manager.add_transaction_detail(item) + elif pTick['TickType'] == b'A' or pTick['TickType'] == b'D': + # 鎾ゅ崟 + item = {"SecurityID": pTick['SecurityID'], "Price": pTick['Price'], + "Volume": pTick['Volume'], + "Side": pTick['Side'].decode(), "OrderType": pTick['TickType'].decode(), + "OrderTime": pTick['TickTime'], "MainSeq": pTick['MainSeq'], + "SubSeq": pTick['SubSeq'], "OrderNO": '', + "OrderStatus": pTick['TickType'].decode()} + if pTick['Side'] == b'1': + item['OrderNO'] = pTick['BuyNo'] + elif pTick['Side'] == b'2': + item['OrderNO'] = pTick['SellNo'] + self.l2_data_upload_manager.add_l2_order_detail(item, 0) + except Exception as e: + logger_local_huaxin_l2_subscript.exception(e) def OnRtnBondMarketData(self, pDepthMarketData, FirstLevelBuyNum, FirstLevelBuyOrderVolumes, FirstLevelSellNum, FirstLevelSellOrderVolumes): @@ -460,33 +505,44 @@ for sell_index in range(0, FirstLevelSellNum): print("first level sell [%d] : [%d]" % (sell_index, FirstLevelSellOrderVolumes[sell_index])) - def OnRtnXTSTick(self, pTick): - # 杈撳嚭涓婃捣鍊哄埜閫愮瑪鏁版嵁鈥� - print( - "OnXTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( - pTick['TickType'], - pTick['SecurityID'], - pTick['Price'], - pTick['Volume'], - pTick['TickTime'], - pTick['MainSeq'], - pTick['SubSeq'], - pTick['BuyNo'], - pTick['SellNo'])) - def OnRtnNGTSTick(self, pTick): - # 杈撳嚭涓婃捣鑲″熀閫愮瑪鏁版嵁鈥� - print( - "OnRtnNGTSTick TickType[%s] SecurityID[%s] Price[%.2f] Volume[%d] TickTime[%d] MainSeq[%d] SubSeq[%d] BuyNo[%d] SellNo[%d]" % ( - pTick['TickType'], - pTick['SecurityID'], - pTick['Price'], - pTick['Volume'], - pTick['TickTime'], - pTick['MainSeq'], - pTick['SubSeq'], - pTick['BuyNo'], - pTick['SellNo'])) +class SubscriptDefend: + """ + 璁㈤槄瀹堟姢 + 瀹氫箟锛氬綋璁㈤槄鐨勪唬鐮佽秴杩囦竴瀹氭椂闂存病鏈夊洖璋冩暟鎹椂閲嶆柊璁㈤槄 + """ + __l2_market_update_time = {} + + @classmethod + def set_l2_market_update(cls, code): + cls.__l2_market_update_time[code] = time.time() + + @classmethod + def run(cls): + while True: + try: + now_time = tool.get_now_time_as_int() + if now_time < int("093015"): + continue + if int("112945") < now_time < int("130015"): + continue + if int("145645") < now_time: + continue + if spi.subscripted_codes: + codes = [] + for code in spi.subscripted_codes: + # 鑾峰彇涓婃鏇存柊鏃堕棿 + update_time = cls.__l2_market_update_time.get(code) + if update_time and time.time() - update_time > 15: + # 闇�瑕侀噸鏂拌闃� + codes.append(code) + if codes: + logger_debug.info(f"閲嶆柊璁㈤槄锛歿codes}") + spi.subscribe_codes(codes) + except: + pass + finally: + time.sleep(15) class MyL2ActionCallback(L2ActionCallback): @@ -507,9 +563,9 @@ g_SubMode = lev2mdapi.TORA_TSTP_MST_MCAST # case 1缂撳瓨妯″紡 - # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) + api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, True) # case 2闈炵紦瀛樻ā寮� - api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) + # api = lev2mdapi.CTORATstpLev2MdApi_CreateTstpLev2MdApi(g_SubMode, False) global spi spi = Lev2MdSpi(api, l2_data_upload_manager) api.RegisterSpi(spi) @@ -545,11 +601,7 @@ value = value.decode("utf-8") data = json.loads(value) _type = data["type"] - if _type == "listen_volume": - volume = data["data"]["volume"] - code = data["data"]["code"] - spi.set_code_special_watch_volume(code, volume) - elif _type == "l2_cmd": + if _type == "l2_cmd": __start_time = time.time() # 绾跨▼姹� __l2_cmd_thread_pool.submit( @@ -565,43 +617,50 @@ pipe_strategy = None -def run(queue_r: multiprocessing.Queue, order_queues: List[multiprocessing.Queue], - transaction_queues: List[multiprocessing.Queue], market_queue: multiprocessing.Queue) -> None: - # def test_add_codes(): - # time.sleep(5) - # # if value: - # # if type(value) == bytes: - # # value = value.decode("utf-8") - # # data = json.loads(value) - # # _type = data["type"] - # # if _type == "listen_volume": - # # volume = data["data"]["volume"] - # # code = data["data"]["code"] - # # spi.set_code_special_watch_volume(code, volume) - # # elif _type == "l2_cmd": - # # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) - # - # demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35), ("002654", int(50 * 10000 / 15.59), 15.59), - # ("603701", int(50 * 10000 / 14.28), 14.28), ("002908", int(50 * 10000 / 12.78), 12.78)] - # - # queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) - # time.sleep(1) - # - # spi.l2_data_upload_manager.add_l2_order_detail( - # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', - # 'OrderTime': '13000015', - # 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) - # spi.l2_data_upload_manager.add_l2_order_detail( - # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', - # 'OrderTime': '13000015', - # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) - # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) - # time.sleep(0.1) - # spi.l2_data_upload_manager.add_l2_order_detail( - # {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', - # 'OrderTime': '13000015', - # 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) +def test_add_codes(queue_r): + time.sleep(10) + # if value: + # if type(value) == bytes: + # value = value.decode("utf-8") + # data = json.loads(value) + # _type = data["type"] + # if _type == "listen_volume": + # volume = data["data"]["volume"] + # code = data["data"]["code"] + # spi.set_code_special_watch_volume(code, volume) + # elif _type == "l2_cmd": + # l2CommandManager.process_command(command_manager.CLIENT_TYPE_CMD_L2, None, data) + demo_datas = [("603002", int(50 * 10000 / 6.35), 6.35, 6.00, 200), + ("002654", int(50 * 10000 / 15.59), 15.59, 15.3, 200), + ("603701", int(50 * 10000 / 14.28), 14.28, 14.00, 200), + ("002908", int(50 * 10000 / 12.78), 12.78, 12.00, 200)] + + queue_r.put_nowait(json.dumps({"type": "l2_cmd", "data": [demo_datas[0]]})) + time.sleep(10) + while True: + try: + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 275000, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739147, 'OrderNO': 5512466, 'OrderStatus': 'D'}, 0) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 200, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + # queue_r.put_nowait(json.dumps({"type": "listen_volume", "data": {"code": "603002", "volume": 100}})) + time.sleep(0.1) + spi.l2_data_upload_manager.add_l2_order_detail( + {'SecurityID': '603002', 'Price': 6.35, 'Volume': 100, 'Side': "1", 'OrderType': '0', + 'OrderTime': '13000015', + 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) + except Exception as e: + logging.exception(e) + finally: + time.sleep(10) + + +def run(queue_r: multiprocessing.Queue, data_callbacks: list) -> None: logger_system.info("L2杩涚▼ID锛歿}", os.getpid()) logger_system.info(f"l2_client 绾跨▼ID:{tool.get_thread_id()}") try: @@ -609,17 +668,16 @@ if queue_r is not None: t1 = threading.Thread(target=lambda: __receive_from_queue_trade(queue_r), daemon=True) t1.start() - + # 璁㈤槄瀹堟姢 + threading.Thread(target=SubscriptDefend.run, daemon=True).start() # 鍒濆鍖� - order_queue_distribute_manager = CodeQueueDistributeManager(order_queues) - transaction_queue_distribute_manager = CodeQueueDistributeManager(transaction_queues) - l2_data_upload_manager = L2DataUploadManager(order_queue_distribute_manager, - transaction_queue_distribute_manager, market_queue) + data_callback_distribute_manager = CodeDataCallbackDistributeManager(data_callbacks) + l2_data_upload_manager = L2DataUploadManager(data_callback_distribute_manager) __init_l2(l2_data_upload_manager) l2_data_manager.run_upload_common() l2_data_manager.run_log() - # 娴嬭瘯 - # threading.Thread(target=lambda: test_add_codes(),daemon=True).start() + # TODO 娴嬭瘯 + # threading.Thread(target=lambda: test_add_codes(queue_r), daemon=True).start() global l2CommandManager l2CommandManager = command_manager.L2CommandManager() l2CommandManager.init(MyL2ActionCallback()) @@ -666,13 +724,13 @@ 'OrderTime': '13000015', 'MainSeq': 2, 'SubSeq': 6739148, 'OrderNO': 5512467, 'OrderStatus': 'D'}, 0) - queue_r = multiprocessing.Queue() + queue_r = multiprocessing.Queue(maxsize=1024) order_queues = [] transaction_queues = [] - market_queue = multiprocessing.Queue() + market_queue = multiprocessing.Queue(maxsize=1024) for i in range(20): - order_queues.append(multiprocessing.Queue()) - transaction_queues.append(multiprocessing.Queue()) + order_queues.append(multiprocessing.Queue(maxsize=1024)) + transaction_queues.append(multiprocessing.Queue(maxsize=1024)) threading.Thread(target=test_add_codes).start() run(queue_r, order_queues, transaction_queues, market_queue) -- Gitblit v1.8.0