"""
|
代码行业关键词管理
|
"""
|
|
# Keyword / plate management for limit-up codes
|
import json
|
|
import constant
|
from utils import global_util, tool
|
from log_module import log
|
from db import redis_manager
|
|
from log_module.log import logger_kpl_limit_up, logger_kpl_block_can_buy
|
from third_data.kpl_util import KPLPlatManager
|
from trade import trade_manager
|
|
|
# 开盘啦禁止交易板块管理
|
class KPLPlateForbiddenManager:
    """Stores the KPL plates that must not be traded.

    The plates live in the Redis set ``kpl_forbidden_plates`` (Redis db 3).
    """

    __redisManager = redis_manager.RedisManager(3)

    def __get_redis(self):
        """Return a Redis client from the shared manager."""
        return self.__redisManager.getRedis()

    def save_plate(self, plate):
        """Add ``plate`` to the forbidden set and refresh the set's expiry.

        The expiry value comes from ``tool.get_expire()``.
        """
        redis_key = "kpl_forbidden_plates"
        self.__get_redis().sadd(redis_key, plate)
        self.__get_redis().expire(redis_key, tool.get_expire())

    def list_all(self):
        """Return every member of the forbidden-plate set as returned by Redis."""
        client = self.__get_redis()
        return client.smembers("kpl_forbidden_plates")
|
|
|
class LimitUpCodesPlateKeyManager:
    """Index of limit-up reasons (plate keywords) for today's limit-up codes.

    All state is held in class-level dictionaries so that every instance, and
    code that reads the attributes through the class itself, observes the same
    data. Reasons a code had earlier today are persisted in the Redis set
    ``kpl_limit_up_reason_his-{code}``.
    """

    # Reason of each code in the latest limit-up list: {code: reason}
    today_limit_up_reason_dict = {}
    # Latest reason of each code ever passed to set_today_total_limit_up: {code: reason}
    today_total_limit_up_reason_dict = {}
    # {code: set of keywords (current reason + reasons recorded in Redis)}
    total_code_keys_dict = {}
    # Reverse index {keyword: set of codes}; entries are only ever added here
    total_key_codes_dict = {}
    __redisManager = redis_manager.RedisManager(1)

    def __get_redis(self):
        """Return a Redis client from the shared manager."""
        return self.__redisManager.getRedis()

    def set_today_limit_up(self, datas):
        """Replace today's limit-up list and refresh the keyword indexes.

        :param datas: iterable of ``(code, reason)`` items; ``None`` or an
            empty sequence clears the current list.
        """
        datas = datas or []
        # Rebind on the class, not on ``self``: an instance attribute would
        # shadow the shared dict and hide the data from the other instances
        # of this manager and from readers that go through the class.
        LimitUpCodesPlateKeyManager.today_limit_up_reason_dict = {
            item[0]: item[1] for item in datas
        }
        for item in datas:
            self.__set_total_keys(item[0])
        self.set_today_total_limit_up(datas)

    def set_today_total_limit_up(self, datas):
        """Record the reason of every ``(code, reason)`` item in the daily total map.

        ``None`` or an empty sequence is a no-op.
        """
        if not datas:
            return
        for item in datas:
            self.today_total_limit_up_reason_dict[item[0]] = item[1]

    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        """Persist the reason ``code`` is moving away from, then re-index the code.

        ``to_reason`` is accepted for interface compatibility but is not used
        here; the new reason is picked up from ``today_limit_up_reason_dict``.
        """
        his_key = f"kpl_limit_up_reason_his-{code}"
        self.__get_redis().sadd(his_key, from_reason)
        self.__get_redis().expire(his_key, tool.get_expire())
        self.__set_total_keys(code)

    def __set_total_keys(self, code):
        """Rebuild the keyword set of ``code`` and add it to the reverse index."""
        keys = set(self.__get_redis().smembers(f"kpl_limit_up_reason_his-{code}"))
        if code in self.today_limit_up_reason_dict:
            keys.add(self.today_limit_up_reason_dict.get(code))
        self.total_code_keys_dict[code] = keys
        for k in keys:
            self.total_key_codes_dict.setdefault(k, set()).add(code)

        logger_kpl_limit_up.info("{}板块关键词:{}", code, keys)

    def get_codes_by_key_without_mine(self, key, code):
        """Return a new set with the codes indexed under ``key``, excluding ``code``."""
        codes_set = set(self.total_key_codes_dict.get(key, ()))
        codes_set.discard(code)
        return codes_set

    def match_limit_up_reason_keys(self, code, keys):
        """Match ``keys`` against the reverse index.

        :return: ``{keyword: set of other codes}`` containing only keywords
            that have at least one code besides ``code``.
        """
        fresult = {}
        for k in keys:
            if k not in self.total_key_codes_dict:
                continue
            others = set(self.total_key_codes_dict[k])
            others.discard(code)
            if others:
                fresult[k] = others
        return fresult
