""" 网络代理管理器 """ # 本地看盘网络代理 import decimal import json import time from urllib.parse import parse_qs, urlparse import setting import trade_gui from kpl import kpl_util, kpl_api from kpl.kpl_data_manager import KPLLimitUpDataManager from utils import tool, xgb_api class LocalKanPanNetworkDelegate: # 保存代码涨幅 __code_limit_rate_dict = {} # HTTP代理请求 @classmethod def http_delegate_request(cls, url): if url.startswith("/kpl/get_limit_up_list"): return cls.__get_limit_up_list(url) elif url.startswith("/kpl/get_market_data"): return cls.__get_market_data(url) elif url.startswith("/kpl/get_plate_codes"): return cls.__get_plate_codes(url) elif url.startswith("/get_xgb_limit_up_reasons"): return cls.__get_xgb_limit_up_reasons(url) elif url.startswith("/get_kpl_market_feelings"): return cls.__get_kpl_market_feelings() elif url.startswith("/buy_by_ths"): # 获取可转债的列表 return cls.buy_by_ths(url) return None, False @classmethod def __get_limit_up_list(cls, url): ps_dict = dict([(k, v[0]) for k, v in parse_qs(urlparse(url).query).items()]) day = ps_dict.get("day") if day is None: day = tool.get_now_date_str() # (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额, 涨停原因代码, 涨停原因代码数量) records = KPLLimitUpDataManager().get_limit_up_history_datas(day) currents = KPLLimitUpDataManager().get_limit_up_current_datas(day) current_codes = [d[0] for d in currents] fresult = [] # 计算涨停时间排序 record_reason_dict = {} current_reason_dict = {} for r in records: if r[5] not in record_reason_dict: record_reason_dict[r[5]] = [] record_reason_dict[r[5]].append(r) for r in currents: if r[5] not in current_reason_dict: current_reason_dict[r[5]] = [] current_reason_dict[r[5]].append(r) for k in record_reason_dict: record_reason_dict[k].sort(key=lambda x: x[2]) for r in records: if r[0] in current_codes: limit_up_state = 1 else: limit_up_state = 2 rank = 0 for i in range(len(record_reason_dict[r[5]])): if record_reason_dict[r[5]][i][0] == r[0]: rank = i # (代码, 名称, 涨停状态(0 - 无状态 1-涨停 2-炸板), 龙几, 首板, 分值, 涨停时间, 原因, 相同原因代码数量, 自由流通, 涨停原因是否变化,涨停原因的流入净额,下单简介) fresult.append((r[0], r[1], limit_up_state, f"龙{rank + 1}", r[4], 0, tool.time_format(int(r[2])), r[5], r[10], tool.money_desc(r[7]), '', '', '')) # (板块名称,涨停代码数量,炸板数量,涨停时间) limit_up_reason_statistic_info = [(k, len(record_reason_dict[k]), len(record_reason_dict[k]) - len( current_reason_dict.get(k) if k in current_reason_dict else []), record_reason_dict[k][0][5]) for k in record_reason_dict] limit_up_reason_statistic_info.sort(key=lambda x: x[1] - x[2]) limit_up_reason_statistic_info.reverse() result = json.dumps({"code": 0, "data": {"limit_up_count": len(currents), "open_limit_up_count": len(records) - len(currents), "limit_up_reason_statistic": limit_up_reason_statistic_info, "limit_up_codes": fresult}}) return result, True @classmethod def __get_market_data(cls, url): ps_dict = dict([(k, v[0]) for k, v in parse_qs(urlparse(url).query).items()]) type_ = int(ps_dict['type']) result = [] if type_ == 0: # 行业,主力净额倒序 result = kpl_api.getMarketIndustryRealRankingInfo(True) result = kpl_util.parseMarketIndustry(result) elif type_ == 1: # 行业,主力净额顺序 result = kpl_api.getMarketIndustryRealRankingInfo(False) result = kpl_util.parseMarketIndustry(result) elif type_ == 2: # 精选,主力净额倒序 result = [] for p in range(2): result_ = kpl_api.getMarketJingXuanRealRankingInfo(p + 1, True) result_ = kpl_util.parseMarketJingXuan(result_) result.extend(result_) elif type_ == 3: # 精选,主力净额顺序 result = [] for p in range(2): result_ = kpl_api.getMarketJingXuanRealRankingInfo(p + 1, False) result_ = kpl_util.parseMarketJingXuan(result_) result.extend(result_) fresult = [] forbidden_plates = [] for d in result: d = list(d) d.append(1 if d[1] in forbidden_plates else 0) fresult.append(d) response_data = json.dumps({"code": 0, "data": fresult}) return response_data, True @classmethod def __get_plate_codes(cls, url): ps_dict = dict([(k, v[0]) for k, v in parse_qs(urlparse(url).query).items()]) day = ps_dict.get("day") plate = ps_dict.get("plate") if not day: return None, False records = KPLLimitUpDataManager().get_limit_up_history_datas(day) currents = KPLLimitUpDataManager().get_limit_up_current_datas(day) rs = [] for r in records: if r[5] != plate and plate not in r[6].split('、'): continue rs.append(r) current_codes = set() for r in currents: current_codes.add(r[0]) rs.sort(key=lambda x: x[2]) # 代码,名称,涨停时间,是否炸板,是否想买,是否已经下过单 results = [( x[0], x[1], time.strftime("%H:%M:%S", time.localtime(x[2])), 0 if x[0] in current_codes else 1, 0, 0, x[4], f" {round(x[7] / 100000000, 1)}亿", 1 if x[5] == plate else 0) for x in rs] response_data = json.dumps({"code": 0, "data": results}) return response_data, True # 获取昨日收盘价 __pre_close_cache = {} @classmethod def __get_xgb_limit_up_reasons(cls, url): # 获取 昨日涨停数据 ps_dict = dict([(k, v[0]) for k, v in parse_qs(urlparse(url).query).items()]) code = ps_dict["code"] reasons = xgb_api.get_code_limit_up_reasons(code) if reasons: response_data = json.dumps( {"code": 0, "data": {"reasons": reasons[0], "description": reasons[1], "blocks": reasons[2], "zylt": f"{reasons[3]}亿"}}) else: response_data = json.dumps({"code": 1, "msg": "尚未获取到涨停原因"}) return response_data, True @classmethod def __get_kpl_market_feelings(cls): market_feelings = kpl_api.getMarketFelling() market_strong = kpl_api.getMarketStrong() ps = [('涨停', market_feelings['ZT'], 'red')] ps.append(('>%7', int(market_feelings['8']) + int(market_feelings['9']) + int( market_feelings['10']), 'red')) ps.append(('7~5%', int(market_feelings['6']) + int(market_feelings['7']), 'red')) ps.append(('5~2%', int(market_feelings['3']) + int(market_feelings['4']) + int(market_feelings['5']), 'red')) ps.append(('2~0%', int(market_feelings['2']) + int(market_feelings['1']), 'red')) ps.append(('平', int(market_feelings['0']), 'gray')) ps.append(('0~2%', int(market_feelings['-1']) + int(market_feelings['-2']), 'green')) ps.append( ('2~5%', int(market_feelings['-3']) + int(market_feelings['-4']) + int(market_feelings['-5']), 'green')) ps.append(('5~7%', int(market_feelings['-6']) + int(market_feelings['-7']), 'green')) ps.append( ('7%<', int(market_feelings['-8']) + int(market_feelings['-9']) + int(market_feelings['-10']), 'green')) ps.append(('跌停', market_feelings['DT'], 'green')) fdata = {"strong": market_strong, "map": ps, "SJZT": market_feelings['SJZT'], "SJDT": market_feelings['SJDT'], "SZJS": market_feelings['SZJS'], "XDJS": market_feelings['XDJS'], "sign": market_feelings['sign'], "qscln": market_feelings['qscln'] // 10000, "q_zrcs": market_feelings['q_zrcs'] // 10000} # 获取市场情绪 return json.dumps( {"code": 0, "data": fdata}), True __cb_codes_list = None # 所有可转债数据 __pre_price_dict = {} # 昨日收盘价 @classmethod def buy_by_ths(cls, url): can_buy = setting.is_can_ths_buy() if can_buy: try: ps_dict = dict([(k, v[0]) for k, v in parse_qs(urlparse(url).query).items()]) code = ps_dict["code"] trade_gui.start_buy(code) return json.dumps( {"code": 0, "data": {}}), True except Exception as e: return json.dumps( {"code": 1, "msg": str(e)}), True else: return json.dumps( {"code": 1, "msg": '同花顺不能买入'}), True if __name__ == "__main__": pass