Administrator
2023-08-03 4d321ae069fce2c0f6f7884bec8fa6c4ec534d1a
redis连接池归还
24个文件已修改
957 ■■■■ 已修改文件
code_attribute/big_money_num_manager.py 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_data_util.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/code_volumn_manager.py 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_first_screen_manager.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py 300 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/limit_up_time_manager.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/settings.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py 159 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager.py 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/history_k_data_util.py 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/client_manager.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/l2_code_operate.py 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/l2_listen_pos_health_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths/ths_util.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 113 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/big_money_num_manager.py
@@ -19,24 +19,20 @@
def add_num(code, num):
    redis = __redisManager.getRedis()
    RedisUtils.incrby(redis,"big_money-{}".format(code), num)
    RedisUtils.incrby(__redisManager.getRedis(), "big_money-{}".format(code), num)
# 设置过期时间
def expire(code):
    redis = __redisManager.getRedis()
    RedisUtils.expire(redis, "big_money-{}".format(code), tool.get_expire())
    RedisUtils.expire(__redisManager.getRedis(), "big_money-{}".format(code), tool.get_expire())
def reset(code):
    redis = __redisManager.getRedis()
    RedisUtils.set(redis,"big_money-{}".format(code), 0)
    RedisUtils.set(__redisManager.getRedis(), "big_money-{}".format(code), 0)
def get_num(code):
    redis = __redisManager.getRedis()
    num = RedisUtils.get(redis, "big_money-{}".format(code))
    num = RedisUtils.get(__redisManager.getRedis(), "big_money-{}".format(code))
    if num is None:
        return 0
    return round(int(num) / 1000 / 4)
@@ -44,9 +40,12 @@
def reset_all():
    redis = __redisManager.getRedis()
    keys = RedisUtils.keys(redis, "big_money-*")
    for k in keys:
        RedisUtils.setex(redis, k, tool.get_expire(), 0)
    try:
        keys = RedisUtils.keys(redis, "big_money-*", auto_free=False)
        for k in keys:
            RedisUtils.setex(redis, k, tool.get_expire(), 0, auto_free=False)
    finally:
        redis.connection_pool.disconnect()
if __name__ == "__main__":
code_attribute/code_data_util.py
@@ -31,15 +31,13 @@
class ZYLTGBUtil:
    @classmethod
    def save(cls, code, val, unit):
        redis = _redisManager.getRedis()
        RedisUtils.setex(redis,"zyltgb-{}".format(code), tool.get_expire(),
                    round(float(val) * 100000000) if int(unit) == 0 else round(
                        float(val) * 10000))
        RedisUtils.setex(_redisManager.getRedis(), "zyltgb-{}".format(code), tool.get_expire(),
                         round(float(val) * 100000000) if int(unit) == 0 else round(
                             float(val) * 10000))
    @classmethod
    def get(cls, code):
        redis = _redisManager.getRedis()
        val = RedisUtils.get(redis, "zyltgb-{}".format(code))
        val = RedisUtils.get(_redisManager.getRedis(), "zyltgb-{}".format(code))
        if val is not None:
            return int(val)
        return None
@@ -67,7 +65,4 @@
if __name__ == "__main__":
    redis = _redisManager.getRedis()
    keys = RedisUtils.keys(redis, "zyltgb-*")
    for key in keys:
        RedisUtils.delete(redis, key)
    pass
code_attribute/code_volumn_manager.py
@@ -20,8 +20,11 @@
    redis = __redis_manager.getRedis()
    global_util.max60_volumn[code] = (max60, max60_day)
    global_util.yesterday_volumn[code] = yesterday
    RedisUtils.setex(redis, "volumn_max60-{}".format(code), tool.get_expire(), json.dumps((max60, max60_day)))
    RedisUtils.setex(redis, "volumn_yes-{}".format(code), tool.get_expire(), yesterday)
    try:
        RedisUtils.setex(redis, "volumn_max60-{}".format(code), tool.get_expire(), json.dumps((max60, max60_day)), auto_free=False)
        RedisUtils.setex(redis, "volumn_yes-{}".format(code), tool.get_expire(), yesterday, auto_free=False)
    finally:
        redis.connection_pool.disconnect()
# 获取历史量
@@ -29,13 +32,16 @@
    max60 = global_util.max60_volumn.get(code)
    yesterday = global_util.yesterday_volumn.get(code)
    redis = __redis_manager.getRedis()
    if max60 is None:
        max60 = RedisUtils.get(redis, "volumn_max60-{}".format(code))
        if max60:
            max60 = json.loads(max60)
    if yesterday is None:
        yesterday = RedisUtils.get(redis, "volumn_yes-{}".format(code))
    return max60, yesterday
    try:
        if max60 is None:
            max60 = RedisUtils.get(redis, "volumn_max60-{}".format(code), auto_free=False)
            if max60:
                max60 = json.loads(max60)
        if yesterday is None:
            yesterday = RedisUtils.get(redis, "volumn_yes-{}".format(code), auto_free=False)
        return max60, yesterday
    finally:
        redis.connection_pool.disconnect()
__today_volumn_cache = {}
@@ -51,16 +57,14 @@
    if code in __today_volumn_cache and volumn - __today_volumn_cache[code] < 100000:
        return
    __today_volumn_cache[code] = volumn
    redis = __redis_manager.getRedis()
    RedisUtils.setex(redis, "volumn_today-{}".format(code), tool.get_expire(), volumn)
    RedisUtils.setex( __redis_manager.getRedis(), "volumn_today-{}".format(code), tool.get_expire(), volumn)
# 获取今日量
def get_today_volumn(code):
    _volumn = global_util.today_volumn.get(code)
    if _volumn is None:
        redis = __redis_manager.getRedis()
        _volumn = RedisUtils.get(redis, "volumn_today-{}".format(code))
        _volumn = RedisUtils.get(__redis_manager.getRedis(), "volumn_today-{}".format(code))
    return _volumn
@@ -101,19 +105,22 @@
# 将量从数据库加入内存
def load():
    redis = __redis_manager.getRedis()
    keys = RedisUtils.keys(redis, "volumn_max60-*")
    if keys is not None:
        for k in keys:
            code = k.split("-")[1]
            max60_volumn = RedisUtils.get(redis, k)
            if max60_volumn:
                max60_volumn = json.loads(max60_volumn)
            global_util.max60_volumn[code] = max60_volumn
    keys = RedisUtils.keys(redis, "volumn_yes-*")
    if keys is not None:
        for k in keys:
            code = k.split("-")[1]
            global_util.yesterday_volumn[code] = RedisUtils.get(redis, k)
    try:
        keys = RedisUtils.keys(redis, "volumn_max60-*", auto_free=False)
        if keys is not None:
            for k in keys:
                code = k.split("-")[1]
                max60_volumn = RedisUtils.get(redis, k, auto_free=False)
                if max60_volumn:
                    max60_volumn = json.loads(max60_volumn)
                global_util.max60_volumn[code] = max60_volumn
        keys = RedisUtils.keys(redis, "volumn_yes-*", auto_free=False)
        if keys is not None:
            for k in keys:
                code = k.split("-")[1]
                global_util.yesterday_volumn[code] = RedisUtils.get(redis, k, auto_free=False)
    finally:
        redis.connection_pool.disconnect()
if __name__ == "__main__":
code_attribute/gpcode_first_screen_manager.py
@@ -50,22 +50,27 @@
# 添加进首板未筛选票
def __add_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    if codes:
        for code in codes:
            RedisUtils.sadd(redis, "first_no_screen_codes", code)
        RedisUtils.expire(redis, "first_no_screen_codes", tool.get_expire())
    try:
        if codes:
            for code in codes:
                RedisUtils.sadd(redis, "first_no_screen_codes", code, auto_free=False)
            RedisUtils.expire(redis, "first_no_screen_codes", tool.get_expire(), auto_free=False)
    finally:
        redis.connection_pool.disconnect()
def clear_first_no_screen_codes():
    redis = __redisManager.getRedis()
    RedisUtils.delete(redis, "first_no_screen_codes")
    RedisUtils.delete(__redisManager.getRedis(), "first_no_screen_codes")
