""" 激进买数据管理 """ import json import logging import time import urllib import requests import constant import l2_data_util from code_attribute.code_volumn_manager import CodeVolumeManager from l2 import l2_data_util as l2_data_util_new, l2_log, l2_data_manager from code_attribute import code_nature_analyse, code_volumn_manager, gpcode_manager from code_attribute.code_l1_data_manager import L1DataManager from code_attribute.gpcode_manager import WantBuyCodesManager from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from l2.huaxin import l2_huaxin_util from l2.l2_data_manager import TradePointManager from l2.l2_data_util import L2DataUtil from l2.l2_transaction_data_manager import BigOrderDealManager, HuaXinBuyOrderManager from log_module import async_log_util from log_module.log import logger_l2_radical_buy, logger_debug, logger_l2_radical_buy_data from third_data import kpl_block_util, huaxin_l1_data_manager, kpl_data_constant from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager from third_data.kpl_data_constant import LimitUpDataConstant, LimitUpCodesBlockRecordManager from third_data.kpl_data_manager import CodeHighLevel from third_data.third_blocks_manager import BlockMapManager from trade import trade_record_log_util, trade_data_manager, trade_manager, trade_constant from trade.buy_money_count_setting import RadicalBuyBlockCodeCountManager from trade.buy_radical import new_block_processor from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager from trade.trade_data_manager import RadicalBuyDealCodesManager from utils import tool, global_util, trade_util # RadicalBigOrderThresholdManager @tool.singleton class BeforeSubDealBigOrderManager: """ 订阅之前的大单管理 """ __db = 3 __big_order_threshold = {} __big_sell_order_threshold = {} # 已经成交的累计大买单金额:用于初次上板尚未订阅的情况 __already_total_deal_big_order_money = {} # 已经成交的累计大卖单金额:用于初次上板尚未订阅的情况 
__already_total_sell_deal_big_order_money = {} __redis_manager = redis_manager.RedisManager(3) # 临时大单阈值,初次上板使用: {"代码":(金额,更新时间, 是否是默认的值)} __temp_big_order_threshold = {} def __init__(self): self.__load_data() def __load_data(self): keys = redis_manager.RedisUtils.keys(self.__get_redis(), "radical_big_order_threshold-*") for k in keys: code = k.split("-")[1] val = redis_manager.RedisUtils.get(self.__get_redis(), k) val = int(val) self.__big_order_threshold[code] = val keys = redis_manager.RedisUtils.keys(self.__get_redis(), "radical_big_sell_order_threshold-*") for k in keys: code = k.split("-")[1] val = redis_manager.RedisUtils.get(self.__get_redis(), k) val = int(val) self.__big_sell_order_threshold[code] = val def __get_redis(self): return self.__redis_manager.getRedis() def set_big_order_threshold(self, code, threshold_money): """ 设置大单的阈值 @param code: @param threshold_money: @return: """ if code in self.__big_order_threshold: return async_log_util.info(logger_l2_radical_buy_data, f"大单阈值数据:{code}-{threshold_money}") if threshold_money < 299e4: # 小于299w的不保存 return self.__big_order_threshold[code] = threshold_money redis_manager.RedisUtils.setex_async(self.__db, f"radical_big_order_threshold-{code}", tool.get_expire(), f"{threshold_money}") def set_big_sell_order_threshold(self, code, threshold_money): """ 设置大卖单的阈值 @param code: @param threshold_money: @return: """ if code in self.__big_sell_order_threshold: return async_log_util.info(logger_l2_radical_buy_data, f"大卖单阈值数据:{code}-{threshold_money}") if threshold_money < self.__get_base_big_order_threshold(code): # 小于基础大单不设置 return self.__big_sell_order_threshold[code] = threshold_money redis_manager.RedisUtils.setex_async(self.__db, f"radical_big_sell_order_threshold-{code}", tool.get_expire(), f"{threshold_money}") @classmethod def compute_re_limit_up_big_money_threshold(cls, limit_up_price_money_list): """ 获取回封的大单阈值 @param limit_up_price_money_list: @return: """ average_money = sum(limit_up_price_money_list) // 
len(limit_up_price_money_list) max_money = max(limit_up_price_money_list) # 取max((平均值+最大单一半)/2, 平均值) threshold_money = max((max_money // 2 + average_money) // 2, average_money) return threshold_money def set_big_deal_order_list(self, code, buy_money_list, sell_money_list, limit_up_price): """ 设置大单成交数据 @param code: @param buy_money_list:[(金额,价格,订单号)] @param sell_money_list:[(金额,价格,订单号)] @param limit_up_price: @return: """ # 涨停价成交的大单(策略进程尚未统计到的) # 炸板时间附近的订单号 # RadicalCodeMarketInfoManager # 如果炸过板就取炸板时间之前的第一个订单号作为最大订单号 opened_time = RadicalCodeMarketInfoManager().get_opened_time(code) total_deal_buy_money = 0 total_deal_buy_money_info_list = [] limit_up_price_money_list = [] pre_limit_up_price_money_list = [] deal_order_list = BigOrderDealManager().get_total_buy_data_list(code) deal_order_ids = set() if deal_order_list: for x in deal_order_list: if opened_time and int(opened_time.replace(":", "")) > int( l2_huaxin_util.convert_time(x[3]).replace(":", "")): # 开板时间之前 continue deal_order_ids.add(x[0]) for info in buy_money_list: if info[1] != limit_up_price: continue limit_up_price_money_list.append(info[0]) if info[2] in deal_order_ids: continue pre_limit_up_price_money_list.append((info[0], info[2])) total_deal_buy_money += info[0] total_deal_buy_money_info_list.append(info) if limit_up_price_money_list: threshold_money = self.compute_re_limit_up_big_money_threshold(limit_up_price_money_list) self.set_big_order_threshold(code, threshold_money) self.__already_total_deal_big_order_money[code] = (total_deal_buy_money, pre_limit_up_price_money_list) async_log_util.info(logger_l2_radical_buy_data, f"之前的大买单:{code}-{total_deal_buy_money}-{total_deal_buy_money_info_list}") # 处理大卖单 pre_limit_up_price_money_sell_list = [] if True: total_deal_sell_money_info_list = [] total_deal_sell_money = 0 for info in sell_money_list: if info[1] != limit_up_price: continue pre_limit_up_price_money_sell_list.append((info[0], info[2])) total_deal_sell_money += info[0] 
total_deal_sell_money_info_list.append(info) # 计算卖单阈值 if total_deal_sell_money_info_list: max_sell_money = max([x[0] for x in total_deal_sell_money_info_list]) average_sell_money = sum([x[0] for x in total_deal_sell_money_info_list]) // len( total_deal_sell_money_info_list) threshold_sell_money = int( round(max(self.__get_base_big_order_threshold(code), min((max_sell_money // 2 + average_sell_money) // 2, average_sell_money)))) self.set_big_sell_order_threshold(code, threshold_sell_money) async_log_util.info(logger_l2_radical_buy_data, f"之前的大卖单:{code}-{total_deal_sell_money}-{total_deal_sell_money_info_list},卖大单阈值-{threshold_sell_money}") self.__already_total_sell_deal_big_order_money[code] = ( total_deal_sell_money, pre_limit_up_price_money_sell_list) def get_big_order_threshold(self, code): """ 获取大单阈值 @param code: @return: 大单金额, 是否默认 """ if code in self.__big_order_threshold: return self.__big_order_threshold.get(code), False return 2990000, True def get_big_order_threshold_info(self, code): """ 获取大单阈值 @param code: @return:大单阈值,是否是临时大单 """ if not is_first_limit_up_buy(code): return self.get_big_order_threshold(code)[0], False else: money_info = self.get_temp_deal_big_order_threshold_info(code) if not money_info or money_info[1] < time.time(): # 默认阈值为 1个亿 money = int(round(1e8)) else: money = money_info[0] return money, True def __get_base_big_order_threshold(self, code): """ 获取基础大单的金额 @param code: @return: """ return l2_data_util.get_big_money_val(gpcode_manager.get_limit_up_price_as_num(code), tool.is_ge_code(code)) def get_big_sell_order_threshold(self, code): """ 获取大单阈值 @param code: @return: """ if code in self.__big_sell_order_threshold: return self.__big_sell_order_threshold.get(code) return self.__get_base_big_order_threshold(code) def get_deal_big_order_money(self, code): """ 获取大单成交金额 @param code: @return: """ if code in self.__already_total_deal_big_order_money: return self.__already_total_deal_big_order_money[code][0] return 0 def 
get_sell_deal_big_order_money(self, code, threshold_money): if code in self.__already_total_sell_deal_big_order_money: sellno_map = l2_data_util_new.local_today_sellno_map.get(code) total_sell_money = 0 for x in self.__already_total_sell_deal_big_order_money[code][1]: if x[0] < threshold_money: continue if f"{x[1]}" in sellno_map: continue total_sell_money += x[0] return total_sell_money return 0 def get_deal_big_order_money_list(self, code): """ 获取大单成交列表 @param code: @return: [(金额, 订单号)] """ if code in self.__already_total_deal_big_order_money: return self.__already_total_deal_big_order_money[code][1] return [] def is_need_update(self, code): """ 是否有必要拉数据 @param code: @return: """ if self.__big_order_threshold.get(code) and self.__big_sell_order_threshold.get(code): # 没有大单成交 return False return True def set_temp_deal_big_orders(self, code, money_info_list): """ 设置临时大单 @param code: @param money_info_list: 成交的大买单:[(订单号,总股数,成交金额,成交开始时间,成交结束时间, 最近的成交价格, 最近的卖单号, 涨停价成交金额)] @return: """ temp_big_order_threshold_info = self.__temp_big_order_threshold.get(code) if temp_big_order_threshold_info and not temp_big_order_threshold_info[2] and time.time() < \ self.__temp_big_order_threshold[code][1]: return if not money_info_list or len(money_info_list) < 1: return fmoney_list = [] min_money = l2_data_util.get_big_money_val(gpcode_manager.get_limit_up_price_as_num(code), tool.is_ge_code(code)) for info in money_info_list: # 要求是10s之内成交的 if tool.trade_time_sub(tool.get_now_time_str(), l2_huaxin_util.convert_time(info[3])) > 10: continue if info[7] >= min_money: # 涨停价成交部分是大单 fmoney_list.append((info[7], info[0])) if len(fmoney_list) < 1: return money_list = [x[0] for x in fmoney_list] money_list = money_list[:2] # 计算大单: 前2个大单+万手的均值 # if tool.is_ge_code(code): # money_list.append(gpcode_manager.get_limit_up_price_as_num(code) * 300000) # else: # money_list.append(gpcode_manager.get_limit_up_price_as_num(code) * 1000000) self.__temp_big_order_threshold[code] = ( int(sum(money_list) 
@tool.singleton
class TotalDealBigOrderThresholdMoneyManager:
    """Manually configured threshold for the accumulated dealt big-order money."""

    __db = 3
    __redis_manager = redis_manager.RedisManager(3)
    # {code: threshold money}
    __total_big_order_threshold = {}

    def __init__(self):
        self.__load_data()

    def __get_redis(self):
        return self.__redis_manager.getRedis()

    def __load_data(self):
        """Restore the cache from the ``total_radical_big_order_threshold-<code>`` keys."""
        for key in redis_manager.RedisUtils.keys(self.__get_redis(), "total_radical_big_order_threshold-*"):
            val = redis_manager.RedisUtils.get(self.__get_redis(), key)
            if val is None:
                # The key expired between KEYS and GET.
                continue
            self.__total_big_order_threshold[key.split("-")[1]] = int(val)

    def set_money(self, code, money):
        """Set the threshold of *code* in the cache and in Redis.

        @param code: stock code
        @param money: threshold money
        """
        self.__total_big_order_threshold[code] = money
        redis_manager.RedisUtils.setex_async(self.__db, f"total_radical_big_order_threshold-{code}",
                                             tool.get_expire(), money)

    def get_money_cache(self, code):
        """Return the cached threshold of *code*, or None when none is set."""
        return self.__total_big_order_threshold.get(code)


