Administrator
2024-12-31 e1057fa87cbda64ec7bf88db4d944d1ee515b853
将掘金的交易日历/K线替换为华鑫
7个文件已修改
281 ■■■■■ 已修改文件
api/outside_api_command_callback.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/zyltgb_util.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
huaxin_client/l1_api_client.py 230 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/buy_radical/radical_buy_data_manager.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/outside_api_command_callback.py
@@ -487,7 +487,7 @@
        try:
            fdata = {}
            try:
                date = JueJinApi.get_previous_trading_date(tool.get_now_date_str())
                date = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
                if date:
                    fdata["juejin"] = 1
            except Exception as e:
code_attribute/zyltgb_util.py
@@ -12,8 +12,8 @@
    @return:
    """
    # 判断当前是否是交易日
    previous_trading_date = history_k_data_util.JueJinApi.get_previous_trading_date(tool.get_now_date_str())
    if history_k_data_util.JueJinApi.get_next_trading_date(previous_trading_date) != tool.get_now_date_str():
    previous_trading_date = history_k_data_util.HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
    if history_k_data_util.HistoryKDatasUtils.get_next_trading_date(previous_trading_date) != tool.get_now_date_str():
        # 非交易日
        return 0
@@ -44,8 +44,8 @@
if __name__ == "__main__":
    previous_trading_date = history_k_data_util.JueJinApi.get_previous_trading_date(tool.get_now_date_str())
    if history_k_data_util.JueJinApi.get_next_trading_date(previous_trading_date) != tool.get_now_date_str():
    previous_trading_date = history_k_data_util.HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
    if history_k_data_util.HistoryKDatasUtils.get_next_trading_date(previous_trading_date) != tool.get_now_date_str():
        print("非交易日")
    else:
        print("交易日")
huaxin_client/l1_api_client.py
@@ -1,12 +1,15 @@
import logging
import multiprocessing
import threading
import time
import qcvalueaddproapi
from log_module.log import logger_system
from utils import tool
global g_userid, g_passwd, g_address, g_port, g_seqnum
g_seqnum = 100000
from datetime import datetime
def new_seqnum():
@@ -26,76 +29,130 @@
    def __create_request_id(self):
        return new_seqnum()
    def queryTradeCalendar(self):
    def __query_trade_calendar(self, start_date, end_date, page_locate=1, page_count=200):
        """
        查询交易日历
        @param start_date: 格式:20241201
        @param end_date: 格式:20241201
        @param page_locate:
        @param page_count:
        @return: 返回日期[顺序]
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryShareCalendarField()
            # queryField.BegDate = "20240704"
            queryField.EndDate = "20240904"
            queryField.PageCount = 10
            queryField.PageLocate = 1
            if queryField.BegDate:
                queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_ASC
            if queryField.EndDate:
                queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
            queryField.BegDate = start_date
            queryField.EndDate = end_date
            queryField.PageCount = page_count
            queryField.PageLocate = page_locate
            queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_ASC
            request_id = self.__create_request_id()
            results = self.m_api.ReqReqQryShareCalendar(queryField, request_id)
            for i in range(0, 1000):
                if request_id in self.__result_cache:
                    return self.__result_cache
                    return self.__result_cache[request_id]
                time.sleep(0.002)
            print("ReqReqQryShareCalendar:", results)
        except Exception as e:
            logging.exception(e)
        return []
    def queryTradeCalendar(self, start_date, end_date):
        """
        查询交易日历
        @return:
        """
        fresults = []
        try:
            page_locate = 1
            for i in range(20):
                results = self.__query_trade_calendar(start_date.replace("-", ""), end_date.replace("-", ""),
                                                      page_locate=page_locate)
                if results:
                    fresults.extend(results)
                if len(results) != 200:
                    break
                page_locate += 1
            print("queryTradeCalendar:", len(fresults))
        except Exception as e:
            logging.exception(e)
        return fresults
    def __query_bars(self, code, begin_date, end_date, page_locate=1, page_count=200):
        queryField = qcvalueaddproapi.CQCVDReqQryStockDayQuotationField()
        queryField.BegDate = begin_date.replace("-", "")
        queryField.EndDate = end_date.replace("-", "")
        if tool.is_sh_code(code):
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SSE
        else:
            queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SZSE
        queryField.SecurityID = code
        queryField.PageCount = page_count
        queryField.PageLocate = page_locate
        queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
        request_id = self.__create_request_id()
        results = self.m_api.ReqReqQryStockDayQuotation(queryField, request_id)
        # 读取结果
        for i in range(0, 1000):
            # 读取结果
            if request_id in self.__result_cache:
                results = self.__result_cache[request_id]
                return results
            time.sleep(0.002)
        return []
    def queryBars(self, code, begin_date, end_date):
        """
        获取历史K线(包含起止日期)
        @param code:
        @param begin_date:  例如:2024-10-31
        @param end_date: 例如:2024-11-30
        @param page_locate: 第几页(从第一页开始)
        @return:
        """
        try:
            queryField = qcvalueaddproapi.CQCVDReqQryStockDayQuotationField()
            queryField.BegDate = begin_date.replace("-", "")
            queryField.EndDate = end_date.replace("-", "")
            if tool.is_sh_code(code):
                queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SSE
            else:
                queryField.ExchangeID = qcvalueaddproapi.QCVD_EXD_SZSE
            queryField.SecurityID = code
            # queryField.PageCount = 10
            queryField.PageLocate = 1
            queryField.OrderType = qcvalueaddproapi.QCVD_ORDST_DESC
            request_id = self.__create_request_id()
            results = self.m_api.ReqReqQryStockDayQuotation(queryField, request_id)
            for i in range(0, 1000):
                if request_id in self.__result_cache:
                    results = self.__result_cache[request_id]
                    # 最新的复权因子
                    start_adjust_factor = None
                    for i in range(0, len(results)):
                        d = results[i]
                        if not start_adjust_factor:
                            start_adjust_factor = d["AdjustFactor"]
                        # 复权价格
                        if start_adjust_factor != d["AdjustFactor"]:
                            # 开始复权
                            d["PreClosePrice"] = round(d["AdjustFactor"] * d["PreClosePrice"] / start_adjust_factor, 4)
                            d["OpenPrice"] = round(d["AdjustFactor"] * d["OpenPrice"] / start_adjust_factor, 4)
                            d["HighPrice"] = round(d["AdjustFactor"] * d["HighPrice"] / start_adjust_factor, 4)
                            d["LowPrice"] = round(d["AdjustFactor"] * d["LowPrice"] / start_adjust_factor, 4)
                            d["ClosePrice"] = round(d["AdjustFactor"] * d["ClosePrice"] / start_adjust_factor, 4)
                    fresults = []
                    for r in results:
                        fresults.append({"sec_id": r["SecurityID"],
                                         "open": r["TradingDay"],
                                         "high": r["HighPrice"],
                                         "low": r["LowPrice"],
                                         "close": r["ClosePrice"],
                                         "volume": r["Volume"],
                                         "pre_close": r["PreClosePrice"],
                                         "bob": f"{r['TradingDay'][:4]}-{r['TradingDay'][4:6]}-{r['TradingDay'][6:]} 00:00:00",
                                         "amount": r["Turnover"]
                                         })
                    return fresults
                time.sleep(0.002)
            print("ReqReqQryStockDayQuotation:", results)
            page_locate = 1
            final_results = []
            for i in range(20):
                # 最多翻20页
                results = self.__query_bars(code, begin_date, end_date, page_locate)
                if results:
                    final_results.extend(results)
                if len(results) != 200:
                    break
                else:
                    page_locate += 1
            # 最新的复权因子
            start_adjust_factor = None
            for i in range(0, len(final_results)):
                d = final_results[i]
                if not start_adjust_factor:
                    start_adjust_factor = d["AdjustFactor"]
                # 复权价格
                if start_adjust_factor != d["AdjustFactor"]:
                    # 开始复权
                    d["PreClosePrice"] = round(d["AdjustFactor"] * d["PreClosePrice"] / start_adjust_factor, 4)
                    d["OpenPrice"] = round(d["AdjustFactor"] * d["OpenPrice"] / start_adjust_factor, 4)
                    d["HighPrice"] = round(d["AdjustFactor"] * d["HighPrice"] / start_adjust_factor, 4)
                    d["LowPrice"] = round(d["AdjustFactor"] * d["LowPrice"] / start_adjust_factor, 4)
                    d["ClosePrice"] = round(d["AdjustFactor"] * d["ClosePrice"] / start_adjust_factor, 4)
            fresults = []
            for r in final_results:
                fresults.append({"sec_id": r["SecurityID"],
                                 "open": r["OpenPrice"],
                                 "high": r["HighPrice"],
                                 "low": r["LowPrice"],
                                 "close": r["ClosePrice"],
                                 "volume": r["Volume"],
                                 "pre_close": r["PreClosePrice"],
                                 "bob": datetime.strptime(r['TradingDay'], '%Y%m%d'),
                                 "amount": r["Turnover"]
                                 })
            print("ReqReqQryStockDayQuotation:", len(fresults))
            return fresults
        except Exception as e:
            logging.exception(e)
