Administrator
2023-09-18 0ced5496fcfcb9de9927793023e8692fe792b477
trade/trade_manager.py
@@ -5,11 +5,10 @@
# 交易管理器
import datetime
import json
import random
import threading
import time
import dask
import concurrent.futures
from code_attribute import gpcode_manager
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
@@ -393,20 +392,17 @@
# 开始交易
def start_buy(code, capture_timestamp, last_data, last_data_index):
    # @dask.delayed
    def is_forbidden(code):
        if l2_trade_util.is_in_forbidden_trade_codes(code):
            return Exception("禁止交易")
        return None, None
    # @dask.delayed
    def is_state_right(code):
        trade_state = __CodesTradeStateManager.get_trade_state_cache(code)
        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
            return Exception("代码处于不可交易状态"), trade_state
        return None, trade_state
    # @dask.delayed
    def is_money_enough(code):
        money = AccountAvailableMoneyManager().get_available_money_cache()
        if money is None:
@@ -419,32 +415,25 @@
            return Exception("账户可用资金不足"), price
        return None, price
    # @dask.delayed
    def can_trade(*args):
        for arg in args:
            if arg[0] is not None:
                return arg[0], None, None
        return None, args[1][1], args[2][1]
    _start_time = tool.get_now_timestamp()
    ex = is_forbidden(code)[0]
    if ex:
        raise ex
    ex, trade_state = is_state_right(code)
    if ex:
        raise ex
    ex, price = is_money_enough(code)
    if ex:
        raise ex
    print("开始买入")
    async_log_util.info(logger_trade, "{}开始买入".format(code))
    __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    # 状态改变过后必须要有本地下单编号
    # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True)
    __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
    # l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "异步买入时间", force=True)
    async_log_util.info(logger_trade, "{} trade.manager.start_buy 开始".format(code))
    try:
        try:
            ex = is_forbidden(code)[0]
            if ex:
                raise ex
            ex, trade_state = is_state_right(code)
            if ex:
                raise ex
            ex, price = is_money_enough(code)
            if ex:
                raise ex
        finally:
            async_log_util.info(logger_trade, "{} trade.manager.start_buy 判断是否可买".format(code))
        __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
        # 状态改变过后必须要有本地下单编号
        __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
    finally:
        async_log_util.info(logger_trade, "{} trade.manager.start_buy 结束".format(code))
# 中断买入
@@ -455,7 +444,9 @@
# 购买
# @tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    async_log_util.info(logger_trade, "{} trade_manager.__buy 开始".format(code))
    try:
        if constant.API_TRADE_ENABLE:
            count = (constant.BUY_MONEY_PER_CODE // int(round(float(price) * 100))) * 100
            # 最低下单1手
@@ -475,6 +466,8 @@
        logger_trade.error("{}买入异常{}".format(code, str(e)))
        logger_trade.exception(e)
        raise e
    finally:
        async_log_util.info(logger_trade, "{} trade_manager.__buy 结束".format(code))
# 下单成功
@@ -501,20 +494,20 @@
# Module-level trade-state manager instance; functions in this module read and
# write the per-code trade state through it.
__CodesTradeStateManager = CodesTradeStateManager()
# Thread pool (at most 10 workers) to which start_cancel_buy submits its
# follow-up cancel request.
__cancel_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# 开始取消买入
def start_cancel_buy(code, force=False):
    async_log_util.info(logger_trade, "{}进入撤单方法".format(code))
    async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 开始".format(code))
    trade_state = __CodesTradeStateManager.get_trade_state_cache(code)
    if trade_state == TRADE_STATE_BUY_SUCCESS:
        return None
    if not force:
        if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED:
            return None
    try:
        async_log_util.info(logger_trade, "{}开始撤单".format(code))
        if trade_state == TRADE_STATE_BUY_SUCCESS:
            return None
        if not force:
            if trade_state != TRADE_STATE_BUY_PLACE_ORDER and trade_state != TRADE_STATE_BUY_DELEGATED:
                return None
        __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        async_log_util.info(logger_trade, "{}撤单方法开始".format(code))
        if constant.API_TRADE_ENABLE:
            if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN:
                trade_juejin.cancel_order(code)
@@ -522,11 +515,10 @@
                trade_huaxin.cancel_order(code)
        else:
            guiTrade.cancel_buy(code)
        async_log_util.info(logger_trade, "{}撤单方法结束".format(code))
        __cancel_success(code)
        # 再次撤单
        if constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN:
            threading.Thread(target=lambda: trade_huaxin.cancel_order(code, msg="再次撤单"), daemon=True).start()
            __cancel_order_thread_pool.submit(lambda: trade_huaxin.cancel_order(code, msg="再次撤单"))
        # 不需要再次撤单了
        # try:
        #     cancel_buy_again(code)
@@ -535,9 +527,10 @@
    except Exception as e:
        # 状态还原
        CodesTradeStateManager().set_trade_state(code, trade_state)
        logger_trade.error("{}撤单异常:{}".format(code, str(e)))
        async_log_util.error(logger_trade, "{}撤单异常:{}".format(code, str(e)))
        raise e
    async_log_util.info(logger_trade, "{}撤单完毕".format(code))
    finally:
        async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 结束".format(code))
# 再次撤单,防止没有撤掉
@@ -664,9 +657,6 @@
            RedisUtils.delete(redis_l2, k, auto_free=False)
    finally:
        RedisUtils.realse(redis_l2)
if __name__ == "__main__":