@tool.singleton
class RadicalCodeMarketInfoManager:
    """Market (level-1 quote) state of the codes watched for radical buying."""

    __db = 3
    # Limit-up state: {code: [limit-up time, board-open time ('' while not opened)]}
    __code_limit_up_info_dict = {}
    __redis_manager = redis_manager.RedisManager(3)
    # Bid-1 quotes of the last 60 seconds while sealed at the limit-up price
    # (dropped as soon as the bid leaves that price):
    # {code: [(time, (price, volume)), ...]}
    __latest_60s_buy1_list_dict = {}

    def __init__(self):
        self.__load_data()

    def __load_data(self):
        """Restore the limit-up state from the ``market_info_for_radical-<code>`` keys."""
        for key in redis_manager.RedisUtils.keys(self.__get_redis(), "market_info_for_radical-*"):
            val = redis_manager.RedisUtils.get(self.__get_redis(), key)
            if val is None:
                # The key expired between KEYS and GET.
                continue
            self.__code_limit_up_info_dict[key.split("-")[1]] = json.loads(val)

    def __get_redis(self):
        return self.__redis_manager.getRedis()

    def __set_code_limit_up_info(self, code, info):
        """Log and persist the limit-up / board-open record of *code*."""
        async_log_util.info(logger_l2_radical_buy_data, f"上板/炸板数据:{code}-{info}")
        redis_manager.RedisUtils.setex_async(self.__db, f"market_info_for_radical-{code}", tool.get_expire(),
                                             json.dumps(info))

    def set_market_info(self, code, time_str, limit_up_price, buy1_info, sell1_info):
        """Feed one quote and update the limit-up / board-open state.

        @param code: stock code
        @param time_str: quote time
        @param limit_up_price: limit-up price
        @param buy1_info: bid-1 (price, volume)
        @param sell1_info: ask-1 (price, volume)
        """
        if abs(buy1_info[0] - limit_up_price) < 0.0001:
            # Bid-1 sits at the limit-up price.
            window = self.__latest_60s_buy1_list_dict.setdefault(code, [])
            window.append((time_str, buy1_info))
            # Drop quotes older than 60 seconds; the quote just appended always stays.
            while len(window) > 1 and tool.trade_time_sub(time_str, window[0][0]) > 60:
                window.pop(0)
            if buy1_info[0] * buy1_info[1] > 1e7:
                # Only a seal above 10 million counts as a limit-up.
                if code not in self.__code_limit_up_info_dict:
                    self.__code_limit_up_info_dict[code] = [time_str, '']
                    self.__set_code_limit_up_info(code, self.__code_limit_up_info_dict[code])
        else:
            self.__latest_60s_buy1_list_dict.pop(code, None)
            # Not at the limit-up price: detect the board being opened.
            if sell1_info and sell1_info[1] > 0:
                # An ask-1 volume has appeared.
                limit_up_info = self.__code_limit_up_info_dict.get(code)
                if limit_up_info is not None and not limit_up_info[1]:
                    # A limit-up time exists and no board-open time is recorded yet.
                    limit_up_info[1] = time_str
                    self.__set_code_limit_up_info(code, limit_up_info)

    def is_opened_limit_up(self, code):
        """Tell whether the board of *code* has been opened after a limit-up."""
        data = self.__code_limit_up_info_dict.get(code)
        return bool(data and data[1])

    def get_opened_time(self, code):
        """Return the board-open time of *code*.

        @return: the recorded time, '' when limited up but never opened,
            None when the code has no limit-up record
        """
        data = self.__code_limit_up_info_dict.get(code)
        if data:
            return data[1]
        return None

    def is_almost_open_limit_up(self, code):
        """Tell whether the board of *code* is about to be opened.

        True when at least two sealed quotes exist in the 60-second window and
        the latest bid-1 volume is below one tenth of the window maximum. For
        codes outside the want-buy list the latest bid-1 money must also be at
        most 5 million.
        """
        buy1_list = self.__latest_60s_buy1_list_dict.get(code)
        if not buy1_list or len(buy1_list) < 2:
            return False
        latest_buy1 = buy1_list[-1][1]
        if not gpcode_manager.WantBuyCodesManager().is_in_cache(code) and latest_buy1[0] * latest_buy1[1] > 500e4:
            # Not a want-buy code: the latest bid-1 must not exceed 5 million.
            return False
        latest_volume = latest_buy1[1]
        max_volume = max(x[1][1] for x in buy1_list)
        # The current volume is less than 1/10 of the maximum volume.
        return max_volume // 10 > latest_volume
