# 操作l2的代码 import json import logging import os import queue import threading import data_process import gpcode_manager import l2_data_manager import server import tool import trade_manager import time import redis_manager from log import logger_code_operate class L2CodeOperate(object): __instance = None __lock = threading.RLock() redis_manager_ = redis_manager.RedisManager() @classmethod def getRedis(cls): return cls.redis_manager_.getRedis() @classmethod def get_instance(cls, *args, **kwargs): if not hasattr(cls, "ins"): insObject = cls(*args, **kwargs) setattr(cls, "ins", insObject) return getattr(cls, "ins") @staticmethod def setGPCode(client_id, position, gpcode): data = {"action": "setGPCode", "data": {"index": int(position), "code": gpcode}} logger_code_operate.info("setGPCode:clientid-{} position-{} code-{}".format(client_id, position, gpcode)) gpcode_manager.set_operate(gpcode) try: result = server.send_msg(client_id, data) logger_code_operate.info( "setGPCode结束({}):clientid-{} position-{} code-{}".format(result, client_id, position, gpcode)) jsonData = json.loads(result) if jsonData["code"] == 0: gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode) L2CodeOperate.set_operate_code_state(client_id, position, 1) except Exception as e: logger_code_operate.error("setGPCode出错:{}", str(e)) finally: gpcode_manager.rm_operate(gpcode) @classmethod def run(cls): cls.__lock.acquire() try: t1 = threading.Thread(target=lambda: L2CodeOperate.send_operate()) # 后台运行 t1.setDaemon(True) t1.start() finally: cls.__lock.release() @staticmethod def send_operate(): redis = L2CodeOperate.getRedis() while True: try: data = redis.lpop("code_operate_queue") print("读取操作队列", data, redis.llen("code_operate_queue")) if data is not None: data = json.loads(data) logger_code_operate.info("读取操作队列:{}", data) type, code = data["type"], data["code"] if type == 0: # 是否在固定库 if l2_data_manager.is_in_l2_fixed_codes(code): continue if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code): client_id, pos = gpcode_manager.get_listen_code_pos(code) if client_id is not None and pos is not None: L2CodeOperate.setGPCode(client_id, pos, "") elif type == 1: if trade_manager.is_in_forbidden_trade_codes(code): continue if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate( code) and not gpcode_manager.is_listen_full(): client_id, pos = gpcode_manager.get_can_listen_pos() if pos is not None and client_id is not None: L2CodeOperate.setGPCode(client_id, pos, code) # 强制设置 elif type == 2: client_id = data["client"] pos = data["pos"] state = L2CodeOperate.get_instance().get_operate_code_state(client_id, pos) if state == 1: continue code_ = gpcode_manager.get_listen_code_by_pos(client_id, pos) if code_ == "" or code_ is None: continue logger_code_operate.info("修复代码一致:{}-{}-{}", client_id, pos, code) L2CodeOperate.setGPCode(client_id, pos, code) # 修复l2的数据错误 elif type == 3: if tool.is_trade_time(): client = data["client"] data = data["data"] result = server.send_msg(client, data) print("L2數據修復結果:", result) else: print("非交易时间,放弃修复L2") elif type == 4: # 清理监听位置 client = data["client"] pos = data["pos"] L2CodeOperate.setGPCode(client, pos, "") else: time.sleep(1) except Exception as e: logging.exception(e) print("发送操作异常:", str(e)) def add_operate(self, type, code, msg="", client=None, pos=None): redis = self.redis_manager_.getRedis() redis.rpush("code_operate_queue", json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos})) def repaire_operate(self, client, pos, code): # 如果本来该位置代码为空则不用修复 code_ = gpcode_manager.get_listen_code_by_pos(client, pos) if code_ == "" or code_ is None: return logger_code_operate.info("客户端位置代码修复:client-{},pos-{},code-{}", client, pos, code) self.add_operate(2, code, "", client, pos) # 修复l2的数据错误 def repaire_l2_data(self, code): logger_code_operate.info("修复单票的L2数据:" + code) client_id, pos = gpcode_manager.get_listen_code_pos(code) if client_id is not None and pos is not None: # 获取涨停价与跌停价 max_price = gpcode_manager.get_limit_up_price(code) min_price = gpcode_manager.get_limit_down_price(code) data = {"action": "repairL2Data", "data": {"index": int(pos), "code": code, "min_price": float(min_price), "max_price": float(max_price)}} redis = self.redis_manager_.getRedis() redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data})) # 移除监控 def remove_l2_listen(self, code, msg): # 是否正在监听 if gpcode_manager.is_listen(code): self.add_operate(0, code, msg=msg) # 设置代码操作状态,服务器保存的代码是否与实际设置的代码保持一致 @classmethod def set_operate_code_state(cls, client_id, channel, state): cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), 10, state) def get_operate_code_state(self, client_id, channel): value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel)) if value is not None: return int(value) return value # 获取客户端正在监听的代码 def get_listen_codes_from_client(client_id): data = {"action": "getL2Codes"} result = server.send_msg(client_id, data) result = json.loads(result) if result["code"] == 0: data = json.loads(result["data"]) codes = data["data"] result_list = {} if codes is not None: for d in codes: result_list[d["index"]] = d["code"] return result_list else: raise Exception("获取客户端监听代码出错:{}".format(result)) # 矫正客户端代码 def correct_client_codes(): client_ids = data_process.getValidL2Clients() for client_id in client_ids: try: index_codes = get_listen_codes_from_client(client_id) for index in range(0, 8): code = gpcode_manager.get_listen_code_by_pos(client_id, index) if code is not None and len(code) > 0 and index_codes.get(index) != code: # 交易时间才修复代码 if tool.is_trade_time(): L2CodeOperate().repaire_operate(client_id, index, code) elif code is None or len(code) == 0 and index_codes.get(index) is not None: # 删除前端代码位 # L2CodeOperate().add_operate(4, "", client_id, index) pass except Exception as e: logger_code_operate.error("client:{} msg:{}".format(client_id, str(e))) if __name__ == "__main__": correct_client_codes()