Administrator
4 天以前 48fb7a00951f91bdc707e5dd2d196e5bccb752c3
utils/tool.py
@@ -21,6 +21,22 @@
    return wrapper
def singleton(cls):
    """
    单例装饰器
    @param cls:
    @return:
    """
    instances = {}
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return get_instance
def get_expire():
    now = int(t.time())
    end = int(t.time()) + 60 * 60 * 24
@@ -28,7 +44,7 @@
    end = int(t.mktime(t.strptime(local_time, "%Y-%m-%d")))
    expire = end - now
    # 加随机数,防止一起销毁数据
    expire += random.randint(0, 60 * 60)
    expire += random.randint(0, 60 * 30)
    return expire
@@ -51,6 +67,18 @@
def get_now_time_str():
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    return time_str
def get_now_time_as_int():
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    return int(time_str.replace(":", ""))
def get_now_time_with_ms_str():
    now = datetime.datetime.now()
    ms = int(now.microsecond / 1000)
    time_str = now.strftime(f"%H:%M:%S.{ms:03d}")
    return time_str
@@ -159,7 +187,7 @@
def get_time_as_millionsecond(time_str):
    s_str, ms_str = time_str.split(".")
    ts = s_str.split(":")
    return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) * 1000 + int(ms_str)
    return (int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])) * 1000 + int(ms_str)
# 将秒数格式化为时间
@@ -177,6 +205,16 @@
    m = seconds % 3600 // 60
    s = seconds % 60
    return "{0:0>2}:{1:0>2}:{2:0>2}.{3:0>3}".format(h, m, s, ms)
def timestamp_format(timestamp, format):
    """
    时间戳格式化
    @param timestamp:
    @param format:
    @return:
    """
    return datetime.datetime.fromtimestamp(timestamp).strftime(format)
# 交易時間的差值
@@ -244,6 +282,16 @@
    return time_seconds_format(s - 2 - cha)
def to_time_with_ms(time_s_str, time_ms):
    """
    将时间+毫秒数 转为字符串
    @param time_s_str: 时间如:10:00:00
    @param time_ms: 毫秒如: 12
    @return: 如:10:00:00.001
    """
    return f"{time_s_str}." + "{0:0>3}".format(time_ms)
# 全角转半角
def strQ2B(ustring):
    rstring = ""
@@ -276,7 +324,11 @@
    # if price - 0.1 < fprice:
    #     fprice = price - 0.1
    # return round(fprice, 2)
    return round(get_buy_min_price(price) - 0.03, 2)
    if price < 20:
        return round(get_buy_min_price(price) - 0.03, 2)
    else:
        # 大股价直接向下取2%
        return round(price * (1 - 0.02), 2)
if __name__ == "__main__":
@@ -302,11 +354,39 @@
        return False, None
# 是否为主板代码
def is_shsz_code(code):
    if code.find("00") == 0 or code.find("60") == 0:
def is_can_buy_code(code):
    if code.find("00") == 0 or code.find("60") == 0 or code.find("30") == 0:
        return True
    return False
def is_target_code(code):
    """
    是否是目标代码
    @param code:
    @return:
    """
    prefixes = ["00", "60", "30"]
    for prefix in prefixes:
        if code.find(prefix) == 0:
            return True
    return False
def get_limit_up_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 1.1
    else:
        return 1.2
def get_limit_down_rate(code):
    # 获取涨停倍数
    if code.find("00") == 0 or code.find("60") == 0:
        return 0.9
    else:
        return 0.8
def get_thread_id():
@@ -322,11 +402,64 @@
def get_buy_volume(limit_up_price):
    count = (constant.BUY_MONEY_PER_CODE // int(round(float(limit_up_price) * 100))) * 100
    return get_buy_volume_by_money(limit_up_price, constant.BUY_MONEY_PER_CODE)
def get_buy_volume_by_money(limit_up_price, money):
    count = (money // int(round(float(limit_up_price) * 100))) * 100
    if count < 100:
        count = 100
    return count
# 深证
MARKET_TYPE_SZSE = 1
# 上证
MARKET_TYPE_SSE = 0
# 未知
MARKET_TYPE_UNKNOWN = -1
def get_market_type(code):
    """
    根据股票代码
    :param code:
    :return:
    """
    if code.find("00") == 0 or code.find("30") == 0 or code.find("12") == 0:
        return MARKET_TYPE_SZSE
    elif code.find("60") == 0 or code.find("68") == 0 or code.find("11") == 0:
        return MARKET_TYPE_SSE
    else:
        return MARKET_TYPE_UNKNOWN
def is_sh_code(code):
    """
    是否是上证
    @param code:
    @return:
    """
    return get_market_type(code) == MARKET_TYPE_SSE
def is_sz_code(code):
    """
    是否是深证
    @param code:
    @return:
    """
    return get_market_type(code) == MARKET_TYPE_SZSE
def is_ge_code(code):
    """
    是否是创业板
    @param code:
    @return:
    """
    return code.find("30") == 0
if __name__ == "__main__":
    print(trade_time_add_second("11:29:50",15))
    print(timestamp_format(1726034271, "%H%M%S"))