"""Bridge between the qcvalueaddproapi query API and multiprocessing queues.

The module logs in to the value-added data front, answers trade-calendar and
daily K-line requests read from a request queue, and publishes the latest
stock-index snapshots to a data queue once per second.
"""
import logging
import multiprocessing
import threading
import time
from datetime import datetime

import constant
import qcvalueaddproapi
from log_module.log import logger_system, logger_debug
from utils import tool

# Last request sequence number handed out; advanced by new_seqnum().
g_seqnum = 100000
# new_seqnum() is called from both the request thread and API callbacks.
_seqnum_lock = threading.Lock()

# Page size requested from the API; a shorter page ends pagination.
_PAGE_SIZE = 200
# Upper bound on the number of pages fetched for a single query.
_MAX_PAGES = 20
# A response is awaited for _POLL_ATTEMPTS * _POLL_INTERVAL_SECONDS (~2 s).
_POLL_ATTEMPTS = 1000
_POLL_INTERVAL_SECONDS = 0.002
# Price fields rescaled when a bar's adjust factor differs from the newest one.
_ADJUSTED_PRICE_FIELDS = ("PreClosePrice", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice")


def new_seqnum():
    """Return the next request sequence number (monotonically increasing)."""
    global g_seqnum
    with _seqnum_lock:
        g_seqnum = g_seqnum + 1
        return g_seqnum


class sampleSpi(qcvalueaddproapi.CQCValueAddProSpi):
    """SPI callback handler that also exposes blocking query helpers.

    Requests are sent through ``m_api``; the matching ``OnRsp...`` callbacks
    collect rows per request id, and the query helpers poll for the finished
    result.
    """

    # request id -> completed list of rows, waiting to be picked up
    __result_cache = {}
    # request id -> rows received so far for a response still in progress
    __temp_cache = {}
    # Index data: SecurityID -> latest snapshot dict
    stock_index_data_dict = {}

    def __init__(self, t_tapi):
        qcvalueaddproapi.CQCValueAddProSpi.__init__(self)
        self.m_api = t_tapi

    def __create_request_id(self):
        """Return a fresh request id."""
        return new_seqnum()

    def __wait_for_result(self, request_id):
        """Poll for the completed response of ``request_id``.

        The entry is removed from the result cache when it is returned so the
        cache does not grow with every query.

        @param request_id: id that was passed to the API request call
        @return: list of rows, or [] when nothing arrived within the timeout
        """
        for _ in range(_POLL_ATTEMPTS):
            rows = self.__result_cache.pop(request_id, None)
            if rows is not None:
                return rows
            time.sleep(_POLL_INTERVAL_SECONDS)
        # NOTE(review): a response arriving after this timeout is still stored
        # by the callback and is never collected.
        return []

    def __query_trade_calendar(self, start_date, end_date, page_locate=1, page_count=_PAGE_SIZE):
        """
        Query one page of the trade calendar.
        @param start_date: format: 20241201
        @param end_date: format: 20241201
        @param page_locate: page number to fetch
        @param page_count: page size
        @return: list of dates (ascending); [] on timeout or error
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryShareCalendarField()
            queryField.BegDate = start_date
            queryField.EndDate = end_date
            queryField.PageCount = page_count
            queryField.PageLocate = page_locate
            queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_ASC
            request_id = self.__create_request_id()
            self.m_api.ReqReqQryShareCalendar(queryField, request_id)
            return self.__wait_for_result(request_id)
        except Exception as e:
            logging.exception(e)
        return []

    def queryTradeCalendar(self, start_date, end_date):
        """
        Query the trade calendar, following pagination.
        @param start_date: e.g. 2024-03-08 (dashes are stripped before sending)
        @param end_date: e.g. 2024-12-31
        @return: list of trading days collected from all pages
        """
        fresults = []
        try:
            begin = start_date.replace("-", "")
            end = end_date.replace("-", "")
            for page_locate in range(1, _MAX_PAGES + 1):
                results = self.__query_trade_calendar(begin, end, page_locate=page_locate)
                if results:
                    fresults.extend(results)
                # A short (or empty / timed-out) page means there is nothing more to fetch.
                if len(results) != _PAGE_SIZE:
                    break
            print("queryTradeCalendar:", len(fresults))
        except Exception as e:
            logging.exception(e)
        return fresults

    def __query_bars(self, code, begin_date, end_date, page_locate=1, page_count=_PAGE_SIZE):
        """
        Query one page of daily quotations for a security, newest first.
        @param code: security code
        @param begin_date: e.g. 2024-10-31
        @param end_date: e.g. 2024-11-30
        @param page_locate: page number to fetch
        @param page_count: page size
        @return: list of raw row dicts; [] on timeout
        """
        queryField = qcvalueaddproapi.CQCVDReqQryStockDayQuotationField()
        queryField.BegDate = begin_date.replace("-", "")
        queryField.EndDate = end_date.replace("-", "")
        if tool.is_sh_code(code):
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SSE
        else:
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SZSE
        queryField.SecurityID = code
        queryField.PageCount = page_count
        queryField.PageLocate = page_locate
        queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
        request_id = self.__create_request_id()
        self.m_api.ReqReqQryStockDayQuotation(queryField, request_id)
        # Read the result
        return self.__wait_for_result(request_id)

    def queryBars(self, code, begin_date, end_date):
        """
        Get historical daily K-lines (start and end dates inclusive).
        @param code: security code
        @param begin_date: e.g. 2024-10-31
        @param end_date: e.g. 2024-11-30
        @return: list of bar dicts (newest first) with keys sec_id, open, high,
                 low, close, volume, pre_close, bob, amount; None if an
                 exception occurred (it is logged)
        """
        try:
            final_results = []
            for page_locate in range(1, _MAX_PAGES + 1):  # fetch at most 20 pages
                results = self.__query_bars(code, begin_date, end_date, page_locate)
                if results:
                    final_results.extend(results)
                # A short (or empty / timed-out) page means there is nothing more to fetch.
                if len(results) != _PAGE_SIZE:
                    break
            # Newest adjust factor: rows are requested in descending order, so
            # the first non-zero factor seen is the reference.
            start_adjust_factor = None
            for d in final_results:
                if not start_adjust_factor:
                    start_adjust_factor = d["AdjustFactor"]
                # Rescale prices of bars whose factor differs from the reference
                if start_adjust_factor != d["AdjustFactor"]:
                    for field in _ADJUSTED_PRICE_FIELDS:
                        d[field] = round(d["AdjustFactor"] * d[field] / start_adjust_factor, 4)
            return [
                {
                    "sec_id": r["SecurityID"],
                    "open": r["OpenPrice"],
                    "high": r["HighPrice"],
                    "low": r["LowPrice"],
                    "close": r["ClosePrice"],
                    "volume": r["Volume"],
                    "pre_close": r["PreClosePrice"],
                    "bob": datetime.strptime(r['TradingDay'], '%Y%m%d'),
                    "amount": r["Turnover"],
                }
                for r in final_results
            ]
        except Exception as e:
            logging.exception(e)
        # Kept as None (not []) so existing callers see the same failure value.
        return None

    def __subStockIndex(self):
        """
        Subscribe to stock index quotations.
        """
        # CSI 300
        self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_COMM, "000300")
        # CSI 300
        self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SZSE, "000300")
        self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SSE, "000001")  # SSE Composite
        self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SZSE, "399006")  # ChiNext index
        self.m_api.SubscribeStockIndexData(qcvalueaddproapi.QCVD_EXD_SZSE, "399001")  # SZSE Component

    def OnFrontConnected(self):
        """Callback on front connection: send the login request."""
        print("OnFrontConnected")
        # Log in once connected
        loginfield = qcvalueaddproapi.CQCVDReqUserLoginField()
        loginfield.LogInAccount = g_userid
        loginfield.AuthMode = qcvalueaddproapi.QCVD_AM_Password
        loginfield.Password = g_passwd
        self.m_api.ReqUserLogin(loginfield, new_seqnum())

    def OnFrontDisconnected(self, nReason):
        """Callback on front disconnection: print the reason code."""
        print("OnFrontDisconnected Reason[%d]" % (nReason))

    # Login request response
    def OnRspUserLogin(self, pRspUserLoginField, pRspInfo, nRequestID, bIsLast):
        """Callback for the login response; subscribes to index data on success."""
        print("OnRspUserLogin LogInAccount[%s] RequestID[%d] ErrorID[%d] ErrorMsg[%s] " % (pRspUserLoginField.LogInAccount, nRequestID, pRspInfo.ErrorID, pRspInfo.ErrorMsg))
        if pRspInfo.ErrorID == 0:
            logger_system.info("华鑫增值服务API登录成功")
            # Subscribe right after a successful login; a failure here must not
            # propagate into the API callback, but it should be visible in logs.
            try:
                self.__subStockIndex()
            except Exception as e:
                logging.exception(e)

    def OnRtnStockIndexData(self, pStockIndexData):
        """Callback for index data: store the latest snapshot keyed by SecurityID."""
        try:
            data = {
                "PreClosePrice": pStockIndexData.PreClosePrice,
                "LastPrice": pStockIndexData.LastPrice,
                "SecurityID": pStockIndexData.SecurityID,
                "UpdateTime": pStockIndexData.UpdateTime,
                "Volume": pStockIndexData.Volume,
                "Turnover": pStockIndexData.Turnover,
                "LXLastPrice": pStockIndexData.LXLastPrice,
            }
            self.stock_index_data_dict[data["SecurityID"]] = data
        except Exception as e:
            logging.exception(e)

    def ReqQryGGTEODPrices(self):
        """Send a CQCVDQryGGTEODPricesField query with default field values."""
        QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField()
        self.m_api.ReqQryGGTEODPrices(QryField, new_seqnum())

    def ReqQryInvestor(self):
        """Send a CQCVDQryInvestorField query with default field values."""
        QryField = qcvalueaddproapi.CQCVDQryInvestorField()
        self.m_api.ReqQryInvestor(QryField, new_seqnum())

    def OnRspInquiryShareCalendar(self, pShareCalendar, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """
        Trade calendar response.

        Rows are accumulated per request id; a callback with a falsy
        ``pShareCalendar`` publishes the accumulated list as the result.
        @param pShareCalendar: calendar row, or falsy on the terminating callback
        @param pRspInfo: unused
        @param nRequestID: id of the originating request
        @param bIsPageLast: unused
        @param bIsTotalLast: unused
        """
        if nRequestID not in self.__temp_cache:
            self.__temp_cache[nRequestID] = []
        if pShareCalendar:
            self.__temp_cache[nRequestID].append(pShareCalendar.TradingDay)
        else:
            self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
            self.__temp_cache.pop(nRequestID)

    def OnRspInquiryStockDayQuotation(self, pStockDayQuotation, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """
        Daily K-line response.

        Rows are accumulated per request id while ``bIsPageLast`` is false; the
        callback with ``bIsPageLast`` true publishes the accumulated list.
        @param pStockDayQuotation: quotation row
        @param pRspInfo: unused
        @param nRequestID: id of the originating request
        @param bIsPageLast: true on the callback that ends the page
        @param bIsTotalLast: unused
        """
        if nRequestID not in self.__temp_cache:
            self.__temp_cache[nRequestID] = []
        if not bIsPageLast:
            self.__temp_cache[nRequestID].append({
                "SecurityID": pStockDayQuotation.SecurityID,
                "TradingDay": pStockDayQuotation.TradingDay,
                "AdjustFactor": pStockDayQuotation.AdjustFactor,
                "PreClosePrice": pStockDayQuotation.PreClosePrice,
                "OpenPrice": pStockDayQuotation.OpenPrice,
                "HighPrice": pStockDayQuotation.HighPrice,
                "LowPrice": pStockDayQuotation.LowPrice,
                "ClosePrice": pStockDayQuotation.ClosePrice,
                # NOTE(review): the *100 / *1000 scaling is presumably a unit
                # conversion from the API's units - confirm against the API docs.
                "Volume": int(pStockDayQuotation.Volume * 100),
                "Turnover": int(pStockDayQuotation.Turnover * 1000)
            })
        else:
            # NOTE(review): any row carried by the page-last callback is not
            # stored - confirm the API sends an empty terminating callback.
            self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
            self.__temp_cache.pop(nRequestID)


def __read_request(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
    """Serve requests from ``request_queue`` forever.

    Each request is a dict with ``type``, ``request_id`` and optional ``data``;
    the answer is put on ``response_queue`` as
    ``{"request_id": ..., "data": ...}``.
    """

    def _send_response(request_id, response_data):
        response_queue.put_nowait({"request_id": request_id, "data": response_data})

    while True:
        try:
            val = request_queue.get()
            type_ = val['type']
            request_id = val['request_id']
            data = val.get('data')
            if type_ == 'get_trade_calendar':
                # Get the trade calendar
                results = thespi.queryTradeCalendar(data['start_date'], data['end_date'])
                _send_response(request_id, results)
            elif type_ == 'get_history_k_bars':
                # Get historical K-lines
                results = thespi.queryBars(data['code'], data['start_date'], data['end_date'])
                _send_response(request_id, results)
            else:
                logging.warning("Unsupported request type: %s", type_)
        except Exception as e:
            # Keep serving, but leave a trace of the malformed or failed request.
            logging.exception(e)


def __upload_datas(data_queue: multiprocessing.Queue):
    """Publish the current stock-index snapshots to ``data_queue`` once per second."""
    while True:
        try:
            if thespi.stock_index_data_dict:
                # Queue a copy: the queue pickles the object later on its own
                # thread while the SPI callback keeps updating the live dict.
                data_queue.put_nowait(("stock_index_datas", dict(thespi.stock_index_data_dict)))
        except Exception as e:
            logging.exception(e)
        finally:
            time.sleep(1)


def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue,
        data_callback_queue: multiprocessing.Queue):
    """
    Create the API, register the SPI and front address, start the worker
    threads and hand control to the API's Run().
    @param request_queue: request queue
    @param response_queue: response queue
    @param data_callback_queue: data callback queue
    @return:
    """
    global g_userid, g_passwd, g_address, g_port, thespi
    # NOTE(review): credentials are hard-coded here; consider moving them to
    # configuration outside the source tree.
    g_address = "101.230.90.99"
    g_port = 25556
    g_userid = "388000013942"
    g_passwd = "110808"
    if not constant.is_windows():
        # Intranet: IP 192.168.84.61, port 25557
        g_address = "192.168.84.61"
        g_port = 25557

    # Carried over from the original source:
    # Backtest trading is matched using historical market data, so traderapi
    # and mdapi must be used together (traderapi cannot be used alone) and
    # mdapi must subscribe to at least one quotation. Users can register this
    # address with the backtest traderapi's RegisterFront to connect to the
    # backtest server.
    print("GetApiVersion():", qcvalueaddproapi.CQCValueAddProApi_GetApiVersion())
    theapi = qcvalueaddproapi.CQCValueAddProApi_CreateInfoQryApi()
    thespi = sampleSpi(theapi)
    theapi.RegisterSpi(thespi)
    theapi.RegisterFront(g_address, g_port)

    threading.Thread(target=__read_request, args=(request_queue, response_queue,), daemon=True).start()
    threading.Thread(target=__upload_datas, args=(data_callback_queue,), daemon=True).start()

    theapi.Run()


def main():
    """Run the service stand-alone with freshly created queues."""
    request_queue = multiprocessing.Queue()
    response_queue = multiprocessing.Queue()
    # run() requires the data callback queue as its third argument.
    data_callback_queue = multiprocessing.Queue()
    run(request_queue, response_queue, data_callback_queue)


if __name__ == '__main__':
    main()