| | |
| | | return wrapper |
| | | |
| | | |
| | | def singleton(cls): |
| | | """ |
| | | 单例装饰器 |
| | | @param cls: |
| | | @return: |
| | | """ |
| | | instances = {} |
| | | |
| | | def get_instance(*args, **kwargs): |
| | | if cls not in instances: |
| | | instances[cls] = cls(*args, **kwargs) |
| | | return instances[cls] |
| | | |
| | | return get_instance |
| | | |
| | | |
| | | def get_expire(): |
| | | now = int(t.time()) |
| | | end = int(t.time()) + 60 * 60 * 24 |
| | |
| | | end = int(t.mktime(t.strptime(local_time, "%Y-%m-%d"))) |
| | | expire = end - now |
| | | # 加随机数,防止一起销毁数据 |
| | | expire += random.randint(0, 60 * 60) |
| | | expire += random.randint(0, 60 * 30) |
| | | return expire |
| | | |
| | | |
| | |
| | | |
| | | def get_now_time_str(): |
| | | time_str = datetime.datetime.now().strftime("%H:%M:%S") |
| | | return time_str |
| | | |
| | | |
| | | def get_now_time_as_int(): |
| | | time_str = datetime.datetime.now().strftime("%H:%M:%S") |
| | | return int(time_str.replace(":", "")) |
| | | |
| | | |
| | | def get_now_time_with_ms_str(): |
| | | now = datetime.datetime.now() |
| | | ms = int(now.microsecond / 1000) |
| | | time_str = now.strftime(f"%H:%M:%S.{ms:03d}") |
| | | return time_str |
| | | |
| | | |
| | |
| | | return "{0:0>2}:{1:0>2}:{2:0>2}.{3:0>3}".format(h, m, s, ms) |
| | | |
| | | |
| | | def timestamp_format(timestamp, format): |
| | | """ |
| | | 时间戳格式化 |
| | | @param timestamp: |
| | | @param format: |
| | | @return: |
| | | """ |
| | | return datetime.datetime.fromtimestamp(timestamp).strftime(format) |
| | | |
| | | |
| | | # 交易時間的差值 |
| | | # 如11:29:59 与 13:00:00只相差1s |
| | | def trade_time_sub(time_str_1, time_str_2): |
| | |
| | | # if price - 0.1 < fprice: |
| | | # fprice = price - 0.1 |
| | | # return round(fprice, 2) |
| | | return round(get_buy_min_price(price) - 0.03, 2) |
| | | if price < 20: |
| | | return round(get_buy_min_price(price) - 0.03, 2) |
| | | else: |
| | | # 大股价直接向下取2% |
| | | return round(price * (1 - 0.02), 2) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | |
| | | @param code: |
| | | @return: |
| | | """ |
| | | prefixes = ["00", "60", "68", "30"] |
| | | prefixes = ["00", "60", "30"] |
| | | for prefix in prefixes: |
| | | if code.find(prefix) == 0: |
| | | return True |
| | |
| | | |
| | | |
| | | def get_buy_volume(limit_up_price): |
| | | count = (constant.BUY_MONEY_PER_CODE // int(round(float(limit_up_price) * 100))) * 100 |
| | | return get_buy_volume_by_money(limit_up_price, constant.BUY_MONEY_PER_CODE) |
| | | |
| | | |
| | | def get_buy_volume_by_money(limit_up_price, money): |
| | | count = (money // int(round(float(limit_up_price) * 100))) * 100 |
| | | if count < 100: |
| | | count = 100 |
| | | return count |
| | |
| | | return get_market_type(code) == MARKET_TYPE_SZSE |
| | | |
| | | |
| | | def is_ge_code(code): |
| | | """ |
| | | 是否是创业板 |
| | | @param code: |
| | | @return: |
| | | """ |
| | | return code.find("30") == 0 |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(is_sz_code("0000")) |
| | | print(timestamp_format(1726034271, "%H%M%S")) |