""" 交易管理器, 对一系列的代码交易变量,下单,撤单进行管理 """ # 交易管理器 import copy import datetime import json import threading import time import concurrent.futures from code_attribute import gpcode_manager from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager from db.mysql_data_delegate import Mysqldb from db.redis_manager_delegate import RedisUtils from l2.l2_data_manager import OrderBeginPosInfo, TradePointManager from log_module import async_log_util from output import kp_client_msg_manager from trade import trade_data_manager, l2_trade_util, trade_juejin, trade_huaxin, trade_constant import time as t from l2 import l2_data_manager, l2_data_log from log_module.log import * from trade.buy_money_count_setting import BuyMoneyUtil from trade.huaxin.huaxin_trade_record_manager import TradeOrderIdManager from trade.order_statistic import DealAndDelegateWithBuyModeDataManager from trade.trade_data_manager import AccountMoneyManager, RadicalBuyDealCodesManager from utils import import_util, tool, huaxin_util trade_gui = import_util.import_lib("trade.trade_gui") __db = 2 __redis_manager = redis_manager.RedisManager(2) guiTrade = None # trade_gui.THSGuiTrade() if trade_gui is not None else None latest_trade_delegate_data = [] # 关闭购买入口 # 开启购买入口 class TradeStateManager: __instance = None __db = 2 redisManager = redis_manager.RedisManager(2) def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(TradeStateManager, cls).__new__(cls, *args, **kwargs) cls.__instance.__trade_buy_state_cache = cls.is_can_buy() return cls.__instance @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def sync(self): self.__trade_buy_state_cache = self.is_can_buy() # 开启购买入口 def open_buy(self): self.__trade_buy_state_cache = True RedisUtils.setex_async(self.__db, "trade_buy_state", tool.get_expire(), 1) # 关闭购买入口 def close_buy(self): self.__trade_buy_state_cache = False RedisUtils.setex_async(self.__db, "trade_buy_state", tool.get_expire(), 0) # 是否可以下单 @classmethod def is_can_buy(cls): # 默认设置为可交易 val = RedisUtils.get(cls.__get_redis(), "trade_buy_state") if val is None: return True if int(val) == 1: return True else: return False # 是否可以下单 def is_can_buy_cache(self): # 默认设置为可交易 return self.__trade_buy_state_cache # 交易目标票模式 class TradeTargetCodeModeManager: # 只买辨识度 MODE_ONLY_BUY_SPECIAL_CODES = 2 # 只买想买单 MODE_ONLY_BUY_WANT_CODES = 1 # 买所有 MODE_BUY_ALL = 0 __instance = None redisManager = redis_manager.RedisManager(2) def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(TradeTargetCodeModeManager, cls).__new__(cls, *args, **kwargs) # 初始化设置 # 获取交易窗口的锁 cls.__instance.__trade_buy_mode_cache = cls.get_mode() return cls.__instance @classmethod def __get_redis(cls): return cls.redisManager.getRedis() # 开启购买入口 def sync(self): self.__trade_buy_mode_cache = self.get_mode() def set_mode(self, mode): if mode != self.MODE_ONLY_BUY_WANT_CODES and mode != self.MODE_BUY_ALL and mode != self.MODE_ONLY_BUY_SPECIAL_CODES: raise Exception("mode参数值错误") self.__trade_buy_mode_cache = mode RedisUtils.setex(self.__get_redis(), "trade_buy_mode", tool.get_expire(), mode) # 是否可以下单 @classmethod def get_mode(cls): # 默认设置为可交易 val = RedisUtils.get(cls.__get_redis(), "trade_buy_mode") if val is None: return cls.MODE_BUY_ALL return int(val) def get_mode_cache(self): return self.__trade_buy_mode_cache # 自动撤卖模式管理 class AutoCancelSellModeManager: # 撤所有 MODE_CANCEL_ALL = 0 # 撤机器下单 MODE_CANCEL_MECHINE = 1 __instance = None __auto_cancel_sell_mode = MODE_CANCEL_ALL redisManager = redis_manager.RedisManager(2) def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(AutoCancelSellModeManager, cls).__new__(cls, *args, **kwargs) # 初始化设置 # 获取交易窗口的锁 cls.__auto_cancel_sell_mode = cls.get_mode() return cls.__instance @classmethod def __get_redis(cls): return cls.redisManager.getRedis() def set_mode(self, mode): if mode != self.MODE_CANCEL_ALL and mode != self.MODE_CANCEL_MECHINE: raise Exception("mode参数值错误") self.__auto_cancel_sell_mode = mode RedisUtils.setex(self.__get_redis(), "auto_cancel_sell_mode", tool.get_expire(), mode) # 是否可以下单 @classmethod def get_mode(cls): # 默认设置为可交易 val = RedisUtils.get(cls.__get_redis(), "auto_cancel_sell_mode") if val is None: return cls.MODE_CANCEL_ALL return int(val) def get_mode_cache(self): return self.__auto_cancel_sell_mode # 代码的交易状态管理 class CodesTradeStateManager: __trade_state_cache = {} __db = 2 __redis_manager = redis_manager.RedisManager(2) __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(CodesTradeStateManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: # 初始化数据 keys = RedisUtils.keys(__redis, "trade-state-*", auto_free=False) if keys: for key in keys: code = key.replace("trade-state-", '') cls.__trade_state_cache[code] = int(RedisUtils.get(__redis, key, auto_free=False)) finally: RedisUtils.realse(__redis) # 获取交易状态 def get_trade_state(self, code): state = RedisUtils.get(self.__get_redis(), "trade-state-{}".format(code)) if state is None: return trade_constant.TRADE_STATE_NOT_TRADE return int(state) def get_trade_state_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__trade_state_cache, code) if cache_result[0]: return cache_result[1] return trade_constant.TRADE_STATE_NOT_TRADE def get_trade_state_dict(self): return copy.deepcopy(self.__trade_state_cache) # 设置交易状态 def set_trade_state(self, code, state): async_log_util.info(logger_trade, "set_trade_state {}-{}".format(code, state)) tool.CodeDataCacheUtil.set_cache(self.__trade_state_cache, code, state) RedisUtils.setex_async(self.__db, "trade-state-{}".format(code), tool.get_expire(), state) def get_codes_by_trade_state(self, state): redis = self.__get_redis() try: keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False) codes = [] if keys is not None: for key in keys: if int(RedisUtils.get(redis, key, auto_free=False)) == state: codes.append(key.replace("trade-state-", '')) return codes finally: RedisUtils.realse(redis) def get_codes_by_trade_states(self, states): redis = self.__get_redis() try: keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False) codes = [] if keys is not None: for key in keys: if int(RedisUtils.get(redis, key, auto_free=False)) in states: codes.append(key.replace("trade-state-", '')) return codes finally: RedisUtils.realse(redis) def get_codes_by_trade_states_cache(self, states): # 获取 codes = [] for code in self.__trade_state_cache: if self.__trade_state_cache[code] in states: codes.append(code) return codes # 设置交易账户的可用金额 # 保存交易成功的数据 def save_trade_success_data(datas, day=datetime.datetime.now().strftime("%Y%m%d")): time_str = tool.get_now_time_str() RedisUtils.setex(__redis_manager.getRedis(), "trade-success-latest-time", tool.get_expire(), time_str) mysqldb = mysql_data.Mysqldb() # 合并同一合同编号 dict_ = {} for data in datas: trade_num = data["trade_num"] if trade_num not in dict_: dict_[trade_num] = data else: # 合并成交数量与成交金额 dict_[trade_num]["num"] = int(dict_[trade_num]["num"]) + int(data["num"]) dict_[trade_num]["money"] = round(float(dict_[trade_num]["money"]) + float(data["money"]), 3) for key in dict_: data = dict_[key] _time = data["time"] # 过滤错误数据 if _time == "00:00:00": continue data["_id"] = "{}_{}".format(day, data["trade_num"]) data["day"] = day data["create_time"] = int(round(t.time() * 1000)) counts = mysqldb.select_one("select count(*) from ths_trade_success_record where _id='{}'".format(data["_id"])) if counts[0] < 1: mysqldb.execute( "insert into ths_trade_success_record(_id,code,money,num,price,time,trade_num,type,day,create_time) values('{}','{}','{}','{}','{}','{}','{}',{},'{}',{})".format( data["_id"], data["code"], data["money"], data["num"], data["price"], data["time"], data["trade_num"], data["type"], data["day"], round(t.time() * 1000))) else: mysqldb.execute( "update ths_trade_success_record set money=%s, num=%s, price=%s,time=%s,trade_num=%s,type=%s where _id=%s", ( data["money"], data["num"], data["price"], data["time"], data["trade_num"], data["type"], data["_id"])) # 保存交易委托数据 def save_trade_delegate_data(datas): day = datetime.datetime.now().strftime("%Y%m%d") time_str = tool.get_now_time_str() mysqldb = mysql_data.Mysqldb() for data in datas: data["_id"] = "{}-{}-{}".format(day, data["code"], data["apply_time"][:6]) data["day"] = day data["create_time"] = int(round(t.time() * 1000)) counts = mysqldb.select_one("select count(*) from ths_trade_delegate_record where _id='{}'".format(data["_id"])) if counts[0] < 1: mysqldb.execute( "insert into ths_trade_delegate_record(_id,code,num,price,time,trade_num,trade_price,type,day,create_time) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)", ( data["_id"], data["code"], data["num"], data["price"], data["time"], data["trade_num"], data["trade_price"], data["type"], data["day"], round(t.time() * 1000))) # 保存最新的委托数据 RedisUtils.setex(__redis_manager.getRedis(), "trade-delegate-latest", tool.get_expire(), json.dumps(datas)) RedisUtils.setex(__redis_manager.getRedis(), "trade-delegate-latest-time", tool.get_expire(), time_str) # 获取交易成功数据 def get_trade_success_data(): day = datetime.datetime.now().strftime("%Y%m%d") mysqldb = mysql_data.Mysqldb() results = mysqldb.select_all("select * from ths_trade_success_record where day='{}'".format(day)) datas = [] for result in results: data = {"_id": result[0], "code": result[1], "money": result[2], "num": result[3], "price": result[4], "time": result[5], "trade_num": result[6], "type": result[7], "day": result[8], "create_time": result[9]} datas.append(data) return datas, RedisUtils.get(__redis_manager.getRedis(), "trade-success-latest-time") # 获取交易委托数据 def get_trade_delegate_data(): redis = __redis_manager.getRedis() try: result = RedisUtils.get(redis, "trade-delegate-latest", auto_free=False) time_str = RedisUtils.get(redis, "trade-delegate-latest-time", auto_free=False) if result is None: return [], time_str else: return json.loads(result), time_str finally: RedisUtils.realse(redis) __CodesTradeStateManager = CodesTradeStateManager() # 开始交易 def start_buy(code, capture_timestamp, last_data, last_data_index, mode=0, exec_index=None): def is_forbidden(code): if l2_trade_util.is_in_forbidden_trade_codes(code): return Exception("禁止交易") return None, None def is_state_right(code): trade_state = __CodesTradeStateManager.get_trade_state_cache(code) if trade_state != trade_constant.TRADE_STATE_NOT_TRADE and trade_state != trade_constant.TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != trade_constant.TRADE_STATE_BUY_CANCEL_ING: return Exception("代码处于不可交易状态"), trade_state return None, trade_state def is_money_enough(code): money = AccountMoneyManager().get_available_money_cache() if money is None: return Exception("未获取到账户可用资金"), None price = gpcode_manager.get_limit_up_price(code) if price is None: return Exception("尚未获取到涨停价"), None # 买一手的资金是否足够 if price * 100 > money: return Exception("账户可用资金不足"), price return None, price async_log_util.info(logger_trade, "{} trade.manager.start_buy 开始".format(code)) try: try: ex = is_forbidden(code)[0] if ex: raise ex ex, trade_state = is_state_right(code) if ex: raise ex ex, price = is_money_enough(code) if ex: raise ex finally: async_log_util.info(logger_trade, "{} trade.manager.start_buy 判断是否可买".format(code)) __CodesTradeStateManager.set_trade_state(code, trade_constant.TRADE_STATE_BUY_PLACE_ORDER) # 状态改变过后必须要有本地下单编号 __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index, mode, exec_index=exec_index) finally: async_log_util.info(logger_trade, "{} trade.manager.start_buy 结束".format(code)) def test_order(code, last_data, exec_index): """ TODO 测试下单 @param code: @param last_data: @param exec_index: @return: """ price = gpcode_manager.get_limit_up_price(code) __buy(code, price, trade_constant.TRADE_STATE_NOT_TRADE, 0, last_data, last_data["index"], 0, exec_index=exec_index) # 中断买入 def break_buy(code, reason): trade_data_manager.TradeBuyDataManager().remove_buy_position_info(code) # 购买 # @tool.async_call def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index, mode=0, exec_index=None): async_log_util.info(logger_trade, "{} trade_manager.__buy 开始".format(code)) try: if constant.API_TRADE_ENABLE: can_buy, money, msg = BuyMoneyUtil.get_buy_data(tool.get_now_time_str(), mode, DealAndDelegateWithBuyModeDataManager().get_deal_codes_info( mode), DealAndDelegateWithBuyModeDataManager().get_delegates_codes_info( mode)) if not can_buy: raise Exception(msg) count = tool.get_buy_volume_by_money(price, money) # if mode == OrderBeginPosInfo.MODE_RADICAL: # # 激进买入金额为1手 # count = 100 if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN: trade_juejin.order_volume(code, price, count) elif constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: if constant.IS_NEW_VERSION_PLACE_ORDER: trade_huaxin.order_volume_new(code, price, count, last_data, exec_index=exec_index) else: order_ref = huaxin_util.create_order_ref() TradeOrderIdManager().add_order_ref(code, order_ref) trade_huaxin.order_volume(code, price, count, last_data, order_ref=order_ref, exec_index=exec_index) else: guiTrade.buy(code, price) __place_order_success(code, capture_timestamp, last_data, last_data_index) except Exception as e: __place_order_fail(code, trade_state) logger_trade.error("{}买入异常{}".format(code, str(e))) logger_trade.exception(e) raise e finally: async_log_util.info(logger_trade, "{} trade_manager.__buy 结束".format(code)) # 下单成功 def __place_order_success(code, capture_timestamp, last_data, last_data_index): # 买入结束点 use_time = round(time.time() * 1000) - capture_timestamp # 下单成功,加入固定代码库 if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: l2_data_manager.add_to_l2_fixed_codes(code) # 记录下单的那一帧图片的截图时间与交易用时 trade_data_manager.TradeBuyDataManager().set_buy_position_info(code, capture_timestamp, use_time, last_data, last_data_index) print("买入结束") async_log_util.info(logger_trade, "{}买入成功".format(code)) # kp_client_msg_manager.add_msg(code, "下单成功") # 下单失败 def __place_order_fail(code, trade_state): print("买入异常") # 状态还原 CodesTradeStateManager().set_trade_state(code, trade_state) __CodesTradeStateManager = CodesTradeStateManager() __cancel_order_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) # 开始取消买入 def start_cancel_buy(code, force=False): """ 开始撤单 @param code: @param force: @return: """ async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 开始".format(code)) trade_state = __CodesTradeStateManager.get_trade_state_cache(code) try: if trade_state == trade_constant.TRADE_STATE_BUY_SUCCESS: return None if not force: if trade_state != trade_constant.TRADE_STATE_BUY_PLACE_ORDER and trade_state != trade_constant.TRADE_STATE_BUY_DELEGATED: return None __CodesTradeStateManager.set_trade_state(code, trade_constant.TRADE_STATE_BUY_CANCEL_ING) if constant.API_TRADE_ENABLE: if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN: trade_juejin.cancel_order(code) elif constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: trade_huaxin.cancel_order(code) else: guiTrade.cancel_buy(code) __cancel_success(code) # 再次撤单 if constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 再次撤单开始".format(code)) __cancel_order_thread_pool.submit(lambda: trade_huaxin.cancel_order(code, msg="再次撤单")) async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 再次撤单结束".format(code)) # 不需要再次撤单了 # try: # cancel_buy_again(code) # except Exception as e1: # pass except Exception as e: # 状态还原 CodesTradeStateManager().set_trade_state(code, trade_state) async_log_util.error(logger_trade, "{}撤单异常:{}".format(code, str(e))) raise e finally: async_log_util.info(logger_trade, "{} trade_manager.start_cancel_buy 结束".format(code)) # 再次撤单,防止没有撤掉 @tool.async_call def cancel_buy_again(code): time.sleep(0.02) for i in range(0, 5): # 如果时 trade_state = CodesTradeStateManager().get_trade_state_cache(code) if trade_state != trade_constant.TRADE_STATE_BUY_CANCEL_ING and trade_state != trade_constant.TRADE_STATE_BUY_CANCEL_SUCCESS: return try: logger_trade.info("{}:开始再次撤单", code) guiTrade.cancel_buy_again(code) logger_trade.info("{}:再次撤单成功", code) break except Exception as e: logger_trade.error("{}再次撤单异常:{}".format(code, str(e))) time.sleep(0.1 + 0.05 * i) pass # 取消委托成功 def __cancel_success(code): trade_data_manager.TradeBuyDataManager().remove_buy_position_info(code) # 下单成功,加入固定代码库 if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: l2_data_manager.remove_from_l2_fixed_codes(code) async_log_util.info(logger_trade, "{}撤单成功".format(code)) # 暂时注释掉,有可能拖慢进程 # kp_client_msg_manager.add_msg(code, "撤单成功") # 处理交易成功数据 def process_trade_success_data(datas): if datas is None: return None for data in datas: code = data["code"] _time = data["time"] if _time == "00:00:00": continue # 买入成功 if code is not None and int(data["type"]) == 0: l2_trade_util.forbidden_trade(code, msg="交易成功", force=True) state = CodesTradeStateManager().get_trade_state_cache(code) if state != trade_constant.TRADE_STATE_BUY_SUCCESS: CodesTradeStateManager().set_trade_state(code, trade_constant.TRADE_STATE_BUY_SUCCESS) # 删除买撤记录的临时信息 kp_client_msg_manager.add_msg(code, "买入成交") l2_data_manager.TradePointManager().delete_buy_point(code) # 交易成功时间过去3s之后,且当前委托列表里面还有该代码数据就再次执行撤单 # 新版下单不处理 if not constant.IS_NEW_VERSION_PLACE_ORDER: if tool.trade_time_sub(tool.get_now_time_str(), _time) > 3: # 获取到当前是否委托 for dd in latest_trade_delegate_data: if dd["code"] == code: logger_trade.info("{}交易成功触发,重复下单撤单".format(code)) start_cancel_buy(code, True) # 处理委托成功数据 def process_trade_delegate_data(datas): if datas is None: return None latest_trade_delegate_data.clear() latest_trade_delegate_data.extend(datas) codes = [] for data in datas: code = data["code"] if code is not None: codes.append(code) trade_state = CodesTradeStateManager().get_trade_state_cache(code) # 设置下单状态的代码为已委托 if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER: CodesTradeStateManager().set_trade_state(code, trade_constant.TRADE_STATE_BUY_DELEGATED) if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN: ing_codes = CodesTradeStateManager().get_codes_by_trade_state(trade_constant.TRADE_STATE_BUY_CANCEL_ING) if ing_codes is not None: for code in ing_codes: if code in codes: # 强制重新取消 start_cancel_buy(code, True) else: CodesTradeStateManager().set_trade_state(code, trade_constant.TRADE_STATE_BUY_CANCEL_SUCCESS) l2_data_manager.remove_from_l2_fixed_codes(code) def __clear_data(code): redis_l2 = redis_manager.RedisManager(1).getRedis() try: keys = RedisUtils.keys(redis_l2, "*{}*".format(code), auto_free=False) for k in keys: # if (k.find("l2-") is None or k.find("l2-") < 0) and (k.find("big_data-") is None or k.find("big_data-") < 0): RedisUtils.delete(redis_l2, k, auto_free=False) finally: RedisUtils.realse(redis_l2) RedisUtils.delete(redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code)) redis_info = redis_manager.RedisManager(0).getRedis() try: keys = RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False) for k in keys: if k.find("pre") is not None: continue if k.find("zyltgb") is not None: continue RedisUtils.delete(redis_info, k, auto_free=False) finally: RedisUtils.realse(redis_info) def __clear_big_data(): redis_l2 = redis_manager.RedisManager(1).getRedis() try: keys = RedisUtils.keys(redis_l2, "big_data-*", auto_free=False) for k in keys: RedisUtils.delete(redis_l2, k, auto_free=False) finally: RedisUtils.realse(redis_l2) # 买入成功 def buy_success(code): # 加入黑名单 if not l2_trade_util.is_in_forbidden_trade_codes(code): l2_trade_util.forbidden_trade(code, "buy success", force=True) mode = TradePointManager().get_latest_place_order_mode(code) if mode is None: mode = OrderBeginPosInfo.MODE_NORMAL DealAndDelegateWithBuyModeDataManager().add_deal_code(code, tool.get_now_time_str(), mode) if mode == OrderBeginPosInfo.MODE_RADICAL: RadicalBuyDealCodesManager().add_deal_code(code) # 取s消所有的挂单 if constant.API_TRADE_ENABLE: if not constant.IS_NEW_VERSION_PLACE_ORDER: # 新版本拆单下单不撤单 if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN: trade_juejin.cancel_order(code) elif constant.TRADE_WAY == constant.TRADE_WAY_HUAXIN: trade_huaxin.cancel_order(code) else: guiTrade.cancel_buy(code) if __name__ == "__main__": print(CodesTradeStateManager().get_codes_by_trade_states_cache([0, 1])) print(CodesTradeStateManager().get_trade_state_cache("002235")) print(CodesTradeStateManager().get_trade_state_cache("002235"))