""" 交易数据股那里器 用于对交易临时数据(交易状态,代码状态等)进行管理 """ import json import time # 交易撤销数据管理器 import constant from db.mysql_data_delegate import Mysqldb from db.redis_manager_delegate import RedisUtils from utils import global_util, tool from db import redis_manager_delegate as redis_manager from log_module.log import logger_trade class TradeCancelDataManager: capture_time_dict = {} # 保存截图时间 @classmethod def save_l2_capture_time(cls, client_id, pos, code, capture_time): cls.capture_time_dict["{}-{}-{}".format(client_id, pos, code)] = {"create_time": round(time.time() * 1000), "capture_time": capture_time} # 获取最近一次的截图时间 @classmethod def get_latest_l2_capture_time(cls, client_id, pos, code): val = cls.capture_time_dict.get("{}-{}-{}".format(client_id, pos, code)) if val is None: return -1 # 间隔时间不能大于1s if round(time.time() * 1000) - val["create_time"] > 1000: return -1 return val["capture_time"] # 获取l2数据的增长速度 @classmethod def get_l2_data_grow_speed(cls, client_id, pos, code, add_datas, capture_time): count = 0 for data in add_datas: count += data["re"] lastest_capture_time = cls.get_latest_l2_capture_time(client_id, pos, code) if lastest_capture_time < 0: raise Exception("获取上次l2数据截图时间出错") return count / (capture_time - lastest_capture_time) # 获取买入确认点的位置 @classmethod def get_buy_sure_position(cls, index, speed, trade_time): return index + round(speed * trade_time) class TradeBuyDataManager: __db = 0 redisManager = redis_manager.RedisManager(0) buy_sure_position_dict = {} __buy_position_info_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(TradeBuyDataManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.redisManager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "buy_position_info-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) val = json.loads(val) tool.CodeDataCacheUtil.set_cache(cls.__buy_position_info_cache, code, val) finally: RedisUtils.realse(__redis) # 设置买入点的信息 # trade_time: 买入点截图时间与下单提交时间差值 # capture_time: 买入点截图时间 # last_data: 买入点最后一条数据 def set_buy_position_info(self, code, capture_time, trade_time, last_data, last_data_index): val = (capture_time, trade_time, last_data, last_data_index) tool.CodeDataCacheUtil.set_cache(self.__buy_position_info_cache, code, val) RedisUtils.setex_async(self.__db, "buy_position_info-{}".format(code), tool.get_expire(), json.dumps(val)) # 获取买入点信息 def get_buy_position_info(self, code): val_str = RedisUtils.get(self.redisManager.getRedis(), "buy_position_info-{}".format(code)) if val_str is None: return None, None, None, None else: val = json.loads(val_str) return val[0], val[1], val[2], val[3] def get_buy_position_info_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_position_info_cache, code) if cache_result[0]: return cache_result[1] return None, None, None, None # 删除买入点信息 def remove_buy_position_info(self, code): tool.CodeDataCacheUtil.clear_cache(self.__buy_position_info_cache, code) RedisUtils.delete_async(self.__db, "buy_position_info-{}".format(code)) # 设置买入确认点信息 def __set_buy_sure_position(self, code, index, data): logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data) key = "buy_sure_position-{}".format(code) RedisUtils.setex(self.redisManager.getRedis(), key, tool.get_expire(), json.dumps((index, data))) self.buy_sure_position_dict[code] = (index, data) # 移除下单信号的详细信息 self.remove_buy_position_info(code) # 清除买入确认点信息 def __clear_buy_sure_position(self, code): key = "buy_sure_position-{}".format(code) RedisUtils.delete(self.redisManager.getRedis(), key) if code in self.buy_sure_position_dict: self.buy_sure_position_dict.pop(code) # 获取买入确认点信息 def get_buy_sure_position(self, code): temp = self.buy_sure_position_dict.get(code) if temp is not None: return temp[0], temp[1] key = "buy_sure_position-{}".format(code) val = RedisUtils.get(self.redisManager.getRedis(), key) if val is None: return None, None else: val = json.loads(val) self.buy_sure_position_dict[code] = (val[0], val[1]) return val[0], val[1] # 代码实时价格管理器 class CodeActualPriceProcessor: __code_current_rate_cache = {} __code_current_rate_latest = {} __db = 0 __redisManager = redis_manager.RedisManager(0) __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(CodeActualPriceProcessor, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() @classmethod def __load_datas(cls): __redis = cls.__get_redis() try: keys = RedisUtils.keys(__redis, "code_current_rate-*") for k in keys: code = k.split("-")[-1] val = RedisUtils.get(__redis, k) tool.CodeDataCacheUtil.set_cache(cls.__code_current_rate_cache, code, float(val)) except Exception as e: pass finally: RedisUtils.realse(__redis) def __save_current_price_codes_count(self, count): key = "current_price_codes_count" RedisUtils.setex(self.__get_redis(), key, 10, count) def __get_current_price_codes_count(self): key = "current_price_codes_count" count = RedisUtils.get(self.__get_redis(), key) return 0 if count is None else count # 保存当前涨幅 def __save_current_rate(self, code, rate): # 变化之后才会持久化 if self.__code_current_rate_latest.get(code) == rate: return self.__code_current_rate_latest[code] = rate tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, code, rate) key = "code_current_rate-{}".format(code) RedisUtils.setex_async(self.__db, key, tool.get_expire(), rate) # 批量保存 def __save_current_rates(self, datas): # 变化之后才会持久化 for d in datas: if self.__code_current_rate_latest.get(d[0]) == d[1]: continue self.__code_current_rate_latest[d[0]] = d[1] tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, d[0], d[1]) key = "code_current_rate-{}".format(d[0]) RedisUtils.setex_async(self.__db, key, tool.get_expire(), d[1]) # 获取当前涨幅 def __get_current_rate(self, code): key = "code_current_rate-{}".format(code) rate = RedisUtils.get(self.__get_redis(), key) if rate is not None: return float(rate) return None def get_current_rate(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__code_current_rate_cache, code) if cache_result[0]: return cache_result[1] return None # 保存现价 def save_current_price(self, code, price, is_limit_up): global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time())) pass # 获取现价 def get_current_price(self, code): return global_util.cuurent_prices.get(code) # 现价代码数量 def save_current_price_codes_count(self, count): self.__save_current_price_codes_count(count) def get_current_price_codes_count(self): return self.__get_current_price_codes_count() # 当前代码是否涨停 def current_is_limit_up(self, code): data = self.get_current_price(code) if data is None: return None return data[1] # 获取涨幅前几的代码 def get_top_rate_codes(self, top_n): keys = "code_current_rate-*" keys = RedisUtils.keys(self.__get_redis(), keys) infos = [] for k in keys: code = k.split("-")[1] rate = self.get_current_rate(code) infos.append((code, rate)) # 排序信息 sorted_infos = sorted(infos, key=lambda tup: tup[1], reverse=True) sorted_infos = sorted_infos[:top_n] codes = [] for data in sorted_infos: codes.append(data[0]) return codes # 涨停次数管理 class PlaceOrderCountManager: __db = 0 __redisManager = redis_manager.RedisManager(0) __place_order_count_cache = {} __instance = None def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(PlaceOrderCountManager, cls).__new__(cls, *args, **kwargs) cls.__load_datas() return cls.__instance @classmethod def __get_redis(cls): return cls.__redisManager.getRedis() @classmethod def __load_datas(cls): redis_ = cls.__get_redis() try: keys = RedisUtils.keys(redis_, "place_order_count-*") for k in keys: code = k.split("-")[-1] count = RedisUtils.get(redis_, k) cls.__place_order_count_cache[code] = int(count) finally: RedisUtils.realse(redis_) def __incre_place_order_count(self, code): if code not in self.__place_order_count_cache: self.__place_order_count_cache[code] = 0 self.__place_order_count_cache[code] += 1 key = "place_order_count-{}".format(code) RedisUtils.incrby_async(self.__db, key, 1) RedisUtils.expire_async(self.__db, key, tool.get_expire()) def __get_place_order_count(self, code): key = "place_order_count-{}".format(code) count = RedisUtils.get(self.__get_redis(), key) if count is not None: return int(count) return 0 def __get_place_order_count_cache(self, code): cache_result = tool.CodeDataCacheUtil.get_cache(self.__place_order_count_cache, code) if cache_result[0]: return cache_result[1] return 0 def place_order(self, code): self.__incre_place_order_count(code) def get_place_order_count(self, code): return self.__get_place_order_count_cache(code) def clear_place_order_count(self, code): self.__place_order_count_cache[code] = 0 key = "place_order_count-{}".format(code) RedisUtils.delete_async(self.__db, key) def clear(self): self.__place_order_count_cache.clear() keys = RedisUtils.keys(self.__get_redis(), "place_order_count-*") for k in keys: RedisUtils.delete(self.__get_redis(), k) # 账户可用资金管理 class AccountMoneyManager: __db = 2 __redis_manager = redis_manager.RedisManager(2) __available_money_cache = None __commission_cache = None __instance = None __mysqldb = Mysqldb() def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(AccountMoneyManager, cls).__new__(cls, *args, **kwargs) __redis = cls.__get_redis() result = RedisUtils.get(cls.__get_redis(), "trade-account-canuse-money") if result: cls.__available_money_cache = round(float(result), 2) return cls.__instance @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def set_available_money(self, client_id, money): self.__available_money_cache = round(float(money), 2) RedisUtils.set(self.__get_redis(), "trade-account-canuse-money", money) def set_commission(self, commission): self.__commission_cache = commission # 获取交易账户的可用金额 def get_available_money(self): result = RedisUtils.get(self.__get_redis(), "trade-account-canuse-money") if result is None: return None return round(float(result), 2) def get_available_money_cache(self): return self.__available_money_cache def get_commission_cache(self): return self.__commission_cache def get_delegated_count_info(self, from_date=None, to_date=None): """ 获取委托数量信息 @return: """ if not from_date: from_date = tool.get_now_date_str("%Y%m%d") if not to_date: to_date = tool.get_now_date_str("%Y%m%d") sql = f"SELECT * FROM (SELECT '挂买', COUNT(*) AS '数量' FROM `hx_trade_delegate_record` r WHERE r.`direction`=0 AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'" sql += " UNION ALL " sql += f"SELECT '撤挂买',COUNT(*) AS '数量' FROM `hx_trade_delegate_record` r WHERE r.`direction`=0 AND r.`cancelTime`!='' AND r.`cancelTime`IS NOT NULL AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'" sql += " UNION ALL " sql += f"SELECT '撤挂卖', COUNT(*) AS '数量' FROM `hx_trade_delegate_record` r WHERE r.`direction`=1 AND r.`cancelTime`!='' AND r.`cancelTime`IS NOT NULL AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'" sql += " UNION ALL " sql += f"SELECT '挂卖' ,COUNT(*) AS '数量' FROM `hx_trade_delegate_record` r WHERE r.`direction`=1 AND r.`insertDate`>='{from_date}' AND r.`insertDate`<='{to_date}'" sql += ") a" return self.__mysqldb.select_all(sql) def get_deal_count_info(self, from_date=None, to_date=None): if not from_date: from_date = tool.get_now_date_str("%Y%m%d") if not to_date: to_date = tool.get_now_date_str("%Y%m%d") sql = "SELECT * FROM ( " sql += f"SELECT '股票', COUNT(*), sum(a.price*a.volume) as '金额' FROM (SELECT * FROM `hx_trade_deal_record` r WHERE r.`tradeDate` >='{from_date}' and r.`tradeDate` <='{to_date}' AND (r.`securityID` LIKE '30%' OR r.`securityID` LIKE '60%' OR r.`securityID` LIKE '68%' OR r.`securityID` LIKE '00%') GROUP BY r.`orderSysID`) a" sql += " UNION ALL " sql += f"SELECT '上证基金', COUNT(*) AS '数量', sum(a.price*a.volume) as '金额' FROM (SELECT * FROM `hx_trade_deal_record` r WHERE r.`tradeDate` >='{from_date}' and r.`tradeDate` <='{to_date}' AND (r.`securityID` LIKE '11%') GROUP BY r.`orderSysID`) a" sql += " UNION ALL " sql += f"SELECT '深证基金', COUNT(*) AS '数量', sum(a.price*a.volume) as '金额' FROM (SELECT * FROM `hx_trade_deal_record` r WHERE r.`tradeDate` >='{from_date}' and r.`tradeDate` <='{to_date}' AND (r.`securityID` LIKE '12%') GROUP BY r.`orderSysID`) a" sql += ") a" return self.__mysqldb.select_all(sql) # 激进买成交代码 class RadicalBuyDealCodesManager: """ 激进买成交代码管理 """ __db = 2 __redis_manager = redis_manager.RedisManager(2) __deal_codes_cache = set() __instance = None # 根据L2数据来激进买入的有效时间:{"code":(有效截至时间, 买单号, 扫入的板块, 最近成交时间, 买入板块净流入情况, 是否是板上放量买入)} buy_by_l2_delegate_expire_time_dict = {} # 仅仅买的板块 __radical_buy_blocks_dict = {} def __new__(cls, *args, **kwargs): if not cls.__instance: cls.__instance = super(RadicalBuyDealCodesManager, cls).__new__(cls, *args, **kwargs) cls.__load_data() return cls.__instance @classmethod def __load_data(cls): result = RedisUtils.smembers(cls.__get_redis(), "radical_buy_deal_codes") if result: cls.__deal_codes_cache = set(result) keys = RedisUtils.keys(cls.__get_redis(), "radical_buy_blocks-*") if keys: for k in keys: code = k.split("-")[1] val = RedisUtils.get(cls.__get_redis(), k) val = json.loads(val) cls.__radical_buy_blocks_dict[code] = set(val) cls.__deal_codes_cache = set(result) def set_code_blocks(self, code, blocks): self.__radical_buy_blocks_dict[code] = set(blocks) RedisUtils.setex_async(self.__db, f"radical_buy_blocks-{code}", tool.get_expire(), json.dumps(list(blocks))) def get_code_blocks(self, code): return self.__radical_buy_blocks_dict.get(code) @classmethod def __get_redis(cls): return cls.__redis_manager.getRedis() def add_deal_code(self, code): """ 添加已成交的代码 @param code: @return: """ self.__deal_codes_cache.add(code) RedisUtils.sadd_async(self.__db, "radical_buy_deal_codes", code) RedisUtils.expire_async(self.__db, "radical_buy_deal_codes", tool.get_expire()) def get_deal_codes(self): """ 获取已经成交的代码 @return: """ if self.__deal_codes_cache: return self.__deal_codes_cache return set() if __name__ == "__main__": pass