# Code that operates L2
import json
import os
import queue
import threading
import time

import gpcode_manager
import l2_data_manager
import redis_manager
import server
import tool
import trade_manager
from log import logger_code_operate


class L2CodeOperate(object):
    """Producer/consumer for L2 code operations backed by a Redis list.

    Producers (``add_operate``, ``repaire_operate``, ``repaire_l2_data``)
    push JSON messages onto the Redis list ``code_operate_queue``.
    ``send_operate`` pops them in an endless loop (normally on the daemon
    thread started by ``run``) and dispatches on the message ``type``:

    * ``0`` - clear the slot a code is listened on (sends an empty code).
    * ``1`` - start listening to a code on a free slot.
    * ``2`` - force a given client/position to a given code.
    * ``3`` - forward a prepared payload (L2 data repair) to a client.
    """

    __instance = None
    __lock = threading.RLock()
    # Shared by all instances; created once when the class body is executed.
    redis_manager_ = redis_manager.RedisManager()

    @classmethod
    def getRedis(cls):
        """Return the Redis client provided by the shared ``RedisManager``."""
        return cls.redis_manager_.getRedis()

    @classmethod
    def get_instance(cls, *args, **kwargs):
        """Return the cached instance, creating it on first use.

        The instance is stored on the class under the attribute ``ins``;
        constructor arguments are only used by the call that creates it.
        """
        if not hasattr(cls, "ins"):
            insObject = cls(*args, **kwargs)
            setattr(cls, "ins", insObject)
        return getattr(cls, "ins")

    @staticmethod
    def setGPCode(client_id, position, gpcode):
        """Ask a client to put ``gpcode`` on slot ``position``.

        The code is flagged as "operating" for the duration of the call.
        When the reply contains ``'OK'`` the new slot assignment is recorded
        through ``gpcode_manager.set_listen_code_by_pos``. Errors raised while
        sending are logged and not propagated.
        """
        data = {"action": "setGPCode", "data": {"index": int(position), "code": gpcode}}
        logger_code_operate.info(
            "setGPCode:clientid-{} position-{} code-{}".format(client_id, position, gpcode))
        gpcode_manager.set_operate(gpcode)
        try:
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "setGPCode结束({}):clientid-{} position-{} code-{}".format(result, client_id, position, gpcode))
            if 'OK' in result:
                gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode)
        except Exception as e:
            # Format explicitly, like every other log call in this file, so
            # the message does not depend on the logger's own templating.
            logger_code_operate.error("setGPCode出错:{}".format(str(e)))
        finally:
            # Always clear the "operating" flag, even when sending failed.
            gpcode_manager.rm_operate(gpcode)

    @classmethod
    def run(cls):
        """Start ``send_operate`` on a background daemon thread."""
        with cls.__lock:
            # Runs in the background; a daemon thread does not block exit.
            worker = threading.Thread(target=L2CodeOperate.send_operate, daemon=True)
            worker.start()

    @staticmethod
    def send_operate():
        """Consume ``code_operate_queue`` forever and execute each message.

        Sleeps one second when the queue is empty. Any error raised while
        reading or handling a message is reported and the loop continues
        with the next message.
        """
        redis = L2CodeOperate.getRedis()
        while True:
            try:
                data = redis.lpop("code_operate_queue")
                print("读取操作队列", data, redis.llen("code_operate_queue"))
                if data is None:
                    time.sleep(1)
                    continue
                L2CodeOperate._handle_operate(json.loads(data))
            except Exception as e:
                # Keep the consumer alive, but leave a trace of what failed.
                print("发送操作异常")
                logger_code_operate.error("发送操作异常:{}".format(str(e)))

    @staticmethod
    def _handle_operate(data):
        """Execute one decoded queue message (a dict with a ``type`` key)."""
        op_type = data["type"]
        # Type 3 messages carry no "code" key, so do not require it here.
        code = data.get("code")
        if op_type == 0:
            # Skip codes that are in the fixed L2 set.
            if l2_data_manager.is_in_l2_fixed_codes(code):
                return
            if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                client_id, pos = gpcode_manager.get_listen_code_pos(code)
                if client_id is not None and pos is not None:
                    # An empty code clears the slot.
                    L2CodeOperate.setGPCode(client_id, pos, "")
        elif op_type == 1:
            if trade_manager.is_in_forbidden_trade_codes(code):
                return
            if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(
                    code) and not gpcode_manager.is_listen_full():
                client_id, pos = gpcode_manager.get_can_listen_pos()
                if pos is not None and client_id is not None:
                    L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 2:
            # Forced set
            client_id = data["client"]
            pos = data["pos"]
            state = L2CodeOperate.get_instance().get_operate_code_state(client_id, pos)
            # NOTE(review): state 1 presumably means the slot is already
            # consistent with the server's record - confirm with the writer
            # of set_operate_code_state.
            if state == 1:
                return
            code_ = gpcode_manager.get_listen_code_by_pos(client_id, pos)
            if code_ == "" or code_ is None:
                return
            logger_code_operate.info("修复代码一致:{}-{}-{}".format(client_id, pos, code))
            L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 3:
            # Repair L2 data: the target client is stored on the outer
            # message (see repaire_l2_data), the payload under "data".
            client = data["client"]
            payload = data["data"]
            # NOTE(review): setGPCode passes a dict to server.send_msg while
            # this path passes a JSON string - confirm which one it expects.
            server.send_msg(client, json.dumps(payload))

    def add_operate(self, type, code):
        """Queue an operation of the given ``type`` for ``code``."""
        redis = self.redis_manager_.getRedis()
        print("add_operate", type, code)
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code}))

    def repaire_operate(self, client, pos, code):
        """Queue a forced set (type 2) of ``code`` on ``client``/``pos``."""
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue",
                    json.dumps({"type": 2, "client": client, "pos": pos, "code": code}))

    # Repair erroneous L2 data
    def repaire_l2_data(self, code):
        """Queue an L2 data repair (type 3) for a code that has a listen slot.

        Does nothing beyond logging when ``gpcode_manager`` reports no
        client/position for ``code``.
        """
        logger_code_operate.info("修复单票的L2数据:" + code)
        client_id, pos = gpcode_manager.get_listen_code_pos(code)
        if client_id is not None and pos is not None:
            # Fetch the limit-up and limit-down prices
            max_price = gpcode_manager.get_limit_up_price(code)
            min_price = gpcode_manager.get_limit_down_price(code)
            data = {"action": "repairL2Data",
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
            redis = self.redis_manager_.getRedis()
            redis.rpush("code_operate_queue", json.dumps({"type": 3, "client": client_id, "data": data}))

    # Remove listening
    def remove_l2_listen(self, code):
        """Queue a type 0 operation for ``code`` if it is currently listened to."""
        # Only queue when the code is actually being listened to
        if gpcode_manager.is_listen(code):
            self.add_operate(0, code)

    # Set the code operation state: whether the code saved on the server is
    # consistent with the code actually set
    def set_operate_code_state(self, client_id, channel, state):
        """Store ``state`` for a client/channel with ``tool.get_expire()`` as TTL."""
        self.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), tool.get_expire(), state)

    def get_operate_code_state(self, client_id, channel):
        """Return the stored state for a client/channel as ``int``, or ``None`` if unset."""
        value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel))
        if value is not None:
            return int(value)
        return value