import logging
|
import multiprocessing
|
import threading
|
|
from gm.api import *
|
|
from local_api.log_module import async_log_util
|
from local_api.log_module.log import logger_system, logger_trade, logger_print
|
from local_api.util import juejin_util, event_type
|
|
|
class JueJinCallback:
    """Empty placeholder class; it declares no attributes or methods."""
|
|
|
# 掘金交易管理
|
class JueJinTradeManager:
    """Thin wrapper around the gm (JueJin) trading calls used by this module.

    A strategy context must be supplied through ``set_context`` before any
    method that reads ``self.context`` is called.
    """

    def set_context(self, context):
        """Store the strategy context used for account lookups."""
        self.context = context

    def buy(self, code, volume, price):
        """Log and submit a limit buy order that opens a position.

        :param code: security code; converted with
            ``juejin_util.get_juejin_code_list_with_prefix`` before ordering
        :param volume: order volume
        :param price: limit price
        :return: the value returned by ``order_volume`` (the original note
            describes it as a list of order objects)
        """
        async_log_util.info(logger_trade, f"买入下单:{code}-{volume}-{price}")
        symbol = juejin_util.get_juejin_code_list_with_prefix([code])[0]
        return order_volume(symbol, volume,
                            side=OrderSide_Buy,
                            price=price,
                            order_type=OrderType_Limit,
                            position_effect=PositionEffect_Open,
                            account=self.context.account().id)

    def sell(self, code, volume, price):
        """Log and submit a limit sell order that closes a position.

        :param code: security code; converted with
            ``juejin_util.get_juejin_code_list_with_prefix`` before ordering
        :param volume: order volume
        :param price: limit price
        :return: the value returned by ``order_volume``
        """
        async_log_util.info(logger_trade, f"卖出下单:{code}-{volume}-{price}")
        symbol = juejin_util.get_juejin_code_list_with_prefix([code])[0]
        return order_volume(symbol, volume,
                            side=OrderSide_Sell,
                            order_type=OrderType_Limit,
                            price=price,
                            position_effect=PositionEffect_Close,
                            account=self.context.account().id)

    def queryPosition(self):
        """Return the account's positions, or ``[]`` when the lookup fails.

        Ordinary errors are logged and swallowed so the query stays
        best-effort. ``KeyboardInterrupt`` and ``SystemExit`` are not caught.
        """
        try:
            return self.context.account().positions()
        except Exception as e:
            logger_print.exception(e)
            return []

    def queryPositionBySymbol(self, symbol):
        """Return the long-side position for ``symbol``.

        :param symbol: symbol passed straight to ``account().position``
        :return: the value returned by ``position(symbol, PositionSide_Long)``,
            or ``[]`` when the lookup raises an ordinary error (the error is
            logged)
        """
        try:
            return self.context.account().position(symbol, PositionSide_Long)
        except Exception as e:
            logger_print.exception(e)
            return []

    def queryUnfinishOrders(self):
        """Return the unfinished orders, or ``[]`` when the query fails.

        :return: the value returned by ``get_unfinished_orders()``; ordinary
            errors are logged and replaced by an empty list
        """
        try:
            return get_unfinished_orders()
        except Exception as e:
            logger_print.exception(e)
            return []

    def cancelOrders(self, local_id, account_id):
        """Log and request cancellation of a single order.

        :param local_id: local order id, sent as ``cl_ord_id``
        :param account_id: account id, sent as ``account_id``
        :return: the value returned by ``order_cancel``
        """
        async_log_util.info(logger_trade, f"撤单:{local_id}-{account_id}")
        return order_cancel([{"cl_ord_id": local_id, "account_id": account_id}])

    def getExecutionReports(self):
        """Return the value of ``get_execution_reports()`` unchanged."""
        return get_execution_reports()
|
|
|
# Module-level trade manager instance; init() gives it the strategy context
# and the command reader calls its methods.
__JueJinTradeManager = JueJinTradeManager()
|
|
|
# 策略中必须有init方法
|
def init(context):
    """Strategy ``init`` hook (the file notes that a strategy must define it).

    Logs that initialisation succeeded, hands ``context`` to the module-level
    trade manager and posts an ``EVENT_TYPE_JUEJIN_INIT`` event on the
    module-level ``result_queue``.
    """
    logger_system.info("掘金初始化成功")
    __JueJinTradeManager.set_context(context)
    init_event = (event_type.EVENT_TYPE_JUEJIN_INIT, "", None)
    result_queue.put_nowait(init_event)
|
|
|
def on_order_status(context, order):
    """Write the received order to both the trade log and the print log.

    :param context: strategy context (not used here)
    :param order: order object, interpolated into the log message
    """
    message = f"on_order_status:{order}"
    for sink in (logger_trade, logger_print):
        sink.info(message)
|
|
|
def on_execution_report(context, execrpt):
    """Write the received execution report to the trade and print logs.

    :param context: strategy context (not used here)
    :param execrpt: execution report, interpolated into the log message
    """
    message = f"on_execution_report:{execrpt}"
    for sink in (logger_trade, logger_print):
        sink.info(message)
