import multiprocessing
|
import random
|
import threading
|
import time
|
|
from local_api import juejin
|
from local_api.code_block_manager import CodeKPLBlockManager
|
from local_api.limit_up_manager import KPLLimitUpDataManager
|
from local_api.log_module import async_log_util
|
from local_api.log_module.log import logger_print
|
from local_api.tick_data_manager import TickQueueManager
|
from local_api.util import event_type, l1_data_api
|
import concurrent.futures
|
|
from local_api.util.juejin_util import JueJinApi
|
|
|
class APICallback:
    """Base class for the callbacks this module delivers to user code.

    Every hook here is a no-op that returns ``None``; subclass and override
    the ones you need.
    """

    def OnJueJinInited(self):
        """Called once JueJin initialization has completed.

        :return: None
        """
        return None

    def OnTick(self, tick, blocks: set, limit_up_list: list):
        """Tick callback.

        :param tick: the tick record
        :param blocks: blocks (sectors) the tick's code belongs to, e.g. {"光伏"}
        :param limit_up_list: all current limit-up entries
        :return: None
        """
        return None

    def OnTrade(self, trade_info):
        """Trade callback: called when an order is filled or placed.

        :param trade_info: the trade information
        :return: None
        """
        return None

    def OnLimitUpListUpdated(self, limit_up_list):
        """Called when the limit-up list has been updated.

        :param limit_up_list: the updated limit-up list
        :return: None
        """
        return None
