import http import json import logging import socketserver import time import urllib from http.server import BaseHTTPRequestHandler import dask import requests import constant import inited_data from code_attribute.gpcode_manager import BlackListCodeManager, HumanRemoveForbiddenManager from l2.huaxin import huaxin_target_codes_manager from l2.l2_transaction_data_manager import HuaXinBuyOrderManager from log_module.log import logger_system, logger_debug, logger_kpl_limit_up, logger_request_api, \ logger_kpl_market_strong, logger_kpl_new_blocks from third_data.custom_block_in_money_manager import CodeInMoneyManager from third_data.kpl_data_constant import LimitUpCodesBlockRecordManager, LimitUpDataConstant, \ ContainsLimitupCodesBlocksManager from third_data.kpl_limit_up_data_manager import LatestLimitUpBlockManager, CodeLimitUpSequenceManager from third_data.third_blocks_manager import BlockMapManager from trade.buy_radical import radical_buy_data_manager, new_block_processor from trade.buy_radical.block_special_codes_manager import BlockSpecialCodesManager from trade.buy_radical.new_block_processor import BeforeBlocksComputer from trade.buy_strategy import OpenLimitUpGoodBlocksBuyStrategy from trade.buy_radical.radical_buy_data_manager import RadicalBuyBlockManager, BeforeSubDealBigOrderManager from utils import global_util, tool, data_export_util, init_data_util from code_attribute import gpcode_manager, code_nature_analyse from log_module import log_analyse, log_export, async_log_util from l2 import code_price_manager, l2_data_util, transaction_progress from cancel_strategy.s_l_h_cancel_strategy import HourCancelBigNumComputer, LCancelRateManager from output.limit_up_data_filter import IgnoreCodeManager from third_data import kpl_util, kpl_data_manager, kpl_api, block_info from third_data.code_plate_key_manager import RealTimeKplMarketData, KPLPlateForbiddenManager from third_data.history_k_data_util import HistoryKDatasUtils from third_data.kpl_data_manager import KPLDataManager, KPLLimitUpDataRecordManager, \ KPLCodeLimitUpReasonManager from third_data.kpl_util import KPLDataType, KPLPlatManager import urllib.parse as urlparse from urllib.parse import parse_qs from output import code_info_output, limit_up_data_filter, output_util, kp_client_msg_manager from trade import bidding_money_manager, trade_manager, l2_trade_util, trade_record_log_util, trade_constant, \ trade_data_manager, current_price_process_manager import concurrent.futures # 禁用http.server的日志输出 logger = logging.getLogger("http.server") logger.setLevel(logging.CRITICAL) class DataServer(BaseHTTPRequestHandler): ocr_temp_data = {} __kplDataManager = KPLDataManager() __IgnoreCodeManager = IgnoreCodeManager() __KPLPlatManager = KPLPlatManager() __KPLCodeLimitUpReasonManager = KPLCodeLimitUpReasonManager() # 历史板块 __history_plates_dict = {} # 板块 __blocks_dict = {} # 精选,行业数据缓存 __jingxuan_cache_dict = {} __industry_cache_dict = {} __latest_limit_up_codes_set = set() __data_process_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) # 新题材请求 __new_blocks_codes_request_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=5) # 代码的涨幅 __code_limit_rate_dict = {} # 禁用日志输出 def log_message(self, format, *args): pass def __get_limit_up_statistic_infos(self): # 统计目前为止的代码涨停数量(分涨停原因) currents = LimitUpDataConstant.current_limit_up_datas records = LimitUpDataConstant.history_limit_up_datas if not currents: currents = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP) if currents is None: currents = [] # 获取历史涨停 if not records: KPLLimitUpDataRecordManager.load_total_datas() records = KPLLimitUpDataRecordManager.total_datas records_map = {x[3]: x for x in records} current_codes = [d[0] for d in currents] record_codes = [d[3] for d in records] # 计算涨停时间排序 record_reason_dict = {} current_reason_dict = {} for _code in record_codes: blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(_code) if not blocks: blocks = set() for b in blocks: if b not in record_reason_dict: record_reason_dict[b] = [] record_reason_dict[b].append(_code) for _code in current_codes: blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(_code) if not blocks: blocks = set() for b in blocks: if b not in current_reason_dict: current_reason_dict[b] = [] current_reason_dict[b].append(_code) # (板块名称,涨停代码数量,炸板数量,涨停时间, 是否有辨识度的票) limit_up_reason_statistic_info = [[k, len(record_reason_dict[k]), len(record_reason_dict[k]) - len( current_reason_dict.get(k) if k in current_reason_dict else []), 0, 0] for k in record_reason_dict] try: for b in limit_up_reason_statistic_info: codes_ = BlockSpecialCodesManager().get_block_codes(b[0]) if not codes_: codes_ = set() b[4] = len(set(record_reason_dict[b[0]]) & set(codes_)) except: pass limit_up_reason_statistic_info.sort(key=lambda x: x[1] - x[2]) limit_up_reason_statistic_info.reverse() response_data = json.dumps({"code": 0, "data": {"limit_up_count": len(current_codes), "open_limit_up_count": len(record_codes) - len(current_codes), "limit_up_reason_statistic": limit_up_reason_statistic_info}}) return response_data def __get_plate_info(self, ps_dict): @dask.delayed def kpl_getStockIDPlate(code_): temp_data = kpl_api.getStockIDPlate(code_) return temp_data @dask.delayed def kpl_getSonPlate(plate_code_): if not plate_code: return None temp_data = kpl_api.getSonPlate(plate_code_) return temp_data @dask.delayed def kpl_getCodesByPlate(plate_code_): if not plate_code: return None temp_data = kpl_api.getCodesByPlate(plate_code_) return temp_data @dask.delayed def request_data(f1_, f2_): temp_data = f1_, f2_ return temp_data # 获取板块的代码 fresult = {} code = ps_dict["code"] code_info = KPLLimitUpDataRecordManager.list_by_code(code, tool.get_now_date_str())[0] hot_block_name = code_info[2] plate_code = self.__KPLPlatManager.get_plat(hot_block_name) f1 = kpl_getStockIDPlate(code) # f2 = kpl_getSonPlate(plate_code) f3 = kpl_getCodesByPlate(plate_code) dask_result = request_data(f1, f3) plate_info, codes_by_plate_info = dask_result.compute() if plate_info: plate_info.sort(key=lambda x: x[2]) plate_info.reverse() fresult["plate"] = plate_info # 获取代码的历史涨停数据,(涨停原因,日期,板块) fresult["code_records"] = KPLLimitUpDataRecordManager.get_latest_infos(code, 4, False)[:2] # 获取今日数据 fresult["today"] = (code_info[2], code_info[1], code_info[6]) fresult["industry"] = global_util.code_industry_map.get(code) if plate_code: # 获取强度 # datas = son_plate_info # # (代码,名称,强度) # temp = kpl_util.parseSonPlat(datas) # temp.sort(key=lambda x: x[2]) # temp.reverse() # fresult["plat_strength"] = temp # 获取涨停原因下面的列表 datas = codes_by_plate_info # (代码,名称,现价,涨幅,自由流通,几板,龙几,主力净额,300w净额,机构增仓) temps = kpl_util.parsePlateCodes(datas) # --数据准备开始-- codes_set = set([d[0] for d in temps]) limit_up_dict, limit_up_codes, open_limit_up_codes = limit_up_data_filter.get_limit_up_info(codes_set) score_dict = {} want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache() black_codes = BlackListCodeManager().list_codes() total_datas = KPLLimitUpDataRecordManager.total_datas code_info_dict = {} for val in total_datas: code_info_dict[val[3]] = val # --数据准备结束-- ignore_codes = self.__IgnoreCodeManager.list_ignore_codes("2") # 最终结果:(代码,名称,涨停状态(0-无状态 1-涨停 2-炸板),龙几,首板,分值,涨停时间,原因,相同原因代码数量,自由流通,涨停原因是否变化,涨幅,现价,黑名单,想买单,主力净值,300w,) codes_info_list = [] for t in temps: code = t[0] limit_up_state = 0 if code in limit_up_dict: if limit_up_dict[code][0]: limit_up_state = 1 elif limit_up_dict[code][1]: limit_up_state = 2 score = "" if code in score_dict: score = score_dict[code] limit_up_time = '' if code in code_info_dict: limit_up_time = output_util.time_format(code_info_dict[code][5]) final_code_info = {"code_info": ( t[0], t[1], limit_up_state, t[6], t[5], score, limit_up_time, code_info[2], code_info[10], output_util.money_desc(t[4]), 0, t[3], t[2], "黑名单" if code in black_codes else "", "想买单" if code in want_codes else "", output_util.money_desc(t[7]), output_util.money_desc(t[8]), output_util.money_desc(t[9]))} if code in code_info_dict: final_code_info["today"] = ( code_info_dict[code][2], code_info_dict[code][1], code_info_dict[code][6]) # 加载历史 if code in self.__history_plates_dict: final_code_info["code_records"] = self.__history_plates_dict[code][1] # 加载板块 if code in self.__blocks_dict: final_code_info["plate"] = self.__blocks_dict[code][1] # 获取二级行业 final_code_info["industry"] = global_util.code_industry_map.get(code) if code not in ignore_codes: codes_info_list.append(final_code_info) fresult["code_list_info"] = codes_info_list response_data = json.dumps({"code": 0, "data": fresult}) return response_data def do_GET(self): path = self.path url = urlparse.urlparse(path) async_log_util.info(logger_request_api, f"开始请求{tool.get_thread_id()}-{url}") response_data = "" if url.path == "/get_kpl_data": best_feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.BEST_FENG_KOU) if not best_feng_kou: best_feng_kou = [] best_feng_kou = best_feng_kou[:22] feng_kou = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_KOU) if not feng_kou: feng_kou = [] feng_kou = feng_kou[:22] industry_rank = self.__kplDataManager.get_data(kpl_util.KPLDataType.INDUSTRY_RANK) if not industry_rank: industry_rank = [] industry_rank = industry_rank[:22] feng_xiang = self.__kplDataManager.get_data(kpl_util.KPLDataType.FENG_XIANG) if not feng_xiang: feng_xiang = [] feng_xiang = feng_xiang[:22] response_data = json.dumps({"code": 0, "data": {"best_feng_kou": best_feng_kou, "feng_kou": feng_kou, "industry_rank": industry_rank, "feng_xiang": feng_xiang}}) elif url.path == "/get_score_info": start_time = time.time() ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] name = ps_dict.get('name') date = ps_dict.get('date') try: data = code_info_output.get_output_params(code, self.__jingxuan_cache_dict, self.__industry_cache_dict) if data["code_name"].find("None") > -1 and name: data["code_name"] = f"{name} {code}" self.__history_plates_dict[code] = (time.time(), data["kpl_code_info"]["code_records"]) if "plate" in data["kpl_code_info"]: self.__blocks_dict[code] = (time.time(), data["kpl_code_info"]["plate"]) response_data = json.dumps({"code": 0, "data": data}) print("get_score_info 耗时:", time.time() - start_time) except Exception as e: logger_debug.exception(e) logging.exception(e) elif url.path == "/get_trade_records": # 获取挂撤信息 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] date = ps_dict.get('date') local_today_datas = log_export.load_l2_from_log(date) total_datas = local_today_datas.get(code) trade_info = code_info_output.load_trade_record(code, total_datas, date) response_data = json.dumps({"code": 0, "data": {"open_limit_up": trade_info[0], "records": trade_info[2]}}) elif url.path == "/get_l2_cant_buy_reasons": # 获取L2没买的原因 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] fdatas = log_export.get_l2_cant_buy_reasons(code) response_data = json.dumps({"code": 0, "data": fdatas}) elif url.path == "/get_kpl_block_info": start_time = time.time() ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] try: data = code_info_output.get_kpl_block_info(code) response_data = json.dumps({"code": 0, "data": data}) print("get_kpl_block_info 耗时:", time.time() - start_time) except Exception as e: logger_debug.exception(e) logging.exception(e) elif url.path == "/get_l2_datas": try: # 获取L2的数据 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] date = ps_dict.get('date') time_str = ps_dict.get('time') end_index = ps_dict.get('end_index') if end_index: end_index = int(end_index) total_datas = l2_data_util.local_today_datas.get(code) if not total_datas: total_datas = [] if date or time_str: total_datas = None else: date = tool.get_now_date_str() delegate_datas = data_export_util.get_l2_datas(code, total_datas, date=date, end_index=end_index) transaction_datas = data_export_util.get_l2_transaction_datas(code, date=date) code_name = gpcode_manager.get_code_name(code) response_data = json.dumps({"code": 0, "data": {"code": code, "code_name": code_name, "data": {"delegates": delegate_datas, "transactions": transaction_datas}}}) except Exception as e: logger_debug.exception(e) elif url.path == "/get_trade_progress": # 获取交易进度 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] trade_progress, is_default = transaction_progress.TradeBuyQueue().get_traded_index(code) # 获取正在成交, 计算成交进度 dealing_info = HuaXinBuyOrderManager.get_dealing_order_info(code) dealing_active_info = HuaXinBuyOrderManager.get_dealing_active_order_info(code) percent = 100 if dealing_info: total_datas = l2_data_util.local_today_datas.get(code) if str(total_datas[trade_progress]['val']["orderNo"]) == str(dealing_info[0]): num = total_datas[trade_progress]['val']['num'] if dealing_active_info and dealing_info[0] == dealing_active_info[0]: if tool.is_sh_code(code): num += dealing_active_info[1] // 100 percent = int(dealing_info[1] / num) response_data = json.dumps( {"code": 0, "data": {"trade_progress": trade_progress, "is_default": is_default, "percent": percent}}) elif url.path == "/get_l_cancel_datas": # 最新的l撤数据 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] date = ps_dict.get('date') if not date: date = tool.get_now_date_str() buy_single_index = ps_dict.get('buy_single_index') if buy_single_index is not None: buy_single_index = int(buy_single_index) records = code_info_output.load_trade_record_cancel_watch_indexes(code, date=date) # 获取最新的L上与L下 records.reverse() up_indexes = [] down_indexes = [] for r in records: if buy_single_index and buy_single_index != r[1]: continue if r[0] == trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_UP: up_indexes = r[2] break for r in records: if buy_single_index and buy_single_index != r[1]: continue if r[0] == trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_L_DOWN: down_indexes = r[2] break response_data = json.dumps( {"code": 0, "data": {"up": up_indexes, "down": down_indexes}}) elif url.path == "/get_h_cancel_datas": # 最新的H撤数据 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] buy_single_index = ps_dict.get('buy_single_index') records = code_info_output.load_trade_record_cancel_watch_indexes(code, trade_record_log_util.CancelWatchIndexesInfo.CANCEL_TYPE_H) # 获取最新的L上与L下 records.reverse() indexes = [] for r in records: if buy_single_index and buy_single_index != r[1]: continue indexes = r[2] break response_data = json.dumps( {"code": 0, "data": indexes}) elif url.path == "/kpl/get_limit_up_statistic_infos": try: # 统计最近的涨停板块 response_data = self.__get_limit_up_statistic_infos() except Exception as e: logger_debug.exception(e) elif url.path == "/kpl/get_plate_info": ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) response_data = self.__get_plate_info(ps_dict) elif url.path == "/kpl/get_market_data": # 获取板块信息 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) type_ = int(ps_dict['type']) result = [] if type_ == 0: # 行业,主力净额倒序 result = kpl_api.getMarketIndustryRealRankingInfo(True) result = kpl_util.parseMarketIndustry(result) elif type_ == 1: # 行业,主力净额顺序 result = kpl_api.getMarketIndustryRealRankingInfo(False) result = kpl_util.parseMarketIndustry(result) elif type_ == 2: # 精选,主力净额倒序 result = kpl_api.getMarketJingXuanRealRankingInfo(True) result = kpl_util.parseMarketJingXuan(result) elif type_ == 3: # 精选,主力净额顺序 result = kpl_api.getMarketJingXuanRealRankingInfo(False) result = kpl_util.parseMarketJingXuan(result) forbidden_plates = KPLPlateForbiddenManager().list_all() fresult = [] for d in result: if type_ == 2 or type_ == 3: self.__jingxuan_cache_dict[d[1]] = d elif type_ == 0 or type_ == 1: self.__industry_cache_dict[d[1]] = d d = list(d) d.append(1 if d[1] in forbidden_plates else 0) fresult.append(d) response_data = json.dumps({"code": 0, "data": fresult}) elif url.path == "/kpl/add_ignore_code": ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict['code'] type_ = ps_dict['type'] self.__IgnoreCodeManager.ignore_code(type_, code) response_data = json.dumps({"code": 0}) elif url.path == "/kpl/forbidden_plate": # 添加不能买的板块 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) plate = ps_dict["plate"] # 加入禁止 KPLPlateForbiddenManager().save_plate(plate) response_data = json.dumps({"code": 0}) elif url.path == "/kpl/del_forbidden_plate": # 删除不能买的板块 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) plate = ps_dict["plate"] # 加入禁止 KPLPlateForbiddenManager().delete_plate(plate) response_data = json.dumps({"code": 0}) elif url.path == "/kpl/list_forbidden_plate": # 不能买的板块列表 results = KPLPlateForbiddenManager().list_all_cache() response_data = json.dumps({"code": 0, "data": list(results)}) elif url.path == "/kpl/list_deleted_forbidden_plate": # 获取已经删除的板块 results = KPLPlateForbiddenManager().list_all_deleted_cache() if results: results -= KPLPlateForbiddenManager().list_all_cache() response_data = json.dumps({"code": 0, "data": list(results)}) elif url.path == "/kpl/get_plate_codes": # 获取涨停原因下面的代码 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) plate = kpl_util.filter_block(ps_dict["plate"]) special_codes = BlockSpecialCodesManager().get_block_codes(plate) if special_codes is None: special_codes = set() # 获取板块下的代码 # 统计目前为止的代码涨停数量(分涨停原因) now_limit_up_codes_info = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP) now_limit_up_codes = set([d[0] for d in now_limit_up_codes_info]) # 获取历史涨停 record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas if not record_limit_up_datas: KPLLimitUpDataRecordManager.load_total_datas() record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas codes_info = [] for d in record_limit_up_datas: if kpl_util.filter_block(d[2]) != plate: continue if not tool.is_can_buy_code(d[3]): continue # 代码,名称,涨停时间,是否炸板,是否想买,是否已经下过单,涨停时间,自由流通市值,是否在黑名单里面 codes_info.append( [d[3], d[4], tool.to_time_str(int(d[5])), 1 if d[3] not in now_limit_up_codes else 0, 0, 0, d[12], output_util.money_desc(d[13]), 1, 1 if l2_trade_util.is_in_forbidden_trade_codes(d[3]) else 0, 1 if d[3] in special_codes else 0]) for d in record_limit_up_datas: if kpl_util.filter_block(d[2]) == plate: continue if plate not in [kpl_util.filter_block(k) for k in d[6].split("、")]: continue if not tool.is_can_buy_code(d[3]): continue # 代码,名称,涨停时间,是否炸板,是否想买,是否已经下过单,涨停时间,自由流通市值,是否在黑名单里面 codes_info.append( [d[3], d[4], tool.to_time_str(int(d[5])), 1 if d[3] not in now_limit_up_codes else 0, 0, 0, d[12], output_util.money_desc(d[13]), 0, 1 if l2_trade_util.is_in_forbidden_trade_codes(d[3]) else 0]) codes_info.sort(key=lambda x: x[2]) # 查询是否为想买单 green_codes = gpcode_manager.GreenListCodeManager().list_codes_cache() for code_info in codes_info: code_info[4] = 1 if code_info[0] in green_codes else 0 # 获取代码状态 if trade_manager.CodesTradeStateManager().get_trade_state_cache( code_info[0]) != trade_constant.TRADE_STATE_NOT_TRADE: code_info[5] = 1 response_data = json.dumps({"code": 0, "data": codes_info}) elif url.path == "/kpl/get_plate_codes_new": # 获取涨停原因下面的代码 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) plate = kpl_util.filter_block(ps_dict["plate"]) special_codes = set() plates = BlockMapManager().filter_blocks({plate}) for p in plates: _codes = BlockSpecialCodesManager().get_block_codes(p) if _codes is None: _codes = set() special_codes |= _codes # 获取板块下的代码 # 统计目前为止的代码涨停数量(分涨停原因) now_limit_up_codes_info = self.__kplDataManager.get_data(KPLDataType.LIMIT_UP) now_limit_up_codes = set([d[0] for d in now_limit_up_codes_info]) # 获取历史涨停 record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas if not record_limit_up_datas: KPLLimitUpDataRecordManager.load_total_datas() record_limit_up_datas = KPLLimitUpDataRecordManager.total_datas codes_info = [] for d in record_limit_up_datas: _code = d[3] # blocks = LimitUpDataConstant.get_blocks_with_history(_code) blocks = LimitUpCodesBlockRecordManager().get_radical_buy_blocks(_code) if not blocks: blocks = set() blocks = BlockMapManager().filter_blocks(blocks) if blocks is not None and plate not in blocks: continue if not tool.is_can_buy_code(d[3]): continue # 代码,名称,涨停时间,是否炸板,是否加绿,是否已经下过单,涨停时间,自由流通市值,是否在黑名单里面, 是否有辨识度, 大单净额, 是否加想 codes_info.append( [d[3], d[4], tool.to_time_str(int(d[5])), 1 if d[3] not in now_limit_up_codes else 0, 0, 0, d[12], output_util.money_desc(d[13]), 1, 1 if l2_trade_util.is_in_forbidden_trade_codes(d[3]) else 0, 1 if d[3] in special_codes else 0, CodeInMoneyManager().get_money(d[3]), 0]) codes_info.sort(key=lambda x: x[2]) # 查询是否为想买单 green_codes = gpcode_manager.GreenListCodeManager().list_codes_cache() want_codes = gpcode_manager.WantBuyCodesManager().list_code_cache() for code_info in codes_info: code_info[4] = 1 if code_info[0] in green_codes else 0 code_info[12] = 1 if code_info[0] in want_codes else 0 # 获取代码状态 if trade_manager.CodesTradeStateManager().get_trade_state_cache( code_info[0]) != trade_constant.TRADE_STATE_NOT_TRADE: code_info[5] = 1 # 涨停数据 fdatas = {"limit_up_list": codes_info} # 辨识度票 fdatas["speical_codes"] = [(x, gpcode_manager.get_code_name(x)) for x in special_codes] forbidden_refer_codes = KPLPlateForbiddenManager().get_watch_high_codes_by_block(plate) if forbidden_refer_codes is None: forbidden_refer_codes = set() fdatas["forbidden_refer_codes"] = [(x, gpcode_manager.get_code_name(x)) for x in forbidden_refer_codes] response_data = json.dumps({"code": 0, "data": fdatas}) elif url.path == "/kpl/get_open_limit_up_count_rank": # 获取炸板次数排行 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict.get("code") results = log_export.load_kpl_open_limit_up() statistic = {} for result in results: for c in result[1]: if not tool.is_can_buy_code(c): continue if code and code != c: continue if c not in statistic: statistic[c] = 0 statistic[c] += 1 # 倒序排 statistic_list = [(k, statistic[k]) for k in statistic] statistic_list.sort(key=lambda x: x[1], reverse=True) fresults = [] limit_up_records = KPLLimitUpDataRecordManager.list_all_cache(tool.get_now_date_str()) limit_up_count_dict = {} if limit_up_records: for d in limit_up_records: limit_up_count_dict[d[3]] = d[12] for x in statistic_list: fresults.append((x[0], gpcode_manager.get_code_name(x[0]), x[1], limit_up_count_dict.get(x[0]))) fresults = fresults[:30] response_data = json.dumps({"code": 0, "data": fresults}) elif url.path == "/kpl/get_latest_limit_up_queue": # 获取最近的涨停队列 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) time_str = ps_dict.get("time") day = ps_dict.get("day") if not day: day = tool.get_now_date_str() results = log_export.load_kpl_limit_up_records(time_str, date=day) if not results: results = [] response_data = json.dumps({"code": 0, "data": results}) elif url.path == "/get_h_cancel_data": ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict["code"] if code: total_datas = l2_data_util.local_today_datas.get(code) if total_datas is None: l2_data_util.load_l2_data(code) total_datas = l2_data_util.local_today_datas.get(code) trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code) if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED or trade_state == trade_constant.TRADE_STATE_BUY_SUCCESS: hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer().get_watch_index_dict(code) # 根据日志读取实时的计算数据 h_cancel_latest_compute_info = log_export.get_h_cancel_compute_info(code) if hcancel_datas_dict: temp_list = [(k, hcancel_datas_dict[k][0]) for k in hcancel_datas_dict] canceled_indexs = set([int(k.split("-")[0]) for k in cancel_indexes_set]) temp_list.sort(key=lambda x: x[0]) fdata = { "computed_info": list( h_cancel_latest_compute_info) if h_cancel_latest_compute_info else None, "datas": []} for i in range(0, len(temp_list)): temp = temp_list[i] val = total_datas[temp[0]]["val"] canceled = temp[0] in canceled_indexs fdata["datas"].append( (val["time"], val["num"], code_info_output.money_desc(val["num"] * float(val["price"]) * 100), (1 if canceled else 0))) response_data = json.dumps({"code": 0, "data": fdata}) else: response_data = json.dumps({"code": 1, "msg": "无H撤数据"}) else: response_data = json.dumps({"code": 1, "msg": "无H撤数据"}) else: response_data = json.dumps({"code": 1, "msg": "请上传code"}) elif url.path == "/get_last_trade_day_reasons": # 计算平均涨幅 def get_limit_rate_list(codes): if not codes: return [] need_request_codes = set() if tool.trade_time_sub(tool.get_now_time_str(), "09:30:00") < 0: need_request_codes |= set(codes) else: now_time = time.time() for c in codes: if c not in self.__code_limit_rate_dict: need_request_codes.add(c) elif now_time - self.__code_limit_rate_dict[c][1] > 60: need_request_codes.add(c) if need_request_codes: _limit_rate_list = HistoryKDatasUtils.get_codes_limit_rate(list(need_request_codes)) for d in _limit_rate_list: self.__code_limit_rate_dict[d[0]] = (d[1], time.time()) return [(c_, self.__code_limit_rate_dict[c_][0]) for c_ in codes] try: raise Exception("接口暂停使用") # 获取上个交易日的相同涨停原因的代码信息 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict["code"] # 获取昨日涨停数据 day = HistoryKDatasUtils.get_previous_trading_date_cache(tool.get_now_date_str()) limit_up_records = kpl_data_manager.KPLLimitUpDataRecordManager.list_all_cache(day) reasons = [] for d in limit_up_records: if d[3] == code: reasons.append(d) # 获取代码的原因 if reasons: reasons = list(reasons) reasons.sort(key=lambda x: x[9]) reason = reasons[-1][2] # 获取涨停数据 datas = self.__kplDataManager.get_from_file_cache(kpl_util.KPLDataType.LIMIT_UP, day) # (代码,名称,首次涨停时间,最近涨停时间,几板,涨停原因,板块,实际流通,主力净额,涨停原因代码,涨停原因代码数量) yesterday_result_list = [] percent_rate = 0 if datas: yesterday_codes = set() for d in datas: if d[5] == reason: yesterday_codes.add(d[0]) # 获取涨幅 limit_rate_list = get_limit_rate_list(yesterday_codes) limit_rate_dict = {} if limit_rate_list: total_rate = 0 for d in limit_rate_list: limit_rate_dict[d[0]] = d[1] total_rate += d[1] percent_rate = round(total_rate / len(limit_rate_list), 2) for d in datas: if d[5] == reason: yesterday_codes.add(d[0]) if d[0] != code: # (代码,名称, 涨幅) yesterday_result_list.append((d[0], d[1], limit_rate_dict.get(d[0]))) current_limit_up_list = kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas current_result_list = [] if current_limit_up_list: for c in current_limit_up_list: if c[5] == reason and c[0] != code: current_result_list.append((c[0], c[1])) response_data = json.dumps({"code": 0, "data": {"reason": reason, "reason_rate": percent_rate, "data": {"yesterday": yesterday_result_list, "current": current_result_list}}}) else: response_data = json.dumps({"code": 1, "msg": "昨日未涨停"}) except Exception as e: logger_debug.exception(e) raise e elif url.path == "/pull_kp_client_msg": # 拉取客户端消息 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) client = ps_dict["client"] msg = kp_client_msg_manager.read_msg(client) if msg: response_data = json.dumps({"code": 0, "data": msg}) else: response_data = json.dumps({"code": 1, "msg": "暂无消息"}) elif url.path == "/list_kp_client_msg": msg_list = kp_client_msg_manager.list_msg_from_local() msg_list.reverse() msg_list = [f"{msg.split('|')[0]}{msg.split('|')[-1].split('-')[1].strip()}" for msg in msg_list] response_data = json.dumps({"code": 0, "data": msg_list}) elif url.path == "/statistic_latest_limit_up_block": try: # 统计最近的涨停板块 datas = LatestLimitUpBlockManager().statistics_limit_up_block_infos() response_data = json.dumps({"code": 0, "data": datas}) except Exception as e: logger_debug.exception(e) elif url.path == "/get_new_blocks": # 获取新板块 blocks = KPLLimitUpDataRecordManager.get_new_blocks(tool.get_now_date_str()) response_data = json.dumps({"code": 0, "data": blocks}) elif url.path == "/get_account_commission_detail": # 获取手续费详情 try: fdata = {"delegates": {}} # 获取本月的手续费 end_date = tool.get_now_date_str("%Y%m%d") start_date = f"{end_date[:6]}01" delegates_month = trade_data_manager.AccountMoneyManager().get_delegated_count_info(start_date, end_date) # 股票,上证可转债 , 深证可转债 deals_month = trade_data_manager.AccountMoneyManager().get_deal_count_info(start_date, end_date) cost_month = sum([round(0.1 * x[1], 2) for x in delegates_month]) make_month = 0 make_month += max(1 * deals_month[0][1] if deals_month[0][1] else 0, deals_month[0][2] * 1.854 / 10000 if deals_month[0][2] else 0) + 1 * deals_month[1][ 1] + 0 * deals_month[2][1] fdata["month_commission"] = round(make_month - cost_month, 2) # 计算当日手续费详情 delegates = trade_data_manager.AccountMoneyManager().get_delegated_count_info() delegates = [{"count": x[1], "price": 0.1, "money": round(0.1 * x[1], 2)} for x in delegates] fdata["delegates"]["buy"] = delegates[0] fdata["delegates"]["buy_cancel"] = delegates[1] fdata["delegates"]["sell_cancel"] = delegates[2] fdata["delegates"]["sell"] = delegates[3] deals = trade_data_manager.AccountMoneyManager().get_deal_count_info() fdata["deals"] = {} fdata["deals"]["stock"] = {"count": deals[0][1], "price": 1, "money": round(1 * deals[0][1], 2)} fdata["deals"]["sh_cb"] = {"count": deals[1][1], "price": 1, "money": round(1 * deals[1][1], 2)} fdata["deals"]["sz_cb"] = {"count": deals[2][1], "price": 0, "money": round(0 * deals[2][1], 2)} fdata["commission"] = trade_data_manager.AccountMoneyManager().get_commission_cache() response_data = json.dumps({"code": 0, "data": fdata}) except Exception as e: logger_debug.exception(e) elif url.path == "/get_place_order_records": # 获取下单记录 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) try: day = ps_dict.get("day") if not day: day = tool.get_now_date_str() records = log_export.load_trade_recod_by_type("place_order", date=day) fdata = [] for record in records: print(record) # (下单时间, 代码, 名称, 下单模式, 板块信息) fdata.append((record[0], record[1], gpcode_manager.get_code_name(record[1]), record[3]["mode_desc"], record[3].get("block_info"))) response_data = json.dumps({"code": 0, "data": fdata}) except: pass elif url.path == "/get_blocks_in_money_info": # 获取板块资金流入状况 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) type_ = int(ps_dict.get("type")) try: fdatas = [] if type_ == 0: in_blocks = RealTimeKplMarketData().get_top_market_jingxuan_blocks() if not in_blocks: in_blocks = set() fdatas = RealTimeKplMarketData.top_in_list_cache if not fdatas: datas = self.__kplDataManager.get_data(KPLDataType.JINGXUAN_RANK) fdatas = datas # 返回是否在流入前几 temp_datas = [] for d in fdatas: temp = list(d) if d[1] in in_blocks: temp.append(1) else: temp.append(0) if in_blocks and d[1] == in_blocks[-1]: temp.append(RealTimeKplMarketData.get_market_strong()) else: temp.append(0) temp_datas.append(temp) fdatas = temp_datas elif type_ == 1: out_blocks = RealTimeKplMarketData().get_top_market_jingxuan_out_blocks() if not out_blocks: out_blocks = set() fdatas = RealTimeKplMarketData.top_out_list_cache if not fdatas: datas = self.__kplDataManager.get_data(KPLDataType.JINGXUAN_RANK_OUT) fdatas = datas # 返回是否在流入前几 temp_datas = [] for d in fdatas: temp = list(d) if d[1] in out_blocks: temp.append(1) else: temp.append(0) temp_datas.append(temp) fdatas = temp_datas response_data = json.dumps({"code": 0, "data": fdatas}) except: pass elif url.path == "/get_block_codes_with_money": # 获取板块资金流入状况 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) block = ps_dict.get("block") # 是否倒序排 desc = int(ps_dict.get("desc")) try: response_data = requests.get( "http://127.0.0.1:9005/get_block_codes_money?block=" + urllib.parse.quote(block)) r_str = response_data.text response_data = json.loads(r_str) if response_data["code"] == 0: datas = response_data["data"] fdatas = [] for d in datas: # (代码, 名称, 流入金额, 是否被排除成分股) fdatas.append((d[0], gpcode_manager.get_code_name(d[0]), d[1], d[2])) if desc: fdatas.sort(key=lambda x: x[2], reverse=True) else: fdatas.sort(key=lambda x: x[2]) fdatas = fdatas[:50] response_data = json.dumps({"code": 0, "data": fdatas}) else: response_data = r_str except Exception as e: response_data = json.dumps({"code": 1, "data": str(1)}) elif url.path == "/get_l2_subscript_codes": # 获取L2订阅的代码 ps_dict = dict([(k, v[0]) for k, v in parse_qs(url.query).items()]) code = ps_dict.get('code') fresults = [] try: if code: codes = [code] else: codes = huaxin_target_codes_manager.HuaXinL2SubscriptCodesManager.get_subscript_codes() if codes: for code in codes: deal_big_order_detail_info = None try: # 获取成交大单:(参考大单金额,已成交大单金额,大单要求金额, 已成交涨停买金额, 已成交涨停卖金额) th_temp_buy_info = BeforeSubDealBigOrderManager().get_temp_deal_big_order_threshold_info( code) th_buy, th_buy_default = BeforeSubDealBigOrderManager().get_big_order_threshold(code) try: th_sell = BeforeSubDealBigOrderManager().get_big_sell_order_threshold(code) except: th_sell = 0 deal_big_money_info = radical_buy_data_manager.get_total_deal_big_order_info( code, gpcode_manager.get_limit_up_price_as_num(code)) if deal_big_money_info[1] == 0 and len(codes) == 1: deal_big_money_info = list(deal_big_money_info) # 没有订阅L2会出现没有值的情况,如果涨停过就拉取之前的涨停买/卖大单 deal_big_orders_result = radical_buy_data_manager.request_deal_big_orders(code) if deal_big_orders_result: buy_datas, sell_datas = deal_big_orders_result[0], deal_big_orders_result[1] limit_up_price = gpcode_manager.get_limit_up_price_as_num(code) limit_up_price_money_list = [x[0] for x in buy_datas if x[1] == limit_up_price] threshold_money = BeforeSubDealBigOrderManager().compute_re_limit_up_big_money_threshold( limit_up_price_money_list) logger_debug.info(f"{code}-临时回封均大单:{threshold_money}") # 设置买单阈值 th_buy = threshold_money buy_money = sum(limit_up_price_money_list) sell_money = sum([x[0] for x in sell_datas if x[1] == limit_up_price]) # 涨停大单净买入 deal_big_money_info[1] = buy_money - sell_money deal_big_money_info[ 2] = radical_buy_data_manager.compute_total_deal_big_order_threshold_money(code, limit_up_price, threshold_money) logger_debug.info(f"{code}-累计大单阈值:{deal_big_money_info[2]}") big_money_rate = radical_buy_data_manager.TotalDealBigOrderInfoManager.get_big_order_rate( code) if not big_money_rate: big_money_rate = 0 # 大单成交信息 deal_big_order_info = [ (output_util.money_desc(th_temp_buy_info[0] if th_temp_buy_info else 0), # 临时买大单阈值 output_util.money_desc(th_buy), # 买大单阈值 output_util.money_desc(th_sell), # 卖单阈值 big_money_rate * 100 # 大单成交比 ), output_util.money_desc(deal_big_money_info[1]), output_util.money_desc(deal_big_money_info[2]), output_util.money_desc(deal_big_money_info[3]), ] if len(codes) == 1: # 加载大单详情 deal_big_order_detail_info = radical_buy_data_manager.get_l2_big_order_deal_info(code) # 加载涨停大单详情 limit_up_big_order_detail = radical_buy_data_manager.get_total_detal_big_order_details( code) if max(limit_up_big_order_detail) == 0: # 没有数据,从网络加载 limit_up_big_order_detail = list(limit_up_big_order_detail) limit_up_big_order_detail[1] = deal_big_order_detail_info[1][0] limit_up_big_order_detail[3] = deal_big_order_detail_info[2][0] deal_big_order_info.append( output_util.money_desc(limit_up_big_order_detail[0] + limit_up_big_order_detail[1])) deal_big_order_info.append( output_util.money_desc(limit_up_big_order_detail[2] + limit_up_big_order_detail[3])) deal_big_order_info.append( radical_buy_data_manager.TotalDealBigOrderInfoManager().get_big_order_rate(code)) except Exception as e: logger_debug.error(f"可能没有获取到涨停价:{code}") if not gpcode_manager.get_limit_up_price(code): init_data_util.re_set_price_pre(code) logger_debug.exception(e) deal_big_order_info = None code_name = gpcode_manager.get_code_name(code) fresults.append((code, code_name, deal_big_order_info, deal_big_order_detail_info)) response_data = json.dumps({"code": 0, "data": fresults}) except Exception as e: response_data = json.dumps({"code": 1, "data": str(1)}) elif url.path == "/get_all_special_codes": # 获取所有辨识度的代码 code_blocks_dict = BlockSpecialCodesManager().get_code_blocks_dict() fdata = {} for k in code_blocks_dict: fdata[k] = list(code_blocks_dict[k]) response_data = json.dumps({"code": 0, "data": fdata}) elif url.path == "/get_new_blocks_special_codes": # 获取所有辨识度的代码 code_blocks_dict = BlockSpecialCodesManager().get_temp_code_blocks_dict() fdata = {} for k in code_blocks_dict: fdata[k] = list(code_blocks_dict[k]) response_data = json.dumps({"code": 0, "data": fdata}) async_log_util.info(logger_request_api, f"结束请求{tool.get_thread_id()}-{url}") self.send_response(200) # 发给请求客户端的响应数据 self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(response_data.encode()) def do_POST(self): path = self.path url = urlparse.urlparse(path) if url.path == "/upload_kpl_data": # 接受开盘啦数据 params = self.__parse_request() result_str = self.__process_kpl_data(params) self.__send_response(result_str) if url.path == "/upload_codes_in_money": # 接收代码净流入金额 params = self.__parse_request() d = params["data"] d = json.loads(d) try: for code in d: CodeInMoneyManager().set_money(code, d[code]) except Exception as e: logging.exception(e) result_str = json.dumps({"code": 0}) self.__send_response(result_str) def __process_kpl_data(self, data_origin): def do_limit_up(result_list_): def request_new_blocks_codes(blocks_info, all_new_blocks): """ 请求新板块的代码 @param blocks_info:[(板块名称,板块代码)] @return: """ yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() blocks = set() for bi in blocks_info: if bi[0] in blocks: continue blocks.add(bi[0]) result = kpl_api.getCodesByPlate(bi[1]) result = json.loads(result) code_info_list = [] for d in result["list"]: if d[0] in yesterday_codes: continue # 涨幅要大于5% rate = d[6] / int(round((tool.get_limit_up_rate(d[0]) - 1) * 10)) if rate / ((tool.get_limit_up_rate(d[0]) - 1) * 10) < 5: continue # 格式:(代码,涨幅) code_info_list.append((d[0], d[6])) # 保存新题材 datas = [(d[0], d[6]) for d in result["list"]] async_log_util.info(logger_kpl_new_blocks, f"{(tool.get_thread_id(), bi, datas)}") if code_info_list: # 将代码加入新题材 new_block_processor.process_new_block_by_component_codes(bi[0], set([x[0] for x in code_info_list]), all_new_blocks) try: if result_list_: # 保存涨停时间 codes_set = set() limit_up_reasons = {} limit_up_time_dict = {} for d in result_list_: code = d[0] limit_up_reasons[code] = d[5] codes_set.add(code) limit_up_time = time.strftime("%H:%M:%S", time.localtime(d[2])) if tool.is_can_buy_code(code): code_price_manager.Buy1PriceManager().set_limit_up_time(code, limit_up_time) limit_up_time_dict[code] = limit_up_time add_codes = codes_set - self.__latest_limit_up_codes_set self.__latest_limit_up_codes_set = codes_set if limit_up_reasons: # 统计涨停原因的票的个数 limit_up_reason_code_dict = {} for code in limit_up_reasons: b = limit_up_reasons[code] if b not in limit_up_reason_code_dict: limit_up_reason_code_dict[b] = set() limit_up_reason_code_dict[b].add(code) # 涨停时间code LCancelRateManager.set_block_limit_up_count(limit_up_reason_code_dict, limit_up_time_dict) if add_codes: for code in add_codes: # 根据涨停原因判断是否可以买 if tool.is_can_buy_code(code): try: # 判断是否下单 trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code) if trade_state == trade_constant.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_constant.TRADE_STATE_BUY_DELEGATED: # 委托中的订单,判断是否需要撤单 if not gpcode_manager.WantBuyCodesManager().is_in_cache(code): yesterday_codes = kpl_data_manager.get_yesterday_limit_up_codes() current_limit_up_datas, limit_up_record_datas, yesterday_current_limit_up_codes, before_blocks_dict = kpl_data_manager.KPLLimitUpDataRecordManager.latest_origin_datas, kpl_data_manager.KPLLimitUpDataRecordManager.total_datas, yesterday_codes, block_info.get_before_blocks_dict() if not current_limit_up_datas: current_limit_up_datas = [] if not limit_up_record_datas: limit_up_record_datas = [] # 买绝对老大 # 中途不能撤单 # if CodePlateKeyBuyManager.is_need_cancel(code, limit_up_reasons.get(code), # current_limit_up_datas, # limit_up_record_datas, # yesterday_current_limit_up_codes, # before_blocks_dict): # l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, # f"涨停原因({limit_up_reasons.get(code)})不是老大撤单", # "板块撤") except Exception as e: logger_debug.exception(e) kpl_data_manager.KPLLimitUpDataRecordManager.save_record(tool.get_now_date_str(), result_list_) try: LatestLimitUpBlockManager().set_current_limit_up_data(tool.get_now_date_str(), result_list_) except: pass try: CodeLimitUpSequenceManager().set_current_limit_up_datas(result_list_) ContainsLimitupCodesBlocksManager().set_current_limit_up_datas(result_list_) except: pass try: OpenLimitUpGoodBlocksBuyStrategy.set_current_limit_up_data(result_list_) RadicalBuyBlockManager.set_current_limit_up_datas(result_list_) except: pass try: # 新题材 new_block_processor.process_limit_up_list({x[0]: x[5] for x in result_list_}) new_block_codes = new_block_processor.screen_new_blocks_with_limit_up_datas( [(x[0], x[5]) for x in result_list_]) if new_block_codes: # 统计板块的代码 records = KPLLimitUpDataRecordManager.total_datas block_plate_code_dict = {} for x in records: block_plate_code_dict[kpl_util.filter_block(x[2])] = x[15] # 新板块 update_new_block_plates = [] for b in new_block_codes: for c in new_block_codes[b]: new_block_processor.process_new_block_by_limit_up_list(c, b) for r in new_block_codes: if r in block_plate_code_dict: update_new_block_plates.append((r, block_plate_code_dict[r])) if update_new_block_plates: # 需要获取板块下的代码 self.__new_blocks_codes_request_thread_pool.submit( lambda: request_new_blocks_codes(update_new_block_plates, new_block_codes.keys())) except Exception as e: logger_debug.exception(e) self.__kplDataManager.save_data(type_, result_list_) except Exception as e: logger_debug.exception(e) # 将"概念"二字替换掉 data = data_origin type_ = data["type"] print("开盘啦type:", type_) if type_ == KPLDataType.BIDDING.value: result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_BIDDING) # 竞价取前20 if result_list: result_list.sort(key=lambda x: x[2]) result_list.reverse() result_list = result_list[:20] bs = [] for d in result_list: bs.append((d[0], f"{d[2] // 10000}万")) bidding_money_manager.set_bidding_money(bs[:10]) self.__kplDataManager.save_data(type_, result_list) elif type_ == KPLDataType.LIMIT_UP.value: result_list, day = kpl_util.parseLimitUpData(data["data"]) if day and day != tool.get_now_date_str(): pass else: self.__data_process_thread_pool.submit(lambda: do_limit_up(result_list)) # 记录涨停日志 logger_kpl_limit_up.info(result_list) elif type_ == KPLDataType.OPEN_LIMIT_UP.value: result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_OPEN_LIMIT_UP) if result_list: self.__kplDataManager.save_data(type_, result_list) elif type_ == KPLDataType.LIMIT_DOWN.value: result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_LIMIT_DOWN) if result_list: self.__kplDataManager.save_data(type_, result_list) elif type_ == KPLDataType.EVER_LIMIT_DOWN.value: result_list = kpl_util.parseDaBanData(data["data"], kpl_util.DABAN_TYPE_EVER_LIMIT_DOWN) if result_list: self.__kplDataManager.save_data(type_, result_list) elif type_ == KPLDataType.FENG_KOU.value: fdata = data["data"] result_list = kpl_util.parseFengKou(fdata) result_list.sort(key=lambda x: x[3]) result_list.reverse() self.__kplDataManager.save_data(type_, result_list) elif type_ == KPLDataType.BEST_FENG_KOU.value: result_list = kpl_util.parseBestFengKou(data["data"]) if result_list: self.__kplDataManager.save_data(type_, result_list) # 保存最强风口 elif type_ == KPLDataType.FENG_XIANG.value: result_list = kpl_util.parseFengXiang(data["data"]) # 保存风向数据 if result_list: self.__kplDataManager.save_data(type_, result_list) elif type_ == KPLDataType.INDUSTRY_RANK.value: result_list = kpl_util.parseIndustryRank(data["data"]) # 保存行业数据 if result_list: self.__kplDataManager.save_data(type_, result_list) RealTimeKplMarketData.set_top_5_industry(result_list) elif type_ == KPLDataType.JINGXUAN_RANK.value: # result_list = kpl_util.parseMarketJingXuan(data["data"]) result_list = json.loads(data["data"]) # 保存精选数据 if result_list: self.__kplDataManager.save_data(type_, result_list) RealTimeKplMarketData.set_market_jingxuan_blocks(result_list) elif type_ == KPLDataType.JINGXUAN_RANK_OUT.value: # result_list = kpl_util.parseMarketJingXuan(data["data"]) result_list = json.loads(data["data"]) # 保存精选数据 if result_list: self.__kplDataManager.save_data(type_, result_list) RealTimeKplMarketData.set_market_jingxuan_out_blocks(result_list) elif type_ == KPLDataType.MARKET_STRONG.value: strong = data["data"] logger_kpl_market_strong.info(strong) # 保存市场热度 if strong is not None: RealTimeKplMarketData.set_market_strong(strong) return json.dumps({"code": 0}) def __send_response(self, data): # 发给请求客户端的响应数据 self.send_response(200) self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(data.encode()) def __parse_request(self): params = {} datas = self.rfile.read(int(self.headers['content-length'])) _str = str(datas, encoding="gbk") # print(_str) params = json.loads(_str) return params class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer): pass def run(addr, port): # 运行看盘消息采集 # kp_client_msg_manager.run_capture() kpl_data_manager.PullTask.run_pull_task() handler = DataServer # httpd = socketserver.TCPServer((addr, port), handler) try: httpd = ThreadedHTTPServer((addr, port), handler) print("HTTP server is at: http://%s:%d/" % (addr, port)) httpd.serve_forever() except Exception as e: logger_system.exception(e) logger_system.error(f"端口服务器:{port} 启动失败") if __name__ == "__main__": run("", 9004)