Administrator
2025-05-26 a846b46f15ad309a62fe400cf78dd7fc888155d7
code_attribute/gpcode_manager.py
@@ -1,27 +1,78 @@
"""
股票代码管理器
"""
import copy
import json
import time
import constant
from db import redis_manager_delegate as redis_manager
from db.mysql_data_delegate import Mysqldb
from db.redis_manager_delegate import RedisUtils
from log_module import log_export
from log_module.log import logger_pre_close_price
from log_module.log import logger_pre_close_price, logger_debug
from trade import trade_record_log_util
from utils import tool
import decimal
from ths import l2_listen_pos_health_manager, client_manager
__redisManager = redis_manager.RedisManager(0)
__db = 0
class BuyOpenLimitUpCodeManager:
    """
    排1代码管理
    """
    __db = 2
    __redisManager = redis_manager.RedisManager(2)
    __instance = None
    __codes_cache = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(BuyOpenLimitUpCodeManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def __load_data(cls):
        val = RedisUtils.get(cls.__get_redis(), "buy_open_limit_up_codes")
        if val:
            val = json.loads(val)
            cls.__codes_cache = set(val)
    def set_codes(self, codes):
        self.__codes_cache = copy.deepcopy(codes)
        RedisUtils.set_async(self.__db, "buy_open_limit_up_codes", json.dumps(list(codes)))
    def get_codes(self):
        return self.__codes_cache
    def is_in_cache(self, code):
        if not self.__codes_cache:
            return False
        if code in self.__codes_cache:
            return True
        return False
class CodesNameManager:
    __mysqldb = Mysqldb()
    __code_name_dict = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesNameManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_data()
        return cls.__instance
    @classmethod
    def __load_data(cls):
        cls.__code_name_dict = cls.list_code_name_dict()
    @classmethod
    def list_code_name_dict(cls):
@@ -52,6 +103,18 @@
            cls.__mysqldb.execute(f"insert into code_name(code,code_name,update_time ) values('{code}','{name}',now())")
    @classmethod
    def add_code_names(cls, code_name_dict):
        """
        批量添加代码名称
        @param code_name_dict:
        @return:
        """
        for code in code_name_dict:
            if code in cls.__code_name_dict and cls.__code_name_dict[code] == code_name_dict[code]:
                continue
            cls.add_code_name(code, code_name_dict[code])
    @classmethod
    def add_first_code_name(cls, code, name):
        cls.add_code_name(code, name)
@@ -69,8 +132,6 @@
            cls.__instance.redisManager = redis_manager.RedisManager(0)
            cls.__instance.__first_code_record_cache = RedisUtils.smembers(cls.__instance.__get_redis(),
                                                                           "first_code_record")
            cls.__instance.__first_code_limited_up_record_cache = RedisUtils.smembers(cls.__instance.__get_redis(),
                                                                                      "first_code_limited_up_record")
        return cls.__instance
    def __get_redis(self):
@@ -96,31 +157,11 @@
    def is_in_first_record_cache(self, code):
        return code in self.__first_code_record_cache
    # 加入首板涨停过代码集合
    def add_limited_up_record(self, codes):
        hasChanged = False
        for code in codes:
            if code not in self.__first_code_limited_up_record_cache:
                RedisUtils.sadd_async(self.__db, "first_code_limited_up_record", code)
                hasChanged = True
            self.__first_code_limited_up_record_cache.add(code)
        if hasChanged:
            RedisUtils.expire_async(self.__db, "first_code_limited_up_record", tool.get_expire())
    # 是否涨停过
    def is_limited_up(self, code):
        if RedisUtils.sismember(self.__get_redis(), "first_code_limited_up_record", code):
            return True
        else:
            return False
    def is_limited_up_cache(self, code):
        return code in self.__first_code_limited_up_record_cache
# 想要买的代码
class WantBuyCodesManager:
    __instance = None
    __db = 0
    redisManager = redis_manager.RedisManager(0)
    __redis_key = "want_buy_codes"
@@ -149,7 +190,7 @@
    def remove_code(self, code):
        self.__want_buy_codes_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), self.__redis_key, code)
        RedisUtils.srem_async(self.__db, self.__redis_key, code)
    def sync(self):
        codes = self.list_code()
@@ -168,6 +209,81 @@
    def list_code_cache(self):
        return self.__want_buy_codes_cache
@tool.singleton
class HumanRemoveForbiddenManager:
    """
    认为移黑管理
    """
    __db = 0
    redisManager = redis_manager.RedisManager(0)
    __redis_key = "human_remove_forbidden_codes"
    __codes_cache = set()
    def __init__(self):
        self.__load_data()
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    def __load_data(self):
        self.__codes_cache = RedisUtils.smembers(self.__get_redis(), self.__redis_key)
        if self.__codes_cache is None:
            self.__codes_cache = set()
    def add_code(self, code):
        self.__codes_cache.add(code)
        RedisUtils.sadd_async(self.__db, self.__redis_key, code)
        RedisUtils.expire_async(self.__db, self.__redis_key, tool.get_expire())
    def remove_code(self, code):
        self.__codes_cache.discard(code)
        RedisUtils.srem_async(self.__db, self.__redis_key, code)
    def is_in_cache(self, code):
        return code in self.__codes_cache