|
|
|
class __TradeApi:
    """Sends trade commands over a queue and collects their results.

    Commands are put on ``command_queue`` as ``(name, params, request_id)``
    tuples. ``start_read_results`` consumes items from ``result_queue`` whose
    first three elements are ``(event type, request id, data)`` and either
    stores the data for a waiting blocking call or dispatches a callback.
    """

    # Seconds between polls of the result table while a blocking call waits.
    __POLL_INTERVAL = 0.005

    def __init__(self):
        # request_id -> result data, filled by start_read_results.
        self.__result_dict = {}
        # Ids of blocking requests that are still waiting. Results for any
        # other id (non-blocking or already timed-out requests) are dropped,
        # otherwise they would pile up in __result_dict forever.
        self.__pending_ids = set()
        # Keeps "is this id still pending?" and "store/drop its result" atomic
        # between the caller threads and the reader thread.
        self.__lock = threading.Lock()
        # Callbacks are run on this pool so they do not block the reader loop.
        self.__not_request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=20)

    def init(self, command_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue,
             api_callback: "APICallback"):
        """Attach the queues and the callback object used by this instance.

        :param command_queue: queue the commands are written to
        :param result_queue: queue the results/events are read from
        :param api_callback: callback object; may be falsy to disable callbacks
        :return: None
        """
        self.command_queue = command_queue
        self.result_queue = result_queue
        self.api_callback = api_callback

    @classmethod
    def __create_request_id(cls, kind):
        """Build a request id from the command kind, a microsecond timestamp and a random number."""
        return f"request_id-{kind}-{int(time.time() * 1000000)}-{random.randint(0, 100000)}"

    def __read_result(self, request_id, timeout=3.0):
        """Wait for the result of ``request_id`` and remove it from the table.

        :param request_id: id of the request to wait for
        :param timeout: maximum number of seconds to wait
        :return: the stored result data
        :raises Exception: if no result arrived within ``timeout`` seconds
        """
        # monotonic() is immune to wall-clock adjustments while we wait.
        deadline = time.monotonic() + timeout
        # Sentinel so that a legitimate ``None`` result is still recognised.
        missing = object()
        while True:
            result = self.__result_dict.pop(request_id, missing)
            if result is not missing:
                return result
            if time.monotonic() >= deadline:
                raise Exception("读取结果超时")
            time.sleep(self.__POLL_INTERVAL)

    def __send_command(self, name, params, blocking, timeout=3.0):
        """Queue one command and, for blocking calls, wait for its result.

        :param name: command name, also used as the kind part of the request id
        :param params: dict of command parameters
        :param blocking: True to wait for and return the result
        :param timeout: seconds to wait when blocking
        :return: the result data when blocking, otherwise None
        :raises Exception: if blocking and no result arrived in time
        """
        request_id = self.__create_request_id(name)
        if blocking:
            # Register before sending so the reader cannot receive the reply
            # first and discard it as unexpected.
            with self.__lock:
                self.__pending_ids.add(request_id)
        try:
            self.command_queue.put_nowait((name, params, request_id))
            if not blocking:
                return None
            return self.__read_result(request_id, timeout)
        finally:
            if blocking:
                # Unregister and drop a result that raced with a timeout.
                with self.__lock:
                    self.__pending_ids.discard(request_id)
                    self.__result_dict.pop(request_id, None)

    def buy(self, code, volume, price, blocking=True):
        """Place a buy order.

        :param code: security code
        :param volume: order volume
        :param price: order price
        :param blocking: True to wait for the result
        :return: the result data when blocking, otherwise None
        :raises Exception: if blocking and no result arrived in time
        """
        return self.__send_command("buy", {"code": code, "volume": volume, "price": price}, blocking)

    def sell(self, code, volume, price, blocking=True):
        """Place a sell order.

        :param code: security code
        :param volume: order volume
        :param price: order price
        :param blocking: True to wait for the result
        :return: the result data when blocking, otherwise None
        :raises Exception: if blocking and no result arrived in time
        """
        return self.__send_command("sell", {"code": code, "volume": volume, "price": price}, blocking)

    def positions(self, blocking=True):
        """Request the current positions.

        :param blocking: True to wait for the result (up to 10 seconds)
        :return: the result data when blocking, otherwise None
        :raises Exception: if blocking and no result arrived in time
        """
        return self.__send_command("position", {}, blocking, timeout=10)

    def start_read_results(self):
        """Read ``result_queue`` forever and route each item.

        Request results are stored for the blocking call waiting on them;
        the JueJin-initialized event is forwarded to the callback on the
        thread pool. Other event types are ignored. Never returns; intended
        to be run on its own thread.
        """
        while True:
            try:
                result = self.result_queue.get()
                kind, request_id, data = result[0], result[1], result[2]
                logger_print.debug("API读取到结果:{}", result)
                if kind == event_type.EVENT_TYPE_REQUEST:
                    with self.__lock:
                        # Only keep results somebody is still waiting for.
                        if request_id in self.__pending_ids:
                            self.__result_dict[request_id] = data
                elif kind == event_type.EVENT_TYPE_JUEJIN_INIT:
                    if self.api_callback:
                        # Run the callback on a worker thread.
                        self.__not_request_thread_pool.submit(self.api_callback.OnJueJinInited)
            except Exception as exc:
                # Keep the loop alive, but make the failure visible.
                logger_print.debug("Failed to process API result: {}", exc)
                time.sleep(0.005)
|
|
|
# Module-level __TradeApi instance. run() attaches its queues and callback via
# init() and starts start_read_results on a daemon thread.
trade_api = __TradeApi()
|
|
# Target codes: run() replaces the contents of this set in place, and
# update_blocks() passes it to CodeKPLBlockManager().request_block().
target_codes = set()
|
|
|
def update_blocks(force=False):
    """Request block data for the current target codes.

    :param force: whether to force the update; forwarded unchanged to
        ``CodeKPLBlockManager().request_block``
    :return: None
    """
    manager = CodeKPLBlockManager()
    manager.request_block(target_codes, force)
|
|
|
def get_blocks_count_info():
    """Return how many codes have block data.

    :return: whatever ``CodeKPLBlockManager().get_codes_count_info()`` returns;
        per the original documentation a pair (number of codes updated with
        blocks today, number of codes from the previous trading day)
    """
    manager = CodeKPLBlockManager()
    counts = manager.get_codes_count_info()
    return counts
