import builtins
import copy
import hashlib
import json
import logging
import queue
import random
import socket
import socketserver
import threading
import time

import constant
from log_module import log
from trade import huaxin_trade_api
from utils import socket_util

trade_data_request_queue = queue.Queue()

# Cache of per-database MySQL configs, keyed by database name.
__mysql_config_dict = {}


def get_mysql_config(db_name):
    """
    Return the MySQL configuration for the given database.

    The configuration is a deep copy of ``constant.MYSQL_CONFIG`` with its
    ``"database"`` entry set to ``db_name``. The result is cached per database
    name, so repeated calls return the same dict object.

    :param db_name: name of the database to put into the configuration
    :return: the configuration dict for ``db_name``
    """
    cached = __mysql_config_dict.get(db_name)
    if db_name in __mysql_config_dict:
        return cached
    config = copy.deepcopy(constant.MYSQL_CONFIG)
    config["database"] = db_name
    __mysql_config_dict[db_name] = config
    return config


class MyTCPServer(socketserver.TCPServer):
    """TCP server that binds and activates immediately on construction."""

    def __init__(self, server_address, RequestHandlerClass):
        socketserver.TCPServer.__init__(
            self, server_address, RequestHandlerClass, bind_and_activate=True
        )


# For asynchronous (threaded) handling, ThreadingTCPServer has to be
# overridden as well.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """Variant of MyTCPServer that mixes in socketserver.ThreadingMixIn."""
    pass


class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that reads JSON requests and answers trade requests."""

    __inited = False

    def setup(self):
        """Run the one-time class initialisation before handling a request."""
        self.__init()

    @classmethod
    def __init(cls):
        """
        Initialise class-level state once.

        :return: True when the class was already initialised, otherwise None
        """
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """
        Verify the ``sign`` field of a request.

        The expected signature is the MD5 hex digest of all other ``key=value``
        pairs, sorted and joined with ``&``, followed by a fixed suffix.
        The passed dict is not modified.

        :param data_json: request dict containing a ``"sign"`` entry
        :return: True when the signature matches
        :raises KeyError: if ``data_json`` has no ``"sign"`` entry
        :raises Exception: if the signature does not match
        """
        sign = data_json["sign"]
        # Build the pairs without popping "sign", so the caller's dict is left
        # untouched.
        pairs = sorted(f"{k}={v}" for k, v in data_json.items() if k != "sign")
        # NOTE(review): the signing suffix is hard-coded and MD5 is a weak
        # hash; consider moving the secret to configuration.
        payload = "&".join(pairs) + "JiaBei@!*."
        digest = hashlib.md5(payload.encode(encoding='utf-8')).hexdigest()
        if digest != sign:
            raise Exception("签名出错")
        return True

    @classmethod
    def getRecvData(cls, skk):
        """
        Read one message from the socket.

        A message that starts with ``##`` carries a 10-byte header: ``##``
        followed by 8 characters holding the body length in bytes. Exactly that
        many bytes are then read and decoded as UTF-8. Any other message is
        read with a single additional ``recv`` of up to 1 MiB.

        :param skk: connected socket to read from
        :return: tuple ``(data, header)`` where ``data`` is the decoded text
                 (empty string when the peer closed the connection) and
                 ``header`` is the raw header bytes that were read
        :raises ConnectionError: if the peer closes the connection in the
                 middle of a framed message
        :raises ValueError: if the length field of the header is not a number
        """
        header_size = 10
        header = skk.recv(header_size)
        if not header:
            # Peer closed the connection before sending anything.
            return "", header

        if not header.startswith(b"##"):
            # Unframed message: take whatever the next read delivers.
            rest = skk.recv(1024 * 1024)
            return (header + rest).decode('utf-8'), header

        # TCP may deliver fewer bytes than requested; complete the header
        # before parsing the length field.
        while len(header) < header_size:
            more = skk.recv(header_size - len(header))
            if not more:
                raise ConnectionError("connection closed while reading header")
            header += more

        content_length = int(header[2:header_size].decode('utf-8'))
        chunks = []
        remaining = content_length
        while remaining > 0:
            # Never read past the end of this message, so the next message's
            # bytes stay in the socket buffer.
            chunk = skk.recv(min(remaining, 10240))
            if not chunk:
                # An empty read means the peer closed; without this check the
                # loop would spin forever.
                raise ConnectionError("connection closed while reading body")
            chunks.append(chunk)
            remaining -= len(chunk)
        # Decode once: a multi-byte character may be split across chunks.
        return b"".join(chunks).decode('utf-8'), header

    def handle(self):
        """
        Serve requests on the connection until the peer disconnects.

        Each message is parsed as JSON. A parse failure is answered with
        ``{"code": 100, ...}``. A message whose ``"type"`` is
        ``"simulation_trade"`` is forwarded to ``huaxin_trade_api.request``
        and the result is sent back with ``"code": 0``. Any exception outside
        the request processing is logged and ends the connection.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data_str, header = self.getRecvData(sk)
                if not data_str:
                    # Empty data: the peer disconnected.
                    break

                try:
                    data_json = json.loads(data_str)
                except json.decoder.JSONDecodeError:
                    # JSON parsing failed: report it and wait for the next
                    # message.
                    error_body = json.dumps({"code": 100, "msg": "JSON解析失败"})
                    sk.sendall(socket_util.load_header(
                        error_body.encode(encoding='utf-8')))
                    continue

                # Looked up outside the inner try on purpose: a request
                # without "type" ends the connection via the outer handler.
                request_type = data_json["type"]
                started_at = time.time()
                try:
                    if request_type == "simulation_trade":
                        datas = data_json["data"]
                        ctype = datas["ctype"]
                        payload = datas["data"]
                        result = huaxin_trade_api.request(ctype, payload)
                        result_str = json.dumps(
                            {"code": 0, "data": result}, default=str)
                        sk.sendall(socket_util.load_header(
                            result_str.encode(encoding='utf-8')))
                except Exception as e:
                    # NOTE(review): the client gets no reply when the request
                    # fails here; confirm whether an error response is wanted.
                    log.logger_tuoguan_request_debug.exception(e)
                finally:
                    # Record requests that took longer than two seconds.
                    if time.time() - started_at > 2:
                        log.logger_tuoguan_request_debug.info(
                            f"耗时:{int(time.time() - started_at)}s 数据:{data_json}")
            except Exception as e:
                logging.exception(e)
                break

    def finish(self):
        """Delegate to the base class clean-up."""
        super().finish()