class RadicalBuyDataManager:
    """Buy-eligibility checks and follow-up list actions for radical buying."""

    # The follow-up actions taken once big orders are sufficient are switched
    # off for now; set to True to re-enable them.
    __BIG_ORDER_ENOUGH_ACTION_ENABLED = False

    @classmethod
    def is_code_can_buy(cls, code, deal_codes=None, is_refered=False):
        """Decide from the attributes of the code itself whether it may be bought.

        @param code: stock code
        @param deal_codes: codes that have already been dealt
        @param is_refered: whether the code is evaluated as a reference code
        @return: (can buy, reason, whether the code may be blacklisted)
        """
        if WantBuyCodesManager().is_in_cache(code):
            return True, "已加想", False
        zyltgb = global_util.zyltgb_map.get(code)
        k_format = None
        if not constant.TEST:
            k_format = code_nature_analyse.CodeNatureRecordManager().get_k_format_cache(code)
        if k_format:
            if not k_format[13] and zyltgb and zyltgb < 50e8:
                # Below 5 billion and no limit-up in the last 60 trading days: do not buy.
                return False, "近60个交易日无涨停", True
            if k_format[14]:
                # The board was opened yesterday: never buy.
                return False, "昨日炸板", True

        # Use the limit-up price; fall back to the current price.
        price = gpcode_manager.get_limit_up_price_as_num(code)
        if not price:
            price = L1DataManager.get_l1_current_price(code)
        if price:
            if price < constant.MIN_CODE_RADICAL_BUY_PRICE or price > constant.MAX_CODE_RADICAL_BUY_PRICE:
                return False, "价格不满足需求", True
            # Check the free-float value (converted to units of 100 million) against the configured ranges.
            if zyltgb:
                zyltgb_as_yi = round(zyltgb / 100000000, 2)
                zylt_can_buy = any(
                    zy[0][0] <= zyltgb_as_yi <= zy[0][1] and zy[1][0] <= price <= zy[1][1]
                    for zy in constant.RADICAL_BUY_ZYLTGB_AS_YI_RANGES)
                if not zylt_can_buy:
                    return False, "自由流通市值/价格不满足扫的范围", True

        if k_format and len(k_format) > 12 and k_format[12]:
            return False, "上个交易日振幅过大", True
        # Selling pressure. k_format is None in TEST mode or on a cache miss,
        # which used to raise TypeError on len(None).
        if k_format and len(k_format) > 10 and k_format[10][0]:
            if not cls.__is_sell_pressure_released(code, k_format[10][1]):
                return False, "抛压没释放", False

        if gpcode_manager.BlackListCodeManager().is_in_cache(code):
            if deal_codes is None or code not in deal_codes:
                # Blacklisted for a reason other than having been bought.
                return False, "已拉黑", False

        if is_refered:
            # A reference code must have enough big orders.
            volume_rate = None
            if not constant.TEST:
                volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate(code, with_info=False)
            if volume_rate is None:
                volume_rate = 0.5
            big_order_deal_enough = True, "", True
            if not constant.TEST:
                big_order_deal_enough = is_big_order_deal_enough(code, volume_rate, 0)
            if not big_order_deal_enough[0] and big_order_deal_enough[2]:
                return False, "交易前两分钟大单不足", False
        # Net outflow of the curated plates is deliberately not checked for now.
        return True, "", False

    @classmethod
    def __is_sell_pressure_released(cls, code, price_info):
        """Tell whether the selling pressure recorded in *price_info* is released.

        @param code: stock code
        @param price_info: (P high info, T high info); each is (price, volume),
            the T entry may be None
        @return: True when one of the three release rules matches
        """
        limit_up_price = gpcode_manager.get_limit_up_price_as_num(code)
        p_price_info = price_info[0]  # (price, volume)
        t_price_info = price_info[1]  # (price, volume) / None
        if t_price_info:
            # Released: today's limit-up price >= T high and the volume of the
            # T-high day >= 101% of the volume of the P-high day.
            # NOTE(review): the original compared against p_price_info[0] (a
            # price); index 1 (the volume) matches the documented rule and the
            # no-T-high branch below - confirm with the strategy owner.
            if limit_up_price >= t_price_info[0] and t_price_info[1] >= p_price_info[1] * 1.01:
                return True
            # Largely released today: limit-up price >= T high + 2% and today's
            # volume >= 90% of the volume of the T-high day.
            if limit_up_price >= t_price_info[0] * 1.02:
                today_volume = CodeVolumeManager().get_today_volumn_cache(code)
                if today_volume >= t_price_info[1] * 0.9:
                    return True
            return False
        # Strongly released today (no T high): limit-up price >= P high + 6% and
        # today's volume >= 101% of the volume of the P-high day.
        if limit_up_price >= p_price_info[0] * 1.06:
            today_volume = CodeVolumeManager().get_today_volumn_cache(code)
            if today_volume >= p_price_info[1] * 1.01:
                return True
        return False

    @classmethod
    def big_order_deal(cls, code):
        """React to a big order having been dealt for *code*.

        @param code: stock code
        """
        cls.__process_add_white(code)
        if gpcode_manager.MustBuyCodesManager().is_in_cache(code):
            return
        total_deal_big_order_result = get_total_deal_big_order_info(
            code, gpcode_manager.get_limit_up_price_as_num(code))
        if total_deal_big_order_result[0] <= 0:
            cls.big_order_deal_enough(code)

    @classmethod
    def place_order_success(cls, code):
        """React to an order for *code* having been placed successfully.

        @param code: stock code
        """
        # A want-buy code whose accumulated big orders are sufficient becomes must-buy.
        total_deal_big_order_result = get_total_deal_big_order_info(
            code, gpcode_manager.get_limit_up_price_as_num(code))
        if WantBuyCodesManager().is_in_cache(code) and total_deal_big_order_result[0] <= 0:
            gpcode_manager.MustBuyCodesManager().add_code(code)
            trade_record_log_util.add_must_buy(code, "累计成交大单足够")
        cls.__process_add_white(code)

    @classmethod
    def __process_add_white(cls, code):
        """Add *code* to the white list when enough big orders were dealt in this batch.

        Requires automatic white-listing to be enabled, the accumulated big
        orders to be sufficient with a reference amount of at least 100
        million, an order to have been placed, and the big orders dealt from
        ``min_order_no`` on to reach twice the big-order threshold. Errors are
        logged and never propagated.

        @param code: stock code
        """
        if not constant.CAN_AUTO_ADD_WHITE:
            return
        if gpcode_manager.WhiteListCodeManager().is_in_cache(code):
            return
        try:
            total_deal_big_order_result = get_total_deal_big_order_info(
                code, gpcode_manager.get_limit_up_price_as_num(code))
            if not (total_deal_big_order_result[0] <= 0 and total_deal_big_order_result[2] >= 1e8):
                return
            order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(code)
            if not l2_data_manager.TradePointManager.is_placed_order(order_begin_pos):
                # No order has been placed.
                return
            if not order_begin_pos or order_begin_pos.min_order_no is None:
                async_log_util.warning(logger_debug, "处理成交大单足够加白: 最小订单号为空")
                return
            min_order_no = order_begin_pos.min_order_no
            # Money of the big orders dealt from min_order_no on.
            total_buy_data_list = BigOrderDealManager().get_total_buy_data_list(code) or []
            bigger_money = l2_data_util.get_big_money_val(gpcode_manager.get_limit_up_price_as_num(code),
                                                          tool.is_ge_code(code))
            deal_money = sum(
                x[2] for x in total_buy_data_list if x[0] >= min_order_no and x[2] >= bigger_money)
            # Big-order threshold currently in force.
            THRESHOLD_MONEY, is_temp_threshold_money = BeforeSubDealBigOrderManager().get_big_order_threshold_info(
                code)
            if deal_money >= 2 * THRESHOLD_MONEY:
                gpcode_manager.WhiteListCodeManager().add_code(code)
                trade_record_log_util.add_common_msg(code, "加白",
                                                     f"{code}大单成交足够加白, 本批次成交金额-{deal_money}/{THRESHOLD_MONEY * 2} 累计大单金额:{total_deal_big_order_result[1]}/{total_deal_big_order_result[2]}")
            else:
                async_log_util.info(logger_debug,
                                    f"{code}-成交大单少({deal_money}/{2 * THRESHOLD_MONEY},最小订单号-{order_begin_pos.min_order_no}),不能加白")
        except Exception as e:
            logger_debug.exception(e)
            async_log_util.info(logger_debug, f"处理成交大单足够加白的问题:{str(e)}")

    @classmethod
    def market_info_change(cls, code):
        """React to a market-data change of *code*.

        Promotion to must-buy based on the volume ratio is disabled, so apart
        from the must-buy lookup this currently does nothing.

        @param code: stock code
        """
        if gpcode_manager.MustBuyCodesManager().is_in_cache(code):
            return
        # Codes are no longer promoted to must-buy by volume ratio.

    @classmethod
    def big_order_deal_enough(cls, code):
        """React to the dealt big orders of *code* being sufficient.

        Currently a no-op because the action is switched off. When enabled, a
        green-listed code is added to the white list and any other code is
        added to the must-buy list.

        @param code: stock code
        """
        if not cls.__BIG_ORDER_ENOUGH_ACTION_ENABLED:
            return
        if gpcode_manager.GreenListCodeManager().is_in_cache(code):
            # Green-listed: go straight to the white list.
            if gpcode_manager.WhiteListCodeManager().is_in_cache(code):
                return
            gpcode_manager.WhiteListCodeManager().add_code(code)
            trade_record_log_util.add_white_buy(code, "已加绿且大单足够")
        else:
            if gpcode_manager.MustBuyCodesManager().is_in_cache(code):
                return
            gpcode_manager.MustBuyCodesManager().add_code(code)
            trade_record_log_util.add_must_buy(code, "大单足够")
