Administrator
2024-06-11 c9c4d1763e70e93c270490eae62d99bc14f23892
板块融入
4个文件已添加
3个文件已修改
1320 ■■■■■ 已修改文件
main.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_api.py 235 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_block_util.py 205 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_data_manager.py 528 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/kpl_util.py 325 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/constant.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.py
@@ -12,6 +12,7 @@
from huaxin_client.client_network import SendResponseSkManager
from log_module import async_log_util, log_export
from records import huaxin_trade_record_manager
from third_data.kpl_data_manager import PullTask
from trade import huaxin_trade_api, huaxin_trade_data_update, huaxin_sell_util, backtest_trade
from trade.buy_strategy import BuyStrategyDataManager
from utils import middle_api_protocol, outside_api_command_manager, constant, tool, huaxin_util, socket_util, sell_util, \
@@ -328,6 +329,10 @@
    # 华鑫交易数据更新
    huaxin_trade_data_update.run()
    # 定时拉取开盘啦涨停数据
    PullTask.run_limit_up_task()
    # 仿真交易不运行交易客户端
    # # ===========运行交易外部API==========
    #
third_data/kpl_api.py
New file
@@ -0,0 +1,235 @@
import json
import time
import requests
from third_data import kpl_util
from utils import middle_api_protocol
# 竞价
DABAN_TYPE_BIDDING = 8
# 涨停
DABAN_TYPE_LIMIT_UP = 1
# 炸板
DABAN_TYPE_OPEN_LIMIT_UP = 2
# 跌停
DABAN_TYPE_LIMIT_DOWN = 3
# 曾跌停
DABAN_TYPE_EVER_LIMIT_DOWN = 5
def __base_request(url, data, timeout=10):
    DELEGATE = True
    if not DELEGATE:
        headers = {
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "User-Agent": "Dalvik / 2.1.0(Linux;U;Android 6.0.1;MuMu Build/V417IR)"
        }
        # proxies={'https': '192.168.3.251:9002'}
        # 禁止代理,不然会走本地代理
        response = requests.post(url, data=data, headers=headers, proxies={"http": None, "https": None},
                                 timeout=timeout)
        if response.status_code != 200:
            raise Exception("请求出错")
        return response.text
    else:
        fdata = middle_api_protocol.load_kpl(url, data, timeout)
        return middle_api_protocol.request(fdata)
def __getLimitUpInfo(pidType, page, pageSize):
    data = f"Order=0&a=DailyLimitPerformance&st={pageSize}&apiv=w35&Type=4&c=HomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e24&VerSion=5.13.0.0&Index={(page - 1) * pageSize}&PidType={pidType}&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    return result
def getLimitUpInfoNew():
    pids = [(1, "首板"), (2, "2连板"), (3, "3连板"), (4, "4连板"), (5, "")]
    fresults = []
    for pid_info in pids:
        results = []
        for i in range(100):
            start_time = time.time()
            result = __getLimitUpInfo(pid_info[0], i + 1, 20)
            result = json.loads(result)
            datas = result["info"][0]
            results.extend(datas)
            day = result["info"][1]
            if len(datas) < 20:
                break
        for r in results:
            if not r[18] and pid_info[1]:
                r[18] = pid_info[1]
            # 替换掉板块名称
            for i in range(len(r)):
                if type(r[i]) == str:
                    r[i] = kpl_util.filter_block(r[i])
        fresults.extend(results)
    return json.dumps({"errcode": 0, "list": fresults})
def getHistoryLimitUpInfo(day):
    fresults = []
    for i in range(0, 100):
        data = f"Order=1&a=HisDaBanList&st=20&c=HisHomeDingPan&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index={i * 20}&Is_st=1&PidType=1&apiv=w32&Type=6&FilterMotherboard=0&Filter=0&FilterTIB=0&Day={day}&FilterGem=0&"
        result = __base_request("https://apphis.longhuvip.com/w1/api/index.php", data=data)
        result = json.loads(result)
        result = result["list"]
        fresults.extend(result)
        if len(result) < 20:
            break
    return fresults
# 市场行情-行业
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                            data=data)
    return result
# 市场行情-精选
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=20&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                            data=data)
    return result
# 获取代码的板块
def getStockIDPlate(code):
    data = f"a=GetStockIDPlate_New&apiv=w32&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    if int(result["errcode"]) != 0:
        return None
    return result["ListJX"] if result["ListJX"] else result["List"]
# 获取概念代码
def getCodesByPlate(plate_code):
    data = f"Order=1&a=ZhiShuStockList_W8&st=30&c=ZhiShuRanking&PhoneOSNew=1&old=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&IsZZ=0&Token=0&Index=0&apiv=w32&Type=6&IsKZZType=0&UserID=0&PlateID={plate_code}&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# 获取概念中的板块强度
def getSonPlate(plate_code):
    data = f"a=SonPlate_Info&apiv=w32&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&PlateID={plate_code}&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
# 市场行情-行业
def getMarketIndustryRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=4&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# 市场行情-精选
def getMarketJingXuanRealRankingInfo(orderJingE_DESC=True):
    data = f"Order={1 if orderJingE_DESC else 0}&a=RealRankingInfo&st=80&apiv=w32&Type=5&c=ZhiShuRanking&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Index=0&ZSType=7&"
    return __base_request("https://apphq.longhuvip.com/w1/api/index.php",
                          data=data)