@@ -119,11 +176,13 @@
                 nRequestID,
                 pRspInfo.ErrorID,
                 pRspInfo.ErrorMsg))
        if (pRspInfo.ErrorID == 0):
        if pRspInfo.ErrorID == 0:
            logger_system.info("华鑫增值服务API登录成功")
            # 登录成功后直接查询
            # self.ReqInquiryHistoryDelivery()
            # threading.Thread(target=lambda : print("交易日历:", self.queryTradeCalendar())).start()
            threading.Thread(target=lambda: print("日K:", self.queryBars())).start()
            # threading.Thread(target=lambda: print("交易日历:", self.queryTradeCalendar("2024-03-08", "2024-12-31"))).start()
            # threading.Thread(
            #     target=lambda: print("日K:", self.queryBars("601298", "2024-12-15", "2024-12-31"))).start()
    def ReqQryGGTEODPrices(self):
        QryField = qcvalueaddproapi.CQCVDQryGGTEODPricesField()
@@ -166,7 +225,7 @@
        if nRequestID not in self.__temp_cache:
            self.__temp_cache[nRequestID] = []
        print("是否本页查询完毕:", bIsPageLast)
        # print("是否本页查询完毕:", bIsPageLast)
        if not bIsPageLast:
            self.__temp_cache[nRequestID].append({
@@ -183,12 +242,41 @@
            print("OnRspInquiryStockDayQuotation:", len(self.__result_cache[nRequestID]))
def main():
    # if (len(sys.argv)< 5):
    #     ######运行命令行:
    #     ###### ip地址  端口号 用户名 密码
    #     print("usage: ipaddress port userid passwd")
    #     return
def __read_request(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
    """
    Serve requests taken from `request_queue` forever, answering on `response_queue`.

    Each request is a dict with the keys 'type', 'request_id' and an optional
    'data' dict. Supported types:
      - 'get_trade_calendar': data has 'start_date' and 'end_date'
      - 'get_history_k_bars': data has 'start_date', 'end_date' and 'code'
    The answer is put on `response_queue` as
    {"request_id": <id>, "data": <result>}.

    A failure while handling one request is logged and the loop carries on
    with the next request; no response is sent for the failed request.

    @param request_queue: queue the requests are read from
    @param response_queue: queue the responses are written to
    @return: never returns
    """

    def __set_response_data(request_id, response_data):
        response_queue.put_nowait({"request_id": request_id, "data": response_data})

    while True:
        try:
            val = request_queue.get()
            type_ = val['type']
            request_id = val['request_id']
            data = val.get('data')
            if type_ == 'get_trade_calendar':
                # Trade calendar query
                start_date = data['start_date']
                end_date = data['end_date']
                results = thespi.queryTradeCalendar(start_date, end_date)
                __set_response_data(request_id, results)
            elif type_ == 'get_history_k_bars':
                # Historical K-bar query
                start_date = data['start_date']
                end_date = data['end_date']
                code = data['code']
                results = thespi.queryBars(code, start_date, end_date)
                __set_response_data(request_id, results)
            else:
                logging.warning("unknown request type: %s", type_)
        except Exception as e:
            # Keep the loop alive, but leave a trace instead of silently
            # discarding the error.
            logging.exception(e)
def run(request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue):
    """
    运行
    @param request_queue: 请求队列
    @param response_queue: 响应队列
    @return:
    """
    global g_userid, g_passwd, g_address, g_port
    g_address = "101.230.90.99"
    g_port = 25556
@@ -200,11 +288,17 @@
    # 用户可使用回测traderapi的RegisterFront函数来注册此地址去连接上回测服务器
    print("GetApiVersion():", qcvalueaddproapi.CQCValueAddProApi_GetApiVersion())
    theapi = qcvalueaddproapi.CQCValueAddProApi_CreateInfoQryApi()
    global  thespi
    thespi = sampleSpi(theapi)
    theapi.RegisterSpi(thespi)
    theapi.RegisterFront(g_address, g_port)
    threading.Thread(target=__read_request, args=(request_queue, response_queue, ), daemon=True).start()
    theapi.Run()
    return
def main():
    """Create a fresh request queue and response queue and hand them to `run`."""
    requests = multiprocessing.Queue()
    responses = multiprocessing.Queue()
    run(requests, responses)
if __name__ == '__main__':
main.py
@@ -12,6 +12,7 @@
import threading
from task import task_manager
from third_data import hx_qc_value_util
logger_system.info("程序启动Pre:{}", os.getpid())
@@ -122,6 +123,9 @@
        threading.Thread(target=huaxin_client.l2_client.run, args=(
            queue_other_w_l2_r, huaxin_trade_server.my_l2_data_callbacks), daemon=True).start()
        # 运行华鑫增值服务进程
        threading.Thread(target=hx_qc_value_util.run, daemon=True).start()
        # 绑核运行
        # psutil.Process(l1Process.pid).cpu_affinity([20])
        # psutil.Process(tradeProcess.pid).cpu_affinity([21, 22])
third_data/history_k_data_manager.py
@@ -29,7 +29,7 @@
            except Exception as e:
                logger_debug.exception(e)
    previous_trading_date = history_k_data_util.JueJinApi.get_previous_trading_date(tool.get_now_date_str())
    previous_trading_date = history_k_data_util.HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
    if previous_trading_date is None:
        raise Exception("上一个交易日获取失败")
    # 刷新目标代码的自由流通量
third_data/history_k_data_util.py
@@ -11,6 +11,7 @@
import constant
from db.redis_manager_delegate import RedisUtils
from log_module.log import logger_request_api
from third_data import hx_qc_value_util
from utils import tool, middle_api_protocol
from db import redis_manager_delegate as redis_manager
@@ -232,7 +233,8 @@
    @classmethod
    def get_history_tick_n(cls, code, count, fields=None):
        return JueJinApi.get_history_tick_n(code, count, fields)
        # return JueJinApi.get_history_tick_n(code, count, fields)
        return hx_qc_value_util.get_history_k_bars(code, count)
    @classmethod
    def get_gp_current_info(cls, codes):
@@ -241,7 +243,7 @@
    # 返回指定日期的上个交易日
    @classmethod
    def get_previous_trading_date(cls, date):
        return JueJinApi.get_previous_trading_date(date)
        return hx_qc_value_util.get_previous_trading_date(date)
    @classmethod
    def get_previous_trading_date_cache(cls, date):
@@ -275,11 +277,12 @@
    # 返回指定日期的下个交易日
    @classmethod
    def get_next_trading_date(cls, date):
        return JueJinApi.get_next_trading_date(date)
        # return JueJinApi.get_next_trading_date(date)
        return hx_qc_value_util.get_next_trading_date(date)
    @classmethod
    def get_trading_dates(cls, start_date, end_date):
        return JueJinApi.get_trading_dates(start_date, end_date)
        return hx_qc_value_util.get_trade_calendar(start_date, end_date)
    @classmethod
    def get_now_price(cls, codes):
@@ -334,13 +337,16 @@
if __name__ == "__main__":
    now_day = tool.get_now_date_str()
    results = JueJinApi.get_history_instruments(JueJinApi.get_juejin_code_list_with_prefix(["600265"]),
                                                tool.date_sub(now_day, 30), tool.date_sub(now_day, 1))
    results = results[-5:]
    normal = True
    for r in results:
        if r["sec_level"] != 1:
            normal = False
            break
    print(HistoryKDatasUtils.get_previous_trading_date("2024-12-31"))
    print(HistoryKDatasUtils.get_history_tick_n("000095", 10))
    # now_day = tool.get_now_date_str()
    # results = JueJinApi.get_history_instruments(JueJinApi.get_juejin_code_list_with_prefix(["600265"]),
    #                                             tool.date_sub(now_day, 30), tool.date_sub(now_day, 1))
    # results = results[-5:]
    # normal = True
    # for r in results:
    #     if r["sec_level"] != 1:
    #         normal = False
    #         break
    # print(normal)
trade/buy_radical/radical_buy_data_manager.py
@@ -1333,6 +1333,7 @@
    def open_limit_up(cls, code):
        """
        Discard the big-order deal records kept for `code`.

        Does nothing when `code` has no entry in the records dict.

        @param code: key into the per-code records dict
        """
        if code in cls.__deal_big_order_infos_dict:
            # Empty the existing container in place; the key itself stays in the dict.
            cls.__deal_big_order_infos_dict[code].clear()
            async_log_util.info(logger_l2_radical_buy_data, f"清除每次涨停大单数据:{code}")
    @classmethod
    def clear(cls, code):
@@ -1356,6 +1357,8 @@
            if order_info[0] not in cls.__deal_big_order_no_dict[code]:
                cls.__deal_big_order_infos_dict[code].append(order_info)
                cls.__deal_big_order_no_dict[code].add(order_info[0])
                async_log_util.info(logger_l2_radical_buy_data, f"添加每次上板的大单成交:{code}-{order_info}")
    @classmethod
    def get_big_buy_deal_order_count(cls, code):