Administrator
2024-06-07 2f82a47b1744e27f9667181e6dba1348edfd3ef6
代码名称优化
4个文件已修改
200 ■■■■ 已修改文件
code_attribute/gpcode_manager.py 195 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py
@@ -6,6 +6,7 @@
import constant
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from log_module import log_export
from log_module.log import logger_pre_close_price
@@ -19,116 +20,40 @@
class CodesNameManager:
    __db = 0
    redisManager = redis_manager.RedisManager(0)
    __gp_list_names_first_cache = []
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    __mysqldb = Mysqldb()
    __code_name_dict = {}
    @classmethod
    def list_code_name_dict(cls):
        # 获取所有的代码名称对应表
        dict_ = {}
        val = cls.list_first_code_name_dict()
        if val is not None:
            for k in val:
                dict_[k] = val[k]
        val = cls.list_second_code_name_dict()
        if val is not None:
            for k in val:
                dict_[k] = val[k]
        fresults = cls.__mysqldb.select_all("select code, code_name from code_name")
        for result in fresults:
            dict_[result[0]] = result[1]
        return dict_
    @classmethod
    def list_first_code_name_dict(cls):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if val is not None:
            val = json.loads(val)
            return val
    def get_code_name(cls, code):
        if code in cls.__code_name_dict:
            return cls.__code_name_dict[code]
        fresults = cls.__mysqldb.select_one(f"select code_name from code_name where code = {code}")
        if fresults:
            cls.__code_name_dict[code] = fresults[0]
            return fresults[0]
        return None
    @classmethod
    def list_first_code_name_dict_cache(cls):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if val is not None:
            val = json.loads(val)
            return val
        return None
    @classmethod
    def get_first_code_name(cls, code):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if not val:
            return None
        val = json.loads(val)
        for k in val:
            if val[k] == code:
                return k
        return None
    @classmethod
    def get_first_name_code(cls, name):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if not val:
            return None
        val = json.loads(val)
        return val.get(name)
    def add_code_name(cls, code, name):
        cls.__code_name_dict[code] = name
        fresults = cls.__mysqldb.select_one(f"select code_name from code_name where code = {code}")
        if fresults:
            cls.__mysqldb.execute(f"update code_name set code_name ='{name}', update_time = now() where code= {code}")
        else:
            cls.__mysqldb.execute(f"insert into code_name(code,code_name,update_time ) values({code},'{name}',now())")
    @classmethod
    def add_first_code_name(cls, code, name):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if not val:
            return None
        val = json.loads(val)
        val[name] = code
        cls.set_first_code_names(val)
    # 设置首板代码名称
    @classmethod
    def set_first_code_names(cls, datas):
        RedisUtils.set_async(cls.__db, "gp_list_names_first", json.dumps(datas))
    # 删除首板代码名称
    @classmethod
    def clear_first_code_names(cls):
        RedisUtils.delete(cls.__get_redis(), "gp_list_names_first")
    @classmethod
    def list_second_code_name_dict(cls):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
        if val is not None:
            val = json.loads(val)
            return val
        return None
    @classmethod
    def get_second_code_name(cls, code):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
        if not val:
            return None
        val = json.loads(val)
        for k in val:
            if val[k] == code:
                return k
    @classmethod
    def get_second_name_code(cls, name):
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
        if not val:
            return None
        val = json.loads(val)
        return val.get(name)
    # 设置二板代码名称
    @classmethod
    def set_second_code_names(cls, datas):
        RedisUtils.set(cls.__get_redis(), "gp_list_names", json.dumps(datas))
    # 设置二板代码名称
    @classmethod
    def clear_second_code_names(cls):
        RedisUtils.delete(cls.__get_redis(), "gp_list_names")
        cls.add_code_name(code, name)
# 首板代码管理
@@ -467,48 +392,12 @@
        # 正常的股票
        if _data["sec_type"] == 1 and _data["sec_level"] == 1:
            code = _data["symbol"].split(".")[1]
            if code.find("30") != 0 and code.find("68") != 0:
            if tool.is_can_buy_code(code):
                name = _data["sec_name"]
                codes.append(code)
                # 保存代码对应的名称
                name_codes[name] = code
    return codes, name_codes