@tool.singleton
class HumanForbiddenManager:
    """
    人为拉黑管理
    """
    __db = 0
    redisManager = redis_manager.RedisManager(0)
    __redis_key = "human_forbidden_codes"
    __codes_cache = set()
    def __init__(self):
        self.__load_data()
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    def __load_data(self):
        self.__codes_cache = RedisUtils.smembers(self.__get_redis(), self.__redis_key)
        if self.__codes_cache is None:
            self.__codes_cache = set()
    def add_code(self, code):
        trade_record_log_util.add_common_msg(code, "人为加黑", f"")
        self.__codes_cache.add(code)
        RedisUtils.sadd_async(self.__db, self.__redis_key, code)
        RedisUtils.expire_async(self.__db, self.__redis_key, tool.get_expire())
    def remove_code(self, code):
        trade_record_log_util.add_common_msg(code, "人为移黑", f"")
        self.__codes_cache.discard(code)
        RedisUtils.srem_async(self.__db, self.__redis_key, code)
    def is_in_cache(self, code):
        return code in self.__codes_cache
# 暂停下单代码管理
@@ -285,6 +401,7 @@
class WhiteListCodeManager:
    __instance = None
    __redis_manager = redis_manager.RedisManager(2)
    __human_remove_codes = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -305,14 +422,21 @@
        if data:
            self.__white_codes_cache |= data
    def add_code(self, code):
    def add_code(self, code, is_human=False):
        if not is_human and code in self.__human_remove_codes:
            # 机器加白,且被人为移白了就不能再加白
            return
        self.__white_codes_cache.add(code)
        RedisUtils.sadd(self.__get_redis(), "white_list_codes", code)
        RedisUtils.expire(self.__get_redis(), "white_list_codes", tool.get_expire())
    def remove_code(self, code):
    def remove_code(self, code, is_human=False):
        self.__white_codes_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), "white_list_codes", code)
        if is_human:
            self.human_remove(code)
    def is_in(self, code):
        return RedisUtils.sismember(self.__get_redis(), "white_list_codes", code)
@@ -330,6 +454,24 @@
        self.__white_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), "white_list_codes")
    def human_remove(self, code):
        """
        人为移白
        @param code:
        @return:
        """
        self.__human_remove_codes.add(code)
    def clear_huamn_info(self, code):
        """
        移除人为干预信息
        @param code:
        @return:
        """
        if code in self.__human_remove_codes:
            self.__human_remove_codes.discard(code)
class BlackListCodeManager:
    __instance = None
@@ -343,6 +485,7 @@
            # 获取交易窗口的锁
            cls.__instance.__forbidden_trade_codes_cache = RedisUtils.smembers(cls.__get_redis(),
                                                                               "forbidden-trade-codes")
            logger_debug.info(f"加载加黑列表:{cls.__instance.__forbidden_trade_codes_cache}")
        return cls.__instance
@@ -383,6 +526,62 @@
    def clear(self):
        self.__forbidden_trade_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), "forbidden-trade-codes")
class GreenListCodeManager:
    """
    绿名单:买入即加红,撤单不移红
    """
    __instance = None
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __codes_set = set()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(GreenListCodeManager, cls).__new__(cls, *args, **kwargs)
            # 初始化设置
            # 获取交易窗口的锁
            cls.__codes_set = RedisUtils.smembers(cls.__get_redis(), "green-trade-codes")
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def add_code(self, code):
        self.__codes_set.add(code)
        RedisUtils.sadd_async(self.__db, "green-trade-codes", code)
        RedisUtils.expire_async(self.__db, "green-trade-codes", tool.get_expire())
    def sync(self):
        data = RedisUtils.smembers(self.__get_redis(),
                                   "green-trade-codes")
        self.__codes_set.clear()
        if data:
            self.__codes_set |= data
    def remove_code(self, code):
        self.__codes_set.discard(code)
        RedisUtils.srem_async(self.__db, "green-trade-codes", code)
    def is_in(self, code):
        return RedisUtils.sismember(self.__get_redis(), "green-trade-codes", code)
    def is_in_cache(self, code):
        return code in self.__codes_set
    def list_codes(self):
        codes = RedisUtils.smembers(self.__get_redis(), "green-trade-codes")
        self.__codes_set = codes
        return codes
    def list_codes_cache(self):
        return self.__codes_set
    def clear(self):
        self.__codes_set.clear()
        RedisUtils.delete(self.__get_redis(), "green-trade-codes")
def __parse_codes_data(code_datas):
@@ -499,7 +698,7 @@
# 获取代码的名称
def get_code_name(code):
    return CodesNameManager.get_code_name(code)
    return CodesNameManager().get_code_name(code)
