import logging
|
import multiprocessing
|
import threading
|
import time
|
|
import constant
|
import qcvalueaddproapi
|
from log_module.log import logger_system
|
from utils import tool
|
|
# Connection settings (g_userid, g_passwd, g_address, g_port) are assigned by
# run(); a module-level ``global`` statement has no effect, so none is used here.

# Last request sequence number handed out; new_seqnum() increments it.
g_seqnum = 100000
|
from datetime import datetime
|
|
|
def new_seqnum():
    """Advance the module-wide request counter and return the new value."""
    global g_seqnum
    g_seqnum += 1
    return g_seqnum
|
|
|
class sampleSpi(qcvalueaddproapi.CQCValueAddProSpi):
    """SPI callback handler that also exposes blocking query helpers.

    Requests are sent through ``m_api``. Responses arrive through the
    ``OnRsp*`` callbacks, which collect rows per request id and publish the
    finished page into a result cache that the blocking helpers poll.
    """

    # Finished pages keyed by request id. Written by the OnRsp* callbacks and
    # consumed (popped) by the blocking helpers so the cache does not grow.
    # Class-level, hence shared by all instances; request ids come from the
    # module-wide counter.
    __result_cache = {}
    # Rows of pages that are still being received, keyed by request id.
    __temp_cache = {}

    # Rows requested per page; a page of any other length ends pagination.
    __PAGE_SIZE = 200
    # Upper bound on the number of pages fetched by one public query.
    __MAX_PAGES = 20
    # Polling budget while waiting for a page: 1000 polls with a 2 ms sleep
    # between them, i.e. roughly two seconds of sleeping at most.
    __POLL_ATTEMPTS = 1000
    __POLL_INTERVAL = 0.002

    def __init__(self, t_tapi):
        """Keep a reference to the API object used to send requests.

        @param t_tapi: API instance this SPI is registered with
        """
        qcvalueaddproapi.CQCValueAddProSpi.__init__(self)
        self.m_api = t_tapi

    def __create_request_id(self):
        """Return a fresh request id taken from the module-wide counter."""
        return new_seqnum()

    def __wait_for_result(self, request_id):
        """Poll the result cache until the page for ``request_id`` is published.

        The entry is removed from the cache when it is returned.

        @param request_id: id that was passed to the API request
        @return: the published rows, or [] if nothing arrived in time
        """
        for _ in range(self.__POLL_ATTEMPTS):
            if request_id in self.__result_cache:
                return self.__result_cache.pop(request_id)
            time.sleep(self.__POLL_INTERVAL)
        # NOTE(review): a response that arrives after this timeout is still
        # published by the callback and stays in the cache unread.
        return []

    def __query_trade_calendar(self, start_date, end_date, page_locate=1, page_count=200):
        """Query one page of the trading calendar.

        @param start_date: format: 20241201
        @param end_date: format: 20241201
        @param page_locate: page number to fetch
        @param page_count: number of rows per page
        @return: trading days in ascending order; [] on error or timeout
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryShareCalendarField()
            queryField.BegDate = start_date
            queryField.EndDate = end_date
            queryField.PageCount = page_count
            queryField.PageLocate = page_locate
            queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_ASC
            request_id = self.__create_request_id()
            self.m_api.ReqReqQryShareCalendar(queryField, request_id)
            return self.__wait_for_result(request_id)
        except Exception as e:
            logging.exception(e)
        return []

    def queryTradeCalendar(self, start_date, end_date):
        """Query the trading calendar, following pages until a short page.

        @param start_date: start date; any "-" separators are stripped
        @param end_date: end date; any "-" separators are stripped
        @return: list of trading days (possibly partial or empty on failure)
        """
        fresults = []
        try:
            beg = start_date.replace("-", "")
            end = end_date.replace("-", "")
            for page_locate in range(1, self.__MAX_PAGES + 1):
                results = self.__query_trade_calendar(beg, end, page_locate=page_locate,
                                                      page_count=self.__PAGE_SIZE)
                fresults.extend(results)
                if len(results) != self.__PAGE_SIZE:
                    # Short (or empty) page: nothing more to fetch.
                    break
            print("queryTradeCalendar:", len(fresults))
        except Exception as e:
            logging.exception(e)
        return fresults

    def __query_bars(self, code, begin_date, end_date, page_locate=1, page_count=200):
        """Query one page of daily quotations for ``code``, newest first.

        @param code: security code; routed to SSE when tool.is_sh_code(code)
                     is true, otherwise to SZSE
        @param begin_date: start date; any "-" separators are stripped
        @param end_date: end date; any "-" separators are stripped
        @param page_locate: page number to fetch
        @param page_count: number of rows per page
        @return: list of row dicts built by OnRspInquiryStockDayQuotation;
                 [] on timeout
        """
        queryField = qcvalueaddproapi.CQCVDReqQryStockDayQuotationField()
        queryField.BegDate = begin_date.replace("-", "")
        queryField.EndDate = end_date.replace("-", "")
        if tool.is_sh_code(code):
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SSE
        else:
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SZSE
        queryField.SecurityID = code
        queryField.PageCount = page_count
        queryField.PageLocate = page_locate
        queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC

        request_id = self.__create_request_id()
        self.m_api.ReqReqQryStockDayQuotation(queryField, request_id)
        return self.__wait_for_result(request_id)

    def queryBars(self, code, begin_date, end_date):
        """Fetch historical daily bars (start and end dates inclusive).

        Prices of rows whose adjust factor differs from that of the first
        returned row are rescaled to that first row's factor.

        @param code: security code
        @param begin_date: e.g. 2024-10-31
        @param end_date: e.g. 2024-11-30
        @return: list of bar dicts (sec_id, open, high, low, close, volume,
                 pre_close, bob, amount); None if an exception occurred
        """
        try:
            final_results = []
            # Fetch at most __MAX_PAGES pages.
            for page_locate in range(1, self.__MAX_PAGES + 1):
                results = self.__query_bars(code, begin_date, end_date, page_locate, self.__PAGE_SIZE)
                final_results.extend(results)
                if len(results) != self.__PAGE_SIZE:
                    break

            # Rows are requested in descending order, so the first row with a
            # truthy factor supplies the reference (latest) adjust factor.
            start_adjust_factor = None
            price_keys = ("PreClosePrice", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice")
            for d in final_results:
                if not start_adjust_factor:
                    start_adjust_factor = d["AdjustFactor"]
                if start_adjust_factor != d["AdjustFactor"]:
                    # Rescale this row's prices to the reference factor.
                    for key in price_keys:
                        d[key] = round(d["AdjustFactor"] * d[key] / start_adjust_factor, 4)

            fresults = [{"sec_id": r["SecurityID"],
                         "open": r["OpenPrice"],
                         "high": r["HighPrice"],
                         "low": r["LowPrice"],
                         "close": r["ClosePrice"],
                         "volume": r["Volume"],
                         "pre_close": r["PreClosePrice"],
                         "bob": datetime.strptime(r['TradingDay'], '%Y%m%d'),
                         "amount": r["Turnover"]
                         } for r in final_results]
            print("ReqReqQryStockDayQuotation:", len(fresults))
            return fresults
        except Exception as e:
            logging.exception(e)
            # Failure is reported to the caller as None (unchanged contract).
            return None

    def OnFrontConnected(self):
        """Callback on front connection: send the login request."""
        print("OnFrontConnected")
        # Log in as soon as the connection is up, using the module-level
        # credentials set by run().
        loginfield = qcvalueaddproapi.CQCVDReqUserLoginField()
        loginfield.LogInAccount = g_userid
        loginfield.AuthMode = qcvalueaddproapi.QCVD_AM_Password
        loginfield.Password = g_passwd
        self.m_api.ReqUserLogin(loginfield, new_seqnum())

    def OnFrontDisconnected(self, nReason):
        """Callback on front disconnection: print the reason code."""
        print("OnFrontDisconnected Reason[%d]"
              % (nReason))

    def OnRspUserLogin(self, pRspUserLoginField, pRspInfo, nRequestID, bIsLast):
        """Login response callback: print the outcome and log success."""
        print("OnRspUserLogin LogInAccount[%s] RequestID[%d] ErrorID[%d] ErrorMsg[%s] "
              % (pRspUserLoginField.LogInAccount,
                 nRequestID,
                 pRspInfo.ErrorID,
                 pRspInfo.ErrorMsg))
        if pRspInfo.ErrorID == 0:
            logger_system.info("华鑫增值服务API登录成功")

    def ReqQryGGTEODPrices(self):
        """Send a ReqQryGGTEODPrices request with a default query field."""
        QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField()
        self.m_api.ReqQryGGTEODPrices(QryField, new_seqnum())

    def ReqQryInvestor(self):
        """Send a ReqQryInvestor request with a default query field."""
        QryField = qcvalueaddproapi.CQCVDQryInvestorField()
        self.m_api.ReqQryInvestor(QryField, new_seqnum())

    def OnRspInquiryShareCalendar(self, pShareCalendar, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """Trading-calendar response callback.

        Collects ``TradingDay`` of every truthy ``pShareCalendar``; a falsy
        one ends the page and publishes the collected days for ``nRequestID``.

        @param pShareCalendar: calendar row, or a falsy value to end the page
        @param pRspInfo: response info (not inspected)
        @param nRequestID: id of the originating request
        @param bIsPageLast: not inspected
        @param bIsTotalLast: not inspected
        @return: None
        """
        days = self.__temp_cache.setdefault(nRequestID, [])
        if pShareCalendar:
            days.append(pShareCalendar.TradingDay)
        else:
            self.__temp_cache.pop(nRequestID)
            self.__result_cache[nRequestID] = days
            # Print from the local list: the waiting reader may already have
            # popped the cache entry.
            print("OnRspInquiryShareCalendar:", days)

    def OnRspInquiryStockDayQuotation(self, pStockDayQuotation, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """Daily-quotation response callback.

        Collects one row per call while ``bIsPageLast`` is false; the call
        with ``bIsPageLast`` true publishes the collected rows for
        ``nRequestID`` without reading ``pStockDayQuotation``.

        @param pStockDayQuotation: quotation row
        @param pRspInfo: response info (not inspected)
        @param nRequestID: id of the originating request
        @param bIsPageLast: true on the call that ends the page
        @param bIsTotalLast: not inspected
        @return: None
        """
        rows = self.__temp_cache.setdefault(nRequestID, [])
        if not bIsPageLast:
            # NOTE(review): Volume is scaled by 100 and Turnover by 1000,
            # presumably converting lots to shares and thousands to units;
            # confirm against the API documentation.
            rows.append({
                "SecurityID": pStockDayQuotation.SecurityID, "TradingDay": pStockDayQuotation.TradingDay,
                "AdjustFactor": pStockDayQuotation.AdjustFactor, "PreClosePrice": pStockDayQuotation.PreClosePrice,
                "OpenPrice": pStockDayQuotation.OpenPrice, "HighPrice": pStockDayQuotation.HighPrice,
                "LowPrice": pStockDayQuotation.LowPrice,
                "ClosePrice": pStockDayQuotation.ClosePrice, "Volume": int(pStockDayQuotation.Volume * 100),
                "Turnover": int(pStockDayQuotation.Turnover * 1000)
            })
        else:
            self.__temp_cache.pop(nRequestID)
            self.__result_cache[nRequestID] = rows
            # Print from the local list: the waiting reader may already have
            # popped the cache entry.
            print("OnRspInquiryStockDayQuotation:", len(rows))
|
|
|
def __read_request(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
    """Serve query requests forever, posting each result to the response queue.

    A request is a dict with keys ``type`` and ``request_id`` plus an optional
    ``data`` dict. Supported types:

    - ``get_trade_calendar``: data has ``start_date`` and ``end_date``
    - ``get_history_k_bars``: data has ``code``, ``start_date`` and ``end_date``

    The response is ``{"request_id": ..., "data": <result>}``. A request that
    fails is logged and produces no response; the loop keeps running.

    @param request_queue: queue the requests are read from (blocking get)
    @param response_queue: queue the responses are put on (non-blocking put)
    @return: never returns
    """

    def __set_response_data(request_id, response_data):
        # put_nowait raises if the queue is full; the loop below logs that.
        response_queue.put_nowait({"request_id": request_id, "data": response_data})

    while True:
        try:
            val = request_queue.get()
            type_ = val['type']
            request_id = val['request_id']
            data = val.get('data')
            if type_ == 'get_trade_calendar':
                # Trading calendar query.
                start_date = data['start_date']
                end_date = data['end_date']
                results = thespi.queryTradeCalendar(start_date, end_date)
                __set_response_data(request_id, results)
            elif type_ == 'get_history_k_bars':
                # Historical daily bars query.
                start_date = data['start_date']
                end_date = data['end_date']
                code = data['code']
                results = thespi.queryBars(code, start_date, end_date)
                __set_response_data(request_id, results)
            else:
                logging.warning("unknown request type: %r", type_)
        except Exception as e:
            # Keep serving, but record the failure instead of hiding it.
            logging.exception(e)
|
|
|
def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
    """Configure the connection globals, start the request reader and run the API.

    @param request_queue: queue of incoming requests
    @param response_queue: queue for outgoing responses
    @return: whatever follows ``theapi.Run()``; nothing is returned explicitly
    """
    global g_userid, g_passwd, g_address, g_port, thespi

    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from configuration instead.
    g_userid = "388000013942"
    g_passwd = "110808"

    if constant.is_windows():
        g_address, g_port = "101.230.90.99", 25556
    else:
        # Intranet front: IP 192.168.84.61, port 25557.
        g_address, g_port = "192.168.84.61", 25557

    # Original note (appears to be carried over from a trading sample):
    # backtest trading is matched using historical market data, so traderapi
    # and mdapi must be used together, with mdapi subscribed to at least one
    # quote; the backtest traderapi connects via RegisterFront.
    print("GetApiVersion():", qcvalueaddproapi.CQCValueAddProApi_GetApiVersion())

    theapi = qcvalueaddproapi.CQCValueAddProApi_CreateInfoQryApi()
    thespi = sampleSpi(theapi)
    theapi.RegisterSpi(thespi)
    theapi.RegisterFront(g_address, g_port)

    # Serve queued requests on a daemon thread.
    reader = threading.Thread(target=__read_request, args=(request_queue, response_queue, ), daemon=True)
    reader.start()

    theapi.Run()
|
|
|
def main():
    """Create the bounded request/response queues and start the client."""
    requests = multiprocessing.Queue(maxsize=1024)
    responses = multiprocessing.Queue(maxsize=1024)
    run(requests, responses)
|
|
|
# Script entry point: only start the client when executed directly.
if __name__ == "__main__":
    main()
|