| | |
| | | from ths import l2_listen_pos_health_manager, client_manager |
| | | |
| | | __redisManager = redis_manager.RedisManager(0) |
| | | __db = 0 |
| | | |
| | | |
| | | class CodesNameManager: |
| | |
| | | |
| | | |
| | | def is_in_gp_pool(code): |
| | | return RedisUtils.sismember(__redisManager.getRedis(), "gp_list", code) or FirstGPCodesManager().is_in_first_gp_codes_cache(code) |
| | | return RedisUtils.sismember(__redisManager.getRedis(), "gp_list", |
| | | code) or FirstGPCodesManager().is_in_first_gp_codes_cache(code) |
| | | |
| | | |
| | | def get_gp_list(): |
| | |
| | | |
| | | # datas:[(code,price)] |
| | | def set_prices(datas): |
| | | pipe = __redisManager.getRedis().pipeline() |
| | | for d in datas: |
| | | code, price = d[0], d[1] |
| | | if code in __current_price_cache and __current_price_cache[code] == price: |
| | | continue |
| | | __current_price_cache[code] = price |
| | | RedisUtils.setex(pipe, "price-{}".format(code), tool.get_expire(), price) |
| | | pipe.execute() |
| | | RedisUtils.setex_async(__db, "price-{}".format(code), tool.get_expire(), price) |
| | | |
| | | |
| | | # 获取正在监听的代码 |
| | |
| | | first_codes = gpcode_manager.FirstGPCodesManager().get_first_gp_codes_cache() |
| | | |
| | | print("总价格代码数量:", len(prices)) |
| | | |
| | | __actualPriceProcessor.save_current_price_codes_count(len(prices)) |
| | | if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_THS: |
| | | __actualPriceProcessor.save_current_price_codes_count(len(prices)) |
| | | # 采集的代码数量不对, 暂时不需要 |
| | | # if len(gpcode_manager.get_gp_list()) - len(prices) > 10: |
| | | # logger_l2_codes_subscript.info("采集到的代码数量不正确:{}", len(prices)) |
| | |
| | | logging.exception(e) |
| | | logger_l2_codes_subscript.exception(e) |
| | | finally: |
| | | time.sleep(1) |
| | | time.sleep(0.01) |
| | | |
| | | |
| | | def __read_sync_task(pipe): |
| | |
| | | # 批量保存 |
| | | def __save_current_rates(self, datas): |
| | | # 变化之后才会持久化 |
| | | pipe = self.__get_redis().pipeline() |
| | | for d in datas: |
| | | if self.__code_current_rate_latest.get(d[0]) == d[1]: |
| | | continue |
| | | self.__code_current_rate_latest[d[0]] = d[1] |
| | | tool.CodeDataCacheUtil.set_cache(self.__code_current_rate_cache, d[0], d[1]) |
| | | key = "code_current_rate-{}".format(d[0]) |
| | | RedisUtils.setex(pipe, key, tool.get_expire(), d[1]) |
| | | pipe.execute() |
| | | RedisUtils.setex_async(self.__db, key, tool.get_expire(), d[1]) |
| | | |
| | | # 获取当前涨幅 |
| | | def __get_current_rate(self, code): |