def __remove_first_no_screen_codes(codes):
    redis = __redisManager.getRedis()
    if codes:
        for code in codes:
            RedisUtils.srem(redis, "first_no_screen_codes", code)
    try:
        if codes:
            for code in codes:
                RedisUtils.srem(redis, "first_no_screen_codes", code, auto_free= False)
    finally:
        redis.connection_pool.disconnect()
def __get_first_no_screen_codes():
code_attribute/gpcode_manager.py
@@ -291,12 +291,15 @@
def set_gp_list(code_datas):
    codes, name_codes = __parse_codes_data(code_datas)
    redis_instance = __redisManager.getRedis()
    # 删除之前的
    RedisUtils.delete(redis_instance, "gp_list")
    CodesNameManager.clear_second_code_names()
    for d in codes:
        RedisUtils.sadd(redis_instance, "gp_list", d)
    CodesNameManager.set_second_code_names(name_codes)
    try:
        # 删除之前的
        RedisUtils.delete(redis_instance, "gp_list", auto_free=False)
        CodesNameManager.clear_second_code_names()
        for d in codes:
            RedisUtils.sadd(redis_instance, "gp_list", d, auto_free=False)
        CodesNameManager.set_second_code_names(name_codes)
    finally:
        redis_instance.connection_pool.disconnect()
# 新增代码
@@ -304,16 +307,19 @@
    if len(code_datas) > 200:
        raise Exception("不能超过200个数据")
    redis_instance = __redisManager.getRedis()
    codes, name_codes = __parse_codes_data(code_datas)
    for d in codes:
        RedisUtils.sadd(redis_instance, "gp_list", d)
    old_name_codes = CodesNameManager.list_second_code_name_dict()
    if old_name_codes is None:
        old_name_codes = name_codes
    else:
        for key in name_codes:
            old_name_codes[key] = name_codes[key]
    CodesNameManager.set_second_code_names(old_name_codes)
    try:
        codes, name_codes = __parse_codes_data(code_datas)
        for d in codes:
            RedisUtils.sadd(redis_instance, "gp_list", d, auto_free=False)
        old_name_codes = CodesNameManager.list_second_code_name_dict()
        if old_name_codes is None:
            old_name_codes = name_codes
        else:
            for key in name_codes:
                old_name_codes[key] = name_codes[key]
        CodesNameManager.set_second_code_names(old_name_codes)
    finally:
        redis_instance.connection_pool.disconnect()
__gp_list_first_codes_cache = set()
@@ -324,52 +330,56 @@
# code_datas 掘金返回的数据
def set_first_gp_codes_with_data(code_datas):
    redis_instance = __redisManager.getRedis()
    codes, name_codes = __parse_codes_data(code_datas)
    codes_set = set()
    for code in codes:
        codes_set.add(code)
    old_codes_set = __gp_list_first_codes_cache
    if old_codes_set is None:
        old_codes_set = set()
    del_set = old_codes_set - codes_set
    add_codes = codes_set - old_codes_set
    for code in add_codes:
        RedisUtils.sadd(redis_instance, "gp_list_first", code)
    for code in del_set:
        RedisUtils.srem(redis_instance, "gp_list_first", code)
    if add_codes or del_set:
        RedisUtils.expire(redis_instance, "gp_list_first", tool.get_expire())
    # 更新缓存
    __gp_list_first_codes_cache.clear()
    for code in codes_set:
        __gp_list_first_codes_cache.add(code)
    try:
        codes, name_codes = __parse_codes_data(code_datas)
        codes_set = set()
        for code in codes:
            codes_set.add(code)
        old_codes_set = __gp_list_first_codes_cache
        if old_codes_set is None:
            old_codes_set = set()
        del_set = old_codes_set - codes_set
        add_codes = codes_set - old_codes_set
        for code in add_codes:
            RedisUtils.sadd(redis_instance, "gp_list_first", code, auto_free=False)
        for code in del_set:
            RedisUtils.srem(redis_instance, "gp_list_first", code, auto_free=False)
        if add_codes or del_set:
            RedisUtils.expire(redis_instance, "gp_list_first", tool.get_expire(), auto_free=False)
        # 更新缓存
        __gp_list_first_codes_cache.clear()
        for code in codes_set:
            __gp_list_first_codes_cache.add(code)
    old_name_codes = CodesNameManager.list_first_code_name_dict()
    if old_name_codes is None:
        old_name_codes = name_codes
    else:
        for key in name_codes:
            old_name_codes[key] = name_codes[key]
    CodesNameManager.set_first_code_names(old_name_codes)
        old_name_codes = CodesNameManager.list_first_code_name_dict()
        if old_name_codes is None:
            old_name_codes = name_codes
        else:
            for key in name_codes:
                old_name_codes[key] = name_codes[key]
        CodesNameManager.set_first_code_names(old_name_codes)
    finally:
        redis_instance.connection_pool.disconnect()
# 移除首板代码
def remove_first_gp_code(codes):
    redis_instance = __redisManager.getRedis()
    for code in codes:
        RedisUtils.srem(redis_instance, "gp_list_first", code)
    try:
        for code in codes:
            RedisUtils.srem(redis_instance, "gp_list_first", code, auto_free=False)
    finally:
        redis_instance.connection_pool.disconnect()
# 获取首板代码
def get_first_gp_codes():
    redis_instance = __redisManager.getRedis()
    return RedisUtils.smembers(redis_instance, "gp_list_first")
    return RedisUtils.smembers(__redisManager.getRedis(), "gp_list_first")
# 是否在首板里面
def is_in_first_gp_codes(code):
    redis_instance = __redisManager.getRedis()
    return RedisUtils.sismember(redis_instance, "gp_list_first", code)
    return RedisUtils.sismember(__redisManager.getRedis(), "gp_list_first", code)