# 获取代码的精选板块
# 返回格式:[(板块代码,板块名称,涨幅百分比)]
def getCodeJingXuanBlocks(code):
    data = f"a=GetStockIDPlate&apiv=w32&Type=2&c=StockL2Data&StockID={code}&PhoneOSNew=1&UserID=0&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&Token=0&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data)
    result = json.loads(result)
    print(result)
    return result.get("ListJX") if result.get("ListJX") else result.get("List")
# 获取自由流通市值
def getZYLTAmount(code):
    data = f"a=GetStockPanKou_Narrow&apiv=w32&c=StockL2Data&VerSion=5.8.0.2&State=1&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&StockID={code}&"
    result = __base_request("https://apphq.longhuvip.com/w1/api/index.php", data=data, timeout=3)
    result = json.loads(result)
    if "real" in result:
        return result["real"].get("actualcirculation_value")
    return None
# 获取F10中的精选板块
def __getConceptJXBK(code):
    data = f"a=GetConceptJXBKw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    result = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=data, timeout=3)
    result = json.loads(result)
    if result:
        if "List" in result:
            names = [kpl_util.filter_block(x["CName"]) for x in result["List"]]
            return names
    return []
# 获取F10常规板块
def __getConceptBK(code):
    data = f"a=GetConceptw23&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb9-6d893c846e23&VerSion=5.8.0.2&"
    result = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=data, timeout=3)
    result = json.loads(result)
    if result:
        if "List" in result:
            names = [kpl_util.filter_block(x["CName"]) for x in result["List"]]
            return names
    return []
# 获取代码的板块
def getCodeBlocks(code):
    blocks = []
    try:
        _bks = __getConceptJXBK(code)
        print(_bks)
        if _bks:
            blocks.extend(_bks)
    except:
        pass
    if not blocks:
        try:
            _bks = __getConceptBK(code)
            print(_bks)
            if _bks:
                blocks.extend(_bks)
        except:
            pass
    return list(set(blocks))
# 获取F10里面的大事件
def __getF10BigReminders(code):
    data = f"a=BigReminder&st=25&apiv=w32&c=StockF10Basic&StockID={code}&PhoneOSNew=1&DeviceID=a38adabd-99ef-3116-8bb8-6d893c846e23&VerSion=5.8.0.2&Index=0&"
    result = __base_request("https://apparticle.longhuvip.com/w1/api/index.php", data=data, timeout=3)
    result = json.loads(result)
    if int(result["errcode"]) == 0:
        return result["info"]
    return None
# 获取股票减持日期
def getCodeReductionDate(code):
    infos = __getF10BigReminders(code)
    if not infos:
        return None
    keys = ["终止", "结束", "结果", "不减持", "完成"]
    for info in infos:
        if info.get("type") == 5:
            title = str(info.get("title"))
            if title.find("减持") < 0:
                continue
            wrong = False
            for k in keys:
                if title.find(k) >= 0:
                    wrong = True
            if wrong:
                continue
            return info.get("date").split(" ")[0]
    return None
if __name__ == "__main__":
    print(getZYLTAmount("300198"))
third_data/kpl_block_util.py
New file
@@ -0,0 +1,205 @@
"""
开盘啦板块工具
"""
# 是否是强势板块
# current_limit_up_datas:实时涨停数据 (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
import datetime
import time
import constant
from utils import tool
# 是否主板开1
# limit_up_record_datas 今日历史涨停
def get_shsz_open_limit_up_codes(code, block, limit_up_record_datas, code_block_dict):
    # 获取今日9:30的时间戳
    time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
    timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))
    limit_up_codes = set()
    if limit_up_record_datas:
        for k in limit_up_record_datas:
            if code_block_dict.get(k[3]) == block:
                if int(k[5]) < timestamp:
                    limit_up_codes.add(k[3])
    return limit_up_codes
# 获取主板开1且目前是涨停的代码
def get_shsz_open_limit_up_codes_current(code, block, current_limit_up_datas):
    # 获取今日9:30的时间戳
    time_str = datetime.datetime.now().strftime("%Y-%m-%d") + " 09:30:00"
    timestamp = time.mktime(time.strptime(time_str, '%Y-%m-%d %H:%M:%S'))
    limit_up_codes = set()
    for k in current_limit_up_datas:
        if k[5] == block:
            if int(k[2]) < timestamp:
                limit_up_codes.add(k[0])
    return limit_up_codes
# 代码是否是后排
def is_back_row(code, block, current_limit_up_datas):
    codes = set()
    for k in current_limit_up_datas:
        if k[5] == block:
            codes.add(k[0])
    codes.discard(code)
    if len(codes) == 0:
        return False
    else:
        return True
# 是否是前几的板块
# 板块中有主板涨停的才参与排序(排序时间按照板块中的涨停时间来排序)
def __is_top_block(block, block_codes_infos, topn):
    block_limit_up_dict = {}
    for b in block_codes_infos:
        if b[1] not in block_limit_up_dict:
            block_limit_up_dict[b[1]] = []
        block_limit_up_dict[b[1]].append(b)
    # 剔除只有非主板涨停的板块
    invalid_blocks = []
    for k in block_limit_up_dict:
        has_shsz = False
        for b in block_limit_up_dict[k]:
            if tool.is_can_buy_code(b[0]):
                has_shsz = True
                break
        if not has_shsz:
            invalid_blocks.append(k)
    for k in invalid_blocks:
        block_limit_up_dict.pop(k)
    # 每个板块涨停时间排序
    invalid_blocks = []
    for k in block_limit_up_dict:
        # 删除宽泛概念
        if k in constant.KPL_INVALID_BLOCKS:
            invalid_blocks.append(k)
            continue
        block_limit_up_dict[k].sort(key=lambda x: x[2])
    for k in invalid_blocks:
        block_limit_up_dict.pop(k)
    block_codes_infos = [block_limit_up_dict[k][0] for k in block_limit_up_dict]
    block_codes_infos.sort(key=lambda x: x[2])
    # 去除通用涨停原因
    index = 0
    for d in block_codes_infos:
        if d[1] == block:
            if index + 1 <= topn:
                return True, block_codes_infos[:topn]
            else:
                return False, block_codes_infos[:topn]
        index += 1
    if index <= topn:
        return True, block_codes_infos[:topn]
    return False, block_codes_infos[:topn]
