import logging
import multiprocessing
import threading

from gm.api import *

from local_api.log_module import async_log_util
from local_api.log_module.log import logger_system, logger_trade, logger_print
from local_api.util import juejin_util, event_type


class JueJinCallback:
    pass


# JueJin trade manager
class JueJinTradeManager:
    """Thin wrapper around the gm trading API bound to a strategy context."""

    def __init__(self):
        # Filled in by set_context() once the strategy's init() callback runs.
        self.context = None

    def set_context(self, context):
        """Store the strategy context used to resolve the trading account."""
        self.context = context

    def buy(self, code, volume, price):
        """
        Place a limit buy order that opens a position.

        :param code: security code without the exchange prefix; the prefix is
            added via juejin_util.get_juejin_code_list_with_prefix
        :param volume: order volume
        :param price: limit price
        :return: whatever order_volume returns (a list of order objects
            according to the original note)
        """
        async_log_util.info(logger_trade, f"买入下单:{code}-{volume}-{price}")
        symbol = juejin_util.get_juejin_code_list_with_prefix([code])[0]
        return order_volume(symbol, volume,
                            side=OrderSide_Buy,
                            price=price,
                            order_type=OrderType_Limit,
                            position_effect=PositionEffect_Open,
                            account=self.context.account().id)

    def sell(self, code, volume, price):
        """
        Place a limit sell order that closes a position.

        :param code: security code without the exchange prefix
        :param volume: order volume
        :param price: limit price
        :return: whatever order_volume returns
        """
        async_log_util.info(logger_trade, f"卖出下单:{code}-{volume}-{price}")
        symbol = juejin_util.get_juejin_code_list_with_prefix([code])[0]
        return order_volume(symbol, volume,
                            side=OrderSide_Sell,
                            order_type=OrderType_Limit,
                            price=price,
                            position_effect=PositionEffect_Close,
                            account=self.context.account().id)

    def queryPosition(self):
        """
        Query the positions of the context's account.

        :return: the result of context.account().positions(), or an empty
            list if the query fails (the failure is logged)
        """
        try:
            return self.context.account().positions()
        except Exception as e:
            # Best-effort query: keep returning [] on failure, but leave a
            # trace and let KeyboardInterrupt/SystemExit propagate.
            logger_print.exception(e)
            return []

    def queryUnfinishOrders(self):
        """
        Query the orders that are not finished yet.

        :return: the result of get_unfinished_orders(), or an empty list if
            the query fails (the failure is logged)
        """
        try:
            return get_unfinished_orders()
        except Exception as e:
            # Same best-effort contract as queryPosition.
            logger_print.exception(e)
            return []

    def cancelOrders(self, local_id, account_id):
        """
        Cancel an order.

        :param local_id: local (client-side) order ID, sent as "cl_ord_id"
        :param account_id: account ID
        :return: whatever order_cancel returns
        """
        async_log_util.info(logger_trade, f"撤单:{local_id}-{account_id}")
        return order_cancel([{"cl_ord_id": local_id, "account_id": account_id}])


__JueJinTradeManager = JueJinTradeManager()


# The strategy must define an init function
def init(context):
    """Bind the context to the trade manager and publish the init event."""
    logger_system.info("掘金初始化成功")
    __JueJinTradeManager.set_context(context)
    # result_queue is the module global assigned in start_run().
    result_queue.put_nowait((event_type.EVENT_TYPE_JUEJIN_INIT, "", None))


def on_order_status(context, order):
    """Log an order status update to the trade and print loggers."""
    logger_trade.info(f"on_order_status:{order}")
    logger_print.info(f"on_order_status:{order}")


def on_execution_report(context, execrpt):
    """Log an execution report to the trade and print loggers."""
    logger_trade.info(f"on_execution_report:{execrpt}")
    logger_print.info(f"on_execution_report:{execrpt}")


def on_error(context, execrpt):
    """Log an error callback payload to the print logger."""
    logger_print.info(f"on_error:{execrpt}")


def __read_command(command_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue):
    """
    Consume commands from command_queue forever and dispatch them.

    Each command is indexed as (type, data, request_id). Responses are sent
    through __send_request_response, which uses the module-level result queue;
    the result_queue parameter is kept for interface compatibility only.
    Any exception raised while handling a command is logged and the loop
    continues with the next command.
    """
    while True:
        try:
            command = command_queue.get()
            # Format eagerly so the message does not depend on the logger's
            # placeholder style.
            logger_print.info(f"读取到命令:{command}")
            # Command layout: (type, data, request_id)
            command_type = command[0]
            data = command[1]
            request_id = command[2]
            if command_type == "buy":
                results = __JueJinTradeManager.buy(data['code'], data['volume'], data['price'])
                __send_request_response(request_id, results)
            elif command_type == 'sell':
                results = __JueJinTradeManager.sell(data['code'], data['volume'], data['price'])
                __send_request_response(request_id, results)
            elif command_type == 'position':
                results = __JueJinTradeManager.queryPosition()
                __send_request_response(request_id, results)
            elif command_type == 'get_unfinish_orders':
                results = __JueJinTradeManager.queryUnfinishOrders()
                __send_request_response(request_id, results)
            elif command_type == 'cancel_order':
                __JueJinTradeManager.cancelOrders(data["local_order_id"], data["account_id"])
                __send_request_response(request_id, {})
        except Exception as e:
            logger_print.exception(e)


def __send_request_response(request_id, results):
    """Put an (EVENT_TYPE_REQUEST, request_id, results) tuple on the global result queue."""
    result_queue.put_nowait((event_type.EVENT_TYPE_REQUEST, request_id, results))


def start_run(strategy_id, token, command_queue: multiprocessing.Queue, result_queue_: multiprocessing.Queue):
    """
    Start the logging and command-reader threads, then run the gm strategy.

    :param strategy_id: strategy ID passed to gm's run()
    :param token: token passed to gm's run()
    :param command_queue: queue the command-reader thread consumes
    :param result_queue_: queue that receives events and responses; stored in
        the module global result_queue
    """
    global result_queue
    # Start asynchronous logging
    threading.Thread(target=async_log_util.run_sync, daemon=True).start()
    # Read commands in a background thread
    result_queue = result_queue_
    threading.Thread(target=__read_command, args=(command_queue, result_queue), daemon=True).start()
    run(strategy_id=strategy_id,
        filename='local_api.juejin.py',
        mode=MODE_LIVE,
        token=token,
        backtest_start_time='2020-11-01 08:00:00',
        backtest_end_time='2020-11-10 16:00:00',
        backtest_adjust=ADJUST_PREV,
        backtest_initial_cash=10000000,
        backtest_commission_ratio=0.0001,
        backtest_slippage_ratio=0.0001)