|
|
|
# 实时开盘啦市场数据
|
class RealTimeKplMarketData:
    """Real-time KPL market data: top net-inflow plates and per-plate buy quotas.

    Entries in the ``top_5_*`` lists are tuples ``(name, net inflow amount, rank)``.
    """

    # Top entries of the featured ("精选") plate list
    top_5_reason_list = []
    # Top entries of the industry plate list
    top_5_industry_list = []
    # {plate name: entry} merged from both lists above
    top_5_key_dict = {}
    # {plate name: raw row} for every featured row received
    total_reason_dict = {}
    # {plate name: raw row} for every industry row received
    total_industry_dict = {}
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()

    @classmethod
    def set_top_5_reasons(cls, datas):
        """Update the featured-plate ranking from raw rows.

        Each row is indexed as ``row[1]`` = plate name and ``row[3]`` = net
        inflow amount. Plates listed in ``constant.KPL_INVALID_BLOCKS`` are
        skipped. Names returned by ``get_same_plat_names_by_id`` are added as
        aliases carrying the same amount and rank as the plate they belong to.
        """
        for d in datas:
            cls.total_reason_dict[d[1]] = d

        primary = []
        for row in datas:
            if row[1] in constant.KPL_INVALID_BLOCKS:
                continue
            # (name, net inflow amount, rank)
            primary.append((row[1], row[3], len(primary)))
            # NOTE(review): ``> 5`` lets a sixth entry through although the
            # attribute is named "top 5" - confirm which is intended.
            if len(primary) > 5:
                break
            # Stop after the first plate whose net inflow is below 100 million.
            if row[3] < 1 * 10000 * 10000:
                break

        # Aliases are collected from the fixed ``primary`` list. The previous
        # code appended to the list it was iterating and compared the alias
        # with the amount instead of the name, so the skip never fired and
        # the list could keep growing.
        # NOTE(review): a plate *name* is passed to a ``*_by_id`` helper -
        # verify against KPLPlatManager that this is the expected argument.
        aliases = []
        for name, amount, rank in primary:
            for alias in cls.__KPLPlatManager.get_same_plat_names_by_id(name) or ():
                if alias == name:
                    continue
                aliases.append((alias, amount, rank))

        cls.top_5_reason_list = primary + aliases
        cls.__reset_top_5_dict()

    @classmethod
    def set_top_5_industry(cls, datas):
        """Update the industry-plate ranking from raw rows.

        Each row is indexed as ``row[1]`` = plate name and ``row[2]`` = net
        inflow amount. Plates listed in ``constant.KPL_INVALID_BLOCKS`` are skipped.
        """
        for d in datas:
            cls.total_industry_dict[d[1]] = d

        temp_list = []
        for row in datas:
            if row[1] in constant.KPL_INVALID_BLOCKS:
                continue
            temp_list.append((row[1], row[2], len(temp_list)))
            if len(temp_list) > 5:
                break
            # Stop after the first plate whose net inflow is below 100 million.
            if row[2] < 1 * 10000 * 10000:
                break
        cls.top_5_industry_list = temp_list
        cls.__reset_top_5_dict()

    @classmethod
    def __reset_top_5_dict(cls):
        """Rebuild ``top_5_key_dict``; featured entries override industry ones."""
        temp_dict = {t[0]: t for t in cls.top_5_industry_list}
        temp_dict.update((t[0], t) for t in cls.top_5_reason_list)
        cls.top_5_key_dict = temp_dict

    @classmethod
    def get_can_buy_key_set(cls):
        """Return the key view of ``top_5_key_dict`` (the plate names that can be bought)."""
        return cls.top_5_key_dict.keys()

    @classmethod
    def get_can_buy_codes_count(cls, code, key):
        """Return ``(max order count, reason text)`` for plate ``key`` as seen by ``code``.

        A count of 0 means no order may be placed for this plate.
        """
        # At least one other limit-up code must already be indexed under the plate.
        other_codes = set(LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key) or ())
        other_codes.discard(code)
        if len(other_codes) < 1:
            return 0, "身位不为后排"

        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all()
        if key in forbidden_plates:
            return 0, "不买该板块"

        # Before 10:00:00 two orders are allowed regardless of the ranking.
        if int(tool.get_now_time_str().replace(':', '')) < int("100000"):
            return 2, "10:00以前可以挂2个单"
        # From 10:00:00 on the plate has to be in the top net-inflow ranking.
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
        # More than 300 million net inflow allows two orders, otherwise one.
        if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000:
            return 2, "净流入在前5且大于3亿"
        return 1, "净流入在前5"

    @classmethod
    def is_in_top(cls, keys):
        """Check whether any of ``keys`` is a buyable, non-forbidden top plate.

        :param keys: set of plate keywords.
        :return: ``(True, matching set)`` or ``(False, None)``.
        """
        reasons = cls.get_can_buy_key_set()
        log.logger_kpl_debug.debug("市场流入前5:{}", reasons)
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all()
        reasons = reasons - forbidden_plates
        temp_set = keys & reasons
        log.logger_kpl_debug.debug("市场流入前5匹配结果:{}", temp_set)
        if temp_set:
            return True, temp_set
        return False, None
