"""Client management: sends control commands to the L2 clients."""

# Code that operates the L2 listening positions.
import json
import logging
import threading
import time

import client_manager
import gpcode_manager
from l2 import l2_data_manager
from trade import l2_trade_util
import server
import tool
from db import redis_manager
from log import logger_code_operate

# "<client>-<channel>-<code>" -> ms timestamp of the last time the position
# was flagged as inconsistent (throttles repair requests).
__reset_code_dict = {}
# "<client>-<channel>" -> ms timestamp of the last time the position was
# flagged as consistent (throttles the Redis write).
__set_operate_code_state_dict = {}


class L2CodeOperate(object):
    """Queue-backed dispatcher of code-listening operations to L2 clients.

    Operations are pushed as JSON onto the Redis list ``code_operate_queue``
    and consumed by :meth:`send_operate`. Operation types handled there:

    * ``0``  - stop listening to a code
    * ``1``  - start listening to a code on a free position
    * ``2``  - force a position back to the expected code
    * ``3``  - forward an L2 data repair request to a client
    * ``4``  - clear a listening position
    * ``10`` - batch assignment (not implemented yet)
    """

    __instance = None
    __lock = threading.RLock()
    redis_manager_ = redis_manager.RedisManager()

    @classmethod
    def getRedis(cls):
        """Return the Redis connection obtained from the shared manager."""
        return cls.redis_manager_.getRedis()

    @classmethod
    def get_instance(cls, *args, **kwargs):
        """Return the lazily created instance cached on the class as ``ins``."""
        if not hasattr(cls, "ins"):
            insObject = cls(*args, **kwargs)
            setattr(cls, "ins", insObject)
        return getattr(cls, "ins")

    @staticmethod
    def setGPCode(client_id, position, gpcode):
        """Tell a client to listen to ``gpcode`` at ``position``.

        Callers in this module pass an empty string to clear a position.
        The code is marked as "operating" for the duration of the call.
        On a reply with ``code == 0`` the local position table is updated
        and the position is flagged as consistent. Any exception is logged
        and swallowed.

        Args:
            client_id: Identifier of the target client.
            position: Listening position on that client (converted to int).
            gpcode: Code to listen to.
        """
        data = {"action": "setGPCode", "data": {"index": int(position), "code": gpcode}}
        logger_code_operate.info("setGPCode:clientid-{} position-{} code-{}".format(client_id, position, gpcode))
        gpcode_manager.set_operate(gpcode)
        try:
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "setGPCode结束({}):clientid-{} position-{} code-{}".format(result, client_id, position, gpcode))
            jsonData = json.loads(result)
            if jsonData["code"] == 0:
                gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode)
                L2CodeOperate.set_operate_code_state(client_id, position, 1)
        except Exception as e:
            logging.exception(e)
            logger_code_operate.error("setGPCode出错:{}", str(e))
        finally:
            # Always release the "operating" marker, even on failure.
            gpcode_manager.rm_operate(gpcode)

    @staticmethod
    def betchSetGPCode(client_id, codes_info):
        """Set several listening positions on one client in a single message.

        Args:
            client_id: Identifier of the target client.
            codes_info: Iterable of ``(position, code)`` pairs,
                e.g. ``[(0, "000333")]``.

        Returns:
            True when the client replied with ``code == 0``; False when it
            replied with another code or when an exception occurred (the
            exception is logged, not raised).
        """
        datas = [{"index": info[0], "code": info[1]} for info in codes_info]
        data = {"action": "betchSetGPCodes", "data": datas, "force": True}
        logger_code_operate.info("betchSetGPCodes:clientid-{} info-{}".format(client_id, codes_info))
        codes = [item[1] for item in codes_info]
        gpcode_manager.set_operates(codes)
        try:
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "betchSetGPCodes结束({}):clientid-{} info-{}".format(result, client_id, codes_info))
            jsonData = json.loads(result)
            if jsonData["code"] == 0:
                for item in codes_info:
                    gpcode_manager.set_listen_code_by_pos(client_id, item[0], item[1])
                    L2CodeOperate.set_operate_code_state(client_id, item[0], 1)
                return True
            else:
                return False
        except Exception as e:
            logging.exception(e)
            logger_code_operate.error("setGPCode出错:{}", str(e))
        finally:
            # Always release the "operating" markers, even on failure.
            gpcode_manager.rm_operates(codes)
        return False

    @classmethod
    def run(cls):
        """Start the queue consumer (:meth:`send_operate`) in a daemon thread."""
        with cls.__lock:
            # daemon=True replaces the deprecated Thread.setDaemon(True).
            worker = threading.Thread(target=lambda: L2CodeOperate.send_operate(), daemon=True)
            worker.start()

    @classmethod
    def send_operate(cls):
        """Consume ``code_operate_queue`` forever and execute each operation.

        Each iteration refreshes the "queue reader alive" flag, pops one
        item and dispatches it. When the queue is empty the loop sleeps for
        one second. Exceptions raised while handling an item are logged and
        the loop carries on. This method never returns.
        """
        redis = L2CodeOperate.getRedis()
        while True:
            cls.set_read_queue_valid()
            try:
                raw = redis.lpop("code_operate_queue")
                if raw is None:
                    # Nothing queued: back off instead of spinning on Redis.
                    time.sleep(1)
                    continue
                cls._process_operate(json.loads(raw))
            except Exception as e:
                logging.exception(e)
                print("发送操作异常:", str(e))

    @classmethod
    def _process_operate(cls, data):
        """Execute one decoded operation taken from the queue.

        Args:
            data: Dict decoded from the queue item. ``type`` is required;
                the other keys read depend on the operation type.
        """
        op_type, code = data["type"], data.get("code")
        create_time = data.get("create_time")
        # Drop operations that waited in the queue for more than 15 seconds.
        if create_time is not None and round(time.time() * 1000) - create_time > 15 * 1000:
            return

        if op_type == 0:
            # Codes in the fixed L2 set are never removed.
            if l2_data_manager.is_in_l2_fixed_codes(code):
                return
            if gpcode_manager.is_listen_old(code) and not gpcode_manager.is_operate(code):
                client_id, pos = gpcode_manager.get_listen_code_pos(code)
                if client_id is not None and pos is not None:
                    L2CodeOperate.setGPCode(client_id, pos, "")
        elif op_type == 1:
            if l2_trade_util.is_in_forbidden_trade_codes(code):
                return
            if (not gpcode_manager.is_listen_old(code)
                    and not gpcode_manager.is_operate(code)
                    and not gpcode_manager.is_listen_full()):
                client_id, pos = gpcode_manager.get_can_listen_pos()
                if pos is not None and client_id is not None:
                    L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 10:
            # Batch assignment, usually issued between 9:25 and 9:27.
            # Fixed: the original read ``data[codes]`` with ``codes`` unbound,
            # which raised on every type-10 operation.
            client_id = data.get("client_id")
            codes = data["codes"]
            # TODO: the actual allocation still has to be implemented.
        elif op_type == 2:
            # Forced set.
            client_id = data["client"]
            pos = data["pos"]
            state = L2CodeOperate.get_instance().get_operate_code_state(client_id, pos)
            if state == 1:
                # Position is already flagged as consistent.
                return
            code_ = gpcode_manager.get_listen_code_by_pos(client_id, pos)
            if code_ == "" or code_ is None:
                return
            logger_code_operate.info("修复代码一致:{}-{}-{}", client_id, pos, code)
            L2CodeOperate.setGPCode(client_id, pos, code)
        elif op_type == 3:
            # Repair erroneous L2 data.
            if tool.is_repaire_time():
                client = data["client"]
                payload = data["data"]
                result = server.send_msg(client, payload)
                print("L2數據修復結果:", result)
            else:
                print("非修复时间,放弃修复L2")
        elif op_type == 4:
            # Clear a listening position.
            client = data["client"]
            pos = data["pos"]
            L2CodeOperate.setGPCode(client, pos, "")

    def add_operate(self, type, code, msg="", client=None, pos=None):
        """Queue an operation for :meth:`send_operate`.

        Requests made before 09:25:10 (per ``tool.get_now_time_str()``) are
        silently ignored.

        Args:
            type: Operation type (see the class docstring).
            code: Code the operation refers to.
            msg: Free-form message stored with the operation.
            client: Target client, for position-based operations.
            pos: Target position, for position-based operations.
        """
        # Operations are only allowed from 09:25:10 onwards.
        if int(tool.get_now_time_str().replace(":", "")) < int("092510"):
            return
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue",
                    json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
                                "create_time": round(time.time() * 1000)}))

    def repaire_operate(self, client, pos, code):
        """Queue a forced set (type 2) of ``code`` on ``client``/``pos``.

        Nothing is queued when the position is recorded as empty locally.
        """
        # An empty recorded position does not need repairing.
        code_ = gpcode_manager.get_listen_code_by_pos(client, pos)
        if code_ == "" or code_ is None:
            return
        logger_code_operate.info("客户端位置代码修复:client-{},pos-{},code-{}", client, pos, code)
        self.add_operate(2, code, "", client, pos)

    def repaire_l2_data(self, code):
        """Queue an L2 data repair request (type 3) for ``code``.

        The request is only queued when the code currently has a listening
        position; it carries the limit-up and limit-down prices of the code.
        """
        logger_code_operate.info("修复单票的L2数据:" + code)
        client_id, pos = gpcode_manager.get_listen_code_pos(code)
        if client_id is not None and pos is not None:
            # Limit-up and limit-down prices of the code.
            max_price = gpcode_manager.get_limit_up_price(code)
            min_price = gpcode_manager.get_limit_down_price(code)
            data = {"action": "repairL2Data",
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
            redis = self.redis_manager_.getRedis()
            redis.rpush("code_operate_queue", json.dumps(
                {"type": 3, "code": code, "client": client_id, "data": data,
                 "create_time": round(time.time() * 1000)}))

    def remove_l2_listen(self, code, msg):
        """Queue removal (type 0) of ``code`` if it is currently listened to."""
        if gpcode_manager.is_listen_old(code):
            self.add_operate(0, code, msg=msg)

    @classmethod
    def set_operate_code_state(cls, client_id, channel, state):
        """Record whether the server-side code of a position matches the client.

        The flag is stored in Redis with a 10-second expiry.
        """
        cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), 10, state)

    def get_operate_code_state(self, client_id, channel):
        """Return the consistency flag of a position as an int, or None if unset."""
        value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel))
        if value is not None:
            return int(value)
        return value

    @classmethod
    def set_read_queue_valid(cls):
        """Mark the queue reader as alive (Redis flag with a 20-second expiry)."""
        redis = cls.getRedis()
        redis.setex("operate_queue_read_state", 20, 1)

    @classmethod
    def is_read_queue_valid(cls):
        """Return True while the "queue reader alive" flag exists in Redis."""
        redis = cls.getRedis()
        return redis.get("operate_queue_read_state") is not None


