| | |
| | | # 交易管理器 |
| | | import datetime |
| | | import json |
| | | import random |
| | | import threading |
| | | import time |
| | | |
| | | import dask |
| | | import concurrent.futures |
| | | |
| | | from code_attribute import gpcode_manager |
| | | from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager |
| | |
| | | |
| | | # 开始交易 |
| | | def start_buy(code, capture_timestamp, last_data, last_data_index): |
| | | # @dask.delayed |
| | | def is_forbidden(code): |
| | | if l2_trade_util.is_in_forbidden_trade_codes(code): |
| | | return Exception("禁止交易") |
| | | return None, None |
| | | |
| | | # @dask.delayed |
| | | def is_state_right(code): |
| | | trade_state = __CodesTradeStateManager.get_trade_state_cache(code) |
| | | if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING: |
| | | return Exception("代码处于不可交易状态"), trade_state |
| | | return None, trade_state |
| | | |
| | | # @dask.delayed |
| | | def is_money_enough(code): |
| | | money = AccountAvailableMoneyManager().get_available_money_cache() |
| | | if money is None: |
| | |
| | | return Exception("账户可用资金不足"), price |
| | | return None, price |
| | | |
| | | # @dask.delayed |
| | | def can_trade(*args): |
| | | for arg in args: |
| | | if arg[0] is not None: |
| | | return arg[0], None, None |
| | | return None, args[1][1], args[2][1] |
| | | |
| | | _start_time = tool.get_now_timestamp() |
| | | |
| | | async_log_util.info(logger_trade, "{} trade.manager.start_buy 开始".format(code)) |
| | | try: |
| | | try: |
| | | ex = is_forbidden(code)[0] |
| | | if ex: |
| | | raise ex |
| | |
| | | ex, price = is_money_enough(code) |
| | | if ex: |
| | | raise ex |
| | | |
| | | print("开始买入") |
| | | async_log_util.info(logger_trade, "{}开始买入".format(code)) |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade.manager.start_buy 判断是否可买".format(code)) |
| | | __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER) |
| | | # 状态改变过后必须要有本地下单编号 |
| | | # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True) |
| | | __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index) |
| | | # l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "异步买入时间", force=True) |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade.manager.start_buy 结束".format(code)) |
| | | |
| | | |
| | | # 中断买入 |
| | |
| | | # 购买 |
| | | # @tool.async_call |
| | | def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index): |
| | | async_log_util.info(logger_trade, "{} trade_manager.__buy 开始".format(code)) |
| | | try: |
| | | |
| | | if constant.API_TRADE_ENABLE: |
| | | count = (constant.BUY_MONEY_PER_CODE // int(round(float(price) * 100))) * 100 |
| | | # 最低下单1手 |
| | |
| | | logger_trade.error("{}买入异常{}".format(code, str(e))) |
| | | logger_trade.exception(e) |
| | | raise e |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade_manager.__buy 结束".format(code)) |
| | | |
| | | |
| | | # 下单成功 |
| | |
| | | |
| | | __CodesTradeStateManager = CodesTradeStateManager() |
| | | |
| | | __cancel_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
| | | |
| | | |
| | | # 开始取消买入 |
| | | def start_cancel_buy(code, force=False): |
| | | async_log_util.info(logger_trade, "{}进入撤单方法".format(code)) |
| | | async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 开始".format(code)) |
| | | trade_state = __CodesTradeStateManager.get_trade_state_cache(code) |
| | | try: |
| | | if trade_state == TRADE_STATE_BUY_SUCCESS: |
| | | return None |
| | | if not force: |
| | | if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED: |
| | | return None |
| | | try: |
| | | async_log_util.info(logger_trade, "{}开始撤单".format(code)) |
| | | __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING) |
| | | async_log_util.info(logger_trade, "{}撤单方法开始".format(code)) |
| | | if constant.API_TRADE_ENABLE: |
| | | if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN: |
| | | trade_juejin.cancel_order(code) |
| | |
| | | trade_huaxin.cancel_order(code) |
| | | else: |
| | | guiTrade.cancel_buy(code) |
| | | async_log_util.info(logger_trade, "{}撤单方法结束".format(code)) |
| | | __cancel_success(code) |
| | | # 再次撤单 |
| | | if constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: |
| | | threading.Thread(target=lambda: trade_huaxin.cancel_order(code, msg="再次撤单"), daemon=True).start() |
| | | __cancel_order_thread_pool.submit(lambda: trade_huaxin.cancel_order(code, msg="再次撤单")) |
| | | # 不需要再次撤单了 |
| | | # try: |
| | | # cancel_buy_again(code) |
| | |
| | | except Exception as e: |
| | | # 状态还原 |
| | | CodesTradeStateManager().set_trade_state(code, trade_state) |
| | | logger_trade.error("{}撤单异常:{}".format(code, str(e))) |
| | | async_log_util.error(logger_trade, "{}撤单异常:{}".format(code, str(e))) |
| | | raise e |
| | | async_log_util.info(logger_trade, "{}撤单完毕".format(code)) |
| | | finally: |
| | | async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 结束".format(code)) |
| | | |
| | | |
| | | # 再次撤单,防止没有撤掉 |
| | |
| | | RedisUtils.delete(redis_l2, k, auto_free=False) |
| | | finally: |
| | | RedisUtils.realse(redis_l2) |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | if __name__ == "__main__": |