def is_record_top_block(code, block, limit_up_record_datas, yesterday_current_limit_up_codes, topn):
    block_codes_infos = []
    limit_up_time = time.time()
    for k in limit_up_record_datas:
        # 判断是否是首板
        if k[0] in yesterday_current_limit_up_codes:
            continue
        if k[3] != code:
            block_codes_infos.append((k[3], k[2], int(k[5])))
        else:
            limit_up_time = int(k[5])
    block_codes_infos.append((code, block, limit_up_time))
    # 排序
    return __is_top_block(block, block_codes_infos, topn)
def is_current_top_block(code, block, current_limit_up_datas, yesterday_current_limit_up_codes, topn):
    block_codes_infos = []
    limit_up_time = time.time()
    for k in current_limit_up_datas:
        # 判断是否是首板
        if k[0] in yesterday_current_limit_up_codes:
            continue
        if k[0] != code:
            block_codes_infos.append((k[0], k[5], int(k[2])))
        else:
            limit_up_time = int(k[2])
    # 排序
    block_codes_infos.append((code, block, limit_up_time))
    # 排序
    return __is_top_block(block, block_codes_infos, topn)
# 获取当日历史身位
# shsz:是否主板
def get_code_record_rank(code, block, limit_up_record_datas, code_limit_up_reason_dict,
                         yesterday_current_limit_up_codes, shsz=True):
    block_codes_infos = []
    limit_up_time = time.time()
    for k in limit_up_record_datas:
        if k[3] == code:
            # 获取当前代码涨停时间
            limit_up_time = int(k[5])
        if shsz and not tool.is_can_buy_code(k[3]):
            continue
        # 剔除高位板
        if k[3] in yesterday_current_limit_up_codes:
            continue
        if code_limit_up_reason_dict.get(k[3]) == block:
            if k[3] != code:
                block_codes_infos.append((k[3], int(k[5])))
    block_codes_infos.append((code, limit_up_time))
    block_codes_infos.sort(key=lambda x: x[1])
    front_codes = []
    for i in range(0, len(block_codes_infos)):
        if block_codes_infos[i][0] == code:
            return i, front_codes
        else:
            front_codes.append(block_codes_infos[i][0])
    return 0, []
# 获取当日实时身位
# before_blocks_dict格式位{"代码":set("板块")}
def get_code_current_rank(code, block, current_limit_up_datas, code_limit_up_reasons_dict,
                          yesterday_current_limit_up_codes, exclude_codes, open_limit_up_count, shsz=False,
                          limit_up_time=time.time()):
    block_codes_infos = []
    for k in current_limit_up_datas:
        if k[0] == code:
            # 获取当前代码涨停时间
            limit_up_time = int(k[2])
        if shsz and not tool.is_can_buy_code(k[0]):
            continue
        # 剔除高位板
        if k[0] in yesterday_current_limit_up_codes:
            continue
        if code_limit_up_reasons_dict.get(k[0]) and block in code_limit_up_reasons_dict.get(k[0]):
            if k[0] != code:
                # 代码.涨停时间
                block_codes_infos.append((k[0], int(k[2])))
    block_codes_infos.append((code, limit_up_time))
    block_codes_infos.sort(key=lambda x: x[1])
    front_codes = []
    first_count = 0
    for i in range(0, len(block_codes_infos)):
        if i == open_limit_up_count and exclude_codes and block_codes_infos[i][0] in exclude_codes:
            # 非开1老大被排除
            first_count += 1
            continue
        if block_codes_infos[i][0] == code:
            return i - first_count, front_codes
        else:
            front_codes.append(block_codes_infos[i][0])
    return 0, []
if __name__ == "__main__":
    pass
third_data/kpl_data_manager.py
New file
@@ -0,0 +1,528 @@
import copy
import json
import logging
import os
import threading
import time
import requests
from db.redis_manager_delegate import RedisUtils
from log_module import async_log_util, log
from utils import tool, constant
# 开盘啦历史涨停数据管理
from db import mysql_data_delegate as mysql_data, redis_manager_delegate as redis_manager
from log_module.log import logger_kpl_limit_up_reason_change, logger_debug, \
    logger_kpl_open_limit_up, logger_kpl_block_can_buy