@tool.async_call
def verify_with_l2_data_pos_info(code, client, channel):
    """Check a position against the code seen in incoming L2 data.

    On a mismatch the position is flagged inconsistent and, during repair
    time, a repair is requested; this happens at most once every 2 seconds
    per ``(client, channel, code)``. On a match the position is flagged
    consistent at most once per second per ``(client, channel)``.

    Args:
        code: Code found in the L2 data received for this position.
        client: Client identifier.
        channel: Position on that client.
    """
    code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
    now_ms = round(time.time() * 1000)
    if code_ != code:
        key = "{}-{}-{}".format(client, channel, code)
        # Throttle: at most one reset every 2 seconds.
        if key not in __reset_code_dict or now_ms - __reset_code_dict[key] > 2000:
            L2CodeOperate.set_operate_code_state(client, channel, 0)
            __reset_code_dict[key] = round(time.time() * 1000)
            if code_ is None:
                code_ = ""
            if tool.is_repaire_time():
                L2CodeOperate().repaire_operate(int(client), int(channel), code_)
    else:
        key = "{}-{}".format(client, channel)
        # Throttle: at most one write per second.
        if key not in __set_operate_code_state_dict or now_ms - __set_operate_code_state_dict[key] > 1000:
            __set_operate_code_state_dict[key] = round(time.time() * 1000)
            L2CodeOperate.set_operate_code_state(client, channel, 1)


