"""
Buy amount and buy count settings.
"""
import json

import constant
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from l2.l2_data_manager import OrderBeginPosInfo
from utils import tool


class BuyMoneyAndCountSetting:
    """
    Singleton holding the buy settings for the "normal" and "radical" buy modes.

    Each setting is a two-element list ``[max_count, money_list]`` where
    ``money_list`` is a list of ``(time, money, count)`` entries. Entries
    stored in the ``config`` table may carry a 4th element, which
    ``BuyMoneyUtil.get_buy_data`` reads as an extra order allowance.
    """
    __mysql = Mysqldb()
    __instance = None
    # Normal buy: [max_count, [("time", money, buy count)]]
    __normal_buy = [10, [("15:00:00", constant.BUY_MONEY_PER_CODE, 4)]]
    # Radical buy: [max_count, [("time", money, buy count)]]
    __radical_buy = [4, [("15:00:00", constant.BUY_MONEY_PER_CODE, 3)]]

    def __new__(cls, *args, **kwargs):
        # object.__new__ rejects extra arguments, so they are accepted here
        # for signature compatibility but deliberately not forwarded.
        if cls.__instance is None:
            cls.__instance = super(BuyMoneyAndCountSetting, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __load_data(cls):
        """
        Load the persisted settings from the ``config`` table.

        Rows with an empty value are ignored, leaving the built-in default.
        @return:
        """
        keys = ["normal_buy_money_setting", "radical_buy_money_setting", "normal_buy_count_setting",
                "radical_buy_count_setting"]
        sql = "select `key`, `value` from config where " + " or ".join([f"`key`='{k}'" for k in keys])
        results = cls.__mysql.select_all(sql)
        if not results:
            return
        for result in results:
            key, value = result[0], result[1]
            if not value:
                continue
            if key == keys[0]:
                # Normal buy money setting
                cls.__normal_buy[1] = json.loads(value)
            elif key == keys[1]:
                # Radical buy money setting
                cls.__radical_buy[1] = json.loads(value)
            elif key == keys[2]:
                # Normal buy count setting
                cls.__normal_buy[0] = int(value)
            elif key == keys[3]:
                # Radical buy count setting
                cls.__radical_buy[0] = int(value)

    def set_normal_buy_data(self, max_count, money_list: list):
        """
        Set the normal buy data and persist it to the ``config`` table.
        @param max_count: maximum number of codes
        @param money_list: list of money entries, stored as JSON
        @return:
        """
        self.__normal_buy = [max_count, money_list]
        # NOTE(review): values are interpolated straight into the SQL text;
        # switch to bound parameters if Mysqldb.execute supports them.
        self.__mysql.execute(f"update config set `value` = '{max_count}' where `key` = 'normal_buy_count_setting'")
        self.__mysql.execute(
            f"update config set `value` = '{json.dumps(money_list)}' where `key` = 'normal_buy_money_setting'")

    def set_radical_buy_data(self, max_count, money_list: list):
        """
        Set the radical buy data and persist it to the ``config`` table.
        @param max_count: maximum number of codes
        @param money_list: list of money entries, stored as JSON
        @return:
        """
        self.__radical_buy = [max_count, money_list]
        # NOTE(review): values are interpolated straight into the SQL text;
        # switch to bound parameters if Mysqldb.execute supports them.
        self.__mysql.execute(f"update config set `value` = '{max_count}' where `key` = 'radical_buy_count_setting'")
        self.__mysql.execute(
            f"update config set `value` = '{json.dumps(money_list)}' where `key` = 'radical_buy_money_setting'")

    def get_normal_buy_setting(self):
        """
        @return: the normal buy setting, [max_count, money_list]
        """
        return self.__normal_buy

    def get_radical_buy_setting(self):
        """
        @return: the radical buy setting, [max_count, money_list]
        """
        return self.__radical_buy


class RadicalBuyBlockCodeCountManager:
    """
    Manages the per-block limit on the number of codes for radical buying.
    """
    __db = 2
    __redisManager = redis_manager.RedisManager(2)
    __instance = None
    __block_count_dict = {}

    def __new__(cls, *args, **kwargs):
        # object.__new__ rejects extra arguments, so they are accepted here
        # for signature compatibility but deliberately not forwarded.
        if cls.__instance is None:
            cls.__instance = super(RadicalBuyBlockCodeCountManager, cls).__new__(cls)
            cls.__load_data()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_data(cls):
        """
        Load the block/count mapping stored as JSON in redis, if present.
        @return:
        """
        val = RedisUtils.get(cls.__get_redis(), "radical_block_code_count_setting")
        if val:
            cls.__block_count_dict = json.loads(val)

    def set_block_code_count(self, block_count_infos):
        """
        Set the number of codes that may be bought per block.
        @param block_count_infos: [("name", count)]; None is treated as empty
        @return:
        """
        if block_count_infos is None:
            block_count_infos = []
        temp_dict = {x[0]: x[1] for x in block_count_infos}
        self.__block_count_dict = temp_dict
        RedisUtils.setex_async(self.__db, "radical_block_code_count_setting", tool.get_expire(),
                               json.dumps(temp_dict))

    def get_block_code_count(self, block):
        """
        Get the maximum number of codes that may be bought for a block.
        @param block: block name
        @return: the configured maximum, 1 when the block is not configured
        """
        return self.__block_count_dict.get(block, 1)

    def get_block_code_count_settings(self):
        """
        Get all block settings.
        @return: list of (block name, count) sorted by block name
        """
        return sorted(self.__block_count_dict.items(), key=lambda x: x[0])


class BuyMoneyUtil:
    """
    Helpers deriving the buy amount and the possible order volumes from the settings.
    """

    @staticmethod
    def __time_to_int(time_str):
        """
        Convert a colon-separated time string such as "09:25:00" to an int (92500).
        """
        return int(time_str.replace(":", ""))

    @classmethod
    def get_buy_data(cls, time_str, buy_mode, deals, delegates):
        """
        Get the buy data for the given time and buy mode.
        @param time_str: buy time, e.g. "09:30:00"
        @param buy_mode: buy mode; OrderBeginPosInfo.MODE_RADICAL selects the radical
                         setting, anything else the normal setting
        @param deals: [(deal code, deal time)], None is treated as empty
        @param delegates: [(delegate code, delegate time)], None is treated as empty
        @return: (whether buying is allowed, buy money, message)
        """
        if deals is None:
            deals = []
        if delegates is None:
            delegates = []
        # max_count: maximum number of dealt codes
        # money_list: [(time, money, count, extra count)]
        if buy_mode == OrderBeginPosInfo.MODE_RADICAL:
            max_count, money_list = BuyMoneyAndCountSetting().get_radical_buy_setting()
        else:
            max_count, money_list = BuyMoneyAndCountSetting().get_normal_buy_setting()
        codes = {x[0] for x in deals}
        if len(codes) >= max_count:
            return False, 0, f"成交数量({len(codes)})超过{max_count}个"

        # Find the time window containing time_str: the first entry whose time
        # is not earlier than time_str ends the window, the previous entry
        # (or 09:25:00) starts it.
        start_time = "09:25:00"
        end_info = None
        now_int = cls.__time_to_int(time_str)
        for i, money_info in enumerate(money_list):
            if now_int <= cls.__time_to_int(money_info[0]):
                end_info = money_info
                if i > 0:
                    start_time = money_list[i - 1][0]
                break
        if end_info is None:
            return False, constant.BUY_MONEY_PER_CODE, "当前时间段不能扫入"

        # Collect the codes already dealt/delegated within the window (start, end]
        end_time_int = cls.__time_to_int(end_info[0])
        start_time_int = cls.__time_to_int(start_time)
        codes = set()
        for d in deals:
            if start_time_int < cls.__time_to_int(d[1]) <= end_time_int:
                codes.add(d[0])
        for d in delegates:
            if start_time_int < cls.__time_to_int(d[1]) <= end_time_int:
                codes.add(d[0])

        configured_count = end_info[2]
        # The built-in default entries only have 3 elements; indexing [3]
        # unconditionally raised IndexError, so a missing extra allowance
        # is treated as 0.
        extra_count = end_info[3] if len(end_info) > 3 else 0
        if len(codes) >= configured_count:
            if len(codes) >= configured_count + extra_count:
                return False, constant.BUY_MONEY_PER_CODE, f"时间段:{start_time}-{end_info[0]} 已成交/委托数量({codes})超过{configured_count + extra_count}个,已经不能委托"
            else:
                return True, constant.BUY_MONEY_PER_CODE, f"时间段:{start_time}-{end_info[0]} 已成交/委托数量({codes})>={configured_count}个,<{configured_count + extra_count} ,按默认金额扫入"
        else:
            return True, end_info[
                1], f"时间段:{start_time}-{end_info[0]} 已成交/委托数量({codes})没有超过{configured_count}个,委托金额为:{end_info[1]}"

    @classmethod
    def __get_possible_buy_moneys(cls):
        """
        Get every buy amount that may be used: the amounts of the radical and
        normal settings plus constant.BUY_MONEY_PER_CODE.
        @return: set of amounts
        """
        moneys = set()
        radical_buy_setting = BuyMoneyAndCountSetting().get_radical_buy_setting()
        if radical_buy_setting:
            moneys |= {x[1] for x in radical_buy_setting[1]}
        normal_buy_setting = BuyMoneyAndCountSetting().get_normal_buy_setting()
        if normal_buy_setting:
            moneys |= {x[1] for x in normal_buy_setting[1]}
        moneys.add(constant.BUY_MONEY_PER_CODE)
        return moneys

    @classmethod
    def get_possible_buy_volumes(cls, limit_up_price):
        """
        Get all possible order volumes over every possible buy amount.
        @param limit_up_price: limit-up price
        @return: set of volumes
        """
        total_volumes = set()
        for money in cls.__get_possible_buy_moneys():
            volumes = cls.get_possible_buy_volumes_by_money(limit_up_price, money)
            if volumes:
                total_volumes.update(volumes)
        return total_volumes

    @classmethod
    def get_possible_buy_volumes_by_money(cls, limit_up_price, money):
        """
        Get the possible order volumes for one buy amount.
        @param limit_up_price: limit-up price
        @param money: amount
        @return: tuple of two volumes
        """
        total_volume_unit_100 = tool.get_buy_volume_by_money(limit_up_price, money) // 100
        return cls.get_possible_buy_volumes_by_total_volume(total_volume_unit_100 * 100)

    @classmethod
    def get_possible_buy_volumes_by_total_volume(cls, volume):
        """
        Split an order volume into two unequal parts, both multiples of 100.

        For volumes that are a multiple of 100 the two parts sum to the volume.
        @param volume: number of shares to order
        @return: (smaller part, larger part)
        """
        # NOTE(review): for volumes below 400 the smaller part is zero or
        # negative; confirm callers never pass such volumes.
        half_unit_100 = (volume // 100) // 2
        if (volume // 100) % 2 == 0:
            return 100 * (half_unit_100 - 1), 100 * (half_unit_100 + 1)
        else:
            # Avoid looking like the order splitting done by trading platforms such as THS (同花顺)
            return 100 * (half_unit_100 - 1), 100 * (half_unit_100 + 1 + 1)


if __name__ == '__main__':
    limit_up_price = 14.38
    special_volumes = BuyMoneyUtil.get_possible_buy_volumes(limit_up_price)
    special_volumes |= {tool.get_buy_volume_by_money(limit_up_price, x) for x in constant.AVAILABLE_BUY_MONEYS}
    print(special_volumes)