|
|
|
def on_error(context, execrpt, info=None):
    """Log an error reported through the ``on_error`` callback.

    NOTE(review): the gm documentation gives the error callback prototype as
    ``on_error(context, code, info)``. The original two-parameter definition
    would raise ``TypeError`` when called with three arguments, so ``info``
    is accepted as an optional third argument. Confirm against the installed
    gm version.

    :param context: strategy context (not used here)
    :param execrpt: second callback argument (the error code under the
        documented prototype); the name is kept for backward compatibility
    :param info: optional error description; appended to the log message
        only when provided
    """
    if info is None:
        logger_print.info(f"on_error:{execrpt}")
    else:
        logger_print.info(f"on_error:{execrpt}-{info}")
|
|
|
def __read_command(command_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue):
    """Consume commands from ``command_queue`` forever and answer each request.

    Every command is a ``(type, data, request_id)`` tuple. Supported types are
    ``buy``, ``sell``, ``position``, ``get_position_by_symbol``,
    ``get_unfinish_orders``, ``cancel_order`` and ``get_execution_reports``.
    Replies are sent through ``__send_request_response``, which writes to the
    module-level ``result_queue``; the ``result_queue`` parameter is not read
    in this function.

    Any exception raised while handling a command is logged and the loop
    continues; no reply is sent for that command.

    :param command_queue: queue the commands are read from (blocking ``get``)
    :param result_queue: kept for interface compatibility
    """
    while True:
        try:
            command = command_queue.get()
            logger_print.info("读取到命令:{}", command)
            # Command layout: (type, data, request_id).
            command_type, data, request_id = command[:3]
            if command_type == "buy":
                results = __JueJinTradeManager.buy(data['code'], data['volume'], data['price'])
            elif command_type == 'sell':
                results = __JueJinTradeManager.sell(data['code'], data['volume'], data['price'])
            elif command_type == 'position':
                results = __JueJinTradeManager.queryPosition()
            elif command_type == 'get_position_by_symbol':
                results = __JueJinTradeManager.queryPositionBySymbol(data["symbol"])
            elif command_type == 'get_unfinish_orders':
                results = __JueJinTradeManager.queryUnfinishOrders()
            elif command_type == 'cancel_order':
                __JueJinTradeManager.cancelOrders(data["local_order_id"], data["account_id"])
                # The cancel result is not forwarded; the requester gets an empty dict.
                results = {}
            elif command_type == 'get_execution_reports':
                # Previously the reports were fetched and thrown away while the
                # requester received an empty dict; return them instead.
                results = __JueJinTradeManager.getExecutionReports()
            else:
                # Unknown commands still get no reply, but are no longer dropped silently.
                logger_print.info(f"未知命令类型:{command_type}")
                continue
            __send_request_response(request_id, results)
        except Exception as e:
            logger_print.exception(e)
|
|
|
def __send_request_response(request_id, results):
    """Post the reply for ``request_id`` on the module-level ``result_queue``.

    The reply is the tuple ``(event_type.EVENT_TYPE_REQUEST, request_id,
    results)`` and is enqueued with ``put_nowait``.
    """
    response = (event_type.EVENT_TYPE_REQUEST, request_id, results)
    result_queue.put_nowait(response)
|
|
|
def start_run(strategy_id, token, command_queue: multiprocessing.Queue, result_queue_: multiprocessing.Queue):
    """Start the helper threads and then run the gm strategy.

    Steps, in order: start the async-log worker thread, publish
    ``result_queue_`` as the module-level ``result_queue``, start the command
    reader thread, and finally call gm's ``run``.

    :param strategy_id: strategy id forwarded to ``run``
    :param token: token forwarded to ``run``
    :param command_queue: queue the command reader thread consumes
    :param result_queue_: queue that becomes the module-level ``result_queue``
    """
    global result_queue

    # Worker that runs the async log helper.
    log_worker = threading.Thread(target=async_log_util.run_sync, daemon=True)
    log_worker.start()

    # The reader thread replies through the module-level queue, so it has to
    # be assigned before that thread starts.
    result_queue = result_queue_
    command_reader = threading.Thread(
        target=__read_command,
        args=(command_queue, result_queue),
        daemon=True,
    )
    command_reader.start()

    # NOTE(review): backtest_* values are passed although mode is MODE_LIVE;
    # presumably they are ignored in live mode — confirm against the gm docs.
    run_options = dict(
        strategy_id=strategy_id,
        filename='local_api.juejin.py',
        mode=MODE_LIVE,
        token=token,
        backtest_start_time='2020-11-01 08:00:00',
        backtest_end_time='2020-11-10 16:00:00',
        backtest_adjust=ADJUST_PREV,
        backtest_initial_cash=10000000,
        backtest_commission_ratio=0.0001,
        backtest_slippage_ratio=0.0001,
    )
    run(**run_options)
|