deal_money >= 2 * THRESHOLD_MONEY: gpcode_manager.WhiteListCodeManager().add_code(code) trade_record_log_util.add_common_msg(code, "加白", f"{code}大单成交足够加白, 本批次成交金额-{deal_money}/{THRESHOLD_MONEY * 2} 累计大单金额:{total_deal_big_order_result[1]}/{total_deal_big_order_result[2]}") else: async_log_util.info(logger_debug, f"{code}-成交大单少({deal_money}/{2 * THRESHOLD_MONEY},最小订单号-{order_begin_pos.min_order_no}),不能加白") except Exception as e: logger_debug.exception(e) async_log_util.info(logger_debug, f"处理成交大单足够加白的问题:{str(e)}") @classmethod def market_info_change(cls, code): """ 行情变化:当量达到69%就加红 @param code: @return: """ if gpcode_manager.MustBuyCodesManager().is_in_cache(code): return # 不根据量的比例加红 # volume_rate = code_volumn_manager.CodeVolumeManager().get_volume_rate_refer_in_5days(code) # if volume_rate and volume_rate >= 0.39: # gpcode_manager.MustBuyCodesManager().add_code(code) @classmethod def big_order_deal_enough(cls, code): """ 大单成交足够 @param code: @return: """ if 1 > 0: # 大单足够暂时不做处理 return # 如果加绿了就直接拉白 if gpcode_manager.GreenListCodeManager().is_in_cache(code): if gpcode_manager.WhiteListCodeManager().is_in_cache(code): return gpcode_manager.WhiteListCodeManager().add_code(code) trade_record_log_util.add_white_buy(code, "已加绿且大单足够") else: if gpcode_manager.MustBuyCodesManager().is_in_cache(code): return gpcode_manager.MustBuyCodesManager().add_code(code) trade_record_log_util.add_must_buy(code, "大单足够") class RadicalBuyBlockManager: """ 扫入买板块管理 """ # 上次的涨停代码 __last_limit_up_codes = set() # 记录代码的涨停时间 __limit_up_time_dict = {} # 炸板代码的总涨停时间 __total_limit_up_space_dict = {} # 当前涨停的代码 __current_limit_up_codes = set() @classmethod def set_current_limit_up_datas(cls, current_limit_up_datas): # 查询当前的涨停代码集合 codes = set([d[0] for d in current_limit_up_datas]) cls.__current_limit_up_codes = codes try: # 炸板代码 break_limit_up_codes = cls.__last_limit_up_codes - codes # 新涨停的代码 new_limit_up_codes = codes - cls.__last_limit_up_codes if new_limit_up_codes: for code in new_limit_up_codes: if 
code not in cls.__limit_up_time_dict: cls.__limit_up_time_dict[code] = time.time() if break_limit_up_codes: # 记录总涨停时间 for bc in break_limit_up_codes: if bc in cls.__limit_up_time_dict: space = tool.trade_time_sub(tool.get_now_time_str(), tool.to_time_str(cls.__limit_up_time_dict[bc])) if bc not in cls.__total_limit_up_space_dict: cls.__total_limit_up_space_dict[bc] = 0 cls.__total_limit_up_space_dict[bc] = cls.__total_limit_up_space_dict[bc] + space logger_debug.info(f"炸板代码涨停时间:{bc}-{cls.__total_limit_up_space_dict[bc]}") cls.__limit_up_time_dict.pop(bc) except Exception as e: logger_debug.exception(e) finally: cls.__last_limit_up_codes = codes cls.compute_open_limit_up_code_dict_for_radical_buy(current_limit_up_datas) @classmethod def compute_open_limit_up_code_dict_for_radical_buy(cls, current_limit_up_datas): """ 计算开1的代码信息,不包含5板以上的 @param current_limit_up_datas: @return: """ timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range temp_dict = {} for d in current_limit_up_datas: code = d[0] # d: (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量) # 计算是否开1 if int(d[2]) >= timestamp_end or int(d[2]) < timestamp_start: continue buy1_money = huaxin_l1_data_manager.get_buy1_money(code) # 买1是否大于5000w if not constant.TEST: if not buy1_money or buy1_money < 5e7: continue if not tool.is_can_buy_code(code): continue blocks = {d[5]} if d[6]: blocks |= set(d[6].split("、")) blocks -= constant.KPL_INVALID_BLOCKS # 过滤 blocks = BlockMapManager().filter_blocks(blocks) # 开1剔除4板以上的 high_level = CodeHighLevel().get_high_level(code) if high_level >= 4: continue temp_dict[code] = (high_level, blocks) kpl_data_constant.open_limit_up_code_dict_for_radical_buy = temp_dict @classmethod def __get_current_index(cls, code, block, yesterday_limit_up_codes, exclude_codes=None, limit_up_time=None, ignore_open_limit_up=True): """ 获取当前涨停身位 @param code: @param block: @param yesterday_limit_up_codes: @param exclude_codes: @param limit_up_time: @param 
ignore_open_limit_up: 是否忽略开1的代码 @return: 索引,前排代码信息([(代码, 涨停时间)]) """ if exclude_codes is None: exclude_codes = set() current_index = 0 block_codes_infos = [] timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range if limit_up_time is None: limit_up_time = time.time() for k in LimitUpDataConstant.current_limit_up_datas: _code = k[0] if _code in exclude_codes: continue blocks = LimitUpDataConstant.get_blocks_with_history(_code) if not blocks: blocks = set() if _code == code: # 获取当前代码涨停时间 limit_up_time = int(k[2]) continue # 不是这个板块 if block not in blocks: continue # 剔除4板以上的板 if CodeHighLevel().get_high_level(_code) >= 4 and len(blocks) > 1: continue if not tool.is_can_buy_code(_code): continue # 剔除开1的数据 if ignore_open_limit_up and timestamp_start <= int(k[2]) < timestamp_end: continue # 剔除高位板 if _code in yesterday_limit_up_codes: continue # 代码.涨停时间 block_codes_infos.append((_code, int(k[2]))) block_codes_infos.append((code, limit_up_time)) block_codes_infos.sort(key=lambda x: x[1]) before_codes_info = [] for i in range(0, len(block_codes_infos)): if block_codes_infos[i][0] == code: current_index = i break else: before_codes_info.append(block_codes_infos[i]) return current_index, before_codes_info @classmethod def get_history_index(cls, code, block, yesterday_limit_up_codes, exclude_codes=None, ignore_open_limit_up=True): datas = cls.__get_history_index(code, block, yesterday_limit_up_codes, exclude_codes, ignore_open_limit_up) return datas[0], datas[1] @classmethod def filter_before_codes(cls, code, block, history_index, history_before_codes_info, yesterday_limit_up_codes): return cls.__filter_before_codes(code, block, history_index, history_before_codes_info, yesterday_limit_up_codes) @classmethod def __get_history_index(cls, code, block, yesterday_limit_up_codes, exclude_codes=None, ignore_open_limit_up=True): """ 获取历史涨停身位 @param code: @param block: @param yesterday_limit_up_codes: @param exclude_codes: @param ignore_open_limit_up: 是否忽略开1代码 @return: 
""" if exclude_codes is None: exclude_codes = set() history_index = 0 block_codes_infos = [] # 开1时间范围 timestamp_start, timestamp_end = kpl_block_util.open_limit_up_time_range limit_up_time = time.time() limit_up_space_ge_60s_codes = set() for k in LimitUpDataConstant.history_limit_up_datas: _code = k[3] if _code in exclude_codes: continue if _code == code: # 获取当前代码涨停时间 limit_up_time = int(k[5]) continue blocks = LimitUpDataConstant.get_blocks_with_history(_code) # 不是这个板块 if block not in blocks: continue # 剔除4板以上且板块数量大于1个 if CodeHighLevel().get_high_level(_code) >= 4 and len(blocks) > 1: continue if not tool.is_can_buy_code(_code): continue # 剔除开1的数据 if ignore_open_limit_up and timestamp_start <= int(k[5]) < timestamp_end: continue # 剔除高位板 if _code in yesterday_limit_up_codes: continue # 剔除炸板代码持续涨停时间小于1分钟的代码 且 只能用于不排除前2条数据 if _code not in cls.__current_limit_up_codes and _code in cls.__total_limit_up_space_dict and \ cls.__total_limit_up_space_dict[_code] < 60 and not exclude_codes and len( limit_up_space_ge_60s_codes) < 3: limit_up_space_ge_60s_codes.add(_code) continue # 代码,涨停时间 block_codes_infos.append((_code, int(k[5]))) block_codes_infos.append((code, limit_up_time)) block_codes_infos.sort(key=lambda x: x[1]) before_codes_info = [] for i in range(0, len(block_codes_infos)): if block_codes_infos[i][0] == code: history_index = i break else: before_codes_info.append(block_codes_infos[i]) return history_index, before_codes_info, limit_up_space_ge_60s_codes @classmethod def __is_radical_buy_with_open_limitup(cls, code, block, yesterday_limit_up_codes): """ 是否需要激进买(某个板块开1) 1.有>=2个开1买老2 2.有1个开1的买老3 @param code: @param block: @param yesterday_limit_up_codes 昨日涨停代码 @return: """ if not kpl_data_constant.is_new_block(block): return False, "非新题材" # 9:45点之前涨停的才能买入 # 获取当前代码的涨停时间 limit_up_timestamp = cls.__get_limit_up_timestamp(code) if tool.get_now_time_as_int() > 100000: return False, "超过生效时间" # 开始买的身位 2:从老三开始买 1: 从老二开始买 START_BUY_RANK = 2 # 根据板块聚合数据 
open_limit_up_block_codes_dict = {} for c in kpl_data_constant.open_limit_up_code_dict_for_radical_buy: blocks = kpl_data_constant.open_limit_up_code_dict_for_radical_buy[c][1] for b in blocks: if b not in open_limit_up_block_codes_dict: open_limit_up_block_codes_dict[b] = set() open_limit_up_block_codes_dict[b].add(c) if block not in open_limit_up_block_codes_dict: return False, "板块未开1" open_limit_up_block_codes = list(open_limit_up_block_codes_dict.get(block)) count = len(open_limit_up_block_codes) # ----获取历史身位---- history_index, history_before_codes_info, limit_up_space_ge_60s_codes = cls.__get_history_index(code, block, yesterday_limit_up_codes) # ----获取实时身位---- current_index, current_before_codes_info = cls.__get_current_index(code, block, yesterday_limit_up_codes, limit_up_time=limit_up_timestamp) exclude_codes = set() # 是否为强势二板开1:该代码所有的慨念必须都为二板开1且开1的只能有他自己 is_strong_2_level = count == 1 and \ kpl_data_constant.open_limit_up_code_dict_for_radical_buy[open_limit_up_block_codes[0]][ 0] == 2 if is_strong_2_level: _code = open_limit_up_block_codes[0] blocks = LimitUpDataConstant.get_blocks_with_history(_code) if blocks: # 所有的慨念的开1代码必须只有他自己 for b in blocks: temp_codes = open_limit_up_block_codes_dict.get(b) if temp_codes and len(temp_codes) > 1: is_strong_2_level = False break if count >= 2 or is_strong_2_level: # 开始数量大于2个或者只有一个2板开1 exclude_codes.clear() else: # 只有一个开1 # 获取包含高位板的身位 # ----获取历史身位---- history_index, history_before_codes_info, limit_up_space_ge_60s_codes = cls.__get_history_index(code, block, set()) # ----获取实时身位---- current_index, current_before_codes_info = cls.__get_current_index(code, block, set(), limit_up_time=limit_up_timestamp) if history_before_codes_info and current_before_codes_info and history_before_codes_info[0][0] == \ current_before_codes_info[0][0]: # 排除第一个非开1数据 exclude_codes = {history_before_codes_info[0][0]} else: return False, f"开1数量:{count},历史-{history_index + 1} 实时-{current_index + 1}" # 获取主板的身位(已经排除了开1的代码) history_index, 
history_before_codes_info, limit_up_space_ge_60s_codes = cls.__get_history_index(code, block, yesterday_limit_up_codes, exclude_codes=exclude_codes) # 过滤不正的原因 history_index, history_before_codes_info = cls.__filter_before_codes(code, block, history_index, history_before_codes_info, yesterday_limit_up_codes) # 买首板老大/老二 # 首板老大不能买时可买老二 if history_index > 1: return False, f"开1数量:{count},非开1首板身位不匹配:历史-{history_index + 1} 实时-{current_index + 1}" if history_index == 1: # 当前代码为老2,要判断老大是否可买 pre_code = history_before_codes_info[0][0] deal_codes = RadicalBuyDealCodesManager().get_deal_codes() if pre_code in deal_codes: return False, f"开1数量:{count},前排代码已成交:{history_before_codes_info[0]}" # if RadicalBuyDataManager.is_code_can_buy(pre_code, # DealAndDelegateWithBuyModeDataManager().get_deal_codes(), # is_refered=True)[0]: # deal_codes = RadicalBuyDealCodesManager().get_deal_codes() # if pre_code in deal_codes: # return False, f"开1数量:{count},前排代码已成交:{history_before_codes_info[0]}" # else: # if not cls.__is_not_buy_success(pre_code): # return False, f"开1数量:{count},前排代码未下过单:{pre_code}" # return True, f"开1数量:{count},前排代码尚未成交:{history_before_codes_info[0]}" # return True, f"开1数量:{count},前排代码不可买:{history_before_codes_info[0]},历史前排-{history_before_codes_info},开1代码-{open_limit_up_block_codes}" # 曾涨停有后排才能买 return True, f"开1数量:{count},历史-{history_index + 1} 实时-{current_index + 1}, 前排代码-{current_before_codes_info},开1代码-{open_limit_up_block_codes}" @classmethod def __filter_before_codes(cls, code, block, index, before_codes_info, yesterday_codes): """ 过滤前排涨停原因不正/不算身位的代码 @param code: @param block:板块 @param index: 目标代码位置 @param before_codes_info: [(代码, 涨停时间戳)] @return: 过滤之后的 (index, before_codes_info) """ try: if index == 0 or not before_codes_info: return index, before_codes_info temp_index = index temp_before_codes_info = [] # 已经下单过的代码 for b in before_codes_info: # 当作目标票获取扫入板块 code_ = b[0] need_delete = False # 判断是否是不计算身位 if not need_delete: if 
ExcludeIndexComputeCodesManager.is_in_cache(code_): need_delete = True if need_delete: temp_index -= 1 else: temp_before_codes_info.append(b) if temp_index != index: async_log_util.info(logger_l2_radical_buy_data, f"【板块前排代码过滤】#{code}#{block}#{(index, before_codes_info)}#{(temp_index, temp_before_codes_info)}") return temp_index, temp_before_codes_info except Exception as e: logging.exception(e) async_log_util.error(logger_debug, f"扫入板块过滤出错:{str(e)}") return index, before_codes_info @classmethod def __is_not_buy_success(cls, code_): """ 是否下单了还没买入成功 @param code: @return: """ deal_codes = RadicalBuyDealCodesManager().get_deal_codes() if code_ in BlockPlaceOrderRecordManager().get_codes() and code_ not in deal_codes: return True return False @classmethod def __is_have_back_codes(cls, code, block): """ 板块是否有后排代码 @param code: @param block: @return: """ codes = LimitUpDataConstant.get_history_limit_up_block_codes(block) has_back = False limit_up_timestamp = cls.__get_limit_up_timestamp(code) if codes: for c in codes: if c == code: continue if cls.__get_limit_up_timestamp(c) > limit_up_timestamp: has_back = True break return has_back @classmethod def __is_radical_buy_with_block_up(cls, code, block, yesterday_limit_up_codes): """ 是否激进买(板块突然涨起来) 1.老二和老三的涨停时间相差5分钟内 2.老三不能买顺位到老四(老四与老三相差10分钟内) 3.前2个票不能炸板(历史身位与现在身位一致) 4.除开前两个代码可买老1与老2 5.买老2的情况:老1不满足买入条件 @param code: @param block: @param yesterday_limit_up_codes: @return: """ if not kpl_data_constant.is_new_block(block): return False, "非新题材" # 开始买的身位 2:从老三开始买 1: 从老二开始买 START_BUY_RANK = 1 # 获取当前代码的涨停时间 limit_up_timestamp = cls.__get_limit_up_timestamp(code) # 获取当前的板块, 不要忽略开1的数据 current_index, current_before_codes_info = cls.__get_current_index(code, block, set(), limit_up_time=limit_up_timestamp, ignore_open_limit_up=False) current_before_codes = [x[0] for x in current_before_codes_info] if len(current_before_codes_info) < START_BUY_RANK: return False, f"前排代码小于{START_BUY_RANK}个:{current_before_codes_info}" # 当前代码开1不能买 if 
limit_up_timestamp < kpl_block_util.open_limit_up_time_range[1]: return False, f"当前代码开1" # 扩大到5小时 THRESHOLD_MINUTES = 60 * 5 if tool.trade_time_sub(tool.timestamp_format(limit_up_timestamp, '%H:%M:%S'), tool.timestamp_format(current_before_codes_info[-1][1], '%H:%M:%S')) >= THRESHOLD_MINUTES * 60: return False, f"距离上个代码涨停已过去{THRESHOLD_MINUTES}分钟({current_before_codes_info[-1]})" # 包含高位板的整体排序 all_history_index, all_history_before_codes_info, limit_up_space_ge_60s_codes = cls.__get_history_index(code, block, set(), ignore_open_limit_up=False) history_index_first, history_before_codes_info_first, limit_up_space_ge_60s_codes = cls.__get_history_index( code, block, yesterday_limit_up_codes, ignore_open_limit_up=False) all_first_history_before_codes = [x[0] for x in history_before_codes_info_first] if not constant.TEST: # 前两个代码是否有炸板 # dif_codes = set(all_history_before_codes[:2]) - set(current_before_codes[:2]) # if dif_codes: # return False, f"前2代码有炸板:{dif_codes}, 前排代码:{all_history_before_codes}" # 前排代码炸板不能>=2个 # 前排首板代码炸板数量在2个及以上不能买 dif_codes = set(all_first_history_before_codes) - set(current_before_codes) if len(dif_codes) >= 2: return False, f"板块前排首板有{len(dif_codes)}个炸板" # 剔除所有开1的票 exclude_codes = set() for x in all_history_before_codes_info: if x[1] < kpl_block_util.open_limit_up_time_range[1]: exclude_codes.add(x[0]) # 除去前二代码与开1代码之后是否为首板老大:所有开1的视为1个 open_count = len(exclude_codes) if open_count > 0 and open_count + 1 <= len(current_before_codes): # 前排有开1且除开开1之后是首板老大 exclude_codes |= set(all_first_history_before_codes[open_count:open_count + START_BUY_RANK]) else: # 排除首板老大 exclude_codes |= set(all_first_history_before_codes[:START_BUY_RANK]) # 开1排除 open_limit_up_code_dict = kpl_data_constant.open_limit_up_code_dict_for_radical_buy if open_limit_up_code_dict: exclude_codes |= set(open_limit_up_code_dict.keys()) origin_history_index, origin_history_before_codes_info, limit_up_space_ge_60s_codes = cls.__get_history_index( code, block, yesterday_limit_up_codes, 
@classmethod
def __is_first_can_buy(cls, code, block):
    """
    Decide whether the target code may be bought as the leader of a new block.

    All of the following must hold:
    1. the block is a new block (kpl_data_constant.is_new_block);
    2. the code's current index equals its history index;
    3. that index is 0;
    4. at least two codes from today's history limit-up data are both in the
       limit_up_space_ge_60s_codes collection returned by the history-index
       lookup and recorded under this block.
    @param code: target code
    @param block: block name
    @return: (can buy, reason text)
    """
    if not kpl_data_constant.is_new_block(block):
        return False, "非新题材"
    current_index, _ = cls.__get_current_index(code, block, set(),
                                               limit_up_time=cls.__get_limit_up_timestamp(code))
    history_index, _, limit_up_space_ge_60s_codes = cls.__get_history_index(code, block, set())
    if current_index != history_index:
        return False, "有其他炸板"
    if current_index >= 1:
        return False, "不是板块老大"
    # k[3] is the code field of a history limit-up record (same index the original used).
    history_codes = {
        k[3]
        for k in LimitUpDataConstant.history_limit_up_datas
        if k[3] in limit_up_space_ge_60s_codes
        and block in (LimitUpCodesBlockRecordManager().get_radical_buy_blocks(k[3]) or set())
    }
    if len(history_codes) < 2:
        return False, f"板块历史涨停小于2个:{history_codes}"
    return True, f"历史涨停:{history_codes}"
@classmethod
def __today_block_special_codes(cls, code, block, yesterday_limit_up_codes):
    """
    Check whether the code is a "today's standout" of the block.

    The checks, in order:
    1. the code's value in global_util.zyltgb_map is at least 50e8;
    2. the block is among the first 5 entries of the top net-inflow block list;
    3. the block has current limit-up codes and, together with the target
       code, at least 2 distinct codes;
    4. fewer than 2 of the codes ranked ahead of the target (history index,
       open-limit-up codes not ignored) have a zyltgb value of at least 50e8.
    @param code: target code
    @param block: block name
    @param yesterday_limit_up_codes: forwarded to the history-index lookup
    @return: (matched, reason text)
    """
    own_zyltgb = global_util.zyltgb_map.get(code)
    if not own_zyltgb or own_zyltgb < 50e8:
        return False, "自由流通小于50亿"

    top_in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks()
    if not top_in_blocks or block not in top_in_blocks[:5]:
        return False, "尚未在净流入前5"

    limit_up_codes = LimitUpDataConstant.get_current_limit_up_block_codes(block)
    if not limit_up_codes:
        return False, "当前板块尚未涨停"
    # The target code itself counts towards the block's limit-up total.
    if len(set(limit_up_codes) | {code}) < 2:
        return False, "板块涨停个数小于2个"

    history_index, history_before_codes_info, _ = cls.__get_history_index(code, block,
                                                                          yesterday_limit_up_codes,
                                                                          ignore_open_limit_up=False)
    if history_before_codes_info:
        front_codes = [info[0] for info in history_before_codes_info]
        big_front_count = sum(1 for c in front_codes if (global_util.zyltgb_map.get(c) or 0) >= 50e8)
        if big_front_count >= 2:
            return False, "前排有2个大市值票"
    return True, f"处于首板老{history_index + 1}"
before_fblocks = set() fblocks = BlockMapManager().filter_blocks(before_fblocks) if fblocks: fblocks -= constant.KPL_INVALID_BLOCKS return fblocks, before_fblocks @classmethod def is_block_can_buy_with_block_in(cls, code, block, yesterday_limit_up_codes, is_for_buy=False): """ 根据板块流入判断板块是否可买 @param code: @param block: @param is_for_buy: 是否是买入 @return:板块是否可买, 是否需要大单够 """ if RealTimeKplMarketData.is_ignore_block_in_money(): return True, False jx_in_blocks = RealTimeKplMarketData.get_top_market_jingxuan_blocks() if jx_in_blocks and block in jx_in_blocks: return True, False if kpl_data_constant.is_new_block(block): # 今日新板块不考虑净流入 return True, False # 10点之前不考虑净流入 if tool.get_now_time_as_int() <= 100000: return True, False # 辨识度不在流入的票打回封(回封已经在订阅的时候处理了) if is_for_buy: # 有辨识度且不在流出 special_codes = BlockSpecialCodesManager().get_block_codes(block) if special_codes and code in special_codes: out_blocks = RealTimeKplMarketData.get_top_market_jingxuan_out_blocks() if not out_blocks or block not in out_blocks: return True, False # 获取当前板块涨停数量 codes = LimitUpDataConstant.get_current_limit_up_block_codes(block) if codes and len(codes) >= 5: return True, False else: codes = LimitUpDataConstant.get_history_limit_up_block_codes(block) if codes and len(codes) >= 3: # 10点之后净流入是正才能订阅 in_money = RealTimeKplMarketData.get_jx_block_in_money(block) if tool.get_now_time_as_int() > 100000 and (not in_money or in_money < 0): # 10点以后且净流入不为正 return False, False # 净流入不为负 # 获取首板的身位 info = RadicalBuyBlockManager().get_history_index(code, block, yesterday_limit_up_codes, ignore_open_limit_up=False) if info[0] < 2: # 只能买板块前2 if is_for_buy: # 买入 if tool.get_now_time_as_int() < 100000: return True, True else: # 订阅 return True, False return False, False @classmethod def is_radical_buy(cls, code, yesterday_limit_up_codes): """ 是否是激进买 @param code: @return: {激进买的板块}, 原因 """ # =========计算代码买入的目标板块=========== # 获取开1的板块 open_limit_up_code_dict = kpl_data_constant.open_limit_up_code_dict_for_radical_buy 
open_limit_up_blocks = set() if open_limit_up_code_dict: for c in open_limit_up_code_dict: open_limit_up_blocks |= open_limit_up_code_dict[c][1] keys_, info = cls.get_code_blocks(code) if WantBuyCodesManager().is_in_cache(code): return set(keys_) if keys_ else {"无板块"}, "已加想" # 获取代码的板块 if not keys_: return set(), "没获取到板块" # 获取精选净流入 keys_in_jx = set() msgs = [] for k in keys_: block_info = cls.is_block_can_buy_with_block_in(code, k, yesterday_limit_up_codes, is_for_buy=True) if block_info[0]: # if block_info[1] and False: # # 暂时不需要: 要求大单够了才买 # big_deal_info = get_total_deal_big_order_info(code, gpcode_manager.get_limit_up_price_as_num(code), is_for_buy=True) # if big_deal_info[0] <= 0: # keys_in_jx.add(k) # else: # msgs.append(f"【{k}】大单不够") # else: keys_in_jx.add(k) if not keys_in_jx: msgs.append(f"【{keys_}】板块未在精选流入中:") # 板块尚未精选流入,且板块涨停代码个数是否<5个 return set(), ",".join(msgs) keys_ = keys_in_jx today_history_limit_up_codes = set([d[3] for d in LimitUpDataConstant.history_limit_up_datas]) # 初次上板买入方式 fmsges = [] msges = [] can_buy_blocks = set() if not is_first_limit_up_buy(code): # 回封买入方式 match_blocks = open_limit_up_blocks & keys_ for b in match_blocks: # 判断板块是否该激进买 result = cls.__is_radical_buy_with_open_limitup(code, b, yesterday_limit_up_codes) if result[0]: can_buy_blocks.add(b) msges.append(f"【{b}】:{result[1]}") fmsges.append("开1判断##" + ",".join(msges)) if not can_buy_blocks: msges.clear() for b in keys_: # 板块快速启动 result = cls.__is_radical_buy_with_block_up(code, b, yesterday_limit_up_codes) if result[0]: can_buy_blocks.add(b) msges.append(f"【{b}】:{result[1]}") fmsges.append("板块快速启动判断##" + ",".join(msges)) if not can_buy_blocks: msges.clear() for b in keys_: result = cls.__is_first_can_buy(code, b) if result[0]: can_buy_blocks.add(b) msges.append(f"【{b}】:{result[1]}") fmsges.append("板块回封判断##" + ",".join(msges)) if not can_buy_blocks: msges.clear() for b in keys_: result = cls.__block_special_codes(code, b, yesterday_limit_up_codes) if result[0]: 
class ExcludeIndexComputeCodesManager:
    """
    In-memory registry of codes that are always left out of rank computation.

    The registry is a single class-level set shared by every caller; nothing
    is persisted.
    """
    # Codes currently excluded from rank computation.
    __excluded_codes = set()

    @classmethod
    def add_code(cls, code):
        """
        Register a code; adding a code twice has no further effect.
        @param code: code to exclude
        """
        cls.__excluded_codes.add(code)

    @classmethod
    def remove_code(cls, code):
        """
        Unregister a code; unknown codes are ignored.
        @param code: code to stop excluding
        """
        cls.__excluded_codes.discard(code)

    @classmethod
    def get_all_codes(cls):
        """
        @return: the live set of excluded codes (not a copy)
        """
        return cls.__excluded_codes

    @classmethod
    def is_in_cache(cls, code):
        """
        @param code: code to look up
        @return: True when the code is currently excluded
        """
        return code in cls.__excluded_codes
def get_total_deal_big_order_info(code, limit_up_price, is_for_buy=False):
    """
    Work out how much accumulated big-order money is still missing for a code.

    The required amount is the manually configured value when one exists,
    otherwise the value computed from the per-order threshold. The dealt
    amount is big buys minus big sells (both including the figures recorded
    before subscription), plus the order currently being dealt when that
    order alone reaches the big-order size.
    @param code: target code
    @param limit_up_price: limit-up price of the code
    @param is_for_buy: not used by this implementation
    @return: (missing money, net dealt big-order money, required money,
              computed required money, manually configured money or None)
    """
    per_order_threshold, _ = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code)
    computed_required_money = compute_total_deal_big_order_threshold_money(code, limit_up_price,
                                                                           per_order_threshold)
    human_setting_money = TotalDealBigOrderThresholdMoneyManager().get_money_cache(code)
    required_money = computed_required_money if human_setting_money is None else int(human_setting_money)

    # (big buys, big buys before subscription, big sells, big sells before subscription)
    buy_money, before_buy_money, sell_money, before_sell_money = get_total_detal_big_order_details(code)
    net_deal_money = buy_money + before_buy_money - sell_money - before_sell_money
    try:
        # An order still being dealt counts as soon as it is big enough on its own.
        dealing_order_info = HuaXinBuyOrderManager().get_dealing_order_info(code)
        big_money_floor = l2_data_util.get_big_money_val(limit_up_price, tool.is_ge_code(code))
        if dealing_order_info and dealing_order_info[2] >= big_money_floor:
            net_deal_money += dealing_order_info[2]
    except Exception as e:
        async_log_util.info(logger_l2_radical_buy, f"计算正在成交大单出错:{str(e)}")

    lack_money = max(0, int(required_money - net_deal_money))
    return lack_money, net_deal_money, required_money, computed_required_money, human_setting_money
before_time = tool.get_now_time_as_int() < 93100 # 计算大单参考数量 current_threshold_count = 1 # int(round(0.4 * money_y)) if refer_total_sell_money >= 1e7: current_threshold_count = 2 else: current_threshold_count = 1 if before_time: current_threshold_count = int(round(0.4 * money_y * 1.5)) # 是不是板上放量 if is_almost_open_limit_up: # 自由流通市值50亿以上的不处理 zyltgb = global_util.zyltgb_map.get( code) if zyltgb and zyltgb > 50e8: # 需要4倍均大单 current_threshold_count = 4 is_first = is_first_limit_up_buy(code) if not is_first: # 如果是回封下单阈值就采用均大单 THRESHOLD_MONEY, is_temp_threshold_money = BeforeSubDealBigOrderManager().get_big_order_threshold_info(code) if THRESHOLD_MONEY < 600e4 and current_threshold_count < 3: # 均大单金额≤600万,则就需要加倍大单数 current_threshold_count *= 2 current_threshold_money = current_threshold_count * THRESHOLD_MONEY else: THRESHOLD_MONEY = 2990000 if is_almost_open_limit_up: THRESHOLD_MONEY, is_default = BeforeSubDealBigOrderManager().get_big_order_threshold(code) if is_default: # 如果回封均大单是默认值就取首封均大单 temp_info = BeforeSubDealBigOrderManager().get_temp_deal_big_order_threshold_info(code) if temp_info: THRESHOLD_MONEY = temp_info[0] if THRESHOLD_MONEY < 600e4 and current_threshold_count < 3: # 均大单金额≤600万,则就需要加倍大单数 current_threshold_count *= 2 current_threshold_money = current_threshold_count * THRESHOLD_MONEY # ==========判断总大单成交================ total_lack_money_info = get_total_deal_big_order_info(code, limit_up_price, is_for_buy=for_buy) total_lack_money, total_deal_money, total_threshold_money = total_lack_money_info[0], total_lack_money_info[1], \ total_lack_money_info[2] if gpcode_manager.GreenListCodeManager().is_in_cache(code): # 加绿之后不考虑总大单 total_lack_money = 0 # ===========判断单次大单成交============== current_big_order_deal_money = 0 current_big_order_deal_money_info = EveryLimitupBigDealOrderManager.get_big_buy_deal_order_money_info(code) if current_big_order_deal_money_info: if tool.trade_time_sub(tool.get_now_time_str(), current_big_order_deal_money_info[1]) > 60: # 60s以上的大单不看 
current_big_order_deal_money = 0 else: current_big_order_deal_money = current_big_order_deal_money_info[0] current_lack_money = max(0, int(current_threshold_money - current_big_order_deal_money)) if for_buy and not tool.is_ge_code(code): # 要下单的且不是创业版的目标代码大单数量打8折 if is_first_limit_up_buy(code): # 首封不打折 total_lack_money = int(total_threshold_money * 0.8 - total_deal_money) else: total_lack_money = int(total_threshold_money - total_deal_money) if total_lack_money < 0: total_lack_money = 0 if current_lack_money == 0 and total_lack_money == 0: return True, f"量比-{volume_rate}, 瞬时大单成交-({current_big_order_deal_money}/{current_threshold_money}),总大单成交-({total_lack_money_info[1]}/{total_lack_money_info[2]})", before_time, 0, total_lack_money <= 0, current_lack_money, total_lack_money return False, f"量比-{volume_rate}, 瞬时大单成交-({current_big_order_deal_money}/{current_threshold_money}),总大单成交-({total_lack_money_info[1]}/{total_lack_money_info[2]})", before_time, max( current_lack_money, total_lack_money), total_lack_money <= 0, current_lack_money, total_lack_money class EveryLimitupBigDealOrderManager: """ 每次上板的成交大单管理 """ # 成交大单的订单号:{"code":{"order_no",...}} __deal_big_order_infos_dict = {} # 成交大单号 __deal_big_order_no_dict = {} @classmethod def open_limit_up(cls, code, msg=""): if code in cls.__deal_big_order_infos_dict: cls.__deal_big_order_infos_dict[code].clear() l2_log.info(code, logger_l2_radical_buy_data, f"清除每次涨停大单数据({msg}):{code}") @classmethod def clear(cls, code, msg=""): cls.open_limit_up(code, msg) @classmethod def add_big_buy_order_deal(cls, code, order_infos: list): """ 加入大单成交 @param code: @param order_infos:[(订单号,金额, 最后成交时间)] @return: """ if code not in cls.__deal_big_order_infos_dict: cls.__deal_big_order_infos_dict[code] = [] if code not in cls.__deal_big_order_no_dict: cls.__deal_big_order_no_dict[code] = set() for order_info in order_infos: if order_info[0] not in cls.__deal_big_order_no_dict[code]: cls.__deal_big_order_infos_dict[code].append(order_info) 
class EveryLimitupBigDelegateOrderManager:
    """
    Keeps the big buy orders delegated during each limit-up, per code.
    """
    # {code: [order_info, ...]} in the order the orders were added
    __delegate_big_order_infos_dict = {}
    # {code: {order_no, ...}} used to skip orders that were already recorded
    __delegate_big_order_no_dict = {}

    @classmethod
    def clear(cls, code, msg=""):
        """
        Forget every recorded order of a code.
        @param code: target code
        @param msg: not used by this implementation
        """
        cls.__delegate_big_order_infos_dict.pop(code, None)
        cls.__delegate_big_order_no_dict.pop(code, None)

    @classmethod
    def add_big_buy_order_delegate(cls, code, order_infos: list):
        """
        Record delegated big buy orders, ignoring order numbers already seen.
        @param code: target code
        @param order_infos: list of tuples; element 0 is the order number and
                            element 1 the money
        """
        recorded_infos = cls.__delegate_big_order_infos_dict.setdefault(code, [])
        recorded_nos = cls.__delegate_big_order_no_dict.setdefault(code, set())
        for order_info in order_infos:
            order_no = order_info[0]
            if order_no in recorded_nos:
                continue
            recorded_infos.append(order_info)
            recorded_nos.add(order_no)
            async_log_util.info(logger_l2_radical_buy_data,
                                f"添加每次上板的大单委托:{code}-{order_info}")

    @classmethod
    def get_big_buy_delegate_order_count(cls, code):
        """
        @param code: target code
        @return: number of recorded orders, 0 when nothing is recorded
        """
        return len(cls.__delegate_big_order_infos_dict.get(code, ()))

    @classmethod
    def get_big_buy_delegate_order_money(cls, code):
        """
        @param code: target code
        @return: sum of the money field of the recorded orders, 0 when nothing is recorded
        """
        return sum(info[1] for info in cls.__delegate_big_order_infos_dict.get(code, ()))

    @classmethod
    def get_big_buy_delegate_order_money_info(cls, code):
        """
        @param code: target code
        @return: the recorded order list of the code, or None when the code is unknown
        """
        return cls.__delegate_big_order_infos_dict.get(code)
def get_deal_codes_by_block(block, deal_codes):
    """
    Return the dealt codes whose deal reasons include the given block.

    @param block: block name to look for
    @param deal_codes: iterable of dealt codes
    @return: set of the codes from deal_codes whose reasons contain block;
             empty set when none match
    """
    # Collect the matching code itself. Adding `block` here (as the code used
    # to do) could only ever yield set() or {block}, never the dealt codes.
    return {dc for dc in deal_codes if block in __get_deal_reasons(dc)}
def list_l2_big_order_deal_info(codes):
    """
    Fetch the dealt big-order figures of several codes from the local service.

    @param codes: iterable of codes; sent as a URL-quoted JSON list
    @return: {"code": (net inflow money, (big buy money, big buy count),
              (big sell money, big sell count))} when the response's "code"
              field is 0, otherwise None
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` is loaded.
    from urllib.parse import quote

    response = requests.get(
        "http://127.0.0.1:9005/get_codes_money_info?codes=" + quote(json.dumps(list(codes))))
    result = json.loads(response.text)
    if result["code"] == 0:
        return result["data"]
    return None
KPLPlateForbiddenManager().is_in_cache(block): # 板块没有被禁止 return True, "" if is_first_limit_up: # 禁止买入的板块只能买汇丰 return False, "禁止买入的板块-只能买回封" special_codes = BlockSpecialCodesManager().get_block_codes(block) if not special_codes: # 无辨识度 return False, "禁止买入的板块-不具辨识度" if code not in special_codes: # 当前票无辨识度 return False, "禁止买入的板块-不具辨识度" # 买辨识度老大+流入前5的辨识度老2 data = RadicalBuyBlockManager().get_history_index(code, block, yesterday_limit_up_codes) if data[1]: before_codes = set([x[0] for x in data[1]]) before_codes &= set(special_codes) if len(before_codes) > 1: # 辨识度老三及以后不能买 return False, "禁止买入的板块-不能买辨识度老三" if len(before_codes) == 1: # 辨识度老2需要在流入前5 in_blocks: list = RealTimeKplMarketData.get_top_market_jingxuan_blocks() if not in_blocks or block not in in_blocks or in_blocks.index(block) >= 5: return False, "禁止买入的板块辨-识度老2需要在流入前5" return True, "" if __name__ == '__main__': pass