Administrator
2023-09-05 8141778cb8935879cca5e9bab89826378022a560
L2代码订阅流程优化
4个文件已修改
16 ■■■■■ 已修改文件
code_attribute/gpcode_manager.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/current_price_process_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/gpcode_manager.py
@@ -13,6 +13,7 @@
from ths import l2_listen_pos_health_manager, client_manager
__redisManager = redis_manager.RedisManager(0)
__db = 0
class CodesNameManager:
@@ -510,7 +511,8 @@
def is_in_gp_pool(code):
    return RedisUtils.sismember(__redisManager.getRedis(), "gp_list", code) or FirstGPCodesManager().is_in_first_gp_codes_cache(code)
    return RedisUtils.sismember(__redisManager.getRedis(), "gp_list",
                                code) or FirstGPCodesManager().is_in_first_gp_codes_cache(code)
def get_gp_list():
@@ -620,14 +622,12 @@
# datas:[(code,price)]
def set_prices(datas):
    pipe = __redisManager.getRedis().pipeline()
    for d in datas:
        code, price = d[0], d[1]
        if code in __current_price_cache and __current_price_cache[code] == price:
            continue
        __current_price_cache[code] = price
        RedisUtils.setex(pipe, "price-{}".format(code), tool.get_expire(), price)
    pipe.execute()
        RedisUtils.setex_async(__db, "price-{}".format(code), tool.get_expire(), price)
# 获取正在监听的代码
trade/current_price_process_manager.py
@@ -27,7 +27,7 @@
    first_codes = gpcode_manager.FirstGPCodesManager().get_first_gp_codes_cache()
    print("总价格代码数量:", len(prices))
    if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS:
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    # 采集的代码数量不对, 暂时不需要
    # if len(gpcode_manager.get_gp_list()) - len(prices) > 10:
trade/huaxin/trade_api_server.py
@@ -450,7 +450,7 @@
            logging.exception(e)
            logger_l2_codes_subscript.exception(e)
        finally:
            time.sleep(1)
            time.sleep(0.01)
def __read_sync_task(pipe):
trade/trade_data_manager.py
@@ -291,15 +291,13 @@
    # 批量保存
    def __save_current_rates(self, datas):
        # 变化之后才会持久化
        pipe = self.__get_redis().pipeline()
        for d in datas:
            if self.__code_current_rate_latest.get(d[0]) == d[1]:
                continue
            self.__code_current_rate_latest[d[0]] = d[1]
            tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, d[0], d[1])
            key = "code_current_rate-{}".format(d[0])
            RedisUtils.setex(pipe, key, tool.get_expire(), d[1])
        pipe.execute()
            RedisUtils.setex_async(self.__db, key, tool.get_expire(), d[1])
    # 获取当前涨幅
    def __get_current_rate(self, code):