"""TCP server handler for data uploaded by clients, plus client command helpers.

``MyBaseRequestHandle`` reads messages from a connected socket, asks
``data_process.parseType`` for the message type and dispatches on it.
The module-level functions open a short-lived connection back to a client
to send it a JSON command.
"""
import json
import logging
import socketserver
import socket
import threading
import time

import data_process
import gpcode_manager
import authority
import juejin
import l2_data_manager
import tool
import trade_manager
import l2_code_operate
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate

# Port that send_msg() connects to on the client machine.
_CLIENT_PORT = 9006


def _now_ms():
    """Return the current wall-clock time in whole milliseconds."""
    return round(time.time() * 1000)


def _interval_elapsed(table, key, interval_ms):
    """Return True if *key* has no timestamp in *table* or it is older than *interval_ms*."""
    return key not in table or _now_ms() - table[key] > interval_ms


class MyTCPServer(socketserver.TCPServer):
    """TCPServer that additionally carries a ``pipe`` object for its handlers."""

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe=None):
        # Extra parameter; set before the base initialiser runs, as in the original.
        self.pipe = pipe
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)


# Threaded variant: each connection is handled in its own thread.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    pass


class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Per-connection handler that parses incoming messages and dispatches by type."""

    # Class-level tables: shared by every handler instance. Each maps a
    # throttle key to the millisecond timestamp of the last action.
    reset_code_dict = {}
    set_operate_code_state_dict = {}
    l2_data_error_dict = {}
    # Raw text of the last trade-delegate message seen (assigned per instance in handle()).
    last_trade_delegate_data = None

    def setup(self):
        """Fetch the shared L2CodeOperate instance for this connection."""
        super().setup()
        self.l2CodeOperate = l2_code_operate.L2CodeOperate.get_instance()

    def handle(self):
        """Read messages until the peer disconnects, replying to each one.

        The reply is "OK" unless the message is a login (type 20), in which
        case it is the JSON login result.
        """
        host = self.client_address[0]
        super().handle()
        sk: socket.socket = self.request
        while True:
            # NOTE(review): each recv() is treated as exactly one complete
            # message; there is no framing visible here.
            data = sk.recv(102400)
            if not data:
                # Peer closed the connection.
                break
            _str = data.decode()
            if not _str:
                continue

            msg_type = data_process.parseType(_str)
            return_str = "OK"
            if msg_type == 0:
                # Level-2 order book data.
                try:
                    start_ms = _now_ms()
                    day, client, channel, code, datas = l2_data_manager.parseL2Data(_str)
                    cid, pid = gpcode_manager.get_listen_code_pos(code)
                    # Only process when the uploaded position matches the
                    # position registered for this code.
                    if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                        self._process_l2_data(_str, client, channel, code, datas, start_ms)
                except Exception as e:
                    # Previously swallowed silently; keep going but leave a trace.
                    logging.exception(e)
            elif msg_type == 10:
                # Level-2 trade queue.
                try:
                    code, setData = data_process.parseL2TradeQueueData(_str)
                    if gpcode_manager.is_listen(code):
                        # NOTE(review): `day` is only bound by an earlier type-0
                        # message on this connection; otherwise this raises and
                        # falls into the handler below. Confirm the intended source.
                        data_process.saveL2Data(day, code, setData)
                except Exception as e:
                    print("异常")
                    logging.exception(e)
            elif msg_type == 1:
                # Set the stock code list.
                data_list = data_process.parseGPCode(_str)
                data_process.saveZYLTSZ(data_list)
                code_list = [item["code"] for item in data_list]
                gpcode_manager.set_gp_list(code_list)
                # Ask the other end of the pipe to re-subscribe.
                self.server.pipe.send(json.dumps({"type": "resub"}))
                # Sync target codes to THS in the background.
                t1 = threading.Thread(target=sync_target_codes_to_ths, daemon=True)
                t1.start()
            elif msg_type == 2:
                # Limit-up codes.
                codeList = data_process.parseGPCode(_str)
                gpcode_manager.set_limit_up_list(codeList)
            elif msg_type == 3:
                # Trade success info.
                dataList = data_process.parseList(_str)
                try:
                    trade_manager.process_trade_success_data(dataList)
                except Exception as e:
                    logging.exception(e)
                trade_manager.save_trade_success_data(dataList)
            elif msg_type == 5:
                # Trade delegate info.
                dataList = data_process.parseList(_str)
                # NOTE(review): the original nesting was lost; the de-dup check
                # is assumed to guard only the log line below - confirm.
                if self.last_trade_delegate_data != _str:
                    self.last_trade_delegate_data = _str
                    # Record the delegate info.
                    logger_trade_delegate.info(dataList)
                try:
                    trade_manager.process_trade_delegate_data(dataList)
                except Exception as e:
                    logging.exception(e)
                trade_manager.save_trade_delegate_data(dataList)
            elif msg_type == 4:
                # Industry code info.
                dataList = data_process.parseList(_str)
                data_process.saveIndustryCode(dataList)
            elif msg_type == 6:
                # Available money.
                money_info = data_process.parseData(_str)
                client = money_info["client"]
                money = money_info["money"]
                # TODO: store into a cache file.
                trade_manager.set_available_money(client, money)
            elif msg_type == 20:
                # Login.
                payload = data_process.parse(_str)["data"]
                try:
                    client_id, _authoritys = authority.login(payload["account"], payload["pwd"])
                    return_str = data_process.toJson(
                        {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                except Exception as e:
                    return_str = data_process.toJson({"code": 1, "msg": str(e)})
            elif msg_type == 40:
                # Current price update.
                payload = data_process.parse(_str)["data"]
                if payload is not None:
                    print("现价数量", len(payload))
                    for item in payload:
                        juejin.accpt_price(item["code"], float(item["price"]))
            elif msg_type == 30:
                # Client info report; records the sender's host as active.
                payload = data_process.parse(_str)["data"]
                client_id = payload["client"]
                logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(payload)))
                data_process.saveClientActive(int(client_id), host)

            # sendall: send() may transmit only part of the reply.
            sk.sendall(return_str.encode())

    def _process_l2_data(self, raw, client, channel, code, datas, start_ms):
        """Process one L2 payload whose position already matched; log errors and slow runs.

        *raw* is the undecoded message text (logged on failure); *start_ms* is
        the timestamp taken before parsing, used for the duration log.
        """
        try:
            self._sync_operate_state(client, channel, code)
            if gpcode_manager.is_listen(code):
                l2_data_manager.process_data(code, datas)
        except l2_data_manager.L2DataException as err:
            # Price mismatch.
            if err.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                key = "{}-{}-{}".format(client, channel, code)
                # React at most once per 10 s per key.
                if _interval_elapsed(self.l2_data_error_dict, key, 10000):
                    # TODO (from original): too sensitive - remove code. The
                    # alternative self.l2CodeOperate.repaire_l2_data(code) is left disabled.
                    logger_l2_error.warning("code-{} l2单价错误:{}", code, err.msg)
                    # On a price mismatch the code is removed so it can be re-added.
                    l2_code_operate.L2CodeOperate().remove_l2_listen(code)
                    self.l2_data_error_dict[key] = _now_ms()
        except Exception as e:
            print("异常", str(e))
            logging.exception(e)
            logger_l2_error.error("出错:{}".format(str(e)))
            logger_l2_error.error("内容:{}".format(raw))
        finally:
            elapsed_ms = _now_ms() - start_ms
            # Only record runs slower than 40 ms.
            if elapsed_ms > 40:
                logger_l2_process.info("l2处理时间:{}-{}".format(code, elapsed_ms))

    def _sync_operate_state(self, client, channel, code):
        """Compare the code registered at (client, channel) with *code* and update operate state.

        NOTE(review): nesting reconstructed from a collapsed source; the
        repair call is assumed to sit inside the 2 s throttle - confirm.
        """
        code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
        if code_ != code:
            key = "{}-{}-{}".format(client, channel, code)
            # At most once every 2 s per key.
            if _interval_elapsed(self.reset_code_dict, key, 2000):
                self.l2CodeOperate.set_operate_code_state(client, channel, 0)
                self.reset_code_dict[key] = _now_ms()
                if code_ is None:
                    code_ = ""
                if tool.is_trade_time():
                    self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
        else:
            key = "{}-{}".format(client, channel)
            # At most once every 1 s per key.
            if _interval_elapsed(self.set_operate_code_state_dict, key, 1000):
                self.set_operate_code_state_dict[key] = _now_ms()
                self.l2CodeOperate.set_operate_code_state(client, channel, 1)

    def finish(self):
        """Delegate to the base class."""
        super().finish()


def send_msg(client_id, data):
    """Send *data* as JSON to the client's active IP and return its reply text.

    Raises:
        Exception: if no active IP is recorded for *client_id*.
        OSError: on connection or transfer failure.
    """
    _ip = data_process.getActiveClientIP(client_id)
    print("ip", client_id, _ip)
    if not _ip:
        raise Exception("客户端IP为空")
    # The context manager closes the socket even when connect() fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as socketClient:
        socketClient.connect((_ip, _CLIENT_PORT))
        socketClient.sendall(json.dumps(data).encode())
        recv = socketClient.recv(1024)
        return recv.decode().lstrip()


# Client heartbeat loop.
def test_client_server():
    """Every 5 s, send a "test" action to each L2 client, then correct client codes.

    NOTE(review): correct_client_codes() is assumed to run once per cycle,
    after the per-client loop - nesting was lost in the source; confirm.
    """
    while True:
        clients = authority.get_l2_clients()
        for client in clients:
            print("心跳", client)
            try:
                send_msg(client, {"action": "test"})
            except Exception:
                # Best effort: an unreachable client must not stop the loop.
                logging.debug("heartbeat to client %s failed", client, exc_info=True)
        # Correct the codes configured on clients.
        l2_code_operate.correct_client_codes()
        time.sleep(5)


# Get the state of a collector client.
def get_client_env_state(client):
    """Ask *client* for its environment state and return the decoded "data" field.

    Raises:
        Exception: carrying the client's "msg" when the reply code is non-zero.
    """
    reply = json.loads(send_msg(client, {"action": "getEnvState"}))
    if reply["code"] != 0:
        raise Exception(reply["msg"])
    return json.loads(reply["data"])


# Repair a collector client.
def repair_client_env(client):
    """Ask *client* to repair its environment.

    Raises:
        Exception: carrying the client's "msg" when the reply code is non-zero.
    """
    reply = json.loads(send_msg(client, {"action": "repairEnv"}))
    if reply["code"] != 0:
        raise Exception(reply["msg"])


# Sync target codes to THS.
def sync_target_codes_to_ths():
    """Send the current code list to the first "client-industry" client and return its reply."""
    code_list = list(gpcode_manager.get_gp_list())
    clients = authority._get_client_ids_by_rule("client-industry")
    return send_msg(clients[0], {"action": "syncTargetCodes", "data": code_list})


if __name__ == "__main__":
    try:
        result = get_client_env_state(3)
        print(result)
    except Exception as e:
        print(str(e))