| | |
| | | finally: |
| | | cls.__lock.release() |
| | | |
| | | @staticmethod |
| | | def send_operate(): |
| | | @classmethod |
| | | def send_operate(cls): |
| | | redis = L2CodeOperate.getRedis() |
| | | while True: |
| | | cls.set_read_queue_valid() |
| | | try: |
| | | data = redis.lpop("code_operate_queue") |
| | | # print("读取操作队列", data, redis.llen("code_operate_queue")) |
| | |
| | | "data": {"index": int(pos), "code": code, "min_price": float(min_price), |
| | | "max_price": float(max_price)}} |
| | | redis = self.redis_manager_.getRedis() |
| | | redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data, "create_time": round(time.time() * 1000)})) |
| | | redis.rpush("code_operate_queue", json.dumps( |
| | | {"type": 3, "code": code, "client": client_id, "data": data, "create_time": round(time.time() * 1000)})) |
| | | |
| | | # 移除监控 |
| | | def remove_l2_listen(self, code, msg): |
| | |
| | | return int(value) |
| | | return value |
| | | |
| | | # 设置读取队列有效 |
| | | @classmethod |
| | | def set_read_queue_valid(cls): |
| | | redis = cls.getRedis() |
| | | redis.setex("operate_queue_read_state", 20, 1) |
| | | |
| | | @classmethod |
| | | def is_read_queue_valid(cls): |
| | | redis = cls.getRedis() |
| | | return redis.get("operate_queue_read_state") is not None |
| | | |
| | | |
| | | |
| | | # 获取客户端正在监听的代码 |
| | | def get_listen_codes_from_client(client_id): |