import http
import json
import random
import socketserver
from http.server import BaseHTTPRequestHandler

import dask

from code_attribute import gpcode_manager
from log_module import log_analyse, log_export, log, request_log_util
from output import limit_up_data_filter, output_util
from output.limit_up_data_filter import IgnoreCodeManager
from third_data import kpl_util, kpl_data_manager, kpl_api
from third_data.code_plate_key_manager import KPLPlateForbiddenManager
from third_data.kpl_data_manager import KPLLimitUpDataRecordManager, KPLDataManager, KPLCodeLimitUpReasonManager
from third_data.kpl_util import KPLPlatManager, KPLDataType
from trade.l2_trade_util import BlackListCodeManager
from utils import tool, global_util, hosting_api_util
import urllib.parse as urlparse
from urllib.parse import parse_qs


class DataServer(BaseHTTPRequestHandler):
    """HTTP handler that serves KPL limit-up data and accepts uploaded KPL data.

    GET ``/kpl/get_limit_up_list`` is answered locally; every other GET path
    is forwarded to ``hosting_api_util.get_from_data_server``.
    POST ``/upload_kpl_data`` parses and stores uploaded KPL data.
    """

    ocr_temp_data = {}
    __kplDataManager = KPLDataManager()
    __IgnoreCodeManager = IgnoreCodeManager()
    __KPLPlatManager = KPLPlatManager()
    __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager()
    __KPLPlateForbiddenManager = KPLPlateForbiddenManager()
    # Historical plates
    __history_plates_dict = {}
    # Plates
    __blocks_dict = {}
    # Caches for the "jingxuan" (featured) and industry data
    __jingxuan_cache_dict = {}
    __industry_cache_dict = {}

    @staticmethod
    def __limit_up_state(limit_up_dict, code):
        """Return the limit-up state of ``code``.

        0 - no state, 1 - limit-up, 2 - limit-up opened again.
        ``limit_up_dict[code]`` is indexed at [0] and [1] for the two flags.
        """
        if code not in limit_up_dict:
            return 0
        flags = limit_up_dict[code]
        if flags[0]:
            return 1
        if flags[1]:
            return 2
        return 0

    @staticmethod
    def __order_desc(order_reason, kpl_can_buy_reason):
        """Build the short order-feedback text from the two latest feedback entries.

        Each entry is indexed as ``[0]`` = time string and ``[1]`` = message.
        When both exist, the one with the later time wins.
        """

        def time_key(time_str):
            # Strip ":" and "." so the time string compares as an integer.
            return int(time_str.replace(":", "").replace(".", ""))

        if order_reason and kpl_can_buy_reason:
            if time_key(order_reason[0]) > time_key(kpl_can_buy_reason[0]):
                return f"不:{order_reason[1]}"
            return f"买:{kpl_can_buy_reason[1]}"
        if order_reason:
            return f"不:{order_reason[1]}"
        if kpl_can_buy_reason:
            return f"买:{kpl_can_buy_reason[1]}"
        return ''

    def __get_limit_up_list(self):
        """Return the JSON response body for ``/kpl/get_limit_up_list``.

        The payload holds the current limit-up / opened-limit-up counts,
        per-reason statistics and one row per recorded limit-up code.
        """
        # Count the codes that have hit limit-up so far (grouped by reason).
        total_datas = KPLLimitUpDataRecordManager.total_datas
        if not total_datas:
            KPLLimitUpDataRecordManager.load_total_datas()
            total_datas = KPLLimitUpDataRecordManager.total_datas
        # Work on a copy so the shared record list is never reordered here.
        total_datas = list(total_datas or [])

        current_datas_results = hosting_api_util.common_request({"ctype": "get_kpl_limit_up_datas"})
        if isinstance(current_datas_results, str):
            current_datas_results = json.loads(current_datas_results)
        # A response without "data" is treated as "nothing is limit-up now".
        current_datas = current_datas_results.get("data") or []
        # Block name (c[5]) -> set of codes (c[0]) currently limit-up.
        current_block_codes = {}
        for c in current_datas:
            current_block_codes.setdefault(c[5], set()).add(c[0])

        # Group the records by limit-up reason, each group ordered by limit-up time.
        limit_up_reason_dict = {}
        for d in total_datas:
            limit_up_reason_dict.setdefault(d[2], []).append(d)
        for records in limit_up_reason_dict.values():
            records.sort(key=lambda x: int(x[5]))

        # Count the want-buy codes per reason.
        want_codes = gpcode_manager.WantBuyCodesManager().list_code()
        limit_up_reason_want_count_dict = {}
        for d in total_datas:
            limit_up_reason_want_count_dict.setdefault(d[2], 0)
            if d[3] in want_codes:
                limit_up_reason_want_count_dict[d[2]] += 1

        # (block name, limit-up code count, opened-limit-up count, want-buy count, limit-up time)
        limit_up_reason_statistic_info = [
            (reason,
             len(records),
             len(records) - len(current_block_codes.get(reason, ())),
             limit_up_reason_want_count_dict.get(reason),
             records[0][5])
            for reason, records in limit_up_reason_dict.items()]
        # sort() + reverse() rather than reverse=True: keeps the original
        # ordering of rows that tie on the key.
        limit_up_reason_statistic_info.sort(key=lambda row: len(current_block_codes.get(row[0], ())))
        limit_up_reason_statistic_info.reverse()

        codes_set = {d[3] for d in total_datas}
        # Resolve the rank, limit-up state and score of every code.
        rank_dict = limit_up_data_filter.get_limit_up_time_rank_dict(total_datas)
        limit_up_dict, limit_up_codes, open_limit_up_codes = limit_up_data_filter.get_limit_up_info(codes_set)
        score_dict = {}
        ignore_codes = self.__IgnoreCodeManager.list_ignore_codes("1")
        # Latest limit-up first.
        total_datas.sort(key=lambda x: int(x[5]))
        total_datas.reverse()

        # Limit-up reason change records: the last entry per code wins.
        # reversed() avoids mutating the list returned by the loader.
        reason_changes_dict = {}
        for r in reversed(log_export.load_kpl_reason_changes()):
            reason_changes_dict.setdefault(r[0], r[1])

        # Latest order feedback per code.
        order_reasons_dict = log_analyse.get_cant_order_reasons_dict()
        kpl_can_buy_reasons_dict = log_analyse.get_kpl_can_buy_reasons_dict()

        fresult = []
        for d in total_datas:
            code = d[3]
            if code in ignore_codes:
                continue
            limit_up_state = self.__limit_up_state(limit_up_dict, code)
            score = score_dict.get(code, "")

            # Net inflow of the limit-up reason, taken from the cached rank data.
            reason = d[2]
            reason_money = ''
            if reason in self.__jingxuan_cache_dict:
                reason_money = output_util.money_desc(self.__jingxuan_cache_dict[reason][3])
            elif reason in self.__industry_cache_dict:
                reason_money = output_util.money_desc(self.__industry_cache_dict[reason][3])

            order_desc = self.__order_desc(order_reasons_dict.get(code), kpl_can_buy_reasons_dict.get(code))

            # (code, name, limit-up state (0 - none, 1 - limit-up, 2 - opened),
            #  rank, first board, score, limit-up time, reason,
            #  number of codes with the same reason, free float,
            #  whether the reason changed, net inflow of the reason, order summary)
            fresult.append((code, d[4], limit_up_state, f"龙{rank_dict.get(code)}", d[12], score,
                            output_util.time_format(int(d[5])), d[2], d[10],
                            output_util.money_desc(d[13]), reason_changes_dict.get(code),
                            reason_money, order_desc))

        return json.dumps({"code": 0, "data": {"limit_up_count": len(limit_up_codes),
                                               "open_limit_up_count": len(open_limit_up_codes),
                                               "limit_up_reason_statistic": limit_up_reason_statistic_info,
                                               "limit_up_codes": fresult}})

    def __get_plate_info(self, ps_dict):
        """Return the JSON plate information for the code in ``ps_dict["code"]``.

        The code's plates and the codes of its hot plate are requested
        together through dask. When the hot plate is known, a
        ``code_list_info`` entry describing every code of that plate is added.

        Raises:
            IndexError: if there is no limit-up record for the code today.
        """

        @dask.delayed
        def kpl_getStockIDPlate(code_):
            return kpl_api.getStockIDPlate(code_)

        @dask.delayed
        def kpl_getCodesByPlate(plate_code_):
            # Test the argument itself, not the enclosing variable.
            if not plate_code_:
                return None
            return kpl_api.getCodesByPlate(plate_code_)

        @dask.delayed
        def request_data(f1_, f2_):
            return f1_, f2_

        fresult = {}
        code = ps_dict["code"]
        code_info = KPLLimitUpDataRecordManager.list_by_code(code, tool.get_now_date_str())[0]
        hot_block_name = code_info[2]
        plate_code = self.__KPLPlatManager.get_plat(hot_block_name)

        # Run both remote requests in one dask graph.
        dask_result = request_data(kpl_getStockIDPlate(code), kpl_getCodesByPlate(plate_code))
        plate_info, codes_by_plate_info = dask_result.compute()
        if plate_info:
            plate_info.sort(key=lambda x: x[2])
            plate_info.reverse()
        fresult["plate"] = plate_info

        # Historical limit-up data of the code: (reason, date, plate)
        fresult["code_records"] = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2]
        # Today's data
        fresult["today"] = (code_info[2], code_info[1], code_info[6])
        fresult["industry"] = global_util.code_industry_map.get(code)

        if plate_code:
            # Codes under the limit-up reason:
            # (code, name, price, rise, free float, boards, rank,
            #  main net amount, 300w net amount, institution increase)
            temps = kpl_util.parsePlateCodes(codes_by_plate_info)

            # -- data preparation --
            codes_set = {d[0] for d in temps}
            limit_up_dict, limit_up_codes, open_limit_up_codes = limit_up_data_filter.get_limit_up_info(codes_set)
            score_dict = {}
            want_codes = gpcode_manager.WantBuyCodesManager().list_code()
            black_codes = BlackListCodeManager().list_codes()
            # Tolerate records that have not been loaded yet.
            code_info_dict = {val[3]: val for val in (KPLLimitUpDataRecordManager.total_datas or [])}
            ignore_codes = self.__IgnoreCodeManager.list_ignore_codes("2")

            # Result rows: (code, name, limit-up state (0 - none, 1 - limit-up,
            # 2 - opened), rank, first board, score, limit-up time, reason,
            # number of codes with the same reason, free float,
            # whether the reason changed, rise, price, blacklist flag,
            # want-buy flag, main net amount, 300w, ...)
            codes_info_list = []
            # A distinct loop name keeps the requested `code` intact.
            for t in temps:
                member_code = t[0]
                limit_up_state = self.__limit_up_state(limit_up_dict, member_code)
                score = score_dict.get(member_code, "")
                record = code_info_dict.get(member_code) if member_code in code_info_dict else None
                has_record = member_code in code_info_dict
                limit_up_time = output_util.time_format(record[5]) if has_record else ''

                final_code_info = {"code_info": (
                    t[0], t[1], limit_up_state, t[6], t[5], score, limit_up_time,
                    code_info[2], code_info[10], output_util.money_desc(t[4]), 0, t[3], t[2],
                    "黑名单" if member_code in black_codes else "",
                    "想买单" if member_code in want_codes else "",
                    output_util.money_desc(t[7]), output_util.money_desc(t[8]),
                    output_util.money_desc(t[9]))}
                if has_record:
                    final_code_info["today"] = (record[2], record[1], record[6])
                # Cached history
                if member_code in self.__history_plates_dict:
                    final_code_info["code_records"] = self.__history_plates_dict[member_code][1]
                # Cached plates
                if member_code in self.__blocks_dict:
                    final_code_info["plate"] = self.__blocks_dict[member_code][1]
                # Second-level industry
                final_code_info["industry"] = global_util.code_industry_map.get(member_code)
                if member_code not in ignore_codes:
                    codes_info_list.append(final_code_info)
            fresult["code_list_info"] = codes_info_list

        return json.dumps({"code": 0, "data": fresult})

    def do_GET(self):
        """Handle GET: serve the limit-up list locally, forward everything else."""
        url = urlparse.urlparse(self.path)
        request_log_util.request_info("DATA_SERVER_GET", f"GET 请求开始:{url.path}")
        try:
            if url.path == "/kpl/get_limit_up_list":
                self.__send_response(self.__get_limit_up_list())
            else:
                # Keep only the first value of every query parameter.
                ps_dict = {k: v[0] for k, v in parse_qs(url.query).items()}
                result = hosting_api_util.get_from_data_server(url.path, ps_dict)
                self.__send_response(result)
        finally:
            request_log_util.request_info("DATA_SERVER_GET", "GET 请求结束")

    def do_POST(self):
        """Handle POST: accept uploaded KPL data, answer 404 for unknown paths."""
        url = urlparse.urlparse(self.path)
        request_log_util.request_info("DATA_SERVER_POST", f"POST 请求开始:{url.path}")
        try:
            if url.path == "/upload_kpl_data":
                # Receive KPL data
                params = self.__parse_request()
                result_str = self.__process_kpl_data(params)
                self.__send_response(result_str)
            else:
                # Previously no response was written at all for other paths,
                # which left the client with an empty reply.
                self.send_error(404)
        finally:
            request_log_util.request_info("DATA_SERVER_POST", "POST 请求结束")

    def __process_kpl_data(self, data):
        """Parse uploaded KPL data by its ``type`` and store the result.

        Returns:
            The JSON string ``{"code": 0}``.
        """
        # ensure_ascii=False is required: with the default, non-ASCII text is
        # escaped to \uXXXX and the replace() below never matches anything.
        data = json.loads(json.dumps(data, ensure_ascii=False).replace("概念", ""))
        type_ = data["type"]
        print("开盘啦type:", type_)

        def save_if_any(result_list):
            # Empty parse results are not stored.
            if result_list:
                self.__kplDataManager.save_data(type_, result_list)

        if type_ == KPLDataType.BIDDING.value:
            pass
        elif type_ == KPLDataType.LIMIT_UP.value:
            result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_UP)
            if result_list:
                # Store the limit-up data and also record it for today.
                self.__kplDataManager.save_data(type_, result_list)
                kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list)
        elif type_ == KPLDataType.OPEN_LIMIT_UP.value:
            save_if_any(kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_OPEN_LIMIT_UP))
        elif type_ == KPLDataType.LIMIT_DOWN.value:
            save_if_any(kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_DOWN))
        elif type_ == KPLDataType.EVER_LIMIT_DOWN.value:
            save_if_any(kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_EVER_LIMIT_DOWN))
        elif type_ == KPLDataType.FENG_KOU.value:
            # Stored unconditionally, ordered by field [3] descending.
            result_list = kpl_util.parseFengKou(data["data"])
            result_list.sort(key=lambda x: x[3])
            result_list.reverse()
            self.__kplDataManager.save_data(type_, result_list)
        elif type_ == KPLDataType.BEST_FENG_KOU.value:
            # Strongest "feng kou" data
            save_if_any(kpl_util.parseBestFengKou(data["data"]))
        elif type_ == KPLDataType.FENG_XIANG.value:
            # "Feng xiang" data
            save_if_any(kpl_util.parseFengXiang(data["data"]))
        elif type_ == KPLDataType.INDUSTRY_RANK.value:
            # Industry rank data
            save_if_any(kpl_util.parseIndustryRank(data["data"]))
        elif type_ == KPLDataType.JINGXUAN_RANK.value:
            # "Jingxuan" (featured) rank data
            save_if_any(kpl_util.parseMarketJingXuan(data["data"]))
        return json.dumps({"code": 0})

    def __send_response(self, data):
        """Send ``data`` (a str) to the client as a 200 JSON response."""
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        self.wfile.write(data.encode())

    def __parse_request(self):
        """Read the request body and return it parsed as JSON.

        The body length comes from the ``content-length`` header and the
        bytes are decoded as GBK.
        """
        datas = self.rfile.read(int(self.headers['content-length']))
        _str = str(datas, encoding="gbk")
        return json.loads(_str)


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that handles each request in its own thread."""
    pass


def run(addr, port):
    """Start the KPL pull task and serve ``DataServer`` on ``addr:port`` forever."""
    kpl_data_manager.run_pull_task()

    handler = DataServer
    httpd = ThreadedHTTPServer((addr, port), handler)
    print("DataServer is at: http://%s:%d/" % (addr, port))
    httpd.serve_forever()


if __name__ == "__main__":
    run("0.0.0.0", 9004)