import logging
import threading
import time

import qcvalueaddproapi
from utils import tool

global g_userid, g_passwd, g_address, g_port, g_seqnum
g_seqnum = 100000

# Polling budget used while waiting for an asynchronous query response:
# 1000 attempts * 0.002 s gives roughly a two second timeout.
_POLL_ATTEMPTS = 1000
_POLL_INTERVAL = 0.002

# Price fields that are rescaled when applying the adjust factor.
_PRICE_FIELDS = ("PreClosePrice", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice")


def new_seqnum():
    """Increment the module-level request counter and return the new value."""
    global g_seqnum
    g_seqnum = g_seqnum + 1
    return g_seqnum


class sampleSpi(qcvalueaddproapi.CQCValueAddProSpi):
    """Callback handler that also exposes blocking query helpers.

    Queries are sent through ``self.m_api``; the ``OnRsp...`` callbacks collect
    the rows per request id and the query helpers poll the result cache until
    the rows show up or the polling budget is exhausted.
    """

    # Completed responses keyed by request id (shared at class level).
    __result_cache = {}
    # Rows accumulated so far for in-flight requests, keyed by request id.
    __temp_cache = {}

    def __init__(self, t_tapi):
        qcvalueaddproapi.CQCValueAddProSpi.__init__(self)
        self.m_api = t_tapi

    def __create_request_id(self):
        """Return a fresh request id taken from the module-level counter."""
        return new_seqnum()

    def __wait_for_result(self, request_id):
        """Poll the result cache for *request_id*.

        Returns the cached rows, or ``None`` when nothing arrived within the
        polling budget.
        """
        for _ in range(_POLL_ATTEMPTS):
            if request_id in self.__result_cache:
                return self.__result_cache[request_id]
            time.sleep(_POLL_INTERVAL)
        return None

    def queryTradeCalendar(self):
        """Query the trade calendar and return the trading days of this request.

        Returns the list collected by ``OnRspInquiryShareCalendar`` for this
        request, or ``None`` if the response did not arrive in time or an
        exception was raised (exceptions are logged, not propagated).
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryShareCalendarField()
            # queryField.BegDate = "20240704"
            queryField.EndDate = "20240904"
            queryField.PageCount = 10
            queryField.PageLocate = 1
            if queryField.BegDate:
                queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_ASC
            if queryField.EndDate:
                queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
            request_id = self.__create_request_id()
            results = self.m_api.ReqReqQryShareCalendar(queryField, request_id)
            trading_days = self.__wait_for_result(request_id)
            if trading_days is not None:
                # Return only this request's entry, not the whole shared cache.
                return trading_days
            # Timed out: print what the request call itself returned.
            print("ReqReqQryShareCalendar:", results)
        except Exception as e:
            logging.exception(e)

    def queryBars(self, code, begin_date, end_date):
        """Query daily bars for *code* between *begin_date* and *end_date*.

        Dashes are stripped from both dates before they are sent. Rows are
        requested in descending order; prices of rows whose adjust factor
        differs from the first row's factor are rescaled by
        ``row factor / first factor`` and rounded to 4 decimals.

        Returns a list of dicts with keys ``sec_id``, ``open``, ``high``,
        ``low``, ``close``, ``volume``, ``pre_close``, ``bob`` and ``amount``,
        or ``None`` if the response did not arrive in time or an exception was
        raised (exceptions are logged, not propagated).
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryStockDayQuotationField()
            queryField.BegDate = begin_date.replace("-", "")
            queryField.EndDate = end_date.replace("-", "")
            if tool.is_sh_code(code):
                queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SSE
            else:
                queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SZSE
            queryField.SecurityID = code
            # queryField.PageCount = 10
            queryField.PageLocate = 1
            queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
            request_id = self.__create_request_id()
            results = self.m_api.ReqReqQryStockDayQuotation(queryField, request_id)
            rows = self.__wait_for_result(request_id)
            if rows is not None:
                # The most recent adjust factor (first row with a truthy factor).
                start_adjust_factor = None
                for bar in rows:
                    if not start_adjust_factor:
                        start_adjust_factor = bar["AdjustFactor"]
                    # Rescale prices of rows whose factor differs from the reference.
                    if start_adjust_factor != bar["AdjustFactor"]:
                        for field in _PRICE_FIELDS:
                            bar[field] = round(
                                bar["AdjustFactor"] * bar[field] / start_adjust_factor, 4)
                return [
                    {
                        "sec_id": r["SecurityID"],
                        # Fixed: this used to carry r["TradingDay"] instead of the open price.
                        "open": r["OpenPrice"],
                        "high": r["HighPrice"],
                        "low": r["LowPrice"],
                        "close": r["ClosePrice"],
                        "volume": r["Volume"],
                        "pre_close": r["PreClosePrice"],
                        "bob": f"{r['TradingDay'][:4]}-{r['TradingDay'][4:6]}-{r['TradingDay'][6:]} 00:00:00",
                        "amount": r["Turnover"],
                    }
                    for r in rows
                ]
            # Timed out: print what the request call itself returned.
            print("ReqReqQryStockDayQuotation:", results)
        except Exception as e:
            logging.exception(e)

    def OnFrontConnected(self):
        """Log in with the module-level credentials once the front is connected."""
        print("OnFrontConnected")
        # Log in right after the connection is established.
        loginfield = qcvalueaddproapi.CQCVDReqUserLoginField()
        loginfield.LogInAccount = g_userid
        loginfield.AuthMode = qcvalueaddproapi.QCVD_AM_Password
        loginfield.Password = g_passwd
        self.m_api.ReqUserLogin(loginfield, new_seqnum())

    def OnFrontDisconnected(self, nReason):
        """Print the disconnect reason code."""
        print("OnFrontDisconnected Reason[%d]" % (nReason))

    # Login request response
    def OnRspUserLogin(self, pRspUserLoginField, pRspInfo, nRequestID, bIsLast):
        """Print the login response and start the sample query on success."""
        print("OnRspUserLogin LogInAccount[%s] RequestID[%d] ErrorID[%d] ErrorMsg[%s] " % (
            pRspUserLoginField.LogInAccount, nRequestID, pRspInfo.ErrorID, pRspInfo.ErrorMsg))
        if (pRspInfo.ErrorID == 0):
            # Query directly after a successful login.
            # self.ReqInquiryHistoryDelivery()
            # threading.Thread(target=lambda : print("交易日历:", self.queryTradeCalendar())).start()
            # NOTE(review): queryBars requires (code, begin_date, end_date); this call
            # passes none and raises TypeError inside the worker thread. Supply the
            # intended security code and date range.
            threading.Thread(target=lambda: print("日K:", self.queryBars())).start()

    def ReqQryGGTEODPrices(self):
        """Send a CQCVDQryGGTEODPricesField query with default field values."""
        QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField()
        self.m_api.ReqQryGGTEODPrices(QryField, new_seqnum())

    def ReqQryInvestor(self):
        """Send a CQCVDQryInvestorField query with default field values."""
        QryField = qcvalueaddproapi.CQCVDQryInvestorField()
        self.m_api.ReqQryInvestor(QryField, new_seqnum())

    def OnRspInquiryShareCalendar(self, pShareCalendar, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """
        Trade calendar response.

        Appends ``pShareCalendar.TradingDay`` to the pending rows of the request
        while ``pShareCalendar`` is truthy; a falsy ``pShareCalendar`` moves the
        collected rows into the result cache.
        @param pShareCalendar:
        @param pRspInfo:
        @param nRequestID:
        @param bIsPageLast:
        @param bIsTotalLast:
        @return:
        """
        if nRequestID not in self.__temp_cache:
            self.__temp_cache[nRequestID] = []
        if pShareCalendar:
            self.__temp_cache[nRequestID].append(pShareCalendar.TradingDay)
        else:
            self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
            self.__temp_cache.pop(nRequestID)
            print("OnRspInquiryShareCalendar:", self.__result_cache[nRequestID])

    def OnRspInquiryStockDayQuotation(self, pStockDayQuotation, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """
        Daily bar response.

        While ``bIsPageLast`` is falsy the row is copied into the pending rows
        of the request (Volume multiplied by 100 and Turnover by 1000, both
        truncated to int); otherwise the collected rows are moved into the
        result cache.
        @param pStockDayQuotation:
        @param pRspInfo:
        @param nRequestID:
        @param bIsPageLast:
        @param bIsTotalLast:
        @return:
        """
        if nRequestID not in self.__temp_cache:
            self.__temp_cache[nRequestID] = []
        print("是否本页查询完毕:", bIsPageLast)
        if not bIsPageLast:
            self.__temp_cache[nRequestID].append({
                "SecurityID": pStockDayQuotation.SecurityID,
                "TradingDay": pStockDayQuotation.TradingDay,
                "AdjustFactor": pStockDayQuotation.AdjustFactor,
                "PreClosePrice": pStockDayQuotation.PreClosePrice,
                "OpenPrice": pStockDayQuotation.OpenPrice,
                "HighPrice": pStockDayQuotation.HighPrice,
                "LowPrice": pStockDayQuotation.LowPrice,
                "ClosePrice": pStockDayQuotation.ClosePrice,
                "Volume": int(pStockDayQuotation.Volume * 100),
                "Turnover": int(pStockDayQuotation.Turnover * 1000)
            })
        else:
            self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
            self.__temp_cache.pop(nRequestID)
            print("OnRspInquiryStockDayQuotation:", len(self.__result_cache[nRequestID]))


def main():
    """Create the query API, register the callback handler and run it."""
    # if (len(sys.argv)< 5):
    #     ###### command line to run:
    #     ###### ip-address port userid passwd
    #     print("usage: ipaddress port userid passwd")
    #     return
    global g_userid, g_passwd, g_address, g_port
    # NOTE(review): address and credentials are hard-coded in source; consider
    # loading them from the command line or the environment instead.
    g_address = "101.230.90.99"
    g_port = 25556
    g_userid = "388000013942"
    g_passwd = "110808"
    # Backtest trading is matched by replaying historical market data:
    # therefore traderapi and mdapi must be used together; traderapi cannot be used
    # alone, and mdapi has to subscribe to at least one quote.
    # Users can call RegisterFront of the backtest traderapi to register this
    # address and connect to the backtest server.
    print("GetApiVersion():", qcvalueaddproapi.CQCValueAddProApi_GetApiVersion())
    theapi = qcvalueaddproapi.CQCValueAddProApi_CreateInfoQryApi()
    thespi = sampleSpi(theapi)
    theapi.RegisterSpi(thespi)
    theapi.RegisterFront(g_address, g_port)
    theapi.Run()
    return


if __name__ == '__main__':
    main()