import builtins
import hashlib
import json
import logging
import queue
import random
import socket
import socketserver
import threading
import time

import constant
import log
import socket_manager
from db import mysql_data
from db.redis_manager import RedisUtils, RedisManager
from log import logger_debug, logger_request_debug
from output import push_msg_manager
from utils import socket_util, kpl_api_util, hosting_api_util, kp_client_msg_manager, global_data_cache_util, tool
from utils.juejin_util import JueJinHttpApi

trade_data_request_queue = queue.Queue()


class MyTCPServer(socketserver.TCPServer):
    """TCP server that binds and activates as soon as it is constructed."""

    def __init__(self, server_address, RequestHandlerClass):
        socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=True)


# The threaded variant is built by mixing ThreadingMixIn into MyTCPServer.
class MyThreadingTCPServer(socketserver.ThreadingMixIn, MyTCPServer):
    """MyTCPServer that handles each request in its own thread."""
    pass


class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    """Request handler that receives JSON messages and stores `l1_data` payloads."""

    # Guards the one-time class-level initialisation done in __init().
    __inited = False

    def setup(self):
        """Run the one-time class initialisation before handling a request."""
        self.__init()

    @classmethod
    def __init(cls):
        """Create the class-level socket dict once.

        Returns True when initialisation had already happened, otherwise None.
        """
        if cls.__inited:
            return True
        cls.__inited = True
        cls.__req_socket_dict = {}

    def __is_sign_right(self, data_json):
        """Verify the MD5 signature carried in ``data_json["sign"]``.

        The "sign" entry is removed from ``data_json`` (the caller's dict is
        mutated). The remaining ``key=value`` pairs are sorted, joined with
        "&", suffixed with a fixed salt and hashed with MD5.

        Raises:
            KeyError: if ``data_json`` has no "sign" entry.
            Exception: if the computed digest differs from the supplied sign.
        """
        sign = data_json.pop("sign")
        pairs = sorted(f"{key}={value}" for key, value in data_json.items())
        payload = "&".join(pairs) + "JiaBei@!*."
        digest = hashlib.md5(payload.encode(encoding='utf-8')).hexdigest()
        if digest != sign:
            raise Exception("签名出错")

    @staticmethod
    def _recv_exact(skk, size):
        """Read up to ``size`` bytes from ``skk``, stopping early only if the peer closes.

        Returns the bytes read; the result is shorter than ``size`` only when
        ``recv`` returned an empty chunk (connection closed).
        """
        chunks = []
        remaining = size
        while remaining > 0:
            # Never ask for more than what is left so the next message is not consumed.
            chunk = skk.recv(min(remaining, 10240))
            if not chunk:
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    @classmethod
    def getRecvData(cls, skk):
        """Receive one message from ``skk``.

        Two wire formats are accepted:

        * framed: a 10-byte header ``"##" + 8 length characters`` followed by
          exactly that many body bytes;
        * unframed: anything else; the first bytes plus one further ``recv``
          of up to 1 MiB are returned together.

        Args:
            skk: a connected socket-like object exposing ``recv``.

        Returns:
            ``(data, header)`` where ``data`` is the decoded text ("" when the
            peer closed the connection, including in the middle of a framed
            message) and ``header`` is the raw header bytes that were read.

        Raises:
            ValueError: if a framed header does not carry a valid integer length.
            UnicodeDecodeError: if the received bytes are not valid UTF-8.
        """
        header_size = 10
        first = skk.recv(header_size)
        if not first:
            # Peer closed the connection before sending anything.
            return "", first

        if not first.startswith(b"##"):
            # Unframed message: join the raw bytes before decoding so that a
            # multi-byte character split between the two reads stays intact.
            rest = skk.recv(1024 * 1024)
            return (first + rest).decode('utf-8'), first

        header = first
        if len(header) < header_size:
            # recv() may legally return fewer bytes than requested.
            header += cls._recv_exact(skk, header_size - len(header))
            if len(header) < header_size:
                logging.warning("connection closed inside message header (%d/%d bytes)",
                                len(header), header_size)
                return "", header

        content_length = int(header[2:header_size].decode('utf-8'))
        body = cls._recv_exact(skk, content_length)
        if len(body) < content_length:
            # Previously this case spun forever on empty recv() results.
            logging.warning("connection closed inside message body (%d/%d bytes)",
                            len(body), content_length)
            return "", header
        # Decode once, after the whole body has arrived.
        return body.decode('utf-8'), header

    def handle(self):
        """Read messages from the client until it disconnects or an `l1_data` message is stored.

        Invalid JSON is answered with an error packet (code 100) and the loop
        continues. Any other exception is logged and ends the handler.
        """
        super().handle()
        sk: socket.socket = self.request
        while True:
            try:
                data, header = self.getRecvData(sk)
                if not data:
                    # Connection closed by the peer.
                    break
                try:
                    data_json = json.loads(data)
                except json.decoder.JSONDecodeError:
                    # JSON parsing failed: tell the client and wait for the next message.
                    sk.sendall(socket_util.load_header(json.dumps(
                        {"code": 100, "msg": "JSON解析失败"}).encode(
                        encoding='utf-8')))
                    continue
                type_ = data_json["type"]
                start_time = time.time()
                try:
                    if type_ == 'l1_data':
                        L1DataManager.add_datas(data_json["data"])
                        # NOTE(review): indentation was lost in the source this was
                        # restored from; `break` is assumed to belong to the
                        # l1_data branch - confirm against the original file.
                        break
                except Exception as e:
                    log.logger_tuoguan_request_debug.exception(e)
                finally:
                    # Report slow requests (more than 2 seconds).
                    elapsed = time.time() - start_time
                    if elapsed > 2:
                        log.logger_tuoguan_request_debug.info(
                            f"耗时:{int(elapsed)}s  数据:{data_json}")
            except Exception as e:
                logging.exception(e)
                break

    def finish(self):
        super().finish()


# L1 data management
class L1DataManager:
    """In-memory store of the latest L1 record per code."""

    # Maps data[0] (the code) to the most recently received record.
    __l1_datas_dict = {}

    @classmethod
    def add_datas(cls, datas):
        """Store each record in ``datas`` under its first element, replacing older ones.

        Record layout (per the original author's note): (code, previous close,
        latest price, total volume, total turnover, update time).
        """
        for data in datas:
            cls.__l1_datas_dict[data[0]] = data

    @classmethod
    def get_current_l1_data(cls):
        """Return a list with the latest stored record of every code."""
        return list(cls.__l1_datas_dict.values())


def run(port):
    """Start the threaded L1 data server on all interfaces at ``port`` and serve until stopped."""
    print("create MiddleL1DataServer")
    laddr = "0.0.0.0", port
    print("MiddleServer is at: http://%s:%d/" % (laddr))
    # Note: the handler argument is MyBaseRequestHandle.
    # The context manager closes the listening socket when serve_forever() exits.
    with MyThreadingTCPServer(laddr, MyBaseRequestHandle) as tcpserver:
        tcpserver.serve_forever()


if __name__ == "__main__":
    print(builtins.type("") == str)