admin
2023-08-22 c057275036cd3e28f2de146e5993c6f97016ffdb
bug修改
6个文件已修改
16个文件已添加
4137 ■■■■■ 已修改文件
code_attribute/code_price_manager.py 156 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 787 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_server.py 537 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_analyse.py 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 307 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_api_server.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
middle_server.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/limit_up_data_filter.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/output_util.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
socket_manager.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 614 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_block_util.py 173 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 377 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_util.py 268 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_util.py 140 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 306 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/global_util.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/hosting_api_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_price_manager.py
New file
@@ -0,0 +1,156 @@
"""
代码价格管理
"""
import json
from db.redis_manager import RedisUtils
from utils import tool
from db import redis_manager as redis_manager
class Buy1PriceManager:
    """Singleton holding per-code buy-1 price data, backed by Redis with in-memory caches.

    Three values are tracked for each code, each under its own Redis key:
      * "buy1_price_limit_up_info-<code>": JSON pair (limit_up_time, open_limit_up_time)
      * "buy1_price-<code>": the current buy-1 price
      * "open_limit_up_lowest_price-<code>": lowest price recorded after the limit-up opened

    The caches are filled from Redis once, when the singleton is first created.
    Writes update the cache first and are then sent to Redis through
    RedisUtils.setex_async.
    """
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __latest_data = {}
    __current_buy_1_price = {}
    __buy1_price_info_cache = {}
    __open_limit_up_lowest_price_cache = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        # Create the single instance lazily and warm the caches exactly once.
        if not cls.__instance:
            cls.__instance = super(Buy1PriceManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Populate the three in-memory caches from the matching Redis keys."""
        redis_ = cls.__get_redis()
        try:
            keys = RedisUtils.keys(redis_, "buy1_price_limit_up_info-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__buy1_price_info_cache, code, val)
            keys = RedisUtils.keys(redis_, "buy1_price-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = round(float(val), 2)
                tool.CodeDataCacheUtil.set_cache(cls.__current_buy_1_price, code, val)
            keys = RedisUtils.keys(redis_, "open_limit_up_lowest_price-*")
            for key in keys:
                code = key.split("-")[-1]
                val = RedisUtils.get(redis_, key)
                val = round(float(val), 2)
                tool.CodeDataCacheUtil.set_cache(cls.__open_limit_up_lowest_price_cache, code, val)
        finally:
            RedisUtils.realse(redis_)

    # Save the buy-1 price info (limit-up time, open-limit-up time)
    def __save_buy1_price_info(self, code, limit_up_time, open_limit_up_time):
        tool.CodeDataCacheUtil.set_cache(self.__buy1_price_info_cache, code, (limit_up_time, open_limit_up_time))
        RedisUtils.setex_async(self.__db, f"buy1_price_limit_up_info-{code}", tool.get_expire(),
                               json.dumps((limit_up_time, open_limit_up_time)))

    def __get_buy1_price_info(self, code):
        """Read (limit_up_time, open_limit_up_time) directly from Redis; (None, None) if absent."""
        data = RedisUtils.get(self.__get_redis(), f"buy1_price_limit_up_info-{code}")
        if not data:
            return None, None
        data = json.loads(data)
        return data[0], data[1]

    def __get_buy1_price_info_cache(self, code):
        """Read (limit_up_time, open_limit_up_time) from the cache; (None, None) if absent."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy1_price_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None

    def __save_buy1_price(self, code, buy_1_price):
        """Store the buy-1 price for ``code`` unless it equals the cached one."""
        # Convert once: the cache must always hold a float (as __load_datas stores),
        # otherwise a string price would break the subtraction on the next call.
        price = float(buy_1_price)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0] and abs(cache_result[1] - price) < 0.001:
            return
        tool.CodeDataCacheUtil.set_cache(self.__current_buy_1_price, code, round(price, 2))
        RedisUtils.setex_async(self.__db, f"buy1_price-{code}", tool.get_expire(), buy_1_price)

    # datas:[(code, buy_1_price)]
    def __save_buy1_prices(self, datas):
        for d in datas:
            code = d[0]
            buy_1_price = d[1]
            # Duplicate values are filtered inside __save_buy1_price
            self.__save_buy1_price(code, buy_1_price)

    def __get_buy1_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"buy1_price-{code}")

    def __get_buy1_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    # Store the lowest price after the limit-up opened
    def __save_open_limit_up_lowest_price(self, code, price):
        tool.CodeDataCacheUtil.set_cache(self.__open_limit_up_lowest_price_cache, code, round(float(price), 2))
        RedisUtils.setex_async(self.__db, f"open_limit_up_lowest_price-{code}", tool.get_expire(), f"{price}")

    def __get_open_limit_up_lowest_price(self, code):
        return RedisUtils.get(self.__get_redis(), f"open_limit_up_lowest_price-{code}")

    def __get_open_limit_up_lowest_price_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__open_limit_up_lowest_price_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    def set_open_limit_up_lowest_price(self, code, price):
        """Record ``price`` if nothing is stored yet or it is lower than the stored value."""
        old_price = self.__get_open_limit_up_lowest_price_cache(code)
        # ``is None`` rather than truthiness, so a cached 0.0 is not mistaken for "missing".
        if old_price is None or float(old_price) - float(price) > 0.001:
            self.__save_open_limit_up_lowest_price(code, price)

    def get_buy1_price(self, code):
        """Return the cached buy-1 price for ``code``, or None when not cached."""
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__current_buy_1_price, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    def get_open_limit_up_lowest_price(self, code):
        """Return the cached lowest price after the limit-up opened, or None."""
        price = self.__get_open_limit_up_lowest_price_cache(code)
        return price

    # Whether an order may be placed
    def is_can_buy(self, code):
        """Return True only when both the limit-up time and the open-limit-up time are cached."""
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        if old_limit_up_time and old_open_limit_up_time:
            return True
        return False

    # Get the limit-up info
    # Returns the limit-up time and the open-limit-up time
    def get_limit_up_info(self, code):
        old_limit_up_time, old_open_limit_up_time = self.__get_buy1_price_info_cache(code)
        return old_limit_up_time, old_open_limit_up_time

    # Set the limit-up time
    def set_limit_up_time(self, code, time_str):
        """Store ``time_str`` as the limit-up time, only if none has been recorded yet."""
        limit_up_time, open_limit_up_time = self.get_limit_up_info(code)
        if limit_up_time is None:
            self.__save_buy1_price_info(code, time_str, None)
# Manual smoke check: print the cached limit-up info for one sample code.
if __name__ == "__main__":
    print(Buy1PriceManager().get_limit_up_info("002777"))
code_attribute/gpcode_manager.py
New file
@@ -0,0 +1,787 @@
"""
股票代码管理器
"""
import json
import time
from db import redis_manager as redis_manager
from db.redis_manager import RedisUtils
from utils import tool
import decimal
# Redis manager shared by the module-level functions in this file.
# NOTE(review): the argument 0 is presumably the Redis DB index - confirm against RedisManager.
__redisManager = redis_manager.RedisManager(0)
class CodesNameManager:
    """Access to the name -> code dictionaries stored as JSON strings in Redis.

    Two dictionaries exist: "gp_list_names_first" (first-board codes) and
    "gp_list_names" (second-board codes).  Both map a name to its code, so the
    ``get_*_code_name`` methods do a reverse lookup by value.
    """
    redisManager = redis_manager.RedisManager(0)
    __gp_list_names_first_cache = []

    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()

    @classmethod
    def list_code_name_dict(cls):
        """Return both dictionaries merged; second-board entries override first-board ones."""
        merged = {}
        first = cls.list_first_code_name_dict()
        if first is not None:
            merged.update(first)
        second = cls.list_second_code_name_dict()
        if second is not None:
            merged.update(second)
        return merged

    @classmethod
    def list_first_code_name_dict(cls):
        """Return the first-board name -> code dict, or None when the key is absent."""
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if val is not None:
            return json.loads(val)
        return None

    @classmethod
    def list_first_code_name_dict_cache(cls):
        """Same as list_first_code_name_dict; despite the name it reads Redis every call."""
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if val is not None:
            return json.loads(val)
        return None

    @classmethod
    def get_first_code_name(cls, code):
        """Return the first-board name whose code equals ``code``, or None."""
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if not val:
            return None
        for name, stored_code in json.loads(val).items():
            if stored_code == code:
                return name
        return None

    @classmethod
    def get_first_name_code(cls, name):
        """Return the first-board code stored for ``name``, or None."""
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        if not val:
            return None
        return json.loads(val).get(name)

    @classmethod
    def add_first_code_name(cls, code, name):
        """Add or overwrite one first-board name -> code entry.

        When the dictionary does not exist yet (for example right after
        clear_first_code_names) a new one is started, so the entry is no
        longer dropped silently.
        """
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names_first")
        names = json.loads(val) if val else {}
        names[name] = code
        cls.set_first_code_names(names)

    # Store the first-board code names
    @classmethod
    def set_first_code_names(cls, datas):
        RedisUtils.set(cls.__get_redis(), "gp_list_names_first", json.dumps(datas))

    # Delete the first-board code names
    @classmethod
    def clear_first_code_names(cls):
        RedisUtils.delete(cls.__get_redis(), "gp_list_names_first")

    @classmethod
    def list_second_code_name_dict(cls):
        """Return the second-board name -> code dict, or None when the key is absent."""
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
        if val is not None:
            return json.loads(val)
        return None

    @classmethod
    def get_second_code_name(cls, code):
        """Return the second-board name whose code equals ``code``, or None."""
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
        if not val:
            return None
        for name, stored_code in json.loads(val).items():
            if stored_code == code:
                return name
        return None

    @classmethod
    def get_second_name_code(cls, name):
        """Return the second-board code stored for ``name``, or None."""
        val = RedisUtils.get(cls.__get_redis(), "gp_list_names")
        if not val:
            return None
        return json.loads(val).get(name)

    # Store the second-board code names
    @classmethod
    def set_second_code_names(cls, datas):
        RedisUtils.set(cls.__get_redis(), "gp_list_names", json.dumps(datas))

    # Delete the second-board code names
    @classmethod
    def clear_second_code_names(cls):
        RedisUtils.delete(cls.__get_redis(), "gp_list_names")
# 首板代码管理
class FirstCodeManager:
    """Singleton tracking two Redis sets together with in-memory copies.

    * "first_code_record": codes recorded in the first-board history
    * "first_code_limited_up_record": first-board codes that have hit limit-up

    The in-memory copies are read from Redis when the singleton is created.
    """
    __instance = None

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        inst = super(FirstCodeManager, cls).__new__(cls, *args, **kwargs)
        # Publish the instance before loading, exactly as the attributes are then filled in.
        cls.__instance = inst
        inst.redisManager = redis_manager.RedisManager(0)
        inst.__first_code_record_cache = RedisUtils.smembers(inst.__get_redis(), "first_code_record")
        inst.__first_code_limited_up_record_cache = RedisUtils.smembers(inst.__get_redis(),
                                                                        "first_code_limited_up_record")
        return cls.__instance

    def __get_redis(self):
        return self.redisManager.getRedis()

    def __add_members(self, redis_key, cache, codes):
        """Add ``codes`` to ``cache``; write unseen ones to ``redis_key`` and refresh its expiry."""
        changed = False
        for code in codes:
            if code not in cache:
                RedisUtils.sadd(self.__get_redis(), redis_key, code)
                changed = True
            cache.add(code)
        if changed:
            RedisUtils.expire(self.__get_redis(), redis_key, tool.get_expire())

    # Add codes to the first-board history record
    def add_record(self, codes):
        self.__add_members("first_code_record", self.__first_code_record_cache, codes)

    def is_in_first_record(self, code):
        """Ask Redis whether ``code`` is in the first-board history record."""
        return bool(RedisUtils.sismember(self.__get_redis(), "first_code_record", code))

    def is_in_first_record_cache(self, code):
        """Check the in-memory copy of the first-board history record."""
        return code in self.__first_code_record_cache

    # Add codes to the set of first-board codes that have hit limit-up
    def add_limited_up_record(self, codes):
        self.__add_members("first_code_limited_up_record", self.__first_code_limited_up_record_cache, codes)

    # Whether the code has hit limit-up
    def is_limited_up(self, code):
        return bool(RedisUtils.sismember(self.__get_redis(), "first_code_limited_up_record", code))

    def is_limited_up_cache(self, code):
        """Check the in-memory copy of the limit-up record."""
        return code in self.__first_code_limited_up_record_cache
# 想要买的代码
class WantBuyCodesManager:
    """Singleton for the "want_buy_codes" Redis set, with an in-memory copy.

    The ``*_cache`` methods answer from memory; the others query Redis.
    """
    __instance = None
    redisManager = redis_manager.RedisManager(0)
    __redis_key = "want_buy_codes"
    # Class-level default; replaced on the instance when the singleton is created.
    __want_buy_codes_cache = set()

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(WantBuyCodesManager, cls).__new__(cls, *args, **kwargs)
            # Load the current members from Redis into the instance cache.
            cls.__instance.__want_buy_codes_cache = RedisUtils.smembers(cls.__get_redis(),
                                                                        cls.__redis_key)
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()

    def clear(self):
        """Remove every code, both from the in-memory cache and from Redis."""
        # The cache must be emptied too, otherwise is_in_cache/list_code_cache keep
        # reporting codes that were just deleted (PauseBuyCodesManager.clear does the same).
        self.__want_buy_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), self.__redis_key)

    def add_code(self, code):
        """Add ``code`` to the cache and to Redis, then refresh the key's expiry."""
        self.__want_buy_codes_cache.add(code)
        RedisUtils.sadd(self.__get_redis(), self.__redis_key, code)
        RedisUtils.expire(self.__get_redis(), self.__redis_key, tool.get_expire())

    def remove_code(self, code):
        """Remove ``code`` from the cache and from Redis."""
        self.__want_buy_codes_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), self.__redis_key, code)

    def sync(self):
        """Replace the cache contents with the members currently stored in Redis."""
        codes = self.list_code()
        self.__want_buy_codes_cache.clear()
        if codes:
            self.__want_buy_codes_cache |= set(codes)

    def is_in(self, code):
        """Ask Redis whether ``code`` is a member."""
        return RedisUtils.sismember(self.__get_redis(), self.__redis_key, code)

    def is_in_cache(self, code):
        """Check membership against the in-memory cache."""
        return code in self.__want_buy_codes_cache

    def list_code(self):
        """Return the members as read from Redis."""
        return RedisUtils.smembers(self.__get_redis(), self.__redis_key)

    def list_code_cache(self):
        """Return the in-memory cache itself (not a copy)."""
        return self.__want_buy_codes_cache
# 暂停下单代码管理
# 与黑名单的区别是暂停交易代码只是不交易,不能移除L2监控位
class PauseBuyCodesManager:
    """Singleton for the "pause_buy_codes" Redis set, with an in-memory copy.

    The ``*_cache`` methods answer from memory; the others query Redis.
    """
    __instance = None
    redisManager = redis_manager.RedisManager(0)
    __redis_key = "pause_buy_codes"

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super(PauseBuyCodesManager, cls).__new__(cls, *args, **kwargs)
        # Seed the instance cache with the members currently in Redis.
        cls.__instance.__pause_buy_codes_cache = RedisUtils.smembers(cls.__get_redis(),
                                                                     cls.__redis_key)
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()

    def clear(self):
        """Empty the cache and delete the Redis key."""
        self.__pause_buy_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), self.__redis_key)

    def sync(self):
        """Replace the cache contents with the members currently stored in Redis."""
        members = RedisUtils.smembers(self.__get_redis(), self.__redis_key)
        self.__pause_buy_codes_cache.clear()
        if members:
            self.__pause_buy_codes_cache |= members

    def add_code(self, code):
        """Add ``code`` to the cache and to Redis, then refresh the key's expiry."""
        self.__pause_buy_codes_cache.add(code)
        redis_key = self.__redis_key
        RedisUtils.sadd(self.__get_redis(), redis_key, code)
        RedisUtils.expire(self.__get_redis(), redis_key, tool.get_expire())

    def remove_code(self, code):
        """Remove ``code`` from the cache and from Redis."""
        self.__pause_buy_codes_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), self.__redis_key, code)

    def is_in(self, code):
        """Ask Redis whether ``code`` is a member."""
        return RedisUtils.sismember(self.__get_redis(), self.__redis_key, code)

    def is_in_cache(self, code):
        """Check membership against the in-memory cache."""
        return code in self.__pause_buy_codes_cache

    def list_code(self):
        """Return the members as read from Redis."""
        return RedisUtils.smembers(self.__get_redis(), self.__redis_key)

    def list_code_cache(self):
        """Return the in-memory cache itself (not a copy)."""
        return self.__pause_buy_codes_cache
def __parse_codes_data(code_datas):
    """Extract the usable codes from a list of security records.

    A record is kept when ``sec_type`` and ``sec_level`` are both 1 and the code
    (the part of ``symbol`` after the first ".") does not start with "30" or "68".

    :param code_datas: iterable of dicts with "sec_type", "sec_level", "symbol", "sec_name"
    :return: (list of codes, dict mapping sec_name -> code)
    """
    codes = []
    name_codes = {}
    for record in code_datas:
        # Only regular stocks are considered
        if record["sec_type"] != 1 or record["sec_level"] != 1:
            continue
        code = record["symbol"].split(".")[1]
        if code.startswith(("30", "68")):
            continue
        name = record["sec_name"]
        codes.append(code)
        # Remember which code belongs to this name
        name_codes[name] = code
    return codes, name_codes
# -------------------------------二板代码管理---------------------------------
def set_gp_list(code_datas):
    """Replace the "gp_list" Redis set and the second-board name dict with ``code_datas``.

    :param code_datas: security records accepted by __parse_codes_data
    """
    parsed_codes, parsed_names = __parse_codes_data(code_datas)
    conn = __redisManager.getRedis()
    try:
        # Drop the previous contents first
        RedisUtils.delete(conn, "gp_list", auto_free=False)
        CodesNameManager.clear_second_code_names()
        for code in parsed_codes:
            RedisUtils.sadd(conn, "gp_list", code, auto_free=False)
        CodesNameManager.set_second_code_names(parsed_names)
    finally:
        RedisUtils.realse(conn)
# 新增代码
def add_gp_list(code_datas):
    """Add the codes in ``code_datas`` to "gp_list" and merge their names into the second-board dict.

    :param code_datas: security records accepted by __parse_codes_data (at most 200)
    :raises Exception: when more than 200 records are passed
    """
    if len(code_datas) > 200:
        raise Exception("不能超过200个数据")
    conn = __redisManager.getRedis()
    try:
        new_codes, new_names = __parse_codes_data(code_datas)
        for code in new_codes:
            RedisUtils.sadd(conn, "gp_list", code, auto_free=False)
        merged_names = CodesNameManager.list_second_code_name_dict()
        if merged_names is None:
            merged_names = new_names
        else:
            # New entries win over stored ones with the same name
            merged_names.update(new_names)
        CodesNameManager.set_second_code_names(merged_names)
    finally:
        RedisUtils.realse(conn)
