admin
2024-06-28 e0b77a03d87eb9bafd08e35492f918b5b8b0fbb5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
import multiprocessing
import random
import threading
import time
 
from local_api import juejin
from local_api.code_block_manager import CodeKPLBlockManager
from local_api.limit_up_manager import KPLLimitUpDataManager
from local_api.log_module import async_log_util
from local_api.log_module.log import logger_print
from local_api.tick_data_manager import TickQueueManager
from local_api.util import event_type, l1_data_api
import concurrent.futures
 
from local_api.util.juejin_util import JueJinApi
 
 
class APICallback:
 
    def OnJueJinInited(self):
        """
        掘金初始化完成
        :return:
        """
        pass
 
    def OnTick(self, tick, blocks: set, limit_up_list: list):
        """
        Tick回调
        :param tick:
        :param blocks: 代码的板块: 例如{"光伏"}
        :param limit_up_list: 所有涨停
        :return:
        """
        pass
 
    def OnTrade(self, trade_info):
        """
        交易回调:当有成交或挂单时回调
        :param trade_info:
        :return:
        """
        pass
 
    def OnLimitUpListUpdated(self, limit_up_list):
        """
        涨停列表更新回调
        :param limit_up_list:
        :return:
        """
 
 
class __TradeApi:
    def __init__(self):
        self.__result_dict = {}
        self.__not_request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)
 
    def init(self, command_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue,
             api_callback: APICallback):
        self.command_queue = command_queue
        self.result_queue = result_queue
        self.api_callback = api_callback
 
    @classmethod
    def __create_request_id(cls, type):
        return f"request_id-{type}-{int(time.time() * 1000000)}-{random.randint(0, 100000)}"
 
    def __read_result(self, request_id, timeout=3.0):
        """
        读取结果
        :param request_id:
        :param timeout: 超时时间
        :return:
        """
        start_time = time.time()
        # 最大超时时间50s
        for i in range(10000):
            if request_id in self.__result_dict:
                return self.__result_dict.pop(request_id)
            time.sleep(0.005)
            if time.time() - start_time > timeout:
                raise Exception("读取结果超时")
        raise Exception("未获取到请求结果")
 
    def buy(self, code, volume, price, blocking=True):
        """
        买入下单
        :param code:
        :param volume:
        :param price:
        :param blocking: True表示需要等待结果
        :return:
        """
        commands = ("buy", {"code": code, "volume": volume, "price": price}, self.__create_request_id("buy"))
        self.command_queue.put_nowait(commands)
        if not blocking:
            return
        return self.__read_result(commands[2])
 
    def sell(self, code, volume, price, blocking=True):
        """
        买入下单
        :param code:
        :param volume:
        :param price:
        :param blocking: True表示需要等待结果
        :return:
        """
        commands = ("sell", {"code": code, "volume": volume, "price": price}, self.__create_request_id("sell"))
        self.command_queue.put_nowait(commands)
        if not blocking:
            return
        return self.__read_result(commands[2])
 
    def positions(self, blocking=True):
        """
        买入下单
        :param code:
        :param volume:
        :param price:
        :param blocking: True表示需要等待结果
        :return:
        """
        commands = ("position", {}, self.__create_request_id("position"))
        self.command_queue.put_nowait(commands)
        if not blocking:
            return
        return self.__read_result(commands[2], timeout=10)
 
    def position_by_symbol(self, symbol, blocking=True):
        commands = ("get_position_by_symbol", {"symbol": symbol}, self.__create_request_id("get_position_by_symbol"))
        self.command_queue.put_nowait(commands)
        if not blocking:
            return
        return self.__read_result(commands[2], timeout=10)
 
    def get_unfinish_orders(self, blocking=True):
        """
        获取委托列表
        :param blocking:
        :return:
        """
        commands = ("get_unfinish_orders", {}, self.__create_request_id("get_unfinish_orders"))
        self.command_queue.put_nowait(commands)
        if not blocking:
            return
        return self.__read_result(commands[2], timeout=10)
 
    def cancel_order(self, local_order_id, account_id, blocking=True):
        """
        撤单
        :param local_order_id:
        :param account_id:
        :param blocking:
        :return:
        """
        commands = ("cancel_order", {"local_order_id": local_order_id, "account_id": account_id},
                    self.__create_request_id("cancel_order"))
        self.command_queue.put_nowait(commands)
        if not blocking:
            return
        return self.__read_result(commands[2], timeout=10)
 
    def get_symbol_names(self, symbols):
        """
        获取名称
        :param symbols:
        :return:
        """
        results = JueJinApi().get_gp_latest_info(symbols, "symbol,sec_name")
        return {x["symbol"]: x["sec_name"] for x in results}
 
    def get_execution_reports(self, blocking=True):
        commands = ("get_execution_reports", {},
                    self.__create_request_id("cancel_order"))
        self.command_queue.put_nowait(commands)
        if not blocking:
            return
        return self.__read_result(commands[2], timeout=10)
 
    def start_read_results(self):
        while True:
            try:
                result = self.result_queue.get()
                type, request_id, data = result[0], result[1], result[2]
                logger_print.debug("API读取到结果:{}", result)
                if type == event_type.EVENT_TYPE_REQUEST:
                    self.__result_dict[request_id] = data
                elif type == event_type.EVENT_TYPE_JUEJIN_INIT:
                    if self.api_callback:
                        # 采用线程调用
                        self.__not_request_thread_pool.submit(self.api_callback.OnJueJinInited)
            except:
                time.sleep(0.005)
 
 
