Administrator
2023-08-08 72149b3076983701b17f3dc55fd3ca60243c1f58
redis异步数据提交
12个文件已修改
181 ■■■■■ 已修改文件
code_attribute/first_target_code_data_processor.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/industry_codes_sort.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/limit_up_time_manager.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 77 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/limit_up_data_filter.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/l2_trade_factor.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_attribute/first_target_code_data_processor.py
@@ -201,10 +201,10 @@
             "limit_up": is_limit_up})
        if code in new_add_codes:
            if is_limit_up:
                place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
                place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(
                    code)
                if place_order_count == 0:
                    trade_data_manager.placeordercountmanager.place_order(code)
                    trade_data_manager.PlaceOrderCountManager.place_order(code)
    gpcode_first_screen_manager.process_ticks(prices)
code_attribute/industry_codes_sort.py
@@ -21,7 +21,7 @@
        return int(a[1].replace(":", "")) - int(b[1].replace(":", ""))
    if not global_util.limit_up_time:
        limit_up_time_manager.load_limit_up_time()
        limit_up_time_manager.LimitUpTimeManager.load_limit_up_time()
    list = []
    for code in codes:
        limit_up_time = global_util.limit_up_time.get(code)
code_attribute/limit_up_time_manager.py
@@ -6,36 +6,52 @@
from db.redis_manager import RedisUtils
from utils import global_util, tool
_redisManager = redis_manager.RedisManager(0)
class LimitUpTimeManager:
    __limit_up_time_cache = {}
    _redisManager = redis_manager.RedisManager(0)
    __instance = None
def save_limit_up_time(code, time):
    _time = get_limit_up_time(code)
    if _time is None:
        RedisUtils.setex(
            _redisManager.getRedis(), "limit_up_time-{}".format(code), tool.get_expire(), time)
        global_util.limit_up_time[code] = time
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(LimitUpTimeManager, cls).__new__(cls, *args, **kwargs)
            cls.load_limit_up_time()
        return cls.__instance
    @classmethod
    def load_limit_up_time(cls):
        redis = cls._redisManager.getRedis()
        try:
            keys = RedisUtils.keys(redis, "limit_up_time-*", auto_free=False)
            for key in keys:
                code = key.replace("limit_up_time-", "")
                time_ = RedisUtils.get(redis, key, auto_free=False)
                global_util.limit_up_time[code] = time_
                tool.CodeDataCacheUtil.set_cache(cls.__limit_up_time_cache, code, time_)
        finally:
            RedisUtils.realse(redis)
def get_limit_up_time(code):
    time = global_util.limit_up_time.get(code)
    if time is None:
        time = RedisUtils.get(_redisManager.getRedis(), "limit_up_time-{}".format(code))
        if time is not None:
    def save_limit_up_time(self, code, time):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__limit_up_time_cache, code)
        if not cache_result[0] or cache_result[1] is None:
            global_util.limit_up_time[code] = time
            tool.CodeDataCacheUtil.set_cache(self.__limit_up_time_cache, code, time)
            RedisUtils.setex_async(
                self._redisManager.getRedis(), "limit_up_time-{}".format(code), tool.get_expire(), time)
    return time
    def get_limit_up_time(self, code):
        time = global_util.limit_up_time.get(code)
        if time is None:
            time = RedisUtils.get(self._redisManager.getRedis(), "limit_up_time-{}".format(code))
            if time is not None:
                global_util.limit_up_time[code] = time
        return time
def load_limit_up_time():
    redis = _redisManager.getRedis()
    try:
        keys = RedisUtils.keys(redis, "limit_up_time-*", auto_free=False)
        for key in keys:
            code = key.replace("limit_up_time-", "")
            global_util.limit_up_time[code] = RedisUtils.get(redis, key, auto_free=False)
    finally:
        RedisUtils.realse(redis)
    def get_limit_up_time_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__limit_up_time_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
if __name__ == "__main__":
l2/cancel_buy_strategy.py
@@ -42,7 +42,7 @@
        CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code,
                                    (process_index, buy_num, cancel_num))
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
                               json.dumps((process_index, buy_num, cancel_num)))
    @classmethod
