Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
utils/tool.py
@@ -21,6 +21,22 @@
    return wrapper
def singleton(cls):
    """
    单例装饰器
    @param cls:
    @return:
    """
    instances = {}
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return get_instance
def get_expire():
    now = int(t.time())
    end = int(t.time()) + 60 * 60 * 24
@@ -28,7 +44,7 @@
    end = int(t.mktime(t.strptime(local_time, "%Y-%m-%d")))
    expire = end - now
    # 加随机数,防止一起销毁数据
    expire += random.randint(0, 60 * 60)
    expire += random.randint(0, 60 * 30)
    return expire
@@ -51,6 +67,18 @@
def get_now_time_str():
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    return time_str
def get_now_time_as_int():
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    return int(time_str.replace(":", ""))
def get_now_time_with_ms_str():
    now = datetime.datetime.now()
    ms = int(now.microsecond / 1000)
    time_str = now.strftime(f"%H:%M:%S.{ms:03d}")
    return time_str
@@ -156,12 +184,37 @@
    return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
def get_time_as_millionsecond(time_str):
    s_str, ms_str = time_str.split(".")
    ts = s_str.split(":")
    return (int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])) * 1000 + int(ms_str)
# 将秒数格式化为时间
def time_seconds_format(seconds):
    h = seconds // 3600
    m = seconds % 3600 // 60
    s = seconds % 60
    return "{0:0>2}:{1:0>2}:{2:0>2}".format(h, m, s)
def time_millionseconds_format(millionseconds):
    ms = millionseconds % 1000
    seconds = millionseconds // 1000
    h = seconds // 3600
    m = seconds % 3600 // 60
    s = seconds % 60
    return "{0:0>2}:{1:0>2}:{2:0>2}.{3:0>3}".format(h, m, s, ms)
def timestamp_format(timestamp, format):
    """
    时间戳格式化
    @param timestamp:
    @param format:
    @return:
    """
    return datetime.datetime.fromtimestamp(timestamp).strftime(format)
# 交易時間的差值
@@ -178,6 +231,26 @@
    return time_1 - time_2
def trade_time_sub_with_ms(time_str_1, time_str_2):
    split_time = get_time_as_second("11:30:00") * 1000
    time_1 = get_time_as_millionsecond(time_str_1)
    time_2 = get_time_as_millionsecond(time_str_2)
    if time_1 < split_time < time_2:
        time_2 = time_2 - 90 * 60 * 1000
    elif time_2 < split_time < time_1:
        time_2 = time_2 + 90 * 60 * 1000
    return time_1 - time_2
def compare_time(time_1: str, time_2: str):
    return int(time_1.replace(":", "")) - int(time_2.replace(":", ""))
def compare_time_with_ms(time_1: str, time_2: str):
    return int(time_1.replace(":", "").replace(".", "")) - int(time_2.replace(":", "").replace(".", ""))
# 交易时间加几s
def trade_time_add_second(time_str, second):
    ts = time_str.split(":")
@@ -189,11 +262,34 @@
    return time_seconds_format(s)
def trade_time_add_millionsecond(time_str, millionsecond):
    s_str, ms_str = time_str.split(".")
    ts = s_str.split(":")
    s_ = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    ms_ = s_ * 1000
    ms_ += int(ms_str)
    ms = ms_ + millionsecond
    # 是否在11:30:00
    if ms >= 11 * 3600 * 1000 + 30 * 60 * 1000 > ms_:
        ms += 90 * 60 * 1000
    return time_millionseconds_format(ms)
def compute_buy1_real_time(time_):
    ts = time_.split(":")
    s = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    cha = (s - 2) % 3
    return time_seconds_format(s - 2 - cha)
def to_time_with_ms(time_s_str, time_ms):
    """
    将时间+毫秒数 转为字符串
    @param time_s_str: 时间如:10:00:00
    @param time_ms: 毫秒如: 12
    @return: 如:10:00:00.001
    """
    return f"{time_s_str}." + "{0:0>3}".format(time_ms)
# 全角转半角
@@ -222,11 +318,22 @@
    return max(price1, price2)
# 获取买入价格笼子的最低价
def get_shadow_price(price):
    # fprice = round((100 - random.randint(2, 10)) * price / 100, 2)
    # if price - 0.1 < fprice:
    #     fprice = price - 0.1
    # return round(fprice, 2)
    if price < 20:
        return round(get_buy_min_price(price) - 0.03, 2)
    else:
        # 大股价直接向下取2%
        return round(price * (1 - 0.02), 2)
if __name__ == "__main__":
    print(get_buy_min_price(20))
    # print(trade_time_sub("11:29:59", 5))
    # print(trade_time_sub("10:29:59", 10))
    # print(trade_time_add_second("13:29:59", 60))
    print(trade_time_sub("13:00:01",
                         "11:29:55"))
class CodeDataCacheUtil:
@@ -247,11 +354,39 @@
        return False, None
# 是否为主板代码
def is_shsz_code(code):
    if code.find("00") == 0 or code.find("60") == 0:
def is_can_buy_code(code):
    if code.find("00") == 0 or code.find("60") == 0 or code.find("30") == 0:
        return True
    return False
def is_target_code(code):
    """
    是否是目标代码
    @param code:
    @return:
    """
    prefixes = ["00", "60", "30"]
    for prefix in prefixes:
        if code.find(prefix) == 0:
            return True
    return False
def get_limit_up_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 1.1
    else:
        return 1.2
def get_limit_down_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 0.9
    else:
        return 0.8
def get_thread_id():
@@ -264,3 +399,67 @@
    except:
        pass
    return None
def get_buy_volume(limit_up_price):
    return get_buy_volume_by_money(limit_up_price, constant.BUY_MONEY_PER_CODE)
def get_buy_volume_by_money(limit_up_price, money):
    count = (money // int(round(float(limit_up_price) * 100))) * 100
    if count < 100:
        count = 100
    return count
# 深证
MARKET_TYPE_SZSE = 1
# 上证
MARKET_TYPE_SSE = 0
# 未知
MARKET_TYPE_UNKNOWN = -1
def get_market_type(code):
    """
    根据股票代码
    :param code:
    :return:
    """
    if code.find("00") == 0 or code.find("30") == 0 or code.find("12") == 0:
        return MARKET_TYPE_SZSE
    elif code.find("60") == 0 or code.find("68") == 0 or code.find("11") == 0:
        return MARKET_TYPE_SSE
    else:
        return MARKET_TYPE_UNKNOWN
def is_sh_code(code):
    """
    是否是上证
    @param code:
    @return:
    """
    return get_market_type(code) == MARKET_TYPE_SSE
def is_sz_code(code):
    """
    是否是深证
    @param code:
    @return:
    """
    return get_market_type(code) == MARKET_TYPE_SZSE
def is_ge_code(code):
    """
    是否是创业板
    @param code:
    @return:
    """
    return code.find("30") == 0
if __name__ == "__main__":
    print(timestamp_format(1726034271, "%H%M%S"))