trade_api = __TradeApi()
 
target_codes = set()
 
 
def update_blocks(force=False):
    """
    更新板块
    :param force:是否强制更新
    :return:
    """
    CodeKPLBlockManager().request_block(target_codes, force)
 
 
def get_blocks_count_info():
    """
    获取更新到板块的数量
    :return: (今日更新到板块的代码数量, 上个交易日的代码数量)
    """
    return CodeKPLBlockManager().get_codes_count_info()
 
 
# 最新的tick数据
latest_tick_dict = {}
 
 
def tick_callback(tick):
    code = tick[0]
    if code in latest_tick_dict and latest_tick_dict[code][7] == tick[7]:
        return
    latest_tick_dict[code] = tick
 
    blocks = CodeKPLBlockManager().get_blocks(code)
    limit_up_info = KPLLimitUpDataManager().get_current_limitup_info()
    api_callback.OnTick(tick, blocks, limit_up_info[1] if limit_up_info else [])
 
 
__TickQueueManager = TickQueueManager(60, tick_callback)
 
 
def __run_tick_tasks():
    threading.Thread(target=__TickQueueManager.run(), daemon=True).start()
    while True:
        try:
            datas = l1_data_api.get_current_info()
            for d in datas:
                __TickQueueManager.add_tick(d[0], d)
        except:
            pass
        finally:
            time.sleep(3)
 
 
def run(jeujin_strategy_id, juejin_token, target_codes_, api_callback_: APICallback):
    """
    启动API
    :param jeujin_strategy_id: 掘金策略ID
    :param juejin_token: TOKEN
    :param target_codes: 目标代码
    :param api_callback: 回调
    :return:
    """
    # 设置全局的目标代码
    global target_codes, api_callback
    api_callback = api_callback_
    target_codes.clear()
    target_codes |= set(target_codes_)
    # 初始化掘金API的参数
    JueJinApi.init(jeujin_strategy_id, juejin_token)
 
    command_queue, result_queue = multiprocessing.Queue(), multiprocessing.Queue()
    trade_api.init(command_queue, result_queue, api_callback)
    threading.Thread(target=trade_api.start_read_results, daemon=True).start()
 
    # 启动异步日志
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
 
    # 运行tick
    threading.Thread(target=__run_tick_tasks, daemon=True).start()
 
    # 运行掘金策略
    # 开启新进程访问
    jueJinProcess = multiprocessing.Process(target=juejin.start_run,
                                            args=(jeujin_strategy_id, juejin_token, command_queue, result_queue,))
    jueJinProcess.start()
 
    # 运行异步日志
    # threading.Thread(target=lambda:, daemon=True).start()