# -------------------------------首板代码管理-------------------------------
class FirstGPCodesManager:
    """Singleton for the "gp_list_first" Redis set (first-board codes), with an in-memory copy.

    The copy is loaded from Redis when the singleton is first created and is
    always a ``set`` afterwards, so the mutating methods can rely on it.
    """
    __db = 0
    __redisManager = redis_manager.RedisManager(0)
    __gp_list_first_codes_cache = set()
    __instance = None

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(FirstGPCodesManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    @classmethod
    def __load_datas(cls):
        """Fill the cache from Redis, normalising a missing/empty result to an empty set."""
        __redis = cls.__get_redis()
        try:
            members = RedisUtils.smembers(__redis, "gp_list_first")
            # Never leave None in the cache: later code calls .clear()/.discard() on it.
            cls.__gp_list_first_codes_cache = set(members) if members else set()
        finally:
            RedisUtils.realse(__redis)

    @classmethod
    def __parse_codes_data(cls, code_datas):
        """Extract (codes, name -> code dict) from security records.

        Keeps records with ``sec_type`` == 1 and ``sec_level`` == 1 whose code does
        not start with "30" or "68".  Same logic as the module-level helper of the
        same name, which cannot be referenced from inside a class because of
        private-name mangling.
        """
        codes = []
        name_codes = {}
        for _data in code_datas:
            # Only regular stocks are considered
            if _data["sec_type"] == 1 and _data["sec_level"] == 1:
                code = _data["symbol"].split(".")[1]
                if not code.startswith(("30", "68")):
                    name = _data["sec_name"]
                    codes.append(code)
                    # Remember which code belongs to this name
                    name_codes[name] = code
        return codes, name_codes

    # Set the first-board codes
    # code_datas: security records as returned by the data provider
    def set_first_gp_codes_with_data(self, code_datas):
        """Make "gp_list_first" equal to the codes in ``code_datas`` and merge their names.

        Only the difference against the cached set is written to Redis; the key's
        expiry is refreshed when anything changed.
        """
        redis_instance = self.__get_redis()
        try:
            codes, name_codes = self.__parse_codes_data(code_datas)
            codes_set = set(codes)
            # Snapshot, tolerating a cache that is still None for any reason.
            old_codes_set = set(self.__gp_list_first_codes_cache or ())
            del_set = old_codes_set - codes_set
            add_codes = codes_set - old_codes_set
            for code in add_codes:
                RedisUtils.sadd(redis_instance, "gp_list_first", code, auto_free=False)
            for code in del_set:
                RedisUtils.srem(redis_instance, "gp_list_first", code, auto_free=False)
            if add_codes or del_set:
                RedisUtils.expire(redis_instance, "gp_list_first", tool.get_expire(), auto_free=False)
            # Refresh the cache in place so every holder of the set sees the update
            if self.__gp_list_first_codes_cache is None:
                FirstGPCodesManager.__gp_list_first_codes_cache = set()
            self.__gp_list_first_codes_cache.clear()
            self.__gp_list_first_codes_cache.update(codes_set)
            old_name_codes = CodesNameManager.list_first_code_name_dict()
            if old_name_codes is None:
                old_name_codes = name_codes
            else:
                old_name_codes.update(name_codes)
            CodesNameManager.set_first_code_names(old_name_codes)
        finally:
            RedisUtils.realse(redis_instance)

    # Remove first-board codes
    def remove_first_gp_code(self, codes):
        """Remove each code in ``codes`` from the cache and from "gp_list_first"."""
        redis_instance = self.__get_redis()
        try:
            for code in codes:
                self.__gp_list_first_codes_cache.discard(code)
                RedisUtils.srem(redis_instance, "gp_list_first", code, auto_free=False)
        finally:
            RedisUtils.realse(redis_instance)

    # Get the first-board codes
    def get_first_gp_codes(self):
        """Return the members of "gp_list_first" as read from Redis."""
        return RedisUtils.smembers(self.__get_redis(), "gp_list_first")

    def get_first_gp_codes_cache(self):
        """Return the in-memory set itself (not a copy)."""
        return self.__gp_list_first_codes_cache

    # Whether the code is a first-board code (asks Redis)
    def is_in_first_gp_codes(self, code):
        return RedisUtils.sismember(self.__get_redis(), "gp_list_first", code)

    # Whether the code is a first-board code (in-memory check)
    def is_in_first_gp_codes_cache(self, code):
        return code in self.__gp_list_first_codes_cache
# 获取名称对应的代码
def get_name_code(name):
    """Return the code for ``name``: second-board dict first, then first-board; None if unknown."""
    second_code = CodesNameManager.get_second_name_code(name)
    if second_code is None:
        return CodesNameManager.get_first_name_code(name)
    return second_code
# In-memory cache of code -> name lookups
__code_name_dict = {}


# Get the name of a code
def get_code_name(code):
    """Return the name for ``code``, remembering successful lookups in memory.

    The second-board dict is searched first, then the first-board dict.
    Returns a falsy value (None) when the code is unknown; that is not cached.
    """
    try:
        return __code_name_dict[code]
    except KeyError:
        pass
    second_name = CodesNameManager.get_second_code_name(code)
    if second_name is not None:
        __code_name_dict[code] = second_name
        return second_name
    first_name = CodesNameManager.get_first_code_name(code)
    if first_name:
        __code_name_dict[code] = first_name
    return first_name
def get_name_codes():
    """Return the merged name -> code dict of first-board and second-board codes."""
    name_codes = CodesNameManager.list_code_name_dict()
    return name_codes
# 涨停数据保存
def set_limit_up_list(gpset):
    """Replace the "gp_limit_up_list" Redis set with the JSON form of each item in ``gpset``.

    Also stores the current time in milliseconds under
    "gp_limit_up_list_update_time".  Does nothing when ``gpset`` is None.
    """
    if gpset is None:
        return
    conn = __redisManager.getRedis()
    try:
        # Drop the previous contents first
        RedisUtils.delete(conn, "gp_limit_up_list", auto_free=False)
        for item in gpset:
            serialized = json.dumps(item)
            RedisUtils.sadd(conn, "gp_limit_up_list", serialized, auto_free=False)
        RedisUtils.expire(conn, "gp_limit_up_list", tool.get_expire(), auto_free=False)
        RedisUtils.setex(conn, "gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000),
                         auto_free=False)
    finally:
        RedisUtils.realse(conn)
# 获取涨停列表
def get_limit_up_list():
    """Return (update time, members) read from the limit-up list keys in Redis."""
    conn = __redisManager.getRedis()
    try:
        update_time = RedisUtils.get(conn, "gp_limit_up_list_update_time", auto_free=False)
        members = RedisUtils.smembers(conn, "gp_limit_up_list", auto_free=False)
        return update_time, members
    finally:
        RedisUtils.realse(conn)
def rm_gp(code):
    """Remove ``code`` from the "gp_list" set and from the first-board codes."""
    conn = __redisManager.getRedis()
    RedisUtils.srem(conn, "gp_list", code)
    FirstGPCodesManager().remove_first_gp_code([code])
def is_in_gp_pool(code):
    """Return a truthy value when ``code`` is in "gp_list" (Redis) or in the cached first-board codes."""
    in_second = RedisUtils.sismember(__redisManager.getRedis(), "gp_list", code)
    if in_second:
        return in_second
    # Only consult the first-board cache when Redis did not already say yes
    return FirstGPCodesManager().is_in_first_gp_codes_cache(code)
def get_gp_list():
    """Return the union of the "gp_list" members and the first-board codes, both read from Redis."""
    second_codes = RedisUtils.smembers(__redisManager.getRedis(), "gp_list")
    first_codes = FirstGPCodesManager().get_first_gp_codes()
    return set.union(second_codes, first_codes)
# 获取二板代码
def get_second_gp_list():
    """Return the members of the "gp_list" Redis set (second-board codes)."""
    return RedisUtils.smembers(__redisManager.getRedis(), "gp_list")
def get_gp_list_with_prefix(data=None):
    """Return the codes in ``data`` as exchange-prefixed symbols.

    Codes starting with "00" become "SZSE.<code>", codes starting with "60"
    become "SHSE.<code>"; every other code is left out.

    :param data: iterable of code strings; defaults to get_gp_list()
    :return: list of prefixed symbols, in iteration order
    """
    if data is None:
        data = get_gp_list()
    exchange_by_prefix = {'00': "SZSE", '60': "SHSE"}
    symbols = []
    for code in data:
        exchange = exchange_by_prefix.get(code[0:2])
        if exchange is not None:
            symbols.append("{}.{}".format(exchange, code))
    return symbols
class CodePrePriceManager:
    """Stores the previous closing price per code under "price-pre-<code>" in Redis, with a memory cache."""
    __price_pre_cache = {}
    __redisManager = redis_manager.RedisManager(0)

    # Get the closing price
    @classmethod
    def get_price_pre(cls, code):
        """Read the price from Redis; returns a float, or None when the key is absent."""
        result = RedisUtils.get(cls.__redisManager.getRedis(), "price-pre-{}".format(code))
        if result is not None:
            return float(result)
        return None

    # Get the closing price, preferring the memory cache
    @classmethod
    def get_price_pre_cache(cls, code):
        """Return the cached price, falling back to Redis and caching a truthy result."""
        if code in cls.__price_pre_cache:
            return float(cls.__price_pre_cache[code])
        val = cls.get_price_pre(code)
        if val:
            cls.__price_pre_cache[code] = val
        return val

    # Set the closing price
    @classmethod
    def set_price_pre(cls, code, price, force=False):
        """Store ``price`` for ``code`` in Redis and in the cache.

        Unless ``force`` is true, the price is stored only when ``code`` is in
        get_gp_list() or in the cached first-board history record.
        """
        if not force:
            # The membership check costs Redis round-trips, so skip it entirely when forced.
            codes = get_gp_list()
            if code not in codes and not FirstCodeManager().is_in_first_record_cache(code):
                return
        RedisUtils.setex(cls.__redisManager.getRedis(), "price-pre-{}".format(code), tool.get_expire(), str(price))
        cls.__price_pre_cache[code] = float(price)
# In-memory cache of code -> limit-up price
__limit_up_price_dict = {}


# Get the limit-up price
def get_limit_up_price(code):
    """Return the limit-up price for ``code`` (previous close x 1.1), or None if no close is known.

    Computed values are kept in memory and reused on later calls.
    """
    # Serve from memory when already computed
    try:
        return __limit_up_price_dict[code]
    except KeyError:
        pass
    pre_close = CodePrePriceManager.get_price_pre_cache(code)
    if pre_close is None:
        return None
    computed = tool.to_price(decimal.Decimal(str(pre_close)) * decimal.Decimal("1.1"))
    __limit_up_price_dict[code] = computed
    return computed
def get_limit_up_price_by_preprice(price):
    """Return the limit-up price for a previous close ``price`` (x 1.1); None when ``price`` is None."""
    if price is None:
        return None
    pre_close = decimal.Decimal(str(price))
    ratio = decimal.Decimal("1.1")
    return tool.to_price(pre_close * ratio)
# 获取跌停价
def get_limit_down_price(code):
    """Return the limit-down price for ``code`` (previous close x 0.9), or None if no close is known."""
    pre_close = CodePrePriceManager.get_price_pre_cache(code)
    if pre_close is None:
        return None
    ratio = decimal.Decimal("0.9")
    return tool.to_price(decimal.Decimal(str(pre_close)) * ratio)
# 获取现价
def get_price(code):
    """Return the current price stored under "price-<code>" as a float, or None when absent."""
    raw = RedisUtils.get(__redisManager.getRedis(), "price-{}".format(code))
    return None if raw is None else float(raw)
# Last price written per code, used to skip redundant Redis writes
__current_price_cache = {}


# Set the current price
def set_price(code, price):
    """Store ``price`` under "price-<code>" unless it equals the last value written for that code."""
    unchanged = code in __current_price_cache and __current_price_cache[code] == price
    if unchanged:
        return
    __current_price_cache[code] = price
    RedisUtils.setex(__redisManager.getRedis(), "price-{}".format(code), tool.get_expire(), price)
# datas:[(code,price)]
def set_prices(datas):
    """Store several current prices through one Redis pipeline.

    :param datas: iterable of (code, price) entries; entries whose price equals
                  the last value written for that code are skipped
    """
    pipe = __redisManager.getRedis().pipeline()
    for entry in datas:
        code = entry[0]
        price = entry[1]
        changed = code not in __current_price_cache or __current_price_cache[code] != price
        if changed:
            __current_price_cache[code] = price
            RedisUtils.setex(pipe, "price-{}".format(code), tool.get_expire(), price)
    pipe.execute()
# 获取正在监听的代码
def get_listen_codes():
    """Return the set of non-empty values stored under the "listen_code-*-*" keys."""
    conn = __redisManager.getRedis()
    try:
        listening = set()
        for slot_key in RedisUtils.keys(conn, "listen_code-*-*", auto_free=False):
            code = RedisUtils.get(conn, slot_key, auto_free=False)
            # Empty slots hold "" (or nothing) and are skipped
            if code is None or len(code) == 0:
                continue
            listening.add(code)
        return listening
    finally:
        RedisUtils.realse(conn)
# 根据位置获取正在监听的代码
def get_listen_code_by_pos(client_id, pos):
    """Return the value stored under "listen_code-<client_id>-<pos>"."""
    slot_key = "listen_code-{}-{}".format(client_id, pos)
    return RedisUtils.get(__redisManager.getRedis(), slot_key)
# 设置位置的监听代码
def set_listen_code_by_pos(client_id, pos, code):
    """Store ``code`` under "listen_code-<client_id>-<pos>", then resync the code -> position keys."""
    slot_key = "listen_code-{}-{}".format(client_id, pos)
    RedisUtils.setex(__redisManager.getRedis(), slot_key, tool.get_expire(), code)
    # Keep the reverse (code -> position) index in step with the slots
    __sync_listen_codes_pos()
# 同步监听代码位置信息
def __sync_listen_codes_pos():
    """Rebuild the "code_listen_pos-<code>" keys from the "listen_code-<client>-<pos>" slots.

    For every non-empty slot, the reverse key for its code is written (or
    corrected) to hold the JSON pair (client_id, pos).  Reverse keys whose code
    no longer appears in any slot are deleted.
    """
    redis_instance = __redisManager.getRedis()
    try:
        pos_key_prefix = "code_listen_pos-"
        # Start from every code that currently has a reverse key; whatever is
        # still in this set after scanning the slots is stale.
        stale_codes = set()
        for key in RedisUtils.keys(redis_instance, "code_listen_pos-*", auto_free=False):
            stale_codes.add(key.replace(pos_key_prefix, ""))
        for key in RedisUtils.keys(redis_instance, "listen_code-*-*", auto_free=False):
            code_ = RedisUtils.get(redis_instance, key, auto_free=False)
            if not code_:
                continue
            # This code is being listened to, so its reverse key must stay
            stale_codes.discard(code_)
            key_parts = key.split("-")
            client_id_, pos_ = int(key_parts[1]), int(key_parts[2])
            key_ = "code_listen_pos-{}".format(code_)
            val = RedisUtils.get(redis_instance, key_, auto_free=False)
            if val is not None:
                val = json.loads(val)
                if val[0] == client_id_ and val[1] == pos_:
                    # Already correct; avoid a redundant write
                    continue
            RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)),
                             auto_free=False)
        # Remove reverse keys of codes that are no longer listened to.  The full
        # key name is required: deleting by the bare code removes nothing.
        for code_ in stale_codes:
            RedisUtils.delete(redis_instance, "code_listen_pos-{}".format(code_), auto_free=False)
    finally:
        RedisUtils.realse(redis_instance)
# 初始化位置
def init_listen_code_by_pos(client_id, pos):
    """Create the slot "listen_code-<client_id>-<pos>" with an empty value if missing, and refresh its expiry."""
    slot_key = "listen_code-{}-{}".format(client_id, pos)
    RedisUtils.setnx(__redisManager.getRedis(), slot_key, "")
    expire_seconds = tool.get_expire()
    RedisUtils.expire(__redisManager.getRedis(), slot_key, expire_seconds)
# 清除所有监听代码
def clear_listen_codes():
    """Reset every "listen_code-*-*" slot to an empty string (the keys themselves are kept)."""
    conn = __redisManager.getRedis()
    try:
        slot_keys = RedisUtils.keys(conn, "listen_code-*-*", auto_free=False)
        for slot_key in slot_keys:
            RedisUtils.setex(conn, slot_key, tool.get_expire(), "", auto_free=False)
    finally:
        RedisUtils.realse(conn)
def clear_first_codes():
    """Delete the four first-board Redis keys (codes, names, history record, limit-up record)."""
    first_board_keys = (
        "gp_list_first",
        "gp_list_names_first",
        "first_code_record",
        "first_code_limited_up_record",
    )
    conn = __redisManager.getRedis()
    try:
        for redis_key in first_board_keys:
            RedisUtils.delete(conn, redis_key, auto_free=False)
    finally:
        RedisUtils.realse(conn)
# 获取正在监听的代码的位置
def get_listen_code_pos(code):
    """Return (client_id, pos) of the slot listening to ``code``, or (None, None).

    The stored position is trusted only if that slot really holds ``code``.
    """
    raw = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    if raw is None:
        return None, None
    position = json.loads(raw)
    cid = position[0]
    pid = position[1]
    # Verify against the slot itself, the reverse key may be out of date
    slot_code = get_listen_code_by_pos(cid, pid)
    if slot_code == code:
        return cid, pid
    return None, None
# 是否正在监听
def is_listen(code):
    """Return True when a "code_listen_pos-<code>" key exists for ``code``."""
    position = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    return position is not None
def is_listen_old(code):
    """Return True when ``code`` is among the values of the "listen_code-*-*" slots."""
    return code in get_listen_codes()
# 是否正在操作
def is_operate(code):
    """Return True when the "gp_operate-<code>" marker exists in Redis."""
    marker = RedisUtils.get(__redisManager.getRedis(), "gp_operate-{}".format(code))
    return marker is not None
# 设置正在操作的代码
def set_operate(code):
    """Set the "gp_operate-<code>" marker to "1" with an expiry of 30."""
    marker_key = "gp_operate-{}".format(code)
    RedisUtils.setex(__redisManager.getRedis(), marker_key, 30, "1")
# 批量设置正在操作的代码
def set_operates(codes):
    """Set the "gp_operate-<code>" marker (value "1", expiry 30) for every code in ``codes``."""
    for item in codes:
        marker_key = "gp_operate-{}".format(item)
        RedisUtils.setex(__redisManager.getRedis(), marker_key, 30, "1")
# 移除正在操作的代码
def rm_operate(code):
    """Delete the "gp_operate-<code>" marker."""
    marker_key = "gp_operate-{}".format(code)
    RedisUtils.delete(__redisManager.getRedis(), marker_key)
# 批量移除正在操作的代码
def rm_operates(codes):
    """Delete the "gp_operate-<code>" marker for every code in ``codes`` over one connection."""
    conn = __redisManager.getRedis()
    try:
        for item in codes:
            marker_key = "gp_operate-{}".format(item)
            RedisUtils.delete(conn, marker_key, auto_free=False)
    finally:
        RedisUtils.realse(conn)
constant.py
@@ -1,5 +1,16 @@
import platform
TEST = False
# Buy-score tiers
BUY_SCORE_RANK_0 = 150
BUY_SCORE_RANK_1 = 220
BUY_SCORE_RANK_2 = 240
BUY_SCORE_RANK_3 = 260
# Block (plate) names listed as invalid.
# NOTE(review): presumably these names are filtered out of KPL plate data - confirm against the callers.
# The entry "专精特新" used to carry a leading space (" 专精特新"), so it could never equal the
# real name; the duplicated "年报预增" entry was dropped (a set ignores duplicates anyway).
KPL_INVALID_BLOCKS = {"一季报增长", "二季报增长", "三季报增长", "四季报增长", "业绩增长", "中报增长", "年报增长", "年报预增", "无", "次新股", "ST摘帽", "超跌",
                      "股权转让", "并购重组", "再融资", "专精特新", "壳资源", "行业龙头", "参股金融", "科创板", "实控人变更"}
def is_windows():
    system = platform.system()
@@ -8,6 +19,7 @@
    return False
CACHE_PATH = f"{'D:' if is_windows() else '/home'}/trade_cache"
# redis设置
REDIS_CONFIG = {
@@ -43,6 +55,7 @@
    "passwd": "Yeshi2016@"
}
# Get the root path
def get_path_prefix():
    """Return ``'D:'`` when ``is_windows()`` is true and ``'/home'`` otherwise."""
    if is_windows():
        return 'D:'
    return '/home'