@@ -156,7 +156,7 @@
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        process_index = process_index_old
        # 下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code)
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
@@ -272,7 +272,7 @@
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, finish)))
    # 保存成交进度
@@ -300,7 +300,7 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code,
                                    (list(datas), process_index, total_count, big_num_count, finished))
        key = f"h_cancel_watch_indexs_exec-{code}"
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # 保存成交进度
@@ -359,7 +359,7 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code,
                                    (origin_process_index, latest_process_index))
        key = "h_cancel_traded_progress-{}".format(code)
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(),
                               json.dumps((origin_process_index, latest_process_index)))
    @classmethod
@@ -386,7 +386,7 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code,
                                    (process_index, cancel_num))
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(), json.dumps((process_index, cancel_num)))
        RedisUtils.setex_async(cls.__db, key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
@@ -477,7 +477,7 @@
        l2_log.cancel_debug(code, "H级是否需要撤单,数据范围:{}-{} ", start_index, end_index)
        # 获取下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code)
        cancel_rate_threshold = cls.__hCancelParamsManager.get_cancel_rate(volume_index)
        process_index = start_index
        # 是否有观测的数据撤单
@@ -767,7 +767,7 @@
    @classmethod
    def __set_real_order_index(cls, code, index):
        CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex_async(cls.__get_redis(), f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
        RedisUtils.setex_async(cls.__db, f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    @classmethod
    def __del_real_order_index(cls, code):
@@ -776,7 +776,7 @@
    @classmethod
    def __get_real_order_index(cls, code):
        val = RedisUtils.get(cls.__get_redis(), f"d_cancel_real_order_index-{code}")
        val = RedisUtils.get(cls.__db, f"d_cancel_real_order_index-{code}")
        if val:
            return int(val)
        return None
@@ -840,7 +840,7 @@
    @classmethod
    def set_real_order_index(cls, code, index):
        cls.__set_real_order_index(code, index)
        logger_l2_d_cancel.info(f"{code}成交进度设置:{index}")
        logger_l2_d_cancel.info(f"{code}下单位置设置:{index}")
    @classmethod
    def place_order_success(cls, code):
l2/l2_data_manager_new.py
@@ -57,25 +57,31 @@
# m值大单处理
m_big_money_begin_cache = {}
m_big_money_process_index_cache = {}
class L2BigNumForMProcessor:
    _db = 1
    _redis_manager = redis_manager.RedisManager(1)
    m_big_money_begin_cache = {}
    m_big_money_process_index_cache = {}
    __instance = None
    def __init__(self):
        self._redis_manager = redis_manager.RedisManager(1)
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(L2BigNumForMProcessor, cls).__new__(cls, *args, **kwargs)
        return cls.__instance
    def __get_redis(self):
        return self._redis_manager.getRedis()
    @classmethod
    def __get_redis(cls):
        return cls._redis_manager.getRedis()
    # 保存计算开始位置
    def set_begin_pos(self, code, index):
        if self.__get_begin_pos_cache(code) is None:
            tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, index)
            tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, index)
            # 保存位置
            key = "m_big_money_begin-{}".format(code)
            RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index)
            RedisUtils.setex_async(self._db, key, tool.get_expire(), index)
    # 获取计算开始位置
    def __get_begin_pos(self, code):
@@ -86,24 +92,24 @@
        return int(val)
    def __get_begin_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_begin_cache, code)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_begin_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_begin_pos(code)
        tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, val)
        tool.CodeDataCacheUtil.set_cache(self.m_big_money_begin_cache, code, val)
        return val
    # 清除已经处理的数据
    def clear_processed_end_index(self, code):
        tool.CodeDataCacheUtil.clear_cache(m_big_money_process_index_cache, code)
        tool.CodeDataCacheUtil.clear_cache(self.m_big_money_process_index_cache, code)
        key = "m_big_money_process_index-{}".format(code)
        RedisUtils.delete(self.__get_redis(), key)
        RedisUtils.delete_async(self._db, key)
    # 添加已经处理过的单
    def __set_processed_end_index(self, code, index):
        tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, index)
        tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, index)
        key = "m_big_money_process_index-{}".format(code)
        RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index)
        RedisUtils.setex_async(self._db, key, tool.get_expire(), index)
    # 是否已经处理过
    def __get_processed_end_index(self, code):
