"""Threaded HTTP endpoint that receives uploaded market data.

POST routes hand the decoded JSON payload to the low-suction strategy and
record the time of the latest upload on ``RealTimeEnvInfo``.
"""
import http
import json
import logging
import socketserver
import urllib.parse as urlparse
from http.server import BaseHTTPRequestHandler

from log_module.log import logger_system, logger_debug, logger_request_api
from strategy import strategy_manager
from strategy.env_info import RealTimeEnvInfo
from utils import tool
from log_module import async_log_util

# Raise the threshold of the "http.server" logger so that only CRITICAL
# records get through.
logger = logging.getLogger("http.server")
logger.setLevel(logging.CRITICAL)


class DataServer(BaseHTTPRequestHandler):
    """Request handler for the data-collection HTTP API.

    Every response is sent with status 200 and a JSON content type; success
    or failure is conveyed by the ``code`` field of the JSON body.
    """

    ocr_temp_data = {}

    def do_GET(self):
        """Serve GET requests.

        ``/get_kpl_data`` returns ``{"code": 0, "data": {}}``; any other path
        gets an empty body. The start and end of each request are written to
        the request-API log together with the handling thread id.
        """
        url = urlparse.urlparse(self.path)
        async_log_util.info(logger_request_api, f"开始请求{tool.get_thread_id()}-{url}")
        response_data = ""
        if url.path == "/get_kpl_data":
            response_data = json.dumps({"code": 0, "data": {}})
        async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}")
        self.__send_response(response_data)

    def do_POST(self):
        """Serve POST uploads.

        Routes:
            /upload_big_order_datas: forwards the payload to
                ``add_big_orders`` and stores the upload time.
            /upload_block_in_datas: forwards the payload to ``add_block_in``
                and stores ``(upload time, payload length)``.
            /upload_limit_up_list: forwards the payload to
                ``add_limit_up_list`` and stores
                ``(upload time, payload length)``.

        A successful upload is answered with ``{"code": 0}``. If handling
        raises, the exception is logged and the client receives
        ``{"code": 1, "msg": <error text>}`` so the failure is visible as
        valid JSON instead of an empty body. Unknown paths still get an
        empty body.
        """
        url = urlparse.urlparse(self.path)
        result_str = ""
        try:
            if url.path == "/upload_big_order_datas":
                # Executed big-order data.
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_big_orders(params)
                RealTimeEnvInfo().big_order_update_time = tool.get_now_time_str()
                result_str = json.dumps({"code": 0})
            elif url.path == "/upload_block_in_datas":
                # Block (sector) inflow data.
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_block_in(params)
                RealTimeEnvInfo().block_in = (tool.get_now_time_str(), len(params))
                result_str = json.dumps({"code": 0})
            elif url.path == "/upload_limit_up_list":
                params = self.__parse_request()
                strategy_manager.low_suction_strtegy.add_limit_up_list(params)
                RealTimeEnvInfo().kpl_current_limit_up = (tool.get_now_time_str(), len(params))
                result_str = json.dumps({"code": 0})
        except Exception as e:
            # Boundary of the request: log the failure and report it to the
            # client rather than letting the handler thread die silently.
            logger_debug.exception(e)
            result_str = json.dumps({"code": 1, "msg": str(e)})
        self.__send_response(result_str)

    def __send_response(self, data):
        """Write ``data`` (a str) to the client as a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and return it parsed as JSON.

        The body length is taken from the ``content-length`` header. A
        missing or non-numeric header, or a body that is not valid JSON,
        raises; ``do_POST`` catches and reports that.
        """
        body_length = int(self.headers['content-length'])
        raw_body = self.rfile.read(body_length)
        # NOTE(review): the body is decoded as GBK, presumably to match the
        # uploading clients -- confirm before changing the encoding.
        return json.loads(str(raw_body, encoding="gbk"))


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request in its own thread."""


def run(addr, port):
    """Start the data server on ``(addr, port)`` and serve until stopped.

    Any exception raised while creating or running the server is logged to
    the system log; it is not re-raised.
    """
    try:
        # The context manager closes the listening socket on the way out,
        # whether serve_forever() returns or raises.
        with ThreadedHTTPServer((addr, port), DataServer) as httpd:
            print("HTTP server is at: http://%s:%d/" % (addr, port))
            httpd.serve_forever()
    except Exception as e:
        logger_system.exception(e)
        logger_system.error(f"端口服务器:{port} 启动失败")


if __name__ == "__main__":
    run("", 9004)