data_server.py
New file
@@ -0,0 +1,537 @@
import http
import json
import socketserver
import time
from http.server import BaseHTTPRequestHandler
import dask
from code_attribute import gpcode_manager
from log_module import log_analyse, log_export
from output import limit_up_data_filter, output_util, code_info_output
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager, KPLDataManager, KPLCodeLimitUpReasonManager
from third_data.kpl_util import KPLPlatManager, KPLDataType
from trade import trade_manager
from trade.l2_trade_util import BlackListCodeManager
from utils import tool, global_util, kp_client_msg_manager
from utils.history_k_data_util import HistoryKDatasUtils
import urllib.parse as urlparse
from urllib.parse import parse_qs
class DataServer(BaseHTTPRequestHandler):
    """HTTP request handler that serves KPL / trade data as JSON.

    All attributes below are class-level, so they are shared by every
    handler instance.
    """
    ocr_temp_data = {}
    # Manager objects shared by all requests
    __kplDataManager = KPLDataManager()
    __IgnoreCodeManager = IgnoreCodeManager()
    __KPLPlatManager = KPLPlatManager()
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    # Historical plates: code -> (timestamp, code_records), filled by /get_score_info
    __history_plates_dict = {}
    # Plates: code -> (timestamp, plate), filled by /get_score_info
    __blocks_dict = {}
    # Caches of "jingxuan" (selected) and industry ranking rows, keyed by d[1] in /kpl/get_market_data
    __jingxuan_cache_dict = {}
    __industry_cache_dict = {}
    def __get_limit_up_list(self):
        """Build the JSON response for ``/kpl/get_limit_up_list``.

        The payload holds ``limit_up_count``, ``open_limit_up_count``,
        per-reason statistics (``limit_up_reason_statistic``) and one row per
        recorded limit-up code (``limit_up_codes``), ordered by descending
        ``int(d[5])``.

        In a record ``d`` this method reads: ``d[2]`` limit-up reason,
        ``d[3]`` code, ``d[5]`` limit-up time; ``d[4]``, ``d[10]``, ``d[12]``
        and ``d[13]`` are copied into the output row (see the row layout
        comment inside the loop).

        :return: JSON string ``{"code": 0, "data": {...}}``.
        """
        # Count the limit-up codes recorded so far (grouped by limit-up reason)
        total_datas = KPLLimitUpDataRecordManager.total_datas
        if not total_datas:
            KPLLimitUpDataRecordManager.load_total_datas()
            total_datas = KPLLimitUpDataRecordManager.total_datas
        # Work on a list copy so it can be sorted in place later
        total_datas = list(total_datas)
        # Group the records by limit-up reason
        limit_up_reason_dict = {}
        for d in total_datas:
            if d[2] not in limit_up_reason_dict:
                limit_up_reason_dict[d[2]] = []
            limit_up_reason_dict[d[2]].append(d)
        # Within each reason, order by limit-up time so index 0 is the earliest record
        for k in limit_up_reason_dict:
            limit_up_reason_dict[k].sort(key=lambda x: int(x[5]))
        # Count the want-to-buy codes per reason
        want_codes = gpcode_manager.WantBuyCodesManager().list_code()
        limit_up_reason_want_count_dict = {}
        for d in total_datas:
            if d[2] not in limit_up_reason_want_count_dict:
                limit_up_reason_want_count_dict[d[2]] = 0
            if d[3] in want_codes:
                limit_up_reason_want_count_dict[d[2]] += 1
        # (plate name, number of limit-up codes, number of want-to-buy codes, limit-up time)
        limit_up_reason_statistic_info = [
            (k, len(limit_up_reason_dict[k]), limit_up_reason_want_count_dict.get(k), limit_up_reason_dict[k][0][5]) for
            k in
            limit_up_reason_dict]
        limit_up_reason_statistic_info.sort(key=lambda x: int(x[3]))
        codes_set = set([d[3] for d in total_datas])
        # Determine the "dragon" rank, limit-up state and opened-limit-up state; load scores
        rank_dict = limit_up_data_filter.get_limit_up_time_rank_dict(total_datas)
        limit_up_dict, limit_up_codes, open_limit_up_codes = limit_up_data_filter.get_limit_up_info(codes_set)
        # Never filled in this method, so every score below stays ""
        score_dict = {}
        fresult = []
        ignore_codes = self.__IgnoreCodeManager.list_ignore_codes("1")
        # Latest limit-up time first
        total_datas.sort(key=lambda x: int(x[5]))
        total_datas.reverse()
        # Load the limit-up reason change records
        reason_changes = log_export.load_kpl_reason_changes()
        reason_changes.reverse()
        # After the reverse, the first entry seen per code is the last one of the original list
        reason_changes_dict = {}
        for r in reason_changes:
            if r[0] not in reason_changes_dict:
                reason_changes_dict[r[0]] = r[1]
        # Collect the most recent order-action feedback
        order_reasons_dict = log_analyse.get_cant_order_reasons_dict()
        kpl_can_buy_reasons_dict = log_analyse.get_kpl_can_buy_reasons_dict()
        for d in total_datas:
            code = d[3]
            # Row layout: (code, name, limit-up state (0 - none, 1 - limit-up, 2 - opened), dragon rank,
            # first board, score, limit-up time, reason, number of codes with the same reason, free float,
            # whether the limit-up reason changed, net inflow of the reason, order summary)
            limit_up_state = 0
            if code in limit_up_dict:
                if limit_up_dict[code][0]:
                    limit_up_state = 1
                elif limit_up_dict[code][1]:
                    limit_up_state = 2
            score = ""
            if code in score_dict:
                score = score_dict[code]
            if code in ignore_codes:
                continue
            # Net inflow amount of the limit-up reason (from the cached ranking rows)
            reason = d[2]
            reason_money = ''
            if reason in self.__jingxuan_cache_dict:
                reason_money = output_util.money_desc(self.__jingxuan_cache_dict[reason][3])
            elif reason in self.__industry_cache_dict:
                reason_money = output_util.money_desc(self.__industry_cache_dict[reason][3])
            # Match the order feedback: when both kinds exist, the one with the later timestamp wins
            order_desc = ''
            order_reason = order_reasons_dict.get(code)
            kpl_can_buy_reason = kpl_can_buy_reasons_dict.get(code)
            if order_reason and kpl_can_buy_reason:
                # Timestamps are compared as integers after stripping ":" and "."
                if int(order_reason[0].replace(":", "").replace(".", "")) > int(
                        kpl_can_buy_reason[0].replace(":", "").replace(".", "")):
                    order_desc = f"不:{order_reason[1]}"
                else:
                    order_desc = f"买:{kpl_can_buy_reason[1]}"
            elif order_reason:
                order_desc = f"不:{order_reason[1]}"
            elif kpl_can_buy_reason:
                order_desc = f"买:{kpl_can_buy_reason[1]}"
            fresult.append((code, d[4], limit_up_state, f"龙{rank_dict.get(code)}", d[12], score,
                            output_util.time_format(int(d[5])), d[2], d[10], output_util.money_desc(d[13]),
                            reason_changes_dict.get(code), reason_money, order_desc))
        response_data = json.dumps({"code": 0, "data": {"limit_up_count": len(limit_up_codes),
                                                        "open_limit_up_count": len(open_limit_up_codes),
                                                        "limit_up_reason_statistic": limit_up_reason_statistic_info,
                                                        "limit_up_codes": fresult}})
        return response_data
    def __get_plate_info(self, ps_dict):
        @dask.delayed
        def kpl_getStockIDPlate(code_):
            temp_data = kpl_api.getStockIDPlate(code_)
            return temp_data
        @dask.delayed
        def kpl_getSonPlate(plate_code_):
            if not plate_code:
                return None
            temp_data = kpl_api.getSonPlate(plate_code_)
            return temp_data
        @dask.delayed
        def kpl_getCodesByPlate(plate_code_):
            if not plate_code:
                return None
            temp_data = kpl_api.getCodesByPlate(plate_code_)
            return temp_data
        @dask.delayed
        def request_data(f1_, f2_):
            temp_data = f1_, f2_
            return temp_data
        # 获取板块的代码
        fresult = {}
        code = ps_dict["code"]
        code_info = KPLLimitUpDataRecordManager.list_by_code(code, tool.get_now_date_str())[0]
        hot_block_name = code_info[2]
        plate_code = self.__KPLPlatManager.get_plat(hot_block_name)
        f1 = kpl_getStockIDPlate(code)
        # f2 = kpl_getSonPlate(plate_code)
        f3 = kpl_getCodesByPlate(plate_code)
        dask_result = request_data(f1, f3)
        plate_info, codes_by_plate_info = dask_result.compute()
        if plate_info:
            plate_info.sort(key=lambda x: x[2])
            plate_info.reverse()
            fresult["plate"] = plate_info
        # 获取代码的历史涨停数据,(涨停原因,日期,板块)
        fresult["code_records"] = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2]
        # 获取今日数据
        fresult["today"] = (code_info[2], code_info[1], code_info[6])
        fresult["industry"] = global_util.code_industry_map.get(code)
        if plate_code:
            # 获取强度
            # datas = son_plate_info
            # # (代码,名称,强度)
            # temp = kpl_util.parseSonPlat(datas)
            # temp.sort(key=lambda x: x[2])
            # temp.reverse()
            # fresult["plat_strength"] = temp
            # 获取涨停原因下面的列表
            datas = codes_by_plate_info
            # (代码,名称,现价,涨幅,自由流通,几板,龙几,主力净额,300w净额,机构增仓)
            temps = kpl_util.parsePlateCodes(datas)
            # --数据准备开始--
            codes_set = set([d[0] for d in temps])
            limit_up_dict, limit_up_codes, open_limit_up_codes = limit_up_data_filter.get_limit_up_info(codes_set)
            score_dict = {}
            want_codes = gpcode_manager.WantBuyCodesManager().list_code()
            black_codes = BlackListCodeManager().list_codes()
            total_datas = KPLLimitUpDataRecordManager.total_datas
            code_info_dict = {}
            for val in total_datas:
                code_info_dict[val[3]] = val
            # --数据准备结束--
            ignore_codes = self.__IgnoreCodeManager.list_ignore_codes("2")
            # 最终结果:(代码,名称,涨停状态(0-无状态 1-涨停 2-炸板),龙几,首板,分值,涨停时间,原因,相同原因代码数量,自由流通,涨停原因是否变化,涨幅,现价,黑名单,想买单,主力净值,300w,)
            codes_info_list = []
            for t in temps:
                code = t[0]
                limit_up_state = 0
                if code in limit_up_dict:
                    if limit_up_dict[code][0]:
                        limit_up_state = 1
                    elif limit_up_dict[code][1]:
                        limit_up_state = 2
                score = ""
                if code in score_dict:
                    score = score_dict[code]
                limit_up_time = ''
                if code in code_info_dict:
                    limit_up_time = output_util.time_format(code_info_dict[code][5])
                final_code_info = {"code_info": (
                    t[0], t[1], limit_up_state, t[6], t[5], score, limit_up_time,
                    code_info[2], code_info[10], output_util.money_desc(t[4]), 0, t[3], t[2],
                    "黑名单" if code in black_codes else "", "想买单" if code in want_codes else "",
                    output_util.money_desc(t[7]), output_util.money_desc(t[8]), output_util.money_desc(t[9]))}
                if code in code_info_dict:
                    final_code_info["today"] = (
                        code_info_dict[code][2], code_info_dict[code][1], code_info_dict[code][6])
                # 加载历史
                if code in self.__history_plates_dict:
                    final_code_info["code_records"] = self.__history_plates_dict[code][1]
                # 加载板块
                if code in self.__blocks_dict:
                    final_code_info["plate"] = self.__blocks_dict[code][1]
                # 获取二级行业
                final_code_info["industry"] = global_util.code_industry_map.get(code)
                if code not in ignore_codes:
                    codes_info_list.append(final_code_info)
                fresult["code_list_info"] = codes_info_list
        response_data = json.dumps({"code": 0, "data": fresult})
        return response_data
    def do_GET(self):
        path = self.path
        url = urlparse.urlparse(path)
        response_data = ""
        if url.path == "/get_kpl_data":
            best_feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.BEST_FENG_KOU)
            if not best_feng_kou:
                best_feng_kou = []
            best_feng_kou = best_feng_kou[:22]
            feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_KOU)
            if not feng_kou:
                feng_kou = []
            feng_kou = feng_kou[:22]
            industry_rank = self.__kplDataManager.get_data(kpl_util.KPLDataType.INDUSTRY_RANK)
            if not industry_rank:
                industry_rank = []
            industry_rank = industry_rank[:22]
            feng_xiang = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_XIANG)
            if not feng_xiang:
                feng_xiang = []
            feng_xiang = feng_xiang[:22]
            response_data = json.dumps({"code": 0, "data": {"best_feng_kou": best_feng_kou, "feng_kou": feng_kou,
                                                            "industry_rank": industry_rank, "feng_xiang": feng_xiang}})
        elif url.path == "/get_score_info":
            start_time = time.time()
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict['code']
            name = ps_dict.get('name')
            data = code_info_output.get_output_params(code, self.__jingxuan_cache_dict, self.__industry_cache_dict)
            if data["code_name"].find("None") > -1 and name:
                data["code_name"] = f"{name} {code}"
            self.__history_plates_dict[code] = (time.time(), data["kpl_code_info"]["code_records"])
            if "plate" in data["kpl_code_info"]:
                self.__blocks_dict[code] = (time.time(), data["kpl_code_info"]["plate"])
            response_data = json.dumps({"code": 0, "data": data})
            print("get_score_info 耗时:", time.time() - start_time)
            # 获取评分信息
            pass
        elif url.path == "/kpl/get_limit_up_list":
            response_data = self.__get_limit_up_list()
        elif url.path == "/kpl/get_plate_info":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            response_data = self.__get_plate_info(ps_dict)
        elif url.path == "/kpl/get_market_data":
            # 获取板块信息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            type_ = int(ps_dict['type'])
            result = []
            if type_ == 0:
                # 行业,主力净额倒序
                result = kpl_api.getMarketIndustryRealRankingInfo(True)
                result = kpl_util.parseMarketIndustry(result)
            elif type_ == 1:
                # 行业,主力净额顺序
                result = kpl_api.getMarketIndustryRealRankingInfo(False)
                result = kpl_util.parseMarketIndustry(result)
            elif type_ == 2:
                # 精选,主力净额倒序
                result = kpl_api.getMarketJingXuanRealRankingInfo(True)
                result = kpl_util.parseMarketJingXuan(result)
            elif type_ == 3:
                # 精选,主力净额顺序
                result = kpl_api.getMarketJingXuanRealRankingInfo(False)
                result = kpl_util.parseMarketJingXuan(result)
            forbidden_plates = self.__KPLPlateForbiddenManager.list_all()
            fresult = []
            for d in result:
                if type_ == 2 or type_ == 3:
                    self.__jingxuan_cache_dict[d[1]] = d
                elif type_ == 0 or type_ == 1:
                    self.__industry_cache_dict[d[1]] = d
                d = list(d)
                d.append(1 if d[1] in forbidden_plates else 0)
                fresult.append(d)
            response_data = json.dumps({"code": 0, "data": fresult})
        elif url.path == "/kpl/add_ignore_code":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict['code']
            type_ = ps_dict['type']
            self.__IgnoreCodeManager.ignore_code(type_, code)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/forbidden_plate":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 加入禁止
            self.__KPLPlateForbiddenManager.save_plate(plate)
            response_data = json.dumps({"code": 0})
        elif url.path == "/kpl/get_plate_codes":
            # 获取涨停原因下面的代码
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            plate = ps_dict["plate"]
            # 获取板块下的代码
            # 统计目前为止的代码涨停数量(分涨停原因)
            now_limit_up_codes_info = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP)
            now_limit_up_codes = set([d[0] for d in now_limit_up_codes_info])
            # 获取历史涨停
            record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas
            if not record_limit_up_datas:
                KPLLimitUpDataRecordManager.load_total_datas()
                record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas
            codes_info = []
            for d in record_limit_up_datas:
                if d[2] != plate:
                    continue
                # 代码,名称,涨停时间,是否炸板,是否想买,是否已经下过单
                codes_info.append(
                    [d[3], d[4], tool.to_time_str(int(d[5])), 1 if d[3] not in now_limit_up_codes else 0, 0, 0])
            codes_info.sort(key=lambda x: x[2])
            # 查询是否为想买单
            want_codes = gpcode_manager.WantBuyCodesManager().list_code()
            for code_info in codes_info:
                code_info[4] = 1 if code_info[0] in want_codes else 0
                # 获取代码状态
                if trade_manager.CodesTradeStateManager().get_trade_state(code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE:
                    code_info[5] = 1
            response_data = json.dumps({"code": 0, "data": codes_info})
        elif url.path == "/get_h_cancel_data":
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict["code"]
            if code:
                trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # TODO 获取H撤数据
                    response_data = json.dumps({"code": 1, "msg": "无H撤数据"})
                else:
                    response_data = json.dumps({"code": 1, "msg": "无H撤数据"})
            else:
                response_data = json.dumps({"code": 1, "msg": "请上传code"})
        elif url.path == "/get_last_trade_day_reasons":
            # 获取上个交易日的相同涨停原因的代码信息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            code = ps_dict["code"]
            day = HistoryKDatasUtils.get_previous_trading_date(tool.get_now_date_str())
            # 获取涨停数据
            # 获取代码的原因
            reasons = kpl_data_manager.KPLLimitUpDataRecordManager.list_by_code(code, day)
            if reasons:
                reasons = list(reasons)
                reasons.sort(key=lambda x: x[9])
                reason = reasons[-1][2]
                datas = self.__kplDataManager.get_from_file(kpl_util.KPLDataType.LIMIT_UP, day)
                # (代码,名称,首次涨停时间,最近涨停时间,几板,涨停原因,板块,实际流通,主力净额,涨停原因代码,涨停原因代码数量)
                result_list = []
                if datas:
                    for d in datas:
                        if d[5] == reason and d[0] != code:
                            # (代码,名称)
                            result_list.append((d[0], d[1]))
                response_data = json.dumps({"code": 0, "data": {"reason": reason, "data": result_list}})
            else:
                response_data = json.dumps({"code": 1, "msg": "昨日未涨停"})
        elif url.path == "/pull_kp_client_msg":
            # 拉取客户端消息
            ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()])
            client = ps_dict["client"]
            msg = kp_client_msg_manager.read_msg(client)
            if msg:
                response_data = json.dumps({"code": 0, "data": msg})
            else:
                response_data = json.dumps({"code": 1, "msg": "暂无消息"})
        elif url.path == "/list_kp_client_msg":
            msg_list = kp_client_msg_manager.list_msg_from_local()
            msg_list.reverse()
            msg_list = [f"{msg.split('|')[0]}{msg.split('|')[-1].split('-')[1].strip()}" for msg in msg_list]
            response_data = json.dumps({"code": 0, "data": msg_list})
        self.send_response(200)
        # 发给请求客户端的响应数据
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(response_data.encode())
    def do_POST(self):
        path = self.path
        url = urlparse.urlparse(path)
        if url.path == "/upload_kpl_data":
            # 接受开盘啦数据
            params = self.__parse_request()
            result_str = self.__process_kpl_data(params)
            self.__send_response(result_str)
    def __process_kpl_data(self, data):
        type_ = data["type"]
        print("开盘啦type:", type_)
        if type_ == KPLDataType.BIDDING.value:
            pass
        elif type_ == KPLDataType.LIMIT_UP.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_UP)
            if result_list:
                # 保存涨停时间
                self.__kplDataManager.save_data(type_, result_list)
                kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list)
        elif type_ == KPLDataType.OPEN_LIMIT_UP.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_OPEN_LIMIT_UP)
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.LIMIT_DOWN.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_DOWN)
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.EVER_LIMIT_DOWN.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_EVER_LIMIT_DOWN)
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.FENG_KOU.value:
            fdata = data["data"]
            result_list = kpl_util.parseFengKou(fdata)
            result_list.sort(key=lambda x: x[3])
            result_list.reverse()
            self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.BEST_FENG_KOU.value:
            result_list = kpl_util.parseBestFengKou(data["data"])
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
            # 保存最强风口
        elif type_ == KPLDataType.FENG_XIANG.value:
            result_list = kpl_util.parseFengXiang(data["data"])
            # 保存风向数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.INDUSTRY_RANK.value:
            result_list = kpl_util.parseIndustryRank(data["data"])
            # 保存行业数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.JINGXUAN_RANK.value:
            result_list = kpl_util.parseMarketJingXuan(data["data"])
            # 保存精选数据
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        return json.dumps({"code": 0})
    def __send_response(self, data):
        # 发给请求客户端的响应数据
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())
    def __parse_request(self):
        params = {}
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        # print(_str)
        params = json.loads(_str)
        return params
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """``HTTPServer`` combined with ``ThreadingMixIn``; adds no members of its own."""
def run(addr, port):
    """Call ``kpl_data_manager.run_pull_task()`` and then serve ``DataServer`` forever.

    :param addr: address to bind.
    :param port: port to bind.
    """
    # The client-message capture is currently disabled
    # kp_client_msg_manager.run_capture()
    kpl_data_manager.run_pull_task()
    # A plain socketserver.TCPServer was used here before
    server = ThreadedHTTPServer((addr, port), DataServer)
    print("DataServer is at: http://%s:%d/" % (addr, port))
    server.serve_forever()
# Script entry point: serve on all interfaces, port 9004
if __name__ == "__main__":
    run("0.0.0.0", 9004)
log_module/log_analyse.py
New file
@@ -0,0 +1,46 @@
"""
日志分析
"""
# 获取不可以下单的原因
import os
import constant
from utils import tool
def get_cant_order_reasons_dict():
    """Collect the latest "cannot place order" reason per code from today's l2_trade log.

    Only lines containing the marker text are parsed; a later line of the
    same code overwrites an earlier one.

    :return: dict ``code -> (time, reason)``; empty when the log file is missing.
    """
    marker = "不可以下单,原因:"
    log_path = "{}/logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), tool.get_now_date_str())
    reasons = {}
    if not os.path.exists(log_path):
        return reasons
    with open(log_path, encoding="utf-8") as f:
        for line in f:
            if marker not in line:
                continue
            # The code is the 6 characters that follow "code="
            code = line.split("code=")[1][:6]
            # The time is the second space-separated token before the first "|", cut to 12 characters
            time_ = line.split("|")[0].split(" ")[1][:12]
            reasons[code] = (time_, line.split(marker)[1].strip())
    return reasons
def get_kpl_can_buy_reasons_dict():
    """Collect the latest "can buy" reason per code from today's kpl_block_can_buy log.

    Every line is expected to look like ``<date> <time> ... | ... code=<6 chars>:<reason>``.
    Lines that do not match (blank lines, wrapped text, ...) are skipped rather
    than aborting the whole scan; a later line of the same code overwrites an
    earlier one.

    :return: dict ``code -> (time, reason)`` where the text ``可以下单`` has been
        removed from the reason; empty when the log file is missing.
    """
    file_path = "{}/logs/gp/kpl/kpl_block_can_buy.{}.log".format(constant.get_path_prefix(), tool.get_now_date_str())
    dict_ = {}
    if not os.path.exists(file_path):
        return dict_
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            try:
                code = line.split("code=")[1][:6]
                time_ = line.split("|")[0].split(" ")[1][:12]
                reason = line.split(f"code={code}:")[1].strip()
            except IndexError:
                # Line without the expected "code=" / time layout: ignore it
                continue
            dict_[code] = (time_, reason.replace("可以下单", ""))
    return dict_
# Manual check: print the parsed "can buy" reasons of today's log
if __name__ == "__main__":
    print(get_kpl_can_buy_reasons_dict())
log_module/log_export.py
New file
@@ -0,0 +1,307 @@
import datetime
import os
import shutil
import constant
from code_attribute import gpcode_manager
from utils import tool
class LogUtil:
    """Helpers for extracting parts of log files."""

    @classmethod
    def extract_log_from_key(cls, key, path, target_path):
        """Copy every line of *path* that contains *key* into *target_path*.

        :param key: value to search for; it is converted with ``"{}".format(key)``.
        :param path: source log file (UTF-8).
        :param target_path: output file (UTF-8); created or truncated before
            the source is opened.
        :raises OSError: when either file cannot be opened.
        """
        needle = "{}".format(key)
        # The target is opened first, as before; both files are closed on exit
        with open(target_path, mode='w', encoding="utf-8") as fw, open(path, 'r', encoding="utf-8") as f:
            # "in" also matches a key at column 0, which the former "find(...) > 0" test missed;
            # lines are streamed instead of loading the whole file
            fw.writelines(line for line in f if needle in line)
# Export the log lines about processed data ranges
def __export_l2_pos_range(code, date, dir):
    """Extract the "<code> 处理数据范围" lines of the l2_process log of *date* into *dir*."""
    source = "{}/logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date)
    target = "{}/l2_process_{}.log".format(dir, date)
    LogUtil.extract_log_from_key("{} 处理数据范围".format(code), source, target)
# Export the trade log
def __export_l2_trade_log(code, date, dir):
    """Extract the lines containing *code* from the l2_trade log of *date* into *dir*."""
    source = "{}/logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date)
    target = "{}/l2_trade_{}.log".format(dir, date)
    LogUtil.extract_log_from_key(code, source, target)
# Export the trade-cancel log
def __export_l2_trade_cancel_log(code, date, dir):
    """Extract the lines containing *code* from the l2_trade_cancel log of *date* into *dir*."""
    source = "{}/logs/gp/l2/l2_trade_cancel.{}.log".format(constant.get_path_prefix(), date)
    target = "{}/l2_trade_cancel_{}.log".format(dir, date)
    LogUtil.extract_log_from_key(code, source, target)
