import base64
import json
import logging
import socketserver
import time
import urllib.parse as urlparse
from http.server import BaseHTTPRequestHandler

import cv2

import tool
from l2 import code_price_manager
from third_data import kpl_util, kpl_data_manager
from third_data.kpl_data_manager import KPLDataManager
from third_data.kpl_util import KPLDataType
from trade import bidding_money_manager


class DataServer(BaseHTTPRequestHandler):
    """HTTP request handler that accepts KPL (Kaipanla) data uploads.

    ``POST /upload_kpl_data`` carries a JSON object with a ``type`` field
    (compared against ``KPLDataType`` values) and a ``data`` field that is
    handed to the matching ``kpl_util`` parser; parsed results are stored
    through the shared ``KPLDataManager``.
    """

    # Not read or written anywhere in this file; kept for external users.
    ocr_temp_data = {}
    # Single manager instance shared by every request handler.
    __kplDataManager = KPLDataManager()

    def do_GET(self):
        """Answer every GET with ``200`` and an empty body."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write("".encode())

    def do_POST(self):
        """Dispatch POST requests; only ``/upload_kpl_data`` is supported.

        Unknown paths get a ``404`` and an unreadable body gets a ``400``
        so that the client always receives an HTTP reply.
        """
        url = urlparse.urlparse(self.path)
        if url.path != "/upload_kpl_data":
            self.send_error(404, "Unknown path")
            return
        try:
            params = self.__parse_request()
        except (TypeError, ValueError):
            # Missing/invalid Content-Length, undecodable bytes, or bad JSON
            # (UnicodeDecodeError and JSONDecodeError are both ValueErrors).
            self.send_error(400, "Invalid request body")
            return
        result_str = self.__process_kpl_data(params)
        self.__send_response(result_str)

    def __process_kpl_data(self, data):
        """Parse and persist one uploaded KPL payload.

        Args:
            data: Decoded request body; ``data["type"]`` selects the branch
                and ``data["data"]`` is passed to the ``kpl_util`` parser.

        Returns:
            The JSON string ``{"code": 0}``, regardless of which branch ran
            (including when ``type`` matches none of them).

        Raises:
            KeyError: If ``type`` or (for a known type) ``data`` is missing.
        """
        type_ = data["type"]
        print("开盘啦type:",type_)
        if type_ == KPLDataType.BIDDING.value:
            # NOTE(review): bidding data is parsed with DABAN_TYPE_LIMIT_UP
            # and d[2] is used as an amount here, while the LIMIT_UP branch
            # treats d[2] as epoch seconds - confirm this is intended.
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_UP)
            if result_list:
                # Keep the 20 entries with the largest d[2]. sort() followed
                # by reverse() is kept (instead of sort(reverse=True)) so
                # that the order of tied entries is unchanged.
                result_list.sort(key=lambda x: x[2])
                result_list.reverse()
                result_list = result_list[:20]
                bs = [(d[0], f"{d[2] // 10000}万") for d in result_list]
                # Only the first 10 are published as bidding money.
                bidding_money_manager.set_bidding_money(bs[:10])
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.LIMIT_UP.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_UP)
            if result_list:
                # Record the limit-up time for codes starting with 00 or 60.
                for d in result_list:
                    code = d[0]
                    if code.startswith(("00", "60")):
                        limit_up_time = time.strftime("%H:%M:%S", time.localtime(d[2]))
                        code_price_manager.Buy1PriceManager.set_limit_up_time(code, limit_up_time)
                self.__kplDataManager.save_data(type_, result_list)
                kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list)
        elif type_ == KPLDataType.OPEN_LIMIT_UP.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_OPEN_LIMIT_UP)
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.LIMIT_DOWN.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_DOWN)
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.EVER_LIMIT_DOWN.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_EVER_LIMIT_DOWN)
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.FENG_KOU.value:
            result_list = kpl_util.parseFengKou(data["data"])
            # Guard against a None parser result (the other branches already
            # check their result). An empty list is still saved, as before.
            if result_list is not None:
                # Descending by x[3]; sort()+reverse() keeps tie order.
                result_list.sort(key=lambda x: x[3])
                result_list.reverse()
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.BEST_FENG_KOU.value:
            result_list = kpl_util.parseBestFengKou(data["data"])
            # Save the strongest "feng kou" data.
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.FENG_XIANG.value:
            result_list = kpl_util.parseFengXiang(data["data"])
            # Save the "feng xiang" data.
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.INDUSTRY_RANK.value:
            result_list = kpl_util.parseIndustryRank(data["data"])
            # Save the industry rank data.
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)
        return json.dumps({"code": 0})

    def __send_response(self, data):
        """Send ``data`` (a str) to the client as a ``200`` JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and decode it as JSON.

        Returns:
            The object produced by ``json.loads`` on the body.

        Raises:
            ValueError: If Content-Length is missing or not an integer, the
                body is not valid GBK, or the text is not valid JSON.
        """
        length_header = self.headers['content-length']
        if length_header is None:
            raise ValueError("missing Content-Length header")
        datas = self.rfile.read(int(length_header))
        # NOTE(review): the body is decoded as GBK rather than UTF-8;
        # presumably this matches the uploading client - confirm.
        _str = str(datas, encoding="gbk")
        return json.loads(_str)


def run(addr, port):
    """Serve ``DataServer`` on ``(addr, port)`` until interrupted.

    The server is used as a context manager so the listening socket is
    closed when ``serve_forever`` exits through an exception.
    """
    with socketserver.TCPServer((addr, port), DataServer) as httpd:
        print("HTTP server is at: http://%s:%d/" % (addr, port))
        httpd.serve_forever()


if __name__ == "__main__":
    run("0.0.0.0", 9004)