def __recv_pipe_l1(pipe_trade, pipe_l1):
    """
    Receive JSON messages from the L1 pipe and print them.

    Does nothing unless both pipes are given. ``pipe_trade`` is only checked
    for ``None``; it is not read from or written to here. The loop ends when
    the pipe is closed.

    :param pipe_trade: trade pipe (only tested against None)
    :param pipe_l1: pipe object whose ``recv()`` yields JSON strings
    """
    if pipe_trade is None or pipe_l1 is None:
        return
    while True:
        try:
            val = pipe_l1.recv()
        except (EOFError, OSError):
            # The other end closed the pipe; retrying would spin forever.
            break
        except Exception as e:
            logging.exception(e)
            continue
        if not val:
            continue
        try:
            val = json.loads(val)
        except Exception as e:
            # A malformed message must not stop the receive loop.
            logging.exception(e)
            continue
        print("收到来自L1的数据:", val)
        # Process the data here.


def run(port=constant.MIDDLE_SERVER_PORT):
    """
    Start the middle server on all interfaces and serve until stopped.

    :param port: TCP port to listen on (defaults to
                 ``constant.MIDDLE_SERVER_PORT``)
    """
    print("create MiddleServer")
    laddr = "0.0.0.0", port
    print("MiddleServer is at: http://%s:%d/" % laddr)
    # Note: the handler argument is MyBaseRequestHandle. The context manager
    # closes the listening socket when serve_forever() exits.
    with MyThreadingTCPServer(laddr, MyBaseRequestHandle) as tcpserver:
        tcpserver.serve_forever()


if __name__ == "__main__":
    pass