def __analyse_pricess_time():
    """Print every line of today's l2_process log whose last ":"-separated field exceeds 150.

    The field is converted with ``int``.
    NOTE(review): presumably this is a processing time in milliseconds - confirm.
    """
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    file_path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_process.{date}.log"
    with open(file_path, encoding="utf-8") as f:
        for line in f:
            elapsed = line.split(":")[-1]
            if int(elapsed) > 150:
                print(line)
def export_l2_log(code):
    """Export today's L2 logs of *code* into ``<prefix>/logs/gp/l2/<code>/``.

    Three extracts are written: processed data ranges, trade cancels and
    trades. Codes shorter than 6 characters are ignored.
    """
    if len(code) < 6:
        return
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    dir_ = "{}/logs/gp/l2/{}".format(constant.get_path_prefix(), code)
    # makedirs(exist_ok=True) also creates missing parents and avoids the
    # race between an existence check and mkdir
    os.makedirs(dir_, exist_ok=True)
    __export_l2_pos_range(code, date, dir_)
    __export_l2_trade_cancel_log(code, date, dir_)
    __export_l2_trade_log(code, date, dir_)
def compute_buy1_real_time(time_):
    """Shift an ``HH:MM:SS`` time back by 2 seconds and round it down to a multiple of 3 seconds.

    :param time_: time string with at least three ":"-separated integer fields.
    :return: the result of ``tool.time_seconds_format`` for the adjusted second count.
    """
    parts = time_.split(":")
    total_seconds = int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
    shifted = total_seconds - 2
    return tool.time_seconds_format(shifted - shifted % 3)
def load_l2_from_log(date=None):
    """Load the L2 data saved in the l2_data log of *date*, grouped by code.

    Each log line is expected to contain ``save_l2_data:`` followed by ``-``,
    a 6-character code, one separator character and a Python literal list of
    dicts. The items of each code are sorted by their ``"index"`` entry.

    Loading is best effort: a missing log yields an empty result, and any
    other failure is reported on stdout while whatever was parsed so far is
    still returned.

    :param date: ``YYYY-MM-DD`` string; defaults to today.
    :return: dict ``code -> list of dicts``.
    """
    today_data = {}
    if date is None:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    try:
        # NOTE(review): opened with the platform default encoding, unlike the other log readers - confirm
        with open("{}/logs/gp/l2/l2_data.{}.log".format(constant.get_path_prefix(), date), mode='r') as f:
            for data in f:
                index = data.find('save_l2_data:')
                index = data.find('-', index)
                data = data[index + 1:].strip()
                code = data[0:6]
                data = data[7:]
                # NOTE(security): eval() executes whatever the log line contains; consider
                # ast.literal_eval if the payload is always a plain literal
                dict_ = eval(data)
                if code not in today_data:
                    today_data[code] = dict_
                else:
                    today_data[code].extend(dict_)
        for key in today_data:
            news = sorted(today_data[key], key=lambda x: x["index"])
            today_data[key] = news
            print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    except FileNotFoundError:
        # No log for that date: nothing to load
        pass
    except Exception as e:
        # Still best effort, but no longer silent and no longer swallowing KeyboardInterrupt/SystemExit
        print("load_l2_from_log failed:", repr(e))
    return today_data
# Get the time of a log line
def __get_log_time(line):
    """Return the time part of a log line, without the fraction.

    The time is the second space-separated token of the text before the
    first ``|``, cut at the first ``.``.
    """
    prefix = line.split("|")[0]
    clock = prefix.split(" ")[1]
    return clock.split(".")[0]
# Get the position range of every L2 batch that was processed
def get_l2_process_position(code, date=None):
    """Return the ``(start, end)`` ranges logged for *code* in the l2_process log.

    A range is kept only when its start is greater than the end of the
    previously kept range and the log time lies within 09:30:00-15:00:00.

    :param date: ``YYYY-MM-DD`` string; defaults to today.
    :return: list of ``(start, end)`` integer tuples.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    pos_list = []
    code_tag = "code:{}".format(code)
    log_path = "{}/logs/gp/l2/l2_process.{}.log".format(constant.get_path_prefix(), date)
    with open(log_path, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_tag not in line:
                continue
            time_ = __get_log_time(line)
            # Text between the "处理数据范围" label (plus one separator character) and "处理时间"
            range_text = line[line.find("处理数据范围") + len("处理数据范围") + 1:line.find("处理时间")].strip()
            bounds = range_text.split("-")
            if pos_list and pos_list[-1][1] >= int(bounds[0]):
                continue
            if not 93000 <= int(time_.replace(":", "")) <= 150000:
                continue
            pos_list.append((int(bounds[0]), int(bounds[1])))
    return pos_list
# Get the trade-related positions logged for a code
def get_l2_trade_position(code, date=None):
    """Return the buy-signal, buy-execution and cancel positions of *code* from the l2_trade log.

    Only lines containing ``code=<code>`` with a log time within
    09:30:00-15:00:00 are used. Every matching line is printed.

    :param date: ``YYYY-MM-DD`` string; defaults to today.
    :return: list of ``(kind, index, extra)`` where kind 0 = buy signal start,
        1 = buy execution position, 2 = cancel position; ``extra`` is the
        cancel reason text for kind 2 and ``""`` otherwise.
    """

    def leading_token(text, marker):
        # Text after the marker, up to (not including) its first space
        tail = text.split(marker)[1].strip()
        return tail[0:tail.find(" ")].strip()

    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    pos_list = []
    code_tag = "code={}".format(code)
    log_path = "{}/logs/gp/l2/l2_trade.{}.log".format(constant.get_path_prefix(), date)
    with open(log_path, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_tag not in line:
                continue
            print(line)
            time_ = __get_log_time(line)
            if int("093000") > int(time_.replace(":", "")) or int(time_.replace(":", "")) > int("150000"):
                continue
            if line.find("获取到买入信号起始点") > 0:
                # Buy signal start position
                pos_list.append((0, int(leading_token(line, "获取到买入信号起始点:")), ""))
            elif line.find("获取到买入执行位置") > 0:
                # Buy execution position
                pos_list.append((1, int(leading_token(line, "获取到买入执行位置:")), ""))
            elif line.find("触发撤单") > 0:
                # Cancel position together with the cancel reason
                cancel_index = int(leading_token(line, "触发撤单,撤单位置:"))
                pos_list.append((2, cancel_index, line.split("撤单原因:")[1]))
    return pos_list
# Get the trade progress
def get_trade_progress(code, date=None):
    """Read the trade progress of *code* from the l2_trade_buy_queue log.

    Lines logged after 15:00:00 are ignored.

    :param date: ``YYYY-MM-DD`` string; defaults to today.
    :return: ``(index_list, buy_queues)`` where ``index_list`` holds
        ``(index, time)`` tuples taken from the "获取成交位置成功" lines and
        ``buy_queues`` holds ``(queue, time)`` tuples taken from the
        ``<code>-[...]`` lines.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    index_list = []
    buy_queues = []
    queue_tag = f"{code}-["
    progress_tag = "获取成交位置成功: code-{}".format(code)
    with open("{}/logs/gp/l2/l2_trade_buy_queue.{}.log".format(constant.get_path_prefix(), date), mode='r',
              encoding="utf-8") as f:
        for line in f:
            time_ = __get_log_time(line).strip()
            if int(time_.replace(":", "")) > int("150000"):
                continue
            if queue_tag in line:
                # NOTE(security): eval() executes whatever follows "<code>-" in the log line;
                # consider ast.literal_eval if the payload is always a plain list literal
                buy_queues.append((eval(line.split(f"{code}-")[1]), time_))
            if progress_tag not in line:
                continue
            try:
                index = int(line.split("index-")[1].split(" ")[0])
            except (IndexError, ValueError):
                # Line without a parsable "index-<n>" field: skip it
                continue
            index_list.append((index, time_))
    return index_list, buy_queues
# 获取H级撤单计算结果
def get_h_cancel_compute_info(code, date=None):
    """Return the last H-cancel computation result logged for a code.

    :param code: stock code; lines must contain ``code-<code>``
    :param date: log date as ``YYYY-MM-DD``; defaults to today
    :return: ``(target_rate, cancel_ratio, cancel_num, total_num)`` taken from the last
             matching line, or None when the log file is missing or has no match.
             ``cancel_ratio`` is ``cancel_num / total_num`` rounded to 2 decimals;
             the other three values are returned as strings.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    path_str = f"{constant.get_path_prefix()}/logs/gp/l2/cancel/h_cancel.{date}.log"
    if not os.path.exists(path_str):
        return None
    code_tag = f"code-{code}"
    latest_info = None
    with open(path_str, mode='r', encoding="utf-8") as f:
        for line in f:
            if code_tag not in line or "H级撤单计算结果" not in line:
                continue
            target_rate = line.split("目标比例:")[1].split(" ")[0].strip()
            # The result looks like "<sep><cancel>/<total> ..."; drop the separator char first.
            fraction = line.split("取消计算结果")[1][1:].split("/")
            cancel_num = fraction[0].strip()
            total_num = fraction[1].split(" ")[0].strip()
            # Later matches overwrite earlier ones, so the last line wins.
            latest_info = (target_rate, round(int(cancel_num) / int(total_num), 2), cancel_num, total_num,)
    return latest_info
# 读取看盘消息
def get_kp_msg_list(date=None):
    """Return every line of the kp message log for the given date.

    :param date: log date as ``YYYY-MM-DD``; defaults to today
    :return: list of raw lines (line endings kept); empty when the file does not exist
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    path_str = f"{constant.get_path_prefix()}/logs/gp/kp/kp_msg.{date}.log"
    if not os.path.exists(path_str):
        return []
    with open(path_str, mode='r', encoding="utf-8") as f:
        return f.readlines()
def export_logs(code):
    """Extract today's L2 log lines for one code into a fresh export directory.

    The export directory ``.../logs/gp/l2/export/<code>_<name>_<date>`` is deleted
    first if it already exists, then one filtered file per source log is written
    through ``LogUtil.extract_log_from_key``.

    :param code: stock code whose log lines are exported
    """
    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    target_dir = f"{constant.get_path_prefix()}/logs/gp/l2/export/{code}_{code_name}_{date}"
    if os.path.exists(target_dir):
        shutil.rmtree(target_dir)
    os.makedirs(target_dir)
    # These logs are filtered on the bare code; the rest are filtered on "code=<code>".
    bare_code_logs = ("l2_process", "l2_process_time", "cancel/h_cancel", "l2_trade_buy_progress")
    for log_name in ("l2_process", "l2_trade", "l2_trade_cancel", "l2_process_time", "l2_trade_buy",
                     "l2_trade_buy_progress", "cancel/h_cancel"):
        key = code if log_name in bare_code_logs else f"code={code}"
        target_path = f"{target_dir}/{log_name}.{code}_{code_name}.{date}.log"
        # A log name may contain a sub-folder ("cancel/..."), so make sure it exists.
        dir_path = target_path.rsplit("/", 1)[0]
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
        LogUtil.extract_log_from_key(key, f"{constant.get_path_prefix()}/logs/gp/l2/{log_name}.{date}.log",
                                     target_path)
def export_trade_progress(code):
    """Return the sorted, de-duplicated trade-progress indexes logged today for a code.

    :param code: stock code; lines must contain ``code-<code>`` and "确定交易进度成功"
    :return: ascending list of the integer values following ``index-`` on matching lines
    """
    path = f"{constant.get_path_prefix()}/logs/gp/l2/l2_trade_buy_progress.{tool.get_now_date_str()}.log"
    code_tag = f"code-{code}"
    with open(path, mode='r', encoding="utf-8") as f:
        indexes = {
            int(line.split("index-")[1].split(" ")[0])
            for line in f
            if code_tag in line and "确定交易进度成功" in line
        }
    return sorted(indexes)
# 加载买入得分记录
def load_buy_score_recod(code):
    """Load today's trade-record entries for a code.

    :param code: stock code; a line matches when ``code=<code>`` occurs after its first character
    :return: list of ``(time_str, type, data)`` tuples, where ``time_str`` is characters
             11..18 of the line, ``type`` is the token after ``type=`` and ``data`` is the
             dict built by evaluating the text after ``data=`` wrapped in braces;
             empty list when the log file does not exist
    """
    path = f"{constant.get_path_prefix()}/logs/gp/trade/trade_record.{tool.get_now_date_str()}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    code_tag = f"code={code}"
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            if line.find(code_tag) <= 0:
                continue
            time_str = line[11:19]
            payload = line[line.find("data=") + 5:]
            type_pos = line.find("type=")
            record_type = line[type_pos + 5:line.find(" ", type_pos)]
            # The payload is evaluated as the body of a dict literal.
            fdatas.append((time_str, record_type, eval("{" + payload + "}")))
    return fdatas
def load_kpl_reason_changes():
    """Load today's limit-up reason changes from the kpl reason-change log.

    Each line is expected to carry ``code-<code>:<from>-<to>`` where ``<to>`` is a
    Python expression that gets evaluated.

    :return: list of ``(code, from_reason, to_reason)`` tuples; empty when the file is missing
    """
    path = f"{constant.get_path_prefix()}/logs/gp/kpl/kpl_limit_up_reason_change.{tool.get_now_date_str()}.log"
    fdatas = []
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        for line in f:
            payload = line[line.find("code-") + 5:]
            fields = payload.split(":")
            code = fields[0]
            change = fields[1].split("-")
            fdatas.append((code, change[0], eval(change[1])))
    return fdatas
main.py
@@ -1,9 +1,12 @@
import threading
import data_server
import middle_api_server
import middle_server
if __name__ == "__main__":
    # Start the middle API server and the data server on daemon threads, then
    # run the middle server on the main thread.
    api_thread = threading.Thread(target=middle_api_server.run, daemon=True)
    api_thread.start()
    data_thread = threading.Thread(target=data_server.run, args=("0.0.0.0", 9004), daemon=True)
    data_thread.start()
    middle_server.run()
middle_api_server.py
@@ -6,6 +6,7 @@
import threading
import time
import socket_manager
import trade_manager
from db import mysql_data, redis_manager
from db.redis_manager import RedisUtils
@@ -238,6 +239,9 @@
                                {"code": 0, "data": times, "msg": ""})
                        finally:
                            redis.close()
                    elif type_ == "trade_server_channels":
                        channels = socket_manager.ClientSocketManager.list_client()
                        return_str = json.dumps({"code": 0, "data": channels})
                break
                # sk.close()
@@ -395,6 +399,7 @@
def run():
    print("create middle_api_server")
    laddr = "0.0.0.0", 10009
    print("middle_api_server is at: http://%s:%d/" % (laddr))
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
    tcpserver.serve_forever()
middle_server.py
@@ -13,6 +13,7 @@
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util
from utils.juejin_util import JueJinHttpApi
@@ -93,7 +94,6 @@
                            {"code": 100, "msg": f"JSON解析失败"}).encode(
                            encoding='utf-8')))
                        continue
                    print(data_json["type"])
                    if data_json["type"] == 'register':
                        client_type = data_json["data"]["client_type"]
                        rid = data_json["rid"]
@@ -109,9 +109,10 @@
                                        # 记录活跃客户端
                                        socket_manager.ClientSocketManager.heart(resultJSON['client_id'])
                                except json.decoder.JSONDecodeError as e:
                                    print("JSON解析出错", result, header)
                                    if not result:
                                        sk.close()
                                    print("JSON解析出错", result, header)
                                        break
                                time.sleep(1)
                        except ConnectionResetError as ee:
                            socket_manager.ClientSocketManager.del_client(rid)
@@ -153,7 +154,7 @@
                                redis = RedisManager(db).getRedis()
                                method = getattr(RedisUtils, cmd)
                                args_ = [redis, key]
                                if args:
                                if args is not None:
                                    if type(args) == tuple or type(args) == list:
                                        args = list(args)
                                        for a in args:
@@ -162,9 +163,13 @@
                                        args_.append(args)
                                args_ = tuple(args_)
                                result = method(*args_)
                                if type(result) == set:
                                    result = list(result)
                                result_str = json.dumps({"code": 0, "data": result})
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logger_debug.exception(e)
                            logger_debug.info(f"Redis操作出错:data_json:{data_json}")
                            logging.exception(e)
                            sk.sendall(socket_util.load_header(
                                json.dumps({"code": 1, "msg": str(e)}).encode(encoding='utf-8')))
@@ -185,7 +190,7 @@
                                    args_.append(args)
                            args_ = tuple(args_)
                            result = method(*args_)
                            result_str = json.dumps({"code": 0, "data": result})
                            result_str = json.dumps({"code": 0, "data": result}, default=str)
                            sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        except Exception as e:
                            logging.exception(e)
@@ -228,6 +233,7 @@
                        result_str = json.dumps({"code": 0, "data": {}})
                        sk.sendall(socket_util.load_header(result_str.encode(encoding='utf-8')))
                        pass
                else:
                    # 断开连接
                    break
@@ -269,6 +275,7 @@
    t1.start()
    laddr = "0.0.0.0", 10008
    print("MiddleServer is at: http://%s:%d/" % (laddr))
    tcpserver = MyThreadingTCPServer(laddr, MyBaseRequestHandle)  # 注意:参数是MyBaseRequestHandle
    tcpserver.serve_forever()
output/code_info_output.py
New file
@@ -0,0 +1,137 @@
"""
代码信息对外输出
"""
# score_info 得分信息
# 下单参数信息
# 选股宝
# 市场热度
import sys
import time
import code_attribute
from code_attribute import gpcode_manager, code_price_manager
from log_module import log_export
from third_data import kpl_data_manager, kpl_api
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager
from trade import l2_trade_util
from utils import global_util
base_output_content = {}
kpl_block_info_dict = {}
__kplDataManager = kpl_data_manager.KPLDataManager()
def __get_base_html_content():
    """Return an HTML ``<head>`` fragment embedding the stylesheet.

    The content of ``./output/css/style.css`` is read once and kept in
    ``base_output_content['css']``; later calls reuse that cached text.
    """
    print("路径", sys.path[0])
    if base_output_content.get('css') is None:
        with open("./output/css/style.css", mode='r') as f:
            base_output_content['css'] = f.read()
    return f"<head><style>{base_output_content['css']}</style></head>"
def money_desc(money):
    """Format an amount in units of 亿 (1e8) or 万 (1e4), rounded to 2 decimals.

    Amounts whose absolute value exceeds 100,000,000 use 亿; everything else uses 万.
    """
    if abs(money) > 100000000:
        unit, divisor = "亿", 100000000
    else:
        unit, divisor = "万", 10000
    return f"{round(money / divisor, 2)}{unit}"
def get_output_params(code, jingxuan_cache_dict, industry_cache_dict):
    """Collect the display parameters for one stock code.

    :param code: stock code
    :param jingxuan_cache_dict: plate name -> cached row; element 3 of the row is
                                formatted with ``money_desc``
    :param industry_cache_dict: same layout, consulted when the plate is not in
                                ``jingxuan_cache_dict``
    :return: dict with the keys ``base_url``, ``code``, ``code_name``,
             ``initiative_buy_codes``, ``passive_buy_codes``, ``trade_record`` and
             ``kpl_code_info`` (``industry``, ``code_records`` and, when available,
             ``plate`` and ``today``)
    """

    def format_plate_output(_plat):
        # Pair a plate name with its formatted amount; '' when neither cache knows the plate.
        if _plat in jingxuan_cache_dict:
            return _plat, money_desc(jingxuan_cache_dict[_plat][3])
        elif _plat in industry_cache_dict:
            return _plat, money_desc(industry_cache_dict[_plat][3])
        else:
            return _plat, ''

    params = {
        "base_url": "http://192.168.3.252/kp/",
    }

    # List membership tags shown next to the code name.
    code_extra_infos = []
    if l2_trade_util.BlackListCodeManager().is_in(code):
        code_extra_infos.append("黑名单")
    if l2_trade_util.WhiteListCodeManager().is_in(code):
        code_extra_infos.append("白名单")
    if code_attribute.gpcode_manager.WantBuyCodesManager().is_in_cache(code):
        code_extra_infos.append("想买单")
    if code_attribute.gpcode_manager.PauseBuyCodesManager().is_in_cache(code):
        code_extra_infos.append("暂不买")
    params["code"] = code
    params["code_name"] = f"{gpcode_manager.get_code_name(code)} {code}  ({','.join(code_extra_infos)})"

    # Initiative / passive buy lists are always emitted empty here.
    params["initiative_buy_codes"] = []
    params["passive_buy_codes"] = []

    trade_info = __load_trade_record(code)
    params["trade_record"] = {"open_limit_up": trade_info[0], "records": trade_info[2]}

    # ---- kpl related information ----
    industry = global_util.code_industry_map.get(code)
    params["kpl_code_info"] = {"industry": format_plate_output(industry)}
    # Plates of the code: use the module-level cache when present, otherwise ask the kpl API.
    if code not in kpl_block_info_dict:
        plate_info = kpl_api.getStockIDPlate(code)
    else:
        plate_info = kpl_block_info_dict.get(code)
    if plate_info:
        kpl_block_info_dict[code] = plate_info
        # Sort ascending on element 2 and then reverse (kept as two steps so the
        # ordering of ties stays exactly as before).
        plate_info.sort(key=lambda x: x[2])
        plate_info.reverse()
        params["kpl_code_info"]["plate"] = [(k[0], k[1], k[2], format_plate_output(k[1])[1]) for k in plate_info]

    # Historical limit-up records of the code: (reason, date, plates); only the first two are kept.
    code_records = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2]
    if code_records:
        code_records = [(format_plate_output(k[0]), k[1], [format_plate_output(k1) for k1 in k[2].split("、")]) for k in
                        code_records]
    params["kpl_code_info"]["code_records"] = code_records

    if not KPLLimitUpDataRecordManager.total_datas:
        KPLLimitUpDataRecordManager.load_total_datas()
    for d in KPLLimitUpDataRecordManager.total_datas:
        if d[3] == code:
            # Today's record for this code.
            plates = d[6].split("、")
            plates = [format_plate_output(p) for p in plates]
            params["kpl_code_info"]["today"] = (format_plate_output(d[2]), d[1], plates)
            break
    return params