@@ -114,11 +120,11 @@
        return int(val)
    def __get_processed_end_index_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_process_index_cache, code)
        cache_result = tool.CodeDataCacheUtil.get_cache(self.m_big_money_process_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_processed_end_index(code)
        tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, val)
        tool.CodeDataCacheUtil.set_cache(self.m_big_money_process_index_cache, code, val)
        return val
    # 处理大单
@@ -310,7 +316,7 @@
                        # 当前涨停价,设置涨停时间
                        logger_l2_process.info("开盘涨停:{}", code)
                        # 保存涨停时间
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
                        limit_up_time_manager.LimitUpTimeManager().save_limit_up_time(code, "09:30:00")
        total_datas = local_today_datas[code]
        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
@@ -323,7 +329,7 @@
            volume_rate = code_volumn_manager.get_volume_rate(code)
            volume_rate_index = code_volumn_manager.get_volume_rate_index(volume_rate)
            # 计算分值
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
            if limit_up_time is None:
                limit_up_time = tool.get_now_time_str()
            score = first_code_score_manager.get_score(code, volume_rate, limit_up_time, True)
@@ -681,10 +687,10 @@
        # is_limited_up = gpcode_manager.FirstCodeManager().is_limited_up(code)
        # if not is_limited_up:
        #     gpcode_manager.FirstCodeManager().add_limited_up_record([code])
        #     place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(
        #     place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(
        #         code)
        #     if place_order_count == 0:
        #         trade_data_manager.placeordercountmanager.place_order(code)
        #         trade_data_manager.PlaceOrderCountManager.place_order(code)
        #     return False, True, "首板代码,且尚未涨停过"
        try:
@@ -729,7 +735,7 @@
            if volumn_rate >= 1.3:
                return False, False, "最大量比超过1.3不能买"
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
            if limit_up_time is not None:
                limit_up_time_seconds = l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time)
