"""
|
股票代码管理器
|
"""
|
import json
|
import time
|
|
import constant
|
from db import redis_manager
|
from db.redis_manager import RedisUtils
|
from utils import tool
|
import decimal
|
|
from ths import l2_listen_pos_health_manager, client_manager
|
|
__redisManager = redis_manager.RedisManager(0)
|
|
|
class CodesNameManager:
|
redisManager = redis_manager.RedisManager(0)
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
@classmethod
|
def list_code_name_dict(cls):
|
dict_ = {}
|
val = cls.list_first_code_name_dict()
|
if val is not None:
|
for k in val:
|
dict_[k] = val[k]
|
val = cls.list_second_code_name_dict()
|
if val is not None:
|
for k in val:
|
dict_[k] = val[k]
|
return dict_
|
|
@classmethod
|
def list_first_code_name_dict(cls):
|
val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
|
if val is not None:
|
val = json.loads(val)
|
return val
|
return None
|
|
@classmethod
|
def list_second_code_name_dict(cls):
|
val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
|
if val is not None:
|
val = json.loads(val)
|
return val
|
return None
|
|
@classmethod
|
def get_first_code_name(cls, code):
|
val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
|
if not val:
|
return None
|
val = json.loads(val)
|
for k in val:
|
if val[k] == code:
|
return k
|
return None
|
|
@classmethod
|
def get_second_code_name(cls, code):
|
val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
|
if not val:
|
return None
|
val = json.loads(val)
|
for k in val:
|
if val[k] == code:
|
return k
|
|
@classmethod
|
def get_first_name_code(cls, name):
|
val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
|
if not val:
|
return None
|
val = json.loads(val)
|
return val.get(name)
|
|
@classmethod
|
def add_first_code_name(cls, code, name):
|
val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
|
if not val:
|
return None
|
val = json.loads(val)
|
val[name] = code
|
cls.set_first_code_names(val)
|
|
@classmethod
|
def get_second_name_code(cls, name):
|
val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
|
if not val:
|
return None
|
val = json.loads(val)
|
return val.get(name)
|
|
# 设置首板代码名称
|
@classmethod
|
def set_first_code_names(cls, datas):
|
RedisUtils.set( cls.__get_redis(), "gp_list_names_first", json.dumps(datas))
|
|
# 设置二板代码名称
|
@classmethod
|
def set_second_code_names(cls, datas):
|
RedisUtils.set(cls.__get_redis(), "gp_list_names", json.dumps(datas))
|
|
# 删除首板代码名称
|
@classmethod
|
def clear_first_code_names(cls):
|
RedisUtils.delete(cls.__get_redis(), "gp_list_names_first")
|
|
# 设置二板代码名称
|
@classmethod
|
def clear_second_code_names(cls):
|
RedisUtils.delete(cls.__get_redis(), "gp_list_names")
|
|
|
# 首板代码管理
|
class FirstCodeManager:
|
redisManager = redis_manager.RedisManager(0)
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
# 加入首板历史记录
|
@classmethod
|
def add_record(cls, codes):
|
for code in codes:
|
RedisUtils.sadd(cls.__get_redis(), "first_code_record", code)
|
RedisUtils.expire(cls.__get_redis(), "first_code_record", tool.get_expire())
|
|
@classmethod
|
def is_in_first_record(cls, code):
|
if RedisUtils.sismember( cls.__get_redis(), "first_code_record", code):
|
return True
|
else:
|
return False
|
|
# 加入首板涨停过代码集合
|
@classmethod
|
def add_limited_up_record(cls, codes):
|
for code in codes:
|
RedisUtils.sadd(cls.__get_redis(), "first_code_limited_up_record", code)
|
RedisUtils.expire(cls.__get_redis(), "first_code_limited_up_record", tool.get_expire())
|
|
# 是否涨停过
|
@classmethod
|
def is_limited_up(cls, code):
|
if RedisUtils.sismember( cls.__get_redis(), "first_code_limited_up_record", code):
|
return True
|
else:
|
return False
|
|
|
# 想要买的代码
|
class WantBuyCodesManager:
|
redisManager = redis_manager.RedisManager(0)
|
__redis_key = "want_buy_codes"
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
@classmethod
|
def clear(cls):
|
RedisUtils.delete(cls.__get_redis(), cls.__redis_key)
|
|
@classmethod
|
def add_code(cls, code):
|
RedisUtils.sadd(cls.__get_redis(), cls.__redis_key, code)
|
RedisUtils.expire(cls.__get_redis(), cls.__redis_key, tool.get_expire())
|
|
@classmethod
|
def remove_code(cls, code):
|
RedisUtils.srem( cls.__get_redis(),cls.__redis_key, code)
|
|
@classmethod
|
def is_in(cls, code):
|
return RedisUtils.sismember( cls.__get_redis(), cls.__redis_key, code)
|
|
@classmethod
|
def list_code(cls):
|
return RedisUtils.smembers(cls.__get_redis(), cls.__redis_key)
|
|
|
# 暂停下单代码管理
|
# 与黑名单的区别是暂停交易代码只是不交易,不能移除L2监控位
|
class PauseBuyCodesManager:
|
redisManager = redis_manager.RedisManager(0)
|
__redis_key = "pause_buy_codes"
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
@classmethod
|
def clear(cls):
|
RedisUtils.delete(cls.__get_redis(), cls.__redis_key)
|
|
@classmethod
|
def add_code(cls, code):
|
RedisUtils.sadd(cls.__get_redis(), cls.__redis_key, code)
|
RedisUtils.expire(cls.__get_redis(), cls.__redis_key, tool.get_expire())
|
|
@classmethod
|
def remove_code(cls, code):
|
RedisUtils.srem(cls.__get_redis(), cls.__redis_key, code)
|
|
@classmethod
|
def is_in(cls, code):
|
return RedisUtils.sismember( cls.__get_redis(), cls.__redis_key, code)
|
|
@classmethod
|
def list_code(cls):
|
return RedisUtils.smembers(cls.__get_redis(), cls.__redis_key)
|
|
|
def __parse_codes_data(code_datas):
|
codes = []
|
name_codes = {}
|
for _data in code_datas:
|
# 正常的股票
|
if _data["sec_type"] == 1 and _data["sec_level"] == 1:
|
code = _data["symbol"].split(".")[1]
|
if code.find("30") != 0 and code.find("68") != 0:
|
name = _data["sec_name"]
|
codes.append(code)
|
# 保存代码对应的名称
|
name_codes[name] = code
|
return codes, name_codes
|
|
|
# -------------------------------二板代码管理---------------------------------
|
|
def set_gp_list(code_datas):
|
codes, name_codes = __parse_codes_data(code_datas)
|
redis_instance = __redisManager.getRedis()
|
# 删除之前的
|
RedisUtils.delete(redis_instance, "gp_list")
|
CodesNameManager.clear_second_code_names()
|
for d in codes:
|
RedisUtils.sadd(redis_instance, "gp_list", d)
|
CodesNameManager.set_second_code_names(name_codes)
|
|
|
# 新增代码
|
def add_gp_list(code_datas):
|
if len(code_datas) > 200:
|
raise Exception("不能超过200个数据")
|
redis_instance = __redisManager.getRedis()
|
codes, name_codes = __parse_codes_data(code_datas)
|
for d in codes:
|
RedisUtils.sadd(redis_instance, "gp_list", d)
|
old_name_codes = CodesNameManager.list_second_code_name_dict()
|
if old_name_codes is None:
|
old_name_codes = name_codes
|
else:
|
for key in name_codes:
|
old_name_codes[key] = name_codes[key]
|
CodesNameManager.set_second_code_names(old_name_codes)
|
|
|
# -------------------------------首板代码管理-------------------------------
|
# 添加首板代码
|
# code_datas 掘金返回的数据
|
def set_first_gp_codes_with_data(code_datas):
|
redis_instance = __redisManager.getRedis()
|
codes, name_codes = __parse_codes_data(code_datas)
|
codes_set = set()
|
for code in codes:
|
codes_set.add(code)
|
old_codes_set = RedisUtils.smembers(redis_instance, "gp_list_first")
|
if old_codes_set is None:
|
old_codes_set = set()
|
del_set = old_codes_set - codes_set
|
add_codes = codes_set - old_codes_set
|
for code in add_codes:
|
RedisUtils.sadd(redis_instance, "gp_list_first", code)
|
for code in del_set:
|
redis_instance.srem("gp_list_first", code)
|
RedisUtils.expire(redis_instance, "gp_list_first", tool.get_expire())
|
|
old_name_codes = CodesNameManager.list_first_code_name_dict()
|
if old_name_codes is None:
|
old_name_codes = name_codes
|
else:
|
for key in name_codes:
|
old_name_codes[key] = name_codes[key]
|
CodesNameManager.set_first_code_names(old_name_codes)
|
|
|
# 移除首板代码
|
def remove_first_gp_code(codes):
|
redis_instance = __redisManager.getRedis()
|
for code in codes:
|
redis_instance.srem("gp_list_first", code)
|
|
|
# 获取首板代码
|
def get_first_gp_codes():
|
redis_instance = __redisManager.getRedis()
|
return RedisUtils.smembers(redis_instance, "gp_list_first")
|
|
|
# 是否在首板里面
|
def is_in_first_gp_codes(code):
|
redis_instance = __redisManager.getRedis()
|
return redis_instance.sismember("gp_list_first", code)
|
|
|
# 获取名称对应的代码
|
def get_name_code(name):
|
code = CodesNameManager.get_second_name_code(name)
|
if code is not None:
|
return code
|
code = CodesNameManager.get_first_name_code(name)
|
return code
|
|
|
# 代码名字缓存
|
__code_name_dict = {}
|
|
|
# 获取代码的名称
|
def get_code_name(code):
|
if code in __code_name_dict:
|
return __code_name_dict.get(code)
|
name = CodesNameManager.get_second_code_name(code)
|
if name is not None:
|
__code_name_dict[code] = name
|
return name
|
name = CodesNameManager.get_first_code_name(code)
|
if name:
|
__code_name_dict[code] = name
|
return name
|
|
|
def get_name_codes():
|
return CodesNameManager.list_code_name_dict()
|
|
|
# 涨停数据保存
|
def set_limit_up_list(gpset):
|
if gpset is None:
|
return
|
# 获取基本信息
|
redis_instance = __redisManager.getRedis()
|
# 删除之前的
|
RedisUtils.delete(redis_instance, "gp_limit_up_list")
|
for d in gpset:
|
RedisUtils.sadd(redis_instance, "gp_limit_up_list", json.dumps(d))
|
RedisUtils.expire(redis_instance, "gp_limit_up_list", tool.get_expire())
|
RedisUtils.setex(redis_instance, "gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000))
|
|
|
# 获取涨停列表
|
def get_limit_up_list():
|
redis_instance = __redisManager.getRedis()
|
return RedisUtils.get(redis_instance, "gp_limit_up_list_update_time"), RedisUtils.smembers(redis_instance,
|
"gp_limit_up_list")
|
|
|
def rm_gp(code):
|
redis_instance = __redisManager.getRedis()
|
redis_instance.srem("gp_list", code)
|
remove_first_gp_code([code])
|
|
|
def is_in_gp_pool(code):
|
redis_instance = __redisManager.getRedis()
|
return redis_instance.sismember("gp_list", code) or is_in_first_gp_codes(code)
|
|
|
def get_gp_list():
|
redis_instance = __redisManager.getRedis()
|
codes = RedisUtils.smembers(redis_instance, "gp_list")
|
first_codes = get_first_gp_codes()
|
return set.union(codes, first_codes)
|
|
|
# 获取二板代码
|
def get_second_gp_list():
|
redis_instance = __redisManager.getRedis()
|
codes = RedisUtils.smembers(redis_instance, "gp_list")
|
return codes
|
|
|
def get_gp_list_with_prefix(data=None):
|
if data is None:
|
data = get_gp_list()
|
list = []
|
for d in data:
|
if d[0:2] == '00':
|
list.append("SZSE.{}".format(d))
|
elif d[0:2] == '60':
|
list.append("SHSE.{}".format(d))
|
return list
|
|
|
# 获取收盘价
|
def get_price_pre(code):
|
redis_instance = __redisManager.getRedis()
|
result = RedisUtils.get(redis_instance, "price-pre-{}".format(code))
|
if result is not None:
|
return float(result)
|
return None
|
|
|
__price_pre_cache = {}
|
|
|
# 获取缓存
|
def get_price_pre_cache(code):
|
if code in __price_pre_cache:
|
return __price_pre_cache[code]
|
val = get_price_pre(code)
|
if val:
|
__price_pre_cache[code] = val
|
return val
|
|
|
# 设置收盘价
|
def set_price_pre(code, price, force=False):
|
codes = get_gp_list()
|
if code not in codes and not FirstCodeManager.is_in_first_record(code) and not force:
|
return
|
redis_instance = __redisManager.getRedis()
|
RedisUtils.setex(redis_instance, "price-pre-{}".format(code), tool.get_expire(), str(price))
|
|
|
__limit_up_price_dict = {}
|
|
|
# 获取涨停价
|
def get_limit_up_price(code):
|
# 读取内存中的值
|
if code in __limit_up_price_dict:
|
return __limit_up_price_dict[code]
|
price = get_price_pre(code)
|
if price is None:
|
return None
|
limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
|
__limit_up_price_dict[code] = limit_up_price
|
return limit_up_price
|
|
|
def get_limit_up_price_by_preprice(price):
|
if price is None:
|
return None
|
return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
|
|
|
# 获取跌停价
|
def get_limit_down_price(code):
|
price = get_price_pre(code)
|
if price is None:
|
return None
|
return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("0.9"))
|
|
|
# 获取现价
|
def get_price(code):
|
redis_instance = __redisManager.getRedis()
|
result = RedisUtils.get(redis_instance, "price-{}".format(code))
|
if result is not None:
|
return float(result)
|
return None
|
|
|
# 设置现价
|
def set_price(code, price):
|
redis_instance = __redisManager.getRedis()
|
RedisUtils.setex(redis_instance, "price-{}".format(code), tool.get_expire(), price)
|
|
|
# 获取正在监听的代码
|
def get_listen_codes():
|
redis_instance = __redisManager.getRedis()
|
keys = RedisUtils.keys(redis_instance, "listen_code-*-*")
|
codes = set()
|
for k in keys:
|
code = RedisUtils.get(redis_instance, k)
|
if code is not None and len(code) > 0:
|
codes.add(code)
|
return codes
|
|
|
# 根据位置获取正在监听的代码
|
def get_listen_code_by_pos(client_id, pos):
|
redis_instance = __redisManager.getRedis()
|
key = "listen_code-{}-{}".format(client_id, pos)
|
value = RedisUtils.get(redis_instance, key)
|
# print("redis:", key,value)
|
return value
|
|
|
# 设置位置的监听代码
|
def set_listen_code_by_pos(client_id, pos, code):
|
redis_instance = __redisManager.getRedis()
|
RedisUtils.setex(redis_instance, "listen_code-{}-{}".format(client_id, pos), tool.get_expire(), code)
|
# 同步监听的代码集合
|
__sync_listen_codes_pos()
|
|
|
# 同步监听代码位置信息
|
def __sync_listen_codes_pos():
|
redis_instance = __redisManager.getRedis()
|
# 获取已经正在监听的代码
|
keys = RedisUtils.keys(redis_instance, "code_listen_pos-*")
|
codes_set = set()
|
for key in keys:
|
codes_set.add(key.replace("code_listen_pos-", ""))
|
|
keys = RedisUtils.keys(redis_instance, "listen_code-*-*")
|
|
for key in keys:
|
result = redis_instance.get(key)
|
if result:
|
# 移除需要添加的代码
|
codes_set.discard(result)
|
client_id_, pos_, code_ = int(key.split("-")[1]), int(key.split("-")[2]), result
|
key_ = "code_listen_pos-{}".format(code_)
|
val = redis_instance.get(key_)
|
if val is None:
|
RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)))
|
else:
|
val = json.loads(val)
|
if val[0] != client_id_ or val[1] != pos_:
|
RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)))
|
|
# 移除没有监听的代码
|
for code_ in codes_set:
|
RedisUtils.delete(redis_instance, code_)
|
|
|
# 初始化位置
|
def init_listen_code_by_pos(client_id, pos):
|
redis_instance = __redisManager.getRedis()
|
key = "listen_code-{}-{}".format(client_id, pos)
|
redis_instance.setnx(key, "")
|
RedisUtils.expire(redis_instance, key, tool.get_expire())
|
|
|
# 清除所有监听代码
|
def clear_listen_codes():
|
redis_instance = __redisManager.getRedis()
|
keys = RedisUtils.keys(redis_instance, "listen_code-*-*")
|
for key in keys:
|
RedisUtils.setex(redis_instance, key, tool.get_expire(), "")
|
|
|
def clear_first_codes():
|
redis_instance = __redisManager.getRedis()
|
RedisUtils.delete(redis_instance, "gp_list_first")
|
RedisUtils.delete(redis_instance, "gp_list_names_first")
|
RedisUtils.delete(redis_instance, "first_code_record")
|
RedisUtils.delete(redis_instance, "first_code_limited_up_record")
|
|
|
# 获取可以操作的位置
|
def get_can_listen_pos(client_id=0):
|
client_ids = []
|
if client_id <= 0:
|
client_ids = client_manager.getValidL2Clients()
|
else:
|
client_ids.append(client_id)
|
# random.shuffle(client_ids)
|
available_positions = []
|
for client_id in client_ids:
|
redis_instance = __redisManager.getRedis()
|
k = "listen_code-{}-*".format(client_id)
|
keys = RedisUtils.keys(redis_instance, k)
|
# random.shuffle(keys)
|
codes = []
|
for key in keys:
|
index = key.split("-")[-1]
|
if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE:
|
continue
|
result = redis_instance.get(key)
|
if result is None or len(result) == 0:
|
available_positions.append((client_id, int(key.replace("listen_code-{}-".format(client_id), ""))))
|
else:
|
codes.append((key, result))
|
# 查询是否有重复的代码
|
codes_set = set()
|
count = 0
|
for code in codes:
|
count = count + 1
|
codes_set.add(code[1])
|
if len(codes_set) < count:
|
return client_id, int(code[0].replace("listen_code-{}-".format(client_id), ""))
|
|
if available_positions:
|
# 获取健康状态
|
available_positions_health_states = l2_listen_pos_health_manager.list_health_state(available_positions)
|
# 尽量不分配第一个位置
|
available_positions_new = sorted(available_positions,
|
key=lambda x: (available_positions_health_states[x], 0 if x[1] == 0 else 1),
|
reverse=True)
|
# available_positions.sort(key=lambda x: available_positions_health_states[x], reverse=True)
|
# 取第1个数据
|
return available_positions_new[0][0], available_positions_new[0][1]
|
|
return None, None
|
|
|
# 获取可以操作的位置
|
def get_free_listen_pos_count():
|
client_ids = client_manager.getValidL2Clients()
|
free_count = 0
|
for client_id in client_ids:
|
redis_instance = __redisManager.getRedis()
|
k = "listen_code-{}-*".format(client_id)
|
keys = RedisUtils.keys(redis_instance, k)
|
for key in keys:
|
code = redis_instance.get(key)
|
if not code:
|
free_count += 1
|
return free_count
|
|
|
# 获取正在监听的代码的位置
|
def get_listen_code_pos(code):
|
redis_instance = __redisManager.getRedis()
|
val = redis_instance.get("code_listen_pos-{}".format(code))
|
if val is None:
|
return None, None
|
val = json.loads(val)
|
cid, pid = val[0], val[1]
|
code_ = get_listen_code_by_pos(cid, pid)
|
# 校验代码
|
if code_ == code:
|
return cid, pid
|
else:
|
return None, None
|
|
|
# 是否正在监听
|
def is_listen(code):
|
redis_instance = __redisManager.getRedis()
|
val = redis_instance.get("code_listen_pos-{}".format(code))
|
if val is None:
|
return False
|
else:
|
return True
|
# codes = get_listen_codes()
|
# return codes.__contains__(code)
|
|
|
def is_listen_old(code):
|
codes = get_listen_codes()
|
return codes.__contains__(code)
|
|
|
# 监听是否满了
|
def is_listen_full():
|
clients = client_manager.getValidL2Clients()
|
codes = get_listen_codes()
|
return len(codes) >= constant.L2_CODE_COUNT_PER_DEVICE * len(clients)
|
|
|
# 是否正在操作
|
def is_operate(code):
|
redis_instance = __redisManager.getRedis()
|
return redis_instance.get("gp_operate-{}".format(code)) is not None
|
|
|
# 设置正在操作的代码
|
def set_operate(code):
|
redis_instance = __redisManager.getRedis()
|
RedisUtils.setex(
|
redis_instance, "gp_operate-{}".format(code), 30, "1")
|
|
|
# 批量设置正在操作的代码
|
def set_operates(codes):
|
redis_instance = __redisManager.getRedis()
|
for code in codes:
|
RedisUtils.setex(
|
redis_instance, "gp_operate-{}".format(code), 30, "1")
|
|
|
# 移除正在操作的代码
|
def rm_operate(code):
|
redis_instance = __redisManager.getRedis()
|
RedisUtils.delete(redis_instance, "gp_operate-{}".format(code))
|
|
|
# 批量移除正在操作的代码
|
def rm_operates(codes):
|
redis_instance = __redisManager.getRedis()
|
for code in codes:
|
RedisUtils.delete(redis_instance, "gp_operate-{}".format(code))
|
|
|
if __name__ == '__main__':
|
get_can_listen_pos()
|