| | |
| | | return wrapper |
| | | |
| | | |
| | | def singleton(cls): |
| | | """ |
| | | 单例装饰器 |
| | | @param cls: |
| | | @return: |
| | | """ |
| | | instances = {} |
| | | |
| | | def get_instance(*args, **kwargs): |
| | | if cls not in instances: |
| | | instances[cls] = cls(*args, **kwargs) |
| | | return instances[cls] |
| | | |
| | | return get_instance |
| | | |
| | | |
| | | def get_expire(): |
| | | now = int(t.time()) |
| | | end = int(t.time()) + 60 * 60 * 24 |
| | |
| | | end = int(t.mktime(t.strptime(local_time, "%Y-%m-%d"))) |
| | | expire = end - now |
| | | # 加随机数,防止一起销毁数据 |
| | | expire += random.randint(0, 60 * 60) |
| | | expire += random.randint(0, 60 * 30) |
| | | return expire |
| | | |
| | | |
| | |
| | | |
| | | def get_now_time_str(): |
| | | time_str = datetime.datetime.now().strftime("%H:%M:%S") |
| | | return time_str |
| | | |
| | | |
| | | def get_now_time_as_int(): |
| | | time_str = datetime.datetime.now().strftime("%H:%M:%S") |
| | | return int(time_str.replace(":", "")) |
| | | |
| | | |
| | | def get_now_time_with_ms_str(): |
| | | now = datetime.datetime.now() |
| | | ms = int(now.microsecond / 1000) |
| | | time_str = now.strftime(f"%H:%M:%S.{ms:03d}") |
| | | return time_str |
| | | |
| | | |
| | |
| | | return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) |
| | | |
| | | |
| | | def get_time_as_millionsecond(time_str): |
| | | s_str, ms_str = time_str.split(".") |
| | | ts = s_str.split(":") |
| | | return (int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])) * 1000 + int(ms_str) |
| | | |
| | | |
| | | # 将秒数格式化为时间 |
| | | def time_seconds_format(seconds): |
| | | h = seconds // 3600 |
| | | m = seconds % 3600 // 60 |
| | | s = seconds % 60 |
| | | return "{0:0>2}:{1:0>2}:{2:0>2}".format(h, m, s) |
| | | |
| | | |
| | | def time_millionseconds_format(millionseconds): |
| | | ms = millionseconds % 1000 |
| | | seconds = millionseconds // 1000 |
| | | h = seconds // 3600 |
| | | m = seconds % 3600 // 60 |
| | | s = seconds % 60 |
| | | return "{0:0>2}:{1:0>2}:{2:0>2}.{3:0>3}".format(h, m, s, ms) |
| | | |
| | | |
| | | def timestamp_format(timestamp, format): |
| | | """ |
| | | 时间戳格式化 |
| | | @param timestamp: |
| | | @param format: |
| | | @return: |
| | | """ |
| | | return datetime.datetime.fromtimestamp(timestamp).strftime(format) |
| | | |
| | | |
| | | # 交易時間的差值 |
| | |
| | | return time_1 - time_2 |
| | | |
| | | |
| | | def trade_time_sub_with_ms(time_str_1, time_str_2): |
| | | split_time = get_time_as_second("11:30:00") * 1000 |
| | | time_1 = get_time_as_millionsecond(time_str_1) |
| | | time_2 = get_time_as_millionsecond(time_str_2) |
| | | if time_1 < split_time < time_2: |
| | | time_2 = time_2 - 90 * 60 * 1000 |
| | | elif time_2 < split_time < time_1: |
| | | time_2 = time_2 + 90 * 60 * 1000 |
| | | |
| | | return time_1 - time_2 |
| | | |
| | | |
| | | def compare_time(time_1: str, time_2: str): |
| | | return int(time_1.replace(":", "")) - int(time_2.replace(":", "")) |
| | | |
| | | |
| | | def compare_time_with_ms(time_1: str, time_2: str): |
| | | return int(time_1.replace(":", "").replace(".", "")) - int(time_2.replace(":", "").replace(".", "")) |
| | | |
| | | |
| | | # 交易时间加几s |
| | | def trade_time_add_second(time_str, second): |
| | | ts = time_str.split(":") |
| | |
| | | return time_seconds_format(s) |
| | | |
| | | |
| | | def trade_time_add_millionsecond(time_str, millionsecond): |
| | | s_str, ms_str = time_str.split(".") |
| | | ts = s_str.split(":") |
| | | s_ = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) |
| | | ms_ = s_ * 1000 |
| | | ms_ += int(ms_str) |
| | | ms = ms_ + millionsecond |
| | | # 是否在11:30:00 |
| | | if ms >= 11 * 3600 * 1000 + 30 * 60 * 1000 > ms_: |
| | | ms += 90 * 60 * 1000 |
| | | return time_millionseconds_format(ms) |
| | | |
| | | |
| | | def compute_buy1_real_time(time_): |
| | | ts = time_.split(":") |
| | | s = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2]) |
| | | cha = (s - 2) % 3 |
| | | return time_seconds_format(s - 2 - cha) |
| | | |
| | | |
| | | def to_time_with_ms(time_s_str, time_ms): |
| | | """ |
| | | 将时间+毫秒数 转为字符串 |
| | | @param time_s_str: 时间如:10:00:00 |
| | | @param time_ms: 毫秒如: 12 |
| | | @return: 如:10:00:00.001 |
| | | """ |
| | | return f"{time_s_str}." + "{0:0>3}".format(time_ms) |
| | | |
| | | |
| | | # 全角转半角 |
| | |
| | | return max(price1, price2) |
| | | |
| | | |
| | | # 获取买入价格笼子的最低价 |
| | | def get_shadow_price(price): |
| | | # fprice = round((100 - random.randint(2, 10)) * price / 100, 2) |
| | | # if price - 0.1 < fprice: |
| | | # fprice = price - 0.1 |
| | | # return round(fprice, 2) |
| | | if price < 20: |
| | | return round(get_buy_min_price(price) - 0.03, 2) |
| | | else: |
| | | # 大股价直接向下取2% |
| | | return round(price * (1 - 0.02), 2) |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(get_buy_min_price(20)) |
| | | # print(trade_time_sub("11:29:59", 5)) |
| | | # print(trade_time_sub("10:29:59", 10)) |
| | | # print(trade_time_add_second("13:29:59", 60)) |
| | | print(trade_time_sub("13:00:01", |
| | | "11:29:55")) |
| | | |
| | | |
| | | class CodeDataCacheUtil: |
| | |
| | | return False, None |
| | | |
| | | |
| | | # 是否为主板代码 |
| | | def is_shsz_code(code): |
| | | if code.find("00") == 0 or code.find("60") == 0: |
| | | def is_can_buy_code(code): |
| | | if code.find("00") == 0 or code.find("60") == 0 or code.find("30") == 0: |
| | | return True |
| | | return False |
| | | |
| | | |
| | | def is_target_code(code): |
| | | """ |
| | | 是否是目标代码 |
| | | @param code: |
| | | @return: |
| | | """ |
| | | prefixes = ["00", "60", "30"] |
| | | for prefix in prefixes: |
| | | if code.find(prefix) == 0: |
| | | return True |
| | | return False |
| | | |
| | | |
| | | def get_limit_up_rate(code): |
| | | # 获取涨停倍数 |
| | | if code.find("00") == 0 or code.find("60") == 0: |
| | | return 1.1 |
| | | else: |
| | | return 1.2 |
| | | |
| | | |
| | | def get_limit_down_rate(code): |
| | | # 获取涨停倍数 |
| | | if code.find("00") == 0 or code.find("60") == 0: |
| | | return 0.9 |
| | | else: |
| | | return 0.8 |
| | | |
| | | |
| | | def get_thread_id(): |
| | |
| | | except: |
| | | pass |
| | | return None |
| | | |
| | | |
| | | def get_buy_volume(limit_up_price): |
| | | return get_buy_volume_by_money(limit_up_price, constant.BUY_MONEY_PER_CODE) |
| | | |
| | | |
| | | def get_buy_volume_by_money(limit_up_price, money): |
| | | count = (money // int(round(float(limit_up_price) * 100))) * 100 |
| | | if count < 100: |
| | | count = 100 |
| | | return count |
| | | |
| | | |
| | | # 深证 |
| | | MARKET_TYPE_SZSE = 1 |
| | | # 上证 |
| | | MARKET_TYPE_SSE = 0 |
| | | # 未知 |
| | | MARKET_TYPE_UNKNOWN = -1 |
| | | |
| | | |
| | | def get_market_type(code): |
| | | """ |
| | | 根据股票代码 |
| | | :param code: |
| | | :return: |
| | | """ |
| | | if code.find("00") == 0 or code.find("30") == 0 or code.find("12") == 0: |
| | | return MARKET_TYPE_SZSE |
| | | elif code.find("60") == 0 or code.find("68") == 0 or code.find("11") == 0: |
| | | return MARKET_TYPE_SSE |
| | | else: |
| | | return MARKET_TYPE_UNKNOWN |
| | | |
| | | |
| | | def is_sh_code(code): |
| | | """ |
| | | 是否是上证 |
| | | @param code: |
| | | @return: |
| | | """ |
| | | return get_market_type(code) == MARKET_TYPE_SSE |
| | | |
| | | |
| | | def is_sz_code(code): |
| | | """ |
| | | 是否是深证 |
| | | @param code: |
| | | @return: |
| | | """ |
| | | return get_market_type(code) == MARKET_TYPE_SZSE |
| | | |
| | | |
| | | def is_ge_code(code): |
| | | """ |
| | | 是否是创业板 |
| | | @param code: |
| | | @return: |
| | | """ |
| | | return code.find("30") == 0 |
| | | |
| | | |
| | | if __name__ == "__main__": |
| | | print(timestamp_format(1726034271, "%H%M%S")) |