# 获取名称对应的代码
@@ -409,43 +419,45 @@
        return
    # 获取基本信息
    redis_instance = __redisManager.getRedis()
    # 删除之前的
    RedisUtils.delete(redis_instance, "gp_limit_up_list")
    for d in gpset:
        RedisUtils.sadd(redis_instance, "gp_limit_up_list", json.dumps(d))
    RedisUtils.expire(redis_instance, "gp_limit_up_list", tool.get_expire())
    RedisUtils.setex(redis_instance, "gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000))
    try:
        # 删除之前的
        RedisUtils.delete(redis_instance, "gp_limit_up_list", auto_free=False)
        for d in gpset:
            RedisUtils.sadd(redis_instance, "gp_limit_up_list", json.dumps(d), auto_free=False)
        RedisUtils.expire(redis_instance, "gp_limit_up_list", tool.get_expire(), auto_free=False)
        RedisUtils.setex(redis_instance, "gp_limit_up_list_update_time", tool.get_expire(), round(time.time() * 1000), auto_free=False)
    finally:
        redis_instance.connection_pool.disconnect()
# 获取涨停列表
def get_limit_up_list():
    redis_instance = __redisManager.getRedis()
    return RedisUtils.get(redis_instance, "gp_limit_up_list_update_time"), RedisUtils.smembers(redis_instance,
                                                                                               "gp_limit_up_list")
    try:
        return RedisUtils.get(redis_instance, "gp_limit_up_list_update_time",
                              auto_free=False), RedisUtils.smembers(redis_instance,
                                                                    "gp_limit_up_list", auto_free=False)
    finally:
        redis_instance.connection_pool.disconnect()
def rm_gp(code):
    redis_instance = __redisManager.getRedis()
    RedisUtils.srem(redis_instance, "gp_list", code)
    RedisUtils.srem(__redisManager.getRedis(), "gp_list", code)
    remove_first_gp_code([code])
def is_in_gp_pool(code):
    redis_instance = __redisManager.getRedis()
    return RedisUtils.sismember(redis_instance, "gp_list", code) or is_in_first_gp_codes(code)
    return RedisUtils.sismember(__redisManager.getRedis(), "gp_list", code) or is_in_first_gp_codes(code)
def get_gp_list():
    redis_instance = __redisManager.getRedis()
    codes = RedisUtils.smembers(redis_instance, "gp_list")
    codes = RedisUtils.smembers( __redisManager.getRedis(), "gp_list")
    first_codes = get_first_gp_codes()
    return set.union(codes, first_codes)
# 获取二板代码
def get_second_gp_list():
    redis_instance = __redisManager.getRedis()
    codes = RedisUtils.smembers(redis_instance, "gp_list")
    codes = RedisUtils.smembers(__redisManager.getRedis(), "gp_list")
    return codes
@@ -468,8 +480,7 @@
    # 获取收盘价
    @classmethod
    def get_price_pre(cls, code):
        redis_instance = cls.__redisManager.getRedis()
        result = RedisUtils.get(redis_instance, "price-pre-{}".format(code))
        result = RedisUtils.get(cls.__redisManager.getRedis(), "price-pre-{}".format(code))
        if result is not None:
            return float(result)
        return None
@@ -490,8 +501,7 @@
        codes = get_gp_list()
        if code not in codes and not FirstCodeManager().is_in_first_record_cache(code) and not force:
            return
        redis_instance = cls.__redisManager.getRedis()
        RedisUtils.setex(redis_instance, "price-pre-{}".format(code), tool.get_expire(), str(price))
        RedisUtils.setex( cls.__redisManager.getRedis(), "price-pre-{}".format(code), tool.get_expire(), str(price))
        cls.__price_pre_cache[code] = float(price)
@@ -527,8 +537,7 @@
# 获取现价
def get_price(code):
    redis_instance = __redisManager.getRedis()
    result = RedisUtils.get(redis_instance, "price-{}".format(code))
    result = RedisUtils.get(__redisManager.getRedis(), "price-{}".format(code))
    if result is not None:
        return float(result)
    return None
@@ -542,35 +551,35 @@
    if code in __current_price_cache and __current_price_cache[code] == price:
        return
    __current_price_cache[code] = price
    redis_instance = __redisManager.getRedis()
    RedisUtils.setex(redis_instance, "price-{}".format(code), tool.get_expire(), price)
    RedisUtils.setex(__redisManager.getRedis(), "price-{}".format(code), tool.get_expire(), price)
# 获取正在监听的代码
def get_listen_codes():
    redis_instance = __redisManager.getRedis()
    keys = RedisUtils.keys(redis_instance, "listen_code-*-*")
    codes = set()
    for k in keys:
        code = RedisUtils.get(redis_instance, k)
        if code is not None and len(code) > 0:
            codes.add(code)
    return codes
    try:
        keys = RedisUtils.keys(redis_instance, "listen_code-*-*", auto_free=False)
        codes = set()
        for k in keys:
            code = RedisUtils.get(redis_instance, k, auto_free=False)
            if code is not None and len(code) > 0:
                codes.add(code)
        return codes
    finally:
        redis_instance.connection_pool.disconnect()
# 根据位置获取正在监听的代码
def get_listen_code_by_pos(client_id, pos):
    redis_instance = __redisManager.getRedis()
    key = "listen_code-{}-{}".format(client_id, pos)
    value = RedisUtils.get(redis_instance, key)
    value = RedisUtils.get(__redisManager.getRedis(), key)
    # print("redis:", key,value)
    return value
# 设置位置的监听代码
def set_listen_code_by_pos(client_id, pos, code):
    redis_instance = __redisManager.getRedis()
    RedisUtils.setex(redis_instance, "listen_code-{}-{}".format(client_id, pos), tool.get_expire(), code)
    RedisUtils.setex(__redisManager.getRedis(), "listen_code-{}-{}".format(client_id, pos), tool.get_expire(), code)
    # 同步监听的代码集合
    __sync_listen_codes_pos()
@@ -578,56 +587,66 @@
# 同步监听代码位置信息
def __sync_listen_codes_pos():
    redis_instance = __redisManager.getRedis()
    # 获取已经正在监听的代码
    keys = RedisUtils.keys(redis_instance, "code_listen_pos-*")
    codes_set = set()
    for key in keys:
        codes_set.add(key.replace("code_listen_pos-", ""))
    try:
        # 获取已经正在监听的代码
        keys = RedisUtils.keys(redis_instance, "code_listen_pos-*", auto_free=False)
        codes_set = set()
        for key in keys:
            codes_set.add(key.replace("code_listen_pos-", ""))
    keys = RedisUtils.keys(redis_instance, "listen_code-*-*")
        keys = RedisUtils.keys(redis_instance, "listen_code-*-*", auto_free=False)
    for key in keys:
        result = RedisUtils.get(redis_instance, key)
        if result:
            # 移除需要添加的代码
            codes_set.discard(result)
            client_id_, pos_, code_ = int(key.split("-")[1]), int(key.split("-")[2]), result
            key_ = "code_listen_pos-{}".format(code_)
            val = RedisUtils.get(redis_instance, key_)
            if val is None:
                RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)))
            else:
                val = json.loads(val)
                if val[0] != client_id_ or val[1] != pos_:
                    RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)))
        for key in keys:
            result = RedisUtils.get(redis_instance, key, auto_free=False)
            if result:
                # 移除需要添加的代码
                codes_set.discard(result)
                client_id_, pos_, code_ = int(key.split("-")[1]), int(key.split("-")[2]), result
                key_ = "code_listen_pos-{}".format(code_)
                val = RedisUtils.get(redis_instance, key_, auto_free=False)
                if val is None:
                    RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)),
                                     auto_free=False)
                else:
                    val = json.loads(val)
                    if val[0] != client_id_ or val[1] != pos_:
                        RedisUtils.setex(redis_instance, key_, tool.get_expire(), json.dumps((client_id_, pos_)),
                                         auto_free=False)
    # 移除没有监听的代码
    for code_ in codes_set:
        RedisUtils.delete(redis_instance, code_)
        # 移除没有监听的代码
        for code_ in codes_set:
            RedisUtils.delete(redis_instance, code_, auto_free=False)
    finally:
        redis_instance.connection_pool.disconnect()
# 初始化位置
def init_listen_code_by_pos(client_id, pos):
    redis_instance = __redisManager.getRedis()
    key = "listen_code-{}-{}".format(client_id, pos)
    RedisUtils.setnx(redis_instance, key, "")
    RedisUtils.expire(redis_instance, key, tool.get_expire())
    RedisUtils.setnx(__redisManager.getRedis(), key, "")
    RedisUtils.expire(__redisManager.getRedis(), key, tool.get_expire())
# 清除所有监听代码
def clear_listen_codes():
    redis_instance = __redisManager.getRedis()
    keys = RedisUtils.keys(redis_instance, "listen_code-*-*")
    for key in keys:
        RedisUtils.setex(redis_instance, key, tool.get_expire(), "")
    try:
        keys = RedisUtils.keys(redis_instance, "listen_code-*-*", auto_free=False)
        for key in keys:
            RedisUtils.setex(redis_instance, key, tool.get_expire(), "", auto_free=False)
    finally:
        redis_instance.connection_pool.disconnect()
def clear_first_codes():
    redis_instance = __redisManager.getRedis()
    RedisUtils.delete(redis_instance, "gp_list_first")
    RedisUtils.delete(redis_instance, "gp_list_names_first")
    RedisUtils.delete(redis_instance, "first_code_record")
    RedisUtils.delete(redis_instance, "first_code_limited_up_record")
    try:
        RedisUtils.delete(redis_instance, "gp_list_first", auto_free=False)
        RedisUtils.delete(redis_instance, "gp_list_names_first", auto_free=False)
        RedisUtils.delete(redis_instance, "first_code_record", auto_free=False)
        RedisUtils.delete(redis_instance, "first_code_limited_up_record", auto_free=False)
    finally:
        redis_instance.connection_pool.disconnect()
# 获取可以操作的位置
@@ -642,18 +661,19 @@
    for client_id in client_ids:
        redis_instance = __redisManager.getRedis()
        k = "listen_code-{}-*".format(client_id)
        keys = RedisUtils.keys(redis_instance, k)
        keys = RedisUtils.keys(redis_instance, k, auto_free=False)
        # random.shuffle(keys)
        codes = []
        for key in keys:
            index = key.split("-")[-1]
            if int(index) + 1 > constant.L2_CODE_COUNT_PER_DEVICE:
                continue
            result = RedisUtils.get(redis_instance, key)
            result = RedisUtils.get(redis_instance, key, auto_free=False)
            if result is None or len(result) == 0:
                available_positions.append((client_id, int(key.replace("listen_code-{}-".format(client_id), ""))))
            else:
                codes.append((key, result))
        redis_instance.connection_pool.disconnect()
        # 查询是否有重复的代码
        codes_set = set()
        count = 0