def __load_trade_record(code):
    """Load the trade-record summary for a code.

    :param code: stock code
    :return: ``(break_time, records_new, records_new_data)``; ``break_time`` is element 1
             of ``Buy1PriceManager().get_limit_up_info(code)`` and both lists are
             currently always empty
    """
    import logging

    def format_l2_data(item):
        # Not called in this function at present; kept for when the records get formatted.
        return f"{item['val']['time']}#{item['val']['num']}手#{round(item['val']['num'] * float(item['val']['price']) * 100 / 10000, 1)}万"

    # Limit-up break ("炸板") information.
    limit_up_info = code_price_manager.Buy1PriceManager().get_limit_up_info(code)
    break_time = limit_up_info[1]
    records = []
    try:
        records = log_export.load_buy_score_recod(code)
    except Exception:
        # Best effort: a broken log must not break the output, but it should not
        # vanish silently either (and must not trap KeyboardInterrupt/SystemExit).
        logging.exception("failed to load buy score records for %s", code)
    records_new = []
    records_new_data = []
    return break_time, records_new, records_new_data
output/limit_up_data_filter.py
New file
@@ -0,0 +1,62 @@
"""
涨停数据过滤器
"""
# 判断是龙几,判断是否涨停,判断是否炸板,加载分数
from db.redis_manager import RedisUtils
from utils import tool
from db import redis_manager as redis_manager
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
__kplDataManager = KPLDataManager()
# 忽略代码管理器
class IgnoreCodeManager:
    """Keeps per-type sets of ignored codes in Redis under ``kp_ignore_codes_<type>``."""

    __redisManager = redis_manager.RedisManager(3)

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def ignore_code(self, type, code):
        """Add ``code`` to the ignore set of ``type`` and refresh the set's expiry."""
        redis_key = f"kp_ignore_codes_{type}"
        RedisUtils.sadd(self.__get_redis(), redis_key, code)
        RedisUtils.expire(self.__get_redis(), redis_key, tool.get_expire())

    def list_ignore_codes(self, type):
        """Return the members of the ignore set of ``type`` as delivered by Redis."""
        redis_key = f"kp_ignore_codes_{type}"
        return RedisUtils.smembers(self.__get_redis(), redis_key)
# 获取涨停顺序(按涨停原因分组)
def get_limit_up_time_rank_dict(datas):
    """Rank rows within each group by ascending ``int(row[5])``.

    Rows are grouped on ``row[2]``; within a group the n-th row in sorted order gets
    rank n (starting at 1). ``datas`` is sorted in place as a side effect.

    :param datas: list of indexable rows; index 2 is the group key, index 3 the
                  result key and index 5 an int-convertible sort key
    :return: dict mapping ``row[3]`` to its rank inside the ``row[2]`` group
    """
    datas.sort(key=lambda row: int(row[5]))
    group_sizes = {}
    ranks = {}
    for row in datas:
        group = row[2]
        position = group_sizes.get(group, 0) + 1
        group_sizes[group] = position
        ranks[row[3]] = position
    return ranks
# 获取涨停信息
def get_limit_up_info(codes):
    """Report, for each code, whether it is in the limit-up and open-limit-up data sets.

    :param codes: iterable of stock codes
    :return: ``(status, limit_up_codes, open_limit_up_codes)`` where ``status`` maps each
             code to ``(in_limit_up, in_open_limit_up)`` and both code collections are
             sets (empty when the corresponding data is missing)
    """
    limit_up_data = __kplDataManager.get_data(KPLDataType.LIMIT_UP)
    # Always a set, so the return type no longer depends on whether data exists.
    limit_up_codes = {val[0] for val in limit_up_data} if limit_up_data else set()
    open_limit_up_data = __kplDataManager.get_data(KPLDataType.OPEN_LIMIT_UP)
    open_limit_up_codes = {val[0] for val in open_limit_up_data} if open_limit_up_data else set()
    dict_ = {code: (code in limit_up_codes, code in open_limit_up_codes) for code in codes}
    return dict_, limit_up_codes, open_limit_up_codes
if __name__ == "__main__":
    # Manual smoke run; get_limit_up_info requires the list of codes to check.
    get_limit_up_info([])
output/output_util.py
New file
@@ -0,0 +1,14 @@
import time
def money_desc(money):
    """Render an amount as "<x>亿" when |money| > 100,000,000, otherwise as "<x>万".

    The scaled value is rounded to 2 decimals.
    """
    use_yi = abs(money) > 100000000
    scaled = round(money / (100000000 if use_yi else 10000), 2)
    return f"{scaled}亿" if use_yi else f"{scaled}万"
def time_format(timestamp):
    """Format a Unix timestamp as local ``HH:MM:SS``; falsy input yields ""."""
    if not timestamp:
        return ""
    return time.strftime("%H:%M:%S", time.localtime(int(timestamp)))
socket_manager.py
@@ -1,6 +1,8 @@
import threading
import time
from utils import tool
class ClientSocketManager:
    # 客户端类型
@@ -27,7 +29,8 @@
        if _type == cls.CLIENT_TYPE_TRADE:
            if _type in cls.socket_client_dict:
                # 根据排序活跃时间排序
                client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[0] in cls.active_client_dict else 0,
                client_list = sorted(cls.socket_client_dict[_type], key=lambda x: cls.active_client_dict.get(x[0]) if x[
                                                                                                                          0] in cls.active_client_dict else 0,
                                     reverse=True)
                for d in client_list:
                    if d[0] in cls.socket_client_lock_dict:
@@ -64,11 +67,19 @@
                for d in cls.socket_client_dict[t]:
                    if d[0] == rid:
                        cls.socket_client_dict[t].remove(d)
                        try:
                            d[1].close()
                        except:
                            pass
                        break
            elif type(cls.socket_client_dict[t]) == tuple:
                if cls.socket_client_dict[t][0] == rid:
                    cls.socket_client_dict.pop(t)
                    try:
                        t[1].close()
                    except:
                        pass
                    break
    # 心跳信息
@@ -82,4 +93,21 @@
        for k in cls.active_client_dict.keys():
            if time.time() - cls.active_client_dict[k] > 20:
                # 心跳时间间隔20s以上视为无效
                cls.del_client(k)
                cls.del_client(k)
    @classmethod
    def list_client(cls):
        _type = cls.CLIENT_TYPE_TRADE
        client_list = sorted(cls.socket_client_dict[_type],
                             key=lambda x: cls.active_client_dict.get(x[0]) if x[0] in cls.active_client_dict else 0,
                             reverse=True)
        fdata = []
        for client in client_list:
            active_time = cls.active_client_dict.get(client[0])
            if active_time is None:
                active_time = 0
            active_time = tool.to_time_str(int(active_time))
            fdata.append(
                (client[0], cls.socket_client_lock_dict[client[0]].locked(),active_time))
        return fdata
third_data/code_plate_key_manager.py
New file
@@ -0,0 +1,614 @@
"""
代码行业关键词管理
"""
# 涨停代码关键词板块管理
import json
import constant
from db.redis_manager import RedisUtils
from third_data import kpl_block_util
from utils import global_util, tool
from db import redis_manager as redis_manager
from third_data.kpl_util import KPLPlatManager
from trade import trade_manager
# 代码精选板块管理
class KPLCodeJXBlockManager:
    """Stores the selected (精选) blocks of each code in Redis with an in-process cache."""

    __redisManager = redis_manager.RedisManager(3)
    # code -> blocks, shared by all instances
    __code_blocks = {}

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def save_jx_blocks(self, code, blocks):
        """Persist ``blocks`` for ``code`` (JSON under ``kpl_jx_blocks-<code>``) and cache them.

        A ``blocks`` value of None is ignored.
        """
        if blocks is None:
            return
        redis_key = f"kpl_jx_blocks-{code}"
        RedisUtils.setex(self.__get_redis(), redis_key, tool.get_expire(), json.dumps(blocks))
        self.__code_blocks[code] = blocks

    def get_jx_blocks(self, code):
        """Return the blocks of ``code`` from the cache, falling back to Redis.

        :return: the stored blocks, or None when nothing is stored for the code
        """
        try:
            return self.__code_blocks[code]
        except KeyError:
            pass
        raw = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
        if raw is None:
            return None
        self.__code_blocks[code] = json.loads(raw)
        return self.__code_blocks[code]
# 开盘啦禁止交易板块管理
class KPLPlateForbiddenManager:
    """Singleton managing the plates that must not be traded.

    The plates live in the Redis set ``kpl_forbidden_plates`` and are mirrored in a
    class-level cache that is filled when the singleton is first created.
    """

    __redisManager = redis_manager.RedisManager(3)
    # In-process mirror of the Redis set
    __kpl_forbidden_plates_cache = set()
    __instance = None

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(KPLPlateForbiddenManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Fill the class-level cache from Redis."""
        __redis = cls.__get_redis()
        try:
            members = RedisUtils.smembers(__redis, "kpl_forbidden_plates")
            # Assign through cls: a bare name here would only bind a local variable
            # and leave the shared cache empty.
            cls.__kpl_forbidden_plates_cache = set(members) if members else set()
        finally:
            RedisUtils.realse(__redis)

    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()

    def save_plate(self, plate):
        """Add ``plate`` to the cache and to the Redis set, refreshing the set's expiry."""
        self.__kpl_forbidden_plates_cache.add(plate)
        RedisUtils.sadd(self.__get_redis(), "kpl_forbidden_plates", plate)
        RedisUtils.expire(self.__get_redis(), "kpl_forbidden_plates", tool.get_expire())

    def list_all(self):
        """Return the forbidden plates as currently stored in Redis."""
        return RedisUtils.smembers(self.__get_redis(), "kpl_forbidden_plates")

    def list_all_cache(self):
        """Return the cached set of forbidden plates (no Redis access)."""
        return self.__kpl_forbidden_plates_cache
class LimitUpCodesPlateKeyManager:
    """Tracks today's limit-up reasons and the reason <-> codes indexes."""

    # code -> today's limit-up reason (replaced wholesale by set_today_limit_up)
    # NOTE(review): set_today_limit_up rebinds this on the instance, so the class-level
    # dict is not updated by it - confirm no caller reads it through the class.
    today_limit_up_reason_dict = {}
    # code -> reason, accumulated over the day
    today_total_limit_up_reason_dict = {}
    # code -> set of keys
    total_code_keys_dict = {}
    # key -> set of codes
    total_key_codes_dict = {}
    __redisManager = redis_manager.RedisManager(1)

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def set_today_limit_up(self, datas):
        """Record today's limit-up data.

        :param datas: list of ``(code, reason)`` items; None or empty leaves the state untouched
        """
        if datas:
            self.today_limit_up_reason_dict = {item[0]: item[1] for item in datas}
            for item in datas:
                self.__set_total_keys(item[0])
        self.set_today_total_limit_up(datas)

    def set_today_total_limit_up(self, datas):
        """Merge ``(code, reason)`` items into the day's accumulated reasons.

        None or empty input is a no-op, in line with the guard in ``set_today_limit_up``.
        """
        if not datas:
            return
        for item in datas:
            code = item[0]
            self.today_total_limit_up_reason_dict[code] = item[1]

    def set_today_limit_up_reason_change(self, code, from_reason, to_reason):
        """Store the previous reason of ``code`` in Redis and rebuild the code's keys."""
        RedisUtils.sadd(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", from_reason)
        RedisUtils.expire(self.__get_redis(), f"kpl_limit_up_reason_his-{code}", tool.get_expire())
        self.__set_total_keys(code)

    def __set_total_keys(self, code):
        """Rebuild the keys of ``code`` from today's reason and index the code under them."""
        keys = set()
        if code in self.today_limit_up_reason_dict:
            # Reasons listed in KPL_INVALID_BLOCKS are not used as keys.
            if self.today_limit_up_reason_dict.get(code) not in constant.KPL_INVALID_BLOCKS:
                keys.add(self.today_limit_up_reason_dict.get(code))
        self.total_code_keys_dict[code] = keys
        for k in keys:
            if k not in self.total_key_codes_dict:
                self.total_key_codes_dict[k] = set()
            self.total_key_codes_dict[k].add(code)

    def get_codes_by_key_without_mine(self, key, code):
        """Return a new set with the codes indexed under ``key``, excluding ``code``."""
        codes_set = set()
        if key in self.total_key_codes_dict:
            codes_set |= self.total_key_codes_dict[key]
        codes_set.discard(code)
        return codes_set

    def match_limit_up_reason_keys(self, code, keys):
        """Match ``keys`` against the indexed limit-up reasons.

        :return: ``{key: codes}`` for every key that has at least one code other than ``code``
        """
        fresult = {}
        for k in keys:
            if k in self.total_key_codes_dict:
                codes = set(self.total_key_codes_dict[k])
                codes.discard(code)
                if codes:
                    fresult[k] = codes
        return fresult
# 实时开盘啦市场数据
class RealTimeKplMarketData:
    """Holds the current top reason / industry plates and answers buy-eligibility queries."""

    # Top reason plates as (name, net inflow, rank) tuples
    top_5_reason_list = []
    # Top industry plates as (name, amount, rank) tuples
    top_5_industry_list = []
    # name -> tuple, merged from both lists above
    top_5_key_dict = {}
    # row[1] -> row, for every reason / industry row received
    total_reason_dict = {}
    total_industry_dict = {}
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __KPLPlatManager = KPLPlatManager()

    @classmethod
    def set_top_5_reasons(cls, datas):
        """Rebuild the top reason list from ``datas``.

        Rows listed in KPL_INVALID_BLOCKS are skipped. Collection stops once more than
        10 rows have been taken, or right after taking a row whose element 3 is below
        3 * 10000 * 10000. Same-plate alias names are then added with the amount and
        rank of the plate they belong to.

        :param datas: rows where index 1 is the plate name and index 3 the net inflow
        """
        for d in datas:
            cls.total_reason_dict[d[1]] = d
        temp_list = []
        for d in datas:
            if d[1] in constant.KPL_INVALID_BLOCKS:
                continue
            # (name, net inflow, rank)
            temp_list.append((d[1], d[3], len(temp_list)))
            if len(temp_list) > 10:
                break
            if d[3] < 3 * 10000 * 10000:
                break
        # Expand aliases over a snapshot: appending to the list being iterated would
        # re-expand the aliases themselves and may never terminate.
        for temp in list(temp_list):
            names = cls.__KPLPlatManager.get_same_plat_names_by_id(temp[0])
            for name in names:
                # Skip the plate's own name (temp[0]); temp[1] is the amount.
                if name == temp[0]:
                    continue
                temp_list.append((name, temp[1], temp[2]))
        cls.top_5_reason_list = temp_list
        cls.__reset_top_5_dict()

    @classmethod
    def set_top_5_industry(cls, datas):
        """Rebuild the top industry list from ``datas``.

        Same selection rule as ``set_top_5_reasons`` but the amount is taken from index 2.
        """
        for d in datas:
            cls.total_industry_dict[d[1]] = d
        temp_list = []
        for d in datas:
            if d[1] in constant.KPL_INVALID_BLOCKS:
                continue
            temp_list.append((d[1], d[2], len(temp_list)))
            if len(temp_list) > 10:
                break
            if d[2] < 3 * 10000 * 10000:
                break
        cls.top_5_industry_list = temp_list
        cls.__reset_top_5_dict()

    @classmethod
    def __reset_top_5_dict(cls):
        # Reason entries are written last, so they win over industry entries of the same name.
        temp_dict = {}
        for t in cls.top_5_industry_list:
            temp_dict[t[0]] = t
        for t in cls.top_5_reason_list:
            temp_dict[t[0]] = t
        cls.top_5_key_dict = temp_dict

    @classmethod
    def get_can_buy_key_set(cls):
        """Return the keys view of the plates currently in the top lists."""
        temp_set = cls.top_5_key_dict.keys()
        return temp_set

    @classmethod
    def get_can_buy_codes_count(cls, code, key):
        """Decide how many orders may be placed for ``code`` under plate ``key``.

        :return: ``(count, message)``
        """
        # At least one other limit-up code must share the key.
        temp_codes = LimitUpCodesPlateKeyManager.total_key_codes_dict.get(key)
        if temp_codes is None:
            temp_codes = set()
        else:
            temp_codes = set(temp_codes)
        temp_codes.discard(code)
        if len(temp_codes) < 1:
            return 0, "身位不为后排"

        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        if key in forbidden_plates:
            return 0, "不买该板块"

        # Before 10:00:00 two orders are allowed without looking at the top lists.
        if int(tool.get_now_time_str().replace(':', '')) < int("100000"):
            return 2, "10:00以前可以挂2个单"
        # From 10:00:00 on the plate has to be in the top lists.
        if key not in cls.top_5_key_dict:
            return 0, "净流入没在前5"
        if cls.top_5_key_dict[key][1] > 3 * 10000 * 10000:
            return 2, "净流入在前5且大于3亿"
        else:
            return 1, "净流入在前5"

    @classmethod
    def is_in_top(cls, keys):
        """Check whether any of ``keys`` is a top plate that is not forbidden.

        :param keys: set of plate names
        :return: ``(True, matching_keys)`` or ``(False, None)``
        """
        reasons = cls.get_can_buy_key_set()
        forbidden_plates = cls.__KPLPlateForbiddenManager.list_all_cache()
        reasons = reasons - forbidden_plates
        temp_set = keys & reasons
        if temp_set:
            return True, temp_set
        else:
            return False, None
# 代码历史涨停原因与板块管理
class CodesHisReasonAndBlocksManager:
    """Caches each code's historical limit-up reasons and blocks, backed by Redis."""

    __redisManager = redis_manager.RedisManager(1)
    # code -> set of historical limit-up reasons
    __history_limit_up_reason_dict = {}
    # code -> set of blocks
    __blocks_dict = {}

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def set_history_limit_up_reason(self, code, reasons):
        """Cache ``reasons`` for ``code`` and store them as a JSON list in Redis."""
        self.__history_limit_up_reason_dict[code] = set(reasons)
        RedisUtils.setex(self.__get_redis(), f"kpl_his_limit_up_reason-{code}", tool.get_expire(),
                         json.dumps(list(reasons)))

    def get_history_limit_up_reason(self, code):
        """Return the historical reasons of ``code`` as a set.

        The in-process cache is consulted first, then Redis. A return value other than
        None means the reasons have been loaded before.
        """
        cached = self.__history_limit_up_reason_dict.get(code)
        if cached is not None:
            return cached
        raw = RedisUtils.get(self.__get_redis(), f"kpl_his_limit_up_reason-{code}")
        if raw is not None:
            self.__history_limit_up_reason_dict[code] = set(json.loads(raw))
        return self.__history_limit_up_reason_dict.get(code)

    def set_blocks(self, code, blocks):
        """Cache ``blocks`` for ``code`` and store them as a JSON list in Redis."""
        self.__blocks_dict[code] = set(blocks)
        RedisUtils.setex(self.__get_redis(), f"kpl_blocks-{code}", tool.get_expire(), json.dumps(list(blocks)))

    def get_blocks(self, code):
        """Return the blocks of ``code`` as a set, or None when nothing is stored."""
        cached = self.__blocks_dict.get(code)
        if cached is not None:
            return cached
        raw = RedisUtils.get(self.__get_redis(), f"kpl_blocks-{code}")
        if raw is not None:
            self.__blocks_dict[code] = set(json.loads(raw))
        return self.__blocks_dict.get(code)

    def get_total_keys(self, code):
        """Return the union of the historical reasons and the blocks of ``code``."""
        reasons = self.get_history_limit_up_reason(code)
        blocks = self.get_blocks(code)
        return (set() if reasons is None else reasons) | (set() if blocks is None else blocks)
# 目标代码板块关键词管理
class TargetCodePlateKeyManager:
    """Builds the set of plate keys that apply to a target code."""

    __redisManager = redis_manager.RedisManager(1)
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()
    __KPLCodeJXBlockManager = KPLCodeJXBlockManager()

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def get_plate_keys(self, code):
        """Gather every plate key known for ``code``.

        :return: ``(keys, k1, k11, k2, k3, k4)`` where ``keys`` is the union of the
                 other five sets minus KPL_INVALID_BLOCKS, ``k1`` is today's limit-up
                 reason, ``k11`` is always empty (today's reason history is not loaded
                 for now), ``k2`` the historical limit-up reasons, ``k3`` the industry
                 and ``k4`` the selected (精选) block names
        """
        today_reasons = LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict
        k1 = {today_reasons[code]} if code in today_reasons else set()

        # Today's reason history is intentionally not loaded at the moment.
        k11 = set()

        k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason(code)
        if k2 is None:
            k2 = set()

        industry = global_util.code_industry_map.get(code)
        k3 = {industry} if industry else set()

        k4 = set()
        jingxuan_blocks = self.__KPLCodeJXBlockManager.get_jx_blocks(code)
        if jingxuan_blocks:
            k4 |= {block[1] for block in jingxuan_blocks}

        # Drop the invalid reasons from the combined key set only.
        keys = set().union(k1, k11, k2, k3, k4) - set(constant.KPL_INVALID_BLOCKS)
        return keys, k1, k11, k2, k3, k4
class CodePlateKeyBuyManager:
    """Decides, plate by plate, whether a target code is allowed to be bought."""
    # No plate
    BLOCK_TYPE_NONE = -1
    # Ordinary plate
    BLOCK_TYPE_COMMON = 0
    # Strong plate
    BLOCK_TYPE_STRONG = 1
    # Plate that is being pulled up sharply
    BLOCK_TYPE_SOON_LIMIT_UP = 2
    # Latent plate
    BLOCK_TYPE_START_UP = 3
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()

    @classmethod
    def get_can_buy_block(cls, code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes,
                          before_blocks_dict):
        """Return the plates of ``code`` that currently qualify for buying.

        :param code: target stock code
        :param current_limit_up_datas: today's real-time limit-up rows
        :param limit_up_record_datas: today's historical limit-up rows
        :param yesterday_current_limit_up_codes: codes that were limit-up yesterday
        :param before_blocks_dict: code -> collection of historical limit-up reasons
        :return: ``(blocks, message)``; ``blocks`` is a list of plate names when
                 *every* candidate plate qualifies, otherwise ``None`` and the
                 message explains why each plate was rejected.
        """
        invalid_blocks = constant.KPL_INVALID_BLOCKS
        # The pre-merged key set returned by get_plate_keys is not used; the candidate
        # plates are rebuilt with a priority: today's reason first, historical
        # reasons only as a fallback, JingXuan plates always.
        _, k1, _, k2, _, k4 = cls.__TargetCodePlateKeyManager.get_plate_keys(code)
        keys = {k for k in k1 if k not in invalid_blocks}
        if not keys:
            keys = {k for k in k2 if k not in invalid_blocks}
        if k4:
            keys |= k4
        if not keys:
            return None, "尚未找到涨停原因"
        # code -> limit-up reason for every record of today. An invalid reason is
        # replaced by one of the code's historical reasons when one exists; an empty
        # or missing fallback keeps the recorded reason instead of raising.
        code_limit_up_reason_dict = {}
        for d in limit_up_record_datas:
            fallback_blocks = before_blocks_dict.get(d[3])
            if d[2] in invalid_blocks and fallback_blocks:
                code_limit_up_reason_dict[d[3]] = next(iter(fallback_blocks))
            else:
                code_limit_up_reason_dict[d[3]] = d[2]
        msg_list = []
        can_buy_blocks = []
        for block in keys:
            is_top_8_record, top_8_record = kpl_block_util.is_record_top_block(code, block, limit_up_record_datas,
                                                                               yesterday_current_limit_up_codes, 50)
            is_top_4_current, _ = kpl_block_util.is_current_top_block(code, block, current_limit_up_datas,
                                                                      yesterday_current_limit_up_codes, 50)
            is_top_4 = is_top_8_record and is_top_4_current
            msg_list.append(f"\n实时top10(涨停数量:{len(current_limit_up_datas)})")
            msg_list.append(f"历史top20(涨停数量:{len(top_8_record)})")
            # 0-based main-board positions of the code inside the plate (real-time and
            # historical), with codes that were limit-up yesterday excluded.
            current_shsz_rank = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas,
                                                                     code_limit_up_reason_dict,
                                                                     yesterday_current_limit_up_codes, shsz=True)
            record_shsz_rank = kpl_block_util.get_code_record_rank(code, block, limit_up_record_datas,
                                                                   code_limit_up_reason_dict,
                                                                   yesterday_current_limit_up_codes, shsz=True)
            # "Leader" = first in real time and within the first two historically.
            is_leader = current_shsz_rank == 0 and record_shsz_rank < 2
            if is_top_4:
                pen_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes(code, block, limit_up_record_datas,
                                                                                 code_limit_up_reason_dict)
                if pen_limit_up_codes:
                    # Some codes of the plate were limit-up before 09:30: the allowed
                    # position is widened by the number of such codes.
                    if current_shsz_rank < len(pen_limit_up_codes) + 1 and record_shsz_rank < len(
                            pen_limit_up_codes) + 1:
                        can_buy_blocks.append((block,
                                               f"{block}:top10涨停板块,主板开1({pen_limit_up_codes}),属于主板前龙{len(pen_limit_up_codes) + 1}(实时身位-{current_shsz_rank})"))
                    else:
                        msg_list.append(
                            f"板块-{block}: top4涨停板块,主板开1({pen_limit_up_codes}),不为主板前龙{len(pen_limit_up_codes) + 1}(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})")
                elif is_leader:
                    can_buy_blocks.append((block, f"{block}:top4涨停板块,非主板开1,属于龙1"))
                else:
                    msg_list.append(
                        f"板块-{block}: top4涨停板块,非主板开1,不为主板龙1(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})")
            elif not RealTimeKplMarketData.is_in_top(set([block]))[0]:
                # Not a top plate and not among the top industry/JingXuan inflow plates.
                msg_list.append(
                    f"板块-{block}: 非top4涨停板块,不满足精选/行业流入要求")
            elif is_leader:
                can_buy_blocks.append((block, f"{block}:不是top4涨停板块,满足精选/行业流入要求,满足主板龙1"))
            else:
                msg_list.append(
                    f"板块-{block}: 不是top4涨停板块,满足精选/行业流入要求,不为主板龙1(实时身位-{current_shsz_rank},历史身位-{record_shsz_rank})")
        # Buying is only allowed when every candidate plate qualified.
        if len(can_buy_blocks) == len(keys):
            blocks = [x[0] for x in can_buy_blocks]
            blocks_msg = "\n".join([x[1] for x in can_buy_blocks])
            return blocks, blocks_msg
        return None, "\n".join(msg_list)

    @classmethod
    def can_buy(cls, code, current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes,
                before_blocks_dict):
        """Tell whether an order may be placed for ``code``.

        The parameters are forwarded unchanged to :meth:`get_can_buy_block`.

        :return: ``(can_buy, message)``.
                 NOTE(review): when ``constant.TEST`` is set a 3-tuple
                 ``(True, "", BLOCK_TYPE_NONE)`` is returned instead; the two
                 arities are inconsistent and should be unified once the callers
                 have been checked.
        """
        if constant.TEST:
            return True, "", cls.BLOCK_TYPE_NONE
        blocks, block_msg = cls.get_can_buy_block(code, current_limit_up_datas,
                                                  limit_up_record_datas, yesterday_current_limit_up_codes,
                                                  before_blocks_dict)
        if not blocks:
            return False, block_msg
        # Only successfully bought codes influence the decision below, so only
        # those are loaded.
        codes_success = set(trade_manager.CodesTradeStateManager().get_codes_by_trade_states_cache(
            {trade_manager.TRADE_STATE_BUY_SUCCESS}))
        # plate -> codes already bought in that plate (today's reason + JingXuan plates)
        trade_success_blocks = {}
        for c in codes_success:
            _, k1_, _, _, _, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c)
            for b in k1_ | k4_:
                trade_success_blocks.setdefault(b, set()).add(c)
        msg_list = []
        for key in blocks:
            # A plate that already contains a bought code is not bought again.
            traded_codes = trade_success_blocks.get(key)
            if traded_codes:
                msg_list.append(f"【{key}】中已经有{len(traded_codes)}个成交代码")
                continue
            return True, block_msg
        return False, ",".join(msg_list)