|
|
|
#
|
class CodesHisReasonAndBlocksManager:
    """Caches historical limit-up reasons and plates ("blocks") per code.

    Values are kept in class-level dictionaries and mirrored to Redis (db 1)
    as JSON lists under ``kpl_his_limit_up_reason-{code}`` and
    ``kpl_blocks-{code}``.
    """

    __redisManager = redis_manager.RedisManager(1)
    # {code: set of historical limit-up reasons}
    __history_limit_up_reason_dict = {}
    # {code: set of plates}
    __blocks_dict = {}

    def __get_redis(self):
        """Return a Redis client from the shared manager."""
        return self.__redisManager.getRedis()

    def set_history_limit_up_reason(self, code, reasons):
        """Cache the historical reasons of ``code`` and write them to Redis."""
        self.__history_limit_up_reason_dict[code] = set(reasons)
        self.__get_redis().setex(
            f"kpl_his_limit_up_reason-{code}",
            tool.get_expire(),
            json.dumps(list(reasons)),
        )

    def get_history_limit_up_reason(self, code):
        """Return the historical reasons of ``code`` as a set.

        ``None`` means nothing has been stored for the code yet; any other
        value means the historical reasons were already loaded.
        """
        cached = self.__history_limit_up_reason_dict.get(code)
        if cached is not None:
            return cached
        # Cache miss: fall back to Redis and remember what was found there.
        raw = self.__get_redis().get(f"kpl_his_limit_up_reason-{code}")
        if raw is not None:
            self.__history_limit_up_reason_dict[code] = set(json.loads(raw))
        return self.__history_limit_up_reason_dict.get(code)

    def set_blocks(self, code, blocks):
        """Cache the plates of ``code`` and write them to Redis."""
        self.__blocks_dict[code] = set(blocks)
        self.__get_redis().setex(
            f"kpl_blocks-{code}",
            tool.get_expire(),
            json.dumps(list(blocks)),
        )

    def get_blocks(self, code):
        """Return the plates of ``code`` as a set, or ``None`` when none are stored."""
        cached = self.__blocks_dict.get(code)
        if cached is not None:
            return cached
        # Cache miss: fall back to Redis and remember what was found there.
        raw = self.__get_redis().get(f"kpl_blocks-{code}")
        if raw is not None:
            self.__blocks_dict[code] = set(json.loads(raw))
        return self.__blocks_dict.get(code)

    def get_total_keys(self, code):
        """Return the union of historical reasons and plates of ``code``."""
        history = self.get_history_limit_up_reason(code)
        history = set() if history is None else history
        plates = self.get_blocks(code)
        plates = set() if plates is None else plates
        return history | plates