@@ -683,19 +703,21 @@
    free_count = 0
    for client_id in client_ids:
        redis_instance = __redisManager.getRedis()
        k = "listen_code-{}-*".format(client_id)
        keys = RedisUtils.keys(redis_instance, k)
        for key in keys:
            code = RedisUtils.get(redis_instance, key)
            if not code:
                free_count += 1
        try:
            k = "listen_code-{}-*".format(client_id)
            keys = RedisUtils.keys(redis_instance, k, auto_free=False)
            for key in keys:
                code = RedisUtils.get(redis_instance, key, auto_free=False)
                if not code:
                    free_count += 1
        finally:
            redis_instance.connection_pool.disconnect()
    return free_count
# 获取正在监听的代码的位置
def get_listen_code_pos(code):
    redis_instance = __redisManager.getRedis()
    val = RedisUtils.get(redis_instance, "code_listen_pos-{}".format(code))
    val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    if val is None:
        return None, None
    val = json.loads(val)
@@ -710,8 +732,7 @@
# 是否正在监听
def is_listen(code):
    redis_instance = __redisManager.getRedis()
    val = RedisUtils.get(redis_instance, "code_listen_pos-{}".format(code))
    val = RedisUtils.get(__redisManager.getRedis(), "code_listen_pos-{}".format(code))
    if val is None:
        return False
    else:
@@ -734,36 +755,33 @@
# 是否正在操作
def is_operate(code):
    redis_instance = __redisManager.getRedis()
    return RedisUtils.get(redis_instance, "gp_operate-{}".format(code)) is not None
    return RedisUtils.get(__redisManager.getRedis(), "gp_operate-{}".format(code)) is not None
# 设置正在操作的代码
def set_operate(code):
    redis_instance = __redisManager.getRedis()
    RedisUtils.setex(
        redis_instance, "gp_operate-{}".format(code), 30, "1")
    RedisUtils.setex( __redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1")
# 批量设置正在操作的代码
def set_operates(codes):
    redis_instance = __redisManager.getRedis()
    for code in codes:
        RedisUtils.setex(
            redis_instance, "gp_operate-{}".format(code), 30, "1")
        RedisUtils.setex(__redisManager.getRedis(), "gp_operate-{}".format(code), 30, "1")
# 移除正在操作的代码
def rm_operate(code):
    redis_instance = __redisManager.getRedis()
    RedisUtils.delete(redis_instance, "gp_operate-{}".format(code))
    RedisUtils.delete(__redisManager.getRedis(), "gp_operate-{}".format(code))
# 批量移除正在操作的代码
def rm_operates(codes):
    redis_instance = __redisManager.getRedis()
    for code in codes:
        RedisUtils.delete(redis_instance, "gp_operate-{}".format(code))
    try:
        for code in codes:
            RedisUtils.delete(redis_instance, "gp_operate-{}".format(code), auto_free=False)
    finally:
        redis_instance.connection_pool.disconnect()
if __name__ == '__main__':
code_attribute/limit_up_time_manager.py
@@ -12,17 +12,15 @@
def save_limit_up_time(code, time):
    _time = get_limit_up_time(code)
    if _time is None:
        redis = _redisManager.getRedis()
        RedisUtils.setex(
            redis, "limit_up_time-{}".format(code), tool.get_expire(), time)
            _redisManager.getRedis(), "limit_up_time-{}".format(code), tool.get_expire(), time)
        global_util.limit_up_time[code] = time
def get_limit_up_time(code):
    time = global_util.limit_up_time.get(code)
    if time is None:
        redis = _redisManager.getRedis()
        time = RedisUtils.get(redis, "limit_up_time-{}".format(code))
        time = RedisUtils.get(_redisManager.getRedis(), "limit_up_time-{}".format(code))
        if time is not None:
            global_util.limit_up_time[code] = time
@@ -31,10 +29,13 @@
def load_limit_up_time():
    redis = _redisManager.getRedis()
    keys = RedisUtils.keys(redis, "limit_up_time-*")
    for key in keys:
        code = key.replace("limit_up_time-", "")
        global_util.limit_up_time[code] = RedisUtils.get(redis, key)
    try:
        keys = RedisUtils.keys(redis, "limit_up_time-*", auto_free=False)
        for key in keys:
            code = key.replace("limit_up_time-", "")
            global_util.limit_up_time[code] = RedisUtils.get(redis, key, auto_free=False)
    finally:
        redis.connection_pool.disconnect()
if __name__ == "__main__":
config/settings.py
@@ -22,8 +22,6 @@
def set_accept_l2_data(val):
    if val:
        RedisUtils.set(
            __get_redis(),"not_accpt_l2_data", 0)
        RedisUtils.set(__get_redis(), "not_accpt_l2_data", 0)
    else:
        RedisUtils.set(
            __get_redis(),"not_accpt_l2_data", 1)
        RedisUtils.set(__get_redis(), "not_accpt_l2_data", 1)
db/redis_manager.py
@@ -19,7 +19,7 @@
            pool = pool_cache[db]
        else:
            pool = redis.ConnectionPool(host=config["host"], port=config["port"], password=config["pwd"],
                                        db=db, decode_responses=True, max_connections=200)
                                        db=db, decode_responses=True, max_connections=50)
            pool_cache[db] = pool
        self.pool = pool