# -------------------------------二板代码管理---------------------------------
def set_gp_list(code_datas):
    codes, name_codes = __parse_codes_data(code_datas)
    redis_instance = __redisManager.getRedis()
    try:
        # 删除之前的
        RedisUtils.delete(redis_instance, "gp_list", auto_free=False)
        CodesNameManager.clear_second_code_names()
        for d in codes:
            RedisUtils.sadd(redis_instance, "gp_list", d, auto_free=False)
        CodesNameManager.set_second_code_names(name_codes)
    finally:
        RedisUtils.realse(redis_instance)
# 新增代码
def add_gp_list(code_datas):
    if len(code_datas) > 200:
        raise Exception("不能超过200个数据")
    redis_instance = __redisManager.getRedis()
    try:
        codes, name_codes = __parse_codes_data(code_datas)
        for d in codes:
            RedisUtils.sadd(redis_instance, "gp_list", d, auto_free=False)
        old_name_codes = CodesNameManager.list_second_code_name_dict()
        if old_name_codes is None:
            old_name_codes = name_codes
        else:
            for key in name_codes:
                old_name_codes[key] = name_codes[key]
        CodesNameManager.set_second_code_names(old_name_codes)
    finally:
        RedisUtils.realse(redis_instance)
# -------------------------------首板代码管理-------------------------------
@@ -544,7 +433,7 @@
            # 正常的股票
            if _data["sec_type"] == 1 and _data["sec_level"] == 1:
                code = _data["symbol"].split(".")[1]
                if code.find("30") != 0 and code.find("68") != 0:
                if tool.is_can_buy_code(code):
                    name = _data["sec_name"]
                    codes.append(code)
                    # 保存代码对应的名称
@@ -554,7 +443,6 @@
    # 添加首板代码
    # code_datas 掘金返回的数据
    def set_first_gp_codes_with_data(self, code_datas):
        redis_instance = self.__get_redis()
        try:
            codes, name_codes = self.__parse_codes_data(code_datas)
            codes_set = set()
@@ -574,16 +462,10 @@
            # 更新缓存
            self.__gp_list_first_codes_cache.clear()
            self.__gp_list_first_codes_cache |= codes_set
            old_name_codes = CodesNameManager.list_first_code_name_dict()
            if old_name_codes is None:
                old_name_codes = name_codes
            else:
                for key in name_codes:
                    old_name_codes[key] = name_codes[key]
            CodesNameManager.set_first_code_names(old_name_codes)
            for key in name_codes:
                CodesNameManager.add_code_name(name_codes[key], key)
        finally:
            RedisUtils.realse(redis_instance)
            pass
    # 移除首板代码
    def remove_first_gp_code(self, codes):
@@ -611,35 +493,18 @@
        return code in self.__gp_list_first_codes_cache
# 获取名称对应的代码
def get_name_code(name):
    code = CodesNameManager.get_second_name_code(name)
    if code is not None:
        return code
    code = CodesNameManager.get_first_name_code(name)
    return code
# 代码名字缓存
__code_name_dict = {}
# 获取代码的名称
def get_code_name(code):
    if code in __code_name_dict:
        return __code_name_dict.get(code)
    name = CodesNameManager.get_second_code_name(code)
    if name is not None:
        __code_name_dict[code] = name
        return name
    name = CodesNameManager.get_first_code_name(code)
    if name:
        __code_name_dict[code] = name
    return name
    return CodesNameManager.get_code_name(code)
def get_name_codes():
    return CodesNameManager.list_code_name_dict()
    code_name_dict = CodesNameManager.list_code_name_dict()
    return {code_name_dict[x]: x for x in code_name_dict}
# 涨停数据保存
main.py
@@ -10,7 +10,6 @@
logger_system.info("程序启动Pre:{}", os.getpid())
import constant
from db import redis_manager_delegate as redis_manager
import huaxin_client.trade_client
import huaxin_client.l2_client
third_data/kpl_data_manager.py
@@ -455,7 +455,7 @@
        log.close_print()
        while True:
            try:
                if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")) or True:
                if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")):
                    results = kpl_api.getLimitUpInfoNew()
                    result = json.loads(results)
                    start_time = time.time()
utils/tool.py
@@ -313,7 +313,7 @@
def is_can_buy_code(code):
    if code.find("00") == 0 or code.find("60") == 0 or code.find("30") == 0:
    if code.find("00") == 0 or code.find("60") == 0:# or code.find("30") == 0:
        return True
    return False