|
|
|
# 目标代码关键词管理
|
class TargetCodePlateKeyManager:
    """Collects every plate keyword known for a target code."""

    __redisManager = redis_manager.RedisManager(1)
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()

    def __get_redis(self):
        """Return a Redis client from the shared manager."""
        return self.__redisManager.getRedis()

    def get_plate_keys(self, code):
        """Gather the keyword sets of ``code``.

        :return: tuple ``(keys, k1, k11, k2, k3, k4)`` where

            * ``keys`` - union of all sets below minus ``constant.KPL_INVALID_BLOCKS``
            * ``k1``   - today's limit-up reason (at most one element)
            * ``k11``  - reasons the code had earlier today (Redis history set)
            * ``k2``   - historical limit-up reasons
            * ``k3``   - industry from ``global_util.code_industry_map``
            * ``k4``   - plates of the code
        """
        today_reasons = LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict
        k1 = {today_reasons[code]} if code in today_reasons else set()

        # Reasons recorded when today's limit-up reason changed
        k11 = self.__get_redis().smembers(f"kpl_limit_up_reason_his-{code}")

        k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason(code)
        if k2 is None:
            k2 = set()

        industry = global_util.code_industry_map.get(code)
        k3 = {industry} if industry else set()

        k4 = self.__CodesPlateKeysManager.get_blocks(code)
        if k4 is None:
            k4 = set()

        # Merge everything, then drop the invalid plate names
        keys = set() | k1 | k11 | k2 | k3 | k4
        keys = keys.difference(set(constant.KPL_INVALID_BLOCKS))

        return keys, k1, k11, k2, k3, k4
|
|
|
class CodePlateKeyBuyManager:
    """Decides whether a code may be bought, based on its plate keywords."""

    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()

    @classmethod
    def can_buy(cls, code):
        """Return ``(allowed, message)`` telling whether an order may be placed for ``code``.

        Steps:
          1. match the code's keywords against the limit-up keyword index;
          2. ask ``RealTimeKplMarketData`` how many orders each matched plate allows;
          3. compare that quota with the codes already delegated or filled,
             grouped by their current limit-up reason.
        """
        if constant.TEST:
            return True, ""

        keys, k1, k11, k2, k3, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        log.logger_kpl_debug.info("{}关键词:{},{},{},{},{},{}", code, keys, k1, k11, k2, k3, k4)

        # Match the keywords against the limit-up list
        matched = cls.__LimitUpCodesPlateKeyManager.match_limit_up_reason_keys(code, keys)
        log.logger_kpl_debug.info("{}关键词身位匹配结果:{}", code, matched)
        if not matched:
            return False, "未在涨停列表中未匹配到涨停原因"

        # Order quota per matched plate
        quota_by_plate = {}
        for plate in matched:
            quota_by_plate[plate] = RealTimeKplMarketData.get_can_buy_codes_count(code, plate)[0]

        if not any(quota > 0 for quota in quota_by_plate.values()):
            return False, f"匹配到的【{','.join(matched.keys())}】没在精选/行业可以买入的板块中"

        # ---- codes that already have an order placed or filled ----
        delegated_codes = set(trade_manager.get_codes_by_trade_states(
            {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER}))
        filled_codes = set(trade_manager.get_codes_by_trade_states(
            {trade_manager.TRADE_STATE_BUY_SUCCESS}))

        # Only the real-time limit-up reason (second item) of each traded code is used
        reasons_by_code = {
            traded: cls.__TargetCodePlateKeyManager.get_plate_keys(traded)[1]
            for traded in delegated_codes | filled_codes
        }

        filled_plates = set()  # plates that already contain a filled code
        codes_by_plate = {}  # {plate: codes with an order in that plate}
        for traded, plates in reasons_by_code.items():
            for plate in plates:
                if traded in filled_codes:
                    filled_plates.add(plate)
                codes_by_plate.setdefault(plate, set()).add(traded)

        # ---- check every plate that still has quota ----
        msg_list = []
        for plate, quota in quota_by_plate.items():
            log.logger_kpl_debug.debug(f"{code}:板块可以下单的数量【{plate}】-{quota}")
            if quota < 1:
                continue
            # A plate that already has a filled order gets no further orders
            if plate in filled_plates:
                msg_list.append(f"【{plate}】中已经有成交代码")
                log.logger_kpl_debug.debug(f"{code}:板块已经有成交【{plate}】")
                continue

            placed = codes_by_plate.get(plate)
            order_count = 0 if placed is None else len(placed)
            if order_count < quota:
                logger_kpl_block_can_buy.info(
                    f"code={code}:【{plate}】可以下单,现有数量:{order_count} 最大数量:{quota}")
                return True, f"可以下单,板块:【{plate}】,板块中已经下单的数量:{order_count}"
            msg_list.append(f"【{plate}】中下单代码数量{order_count}/允许下单数量{quota}")

        return False, ",".join(msg_list)
|
|
|
# Running this module directly performs no action.
if __name__ == "__main__":
    pass
|