@@ -29,83 +29,148 @@
class RedisUtils:
    @classmethod
    def get(cls, redis_, key):
        logger_redis_debug.info("get:{}",key)
        return redis_.get(key)
    def get(cls, redis_, key, auto_free=True):
        try:
            logger_redis_debug.info("get:{}",key)
            return redis_.get(key)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def scard(cls, redis_, key):
        return redis_.scard(key)
    def scard(cls, redis_, key, auto_free=True):
        try:
            return redis_.scard(key)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def delete(cls, redis_, key):
        logger_redis_debug.info("delete:{}", key)
        return redis_.delete(key)
    def delete(cls, redis_, key, auto_free=True):
        try:
            logger_redis_debug.info("delete:{}", key)
            return redis_.delete(key)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def keys(cls, redis_, key):
        logger_redis_debug.info("keys:{}", key)
        return redis_.keys(key)
    def keys(cls, redis_, key, auto_free=True):
        try:
            logger_redis_debug.info("keys:{}", key)
            return redis_.keys(key)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def set(cls, redis_, key, val):
        logger_redis_debug.info("set:{}", key)
        return redis_.set(key, val)
    def set(cls, redis_, key, val, auto_free=True):
        try:
            logger_redis_debug.info("set:{}", key)
            return redis_.set(key, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def setex(cls, redis_, key, expire, val):
    def setex(cls, redis_, key, expire, val, auto_free=True):
        logger_redis_debug.info("setex:{}", key)
        return redis_.setex(key, expire, val)
        try:
            return redis_.setex(key, expire, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def setnx(cls, redis_, key, val):
        logger_redis_debug.info("setnx:{}", key)
        return redis_.setnx(key, val)
    def setnx(cls, redis_, key, val, auto_free=True):
        try:
            logger_redis_debug.info("setnx:{}", key)
            return redis_.setnx(key, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def expire(cls, redis_, key, expire):
        logger_redis_debug.info("expire:{}", key)
        return redis_.expire(key, expire)
    def expire(cls, redis_, key, expire, auto_free=True):
        try:
            logger_redis_debug.info("expire:{}", key)
            return redis_.expire(key, expire)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def sadd(cls, redis_, key, val):
        logger_redis_debug.info("sadd:{}", key)
        return redis_.sadd(key, val)
    def sadd(cls, redis_, key, val, auto_free=True):
        try:
            logger_redis_debug.info("sadd:{}", key)
            return redis_.sadd(key, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def sismember(cls, redis_, key, val):
        logger_redis_debug.info("sismember:{}", key)
        return redis_.sismember(key, val)
    def sismember(cls, redis_, key, val, auto_free=True):
        try:
            logger_redis_debug.info("sismember:{}", key)
            return redis_.sismember(key, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def smembers(cls, redis_, key):
        logger_redis_debug.info("smembers:{}", key)
        return redis_.smembers(key)
    def smembers(cls, redis_, key, auto_free=True):
        try:
            logger_redis_debug.info("smembers:{}", key)
            return redis_.smembers(key)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def srem(cls, redis_, key, val):
        logger_redis_debug.info("srem:{}", key)
        return redis_.srem(key, val)
    def srem(cls, redis_, key, val, auto_free=True):
        try:
            logger_redis_debug.info("srem:{}", key)
            return redis_.srem(key, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def incrby(cls, redis_, key, num):
        logger_redis_debug.info("incrby:{}", key)
        return redis_.incrby(key, num)
    def incrby(cls, redis_, key, num, auto_free=True):
        try:
            logger_redis_debug.info("incrby:{}", key)
            return redis_.incrby(key, num)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def lpush(cls, redis_, key, val):
        logger_redis_debug.info("lpush:{}", key)
        return redis_.lpush(key, val)
    def lpush(cls, redis_, key, val, auto_free=True):
        try:
            logger_redis_debug.info("lpush:{}", key)
            return redis_.lpush(key, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def lpop(cls, redis_, key):
        logger_redis_debug.info("lpop:{}", key)
        return redis_.lpop(key)
    def lpop(cls, redis_, key, auto_free=True):
        try:
            logger_redis_debug.info("lpop:{}", key)
            return redis_.lpop(key)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
    @classmethod
    def rpush(cls, redis_, key, val):
        logger_redis_debug.info("rpush:{}", key)
        return redis_.rpush(key, val)
    def rpush(cls, redis_, key, val, auto_free=True):
        try:
            logger_redis_debug.info("rpush:{}", key)
            return redis_.rpush(key, val)
        finally:
            if auto_free:
                redis_.connection_pool.disconnect()
gui.py
@@ -157,15 +157,18 @@
            text.delete('1.0', END)
            # 验证redis
            redis = redis_manager.RedisManager().getRedis()
            try:
                redis = redis_manager.RedisManager().getRedis()
                redis.set("test", "1")
                RedisUtils.delete(redis, "test")
                RedisUtils.set( redis,"test", "1",auto_free=False)
                RedisUtils.delete(redis, "test", auto_free=False)
                text.insert(END, "redis连接成功!\n")
            except:
                error = "redis连接失败...\n"
                text.insert(END, error)
                _set_error_color(text, 1, error)
            finally:
                redis.connection_pool.disconnect()
            # 验证mongodb
            try:
                counts = mysql_data.Mysqldb().select_one("select count(*) from clients")
@@ -220,8 +223,7 @@
    # 绘制开盘前的数据准备情况
    def __draw_pre_data_check(self, frame):
        def refresh_close_price_data():
            redis = redis_manager.RedisManager(0).getRedis()
            count = len(RedisUtils.keys(redis, "price-pre-*"))
            count = len(RedisUtils.keys(redis_manager.RedisManager(0).getRedis(), "price-pre-*"))
            sv_num.set("获取到收盘价数量:{}".format(count))
        def re_get_close_price():
@@ -942,10 +944,10 @@
        def export_l2_data_origin(code):
            redis = redis_manager.RedisManager(1).getRedis()
            keys = RedisUtils.keys(redis, "big_data-{}-*".format(code))
            try:
                keys = RedisUtils.keys(redis, "big_data-{}-*".format(code), auto_free=False)
                for k in keys:
                    datas = redis.get(k)
                    datas = redis.get(k, auto_free=False)
                    datas = json.loads(datas)
                    _t = k.split("-")[2]
                    k = time.strftime("%Y_%m_%d_%H_%M_%S_", time.localtime(float(_t) / 1000))
@@ -953,6 +955,8 @@
                    data_export_util.export_l2_data_origin(code, datas, k)
            except Exception as e1:
                showerror("导出失败", str(e1))
            finally:
                redis.connection_pool.disconnect()
            showinfo("提示", "导出完成")
inited_data.py
@@ -36,17 +36,23 @@
# 设置账户信息
def setAccountInfo(accountId, strategyId, token):
    redis = redisManager.getRedis()
    RedisUtils.set(redis, "juejin-account-id", accountId)
    RedisUtils.set(redis, "juejin-strategy-id", strategyId)
    RedisUtils.set(redis, "juejin-token", token)
    try:
        RedisUtils.set(redis, "juejin-account-id", accountId, auto_free=False)
        RedisUtils.set(redis, "juejin-strategy-id", strategyId, auto_free=False)
        RedisUtils.set(redis, "juejin-token", token, auto_free=False)
    finally:
        redis.connection_pool.disconnect()
def getAccountInfo():
    redis = redisManager.getRedis()
    account_id = RedisUtils.get(redis, "juejin-account-id")
    strategy_id = RedisUtils.get(redis, "juejin-strategy-id")
    token = RedisUtils.get(redis, "juejin-token")
    return account_id, strategy_id, token
    try:
        account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False)
        strategy_id = RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
        token = RedisUtils.get(redis, "juejin-token", auto_free=False)
        return account_id, strategy_id, token
    finally:
        redis.connection_pool.disconnect()
def init_data():
l2/l2_data_manager.py
@@ -42,16 +42,14 @@
    @staticmethod
    def delete_buy_point(code):
        CodeDataCacheUtil.clear_cache(TradePointManager.__buy_compute_index_info_cache, code)
        redis = TradePointManager.__get_redis()
        RedisUtils.delete(redis, "buy_compute_index_info-{}".format(code))
        RedisUtils.delete(TradePointManager.__get_redis(), "buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
    @staticmethod
    def get_buy_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = RedisUtils.get(redis, _key)
        _data_json = RedisUtils.get(TradePointManager.__get_redis(), _key)
        if _data_json is None:
            return None, None, None, 0, 0, [], 0
        _data = json.loads(_data_json)
@@ -74,7 +72,6 @@
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count, max_num_sets,
                                   volume_rate):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        data_ = None
@@ -89,15 +86,14 @@
                     volume_rate)
        CodeDataCacheUtil.set_cache(TradePointManager.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex(
            redis, _key, expire,
            TradePointManager.__get_redis(), _key, expire,
            json.dumps(data_))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    @staticmethod
    def get_buy_cancel_single_pos(code):
        redis = TradePointManager.__get_redis()
        info = RedisUtils.get(redis, "buy_cancel_single_pos-{}".format(code))
        info = RedisUtils.get(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
@@ -108,29 +104,25 @@
    @classmethod
    def set_buy_cancel_single_pos(cls, code, index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        RedisUtils.setex(redis, "buy_cancel_single_pos-{}".format(code), expire, index)
        RedisUtils.setex(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        redis = TradePointManager.__get_redis()
        RedisUtils.delete(redis, "buy_cancel_single_pos-{}".format(code))
        RedisUtils.delete(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        RedisUtils.setex(redis, "compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        RedisUtils.setex(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = RedisUtils.get(redis, "compute_info_for_cancel_buy-{}".format(code))
        info = RedisUtils.get(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
@@ -139,23 +131,20 @@
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        RedisUtils.delete(redis, "compute_info_for_cancel_buy-{}".format(code))
        RedisUtils.delete(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
    def set_count_info_for_cancel_buy(cls, code, index, buy_count, cancel_count):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        RedisUtils.setex(redis, "count_info_for_cancel_buy-{}".format(code), expire,
        RedisUtils.setex(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code), expire,
                         json.dumps((index, buy_count, cancel_count)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, buy_count, cancel_count)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = RedisUtils.get(redis, "count_info_for_cancel_buy-{}".format(code))
        info = RedisUtils.get(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0, 0
        else:
@@ -164,18 +153,20 @@
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        RedisUtils.delete(redis, "count_info_for_cancel_buy-{}".format(code))
        RedisUtils.delete(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code))
# 清除l2数据
def clear_l2_data(code):
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    keys = RedisUtils.keys(redis_l2, "l2-{}-*".format(code))
    for k in keys:
        RedisUtils.delete(redis_l2, k)
    try:
        keys = RedisUtils.keys(redis_l2, "l2-{}-*".format(code), auto_free=False)
        for k in keys:
            RedisUtils.delete(redis_l2, k, auto_free=False)
    RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code))
        RedisUtils.delete(redis_l2, "l2-data-latest-{}".format(code), auto_free=False)
    finally:
        redis_l2.connection_pool.disconnect()
second_930 = 9 * 3600 + 30 * 60 + 0
@@ -185,33 +176,33 @@
def init_l2_fixed_codes():
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    count = RedisUtils.scard(redis, key)
    if count > 0:
        RedisUtils.delete(redis, key)
    RedisUtils.sadd(redis, key, "000000")
    RedisUtils.expire(redis, key, tool.get_expire())
    try:
        count = RedisUtils.scard(redis, key, auto_free=False)
        if count > 0:
            RedisUtils.delete(redis, key, auto_free=False)
        RedisUtils.sadd(redis, key, "000000", auto_free=False)
        RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False)
    finally:
        redis.connection_pool.disconnect()
# 移除l2固定监控代码
def remove_from_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    RedisUtils.srem(redis, key, code)
    RedisUtils.srem(_redisManager.getRedis(), key, code)
# 添加代码到L2固定监控
def add_to_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    RedisUtils.sadd(redis, key, code)
    RedisUtils.expire(redis, key, tool.get_expire())
    RedisUtils.sadd(_redisManager.getRedis(), key, code)
    RedisUtils.expire(_redisManager.getRedis(), key, tool.get_expire())
# 是否在l2固定监控代码中
def is_in_l2_fixed_codes(code):
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    return RedisUtils.sismember(redis, key, code)
    return RedisUtils.sismember( _redisManager.getRedis(), key, code)
if __name__ == "__main__":
l2/l2_data_manager_new.py
@@ -70,7 +70,7 @@
        if self.__get_begin_pos(code) is None:
            # 保存位置
            key = "m_big_money_begin-{}".format(code)
            RedisUtils.setex( self.__get_redis(), key, tool.get_expire(), index)
            RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), index)
    # 获取计算开始位置
    def __get_begin_pos(self, code):
l2/l2_data_util.py
@@ -34,12 +34,11 @@
def load_l2_data(code, load_latest=True, force=False):
    redis = _redisManager.getRedis()
    # 加载最近的l2数据
    if load_latest:
        if local_latest_datas.get(code) is None or force:
            # 获取最近的数据
            _data = RedisUtils.get(redis, "l2-data-latest-{}".format(code))
            _data = RedisUtils.get(_redisManager.getRedis(), "l2-data-latest-{}".format(code))
            if _data is not None:
                if code in local_latest_datas:
                    local_latest_datas[code] = json.loads(_data)
@@ -118,28 +117,29 @@
    redis_instance = _redisManager.getRedis()
    try:
        if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1") > 0:
        if RedisUtils.setnx(redis_instance, "l2-save-{}".format(code), "1", auto_free=False) > 0:
            # 计算保留的时间
            expire = tool.get_expire()
            i = 0
            for _data in datas:
                i += 1
                key = "l2-" + _data["key"]
                value = RedisUtils.get(redis_instance, key)
                value = RedisUtils.get(redis_instance, key, auto_free=False)
                if value is None:
                    # 新增
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        RedisUtils.setex(redis_instance, key, expire, json.dumps(value))
                        RedisUtils.setex(redis_instance, key, expire, json.dumps(value), auto_free=False)
                    except:
                        logging.error("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value))
                        RedisUtils.setex(redis_instance, key, expire, json.dumps(json_value), auto_free=False)
    finally:
        RedisUtils.delete(redis_instance, "l2-save-{}".format(code))
        RedisUtils.delete(redis_instance, "l2-save-{}".format(code), auto_free=False)
        redis_instance.connection_pool.disconnect()
    print("保存新数据用时:", msg, "耗时:{}".format(round(time.time() * 1000) - start_time))
    return datas
@@ -147,13 +147,12 @@
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    redis = _redisManager.getRedis()
    # 只有有新曾数据才需要保存
    if len(add_datas) > 0:
        # 保存最近的数据
        __start_time = round(time.time() * 1000)
        if datas:
            RedisUtils.setex(redis, "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
            RedisUtils.setex(_redisManager.getRedis(), "l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
            l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "保存最近l2数据用时")
            # 设置进内存
            local_latest_datas[code] = datas
@@ -168,9 +167,8 @@
# 设置最新的l2数据采集的数量
def set_l2_data_latest_count(code, count):
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    RedisUtils.setex(redis, key, 2, count)
    RedisUtils.setex(_redisManager.getRedis(), key, 2, count)
    pass
@@ -178,10 +176,9 @@
def get_l2_data_latest_count(code):
    if code is None or len(code) < 1:
        return 0
    redis = _redisManager.getRedis()
    key = "latest-l2-count-{}".format(code)
    result = RedisUtils.get(redis, key)
    result = RedisUtils.get(_redisManager.getRedis(), key)
    if result is None:
        return 0
    else:
l2/safe_count_manager.py
@@ -67,8 +67,7 @@
            RedisUtils.delete(self.__getRedis(), k)
        key = f"latest_place_order_info-{code}"
        RedisUtils.delete(
            self.__getRedis(), key)
        RedisUtils.delete(self.__getRedis(), key)
    # 获取基础的安全笔数
    def __get_base_save_count(self, code, is_first):
l2_data_util.py
@@ -179,8 +179,7 @@
        if abs(get_time_as_seconds(time_str) - get_time_as_seconds(time_)) > 3:
            continue
        if same_time_nums[time_] > 20:
            redis = l2_data_manager._redisManager.getRedis()
            RedisUtils.setex(redis, "big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(),
            RedisUtils.setex(l2_data_manager._redisManager.getRedis(), "big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(),
                             d1)
            break
@@ -189,14 +188,12 @@
# TODO 测试数据
# @async_call
def save_l2_latest_data_number(code, num):
    redis = l2_data_manager._redisManager.getRedis()
    RedisUtils.setex(redis, "l2_latest_data_num-{}".format(code), 3, num)
    RedisUtils.setex(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code), 3, num)
# 获取最新数据条数
def get_l2_latest_data_number(code):
    redis = l2_data_manager._redisManager.getRedis()
    num =RedisUtils.get(redis, "l2_latest_data_num-{}".format(code))
    num =RedisUtils.get(l2_data_manager._redisManager.getRedis(), "l2_latest_data_num-{}".format(code))
    if num is not None:
        return int(num)
    return None
test/l2_trade_test.py
@@ -28,20 +28,21 @@
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    keys = ["buy1_volumn_latest_info-{}", "m_big_money_begin-{}", "m_big_money_process_index-{}"]
    for k in keys:
        RedisUtils.delete(redis_l2, k.format(code))
        RedisUtils.delete(redis_l2, k.format(code), auto_free=False)
    redis_l2.connection_pool.disconnect()
    l2.l2_data_manager.TradePointManager.delete_buy_point(code)
    big_money_num_manager.reset(code)
    redis_trade = redis_manager.RedisManager(2).getRedis()
    RedisUtils.delete( redis_trade, "trade-state-{}".format(code))
    RedisUtils.delete( redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code))
    trade_data_manager.placeordercountmanager.clear_place_order_count(code)
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = RedisUtils.keys(redis_info, "*{}*".format(code))
    keys = RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False)
    for k in keys:
        if k.find("pre") is not None:
            continue
        if k.find("zyltgb") is not None:
            continue
        RedisUtils.delete(redis_info, k)
        RedisUtils.delete(redis_info, k, auto_free=False)
    redis_info.connection_pool.disconnect()
    BuyL2SafeCountManager().clear_data(code)
    transaction_progress.TradeBuyQueue().set_traded_index(code, 0)
third_data/history_k_data_util.py
@@ -85,10 +85,14 @@
    @classmethod
    def getJueJinAccountInfo(cls):
        redis = cls.__redisManager.getRedis()
        account_id = RedisUtils.get(redis, "juejin-account-id")
        strategy_id =RedisUtils.get(redis, "juejin-strategy-id")
        token = RedisUtils.get(redis, "juejin-token")
        return account_id, strategy_id, token
        try:
            account_id = RedisUtils.get(redis, "juejin-account-id", auto_free=False)
            strategy_id =RedisUtils.get(redis, "juejin-strategy-id", auto_free=False)
            token = RedisUtils.get(redis, "juejin-token", auto_free=False)
            return account_id, strategy_id, token
        finally:
            redis.connection_pool.disconnect()
    @classmethod
    def get_juejin_code_list_with_prefix(cls, codes):
ths/client_manager.py
@@ -12,8 +12,7 @@
def getValidL2Clients():
    redis = __redisManager.getRedis()
    keys = RedisUtils.keys(redis, "client-active-*")
    keys = RedisUtils.keys(__redisManager.getRedis(), "client-active-*")
    client_ids = []
    for k in keys:
        _id = k.split("client-active-")[1]
@@ -27,8 +26,7 @@
# 获取客户端IP
def getActiveClientIP(client_id):
    redis = __redisManager.getRedis()
    val = RedisUtils.get(redis, "client-active-{}".format(client_id))
    val = RedisUtils.get(__redisManager.getRedis(), "client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
@@ -38,17 +36,14 @@
def saveClientActive(client_id, host, thsDead):
    if client_id <= 0:
        return
    redis = __redisManager.getRedis()
    RedisUtils.setex(redis,"client-active-{}".format(client_id), 10, json.dumps((host, thsDead)))
    RedisUtils.setex( __redisManager.getRedis(),"client-active-{}".format(client_id), 10, json.dumps((host, thsDead)))
    if ths_util:
        ths_util.set_ths_dead_state(client_id, thsDead)
# 获取客户端同花顺状态
def getTHSState(client_id):
    redis = __redisManager.getRedis()
    val = RedisUtils.get(redis, "client-active-{}".format(client_id))
    val = RedisUtils.get(__redisManager.getRedis(), "client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
ths/l2_code_operate.py
@@ -110,7 +110,7 @@
        while True:
            cls.set_read_queue_valid()
            try:
                data =RedisUtils.lpop(redis,"code_operate_queue")
                data = RedisUtils.lpop(redis, "code_operate_queue", auto_free= False)
                # print("读取操作队列", data, redis.llen("code_operate_queue"))
                if data is not None:
                    data = json.loads(data)
@@ -186,10 +186,9 @@
        # 09:25:10之后才能操作
        if int(tool.get_now_time_str().replace(":", "")) < int("092510"):
            return
        redis = self.redis_manager_.getRedis()
        RedisUtils.rpush(redis, "code_operate_queue",
                    json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
                                "create_time": round(time.time() * 1000)}))
        RedisUtils.rpush(self.redis_manager_.getRedis(), "code_operate_queue",
                         json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
                                     "create_time": round(time.time() * 1000)}))
    def repaire_operate(self, client, pos, code):
        # 如果本来该位置代码为空则不用修复
@@ -210,8 +209,7 @@
            data = {"action": "repairL2Data",
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
            redis = self.redis_manager_.getRedis()
            RedisUtils.rpush(redis, "code_operate_queue", json.dumps(
            RedisUtils.rpush(self.redis_manager_.getRedis(), "code_operate_queue", json.dumps(
                {"type": 3, "code": code, "client": client_id, "data": data, "create_time": round(time.time() * 1000)}))
    # 移除监控
@@ -223,7 +221,7 @@
    # 设置代码操作状态,服务器保存的代码是否与实际设置的代码保持一致
    @classmethod
    def set_operate_code_state(cls, client_id, channel, state):
        RedisUtils.setex(cls.getRedis(),"code-operate_state-{}-{}".format(client_id, channel), 10, state)
        RedisUtils.setex(cls.getRedis(), "code-operate_state-{}-{}".format(client_id, channel), 10, state)
    def get_operate_code_state(self, client_id, channel):
        value = RedisUtils.get(self.getRedis(), "code-operate_state-{}-{}".format(client_id, channel))
@@ -234,13 +232,11 @@
    # 设置读取队列有效
    @classmethod
    def set_read_queue_valid(cls):
        redis = cls.getRedis()
        RedisUtils.setex( redis,"operate_queue_read_state", 20, 1)
        RedisUtils.setex(cls.getRedis(), "operate_queue_read_state", 20, 1)
    @classmethod
    def is_read_queue_valid(cls):
        redis = cls.getRedis()
        return RedisUtils.get(redis, "operate_queue_read_state") is not None
        return RedisUtils.get(cls.getRedis(), "operate_queue_read_state") is not None
# 通过l2代码校验代码位
ths/l2_listen_pos_health_manager.py
@@ -18,7 +18,7 @@
def __set_health_state(client_id, pos, state):
    RedisUtils.setex(__get_redis(),f"l2_pos_health_state-{client_id}-{pos}", tool.get_expire(), state)
    RedisUtils.setex(__get_redis(), f"l2_pos_health_state-{client_id}-{pos}", tool.get_expire(), state)
def __get_health_state(client_id, pos):
ths/ths_util.py
@@ -178,19 +178,21 @@
def set_ths_dead_state(client_id, dead):
    redis = __redisManager.getRedis()
    key = "ths_state_dead_count-{}".format(client_id)
    if not dead:
        RedisUtils.setex(redis, key, tool.get_expire(), 0)
    else:
        RedisUtils.incrby(redis, key, 1)
        RedisUtils.expire(redis, key, tool.get_expire())
    try:
        key = "ths_state_dead_count-{}".format(client_id)
        if not dead:
            RedisUtils.setex(redis, key, tool.get_expire(), 0, auto_free=False)
        else:
            RedisUtils.incrby(redis, key, 1, auto_free=False)
            RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False)
    finally:
        redis.connection_pool.disconnect()
# 同花顺是否卡死
def is_ths_dead(client_id):
    key = "ths_state_dead_count-{}".format(client_id)
    redis = __redisManager.getRedis()
    val = RedisUtils.get(redis, key)
    val = RedisUtils.get( __redisManager.getRedis(), key)
    if val is not None and int(val) >= 5:
        return True
    else:
trade/huaxin/trade_api_server.py
@@ -355,11 +355,11 @@
                            _start_time = time.time()
                            times = []
                            for i in range(0, 100):
                                RedisUtils.sadd(redis, "test_set", f"000000:{i}")
                                RedisUtils.sadd(redis, "test_set", f"000000:{i}",auto_free=False)
                            times.append(time.time() - _start_time)
                            _start_time = time.time()
                            for i in range(0, 20):
                                RedisUtils.smembers(redis, "test_set")
                                RedisUtils.smembers(redis, "test_set",auto_free=False)
                            times.append(time.time() - _start_time)
                            return_str = json.dumps(
                                {"code": 0, "data": times, "msg": ""})
trade/trade_data_manager.py
@@ -61,15 +61,13 @@
    # last_data: 买入点最后一条数据
    @classmethod
    def set_buy_position_info(cls, code, capture_time, trade_time, last_data, last_data_index):
        redis = cls.redisManager.getRedis()
        RedisUtils.setex(redis, "buy_position_info-{}".format(code), tool.get_expire(),
        RedisUtils.setex(cls.redisManager.getRedis(), "buy_position_info-{}".format(code), tool.get_expire(),
                         json.dumps((capture_time, trade_time, last_data, last_data_index)))
    # 获取买入点信息
    @classmethod
    def get_buy_position_info(cls, code):
        redis = cls.redisManager.getRedis()
        val_str = RedisUtils.get(redis, "buy_position_info-{}".format(code))
        val_str = RedisUtils.get( cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
        if val_str is None:
            return None, None, None, None
        else:
@@ -79,16 +77,14 @@
    # 删除买入点信息
    @classmethod
    def remove_buy_position_info(cls, code):
        redis = cls.redisManager.getRedis()
        RedisUtils.delete(redis, "buy_position_info-{}".format(code))
        RedisUtils.delete( cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
    # 设置买入确认点信息
    @classmethod
    def __set_buy_sure_position(cls, code, index, data):
        logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data)
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        RedisUtils.setex(redis, key, tool.get_expire(), json.dumps((index, data)))
        RedisUtils.setex(cls.redisManager.getRedis(), key, tool.get_expire(), json.dumps((index, data)))
        cls.buy_sure_position_dict[code] = (index, data)
        # 移除下单信号的详细信息
        cls.remove_buy_position_info(code)
@@ -96,9 +92,8 @@
    # 清除买入确认点信息
    @classmethod
    def __clear_buy_sure_position(cls, code):
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        RedisUtils.delete(redis, key)
        RedisUtils.delete(cls.redisManager.getRedis(), key)
        if code in cls.buy_sure_position_dict:
            cls.buy_sure_position_dict.pop(code)
@@ -109,9 +104,8 @@
        if temp is not None:
            return temp[0], temp[1]
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        val = RedisUtils.get(redis, key)
        val = RedisUtils.get(cls.redisManager.getRedis(), key)
        if val is None:
            return None, None
        else:
trade/trade_manager.py
@@ -142,8 +142,7 @@
# 获取交易状态
def get_trade_state(code):
    redis = __redis_manager.getRedis()
    state = RedisUtils.get(redis, "trade-state-{}".format(code))
    state = RedisUtils.get(__redis_manager.getRedis(), "trade-state-{}".format(code))
    if state is None:
        return TRADE_STATE_NOT_TRADE
    return int(state)
@@ -152,42 +151,45 @@
# 设置交易状态
def set_trade_state(code, state):
    logger_trade.info("set_trade_state {}-{}".format(code, state))
    redis = __redis_manager.getRedis()
    RedisUtils.setex(redis, "trade-state-{}".format(code), tool.get_expire(), state)
    RedisUtils.setex( __redis_manager.getRedis(), "trade-state-{}".format(code), tool.get_expire(), state)
def get_codes_by_trade_state(state):
    redis = __redis_manager.getRedis()
    keys = RedisUtils.keys(redis, "trade-state-*")
    codes = []
    if keys is not None:
        for key in keys:
            if int(redis.get(key)) == state:
                codes.append(key.replace("trade-state-", ''))
    return codes
    try:
        keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
        codes = []
        if keys is not None:
            for key in keys:
                if int(RedisUtils.get(redis, key, auto_free=False)) == state:
                    codes.append(key.replace("trade-state-", ''))
        return codes
    finally:
        redis.connection_pool.disconnect()
def get_codes_by_trade_states(states):
    redis = __redis_manager.getRedis()
    keys = RedisUtils.keys(redis, "trade-state-*")
    codes = []
    if keys is not None:
        for key in keys:
            if int(RedisUtils.get(redis, key)) in states:
                codes.append(key.replace("trade-state-", ''))
    return codes
    try:
        keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
        codes = []
        if keys is not None:
            for key in keys:
                if int(RedisUtils.get(redis, key, auto_free=False)) in states:
                    codes.append(key.replace("trade-state-", ''))
        return codes
    finally:
        redis.connection_pool.disconnect()
# 设置交易账户的可用金额
def set_available_money(client_id, money):
    redis = __redis_manager.getRedis()
    RedisUtils.set(redis, "trade-account-canuse-money", money)
    RedisUtils.set(__redis_manager.getRedis(), "trade-account-canuse-money", money)
# 获取交易账户的可用金额
def get_available_money():
    redis = __redis_manager.getRedis()
    result = RedisUtils.get(redis, "trade-account-canuse-money")
    result = RedisUtils.get(__redis_manager.getRedis(), "trade-account-canuse-money")
    if result is None:
        return None
    return round(float(result), 2)
@@ -195,9 +197,8 @@
# 保存交易成功的数据
def save_trade_success_data(datas, day=datetime.datetime.now().strftime("%Y%m%d")):
    redis = __redis_manager.getRedis()
    time_str = tool.get_now_time_str()
    RedisUtils.setex(redis, "trade-success-latest-time", tool.get_expire(), time_str)
    RedisUtils.setex(__redis_manager.getRedis(), "trade-success-latest-time", tool.get_expire(), time_str)
    mysqldb = mysql_data.Mysqldb()
    # 合并同一合同编号
    dict_ = {}
@@ -251,14 +252,12 @@
                    data["trade_num"], data["trade_price"], data["type"], data["day"], round(t.time() * 1000)))
    # 保存最新的委托数据
    redis = __redis_manager.getRedis()
    RedisUtils.setex(redis, "trade-delegate-latest", tool.get_expire(), json.dumps(datas))
    RedisUtils.setex(redis, "trade-delegate-latest-time", tool.get_expire(), time_str)
    RedisUtils.setex(__redis_manager.getRedis(), "trade-delegate-latest", tool.get_expire(), json.dumps(datas))
    RedisUtils.setex(__redis_manager.getRedis(), "trade-delegate-latest-time", tool.get_expire(), time_str)
# 获取交易成功数据
def get_trade_success_data():
    redis = __redis_manager.getRedis()
    day = datetime.datetime.now().strftime("%Y%m%d")
    mysqldb = mysql_data.Mysqldb()
    results = mysqldb.select_all("select * from ths_trade_success_record where day='{}'".format(day))
@@ -268,18 +267,21 @@
                "time": result[5], "trade_num": result[6], "type": result[7], "day": result[8],
                "create_time": result[9]}
        datas.append(data)
    return datas, RedisUtils.get(redis, "trade-success-latest-time")
    return datas, RedisUtils.get(__redis_manager.getRedis(), "trade-success-latest-time")
# 获取交易委托数据
def get_trade_delegate_data():
    redis = __redis_manager.getRedis()
    result = RedisUtils.get(redis, "trade-delegate-latest")
    time_str = RedisUtils.get(redis, "trade-delegate-latest-time")
    if result is None:
        return [], time_str
    else:
        return json.loads(result), time_str
    try:
        result = RedisUtils.get(redis, "trade-delegate-latest", auto_free=False)
        time_str = RedisUtils.get(redis, "trade-delegate-latest-time", auto_free=False)
        if result is None:
            return [], time_str
        else:
            return json.loads(result), time_str
    finally:
        redis.connection_pool.disconnect()
# 开始交易
@@ -527,31 +529,38 @@
def __clear_data(code):
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    try:
        keys = RedisUtils.keys(redis_l2, "*{}*".format(code), auto_free=False)
        for k in keys:
            # if (k.find("l2-") is None or k.find("l2-") < 0) and (k.find("big_data-") is None or k.find("big_data-") < 0):
            RedisUtils.delete(redis_l2, k, auto_free=False)
    finally:
        redis_l2.connection_pool.disconnect()
    keys = RedisUtils.keys(redis_l2, "*{}*".format(code))
    for k in keys:
        # if (k.find("l2-") is None or k.find("l2-") < 0) and (k.find("big_data-") is None or k.find("big_data-") < 0):
        RedisUtils.delete(redis_l2, k)
    redis_trade = redis_manager.RedisManager(2).getRedis()
    RedisUtils.delete(redis_trade, "trade-state-{}".format(code))
    RedisUtils.delete(redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code))
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = RedisUtils.keys(redis_info, "*{}*".format(code))
    for k in keys:
        if k.find("pre") is not None:
            continue
        if k.find("zyltgb") is not None:
            continue
    try:
        keys = RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False)
        for k in keys:
            if k.find("pre") is not None:
                continue
            if k.find("zyltgb") is not None:
                continue
        RedisUtils.delete(redis_info, k)
            RedisUtils.delete(redis_info, k, auto_free=False)
    finally:
        redis_info.connection_pool.disconnect()
def __clear_big_data():
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    keys = RedisUtils.keys(redis_l2, "big_data-*")
    for k in keys:
        RedisUtils.delete(redis_l2, k)
    try:
        keys = RedisUtils.keys(redis_l2, "big_data-*", auto_free=False)
        for k in keys:
            RedisUtils.delete(redis_l2, k, auto_free=False)
    finally:
        redis_l2.connection_pool.disconnect()
if __name__ == "__main__":