Administrator
4 天以前 06fb51807f50dd274f2b2c78ed333edc6fb56814
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
华鑫增值服务API
"""
import multiprocessing
import random
import threading
import time
 
from huaxin_client import l1_api_client
from utils import tool
 
__response_data = {}
TIMEOUT = 3
 
 
def __set_response_data(request_id, response_data):
    __response_data[request_id] = response_data
 
 
def __read_response(request_id, blocking=True, timeout=TIMEOUT):
    if blocking:
        start_time = time.time()
        try:
            while True:
                time.sleep(0.005)
                if request_id in __response_data:
                    # 获取到了响应内容
                    result = __response_data.pop(request_id)
                    return result
                if time.time() - start_time > timeout:
                    # 读取内容超时才会释放
                    raise Exception(f"读取内容超时: request_id={request_id}")
        finally:
            pass
    return None
 
 
def __set_response(response_queue: multiprocessing.Queue):
    """
    读取结果队列
    @param response_queue:
    @return:
    """
    while True:
        try:
            val = response_queue.get()
            request_id = val['request_id']
            response_data = val['data']
            __response_data[request_id] = response_data
        except:
            pass
 
 
def __get_request_id(type):
    return f"r_{type}_{round(time.time() * 10000)}_{random.randint(0, 100000)}"
 
 
def __base_request(type, data):
    request_id = __get_request_id(type)
    request_queue.put_nowait({"type": type, "request_id": request_id, "data": data})
    return request_id
 
 
def get_history_k_bars(code, count, end_date=tool.get_now_date_str()):
    """
    获取历史K线
    @param code:
    @param count:
    @param end_date:
    @return:
    """
    load_latest_trade_calendar()
    _count = -1
    start_date = None
    for i in range(len(__trade_calendar_list) - 1, -1, -1):
        if _count == -1:
            if int(end_date.replace("-", "")) >= int(__trade_calendar_list[i].replace("-", "")):
                # 获取起始位置
                _count = 1
                start_date = __trade_calendar_list[i]
        else:
            _count += 1
            start_date = __trade_calendar_list[i]
        if _count >= count:
            break
    request_id = __base_request("get_history_k_bars", {"start_date": start_date, "end_date": end_date, "code": code})
    return __read_response(request_id)
 
 
def get_trade_calendar(start_date, end_date):
    request_id = __base_request("get_trade_calendar", {"start_date": start_date, "end_date": end_date})
    results = __read_response(request_id)
    return [f"{x[:4]}-{x[4:6]}-{x[6:8]}" for x in results]
 
 
# 缓存交易日期:格式:{"日期":(上一个交易日, 下一个交易日),}
__trade_calendar_list = []
__trade_calendar_dict = {}
 
 
def load_latest_trade_calendar():
    """
    加载最近的交易日历
    @return:
    """
    if __trade_calendar_list or __trade_calendar_dict:
        return
    end_date = tool.get_now_date_str()
    days = get_trade_calendar(tool.date_sub(end_date, 365), end_date)
    for i in range(0, len(days)):
        if 0 < i < len(days) - 1:
            __trade_calendar_dict[days[i]] = (days[i - 1], days[i + 1])
        elif i == 0:
            __trade_calendar_dict[days[i]] = (None, days[i + 1])
        else:
            __trade_calendar_dict[days[i]] = (days[i - 1], None)
    __trade_calendar_list.clear()
    __trade_calendar_list.extend(days)
 
 
def get_previous_trading_date(day):
    """
    获取上一个交易日
    @param day:
    @return:
    """
    load_latest_trade_calendar()
    if day in __trade_calendar_dict:
        return __trade_calendar_dict.get(day)[0]
    return None
 
 
def get_next_trading_date(day):
    """
    获取下一个交易日
    @param day:
    @return:
    """
    load_latest_trade_calendar()
    if day in __trade_calendar_dict:
        return __trade_calendar_dict.get(day)[1]
    return None
 
 
request_queue = None
 
 
def run():
    global request_queue
    request_queue, response_queue = multiprocessing.Queue(maxsize=1024), multiprocessing.Queue(maxsize=1024)
    # 启动增值服务进程
    process = multiprocessing.Process(target=l1_api_client.run, args=(request_queue, response_queue,), daemon=True)
    process.start()
    __set_response(response_queue)
 
 
def __test():
    time.sleep(5)
    print(get_history_k_bars("000333", count=150))
    print(get_previous_trading_date("2024-12-31"))
    print(get_next_trading_date("2024-12-30"))
 
 
if __name__ == '__main__':
    threading.Thread(target=__test, daemon=True).start()
    run()