# Nothing is executed when this module is run directly.
if __name__ == "__main__":
    pass
third_data/kpl_api.py
New file
@@ -0,0 +1,111 @@
import json
import requests
# Call-auction (bidding) list
DABAN_TYPE_BIDDING = 8
# Limit-up list
DABAN_TYPE_LIMIT_UP = 1
# Limit-up that opened again (broken board) list
DABAN_TYPE_OPEN_LIMIT_UP = 2
# Limit-down list
DABAN_TYPE_LIMIT_DOWN = 3
# Formerly limit-down list
DABAN_TYPE_EVER_LIMIT_DOWN = 5
def __base_request(url, data, timeout=10):
    """POST a form-encoded payload and return the response body as text.

    :param url: endpoint to post to
    :param data: url-encoded request body
    :param timeout: seconds passed to ``requests.post``; without a timeout a
                    stalled connection would block the calling thread forever
    :raises Exception: when the HTTP status code is not 200
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
        "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
    }
    # Proxies are disabled explicitly, otherwise the request would go through the local proxy.
    response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None},
                             timeout=timeout)
    if response.status_code != 200:
        raise Exception("请求出错")
    return response.text
def daBanList(pidType):
    """Request the ``DaBanList`` board for ``pidType`` and return the raw response text."""
    payload = (
        "Order=1&a=DaBanList&st=100&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23"
        f"&VerSion=5.8.0.2&Index=0&Is_st=1&PidType={pidType}&apiv=w32&Type=4&FilterMotherboard=0&Filter=0&FilterTIB=0"
        "&FilterGem=0 "
    )
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
# 市场行情-行业
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    """Request the industry ranking (``ZSType=4``, ``st=20``) and return the raw response text.

    ``Order`` is sent as 1 when ``orderJingE_DESC`` is truthy, otherwise 0.
    NOTE: a later definition with the same name in this module (``st=80``)
    rebinds the name at import time, so this version is not the one callers get.
    """
    order_flag = 1 if orderJingE_DESC else 0
    payload = f"Order={order_flag}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
# 市场行情-精选
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    """Request the JingXuan ranking (``ZSType=7``, ``st=20``) and return the raw response text.

    ``Order`` is sent as 1 when ``orderJingE_DESC`` is truthy, otherwise 0.
    NOTE: a later definition with the same name in this module (``st=80``)
    rebinds the name at import time, so this version is not the one callers get.
    """
    order_flag = 1 if orderJingE_DESC else 0
    payload = f"Order={order_flag}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
# 获取代码的板块
def getStockIDPlate(code):
    """Fetch the plates of ``code``.

    Returns ``None`` when the response's ``errcode`` is not 0; otherwise the
    ``ListJX`` entry when it is non-empty, else the ``List`` entry.
    """
    payload = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    parsed = json.loads(__base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload))
    if int(parsed["errcode"]) != 0:
        return None
    return parsed["ListJX"] or parsed["List"]
# 获取概念代码
def getCodesByPlate(plate_code):
    """Request the stock list of plate ``plate_code`` and return the raw response text."""
    url = "https://apphq.longhuvip.com/w1/api/index.php"
    payload = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate_code}&"
    return __base_request(url, data=payload)
# 获取概念中的板块强度
def getSonPlate(plate_code):
    """Request ``SonPlate_Info`` for plate ``plate_code`` and return the raw response text."""
    url = "https://apphq.longhuvip.com/w1/api/index.php"
    payload = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate_code}&"
    return __base_request(url, data=payload)
# 市场行情-行业
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    """Request the industry ranking (``ZSType=4``, ``st=80``) and return the raw response text.

    ``Order`` is sent as 1 when ``orderJingE_DESC`` is truthy, otherwise 0.
    """
    url = "https://apphq.longhuvip.com/w1/api/index.php"
    payload = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    response_text = __base_request(url, data=payload)
    return response_text
# 市场行情-精选
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    """Request the JingXuan ranking (``ZSType=7``, ``st=80``) and return the raw response text.

    ``Order`` is sent as 1 when ``orderJingE_DESC`` is truthy, otherwise 0.
    """
    url = "https://apphq.longhuvip.com/w1/api/index.php"
    payload = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    response_text = __base_request(url, data=payload)
    return response_text
# 获取代码的精选板块
# 返回格式:[(板块代码,板块名称,涨幅百分比)]
def getCodeJingXuanBlocks(code):
    """Return the ``ListJX`` entry of the ``GetStockIDPlate`` response for ``code``.

    The value is ``None`` when the response has no ``ListJX`` key.
    """
    payload = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    response_text = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload)
    return json.loads(response_text).get("ListJX")
# 获取自由流通市值
def getZYLTAmount(code):
    """Return ``real.actualcirculation_value`` from the ``GetStockPanKou_Narrow`` response.

    ``None`` is returned when the response has no ``real`` section.
    """
    payload = f"a=GetStockPanKou_Narrow&apiv=w32&c=StockL2Data&VerSion=5.8.0.2&State=1&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&StockID={code}&"
    parsed = json.loads(__base_request("https://apphq.longhuvip.com/w1/api/index.php", data=payload))
    if "real" not in parsed:
        return None
    return parsed["real"].get("actualcirculation_value")
# Manual check when run as a script: print the plates returned for stock 000333.
if __name__ == "__main__":
    print(getStockIDPlate("000333"))
third_data/kpl_block_util.py
New file
@@ -0,0 +1,173 @@
"""
开盘啦板块工具
"""
# 是否是强势板块
# current_limit_up_datas:实时涨停数据 (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
import datetime
import time
import constant
# 是否主板开1
# limit_up_record_datas 今日历史涨停
def get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_block_dict):
    """Return the codes of ``block`` whose limit-up time is earlier than 09:30 today.

    A record counts for ``block`` when ``code_block_dict`` maps its code
    (index 3) to ``block``; its limit-up timestamp is read from index 5.
    ``code`` is accepted but not used.
    """
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    # Local-time timestamp of today's 09:30:00
    open_timestamp = time.mktime(time.strptime(today + " 09:30:00", '%Y-%m-%d %H:%M:%S'))
    return {
        record[3]
        for record in limit_up_record_datas
        if code_block_dict.get(record[3]) == block and int(record[5]) < open_timestamp
    }
# 代码是否是后排
def is_back_row(code, block, current_limit_up_datas):
    """Return True when some code other than ``code`` is currently limit-up with reason ``block``.

    Rows are read as ``row[0]`` = code and ``row[5]`` = limit-up reason.
    """
    other_codes = {row[0] for row in current_limit_up_datas if row[5] == block} - {code}
    return bool(other_codes)
# 是否是前几的板块
# 板块中有主板涨停的才参与排序(排序时间按照板块中的涨停时间来排序)
def __is_top_block(block, block_codes_infos, topn):
    """Tell whether ``block`` is among the first ``topn`` plates ordered by earliest limit-up time.

    ``block_codes_infos`` holds ``(code, block, limit_up_time)`` tuples. Plates
    without any code starting with "00" or "60", and plates listed in
    ``constant.KPL_INVALID_BLOCKS``, do not take part in the ranking.

    Returns ``(is_top, leaders)`` where ``leaders`` are the earliest entries of
    the first ``topn`` plates. When ``block`` is not ranked at all, ``is_top``
    is True as long as no more than ``topn`` plates are ranked.
    """
    grouped = {}
    for info in block_codes_infos:
        grouped.setdefault(info[1], []).append(info)
    leaders = []
    for name, members in grouped.items():
        # Only plates containing at least one main-board code are ranked.
        if not any(member[0].startswith(("00", "60")) for member in members):
            continue
        # Overly broad concepts are removed.
        if name in constant.KPL_INVALID_BLOCKS:
            continue
        # The earliest limit-up in the plate represents the plate.
        leaders.append(min(members, key=lambda x: x[2]))
    leaders.sort(key=lambda x: x[2])
    top_leaders = leaders[:topn]
    for position, leader in enumerate(leaders, start=1):
        if leader[1] == block:
            return position <= topn, top_leaders
    return len(leaders) <= topn, top_leaders
def is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, topn):
    """Rank ``block`` among today's historical limit-up plates.

    :param code: target code; it is always ranked under ``block``
    :param block: plate to evaluate
    :param limit_up_record_datas: today's historical limit-up rows, read as
           ``row[2]`` = reason, ``row[3]`` = code, ``row[5]`` = limit-up time
    :param yesterday_current_limit_up_codes: codes that were limit-up yesterday;
           they are excluded so that only first-board codes are ranked
    :param topn: number of leading plates regarded as "top"
    :return: the ``(is_top, leaders)`` pair produced by ``__is_top_block``
    """
    block_codes_infos = []
    # Used when the target code has no record yet: it then ranks as "just now".
    limit_up_time = time.time()
    for k in limit_up_record_datas:
        # The stock code of a record row is at index 3 (as used just below and in
        # get_code_record_rank); index 0 appears to be the row id, so testing it
        # against the code set never filtered anything.
        if k[3] in yesterday_current_limit_up_codes:
            continue
        if k[3] != code:
            block_codes_infos.append((k[3], k[2], int(k[5])))
        else:
            limit_up_time = int(k[5])
    block_codes_infos.append((code, block, limit_up_time))
    return __is_top_block(block, block_codes_infos, topn)
def is_current_top_block(code, block, current_limit_up_datas, yesterday_current_limit_up_codes, topn):
    """Rank ``block`` among the plates of the codes that are limit-up right now.

    Rows are read as ``row[0]`` = code, ``row[2]`` = limit-up time and
    ``row[5]`` = limit-up reason. Codes that were limit-up yesterday are left
    out; the target ``code`` is always ranked under ``block``, using the current
    time when it has no row. Returns the ``(is_top, leaders)`` pair produced by
    ``__is_top_block``.
    """
    own_time = time.time()
    entries = []
    for row in current_limit_up_datas:
        row_code = row[0]
        # Only first-board codes take part.
        if row_code in yesterday_current_limit_up_codes:
            continue
        if row_code == code:
            own_time = int(row[2])
        else:
            entries.append((row_code, row[5], int(row[2])))
    entries.append((code, block, own_time))
    return __is_top_block(block, entries, topn)
# 获取当日历史身位
# shsz:是否主板
def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict, yesterday_current_limit_up_codes, shsz=True):
    """Return the 0-based position of ``code`` inside ``block`` by today's historical limit-up time.

    Rows are read as ``row[3]`` = code and ``row[5]`` = limit-up time. A row is
    a rival when ``code_limit_up_reason_dict`` maps its code to ``block``, it was
    not limit-up yesterday and, with ``shsz`` set, its code starts with "00" or
    "60". Rivals with the same time as ``code`` rank ahead of it; a code without
    a row is ranked with the current time.
    """
    own_time = time.time()
    rivals = []
    for row in limit_up_record_datas:
        row_code = row[3]
        if shsz and not row_code.startswith(("00", "60")):
            continue
        # Codes that were already limit-up yesterday are left out.
        if row_code in yesterday_current_limit_up_codes:
            continue
        if code_limit_up_reason_dict.get(row_code) != block:
            continue
        if row_code == code:
            own_time = int(row[5])
        else:
            rivals.append((row_code, int(row[5])))
    # The position equals the number of rivals that are not later than the code.
    return sum(1 for _, rival_time in rivals if rival_time <= own_time)
# 获取当日实时身位
# before_blocks_dict格式位{"代码":set("板块")}
def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reason_dict, yesterday_current_limit_up_codes, shsz=False):
    """Return the 0-based position of ``code`` inside ``block`` by real-time limit-up time.

    Rows are read as ``row[0]`` = code and ``row[2]`` = limit-up time. A row is
    a rival when ``code_limit_up_reason_dict`` maps its code to ``block``, it was
    not limit-up yesterday and, with ``shsz`` set, its code starts with "00" or
    "60". Rivals with the same time as ``code`` rank ahead of it; a code without
    a row is ranked with the current time.
    """
    own_time = time.time()
    rivals = []
    for row in current_limit_up_datas:
        row_code = row[0]
        if shsz and not row_code.startswith(("00", "60")):
            continue
        # Codes that were already limit-up yesterday are left out.
        if row_code in yesterday_current_limit_up_codes:
            continue
        if code_limit_up_reason_dict.get(row_code) != block:
            continue
        if row_code == code:
            own_time = int(row[2])
        else:
            rivals.append((row_code, int(row[2])))
    # The position equals the number of rivals that are not later than the code.
    return sum(1 for _, rival_time in rivals if rival_time <= own_time)
# Nothing is executed when this module is run directly.
if __name__ == "__main__":
    pass
third_data/kpl_data_manager.py
New file
@@ -0,0 +1,377 @@
import json
import os
import threading
import time
import requests
import constant
from db.redis_manager import RedisUtils
from utils import tool
# 开盘啦历史涨停数据管理
from db import mysql_data as mysql_data, redis_manager as redis_manager
from third_data import kpl_util, kpl_api
from third_data.code_plate_key_manager import LimitUpCodesPlateKeyManager, CodesHisReasonAndBlocksManager
# 代码对应的涨停原因保存
from third_data.kpl_util import KPLPlatManager, KPLDataType
class KPLCodeLimitUpReasonManager:
    """Stores the limit-up reason of each code in Redis under ``kpl_limitup_reason-<code>``."""
    __redisManager = redis_manager.RedisManager(3)
    def __get_redis(self):
        """Return the Redis client of the class-level manager."""
        manager = self.__redisManager
        return manager.getRedis()
    def save_reason(self, code, reason):
        """Save ``reason`` for ``code`` with the expiry given by ``tool.get_expire()``."""
        redis_key = f"kpl_limitup_reason-{code}"
        RedisUtils.setex(self.__get_redis(), redis_key, tool.get_expire(), reason)
    def list_all(self):
        """Return ``{code: reason}`` for every stored ``kpl_limitup_reason-*`` key."""
        reasons = {}
        for redis_key in RedisUtils.keys(self.__get_redis(), "kpl_limitup_reason-*"):
            reason = RedisUtils.get(self.__get_redis(), redis_key)
            # The code is the part after the first "-" of the key.
            reasons[redis_key.split("-")[1]] = reason
        return reasons
class KPLLimitUpDataRecordManager:
    """Persists KPL limit-up records in table ``kpl_limit_up_record`` and caches today's rows.

    NOTE(review): the SQL statements below are assembled by string interpolation
    from values that come from a third-party API. Parameterized queries should be
    used once the API of the ``Mysqldb`` helper has been confirmed.
    """
    # Today's rows as last loaded from the database (None until first loaded)
    total_datas = None
    # _id -> latest record tuple seen by save_record
    latest_datas = {}
    # The most recent non-empty record list passed to save_record
    latest_origin_datas = []
    __kplPlatManager = KPLPlatManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesPlateKeysManager = CodesHisReasonAndBlocksManager()

    @classmethod
    def __load_hist_and_blocks(cls, code):
        """Refresh the historical limit-up reasons of ``code`` and, when missing, its plates."""
        import logging
        his_reasons = cls.get_latest_infos(code, 10, False)
        his_reasons = set([r[0] for r in his_reasons])
        cls.__CodesPlateKeysManager.set_history_limit_up_reason(code, his_reasons)
        try:
            if not cls.__CodesPlateKeysManager.get_blocks(code):
                results = kpl_api.getStockIDPlate(code)
                # getStockIDPlate returns None when the API reports an error.
                if results is not None:
                    cls.__CodesPlateKeysManager.set_blocks(code, [r[1] for r in results])
        except Exception:
            # Loading plates stays best effort, but the failure is no longer hidden.
            logging.getLogger(__name__).exception("failed to load plates of %s", code)

    @classmethod
    def save_record(cls, day, records):
        """Insert or update the limit-up ``records`` of ``day`` and drop rows whose reason changed.

        Each record is a tuple laid out as (code, name, first limit-up time,
        latest limit-up time, board count, limit-up reason, plates, free-float
        value, main net amount, reason code, number of codes with that reason).
        """
        import logging
        # code -> set of reasons found in this batch (index 5 split on "、")
        code_block_dict = {}
        for data in records:
            code_block_dict.setdefault(data[0], set()).update(data[5].split("、"))
        if records:
            cls.latest_origin_datas = records
            cls.__LimitUpCodesPlateKeyManager.set_today_limit_up([(r[0], r[5]) for r in records])
        mysqldb = mysql_data.Mysqldb()
        # Remember the code of every limit-up reason (reason name -> reason code).
        plats = {d[5]: d[9] for d in records}
        for plat_name, plat_code in plats.items():
            cls.__kplPlatManager.save_plat(plat_code, plat_name)
        for d in records:
            code = d[0]
            _id = f"{day}_{code}_{d[5]}"
            result = mysqldb.select_one("select * from kpl_limit_up_record where _id='{}'".format(_id))
            if not result:
                mysqldb.execute(
                    f"insert into kpl_limit_up_record(_id,_day,_hot_block_name,_code,_code_name,_limit_up_time,_blocks,_latest_limit_up_time,_update_time,_create_time,_hot_block_code_count,_limit_up_high_info,_zylt_val) values('{_id}','{day}','{d[5]}','{d[0]}','{d[1]}','{d[2]}','{d[6]}','{d[3]}',now(),now(),{d[10]},'{d[4]}',{d[7]})")
                # New data arrived: load historical reasons and plates of the code.
                cls.__load_hist_and_blocks(code)
            elif _id in cls.latest_datas and json.dumps(cls.latest_datas.get(_id)) != json.dumps(d):
                # Only write when the record differs from the one cached last time.
                mysqldb.execute(
                    f"update kpl_limit_up_record set _latest_limit_up_time='{d[3]}',_limit_up_time='{d[2]}',_hot_block_code_count={d[10]},_limit_up_high_info='{d[4]}' ,_update_time=now() where _id='{_id}'")
            cls.latest_datas[_id] = d
            # Rows of this code whose reason is no longer reported are stale: delete them.
            old_datas = KPLLimitUpDataRecordManager.list_by_code(code, day)
            if old_datas:
                for dd in old_datas:
                    if dd[2] not in code_block_dict[code]:
                        mysqldb.execute(f"delete from kpl_limit_up_record where _id='{dd[0]}'")
                        # NOTE(review): the original referenced logger_kpl_limit_up_reason_change,
                        # which this module never imports (NameError at runtime). A stdlib
                        # logger is used until the project logger is wired in.
                        logging.getLogger("kpl_limit_up_reason_change").info(
                            f"code-{dd[3]}:{dd[2]}-{code_block_dict[code]}")
                        # Record the previous reason on the remaining rows of the code.
                        mysqldb.execute(
                            f"update kpl_limit_up_record set _hot_block_change = '{dd[2]}' where _day='{dd[1]}' and _code='{code}'")
                        cls.__LimitUpCodesPlateKeyManager.set_today_limit_up_reason_change(code, dd[2],
                                                                                           code_block_dict[code])
                        if dd[0] in cls.latest_datas:
                            cls.latest_datas.pop(dd[0])
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())

    @classmethod
    def load_total_datas(cls):
        """Load today's rows into ``total_datas`` and refresh the derived caches."""
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
        cls.__LimitUpCodesPlateKeyManager.set_today_total_limit_up([(r[3], r[2]) for r in cls.total_datas])
        for d in cls.total_datas:
            cls.__load_hist_and_blocks(d[3])

    @staticmethod
    def list_all(day):
        """Return all rows of ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _day='{day}'")

    @staticmethod
    def list_by_code(code, day):
        """Return the rows of ``code`` on ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _code='{code}' and _day='{day}'")

    @staticmethod
    def list_by_block(block_name, day):
        """Return the rows whose limit-up reason is ``block_name`` on ``day``."""
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(
            f"select * from kpl_limit_up_record where _hot_block_name='{block_name}' and _day='{day}'")

    @staticmethod
    def list_blocks_with_day(days):
        """Return the distinct ``(reason, day)`` pairs found on any of ``days``."""
        mysqldb = mysql_data.Mysqldb()
        sql = "select _hot_block_name,_day from kpl_limit_up_record where "
        sql += " or ".join([f"_day = '{day}'" for day in days])
        sql += " group by _hot_block_name,_day"
        return mysqldb.select_all(sql)

    @staticmethod
    def get_latest_blocks(code):
        """Return the comma-joined valid reasons of the most recent limit-up day of ``code``."""
        wheres = " and ".join([f"hb.`_hot_block_name` != '{b}'" for b in constant.KPL_INVALID_BLOCKS])
        sql = f"SELECT GROUP_CONCAT(_hot_block_name) FROM (SELECT hb.`_hot_block_name`,hb.`_day` FROM `kpl_limit_up_record` hb WHERE hb.`_code`='{code}' AND {wheres} ORDER BY hb.`_day` DESC LIMIT 2) a  GROUP BY a._day ORDER BY a._day DESC LIMIT 1"
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_one(sql)

    @classmethod
    def get_latest_infos(cls, code, count, contains_today=True):
        """Return up to ``count`` rows ``(reasons, day, plates)`` for ``code``, newest day first.

        Only the last 180 days and valid reasons are considered. With
        ``contains_today`` False a leading row dated today is dropped.
        """
        wheres = " and ".join([f"hb.`_hot_block_name` != '{b}'" for b in constant.KPL_INVALID_BLOCKS])
        min_day = tool.date_sub(tool.get_now_date_str(), 180)
        sql = f"SELECT GROUP_CONCAT(_hot_block_name),`_day`,_blocks FROM (SELECT hb.`_hot_block_name`,hb.`_day`,hb._blocks FROM `kpl_limit_up_record` hb WHERE hb.`_code`='{code}' and {wheres} and hb.`_day` > '{min_day}' ORDER BY hb.`_day` DESC LIMIT 10) a  GROUP BY a._day ORDER BY a._day DESC LIMIT {count}"
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(sql)
        if results and not contains_today and results[0][1] == tool.get_now_date_str():
            return results[1:]
        return results

    @classmethod
    def get_latest_blocks_set(cls, code):
        """Return the set of reason strings of the latest rows (today excluded) of ``code``."""
        results = cls.get_latest_infos(code, 2, False)
        return set([b[0] for b in results])
