From 70b51a2636858fb8cfbf39d3764d88d07286d8ad Mon Sep 17 00:00:00 2001 From: admin <admin@example.com> Date: 星期二, 08 四月 2025 11:27:42 +0800 Subject: [PATCH] 增加并行数量 --- huaxin_client/l1_api_client.py | 66 ++++++++++++++++++++++++++++----- 1 files changed, 56 insertions(+), 10 deletions(-) diff --git a/huaxin_client/l1_api_client.py b/huaxin_client/l1_api_client.py index 237c4f8..9a929c5 100644 --- a/huaxin_client/l1_api_client.py +++ b/huaxin_client/l1_api_client.py @@ -5,7 +5,7 @@ import constant import qcvalueaddproapi -from log_module.log import logger_system +from log_module.log import logger_system, logger_debug from utils import tool global g_userid, g_passwd, g_address, g_port, g_seqnum @@ -22,6 +22,8 @@ class sampleSpi(qcvalueaddproapi.CQCValueAddProSpi): __result_cache = {} __temp_cache = {} + # 鎸囨暟鏁版嵁 + stock_index_data_dict = {} def __init__(self, t_tapi): qcvalueaddproapi.CQCValueAddProSpi.__init__(self) @@ -151,11 +153,22 @@ "bob": datetime.strptime(r['TradingDay'], '%Y%m%d'), "amount": r["Turnover"] }) - print("ReqReqQryStockDayQuotation:", len(fresults)) + # print("ReqReqQryStockDayQuotation 90澶㎏绾�:", len(fresults)) return fresults except Exception as e: logging.exception(e) + + def __subStockIndex(self): + """ + 璁㈤槄鑲$エ鎸囨暟琛屾儏 + """ + # 娌繁300 + self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_COMM, "000300") # 娌繁300 + self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SZSE, "000300") + self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SSE, "000001") # 涓婅瘉 + self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SZSE, "399006") # 鍒涗笟鏉挎寚鏁� + self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SZSE, "399001") # 娣卞湷鎴愭寚 def OnFrontConnected(self): print("OnFrontConnected") @@ -184,6 +197,27 @@ # threading.Thread(target=lambda: print("浜ゆ槗鏃ュ巻锛�", self.queryTradeCalendar("2024-03-08", "2024-12-31"))).start() # threading.Thread( # target=lambda: print("鏃锛�", self.queryBars("601298", "2024-12-15", "2024-12-31"))).start() + try: + self.__subStockIndex() + except: + pass + + def OnRtnStockIndexData(self, pStockIndexData): + # 鎸囨暟鏁版嵁 + try: + data = { + "PreClosePrice":pStockIndexData.PreClosePrice, + "LastPrice": pStockIndexData.LastPrice, + "SecurityID": pStockIndexData.SecurityID, + "UpdateTime": pStockIndexData.UpdateTime, + "Volume": pStockIndexData.Volume, + "Turnover": pStockIndexData.Turnover, + "LXLastPrice": pStockIndexData.LXLastPrice, + } + self.stock_index_data_dict[data["SecurityID"]] = data + # logger_debug.info(f"鎸囨暟琛屾儏搴旂瓟锛歿data}") + except Exception as e: + logging.exception(e) def ReqQryGGTEODPrices(self): QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField() @@ -211,7 +245,7 @@ else: self.__result_cache[nRequestID] = self.__temp_cache[nRequestID] self.__temp_cache.pop(nRequestID) - print("OnRspInquiryShareCalendar:", self.__result_cache[nRequestID]) + # print("OnRspInquiryShareCalendar:", self.__result_cache[nRequestID]) def OnRspInquiryStockDayQuotation(self, pStockDayQuotation, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast): """ @@ -240,11 +274,10 @@ else: self.__result_cache[nRequestID] = self.__temp_cache[nRequestID] self.__temp_cache.pop(nRequestID) - print("OnRspInquiryStockDayQuotation:", len(self.__result_cache[nRequestID])) + # print("OnRspInquiryStockDayQuotation 90澶㎏绾�:", len(self.__result_cache[nRequestID])) def __read_request(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue): - def __set_response_data(request_id, response_data): response_queue.put_nowait({"request_id": request_id, "data": response_data}) @@ -271,11 +304,25 @@ pass -def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue): +def __upload_datas(data_queue: multiprocessing.Queue): + # 1s涓婁紶涓�娆� + while True: + try: + if thespi.stock_index_data_dict: + data_queue.put_nowait(("stock_index_datas", thespi.stock_index_data_dict)) + except: + pass + finally: + time.sleep(1) + + +def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue, + data_callback_queue: multiprocessing.Queue): """ 杩愯 @param request_queue: 璇锋眰闃熷垪 @param response_queue: 鍝嶅簲闃熷垪 + @param data_callback_queue: 鏁版嵁鍥炶皟 @return: """ global g_userid, g_passwd, g_address, g_port @@ -290,18 +337,17 @@ #IP锛�192.168.84.61锛夈�佺鍙o紙25557 - - # 鍥炴祴浜ゆ槗鏄敱鍘嗗彶琛屾儏鏉ラ┍鍔ㄦ挳鍚堟垚浜�: # 鍥犳蹇呴』鍚屾椂浣跨敤traderapi鍜宮dapi锛屼笉鑳藉崟鐙娇鐢╰raderapi锛屽苟涓攎dapi鑷冲皯闇�瑕佽闃呬竴涓互涓婅鎯呫�� # 鐢ㄦ埛鍙娇鐢ㄥ洖娴媡raderapi鐨凴egisterFront鍑芥暟鏉ユ敞鍐屾鍦板潃鍘昏繛鎺ヤ笂鍥炴祴鏈嶅姟鍣� print("GetApiVersion():", qcvalueaddproapi.CQCValueAddProApi_GetApiVersion()) theapi = qcvalueaddproapi.CQCValueAddProApi_CreateInfoQryApi() - global thespi + global thespi thespi = sampleSpi(theapi) theapi.RegisterSpi(thespi) theapi.RegisterFront(g_address, g_port) - threading.Thread(target=__read_request, args=(request_queue, response_queue, ), daemon=True).start() + threading.Thread(target=__read_request, args=(request_queue, response_queue,), daemon=True).start() + threading.Thread(target=__upload_datas, args=(data_callback_queue,), daemon=True).start() theapi.Run() -- Gitblit v1.8.0