|
|
|
# Latest tick data, keyed by code: tick_callback stores each accepted tick
# under tick[0] and compares field 7 of the stored tick to skip repeats.
latest_tick_dict = {}
|
|
|
def tick_callback(tick):
    """Record a tick and forward it to the registered API callback.

    A tick is skipped when the previously stored tick for the same code has
    the same value in field 7 (presumably a timestamp — TODO confirm).
    Otherwise it is stored in ``latest_tick_dict`` and passed to
    ``api_callback.OnTick`` together with the code's blocks and the current
    limit-up list.

    :param tick: indexable tick record; ``tick[0]`` is the code
    :return: None
    """
    code = tick[0]
    previous = latest_tick_dict.get(code)
    if previous is not None and previous[7] == tick[7]:
        return
    latest_tick_dict[code] = tick

    blocks = CodeKPLBlockManager().get_blocks(code)
    limit_up_info = KPLLimitUpDataManager().get_current_limitup_info()

    # The callback global only exists after run() has been called and may be
    # None; skip delivery in that case, matching how __TradeApi treats a
    # missing callback, instead of raising on every tick.
    callback = globals().get("api_callback")
    if not callback:
        return
    callback.OnTick(tick, blocks, limit_up_info[1] if limit_up_info else [])
|
|
|
# Tick queue constructed with tick_callback (presumably invoked for each queued
# tick — confirm in TickQueueManager); __run_tick_tasks feeds it via add_tick().
# NOTE(review): the meaning of the 60 argument is defined by TickQueueManager
# and is not visible here — confirm.
__TickQueueManager = TickQueueManager(60, tick_callback)
|
|
|
def __run_tick_tasks():
    """Start the tick queue worker and poll current quotes forever.

    Every 3 seconds the current data is fetched from
    ``l1_data_api.get_current_info()`` and each record ``d`` is queued with
    ``add_tick(d[0], d)``. Never returns; intended to run on its own thread.
    """
    # Pass the bound method itself. Calling run() here would execute it in
    # this thread and hand its return value to Thread as the target.
    threading.Thread(target=__TickQueueManager.run, daemon=True).start()
    while True:
        try:
            for d in l1_data_api.get_current_info():
                __TickQueueManager.add_tick(d[0], d)
        except Exception as exc:
            # Best effort: keep polling, but do not hide the failure.
            logger_print.debug("Tick polling failed: {}", exc)
        finally:
            time.sleep(3)
|
|
|
def run(jeujin_strategy_id, juejin_token, target_codes_, api_callback_: APICallback):
    """Start the API.

    Stores the callback and target codes in module globals, initializes
    ``JueJinApi``, starts the daemon threads (result reader, async log, tick
    polling) and launches the JueJin strategy in a separate process. Returns
    right after the process is started; the process is not joined here.

    NOTE(review): this starts a ``multiprocessing.Process``. Under the
    ``spawn`` start method the calling script must be import-safe, i.e. call
    this from under ``if __name__ == "__main__":``.

    :param jeujin_strategy_id: JueJin strategy id
    :param juejin_token: JueJin token
    :param target_codes_: iterable of target codes
    :param api_callback_: callback object receiving the API events
    :return: None
    """
    # Publish the callback and target codes as module globals.
    global api_callback
    api_callback = api_callback_
    # Snapshot before clearing: if the caller passes the module-level
    # target_codes set itself, clearing first would discard every code.
    new_codes = set(target_codes_)
    target_codes.clear()
    target_codes.update(new_codes)

    # Initialize the JueJin API parameters.
    JueJinApi.init(jeujin_strategy_id, juejin_token)

    command_queue, result_queue = multiprocessing.Queue(), multiprocessing.Queue()
    trade_api.init(command_queue, result_queue, api_callback)
    threading.Thread(target=trade_api.start_read_results, daemon=True).start()

    # Start the async log worker.
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()

    # Start tick polling.
    threading.Thread(target=__run_tick_tasks, daemon=True).start()

    # Run the JueJin strategy in a new process, connected through the queues.
    jueJinProcess = multiprocessing.Process(
        target=juejin.start_run,
        args=(jeujin_strategy_id, juejin_token, command_queue, result_queue,))
    jueJinProcess.start()
|