Administrator
2023-01-06 59fba698b03a51a8da5b56a919ebbf94d4784f74
gpcode_manager.py
@@ -1,40 +1,95 @@
"""
股票代码管理器
"""
import code
import json
import random
import time
import authority
import global_util
import client_manager
import redis_manager
import tool
import juejin
import data_process
import decimal
__redisManager = redis_manager.RedisManager(0)
def set_gp_list(gpset):
    # 获取基本信息
    code_datas = juejin.JueJinManager.get_gp_latest_info(gpset)
def __parse_codes_data(code_datas):
    codes = []
    name_codes = {}
    for _data in code_datas:
        # 正常的股票
        if _data["sec_type"] == 1 and _data["sec_level"] == 1:
            code = _data["symbol"].split(".")[1]
            if code.find("30") != 0 and code.find("68") != 0:
                name = _data["sec_name"]
                codes.append(code)
                # 保存代码对应的名称
                name_codes[name] = code
    return codes, name_codes
def set_gp_list(code_datas):
    codes, name_codes = __parse_codes_data(code_datas)
    redis_instance = __redisManager.getRedis()
    # 删除之前的
    redis_instance.delete("gp_list")
    redis_instance.delete("gp_list_names")
    for d in codes:
        redis_instance.sadd("gp_list", d)
    redis_instance.set("gp_list_names", json.dumps(name_codes))
# 新增代码
def add_gp_list(code_datas):
    redis_instance = __redisManager.getRedis()
    codes, name_codes = __parse_codes_data(code_datas)
    for d in codes:
        redis_instance.sadd("gp_list", d)
    old_name_codes = get_name_codes()
    if old_name_codes is None:
        old_name_codes = name_codes
    else:
        for key in name_codes:
            old_name_codes[key] = name_codes[key]
    redis_instance.set("gp_list_names", json.dumps(old_name_codes))
# 获取名称对应的代码
def get_name_code(name):
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("gp_list_names")
    if not val:
        return None
    val = json.loads(val)
    return val.get(name)
def get_code_name(code):
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("gp_list_names")
    if not val:
        return None
    val = json.loads(val)
    for key in val:
        if val[key] == code:
            return key
    return None
def get_name_codes():
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("gp_list_names")
    if not val:
        return None
    val = json.loads(val)
    return val
# 涨停犁碑坳
def set_limit_up_list(gpset):
    # 保存到内存中
    global_util.add_limit_up_codes(gpset)
    if gpset is None:
        return
    # 获取基本信息
    redis_instance = __redisManager.getRedis()
    # 删除之前的
@@ -42,7 +97,7 @@
    for d in gpset:
        redis_instance.sadd("gp_limit_up_list", json.dumps(d))
    redis_instance.expire("gp_limit_up_list", tool.get_expire())
    redis_instance.setex("gp_limit_up_list_update_time",tool.get_expire(),round( time.time()*1000))
    redis_instance.setex("gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000))
# 获取涨停列表
@@ -89,16 +144,30 @@
# 设置收盘价
def set_price_pre(code, price):
    codes= get_gp_list()
    codes = get_gp_list()
    if code not in codes:
        return
    redis_instance = __redisManager.getRedis()
    redis_instance.setex("price-pre-{}".format(code), tool.get_expire(), str(price))
__limit_up_price_dict = {}
# 获取涨停价
def get_limit_up_price(code):
    # 读取内存中的值
    if code in __limit_up_price_dict:
        return __limit_up_price_dict[code]
    price = get_price_pre(code)
    if price is None:
        return None
    limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
    __limit_up_price_dict[code] = limit_up_price
    return limit_up_price
def get_limit_up_price_by_preprice(price):
    if price is None:
        return None
    return tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal("1.1"))
@@ -142,17 +211,49 @@
# 根据位置获取正在监听的代码
def get_listen_code_by_pos(client_id, pos):
    redis_instance = __redisManager.getRedis()
    key="listen_code-{}-{}".format(client_id, pos)
    key = "listen_code-{}-{}".format(client_id, pos)
    value = redis_instance.get(key)
    # print("redis:", key,value)
    return value