def get_listen_codes_from_client(client_id):
    """Ask a client which codes it is listening to.

    Returns:
        Dict mapping each reported ``index`` to its ``code``; empty when the
        client reports no list.

    Raises:
        Exception: When the client's reply has a non-zero ``code``.
    """
    data = {"action": "getL2Codes"}
    result = json.loads(server.send_msg(client_id, data))
    if result["code"] != 0:
        raise Exception("获取客户端监听代码出错:{}".format(result))
    # The "data" field of the reply is itself a JSON-encoded string.
    codes = json.loads(result["data"])["data"]
    if codes is None:
        return {}
    return {d["index"]: d["code"] for d in codes}


def correct_client_codes():
    """Reconcile the positions of every valid L2 client with the local table.

    For positions 0-7 of each client, a repair is queued (during repair time
    only) when the locally recorded code is non-empty and differs from what
    the client reports. Errors for one client are logged and do not stop the
    others.
    """
    client_ids = client_manager.getValidL2Clients()
    for client_id in client_ids:
        try:
            index_codes = get_listen_codes_from_client(client_id)
            for index in range(0, 8):
                code = gpcode_manager.get_listen_code_by_pos(client_id, index)
                if code is not None and len(code) > 0 and index_codes.get(index) != code:
                    # Codes are only repaired during repair time.
                    if tool.is_repaire_time():
                        L2CodeOperate().repaire_operate(client_id, index, code)
                # NOTE: positions that are empty locally but still occupied on
                # the client are deliberately left untouched; clearing them
                # (operation type 4) is currently disabled.
        except Exception as e:
            logger_code_operate.error("client:{} msg:{}".format(client_id, str(e)))


def betch_set_client_codes(client_id, codes_info):
    """Batch-set listening codes on a client; see ``L2CodeOperate.betchSetGPCode``."""
    return L2CodeOperate.betchSetGPCode(client_id, codes_info)


if __name__ == "__main__":
    codes = [(0, "000615"), (1, "002264"), (2, "600225"), (3, "002495"), (4, "600572"), (5, "002279"), (6, "002591"),
             (7, "002880")]
    L2CodeOperate.betchSetGPCode(3, codes)