class KPLDataManager:
    """Caches KPL data in memory and in one JSON file per day and data kind.

    Files are stored as ``<constant.CACHE_PATH>/<day>_<key>.log`` with the JSON
    document on the first line.
    """
    # key -> most recent data
    __latest_datas = {}
    # key -> (time string of the last save, number of items saved)
    kpl_data_update_info = {}

    @staticmethod
    def __read_json_file(path):
        """Return the JSON document on the first line of ``path``; None if the file is missing or empty."""
        if not os.path.exists(path):
            return None
        with open(path, 'r') as f:
            first_line = f.readline()
        if first_line:
            return json.loads(first_line)
        return None

    @classmethod
    def __save_in_file(cls, key, datas):
        """Write ``datas`` as JSON into today's cache file of ``key``."""
        name = f"{tool.get_now_date_str()}_{key}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        with open(path, 'w') as f:
            f.write(json.dumps(datas))

    @classmethod
    def __get_from_file(cls, key, day=None):
        """Read the cache file of ``key`` for ``day`` (today when omitted).

        The default day is resolved on every call; evaluating it in the signature
        froze the date at import time, so a long-running process kept reading the
        file of the day on which it had been started.
        """
        if day is None:
            day = tool.get_now_date_str()
        return cls.__read_json_file(f"{constant.CACHE_PATH}/{day}_{key}.log")

    @classmethod
    def get_from_file(cls, type, day):
        """Read the cache file of data kind ``type`` (an enum member with ``.value``) for ``day``."""
        return cls.__read_json_file(f"{constant.CACHE_PATH}/{day}_{type.value}.log")

    @classmethod
    def get_latest_from_file(cls, type, count):
        """Return ``[(day, data)]`` of the ``count`` most recent cache files of ``type``, newest first."""
        suffix = f"_{type.value}.log"
        # File names start with a 10-character day followed by "_<kind>.log".
        file_name_list = [(f.split("_")[0], f) for f in os.listdir(constant.CACHE_PATH) if f[10:] == suffix]
        file_name_list.sort(key=lambda x: x[0], reverse=True)
        fresults = []
        for day, file_name in file_name_list[:count]:
            data = cls.__read_json_file(f"{constant.CACHE_PATH}/{file_name}")
            if data is not None:
                fresults.append((day, data))
        return fresults

    @classmethod
    def save_data(cls, type, datas):
        """Remember ``datas`` under key ``type`` and persist it into today's cache file."""
        cls.kpl_data_update_info[type] = (tool.get_now_time_str(), len(datas))
        cls.__latest_datas[type] = datas
        cls.__save_in_file(type, datas)

    @classmethod
    def get_data(cls, type):
        """Return the latest data of kind ``type`` from memory, falling back to today's cache file."""
        type = type.value
        if type in cls.__latest_datas:
            return cls.__latest_datas[type]
        result = cls.__get_from_file(type)
        if result is not None:
            cls.__latest_datas[type] = result
        return result
def load_history_limit_up():
    """Import the limit-up reason codes found in the historical ``*HisDaBanList_1.log`` files.

    Every matching file under ``<path prefix>/kpl/his`` is read as UTF-16; its
    first line is parsed as the limit-up list and each (reason code, reason
    name) pair is saved through ``KPLPlatManager``. Empty files are skipped.
    """
    his_dir = f"{constant.get_path_prefix()}/kpl/his"
    for file_name in os.listdir(his_dir):
        if file_name.find("HisDaBanList_1.log") < 0:
            continue
        with open(f"{his_dir}/{file_name}", 'r', encoding="utf-16") as f:
            line = f.readline()
        if not line:
            # An empty file used to raise IndexError.
            continue
        result = json.loads(line)
        list_ = kpl_util.parseDaBanData(result, kpl_util.DABAN_TYPE_LIMIT_UP)
        for r in list_:
            # Index 9 is the reason code (the same index save_record passes to
            # save_plat); the last field is the number of codes with that reason.
            print(r[9], r[5])
            KPLPlatManager().save_plat(r[9], r[5])
# Cache of historical limit-up lists, keyed by the day on which they were loaded
__limit_up_list_records_dict = {}
# 获取最近几天的实时涨停信息
# 返回格式([日期,数据])
def get_current_limit_up_data_records(count):
    """Return up to ``count`` ``(day, data)`` entries of real-time limit-up data, today excluded.

    The ten most recent cache files are read once per day and then served from
    the module-level cache.
    """
    today = tool.get_now_date_str()
    if today in __limit_up_list_records_dict:
        cached = __limit_up_list_records_dict[today]
    else:
        cached = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, 10)
        if cached:
            # Remember the loaded files for the rest of the day.
            __limit_up_list_records_dict[today] = cached
    history = []
    for entry in cached:
        if entry[0] == today:
            continue
        history.append(entry)
        if len(history) >= count:
            break
    return history
def get_yesterday_limit_up_codes():
    """Return the set of codes (index 0 of each row) in the most recent limit-up data before today.

    Raises ``IndexError`` when no earlier data is available.
    """
    latest_entry = get_current_limit_up_data_records(1)[0]
    previous_rows = latest_entry[1]
    return {row[0] for row in previous_rows}
# 运行拉取任务
def run_pull_task():
    """Start four daemon threads that poll the KPL API every 3 seconds and upload the results.

    Limit-up, industry-rank and JingXuan-rank data are pulled while
    ``tool.is_trade_time()`` is true; bidding data only between 09:26:00 and
    09:27:00 (exclusive). Each result is posted to the local
    ``/upload_kpl_data`` endpoint. A failed round is logged and retried on the
    next cycle.
    """
    import logging
    logger = logging.getLogger(__name__)

    def _upload_data(type, datas):
        # Forward one pulled payload to the local data server.
        root_data = {
            "type": type,
            "data": datas
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data), timeout=10)

    def _in_bidding_window():
        return int("092600") < int(tool.get_now_time_str().replace(":", "")) < int("092700")

    def _poll(data_type, fetch, is_active):
        # Shared polling loop: fetch -> parse -> upload, once every 3 seconds.
        while True:
            if is_active():
                try:
                    _upload_data(data_type, json.loads(fetch()))
                except Exception as e:
                    # Keep polling, but leave a trace instead of hiding the error.
                    logger.warning("pulling %s failed: %s", data_type, e)
            time.sleep(3)

    tasks = [
        ("limit_up", lambda: kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_UP), tool.is_trade_time),
        ("biddings", lambda: kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING), _in_bidding_window),
        ("industry_rank", kpl_api.getMarketIndustryRealRankingInfo, tool.is_trade_time),
        ("jingxuan_rank", kpl_api.getMarketJingXuanRealRankingInfo, tool.is_trade_time),
    ]
    for task in tasks:
        threading.Thread(target=_poll, args=task, daemon=True).start()
# Script entry: start the pull threads, then block on input() so that the
# daemon threads keep running until the user presses Enter.
if __name__ == "__main__":
    run_pull_task()
    input()
third_data/kpl_util.py
New file
@@ -0,0 +1,268 @@
import enum
import json
from db import mysql_data as mysql_data
def parse_kpl_datas(results):
    """Extract up to five ``(code, name)`` pairs from positioned text results.

    Each item is read as ``item[0]`` (a sequence of corner points whose first
    coordinate is treated as x and second as y) and ``item[1]`` (the text).
    The item containing "股票名称" fixes the header row (its y), the item
    containing "竞价涨幅" fixes the right-hand limit (its x). Once both are
    known, every item left of the limit, below the header and wider than 30
    units is collected. The first 15 collected texts are grouped in threes and
    each complete group yields ``(third_text[:6], second_text)``.

    :param results: iterable of positioned text items as described above
    :return: list of ``(code, name)`` tuples
    """
    header_y = -1
    right_x = -1
    cells = []
    for item in results:
        text = item[1]
        if "股票名称" in text:
            header_y = item[0][0][1]
        if "竞价涨幅" in text:
            right_x = item[0][0][0]
        if header_y <= 0 or right_x <= 0:
            continue
        corners = item[0]
        if corners[0][0] < right_x and corners[0][1] > header_y and (corners[1][0] - corners[0][0]) > 30:
            cells.append(text)

    cells = cells[:3 * 5]
    # Only complete triplets contribute; a trailing partial group is ignored.
    complete = len(cells) - len(cells) % 3
    return [(cells[start + 2][:6], cells[start + 1]) for start in range(0, complete, 3)]
# 涨停代码:
# (代码,名称,首次涨停时间,最近涨停时间,几板,涨停原因,板块,实际流通,主力净额,涨停原因代码,涨停原因代码数量)
# (0,1,6,25,9,16,11,15,12,26,27)
# 竞价代码:
# (代码,名称,涨停委买额,板块,竞价成交额,实际流通)
# (0,1,18,11,22,15)
# 炸板:
# (代码,名称,涨幅,板块,实际流通)
# (0,1,4,11,15)
# 跌停:
# (代码,名称,板块,实际流通)
# (0,1,11,15)
# 曾跌停:
# (代码,名称,涨幅,板块,实际流通)
# (0,1,4,11,15)
# Board-type codes; __parseDaBanItemData uses them to choose which columns to keep.
DABAN_TYPE_BIDDING = 8  # bidding (call-auction) list
DABAN_TYPE_LIMIT_UP = 1  # limit-up list
DABAN_TYPE_OPEN_LIMIT_UP = 2  # limit-up that was opened again
DABAN_TYPE_LIMIT_DOWN = 3  # limit-down list
DABAN_TYPE_EVER_LIMIT_DOWN = 5  # stocks that touched limit-down
class KPLDataType(enum.Enum):
    """String identifiers for the kinds of KPL data handled by this project."""
    BIDDING = "biddings"
    LIMIT_UP = "limit_up"
    OPEN_LIMIT_UP = "open_limit_up"
    LIMIT_DOWN = "limit_down"
    EVER_LIMIT_DOWN = "ever_limit_down"
    FENG_KOU = "feng_kou"
    BEST_FENG_KOU = "best_feng_kou"
    FENG_XIANG = "feng_xiang"
    INDUSTRY_RANK = "industry_rank"
    JINGXUAN_RANK = "jingxuan_rank"
def __parseDaBanItemData(data, type):
    """Keep only the columns of one daBan row that are relevant for a board type.

    :param data: one indexable row of the daBan list
    :param type: one of the DABAN_TYPE_* constants
    :return: tuple of the selected columns, or None for an unknown ``type``
    """
    if type == DABAN_TYPE_BIDDING:
        columns = (0, 1, 18, 11, 22, 15)
    elif type == DABAN_TYPE_LIMIT_UP:
        columns = (0, 1, 6, 25, 9, 16, 11, 15, 12, 26, 27)
    elif type == DABAN_TYPE_OPEN_LIMIT_UP:
        columns = (0, 1, 4, 11, 15)
    elif type == DABAN_TYPE_LIMIT_DOWN:
        columns = (0, 1, 11, 15)
    elif type == DABAN_TYPE_EVER_LIMIT_DOWN:
        columns = (0, 1, 4, 11, 15)
    else:
        return None
    return tuple(data[position] for position in columns)
# Strongest hot-spot row
# (code, name, strength, gain, hot plates, all plates)
def __parseBestFengKouItemData(data):
    """Return columns 0, 1, 2, 4, 12 and 10 of ``data`` as a tuple, in that order."""
    return tuple(data[position] for position in (0, 1, 2, 4, 12, 10))
# Market hot-spot row
# (code, name, gain, main-force net amount, hot-spot concept)
def __parseFengKouItemData(data):
    """Return columns 0, 1, 3, 7 and 11 of ``data`` as a tuple, in that order."""
    return tuple(data[position] for position in (0, 1, 3, 7, 11))
# Bellwether row
# (code, name, current price, gain, plate, net amount of 3M+ orders, main-force net amount, free-float market value)
def __parseFengXiangBiaoItemData(data):
    """Return columns 0, 1, 5, 6, 4, -3, 13 and 10 of ``data`` as a tuple, in that order."""
    return tuple(data[position] for position in (0, 1, 5, 6, 4, -3, 13, 10))
# Industry gain row
# (code, name, main-force net amount, change percent)
def __parseIndustry_rank(data):
    """Return columns 0, 1, 6 and 3 of ``data`` as a tuple, in that order."""
    return tuple(data[position] for position in (0, 1, 6, 3))
def parseDaBanData(data, type_):
    """Parse a daBan list response into per-row tuples.

    :param data: decoded response (mapping with "errcode" and "list"), or its JSON text
    :param type_: one of the DABAN_TYPE_* constants selecting the column layout
    :return: list of tuples from ``__parseDaBanItemData``; rows for which it
             returns a falsy value (e.g. an unknown ``type_``) are dropped
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    parsed_rows = (__parseDaBanItemData(row, type_) for row in data["list"])
    return [parsed for parsed in parsed_rows if parsed]
def parseFengKou(data):
    """Parse a market hot-spot response into per-row tuples.

    :param data: decoded response (mapping with "errcode" and "List"), or its JSON text
    :return: list of tuples from ``__parseFengKouItemData``; falsy results are dropped
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    parsed_rows = (__parseFengKouItemData(row) for row in data["List"])
    return [parsed for parsed in parsed_rows if parsed]
def parseBestFengKou(data):
    """Parse a strongest-hot-spot response into per-row tuples.

    :param data: decoded response (mapping with "errcode" and "List"), or its JSON text
    :return: list of tuples from ``__parseBestFengKouItemData``; falsy results are dropped
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    parsed_rows = (__parseBestFengKouItemData(row) for row in data["List"])
    return [parsed for parsed in parsed_rows if parsed]
def parseFengXiang(data):
    """Parse a bellwether response into per-row tuples.

    :param data: decoded response (mapping with "errcode" and "list"), or its JSON text
    :return: list of tuples from ``__parseFengXiangBiaoItemData``; falsy results are dropped
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    parsed_rows = (__parseFengXiangBiaoItemData(row) for row in data["list"])
    return [parsed for parsed in parsed_rows if parsed]
def parseIndustryRank(data):
    """Parse an industry ranking response into per-row tuples.

    :param data: decoded response (mapping with "errcode" and "list"), or its JSON text
    :return: list of tuples from ``__parseIndustry_rank``; falsy results are dropped
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    parsed_rows = (__parseIndustry_rank(row) for row in data["list"])
    return [parsed for parsed in parsed_rows if parsed]
# Parse the codes that belong to a plate
def parsePlateCodes(data):
    """Parse a plate-members response into per-row tuples.

    Each tuple holds columns 0, 1, 5, 6, 10, 23, 24, 13, 50 and 42 of a row:
    (code, name, current price, gain, free float, board count, dragon rank,
    main-force net amount, 3M+ net amount, institution position increase).

    :param data: decoded response (mapping with "errcode" and "list"), or its JSON text
    :return: list of 10-tuples, one per row of "list"
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    return [
        (d[0], d[1], d[5], d[6], d[10], d[23], d[24], d[13], d[50], d[42])
        for d in data["list"]
    ]