# 设置位置的监听代码
def set_listen_code_by_pos(client_id, pos, code):
    redis_instance = __redisManager.getRedis()
    redis_instance.setex("listen_code-{}-{}".format(client_id, pos), tool.get_expire(), code)
    # 同步监听的代码集合
    __sync_listen_codes_pos()
# 同步监听代码位置信息
def __sync_listen_codes_pos():
    redis_instance = __redisManager.getRedis()
    # 获取已经正在监听的代码
    keys = redis_instance.keys("code_listen_pos-*")
    codes_set = set()
    for key in keys:
        codes_set.add(key.replace("code_listen_pos-", ""))
    keys = redis_instance.keys("listen_code-*-*")
    for key in keys:
        result = redis_instance.get(key)
        if result:
            # 移除需要添加的代码
            codes_set.discard(result)
            client_id_, pos_, code_ = int(key.split("-")[1]), int(key.split("-")[2]), result
            key_ = "code_listen_pos-{}".format(code_)
            val = redis_instance.get(key_)
            if val is None:
                redis_instance.setex(key_, tool.get_expire(), json.dumps((client_id_, pos_)))
            else:
                val = json.loads(val)
                if val[0] != client_id_ or val[1] != pos_:
                    redis_instance.setex(key_, tool.get_expire(), json.dumps((client_id_, pos_)))
    # 移除没有监听的代码
    for code_ in codes_set:
        redis_instance.delete(code_)
# 初始化位置
@@ -167,7 +268,7 @@
def get_can_listen_pos(client_id=0):
    client_ids = []
    if client_id <= 0:
        client_ids = data_process.getValidL2Clients()
        client_ids = client_manager.getValidL2Clients()
    else:
        client_ids.append(client_id)
    random.shuffle(client_ids)
@@ -200,25 +301,35 @@
# 获取正在监听的代码的位置
def get_listen_code_pos(code):
    redis_instance = __redisManager.getRedis()
    keys = redis_instance.keys("listen_code-*-*")
    for key in keys:
        result = redis_instance.get(key)
        if result is not None and code == result:
            return key.split("-")[1], key.split("-")[2]
    return None, None
    val = redis_instance.get("code_listen_pos-{}".format(code))
    if val is None:
        return None, None
    val = json.loads(val)
    return val[0], val[1]
# 是否正在监听
def is_listen(code):
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("code_listen_pos-{}".format(code))
    if val is None:
        return False
    else:
        return True
    # codes = get_listen_codes()
    # return codes.__contains__(code)
def is_listen_old(code):
    codes = get_listen_codes()
    return codes.__contains__(code)
# 监听是否满了
def is_listen_full():
    clients = data_process.getValidL2Clients()
    clients = client_manager.getValidL2Clients()
    codes = get_listen_codes()
    return len(codes) >= 8*len(clients)
    return len(codes) >= 8 * len(clients)
# 是否正在操作
@@ -233,17 +344,29 @@
    redis_instance.setex("gp_operate-{}".format(code), 30, "1")
# 批量设置正在操作的代码
def set_operates(codes):
    redis_instance = __redisManager.getRedis()
    for code in codes:
        redis_instance.setex("gp_operate-{}".format(code), 30, "1")
# 移除正在操作的代码
def rm_operate(code):
    redis_instance = __redisManager.getRedis()
    redis_instance.delete("gp_operate-{}".format(code))
# 批量移除正在操作的代码
def rm_operates(codes):
    redis_instance = __redisManager.getRedis()
    for code in codes:
        redis_instance.delete("gp_operate-{}".format(code))
if __name__ == '__main__':
    # print(get_can_listen_pos(0))
    # print(get_listen_codes())
    # print(is_listen_full())
    # print(is_listen("002271"))
    # print(get_listen_code_pos("002272"))
    code= get_listen_code_by_pos(2, 7)
    print(code)
    _start = time.time()
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("code_listen_pos-{}".format("603786"))
    print(json.loads(val))
    print((time.time() - _start) * 1000)