@@ -877,7 +883,7 @@
                                                                                                  0, min_money_w)
            if left_big_num > 0:
                # 重新获取分数与分数索引
                limit_up_time = limit_up_time_manager.get_limit_up_time(code)
                limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
                if limit_up_time is None:
                    limit_up_time = tool.get_now_time_str()
                score = first_code_score_manager.get_score(code, cls.volume_rate_info[code][0], limit_up_time, True,
@@ -895,17 +901,18 @@
            score = cls.__l2PlaceOrderParamsManagerDict[code].score
            score_info = cls.__l2PlaceOrderParamsManagerDict[code].score_info
            lp = LineProfiler()
            lp.enable()
            lp_wrap = lp(cls.can_buy_first)
            results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
            output = io.StringIO()
            lp.print_stats(stream=output)
            lp.disable()
            with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f:
                f.write(output.getvalue())
            # return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
            return results
            # lp = LineProfiler()
            # lp.enable()
            # lp_wrap = lp(cls.can_buy_first)
            # results = lp_wrap(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
            # output = io.StringIO()
            # lp.print_stats(stream=output)
            # lp.disable()
            # with open(f"{constant.get_path_prefix()}/logs/profile/{code}_can_buy_first.txt", 'w') as f:
            #     f.write(output.getvalue())
            # return results
            return cls.can_buy_first(code, limit_up_price, score_index, score, score_info, cls.volume_rate_info[code])
        else:
            return True, False, "在想买名单中"
@@ -1092,7 +1099,7 @@
            f1 = dask.delayed(cls.__save_order_begin_data)(code, buy_single_index, compute_index, compute_index,
                                                           buy_nums, buy_count, max_num_set_new,
                                                           cls.volume_rate_info[code][0])
            f2 = dask.delayed(limit_up_time_manager.save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
            f2 = dask.delayed(limit_up_time_manager.LimitUpTimeManager().save_limit_up_time)(code, total_datas[compute_index]["val"]["time"])
            f3 = dask.delayed(cls.__virtual_buy)(code, buy_single_index, compute_index, capture_time)
            f4 = dask.delayed(l2_data_manager.TradePointManager.delete_buy_cancel_point)(code)
            f5 = dask.delayed(L2LimitUpMoneyStatisticUtil.process_data)(code, buy_single_index,
@@ -1262,7 +1269,7 @@
        # 目标手数
        threshold_num = round(threshold_money / (limit_up_price * 100))
        # place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        # place_order_count = trade_data_manager.PlaceOrderCountManager.get_place_order_count(code)
        # 目标订单数量
        threshold_count = cls.__l2PlaceOrderParamsManagerDict[code].get_safe_count()
output/code_info_output.py
@@ -102,7 +102,7 @@
    if is_target_code:
        params["score_data"] = {}
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
        volume_rate, volume_info = code_volumn_manager.get_volume_rate(code, True)
        (score, score_list), score_source_list = first_code_score_manager.get_score(code, volume_rate, limit_up_time,
                                                                                    True)
@@ -457,7 +457,7 @@
        if code not in scores:
            # 获取分数
            try:
                limit_up_time = limit_up_time_manager.get_limit_up_time(code)
                limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
                volume_rate, volume_info = code_volumn_manager.get_volume_rate(code, True)
                (score, score_list), score_source_list = first_code_score_manager.get_score(code, volume_rate,
                                                                                            limit_up_time,
output/limit_up_data_filter.py
@@ -68,7 +68,7 @@
        if code not in scores:
            # 获取分数
            try:
                limit_up_time = limit_up_time_manager.get_limit_up_time(code)
                limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
                volume_rate, volume_info = code_volumn_manager.get_volume_rate(code, True)
                (score, score_list), score_source_list = first_code_score_manager.get_score(code, volume_rate,
                                                                                            limit_up_time,
test/l2_trade_test.py
@@ -33,7 +33,7 @@
    l2.l2_data_manager.TradePointManager.delete_buy_point(code)
    big_money_num_manager.reset(code)
    RedisUtils.delete( redis_manager.RedisManager(2).getRedis(), "trade-state-{}".format(code))
    trade_data_manager.placeordercountmanager.clear_place_order_count(code)
    trade_data_manager.PlaceOrderCountManager.clear_place_order_count(code)
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = RedisUtils.keys(redis_info, "*{}*".format(code), auto_free=False)
    for k in keys:
third_data/code_plate_key_manager.py
@@ -422,7 +422,7 @@
                                                                                  yesterday_current_limit_up_codes, 50)
            is_top_4 = is_top_8_record and is_top_4_current
            msg_list.append(f"\n实时top10(涨停数量:{len(current_limit_up_datas)})")
            msg_list.append(f"历史top20(涨停数量:{top_8_record})")
            msg_list.append(f"历史top20(涨停数量:{len(top_8_record)})")
            # 获取主板实时身位,剔除高位板
            current_shsz_rank = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas,
trade/l2_trade_factor.py
@@ -431,7 +431,7 @@
        # 首次涨停时间
        limit_up_time = global_util.limit_up_time.get(code)
        if limit_up_time is None:
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            limit_up_time = limit_up_time_manager.LimitUpTimeManager().get_limit_up_time_cache(code)
        big_money_num = global_util.big_money_num.get(code)
        if big_money_num is None:
trade/trade_data_manager.py
@@ -369,7 +369,7 @@
# 涨停次数管理
class placeordercountmanager:
class PlaceOrderCountManager:
    __redisManager = redis_manager.RedisManager(0)
    @classmethod
trade/trade_result_manager.py
@@ -18,7 +18,7 @@
def virtual_buy_success(code):
    # 增加下单计算
    trade_data_manager.placeordercountmanager.place_order(code)
    trade_data_manager.PlaceOrderCountManager.place_order(code)
    # 删除之前的板上卖信息
    L2LimitUpSellStatisticUtil.delete(code)