| | |
| | | data = json.loads(data) |
| | | logger_code_operate.info("读取操作队列:{}", data) |
| | | type, code = data["type"], data["code"] |
| | | create_time = data.get("create_time") |
| | | if create_time is not None: |
| | | # 设置10s超时时间 |
| | | if round(time.time() * 1000) - create_time > 20 * 1000: |
| | | logger_code_operate.debug("读取操作超时:{}", data) |
| | | continue |
| | | |
| | | if type == 0: |
| | | # 是否在固定库 |
| | |
| | | def add_operate(self, type, code, msg="", client=None, pos=None): |
| | | redis = self.redis_manager_.getRedis() |
| | | redis.rpush("code_operate_queue", |
| | | json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos})) |
| | | json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos, |
| | | "create_time": round(time.time() * 1000)})) |
| | | |
| | | def repaire_operate(self, client, pos, code): |
| | | # 如果本来该位置代码为空则不用修复 |
| | |
| | | "data": {"index": int(pos), "code": code, "min_price": float(min_price), |
| | | "max_price": float(max_price)}} |
| | | redis = self.redis_manager_.getRedis() |
| | | redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data})) |
| | | redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data, "create_time": round(time.time() * 1000)})) |
| | | |
| | | # 移除监控 |
| | | def remove_l2_listen(self, code, msg): |