| | |
| | | from code_attribute import gpcode_manager |
| | | |
| | | # 代码股性记录管理 |
| | | from db import redis_manager_delegate as redis_manager |
| | | from utils import tool |
| | | from db.redis_manager_delegate import RedisManager, RedisUtils |
| | | from db.redis_manager_delegate import RedisUtils |
| | | from utils.tool import CodeDataCacheUtil |
| | | |
| | | |
| | | class CodeNatureRecordManager: |
| | | __redisManager = RedisManager(0) |
| | | __k_format_cache = {} |
| | | __nature_cache = {} |
| | | |
| | | __db = 0 |
| | | __instance = None |
| | | __redis_manager = redis_manager.RedisManager(0) |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | | if not cls.__instance: |
| | | cls.__instance = super(CodeNatureRecordManager, cls).__new__(cls, *args, **kwargs) |
| | | cls.__load_datas() |
| | | return cls.__instance |
| | | |
| | | @classmethod |
| | | def __get_redis(cls): |
| | | return cls.__redisManager.getRedis() |
| | | return cls.__redis_manager.getRedis() |
| | | |
| | | @classmethod |
| | | def __load_datas(cls): |
| | | __redis = cls.__get_redis() |
| | | try: |
| | | keys = RedisUtils.keys(__redis, "k_format-*") |
| | | for k in keys: |
| | | code = k.split("-")[1] |
| | | val = RedisUtils.get(__redis, k) |
| | | val = json.loads(val) |
| | | cls.__k_format_cache[code] = val |
| | | keys = RedisUtils.keys(__redis, "code_nature-*") |
| | | for k in keys: |
| | | code = k.split("-")[1] |
| | | val = RedisUtils.get(__redis, k) |
| | | val = json.loads(val) |
| | | cls.__nature_cache[code] = val |
| | | except: |
| | | pass |
| | | finally: |
| | | RedisUtils.realse(__redis) |
| | | |
| | | # 保存K线形态 |
| | | @classmethod |
| | | def save_k_format(cls, code, k_format): |
| | | RedisUtils.setex(cls.__get_redis(), f"k_format-{code}", tool.get_expire(), json.dumps(k_format)) |
| | | def save_k_format(self, code, k_format): |
| | | self.__k_format_cache[code] = k_format |
| | | RedisUtils.setex_async(self.__db, f"k_format-{code}", tool.get_expire(), json.dumps(k_format)) |
| | | |
| | | @classmethod |
| | | def get_k_format(cls, code): |
| | | val = RedisUtils.get(cls.__get_redis(), f"k_format-{code}") |
| | | def get_k_format(self, code): |
| | | val = RedisUtils.get(self.__get_redis(), f"k_format-{code}") |
| | | if val: |
| | | return json.loads(val) |
| | | return None |
| | | |
| | | @classmethod |
| | | def get_k_format_cache(cls, code): |
| | | val = None |
| | | if code in cls.__k_format_cache: |
| | | val = cls.__k_format_cache[code] |
| | | if not val: |
| | | val = cls.get_k_format(code) |
| | | if val: |
| | | cls.__k_format_cache[code] = val |
| | | def get_k_format_cache(self, code): |
| | | val = self.__k_format_cache.get(code) |
| | | # 复制 |
| | | return copy.deepcopy(val) if val else None |
| | | |
| | | # 保存股性 |
| | | @classmethod |
| | | def save_nature(cls, code, natures): |
| | | RedisUtils.setex(cls.__get_redis(), f"code_nature-{code}", tool.get_expire(), json.dumps(natures)) |
| | | def clear(self): |
| | | self.__k_format_cache.clear() |
| | | self.__nature_cache.clear() |
| | | keys = RedisUtils.keys(self.__get_redis(), "k_format-*") |
| | | for k in keys: |
| | | RedisUtils.delete(self.__get_redis(), k) |
| | | keys = RedisUtils.keys(self.__get_redis(), "code_nature-*") |
| | | for k in keys: |
| | | RedisUtils.delete(self.__get_redis(), k) |
| | | |
| | | @classmethod |
| | | def get_nature(cls, code): |
| | | val = RedisUtils.get(cls.__get_redis(), f"code_nature-{code}") |
| | | # 保存股性 |
| | | |
| | | def save_nature(self, code, natures): |
| | | RedisUtils.setex_async(self.__db, f"code_nature-{code}", tool.get_expire(), json.dumps(natures)) |
| | | |
| | | def get_nature(self, code): |
| | | val = RedisUtils.get(self.__get_redis(), f"code_nature-{code}") |
| | | if val: |
| | | return json.loads(val) |
| | | return None |
| | | |
| | | def get_nature_cache(self, code): |
| | | return self.__nature_cache.get(code) |
| | | |
| | | |
| | | class LatestMaxVolumeManager: |
| | | __db = 0 |
| | | __instance = None |
| | | __redis_manager = redis_manager.RedisManager(0) |
| | | __max_volume_cache = {} |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | | if not cls.__instance: |
| | | cls.__instance = super(LatestMaxVolumeManager, cls).__new__(cls, *args, **kwargs) |
| | | cls.__load_datas() |
| | | return cls.__instance |
| | | |
| | | @classmethod |
| | | def get_nature_cache(cls, code): |
| | | if code in cls.__nature_cache: |
| | | return cls.__nature_cache[code] |
| | | val = cls.get_nature(code) |
| | | if val: |
| | | cls.__nature_cache[code] = val |
| | | return val |
| | | def __load_datas(cls): |
| | | __redis = cls.__get_redis() |
| | | try: |
| | | keys = RedisUtils.keys(__redis, "is_latest_max_volume-*") |
| | | for k in keys: |
| | | code = k.split("-")[-1] |
| | | val = RedisUtils.get(__redis, k) |
| | | CodeDataCacheUtil.set_cache(cls.__max_volume_cache, code, val) |
| | | finally: |
| | | RedisUtils.realse(__redis) |
| | | |
| | | @classmethod |
| | | def __get_redis(cls): |
| | | return cls.__redis_manager.getRedis() |
| | | |
| | | def __save_has_latest_max_volume(self, code): |
| | | RedisUtils.setex_async(self.__db, f"is_latest_max_volume-{code}", tool.get_expire(), "1") |
| | | |
| | | # 设置最近有最大量 |
| | | def set_has_latest_max_volume(self, code): |
| | | CodeDataCacheUtil.set_cache(self.__max_volume_cache, code, 1) |
| | | self.__save_has_latest_max_volume(code) |
| | | |
| | | # 最近是否有最大量 |
| | | def is_latest_max_volume(self, code): |
| | | return code in self.__max_volume_cache |
| | | |
| | | def clear(self): |
| | | self.__max_volume_cache.clear() |
| | | keys = RedisUtils.keys(self.__get_redis(), "is_latest_max_volume-*") |
| | | for k in keys: |
| | | RedisUtils.delete_async(self.__db, k) |
| | | |
| | | |
| | | # 涨幅过高的票管理 |
| | | class HighIncreaseCodeManager: |
| | | __db = 0 |
| | | __instance = None |
| | | __redis_manager = redis_manager.RedisManager(0) |
| | | __high_increase_codes = set() |
| | | |
| | | def __new__(cls, *args, **kwargs): |
| | | if not cls.__instance: |
| | | cls.__instance = super(HighIncreaseCodeManager, cls).__new__(cls, *args, **kwargs) |
| | | cls.__load_datas() |
| | | return cls.__instance |
| | | |
| | | @classmethod |
| | | def __load_datas(cls): |
| | | __redis = cls.__get_redis() |
| | | try: |
| | | cls.__high_increase_codes = RedisUtils.smembers(__redis, "high_increase_codes") |
| | | finally: |
| | | RedisUtils.realse(__redis) |
| | | |
| | | @classmethod |
| | | def __get_redis(cls): |
| | | return cls.__redis_manager.getRedis() |
| | | |
| | | def add_code(self, code): |
| | | if self.__high_increase_codes is None: |
| | | self.__high_increase_codes = set() |
| | | self.__high_increase_codes.add(code) |
| | | RedisUtils.sadd_async(self.__db, "high_increase_codes", code) |
| | | RedisUtils.expire_async(self.__db, "high_increase_codes", tool.get_expire()) |
| | | |
| | | def is_in(self, code): |
| | | return code in self.__high_increase_codes |
| | | |
| | | # 加载全部 |
| | | def list_all(self): |
| | | return self.__high_increase_codes |
| | | |
| | | def clear(self): |
| | | if self.__high_increase_codes: |
| | | self.__high_increase_codes.clear() |
| | | RedisUtils.delete_async(self.__db, "high_increase_codes") |
| | | |
| | | |
| | | # 设置历史K线 |
| | | def set_record_datas(code, limit_up_price, record_datas): |
| | | k_format = get_k_format(float(limit_up_price), record_datas) |
| | | CodeNatureRecordManager.save_k_format(code, k_format) |
| | | natures = get_nature(record_datas) |
| | | CodeNatureRecordManager.save_nature(code, natures) |
| | | k_format = get_k_format(code, float(limit_up_price), record_datas) |
| | | CodeNatureRecordManager().save_k_format(code, k_format) |
| | | natures = get_nature(code, record_datas) |
| | | CodeNatureRecordManager().save_nature(code, natures) |
| | | |
| | | |
| | | # 获取K线形态 |
| | | # 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度) |
| | | def get_k_format(limit_up_price, record_datas): |
| | | p1_data = get_lowest_price_rate(record_datas) |
| | | # 返回 (15个交易日涨幅是否大于24.9%,是否破前高,是否超跌,是否接近前高,是否N,是否V,是否有形态,天量大阳信息,是否具有辨识度,近2天有10天内最大量,上个交易日是否炸板, 上个交易日是否跌停) |
| | | def get_k_format(code, limit_up_price, record_datas): |
| | | p1_data = get_lowest_price_rate(code, record_datas) |
| | | p1 = p1_data[0] >= 0.249, p1_data[1] |
| | | p2 = __is_new_top(limit_up_price, record_datas) |
| | | p3 = __is_lowest(record_datas) |
| | | p4 = __is_near_new_top(limit_up_price, record_datas) |
| | | p5 = __is_n_model(record_datas) |
| | | p6 = __is_v_model(record_datas) |
| | | p8 = __get_big_volumn_info(record_datas) |
| | | p2 = __is_new_top(code, limit_up_price, record_datas) |
| | | p3 = __is_lowest(code, record_datas) |
| | | p4 = __is_near_new_top(code, limit_up_price, record_datas) |
| | | p5 = __is_n_model(code, record_datas) |
| | | p6 = __is_v_model(code, record_datas) |
| | | p8 = __get_big_volumn_info(code, record_datas) |
| | | |
| | | # # N字型包含了N字型 |
| | | # if p5: |
| | |
| | | p7 = (p1[0] or p2[0] or p3[0] or p4[0] or p5[0] or p6[0], '') |
| | | |
| | | # 是否具有辨识度 |
| | | p9 = is_special(record_datas) |
| | | p9 = is_special(code, record_datas) |
| | | p10 = is_latest_10d_max_volume_at_latest_2d(code, record_datas) |
| | | # 最近5天是否有炸板/涨停/跌停 |
| | | p11 = __has_latest_throwing_pressure(code, record_datas, 5) |
| | | # 90天内是否有涨停 |
| | | p12 = __has_limit_up(code, record_datas, 90) |
| | | # 上个交易日是否振幅过大 |
| | | p13 = __is_pre_day_limit_rate_too_low(code, record_datas) |
| | | # 60个交易日是否曾涨停 |
| | | p14 = __has_limited_up(code, record_datas, 60) |
| | | # 昨日是否涨停过 |
| | | p15 = __has_limited_up(code, record_datas, 1) |
| | | # 昨日是否跌停 |
| | | p16 = __is_latest_limit_down(code, record_datas, 1) |
| | | |
| | | return p1, p2, p3, p4, p5, p6, p7, p8, p9 |
| | | return p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15, p16 |
| | | |
| | | |
| | | # 是否具有K线形态 |
| | | def is_has_k_format(limit_up_price, record_datas): |
| | | is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special = get_k_format( |
| | | def is_has_k_format(code, limit_up_price, record_datas): |
| | | is_too_high, is_new_top, is_lowest, is_near_new_top, is_n, is_v, has_format, volume_info, is_special, has_max_volume, open_limit_up, is_limit_up_in_30days, is_latest_limit_down = get_k_format( |
| | | code, |
| | | float(limit_up_price), record_datas) |
| | | if not has_format: |
| | | return False, "不满足K线形态" |
| | |
| | | |
| | | # 获取股性 |
| | | # 返回(是否涨停,首板溢价率,首板炸板溢价率) |
| | | def get_nature(record_datas): |
| | | limit_up_count = get_first_limit_up_count(record_datas) |
| | | premium_rate = get_limit_up_premium_rate(record_datas) |
| | | open_premium_rate = get_open_limit_up_premium_rate(record_datas) |
| | | def get_nature(code, record_datas): |
| | | limit_up_count = get_first_limit_up_count(code, record_datas) |
| | | premium_rate = get_limit_up_premium_rate(code, record_datas) |
| | | open_premium_rate = get_open_limit_up_premium_rate(code, record_datas) |
| | | result = (limit_up_count, premium_rate, open_premium_rate) |
| | | return result |
| | | |
| | | |
| | | # 获取涨幅 |
| | | def get_lowest_price_rate(record_datas): |
| | | def get_lowest_price_rate(code, record_datas): |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-10:] |
| | | for data in datas: |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"])) |
| | | if abs(limit_up_price - data["high"]) < 0.01: |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) |
| | | if abs(limit_up_price - data["high"]) < 0.001: |
| | | date = data['bob'].strftime("%Y-%m-%d") |
| | | return round((datas[-1]["close"] - data["close"]) / data["close"], 4), date |
| | | return 0, '' |
| | | |
| | | |
| | | # 是否涨得太高 |
| | | def is_up_too_high_in_10d(record_datas): |
| | | def is_up_too_high_in_10d_with_limit_up(code, record_datas): |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-10:] |
| | | limit_ups = [] |
| | | limit_up_count = 0 |
| | | max_price = datas[0]["high"] |
| | | for data in datas: |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"])) |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) |
| | | date = data['bob'].strftime("%Y-%m-%d") |
| | | if abs(limit_up_price - data["high"]) < 0.01: |
| | | if data["high"] > max_price: |
| | | max_price = data["high"] |
| | | if abs(limit_up_price - data["high"]) < 0.001: |
| | | limit_ups.append((date, True)) |
| | | limit_up_count += 1 |
| | | else: |
| | | limit_ups.append((date, False)) |
| | | # 下降幅度 |
| | | limit_down_rate = round((max_price - datas[-1]["close"]) / max_price, 3) |
| | | |
| | | if limit_up_count < 3: |
| | | return False |
| | |
| | | for t in temp_datas: |
| | | if t[1]: |
| | | t_count += 1 |
| | | if t_count >= 3: |
| | | if t_count >= 3 and limit_down_rate < 0.15: |
| | | # 降幅小于20% |
| | | return True |
| | | |
| | | return False |
| | | |
| | | |
| | | # 10天内的最高量是否集中在最近两天 |
| | | def is_latest_10d_max_volume_at_latest_2d(code, record_datas): |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-10:] |
| | | max_volume_info = None |
| | | for i in range(0, len(datas)): |
| | | if not max_volume_info: |
| | | max_volume_info = (i, datas[i]["volume"]) |
| | | else: |
| | | if max_volume_info[1] < datas[i]["volume"]: |
| | | max_volume_info = (i, datas[i]["volume"]) |
| | | return len(datas) - max_volume_info[0] <= 2 |
| | | |
| | | |
| | | # 120 天内是否长得太高 |
| | | def is_up_too_high_in_120d(record_datas): |
| | | def is_up_too_high_in_120d(code, record_datas): |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-120:] |
| | | today_limit_up_price = round(float(gpcode_manager.get_limit_up_price_by_preprice(datas[-1]["close"])), 2) |
| | | today_limit_up_price = round( |
| | | float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[-1]["close"])), 2) |
| | | max_price = 0 |
| | | for data in datas: |
| | | if data["high"] > max_price: |
| | |
| | | return False |
| | | |
| | | |
| | | # 暂时不使用 |
| | | # 从最近一次涨停开始,是否涨幅过高 |
| | | def is_up_too_high_from_latest_limit_up(code, record_datas): |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-20:] |
| | | datas.reverse() |
| | | today_limit_up_price = round( |
| | | float(gpcode_manager.get_limit_up_price_by_preprice(code, datas[0]["close"])), 2) |
| | | max_price = 0 |
| | | limit_up_price = None |
| | | for i in range(0, len(datas)): |
| | | item = datas[i] |
| | | if item['high'] > max_price: |
| | | max_price = item['high'] |
| | | if __is_limited_up(code, item): |
| | | limit_up_price = item['high'] |
| | | break |
| | | if not limit_up_price: |
| | | return False |
| | | if today_limit_up_price < max_price: |
| | | return False |
| | | if (today_limit_up_price - limit_up_price) / limit_up_price > 0.25: |
| | | return True |
| | | return False |
| | | |
| | | |
| | | # 最近几天是否有最大量 |
| | | def is_latest_max_volume(record_datas, day_count): |
| | | def is_have_latest_max_volume(code, record_datas, day_count): |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-120:] |
| | |
| | | return False |
| | | |
| | | |
| | | # 在最近几天内股价是否长得太高 |
| | | def is_price_too_high_in_days(code, record_datas, limit_up_price, day_count=5): |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[0 - day_count:] |
| | | min_price_info = None |
| | | max_price_info = None |
| | | for d in datas: |
| | | if min_price_info is None: |
| | | min_price_info = d["low"], d |
| | | if max_price_info is None: |
| | | max_price_info = d["high"], d |
| | | if min_price_info[0] > d["low"]: |
| | | min_price_info = d["low"], d |
| | | if max_price_info[0] < d["high"]: |
| | | max_price_info = d["high"], d |
| | | # if max_price > float(limit_up_price): |
| | | # return False |
| | | rate = (max_price_info[1]["high"] - min_price_info[1]["low"]) / min_price_info[1]["low"] |
| | | THRESHOLD_RATE = 0.319 * 2 if tool.is_ge_code(code) else 0.319 |
| | | if rate >= THRESHOLD_RATE: |
| | | return True, rate |
| | | return False, rate |
| | | |
| | | |
| | | # 连续涨停后是否回调不足够 |
| | | def is_continue_limit_up_not_enough_fall_dwon(code, record_datas): |
| | | # 10 天内是否有连续3板 |
| | | datas = copy.deepcopy(record_datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[0 - 10:] |
| | | limit_up_continue_count_info = None |
| | | max_limit_up_continue_count_info_list = [] # [连续涨停次数,涨停起点] |
| | | |
| | | for i in range(len(datas)): |
| | | item = datas[i] |
| | | if __is_limit_up(code, item): |
| | | if not limit_up_continue_count_info: |
| | | limit_up_continue_count_info = [1, i] |
| | | else: |
| | | limit_up_continue_count_info[0] += 1 |
| | | else: |
| | | if limit_up_continue_count_info: |
| | | max_limit_up_continue_count_info_list.append(limit_up_continue_count_info) |
| | | limit_up_continue_count_info = None |
| | | max_limit_up_info = None |
| | | for x in max_limit_up_continue_count_info_list: |
| | | if max_limit_up_info is None: |
| | | max_limit_up_info = x |
| | | if max_limit_up_info[0] <= x[0]: |
| | | max_limit_up_info = x |
| | | |
| | | if not max_limit_up_info or max_limit_up_info[0] < 3: |
| | | return False |
| | | start_index = max_limit_up_info[1] |
| | | max_price_info = [0, None] |
| | | for i in range(start_index, len(datas)): |
| | | item = datas[i] |
| | | if item["high"] > max_price_info[0]: |
| | | max_price_info = [item["high"], i] |
| | | # 计算回踩价格 |
| | | lowest_price_threhhold = round((1 - 0.28) * max_price_info[0], 2) |
| | | for i in range(max_price_info[1] + 1, len(datas)): |
| | | item = datas[i] |
| | | if item["low"] < lowest_price_threhhold: |
| | | return False |
| | | return True |
| | | |
| | | |
| | | # 是否有涨停 |
| | | def get_first_limit_up_count(datas): |
| | | def get_first_limit_up_count(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | count = 0 |
| | | for i in range(len(datas)): |
| | | item = datas[i] |
| | | # 获取首板涨停次数 |
| | | if __is_limit_up(item) and i > 0 and not __is_limit_up(datas[i - 1]): |
| | | if __is_limit_up(code, item) and i > 0 and not __is_limit_up(code, datas[i - 1]): |
| | | # 首板涨停 |
| | | count += 1 |
| | | |
| | |
| | | |
| | | |
| | | # 是否破前高 |
| | | def __is_new_top(limit_up_price, datas): |
| | | def __is_new_top(code, limit_up_price, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-80:] |
| | | datas = datas[-60:] |
| | | max_price = 0 |
| | | for data in datas: |
| | | if max_price < data["high"]: |
| | |
| | | return False, '' |
| | | |
| | | |
| | | def is_new_top(limit_up_price, datas): |
| | | return __is_new_top(float(limit_up_price), datas)[0] |
| | | def is_new_top(code, limit_up_price, datas): |
| | | return __is_new_top(code, float(limit_up_price), datas)[0] |
| | | |
| | | |
| | | def is_near_top(code, limit_up_price, datas): |
| | | return __is_near_new_top(code, float(limit_up_price), datas)[0] |
| | | |
| | | |
| | | # 接近新高 |
| | | def __is_near_new_top(limit_up_price, datas): |
| | | def __is_near_new_top(code, limit_up_price, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-80:] |
| | | max_volume = 0 |
| | | max_volume_index = 0 |
| | | |
| | | max_price = 0 |
| | | max_price_index = 0 |
| | | for index in range(0, len(datas)): |
| | | data = datas[index] |
| | | if max_volume < data["volume"]: |
| | | max_volume = data["volume"] |
| | | max_volume_index = index |
| | | |
| | | price = 0 |
| | | price_index = 0 |
| | | for index in range(max_volume_index, len(datas)): |
| | | data = datas[index] |
| | | if data["high"] > price: |
| | | price = data["high"] |
| | | price_index = index |
| | | |
| | | index = price_index |
| | | # 最大量当日最高价比当日之后的最高价涨幅在15%以内 |
| | | if (price - datas[max_volume_index]["high"]) / datas[max_volume_index]["high"] < 0.15: |
| | | price = datas[max_volume_index]["high"] |
| | | index = max_volume_index |
| | | |
| | | print(max_volume) |
| | | rate = (price - limit_up_price) / limit_up_price |
| | | if 0 < rate < 0.03: |
| | | return True, datas[index]['bob'].strftime("%Y-%m-%d") |
| | | if data["high"] > max_price: |
| | | max_price = data["high"] |
| | | max_price_index = index |
| | | rate = (max_price - float(limit_up_price)) / max_price |
| | | if 0 < rate < 0.02: |
| | | return True, datas[max_price_index]['bob'].strftime("%Y-%m-%d") |
| | | return False, '' |
| | | |
| | | |
| | | # 是否跌破箱体 |
| | | def __is_lowest(datas): |
| | | def __is_lowest(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-80:] |
| | |
| | | |
| | | |
| | | # N字形 |
| | | def __is_n_model(datas): |
| | | def __is_n_model(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-80:] |
| | |
| | | min_price = 1000000 |
| | | for i in range(len(datas) - 5, len(datas)): |
| | | item = datas[i] |
| | | print(item) |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) |
| | | if abs(limit_up_price - item["high"]) < 0.001 and abs( |
| | | limit_up_price - datas[i - 1]["high"]) >= 0.001: |
| | | # 涨停,前一天非涨停 |
| | |
| | | return False, '' |
| | | |
| | | |
| | | def __has_latest_throwing_pressure(code, datas, day_count): |
| | | """ |
| | | 最近释放有抛压 |
| | | @param code: |
| | | @param datas: |
| | | @param day_count: |
| | | @return: 是否有抛压, None/(p高价数据, t高价数据) |
| | | """ |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | items = datas[0 - day_count:] |
| | | target_item = None |
| | | for i in range(len(items) - 1, -1, -1): |
| | | item = items[i] |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) |
| | | limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"])) |
| | | if abs(limit_up_price - item["high"]) < 0.001 or abs(limit_down_price - item["close"]) < 0.001: |
| | | # 炸板 # 或涨停 # 或者跌停 |
| | | target_item = item |
| | | break |
| | | if not target_item: |
| | | return False, None |
| | | p_price, p_volume = target_item["high"], target_item["volume"] |
| | | t_price, t_volume = 0, 0 |
| | | for i in range(len(items) - 1, -1, -1): |
| | | item = items[i] |
| | | if item["bob"] == target_item["bob"]: |
| | | break |
| | | if item["high"] >= p_price * 1.03: |
| | | t_price, t_volume = item["high"], item["volume"] |
| | | break |
| | | if t_price > 0: |
| | | return True, ((p_price, p_volume), (t_price, t_volume)) |
| | | else: |
| | | return True, ((p_price, p_volume), None) |
| | | |
| | | |
| | | def __is_latest_limit_down(code, datas, day_count): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | items = datas[0 - day_count:] |
| | | for item in items: |
| | | # 是否有跌停 |
| | | limit_down_price = float(gpcode_manager.get_limit_down_price_by_preprice(code, item["pre_close"])) |
| | | if abs(limit_down_price - item["close"]) < 0.001: |
| | | # 跌停 |
| | | return True |
| | | return False |
| | | |
| | | |
| | | def __is_pre_day_limit_rate_too_low(code, datas): |
| | | """ |
| | | 上个交易日是否跌幅过大 |
| | | @param code: |
| | | @param datas: |
| | | @return: |
| | | """ |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | items = datas[-1:] |
| | | for item in items: |
| | | # 是否有跌停 |
| | | # 获取当日涨幅 |
| | | rate_open = (item["open"] - item["pre_close"]) / item["pre_close"] |
| | | rate_close = (item["close"] - item["pre_close"]) / item["pre_close"] |
| | | threshold_rate_ = 0.15 |
| | | if abs(rate_open - rate_close) >= threshold_rate_: |
| | | return True |
| | | return False |
| | | |
| | | |
| | | # V字形 |
| | | def __is_v_model(datas): |
| | | def __is_v_model(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-30:] |
| | |
| | | max_price = datas[i]["close"] |
| | | max_price_index = i |
| | | min_price = max_price |
| | | min_price_index = max_price_index |
| | | for i in range(max_price_index, len(datas)): |
| | | if min_price > datas[i]["close"]: |
| | | min_price = datas[i]["close"] |
| | | min_price_index = i |
| | | |
| | | if (max_price - min_price) / max_price > 0.249: |
| | | return True, '' |
| | |
| | | |
| | | |
| | | # 是否天量大阳 |
| | | def __get_big_volumn_info(datas): |
| | | def __get_big_volumn_info(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas = datas[-30:] |
| | |
| | | |
| | | |
| | | # 是否涨停 |
| | | def __is_limit_up(data): |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"])) |
| | | return abs(limit_up_price - data["close"]) < 0.001 |
| | | def __is_limit_up(code, data): |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) |
| | | return abs(limit_up_price - data["close"]) < 0.009 |
| | | |
| | | |
| | | # 是否涨停过 |
| | | def __is_limited_up(data): |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(data["pre_close"])) |
| | | return abs(limit_up_price - data["high"]) < 0.001 |
| | | def __is_limited_up(code, data): |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, data["pre_close"])) |
| | | return abs(limit_up_price - data["high"]) < 0.009 |
| | | |
| | | |
| | | # 多少天内是否有涨停/曾涨停 |
| | | def __has_limit_up(code, datas, day_count): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | if len(datas) > day_count: |
| | | datas = datas[0 - day_count:] |
| | | if len(datas) >= 1: |
| | | for i in range(0, len(datas)): |
| | | item = datas[i] |
| | | if __is_limit_up(code, item): |
| | | return True |
| | | return False |
| | | |
| | | |
| | | # 多少天内是否曾涨停 |
| | | def __has_limited_up(code, datas, day_count): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | if len(datas) > day_count: |
| | | datas = datas[0 - day_count:] |
| | | if len(datas) >= 1: |
| | | for i in range(0, len(datas)): |
| | | item = datas[i] |
| | | if __is_limited_up(code, item): |
| | | return True |
| | | return False |
| | | |
| | | |
| | | # 首板涨停溢价率 |
| | | def get_limit_up_premium_rate(datas): |
| | | def get_limit_up_premium_rate(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | first_rate_list = [] |
| | | for i in range(0, len(datas)): |
| | | item = datas[i] |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) |
| | | if abs(limit_up_price - item["close"]) < 0.001: |
| | | if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]): |
| | | if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]): |
| | | # 首板涨停 |
| | | rate = (datas[i + 1]["high"] - datas[i + 1]["pre_close"]) / datas[i + 1]["pre_close"] |
| | | first_rate_list.append(rate) |
| | |
| | | |
| | | |
| | | # 首板炸板溢价率 |
| | | def get_open_limit_up_premium_rate(datas): |
| | | def get_open_limit_up_premium_rate(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | first_rate_list = [] |
| | | for i in range(0, len(datas)): |
| | | item = datas[i] |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(item["pre_close"])) |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(code, item["pre_close"])) |
| | | |
| | | if abs(limit_up_price - item["high"]) < 0.001 and abs(limit_up_price - item["close"]) > 0.001: |
| | | # |
| | | limit_up_price = float(gpcode_manager.get_limit_up_price_by_preprice(datas[i - 1]["pre_close"])) |
| | | if 0 < i < len(datas) - 1 and not __is_limit_up(datas[i - 1]): |
| | | if 0 < i < len(datas) - 1 and not __is_limit_up(code, datas[i - 1]): |
| | | # 前一天未涨停 |
| | | rate = (datas[i + 1]["high"] - item["high"]) / item["high"] |
| | | first_rate_list.append(rate) |
| | |
| | | |
| | | |
| | | # 是否具有辨识度 |
| | | def is_special(datas): |
| | | # 30个交易日内有≥5天曾涨停且连续涨停数或曾涨停≥2天 |
| | | if len(datas) > 30: |
| | | datas = datas[-30:] |
| | | def is_special(code, datas): |
| | | datas = copy.deepcopy(datas) |
| | | datas.sort(key=lambda x: x["bob"]) |
| | | datas_30 = datas[-30:] |
| | | datas_90 = datas[-90:] |
| | | count = 0 |
| | | # 30个交易日内累计涨停次数≥4次 |
| | | continue_count = 0 |
| | | has_continue = False |
| | | for item in datas_30: |
| | | if __is_limit_up(code, item): |
| | | continue_count += 1 |
| | | count += 1 |
| | | if continue_count >= 4: |
| | | has_continue = True |
| | | else: |
| | | continue_count = 0 |
| | | if count >= 5 and has_continue: |
| | | return True, "短期辨识度" |
| | | |
| | | count = 0 |
| | | continue_count = 0 |
| | | last_index = -1 |
| | | for i in range(len(datas)): |
| | | if __is_limited_up(datas[i]): |
| | | if last_index >= 0 and i - last_index == 1: |
| | | continue_count += 1 |
| | | has_continue = False |
| | | # 90个交易日内涨停次数≥6次 |
| | | for item in datas_90: |
| | | if __is_limit_up(code, item): |
| | | continue_count += 1 |
| | | count += 1 |
| | | last_index = i |
| | | if count >= 5 and continue_count > 0: |
| | | return True, '' |
| | | return False, '' |
| | | if continue_count >= 4: |
| | | has_continue = True |
| | | else: |
| | | continue_count = 0 |
| | | if count >= 6 and has_continue: |
| | | return True, "长期辨识度" |
| | | return False, "" |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(CodeNatureRecordManager.get_k_format("603717")) |
| | | code = "000333" |
| | | threshold_rate = 0 - ((1 - tool.get_limit_down_rate(code)) * 0.9) |
| | | print(threshold_rate) |