# Parse the plate strengths inside a concept
def parseSonPlat(data):
    """Parse a sub-plate response into ``(code, name, strength)`` tuples.

    :param data: decoded response (mapping with "errcode" and "List"), or its JSON text
    :return: list with columns 0, 1 and 2 of every row of "List"
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    return [(d[0], d[1], d[2]) for d in data["List"]]
def __money_desc(money):
    """Render an amount with a unit suffix, rounded to two decimals.

    Amounts whose absolute value exceeds 100,000,000 are shown in "亿";
    everything else (including exactly 100,000,000) is shown in "万".
    """
    if not abs(money) > 100000000:
        return f"{round(money / 10000, 2)}万"
    return f"{round(money / 100000000, 2)}亿"
def parseMarketIndustry(data):
    """Parse a market industry ranking response.

    :param data: decoded response (mapping with "errcode" and "list"), or its JSON text
    :return: list of ``(code, name, gain, main-force net amount)`` tuples,
             i.e. columns 0, 1, 3 and 6 of every row
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    return [(d[0], d[1], d[3], d[6]) for d in data["list"]]
def parseMarketJingXuan(data):
    """Parse a market "jingxuan" ranking response.

    :param data: decoded response (mapping with "errcode" and "list"), or its JSON text
    :return: list of ``(code, name, strength, main-force net amount)`` tuples,
             i.e. columns 0, 1, 2 and 6 of every row
    :raises Exception: if the response "errcode" is not 0
    """
    # isinstance also accepts str subclasses, which ``type(data) == str`` rejected.
    if isinstance(data, str):
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    return [(d[0], d[1], d[2], d[6]) for d in data["list"]]
class KPLPlatManager:
    """Stores and looks up plate id/name rows in the ``kpl_plate`` table.

    NOTE(review): every statement interpolates its arguments directly into the
    SQL text. If plate names can contain quotes this breaks the statement (and
    is an injection risk) — confirm whether ``Mysqldb`` supports bound
    parameters and switch to them.
    """

    def save_plat(self, _id, name):
        """Insert a plate row unless one with the same name exists; no-op for a falsy ``_id``."""
        if _id:
            mysqldb = mysql_data.Mysqldb()
            key = f"{_id}-{name}"
            existing = mysqldb.select_one(f"select * from kpl_plate where _name='{name}'")
            if not existing:
                mysqldb.execute(f"insert into kpl_plate(_id,_name,_key) values({_id},'{name}','{key}')")

    def get_plat(self, name):
        """Return the first column of the row whose ``_name`` equals ``name``, or None."""
        mysqldb = mysql_data.Mysqldb()
        row = mysqldb.select_one(f"select * from kpl_plate where _name='{name}'")
        return row[0] if row else None

    def get_same_plat_names(self, name):
        """Return all names stored under the same ``_id`` as ``name``; ``{name}`` when unknown."""
        mysqldb = mysql_data.Mysqldb()
        plate = self.get_plat(name)
        if plate:
            rows = mysqldb.select_all(f"select _name from kpl_plate where _id='{plate}'")
            return {row[0] for row in rows}
        return {name}

    def get_same_plat_names_by_id(self, id_):
        """Return the set of names stored under ``_id`` equal to ``id_``."""
        mysqldb = mysql_data.Mysqldb()
        rows = mysqldb.select_all(f"select _name from kpl_plate where _id='{id_}'")
        return {row[0] for row in rows}
trade/l2_trade_util.py
New file
@@ -0,0 +1,140 @@
# 是否在禁止交易代码中
from db import redis_manager as redis_manager
from db.redis_manager import RedisUtils
from utils import tool
# Module-level Redis manager; the classes below define their own class-level managers.
__redis_manager = redis_manager.RedisManager(2)
class WhiteListCodeManager:
    """Singleton that mirrors the "white_list_codes" Redis set in memory.

    Methods ending in ``_cache`` answer from the in-memory copy; the others
    query Redis directly.
    """
    __instance = None
    __redis_manager = redis_manager.RedisManager(2)
    __REDIS_KEY = "white_list_codes"

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super().__new__(cls, *args, **kwargs)
        # First construction: seed the in-memory copy from Redis.
        cls.__instance.__white_codes_cache = RedisUtils.smembers(cls.__get_redis(), cls.__REDIS_KEY)
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    def sync(self):
        """Replace the in-memory copy with the current Redis members."""
        members = RedisUtils.smembers(self.__get_redis(), self.__REDIS_KEY)
        self.__white_codes_cache.clear()
        if members:
            self.__white_codes_cache |= members

    def add_code(self, code):
        """Add ``code`` to memory and Redis, then refresh the key's expiry."""
        self.__white_codes_cache.add(code)
        RedisUtils.sadd(self.__get_redis(), self.__REDIS_KEY, code)
        RedisUtils.expire(self.__get_redis(), self.__REDIS_KEY, tool.get_expire())

    def remove_code(self, code):
        """Remove ``code`` from memory and Redis."""
        self.__white_codes_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), self.__REDIS_KEY, code)

    def is_in(self, code):
        """Ask Redis whether ``code`` is a member."""
        return RedisUtils.sismember(self.__get_redis(), self.__REDIS_KEY, code)

    def is_in_cache(self, code):
        """Check membership against the in-memory copy only."""
        return code in self.__white_codes_cache

    def list_codes(self):
        """Return the members as currently stored in Redis."""
        return RedisUtils.smembers(self.__get_redis(), self.__REDIS_KEY)

    def list_codes_cache(self):
        """Return the in-memory set itself (not a copy)."""
        return self.__white_codes_cache

    def clear(self):
        """Empty the in-memory copy and delete the Redis key."""
        self.__white_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), self.__REDIS_KEY)
class BlackListCodeManager:
    """Singleton that mirrors the "forbidden-trade-codes" Redis set in memory.

    Methods ending in ``_cache`` answer from the in-memory copy; the others
    query Redis directly.
    """
    __instance = None
    __redis_manager = redis_manager.RedisManager(2)
    __REDIS_KEY = "forbidden-trade-codes"

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super().__new__(cls, *args, **kwargs)
        # First construction: seed the in-memory copy from Redis.
        cls.__instance.__forbidden_trade_codes_cache = RedisUtils.smembers(cls.__get_redis(), cls.__REDIS_KEY)
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    def add_code(self, code):
        """Add ``code`` to memory and Redis, then refresh the key's expiry."""
        self.__forbidden_trade_codes_cache.add(code)
        RedisUtils.sadd(self.__get_redis(), self.__REDIS_KEY, code)
        RedisUtils.expire(self.__get_redis(), self.__REDIS_KEY, tool.get_expire())

    def sync(self):
        """Replace the in-memory copy with the current Redis members."""
        members = RedisUtils.smembers(self.__get_redis(), self.__REDIS_KEY)
        self.__forbidden_trade_codes_cache.clear()
        if members:
            self.__forbidden_trade_codes_cache |= members

    def remove_code(self, code):
        """Remove ``code`` from memory and Redis."""
        self.__forbidden_trade_codes_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), self.__REDIS_KEY, code)

    def is_in(self, code):
        """Ask Redis whether ``code`` is a member."""
        return RedisUtils.sismember(self.__get_redis(), self.__REDIS_KEY, code)

    def is_in_cache(self, code):
        """Check membership against the in-memory copy only."""
        return code in self.__forbidden_trade_codes_cache

    def list_codes(self):
        """Return the Redis members; the in-memory copy is rebound to that same object."""
        members = RedisUtils.smembers(self.__get_redis(), self.__REDIS_KEY)
        self.__forbidden_trade_codes_cache = members
        return members

    def list_codes_cache(self):
        """Return the in-memory set itself (not a copy)."""
        return self.__forbidden_trade_codes_cache

    def clear(self):
        """Empty the in-memory copy and delete the Redis key."""
        self.__forbidden_trade_codes_cache.clear()
        RedisUtils.delete(self.__get_redis(), self.__REDIS_KEY)
# Initialise the forbidden-trade code store
def init_forbidden_trade_codes():
    """Clear the black list, then add the code "000000" to it."""
    BlackListCodeManager().clear()
    BlackListCodeManager().add_code("000000")
# Remove a code from the forbidden-trade list
def remove_from_forbidden_trade_codes(code):
    """Remove ``code`` from the black list via ``BlackListCodeManager.remove_code``."""
    BlackListCodeManager().remove_code(code)
# Add a code to the forbidden-trade list
def add_to_forbidden_trade_codes(code):
    """Add ``code`` to the black list via ``BlackListCodeManager.add_code``."""
    BlackListCodeManager().add_code(code)
# Forbid trading of a code
def forbidden_trade(code):
    """Add ``code`` to the forbidden-trade list; the L2 follow-up steps below are commented out."""
    add_to_forbidden_trade_codes(code)
    # l2_data_manager.remove_from_l2_fixed_codes(code)
    # l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易")
def is_in_forbidden_trade_codes(code):
    """Return whether ``code`` is in the black list's in-memory copy (Redis is not queried)."""
    return BlackListCodeManager().is_in_cache(code)
if __name__ == "__main__":
    # Manual run: adds one code to the white list.
    # add_to_forbidden_trade_codes("000977")
    WhiteListCodeManager().add_code("002977")
trade/trade_manager.py
New file
@@ -0,0 +1,306 @@
"""
交易管理器,
对一系列的代码交易变量,下单,撤单进行管理
"""
# 交易管理器
import datetime
import json
from db import mysql_data as mysql_data, redis_manager as redis_manager
from db.redis_manager import RedisUtils
from utils import tool
# presumably the Redis DB index, matching RedisManager(2) below — confirm
__db = 2
__redis_manager = redis_manager.RedisManager(2)
# Not traded
TRADE_STATE_NOT_TRADE = 0
# Order placed
TRADE_STATE_BUY_PLACE_ORDER = 10
# Buy order delegated
TRADE_STATE_BUY_DELEGATED = 11
# Cancellation of the delegated order in progress
TRADE_STATE_BUY_CANCEL_ING = 13
# Cancellation succeeded
TRADE_STATE_BUY_CANCEL_SUCCESS = 14
# Buy succeeded
TRADE_STATE_BUY_SUCCESS = 12
# Module-level list, initially empty; it is not written in the code shown here.
latest_trade_delegate_data = []
# 关闭购买入口
# 开启购买入口
class TradeStateManager:
    """Singleton for the global buy switch stored under "trade_buy_state" in Redis.

    A cached copy of the switch is kept on the instance; ``is_can_buy_cache``
    reads it without touching Redis.
    """
    __instance = None
    redisManager = redis_manager.RedisManager(2)

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super().__new__(cls, *args, **kwargs)
        # First construction: load the switch from Redis into the cache.
        cls.__instance.__trade_buy_state_cache = cls.is_can_buy()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()

    def sync(self):
        """Reload the cached switch from Redis."""
        self.__trade_buy_state_cache = self.is_can_buy()

    def open_buy(self):
        """Enable buying: cache True and store 1 in Redis with an expiry."""
        self.__trade_buy_state_cache = True
        # NOTE(review): CodesTradeStateManager.set_trade_state passes a DB index as the
        # first argument of setex_async, this passes a connection — confirm which is expected.
        RedisUtils.setex_async(self.__get_redis(), "trade_buy_state", tool.get_expire(), 1)

    def close_buy(self):
        """Disable buying: cache False and store 0 in Redis with an expiry."""
        self.__trade_buy_state_cache = False
        RedisUtils.setex_async(self.__get_redis(), "trade_buy_state", tool.get_expire(), 0)

    @classmethod
    def is_can_buy(cls):
        """Read the switch from Redis; a missing key counts as "buying allowed"."""
        stored = RedisUtils.get(cls.__get_redis(), "trade_buy_state")
        return True if stored is None else int(stored) == 1

    def is_can_buy_cache(self):
        """Return the cached switch without querying Redis."""
        return self.__trade_buy_state_cache
# Trade target-code mode
class TradeTargetCodeModeManager:
    """Singleton for the buy-target mode stored under "trade_buy_mode" in Redis.

    A cached copy of the mode is kept on the instance; ``get_mode_cache`` reads
    it without touching Redis.
    """
    # Only buy codes on the want-to-buy list
    MODE_ONLY_BUY_WANT_CODES = 1
    # Buy everything
    MODE_BUY_ALL = 0
    __instance = None
    redisManager = redis_manager.RedisManager(2)

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super().__new__(cls, *args, **kwargs)
        # First construction: load the mode from Redis into the cache.
        cls.__instance.__trade_buy_mode_cache = cls.get_mode()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()

    def sync(self):
        """Reload the cached mode from Redis."""
        self.__trade_buy_mode_cache = self.get_mode()

    def set_mode(self, mode):
        """Store ``mode`` in the cache and in Redis (with an expiry).

        :raises Exception: if ``mode`` is neither MODE_ONLY_BUY_WANT_CODES nor MODE_BUY_ALL
        """
        allowed = mode == self.MODE_ONLY_BUY_WANT_CODES or mode == self.MODE_BUY_ALL
        if not allowed:
            raise Exception("mode参数值错误")
        self.__trade_buy_mode_cache = mode
        RedisUtils.setex(self.__get_redis(), "trade_buy_mode", tool.get_expire(), mode)

    @classmethod
    def get_mode(cls):
        """Read the mode from Redis; a missing key yields MODE_BUY_ALL."""
        stored = RedisUtils.get(cls.__get_redis(), "trade_buy_mode")
        return cls.MODE_BUY_ALL if stored is None else int(stored)

    def get_mode_cache(self):
        """Return the cached mode without querying Redis."""
        return self.__trade_buy_mode_cache
# Codes that must not be bought because of their score
class ForbiddenBuyCodeByScoreManager:
    """Singleton that mirrors the "forbidden_codes_by_score" Redis set in memory."""
    __instance = None

    def __new__(cls, *args, **kwargs):
        if cls.__instance:
            return cls.__instance
        cls.__instance = super().__new__(cls, *args, **kwargs)
        created = cls.__instance
        created.__redisManager = redis_manager.RedisManager(2)
        created.__key = "forbidden_codes_by_score"
        # Seed the in-memory copy from Redis.
        created.__forbidden_codes_by_score_cache = RedisUtils.smembers(created.__get_redis(), created.__key)
        return cls.__instance

    def __get_redis(self):
        return self.__redisManager.getRedis()

    def add_code(self, code):
        """Add ``code`` to memory and to Redis."""
        self.__forbidden_codes_by_score_cache.add(code)
        RedisUtils.sadd(self.__get_redis(), self.__key, code)

    def remove_code(self, code):
        """Remove ``code`` from memory and from Redis."""
        self.__forbidden_codes_by_score_cache.discard(code)
        RedisUtils.srem(self.__get_redis(), self.__key, code)

    def is_in(self, code):
        """Ask Redis whether ``code`` is a member."""
        return RedisUtils.sismember(self.__get_redis(), self.__key, code)

    def is_in_cache(self, code):
        """Check membership against the in-memory copy only."""
        return code in self.__forbidden_codes_by_score_cache

    def clear(self):
        """Empty the in-memory copy and delete the Redis key."""
        self.__forbidden_codes_by_score_cache.clear()
        RedisUtils.delete(self.__get_redis(), self.__key)
# Per-code trade state management
class CodesTradeStateManager:
    """Singleton tracking each code's trade state.

    States live in Redis under "trade-state-<code>" and are mirrored in a
    class-level dict that is loaded once, when the singleton is first built.
    """
    __trade_state_cache = {}
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __instance = None

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesTradeStateManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    @classmethod
    def __read_states(cls, redis):
        """Return ``[(code, state), ...]`` for every readable "trade-state-*" key.

        The caller owns ``redis`` and must release it. The keys are written
        with an expiry, so one may disappear between the KEYS scan and the GET;
        such keys are skipped instead of failing on ``int(None)``.
        """
        pairs = []
        keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
        for key in keys or ():
            raw = RedisUtils.get(redis, key, auto_free=False)
            if raw is None:
                continue
            pairs.append((key.replace("trade-state-", ''), int(raw)))
        return pairs

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory mirror from Redis."""
        redis = cls.__get_redis()
        try:
            for code, state in cls.__read_states(redis):
                cls.__trade_state_cache[code] = state
        finally:
            RedisUtils.realse(redis)

    def get_trade_state(self, code):
        """Read the state of ``code`` from Redis; TRADE_STATE_NOT_TRADE when absent."""
        state = RedisUtils.get(self.__get_redis(), "trade-state-{}".format(code))
        if state is None:
            return TRADE_STATE_NOT_TRADE
        return int(state)

    def get_trade_state_cache(self, code):
        """Read the state of ``code`` from the mirror; TRADE_STATE_NOT_TRADE when absent."""
        # get_cache returns a pair: [0] is the hit flag, [1] the cached value.
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__trade_state_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return TRADE_STATE_NOT_TRADE

    def set_trade_state(self, code, state):
        """Store ``state`` for ``code`` in the mirror and in Redis (with an expiry)."""
        tool.CodeDataCacheUtil.set_cache(self.__trade_state_cache, code, state)
        # NOTE(review): this passes the DB index while TradeStateManager passes a
        # connection to setex_async — confirm which the helper expects.
        RedisUtils.setex_async(self.__db, "trade-state-{}".format(code), tool.get_expire(), state)

    def get_codes_by_trade_state(self, state):
        """Return the codes whose state in Redis equals ``state``."""
        redis = self.__get_redis()
        try:
            return [code for code, value in self.__read_states(redis) if value == state]
        finally:
            RedisUtils.realse(redis)

    def get_codes_by_trade_states(self, states):
        """Return the codes whose state in Redis is contained in ``states``."""
        redis = self.__get_redis()
        try:
            return [code for code, value in self.__read_states(redis) if value in states]
        finally:
            RedisUtils.realse(redis)

    def get_codes_by_trade_states_cache(self, states):
        """Return the codes whose mirrored state is contained in ``states``."""
        return [code for code, value in self.__trade_state_cache.items() if value in states]
# Account available-funds management
class AccountAvailableMoneyManager:
    """Singleton for the account's available money.

    The value is stored in Redis under "trade-account-canuse-money" and
    mirrored in memory, rounded to two decimals.
    """
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __available_money_cache = None
    __instance = None

    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(AccountAvailableMoneyManager, cls).__new__(cls, *args, **kwargs)
            # The original also fetched a Redis handle into an unused local here;
            # it was never used or released, so it has been removed.
            result = RedisUtils.get(cls.__get_redis(), "trade-account-canuse-money")
            if result:
                cls.__available_money_cache = round(float(result), 2)
        return cls.__instance

    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()

    def set_available_money(self, client_id, money):
        """Store ``money`` in the mirror (rounded) and in Redis (as given).

        :param client_id: accepted for interface compatibility; not used here
        :param money: amount convertible with ``float``
        """
        self.__available_money_cache = round(float(money), 2)
        RedisUtils.set(self.__get_redis(), "trade-account-canuse-money", money)

    def get_available_money(self):
        """Read the amount from Redis, rounded to two decimals; None when the key is absent."""
        result = RedisUtils.get(self.__get_redis(), "trade-account-canuse-money")
        if result is None:
            return None
        return round(float(result), 2)

    def get_available_money_cache(self):
        """Return the mirrored amount (None if nothing has been loaded or set)."""
        return self.__available_money_cache
# Fetch today's successful trades
def get_trade_success_data():
    """Return today's rows of ``ths_trade_success_record`` plus the latest-update marker.

    :return: ``(records, latest_time)`` where ``records`` is a list of dicts
             built from the first ten columns of each row and ``latest_time``
             is the Redis value stored under "trade-success-latest-time"
    """
    day = datetime.datetime.now().strftime("%Y%m%d")
    mysqldb = mysql_data.Mysqldb()
    rows = mysqldb.select_all("select * from ths_trade_success_record where day='{}'".format(day))
    # Dict keys, in the positional order of the selected row.
    columns = ("_id", "code", "money", "num", "price", "time", "trade_num", "type", "day", "create_time")
    records = [{name: row[position] for position, name in enumerate(columns)} for row in rows]
    return records, RedisUtils.get(__redis_manager.getRedis(), "trade-success-latest-time")
# Fetch the latest delegated-order data
def get_trade_delegate_data():
    """Return the latest delegate data and its timestamp from Redis.

    :return: ``(delegates, time_str)``; ``delegates`` is the JSON-decoded value
             of "trade-delegate-latest" (an empty list when the key is absent)
             and ``time_str`` is the value of "trade-delegate-latest-time"
    """
    redis = __redis_manager.getRedis()
    try:
        payload = RedisUtils.get(redis, "trade-delegate-latest", auto_free=False)
        time_str = RedisUtils.get(redis, "trade-delegate-latest-time", auto_free=False)
        delegates = [] if payload is None else json.loads(payload)
        return delegates, time_str
    finally:
        # Both reads used auto_free=False, so the handle is released here.
        RedisUtils.realse(redis)
utils/global_util.py
New file
@@ -0,0 +1,32 @@
"""
全局临时变量
"""
# Code -> industry mapping
code_industry_map = {}
# Industry -> codes mapping
industry_codes_map = {}
# Free-float market value mapping
zyltgb_map = {}
# Today's limit-up codes mapping
today_limit_up_codes = {}
# Industry heat index
industry_hot_num = {}
# Gain of limit-up stocks
limit_up_codes_percent = {}
# Name -> code mapping
name_codes = {}
# Today's volume
today_volumn = {}
# Maximum volume over 60 days
max60_volumn = {}
# Yesterday's volume
yesterday_volumn = {}
# Big orders
big_money_num = {}
# Limit-up time
limit_up_time = {}
# Current price
cuurent_prices = {}
utils/hosting_api_util.py
@@ -35,7 +35,7 @@
API_TYPE_CODE_TRADE_STATE = "code_trade_state"  # 代码交易状态
API_TYPE_GET_ENV = "get_env"  # 获取环境信息
# Timeout in seconds (raised from 2s to 5s by this change)
TIMEOUT = 2.0
TIMEOUT = 5.0
# 等待响应的request_id
__request_response_dict = {}