1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""
华鑫增值服务API
"""
import multiprocessing
import random
import threading
import time
 
from huaxin_client import l1_api_client
from log_module.log import logger_debug
from strategy import data_cache
from utils import tool
 
# Responses received from the service process, keyed by request id.
# Written by __set_response / __set_response_data and consumed by __read_response.
__response_data = {}
# Default number of seconds __read_response waits for a response.
TIMEOUT = 3
 
 
def __set_response_data(request_id, response_data):
    """
    Store a response in the shared response cache.
    @param request_id: id of the request the response belongs to
    @param response_data: payload to keep until it is read
    @return: None
    """
    __response_data.update({request_id: response_data})
 
 
def __read_response(request_id, blocking=True, timeout=TIMEOUT):
    """
    Wait for the response that matches a request id and remove it from the cache.

    The cache is polled every 5 ms until the response shows up or the timeout elapses.
    @param request_id: id of the request whose response is awaited
    @param blocking: when falsy nothing is read and None is returned immediately
    @param timeout: maximum number of seconds to wait in blocking mode
    @return: the response payload, or None in non-blocking mode
    @raise Exception: when no response arrives within the timeout
    """
    if not blocking:
        return None
    # Sentinel distinguishes "no response yet" from a response whose payload is None.
    missing = object()
    # A monotonic clock keeps the timeout correct even if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while True:
        result = __response_data.pop(request_id, missing)
        if result is not missing:
            return result
        if time.monotonic() > deadline:
            raise Exception(f"读取内容超时: request_id={request_id}")
        time.sleep(0.005)
 
 
def __set_response(response_queue: multiprocessing.Queue):
    """
    Read the response queue forever and publish every response into the shared cache.

    Each queue item is indexed with the keys 'request_id' and 'data'.
    @param response_queue: queue to read responses from
    @return: does not return normally (infinite loop)
    """
    while True:
        try:
            val = response_queue.get()
            __set_response_data(val['request_id'], val['data'])
        except Exception as e:
            # Keep the loop alive on a bad item but leave a trace of it. KeyboardInterrupt and
            # SystemExit are deliberately not caught so the process can still be stopped.
            logger_debug.info(f"failed to handle response queue item: {e!r}")
 
 
def __get_request_id(type):
    """
    Build a request id of the form r_<type>_<timestamp>_<random number>.
    @param type: request type embedded in the id
    @return: request id string
    """
    timestamp = round(time.time() * 10000)
    nonce = random.randint(0, 100000)
    return f"r_{type}_{timestamp}_{nonce}"
 
 
def __base_request(type, data):
    """
    Put a request on the request queue without blocking.
    @param type: request type name
    @param data: request payload
    @return: id of the queued request
    """
    request_id = __get_request_id(type)
    payload = {"type": type, "request_id": request_id, "data": data}
    request_queue.put_nowait(payload)
    return request_id
 
 
def get_history_k_bars(code, count, end_date=None):
    """
    Request historical K-line bars for a code.

    The start date is found by walking the cached trading calendar backwards from the
    latest trading day that is not after end_date, until count trading days are covered.
    @param code: security code
    @param count: number of trading days to cover, counted backwards from end_date
    @param end_date: last date as YYYY-MM-DD; today's date is used when empty or None.
        The default is resolved on every call, not once at import time, so a long-running
        process never works with a stale "today".
    @return: response payload for the "get_history_k_bars" request
    """
    if not end_date:
        end_date = tool.get_now_date_str()
    load_latest_trade_calendar()
    end_key = int(end_date.replace("-", ""))
    start_date = None
    covered = 0
    for day in reversed(__trade_calendar_list):
        if covered == 0 and int(day.replace("-", "")) > end_key:
            # Still after end_date: keep looking for the starting position.
            continue
        covered += 1
        start_date = day
        if covered >= count:
            break
    # NOTE(review): start_date stays None when the calendar has no day on or before
    # end_date; the request is still sent in that case, as before.
    request_id = __base_request("get_history_k_bars", {"start_date": start_date, "end_date": end_date, "code": code})
    return __read_response(request_id)
 
 
def get_trade_calendar(start_date, end_date):
    """
    Request the trading calendar for a date range.
    @param start_date: start of the requested range
    @param end_date: end of the requested range
    @return: list of the returned days, each rewritten as XXXX-XX-XX
    """
    params = {"start_date": start_date, "end_date": end_date}
    raw_days = __read_response(__base_request("get_trade_calendar", params))
    formatted = []
    for raw in raw_days:
        # presumably each entry is a YYYYMMDD string — confirm against the service
        formatted.append(f"{raw[:4]}-{raw[4:6]}-{raw[6:8]}")
    return formatted
 
 
# Cached trading calendar, filled by load_latest_trade_calendar().
# __trade_calendar_list: trading days in the order returned by get_trade_calendar().
# __trade_calendar_dict format: {"date": (previous trading day, next trading day), }
__trade_calendar_list = []
__trade_calendar_dict = {}
 
 
def load_latest_trade_calendar():
    """
    Load the recent trading calendar into the module-level caches.

    Does nothing when either cache already holds data. The requested window is derived
    with tool.date_sub (presumably ending 30 days after today and spanning 365 days;
    confirm against tool.date_sub).
    @return: None
    """
    if __trade_calendar_list or __trade_calendar_dict:
        return
    end_date = tool.date_sub(tool.get_now_date_str(), -30)
    days = get_trade_calendar(tool.date_sub(end_date, 365), end_date)
    logger_debug.info(f"华鑫API获取交易日历结果:{days}")
    # Pair every day with its neighbours; the first day has no previous day and the last
    # has no next day. Works for a single-day calendar as well (both neighbours None).
    previous_days = [None] + days[:-1]
    next_days = days[1:] + [None]
    neighbours = {day: (prev_day, next_day) for day, prev_day, next_day in zip(days, previous_days, next_days)}
    # Publish only after the whole map was built, so a failure cannot leave a partial
    # cache that the guard above would treat as fully loaded.
    __trade_calendar_dict.update(neighbours)
    __trade_calendar_list.clear()
    __trade_calendar_list.extend(days)
 
 
def get_previous_trading_date(day):
    """
    Look up the trading day before the given day in the cached calendar.
    @param day: date used as key of the cached calendar
    @return: previous trading day, or None when the day is not in the cache
    """
    load_latest_trade_calendar()
    try:
        return __trade_calendar_dict[day][0]
    except KeyError:
        return None
 
 
def get_next_trading_date(day):
    """
    Look up the trading day after the given day in the cached calendar.
    @param day: date used as key of the cached calendar
    @return: next trading day, or None when the day is not in the cache
    """
    load_latest_trade_calendar()
    try:
        return __trade_calendar_dict[day][1]
    except KeyError:
        return None
 
 
def __read_callback_data(data_callback_queue: multiprocessing.Queue):
    """
    Read pushed data from the callback queue forever and refresh the shared cache.

    Each queue item is indexed as (type, payload). Only the "stock_index_datas" type is
    handled: for every key of the payload, data_cache.stock_index_dict[key] is set to
    (LastPrice rounded to 2 decimals, Volume, Turnover, PreClosePrice).
    :param data_callback_queue: queue to read pushed data from
    :return: does not return normally (infinite loop)
    """
    while True:
        try:
            message = data_callback_queue.get()
            type_, payload = message[0], message[1]
            if type_ != "stock_index_datas":
                continue
            # Each payload value has the form:
            # {
            #     "PreClosePrice": pStockIndexData,
            #     "LastPrice": pStockIndexData.LastPrice,
            #     "SecurityID": pStockIndexData.SecurityID,
            #     "UpdateTime": pStockIndexData.UpdateTime,
            #     "Volume": pStockIndexData.Volume,
            #     "Turnover": pStockIndexData.Turnover,
            #     "LXLastPrice": pStockIndexData.LXLastPrice,
            # }
            for key, item in payload.items():
                data_cache.stock_index_dict[key] = (
                    round(item["LastPrice"], 2), item["Volume"], item["Turnover"], item["PreClosePrice"])
        except Exception as e:
            # Keep the reader alive on a bad item but leave a trace of it instead of
            # silently discarding every error.
            logger_debug.info(f"failed to handle data callback item: {e!r}")
 
 
def run():
    """
    Start the value-added service process and then serve its responses.

    Creates the request, response and data-callback queues, starts l1_api_client.run in a
    daemon process, starts a daemon thread for the data callbacks, and finally blocks in
    the response reading loop.
    @return: does not return normally
    """
    global request_queue
    request_queue = multiprocessing.Queue()
    response_queue = multiprocessing.Queue()
    data_callback_queue = multiprocessing.Queue()

    # Start the value-added service process
    worker = multiprocessing.Process(
        target=l1_api_client.run,
        args=(request_queue, response_queue, data_callback_queue),
        daemon=True,
    )
    worker.start()

    # Read the data callbacks in the background
    callback_reader = threading.Thread(target=__read_callback_data, args=(data_callback_queue,), daemon=True)
    callback_reader.start()

    __set_response(response_queue)
 
 
def __test():
    """
    Manual smoke test: wait 5 seconds, then print the results of three sample queries.
    @return: None
    """
    time.sleep(5)
    k_bars = get_history_k_bars("000333", count=150)
    print(k_bars)
    previous_day = get_previous_trading_date("2024-12-31")
    print(previous_day)
    next_day = get_next_trading_date("2024-12-30")
    print(next_day)
 
 
if __name__ == '__main__':
    # Manual smoke test: the sample queries run on a background daemon thread because
    # run() blocks the main thread in the response reading loop.
    threading.Thread(target=__test, daemon=True).start()
    run()