Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
huaxin_client/l1_api_client.py
@@ -1,12 +1,16 @@
import logging
import multiprocessing
import threading
import time
import constant
import qcvalueaddproapi
from log_module.log import logger_system, logger_debug
from utils import tool
global g_userid, g_passwd, g_address, g_port, g_seqnum
g_seqnum = 100000
from datetime import datetime
def new_seqnum():
@@ -26,76 +30,130 @@
    def __create_request_id(self):
        return new_seqnum()
    def queryTradeCalendar(self):
    def __query_trade_calendar(self, start_date, end_date, page_locate=1, page_count=200):
        """
        查询交易日历
        @param start_date: 格式:20241201
        @param end_date: 格式:20241201
        @param page_locate:
        @param page_count:
        @return: 返回日期[顺序]
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryShareCalendarField()
            # queryField.BegDate = "20240704"
            queryField.EndDate = "20240904"
            queryField.PageCount = 10
            queryField.PageLocate = 1
            if queryField.BegDate:
                queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_ASC
            if queryField.EndDate:
                queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
            queryField.BegDate = start_date
            queryField.EndDate = end_date
            queryField.PageCount = page_count
            queryField.PageLocate = page_locate
            queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_ASC
            request_id = self.__create_request_id()
            results = self.m_api.ReqReqQryShareCalendar(queryField, request_id)
            for i in range(0, 1000):
                if request_id in self.__result_cache:
                    return self.__result_cache
                    return self.__result_cache[request_id]
                time.sleep(0.002)
            print("ReqReqQryShareCalendar:", results)
        except Exception as e:
            logging.exception(e)
        return []
    def queryTradeCalendar(self, start_date, end_date):
        """
        查询交易日历
        @return:
        """
        fresults = []
        try:
            page_locate = 1
            for i in range(20):
                results = self.__query_trade_calendar(start_date.replace("-", ""), end_date.replace("-", ""),
                                                      page_locate=page_locate)
                if results:
                    fresults.extend(results)
                if len(results) != 200:
                    break
                page_locate += 1
            print("queryTradeCalendar:", len(fresults))
        except Exception as e:
            logger_debug.exception(e)
        return fresults
    def __query_bars(self, code, begin_date, end_date, page_locate=1, page_count=200):
        queryField = qcvalueaddproapi.CQCVDReqQryStockDayQuotationField()
        queryField.BegDate = begin_date.replace("-", "")
        queryField.EndDate = end_date.replace("-", "")
        if tool.is_sh_code(code):
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SSE
        else:
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SZSE
        queryField.SecurityID = code
        queryField.PageCount = page_count
        queryField.PageLocate = page_locate
        queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
        request_id = self.__create_request_id()
        results = self.m_api.ReqReqQryStockDayQuotation(queryField, request_id)
        # 读取结果
        for i in range(0, 1000):
            # 读取结果
            if request_id in self.__result_cache:
                results = self.__result_cache[request_id]
                return results
            time.sleep(0.002)
        return []
    def queryBars(self, code, begin_date, end_date):
        """
        获取历史K线(包含起止日期)
        @param code:
        @param begin_date:  例如:2024-10-31
        @param end_date: 例如:2024-11-30
        @param page_locate: 第几页(从第一页开始)
        @return:
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryStockDayQuotationField()
            queryField.BegDate = begin_date.replace("-", "")
            queryField.EndDate = end_date.replace("-", "")
            if tool.is_sh_code(code):
                queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SSE
            else:
                queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SZSE
            queryField.SecurityID = code
            # queryField.PageCount = 10
            queryField.PageLocate = 1
            queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
            request_id = self.__create_request_id()
            results = self.m_api.ReqReqQryStockDayQuotation(queryField, request_id)
            for i in range(0, 1000):
                if request_id in self.__result_cache:
                    results = self.__result_cache[request_id]
                    # 最新的复权因子
                    start_adjust_factor = None
                    for i in range(0, len(results)):
                        d = results[i]
                        if not start_adjust_factor:
                            start_adjust_factor = d["AdjustFactor"]
                        # 复权价格
                        if start_adjust_factor != d["AdjustFactor"]:
                            # 开始复权
                            d["PreClosePrice"] = round(d["AdjustFactor"] * d["PreClosePrice"] / start_adjust_factor, 4)
                            d["OpenPrice"] = round(d["AdjustFactor"] * d["OpenPrice"] / start_adjust_factor, 4)
                            d["HighPrice"] = round(d["AdjustFactor"] * d["HighPrice"] / start_adjust_factor, 4)
                            d["LowPrice"] = round(d["AdjustFactor"] * d["LowPrice"] / start_adjust_factor, 4)
                            d["ClosePrice"] = round(d["AdjustFactor"] * d["ClosePrice"] / start_adjust_factor, 4)
                    fresults = []
                    for r in results:
                        fresults.append({"sec_id": r["SecurityID"],
                                         "open": r["TradingDay"],
                                         "high": r["HighPrice"],
                                         "low": r["LowPrice"],
                                         "close": r["ClosePrice"],
                                         "volume": r["Volume"],
                                         "pre_close": r["PreClosePrice"],
                                         "bob": f"{r['TradingDay'][:4]}-{r['TradingDay'][4:6]}-{r['TradingDay'][6:]} 00:00:00",
                                         "amount": r["Turnover"]
                                         })
                    return fresults
                time.sleep(0.002)
            print("ReqReqQryStockDayQuotation:", results)
            page_locate = 1
            final_results = []
            for i in range(20):
                # 最多翻20页
                results = self.__query_bars(code, begin_date, end_date, page_locate)
                if results:
                    final_results.extend(results)
                if len(results) != 200:
                    break
                else:
                    page_locate += 1
            # 最新的复权因子
            start_adjust_factor = None
            for i in range(0, len(final_results)):
                d = final_results[i]
                if not start_adjust_factor:
                    start_adjust_factor = d["AdjustFactor"]
                # 复权价格
                if start_adjust_factor != d["AdjustFactor"]:
                    # 开始复权
                    d["PreClosePrice"] = round(d["AdjustFactor"] * d["PreClosePrice"] / start_adjust_factor, 4)
                    d["OpenPrice"] = round(d["AdjustFactor"] * d["OpenPrice"] / start_adjust_factor, 4)
                    d["HighPrice"] = round(d["AdjustFactor"] * d["HighPrice"] / start_adjust_factor, 4)
                    d["LowPrice"] = round(d["AdjustFactor"] * d["LowPrice"] / start_adjust_factor, 4)
                    d["ClosePrice"] = round(d["AdjustFactor"] * d["ClosePrice"] / start_adjust_factor, 4)
            fresults = []
            for r in final_results:
                fresults.append({"sec_id": r["SecurityID"],
                                 "open": r["OpenPrice"],
                                 "high": r["HighPrice"],
                                 "low": r["LowPrice"],
                                 "close": r["ClosePrice"],
                                 "volume": r["Volume"],
                                 "pre_close": r["PreClosePrice"],
                                 "bob": datetime.strptime(r['TradingDay'], '%Y%m%d'),
                                 "amount": r["Turnover"]
                                 })
            print("ReqReqQryStockDayQuotation:", len(fresults))
            return fresults
        except Exception as e:
            logging.exception(e)