def get_name_codes():
@@ -595,8 +794,7 @@
    # 设置收盘价
    @classmethod
    def set_price_pre(cls, code, price, force=False):
        codes = get_gp_list()
        if code not in codes and not FirstCodeManager().is_in_first_record_cache(code) and not force:
        if code in cls.__price_pre_cache and not force:
            return
        price = round(float(price), 2)
        logger_pre_close_price.info(f"{code}-{price}")
@@ -617,6 +815,13 @@
    limit_up_price = tool.to_price(decimal.Decimal(str(price)) * decimal.Decimal(tool.get_limit_up_rate(code)))
    __limit_up_price_dict[code] = limit_up_price
    return limit_up_price
def get_limit_up_price_as_num(code):
    limit_up_price = get_limit_up_price(code)
    if limit_up_price:
        return round(float(limit_up_price), 2)
    return None
def get_limit_up_price_cache(code):
@@ -766,143 +971,4 @@
        RedisUtils.delete(redis_instance, "first_code_record", auto_free=False)
        RedisUtils.delete(redis_instance, "first_code_limited_up_record", auto_free=False)
    finally:
        RedisUtils.realse(redis_instance)
# 获取可以操作的位置
def get_can_listen_pos(client_id=0):
    client_ids = []
    if client_id <= 0:
        client_ids = client_manager.getValidL2Clients()
    else:
        client_ids.append(client_id)
    # random.shuffle(client_ids)
    available_positions = []
    for client_id in client_ids:
        redis_instance = __redisManager.getRedis()
        k = "listen_code-{}-*".format(client_id)
        keys = RedisUtils.keys(redis_instance, k, auto_free=False)
        # random.shuffle(keys)
        codes = []
        for key in keys:
            index = key.split("-")[-1]
            if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE:
                continue
            result = RedisUtils.get(redis_instance, key, auto_free=False)
            if result is None or len(result) == 0:
                available_positions.append((client_id, int(key.replace("listen_code-{}-".format(client_id), ""))))
            else:
                codes.append((key, result))
        RedisUtils.realse(redis_instance)
        # 查询是否有重复的代码
        codes_set = set()
        count = 0
        for code in codes:
            count = count + 1
            codes_set.add(code[1])
            if len(codes_set) < count:
                return client_id, int(code[0].replace("listen_code-{}-".format(client_id), ""))
    if available_positions:
        # 获取健康状态
        available_positions_health_states = l2_listen_pos_health_manager.list_health_state(available_positions)
        # 尽量不分配第一个位置
        available_positions_new = sorted(available_positions,
                                         key=lambda x: (available_positions_health_states[x], 0 if x[1] == 0 else 1),
                                         reverse=True)
        # available_positions.sort(key=lambda x: available_positions_health_states[x], reverse=True)
        # 取第1个数据
        return available_positions_new[0][0], available_positions_new[0][1]
    return None, None
# 获取可以操作的位置
def get_free_listen_pos_count():
    client_ids = client_manager.getValidL2Clients()
    free_count = 0
    for client_id in client_ids:
        redis_instance = __redisManager.getRedis()
        try:
            k = "listen_code-{}-*".format(client_id)
            keys = RedisUtils.keys(redis_instance, k, auto_free=False)
            for key in keys:
                code = RedisUtils.get(redis_instance, key, auto_free=False)
                if not code:
                    free_count += 1
        finally:
            RedisUtils.realse(redis_instance)
    return free_count
# 获取正在监听的代码的位置
def get_listen_code_pos(code):
    val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    if val is None:
        return None, None
    val = json.loads(val)
    cid, pid = val[0], val[1]
    code_ = get_listen_code_by_pos(cid, pid)
    # 校验代码
    if code_ == code:
        return cid, pid
    else:
        return None, None
# 是否正在监听
def is_listen(code):
    val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    if val is None:
        return False
    else:
        return True
    # codes = get_listen_codes()
    # return codes.__contains__(code)
def is_listen_old(code):
    codes = get_listen_codes()
    return codes.__contains__(code)
# 监听是否满了
def is_listen_full():
    clients = client_manager.getValidL2Clients()
    codes = get_listen_codes()
    return len(codes) >= constant.L2_CODE_COUNT_PER_DEVICE * len(clients)
# 是否正在操作
def is_operate(code):
    return RedisUtils.get(__redisManager.getRedis(), "gp_operate-{}".format(code)) is not None
# 设置正在操作的代码
def set_operate(code):
    RedisUtils.setex(__redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1")
# 批量设置正在操作的代码
def set_operates(codes):
    for code in codes:
        RedisUtils.setex(__redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1")
# 移除正在操作的代码
def rm_operate(code):
    RedisUtils.delete(__redisManager.getRedis(), "gp_operate-{}".format(code))
# 批量移除正在操作的代码
def rm_operates(codes):
    redis_instance = __redisManager.getRedis()
    try:
        for code in codes:
            RedisUtils.delete(redis_instance, "gp_operate-{}".format(code), auto_free=False)
    finally:
        RedisUtils.realse(redis_instance)
if __name__ == '__main__':
    get_can_listen_pos()
        RedisUtils.realse(redis_instance)