from third_data import kpl_util, kpl_api
# 代码对应的涨停原因保存
from third_data.kpl_util import KPLPlatManager, KPLDataType
class KPLLimitUpDataRecordManager:
    total_datas = None
    latest_datas = {}
    latest_origin_datas = []
    __kplPlatManager = KPLPlatManager()
    __current_code_reasons_dict = {}
    # 当前涨停原因+推荐原因的代码集合
    __current_reason_codes_dict = {}
    # 当前涨停原因的代码集合
    __current_limit_up_reason_codes_dict = {}
    __records_cache = {}
    record_code_dict = {}
    @classmethod
    def save_record(cls, day, records):
        # 统计炸板
        try:
            last_codes = set()
            if cls.latest_origin_datas:
                last_codes = set([x[0] for x in cls.latest_origin_datas])
            now_codes = set()
            if records:
                now_codes = set([x[0] for x in records])
            open_limit_up_codes = last_codes - now_codes
            if open_limit_up_codes:
                logger_kpl_open_limit_up.info(f"炸板代码:{open_limit_up_codes}")
        except Exception as e:
            pass
        # 统计代码所属板块
        code_block_dict = {}
        for data in records:
            cls.record_code_dict[data[0]] = data
            blocks = set(data[5].split("、"))
            code = data[0]
            for b in blocks:
                if not code_block_dict.get(code):
                    code_block_dict[code] = set()
                code_block_dict[code].add(b)
                # 设置涨停数据
        if records:
            cls.latest_origin_datas = records
        code_reasons_dict = {}
        reason_codes_dict = {}
        limit_up_reason_codes_dict = {}
        for d in records:
            if d[5] not in limit_up_reason_codes_dict:
                limit_up_reason_codes_dict[d[5]] = set()
            limit_up_reason_codes_dict[d[5]].add(d[0])
            # 涨停原因 + 推荐原因
            bs = {d[5]}
            if d[6]:
                bs |= set(d[6].split("、"))
            code_reasons_dict[d[0]] = bs
            for b in bs:
                if b not in reason_codes_dict:
                    reason_codes_dict[b] = set()
                reason_codes_dict[b].add(d[0])
        cls.__current_code_reasons_dict = code_reasons_dict
        cls.__current_reason_codes_dict = reason_codes_dict
        cls.__current_limit_up_reason_codes_dict = limit_up_reason_codes_dict
        # 涨停数据记录
        mysqldb = mysql_data.Mysqldb()
        # 统计涨停原因和概念代码
        plats = {}
        for d in records:
            plats[d[5]] = d[9]
        for p in plats:
            cls.__kplPlatManager.save_plat(plats[p], p)
        for d in records:
            # (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额,涨停原因代码,涨停原因代码数量)
            code = d[0]
            _id = f"{day}_{code}_{d[5]}"
            result = mysqldb.select_one("select * from kpl_limit_up_record where _id='{}'".format(_id))
            if not result:
                mysqldb.execute(
                    f"insert into kpl_limit_up_record(_id,_day,_hot_block_name,_code,_code_name,_limit_up_time,_blocks,_latest_limit_up_time,_update_time,_create_time,_hot_block_code_count,_limit_up_high_info,_zylt_val) values('{_id}','{day}','{d[5]}','{d[0]}','{d[1]}','{d[2]}','{d[6]}','{d[3]}',now(),now(),{d[10]},'{d[4]}',{d[7]})")
            else:
                if _id in cls.latest_datas and json.dumps(cls.latest_datas.get(_id)) != json.dumps(d):
                    mysqldb.execute(
                        f"update kpl_limit_up_record set _latest_limit_up_time='{d[3]}',_limit_up_time='{d[2]}',_hot_block_code_count={d[10]},_limit_up_high_info='{d[4]}' ,_update_time=now() where _id='{_id}'")
                    cls.latest_datas[_id] = d
            cls.latest_datas[_id] = d
            # 获取原来的代码所属板块,删除之前错误的板块
            old_datas = KPLLimitUpDataRecordManager.list_by_code(code, day)
            if old_datas:
                for dd in old_datas:
                    if dd[2] not in code_block_dict[code]:
                        mysqldb.execute(f"delete from kpl_limit_up_record where _id='{dd[0]}'")
                        logger_kpl_limit_up_reason_change.info(f"code-{dd[3]}:{dd[2]}-{code_block_dict[code]}")
                        # 板块更改过
                        mysqldb.execute(
                            f"update kpl_limit_up_record set _hot_block_change = f'{dd[2]}' where _day='{dd[1]}' and _code='{code}'")
                        cls.__LimitUpCodesPlateKeyManager.set_today_limit_up_reason_change(code, dd[2],
                                                                                           code_block_dict[code])
                        if dd[0] in cls.latest_datas:
                            cls.latest_datas.pop(dd[0])
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
    @classmethod
    def load_total_datas(cls):
        cls.total_datas = KPLLimitUpDataRecordManager.list_all(tool.get_now_date_str())
    @staticmethod
    def list_all(day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _day='{day}'")
    @classmethod
    def list_all_cache(cls, day):
        if day in cls.__records_cache:
            return cls.__records_cache[day]
        fdata = cls.list_all(day)
        if fdata:
            cls.__records_cache[day] = fdata
        return fdata
    @staticmethod
    def list_by_code(code, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(f"select * from kpl_limit_up_record where _code='{code}' and _day='{day}'")
    @staticmethod
    def list_by_block(block_name, day):
        mysqldb = mysql_data.Mysqldb()
        return mysqldb.select_all(
            f"select * from kpl_limit_up_record where _hot_block_name='{block_name}' and _day='{day}'")
    @staticmethod
    def list_blocks_with_day(days):
        mysqldb = mysql_data.Mysqldb()
        sql = "select _hot_block_name,_day from kpl_limit_up_record where "
        wheres = []
        for day in days:
            wheres.append(f"_day = '{day}'")
        sql += " or ".join(wheres)
        sql += " group by _hot_block_name,_day"
        results = mysqldb.select_all(sql)
        return results
    @classmethod
    def get_current_blocks(cls, code):
        return cls.__current_code_reasons_dict.get(code)
    @classmethod
    def get_current_codes_by_block(cls, block):
        return cls.__current_reason_codes_dict.get(block)
    @classmethod
    def get_current_reason_codes_dict(cls):
        return copy.deepcopy(cls.__current_reason_codes_dict)
    @classmethod
    def get_current_limit_up_reason_codes_dict(cls):
        return copy.deepcopy(cls.__current_limit_up_reason_codes_dict)
    @classmethod
    def get_current_reasons(cls):
        if cls.__current_reason_codes_dict:
            return cls.__current_reason_codes_dict.keys()
        return set()
# 代码精选板块管理
class KPLCodeJXBlockManager:
    __db = constant.REDIS_DB
    __redisManager = redis_manager.RedisManager(constant.REDIS_DB)
    __code_blocks = {}
    # 备用
    __code_by_blocks = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(KPLCodeJXBlockManager, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def __get_redis(self):
        return self.__redisManager.getRedis()
    def save_jx_blocks(self, code, blocks: list, current_limit_up_blocks: set, by=False):
        if not blocks:
            return
        final_blocks = copy.deepcopy(blocks)
        if len(blocks) > 2:
            final_blocks.clear()
            for b in blocks:
                if b not in constant.KPL_INVALID_BLOCKS:
                    final_blocks.append(b)
            if len(final_blocks) < 2:
                final_blocks = blocks
        # 保存前2条数据
        if by:
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks_by-{code}", tool.get_expire(), json.dumps(final_blocks))
            self.__code_by_blocks[code] = (final_blocks, time.time())
        else:
            RedisUtils.setex_async(self.__db, f"kpl_jx_blocks-{code}", tool.get_expire(), json.dumps(final_blocks))
            self.__code_blocks[code] = (final_blocks, time.time())
    # 获取精选板块
    def get_jx_blocks(self, code, by=False):
        if by:
            if code in self.__code_by_blocks:
                return self.__code_by_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks_by-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_by_blocks[code] = val
            return self.__code_by_blocks[code]
        else:
            if code in self.__code_blocks:
                return self.__code_blocks[code]
            val = RedisUtils.get(self.__get_redis(), f"kpl_jx_blocks-{code}")
            if val is None:
                return None
            else:
                val = json.loads(val)
                self.__code_blocks[code] = val
            return self.__code_blocks[code]
    def get_jx_blocks_cache(self, code, by=False):
        if by:
            return self.__code_by_blocks.get(code)
        else:
            return self.__code_blocks.get(code)
    # 从网络上加载精选板块, 当前涨停的板块
    def load_jx_blocks(self, code, buy_1_price, limit_up_price, current_limit_up_blocks):
        try:
            # logger_kpl_block_can_buy.info(f"准备更新精选板块:{code}-{buy_1_price}-{limit_up_price}")
            if limit_up_price and buy_1_price:
                # 处理买1,卖1信息
                pre_close_price = round(float(limit_up_price) / tool.get_limit_up_rate(code), 2)
                # 如果涨幅大于7%就读取板块
                price_rate = (buy_1_price - pre_close_price) / pre_close_price
                if price_rate > 0.07:
                    jx_blocks_info = self.get_jx_blocks_cache(code)
                    if not jx_blocks_info:
                        start_time = time.time()
                        blocks = kpl_api.getCodeBlocks(code)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块-{blocks}  耗时:{int(time.time() - start_time)}s")
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                    else:
                        # 还没涨停的需要更新精选板块 更新精选板块
                        if abs(float(buy_1_price) - float(limit_up_price)) >= 0.001:
                            # 非涨停状态
                            UPDATE_TIME_SPACE = 5 * 60
                            time_diff = tool.trade_time_sub(tool.get_now_time_str(), "09:30:00")
                            if time_diff < 0:
                                UPDATE_TIME_SPACE = 60 * 60
                            else:
                                UPDATE_TIME_SPACE = int(time_diff / 30) + 60
                                if UPDATE_TIME_SPACE > 5 * 60:
                                    UPDATE_TIME_SPACE = 5 * 60
                            if time.time() - jx_blocks_info[1] > UPDATE_TIME_SPACE:
                                start_time = time.time()
                                # 距离上次更新时间过去了5分钟
                                blocks = kpl_api.getCodeBlocks(code)
                                async_log_util.info(logger_kpl_block_can_buy,
                                                    f"{code}:获取到精选板块(更新)-{blocks}  耗时:{int(time.time() - start_time)}s")
                                self.save_jx_blocks(code, blocks, current_limit_up_blocks)
                elif price_rate > 0.03:
                    # 添加备用板块
                    if not self.get_jx_blocks_cache(code, by=True):
                        start_time = time.time()
                        blocks = kpl_api.getCodeBlocks(code)
                        self.save_jx_blocks(code, blocks, current_limit_up_blocks, by=True)
                        async_log_util.info(logger_kpl_block_can_buy,
                                            f"{code}:获取到精选板块(备用)-{blocks}  耗时:{int(time.time() - start_time)}s")
        except Exception as e:
            logger_kpl_block_can_buy.error(f"{code} 获取板块出错")
            logger_kpl_block_can_buy.exception(e)
class KPLDataManager:
    __latest_datas = {}
    kpl_data_update_info = {}
    __file_content_cache = {}
    @classmethod
    def __save_in_file(cls, key, datas):
        name = f"{tool.get_now_date_str()}_{key}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        with open(path, 'w') as f:
            f.write(json.dumps(datas))
    @classmethod
    def __get_from_file(cls, key, day=tool.get_now_date_str()):
        name = f"{day}_{key}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        if not os.path.exists(path):
            return None
        with open(path, 'r') as f:
            lines = f.readlines()
            if lines:
                return json.loads(lines[0])
        return None
    @classmethod
    def get_from_file(cls, type, day):
        name = f"{day}_{type.value}.log"
        path = f"{constant.CACHE_PATH}/{name}"
        if not os.path.exists(path):
            return None
        with open(path, 'r') as f:
            lines = f.readlines()
            if lines:
                return json.loads(lines[0])
        return None
    @classmethod
    def get_from_file_cache(cls, type, day):
        key = f"{type}-{day}"
        if key in cls.__file_content_cache:
            return cls.__file_content_cache.get(key)
        fdata = cls.get_from_file(type, day)
        if fdata:
            cls.__file_content_cache[key] = fdata
        return fdata
    @classmethod
    # 获取最近几天的数据,根据日期倒序返回
    def get_latest_from_file(cls, type, count):
        files = os.listdir(constant.CACHE_PATH)
        file_name_list = []
        for f in files:
            if f[10:] == f"_{type.value}.log":
                file_name_list.append((f.split("_")[0], f))
        file_name_list.sort(key=lambda x: x[0], reverse=True)
        file_name_list = file_name_list[:count]
        fresults = []
        for file in file_name_list:
            path = f"{constant.CACHE_PATH}/{file[1]}"
            if not os.path.exists(path):
                continue
            with open(path, 'r') as f:
                lines = f.readlines()
                if lines:
                    fresults.append((file[0], json.loads(lines[0])))
        return fresults
    @classmethod
    def save_data(cls, type, datas):
        cls.kpl_data_update_info[type] = (tool.get_now_time_str(), len(datas))
        cls.__latest_datas[type] = datas
        cls.__save_in_file(type, datas)
    @classmethod
    def get_data(cls, type):
        type = type.value
        if type in cls.__latest_datas:
            return cls.__latest_datas[type]
        result = cls.__get_from_file(type)
        if result is not None:
            cls.__latest_datas[type] = result
        return result
def load_history_limit_up():
    for file_name in os.listdir(f"{constant.get_path_prefix()}/kpl/his"):
        if file_name.find("HisDaBanList_1.log") < 0:
            continue
        day = file_name[:10]
        with open(f"{constant.get_path_prefix()}/kpl/his/{file_name}", 'r', encoding="utf-16") as f:
            lines = f.readlines()
            line = lines[0]
            result = json.loads(line)
            list_ = kpl_util.parseDaBanData(result, kpl_util.DABAN_TYPE_LIMIT_UP)
            # KPLLimitUpDataRecordManager.save_record(day, list_)
            for r in list_:
                print(r[-1], r[5])
                KPLPlatManager().save_plat(r[-1], r[5])
            # print(day, list_)
# 历史涨停列表
__limit_up_list_records_dict = {}
# 获取最近几天的实时涨停信息
# 返回格式([日期,数据])
def get_current_limit_up_data_records(count):
    fresults = []
    day = tool.get_now_date_str()
    datas = []
    if day in __limit_up_list_records_dict:
        datas = __limit_up_list_records_dict[day]
    else:
        logger_debug.info("从文件中获取前几天的实时涨停数据")
        datas = KPLDataManager().get_latest_from_file(KPLDataType.LIMIT_UP, count + 2)
        if datas:
            # 保存数据
            __limit_up_list_records_dict[day] = datas
    for i in range(len(datas)):
        if datas[i][0] == day:
            continue
        fresults.append(datas[i])
        if len(fresults) >= count:
            break
    return fresults
def get_yesterday_limit_up_codes():
    yesterday_limit_up_data_records = get_yesterday_current_limit_up_records()
    yesterday_codes = set([x[0] for x in yesterday_limit_up_data_records])
    return yesterday_codes
def get_yesterday_current_limit_up_records():
    yesterday_limit_up_data_records = get_current_limit_up_data_records(1)[0][1]
    return yesterday_limit_up_data_records
# 获取最近几天涨停原因
__latest_current_limit_up_records = {}
def get_latest_current_limit_up_records():
    day = tool.get_now_date_str()
    if day not in __latest_current_limit_up_records:
        fdatas = get_current_limit_up_data_records(15)
        __latest_current_limit_up_records[day] = fdatas
    return __latest_current_limit_up_records.get(day)
class PullTask:
    # 最近更新时间
    __latest_update_time_dict = {}
    @classmethod
    def __upload_data(cls, type, datas):
        root_data = {
            "type": type,
            "data": datas
        }
        requests.post("http://127.0.0.1:9004/upload_kpl_data", json.dumps(root_data))
    @classmethod
    def repaire_pull_task(cls):
        """
        修复拉取任务
        @return:
        """
        # 修复涨停
        logger_debug.info("任务修复-开盘啦:启动修复")
        key = "limit_up"
        if key not in cls.__latest_update_time_dict or time.time() - cls.__latest_update_time_dict[key] > 20:
            logger_debug.info("任务修复-开盘啦:涨停列表")
            # 大于20s就需要更新
            threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
    @classmethod
    def run_limit_up_task(cls):
        # 关闭log
        log.close_print()
        while True:
            try:
                if (tool.is_trade_time() and int(tool.get_now_time_str().replace(':', '')) > int("092530")):
                    results = kpl_api.getLimitUpInfoNew()
                    result = json.loads(results)
                    start_time = time.time()
                    cls.__upload_data("limit_up", result)
            except Exception as e:
                try:
                    logging.exception(e)
                    logger_debug.exception(e)
                except:
                    pass
            except:
                pass
            finally:
                cls.__latest_update_time_dict["limit_up"] = time.time()
                time.sleep(3)
    @classmethod
    # 运行拉取任务
    def run_pull_task(cls):
        threading.Thread(target=cls.run_limit_up_task, daemon=True).start()
        # threading.Thread(target=get_bidding_money, daemon=True).start()
        # threading.Thread(target=get_market_industry, daemon=True).start()
        # threading.Thread(target=get_market_jingxuan, daemon=True).start()
if __name__ == "__main__":
    print(get_latest_current_limit_up_records())
    print(get_latest_current_limit_up_records())
    input()
third_data/kpl_util.py
New file
@@ -0,0 +1,325 @@
import enum
import json
from db import mysql_data_delegate as mysql_data
def parse_kpl_datas(results):
    start_y = -1
    end_x = -1
    index = 0
    datas = []
    for result in results:
        text = result[1]
        if text.find("股票名称") > -1:
            start_y = result[0][0][1]
        if text.find("竞价涨幅") > -1:
            end_x = result[0][0][0]
        if start_y > 0 and end_x > 0:
            if result[0][0][0] < end_x and result[0][0][1] > start_y and (result[0][1][0] - result[0][0][0]) > 30:
                datas.append(text)
                index += 1
    datas = datas[:3 * 5]
    fdatas = []
    temp = []
    for data in datas:
        temp.append(data)
        if len(temp) == 3:
            fdatas.append((temp[2][:6], temp[1]))
            temp = []
    return fdatas
# 涨停代码:
# (代码,名称,首次涨停时间,最近涨停时间,几板,涨停原因,板块,实际流通,主力净额,涨停原因代码,涨停原因代码数量)
# (0,1,6,25,9,16,11,15,12)
# 竞价代码:
# (代码,名称,涨停委买额,板块,竞价成交额,实际流通)
# (0,1,18,11,22,15)
# 炸板:
# (代码,名称,涨幅,板块,实际流通)
# (0,1,4,11,15)
# 跌停:
# (代码,名称,板块,实际流通)
# (0,1,11,15)
# 曾跌停:
# (代码,名称,涨幅,板块,实际流通)
# (0,1,4,11,15)
DABAN_TYPE_BIDDING = 8
DABAN_TYPE_LIMIT_UP = 1
DABAN_TYPE_OPEN_LIMIT_UP = 2
DABAN_TYPE_LIMIT_DOWN = 3
DABAN_TYPE_EVER_LIMIT_DOWN = 5
class KPLDataType(enum.Enum):
    BIDDING = "biddings"
    LIMIT_UP = "limit_up"
    OPEN_LIMIT_UP = "open_limit_up"
    LIMIT_DOWN = "limit_down"
    EVER_LIMIT_DOWN = "ever_limit_down"
    FENG_KOU = "feng_kou"
    BEST_FENG_KOU = "best_feng_kou"
    FENG_XIANG = "feng_xiang"
    INDUSTRY_RANK = "industry_rank"
    JINGXUAN_RANK = "jingxuan_rank"
def __parseDaBanItemData(data, type):
    if type == DABAN_TYPE_BIDDING:
        return data[0], data[1], data[18], data[11], data[22], data[15]
    elif type == DABAN_TYPE_LIMIT_UP:
        # (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额, 涨停原因代码, 涨停原因代码数量)
        return data[0], data[1], data[6], data[25], data[9], data[16], data[11], data[15], data[12], data[26], data[27]
    elif type == DABAN_TYPE_OPEN_LIMIT_UP:
        return data[0], data[1], data[4], data[11], data[15]
    elif type == DABAN_TYPE_LIMIT_DOWN:
        return data[0], data[1], data[11], data[15]
    elif type == DABAN_TYPE_EVER_LIMIT_DOWN:
        return data[0], data[1], data[4], data[11], data[15]
    return None
def __parseLimitUpItemData(data):
    # (代码, 名称, 首次涨停时间, 最近涨停时间, 几板, 涨停原因, 板块, 实际流通, 主力净额, 涨停原因代码, 涨停原因代码数量)
    return data[0], data[1], data[4], data[4], data[18], data[5], data[12], data[13], data[8], data[19], data[20]
# 最强风口
# (代码,名称,强度,涨幅,热门板块,所有板块)
def __parseBestFengKouItemData(data):
    return data[0], data[1], data[2], data[4], data[12], data[10]
# 市场风口
# (代码,名称,涨幅,主力净额,风口概念)
def __parseFengKouItemData(data):
    return data[0], data[1], data[3], data[7], data[11]
# 风向标
# (代码, 名称, 现价, 涨幅, 板块, 300万大单净额, 主力净额, 自由市值)
def __parseFengXiangBiaoItemData(data):
    return data[0], data[1], data[5], data[6], data[4], data[-3], data[13], data[10]
# 行业涨幅
# (代码,名称,主力净额,涨跌幅)
def __parseIndustry_rank(data):
    return data[0], data[1], data[6], data[3]
def parseDaBanData(data, type_):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["list"]
    fresult_ = []
    for d in list_:
        pdata = __parseDaBanItemData(d, type_)
        if pdata:
            fresult_.append(pdata)
    return fresult_
def parseLimitUpData(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["list"]
    fresult_ = []
    for d in list_:
        pdata = __parseLimitUpItemData(d)
        if pdata:
            fresult_.append(pdata)
    return fresult_
def parseFengKou(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["List"]
    fresult_ = []
    for d in list_:
        pdata = __parseFengKouItemData(d)
        if pdata:
            fresult_.append(pdata)
    return fresult_
def parseBestFengKou(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["List"]
    fresult_ = []
    for d in list_:
        pdata = __parseBestFengKouItemData(d)
        if pdata:
            fresult_.append(pdata)
    return fresult_
def parseFengXiang(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["list"]
    fresult_ = []
    for d in list_:
        pdata = __parseFengXiangBiaoItemData(d)
        if pdata:
            fresult_.append(pdata)
    return fresult_
def parseIndustryRank(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["list"]
    fresult_ = []
    for d in list_:
        pdata = __parseIndustry_rank(d)
        if pdata:
            fresult_.append(pdata)
    return fresult_
# 解析板块代码
def parsePlateCodes(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["list"]
    fresult_ = []
    for d in list_:
        # (代码,名称,现价,涨幅,自由流通,几板,龙几,主力净额,300w净额,机构增仓)
        fresult_.append((d[0], d[1], d[5], d[6], d[10], d[23], d[24], d[13], d[50], d[42]))
    return fresult_
# 解析概念中的板块强度
def parseSonPlat(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["List"]
    fresult_ = []
    for d in list_:
        # (代码,名称,强度)
        fresult_.append((d[0], d[1], d[2]))
    return fresult_
def __money_desc(money):
    if abs(money) > 100000000:
        return f"{round(money / 100000000, 2)}亿"
    else:
        return f"{round(money / 10000, 2)}万"
def parseMarketIndustry(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["list"]
    fresult_ = []
    for d in list_:
        # (代码,名称,涨幅,主力净额)
        fresult_.append((d[0], d[1], d[3], d[6]))
    return fresult_
def parseMarketJingXuan(data):
    if type(data) == str:
        data = json.loads(data)
    if int(data["errcode"]) != 0:
        raise Exception(f"解析数据出错,errcode:{data['errcode']}")
    list_ = data["list"]
    fresult_ = []
    for d in list_:
        # (代码,名称,强度,主力净额)
        fresult_.append((d[0], d[1], d[2], d[6]))
    return fresult_
class KPLPlatManager:
    def save_plat(self, _id, name):
        if not _id:
            return
        mysqldb = mysql_data.Mysqldb()
        key = f"{_id}-{name}"
        results = mysqldb.select_one(f"select * from kpl_plate where _name='{name}'")
        if not results:
            mysqldb.execute(f"insert into kpl_plate(_id,_name,_key) values({_id},'{name}','{key}')")
    def get_plat(self, name):
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_one(f"select * from kpl_plate where _name='{name}'")
        if results:
            return results[0]
        return None
    def get_same_plat_names(self, name):
        mysqldb = mysql_data.Mysqldb()
        plate = self.get_plat(name)
        if not plate:
            return {name}
        results = mysqldb.select_all(f"select _name from kpl_plate where _id='{plate}'")
        return set([r[0] for r in results])
    def get_same_plat_names_by_id(self, id_):
        mysqldb = mysql_data.Mysqldb()
        results = mysqldb.select_all(f"select _name from kpl_plate where _id='{id_}'")
        return set([r[0] for r in results])
# 获取高位板的数量
def get_high_level_count(key):
    if key.find("连板") >= 0:
        # 形式如: 3连板
        return int(key.replace("连板", ""))
    elif key.find("天") >= 0 and key.find("板") >= 0:
        # 形式如:5天4板
        return int(key.split("天")[1].replace("板", ""))
    # 形式如:首板
    return 1
# 获取实时涨停列表中的涨停原因
def get_current_limit_up_reasons(item) -> set:
    b = {item[5]}
    if item[6]:
        b |= set(item[6].split("、"))
    return b
# 获取目前的涨停原因
def get_current_limit_up_reason(item) -> str:
    return item[5]
def filter_block(block):
    if not block:
        return block
    return block.replace("概念", "")
if __name__ == "__main__":
    print(get_high_level_count("首板"))
    print(get_high_level_count("5天4板"))
    print(get_high_level_count("2连板"))
    print(get_high_level_count("4连板"))
utils/constant.py
@@ -13,8 +13,14 @@
    return False
CACHE_PATH = f"{'D:' if is_windows() else '/home/userzjj'}/trade_cache"
REDIS_DB = 10
# 开盘啦
KPL_INVALID_BLOCKS = {"一季报增长", "二季报增长", "三季报增长", "四季报增长", "业绩增长", "中报增长", "年报增长", "年报预增", "无", "次新股", "ST摘帽", "超跌",
                      "股权转让", "并购重组", "再融资", "年报预增", " 专精特新", "壳资源", "行业龙头", "科创板", "实控人变更"}
# redis设置
REDIS_CONFIG = {
    "host": "gz-crs-6l6xbc4j.sql.tencentcdb.com",
utils/tool.py
@@ -344,5 +344,21 @@
        return MARKET_TYPE_UNKNOWN
def get_limit_up_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 1.1
    else:
        return 1.2
def get_limit_down_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 0.9
    else:
        return 0.8
if __name__ == "__main__":
    pass