@@ -119,19 +177,27 @@
                 nRequestID,
                 pRspInfo.ErrorID,
                 pRspInfo.ErrorMsg))
        if (pRspInfo.ErrorID == 0):
        if pRspInfo.ErrorID == 0:
            logger_system.info("华鑫增值服务API登录成功")
            # 登录成功后直接查询
            # self.ReqInquiryHistoryDelivery()
            # threading.Thread(target=lambda : print("交易日历:", self.queryTradeCalendar())).start()
            threading.Thread(target=lambda: print("日K:", self.queryBars())).start()
            # threading.Thread(target=lambda: print("交易日历:", self.queryTradeCalendar("2024-03-08", "2024-12-31"))).start()
            # threading.Thread(
            #     target=lambda: print("日K:", self.queryBars("601298", "2024-12-15", "2024-12-31"))).start()
    def ReqQryGGTEODPrices(self):
        QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField()
        self.m_api.ReqQryGGTEODPrices(QryField, new_seqnum())
        try:
            QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField()
            self.m_api.ReqQryGGTEODPrices(QryField, new_seqnum())
        except:
            pass
    def ReqQryInvestor(self):
        QryField = qcvalueaddproapi.CQCVDQryInvestorField()
        self.m_api.ReqQryInvestor(QryField, new_seqnum())
        try:
            QryField = qcvalueaddproapi.CQCVDQryInvestorField()
            self.m_api.ReqQryInvestor(QryField, new_seqnum())
        except:
            pass
    def OnRspInquiryShareCalendar(self, pShareCalendar, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """
@@ -145,13 +211,15 @@
        """
        if nRequestID not in self.__temp_cache:
            self.__temp_cache[nRequestID] = []
        if pShareCalendar:
            self.__temp_cache[nRequestID].append(pShareCalendar.TradingDay)
        else:
            self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
            self.__temp_cache.pop(nRequestID)
            print("OnRspInquiryShareCalendar:", self.__result_cache[nRequestID])
        try:
            if pShareCalendar:
                self.__temp_cache[nRequestID].append(pShareCalendar.TradingDay)
            else:
                self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
                self.__temp_cache.pop(nRequestID)
                print("OnRspInquiryShareCalendar:", self.__result_cache[nRequestID])
        except:
            pass
    def OnRspInquiryStockDayQuotation(self, pStockDayQuotation, pRspInfo, nRequestID, bIsPageLast, bIsTotalLast):
        """
@@ -166,45 +234,91 @@
        if nRequestID not in self.__temp_cache:
            self.__temp_cache[nRequestID] = []
        print("是否本页查询完毕:", bIsPageLast)
        # print("是否本页查询完毕:", bIsPageLast)
        try:
        if not bIsPageLast:
            self.__temp_cache[nRequestID].append({
                "SecurityID": pStockDayQuotation.SecurityID, "TradingDay": pStockDayQuotation.TradingDay,
                "AdjustFactor": pStockDayQuotation.AdjustFactor, "PreClosePrice": pStockDayQuotation.PreClosePrice,
                "OpenPrice": pStockDayQuotation.OpenPrice, "HighPrice": pStockDayQuotation.HighPrice,
                "LowPrice": pStockDayQuotation.LowPrice,
                "ClosePrice": pStockDayQuotation.ClosePrice, "Volume": int(pStockDayQuotation.Volume * 100),
                "Turnover": int(pStockDayQuotation.Turnover * 1000)
            })
        else:
            self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
            self.__temp_cache.pop(nRequestID)
            print("OnRspInquiryStockDayQuotation:", len(self.__result_cache[nRequestID]))
            if not bIsPageLast:
                self.__temp_cache[nRequestID].append({
                    "SecurityID": pStockDayQuotation.SecurityID, "TradingDay": pStockDayQuotation.TradingDay,
                    "AdjustFactor": pStockDayQuotation.AdjustFactor, "PreClosePrice": pStockDayQuotation.PreClosePrice,
                    "OpenPrice": pStockDayQuotation.OpenPrice, "HighPrice": pStockDayQuotation.HighPrice,
                    "LowPrice": pStockDayQuotation.LowPrice,
                    "ClosePrice": pStockDayQuotation.ClosePrice, "Volume": int(pStockDayQuotation.Volume * 100),
                    "Turnover": int(pStockDayQuotation.Turnover * 1000)
                })
            else:
                self.__result_cache[nRequestID] = self.__temp_cache[nRequestID]
                self.__temp_cache.pop(nRequestID)
                print("OnRspInquiryStockDayQuotation:", len(self.__result_cache[nRequestID]))
        except:
            pass
def main():
    # if (len(sys.argv)< 5):
    #     ######运行命令行:
    #     ###### ip地址  端口号 用户名 密码
    #     print("usage: ipaddress port userid passwd")
    #     return
def __read_request(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
    """
    Serve requests from request_queue forever, answering on response_queue.

    Each request is a dict with 'type', 'request_id' and an optional 'data'
    dict. 'get_trade_calendar' reads data['start_date'] / data['end_date'] and
    calls thespi.queryTradeCalendar; 'get_history_k_bars' additionally reads
    data['code'] and calls thespi.queryBars. The answer is put on
    response_queue as {"request_id": ..., "data": ...}. Requests of any other
    type get no answer. A failure while handling a request is logged and the
    loop continues with the next request; no answer is sent for that request.

    @param request_queue: queue the requests are read from (blocking get)
    @param response_queue: queue the answers are written to (put_nowait)
    """

    def __set_response_data(request_id, response_data):
        response_queue.put_nowait({"request_id": request_id, "data": response_data})

    while True:
        try:
            val = request_queue.get()
            type_ = val['type']
            request_id = val['request_id']
            data = val.get('data')
            if type_ == 'get_trade_calendar':
                # Fetch the trading calendar.
                start_date = data['start_date']
                end_date = data['end_date']
                results = thespi.queryTradeCalendar(start_date, end_date)
                __set_response_data(request_id, results)
            elif type_ == 'get_history_k_bars':
                # Fetch historical K-line bars.
                start_date = data['start_date']
                end_date = data['end_date']
                code = data['code']
                results = thespi.queryBars(code, start_date, end_date)
                __set_response_data(request_id, results)
        except Exception as e:
            # Keep the worker alive, but leave a trace of what went wrong.
            logger_debug.exception(e)
def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
    """
    Create the API object, register the SPI and front address, start the
    request-reading thread and enter the API's Run() call.

    @param request_queue: request queue handed to the reader thread
    @param response_queue: response queue handed to the reader thread
    @return:
    """
    global g_userid, g_passwd, g_address, g_port, thespi
    g_address, g_port = "101.230.90.99", 25556
    # NOTE(review): account id and password are embedded in source - consider
    # loading them from configuration instead.
    g_userid = "388000013942"
    g_passwd = "110808"
    if not constant.is_windows():
        # Intranet address: IP 192.168.84.61, port 25557
        g_address, g_port = "192.168.84.61", 25557
    # Backtest trading is driven by historical market data for order matching,
    # so traderapi and mdapi must be used together; traderapi cannot be used on
    # its own and mdapi has to subscribe to at least one quote.
    # The backtest traderapi's RegisterFront function registers this address to
    # connect to the backtest server.
    print("GetApiVersion():", qcvalueaddproapi.CQCValueAddProApi_GetApiVersion())
    theapi = qcvalueaddproapi.CQCValueAddProApi_CreateInfoQryApi()
    thespi = sampleSpi(theapi)
    theapi.RegisterSpi(thespi)
    theapi.RegisterFront(g_address, g_port)
    reader = threading.Thread(target=__read_request, args=(request_queue, response_queue), daemon=True)
    reader.start()
    theapi.Run()
def main():
    """Create the request and response queues (1024 slots each) and call run()."""
    request_queue = multiprocessing.Queue(maxsize=1024)
    response_queue = multiprocessing.Queue(maxsize=1024)
    run(request_queue, response_queue)
if __name__ == '__main__':