Administrator
2023-08-09 b814fe4d36bcd714832eff6017ac8390199cffce
单例+缓存优化
1个文件已删除
16个文件已修改
639 ■■■■ 已修改文件
db/redis_manager.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/huaxin/huaxin_delegate_postion_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/limit_up_data_filter.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/deal_big_money_manager.py 166 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_server.py 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin_trade.py 167 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_data_manager.py 98 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_huaxin.py 61 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_juejin.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_queue_manager.py 47 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py
@@ -57,9 +57,8 @@
    @classmethod
    def delete_async(cls, db, key, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "delete", (key))
        logger_redis_debug.info("delete_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
        logger_redis_debug.info("delete_async({}):{}", 0, key)
    @classmethod
    def keys(cls, redis_, key, auto_free=True):
@@ -75,9 +74,8 @@
    @classmethod
    def setex_async(cls, db, key, expire, val, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "setex", (key, expire, val))
        logger_redis_debug.info("setex_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
        logger_redis_debug.info("setex_async({}):{}", 0, key)
    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
@@ -89,10 +87,8 @@
    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
        logger_redis_debug.info("expire_async({}):{}", 0, key)
    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
@@ -100,9 +96,8 @@
    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
        logger_redis_debug.info("sadd_async({}):{}", 0, key)
    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
@@ -118,9 +113,8 @@
    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
        logger_redis_debug.info("srem_async({}):{}", 0, key)
    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
@@ -128,9 +122,8 @@
    @classmethod
    def incrby_async(cls, db, key, num, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "incrby", (key, num))
        logger_redis_debug.info("incrby_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
        logger_redis_debug.info("incrby_async({}):{}", 0, key)
    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
@@ -165,7 +158,7 @@
                if data:
                    db = data[0]
                    method_name = data[1]
                    print(db,method_name)
                    print(db, method_name)
                    args = data[2]
                    _redis = RedisManager(db).getRedisNoPool()
                    method = getattr(_redis, method_name)
@@ -183,7 +176,7 @@
if __name__ == "__main__":
    RedisUtils.setex_async(0, "test", tool.get_expire(), "123123")
    print("大小",RedisUtils.get_async_task_count())
    print("大小", RedisUtils.get_async_task_count())
    RedisUtils.incrby_async(0, "test_1", 1)
    print("大小", RedisUtils.get_async_task_count())
    RedisUtils.delete_async(0, "test")
l2/cancel_buy_strategy.py
@@ -445,7 +445,7 @@
    def __del_compute_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_compute_data_cache, code)
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.delete(self.__get_redis(), key)
        RedisUtils.delete_async(self.__db, key)
    def __clear_data(self, code):
        CodeDataCacheUtil.clear_cache(self.__cancel_watch_indexs_cache, code)
@@ -458,7 +458,7 @@
              f"h_cancel_watch_indexs-{code}", f"h_cancel_traded_progress-{code}",
              f"h_cancel_watch_canceled_indexs-{code}"]
        for key in ks:
            RedisUtils.delete(self.__get_redis(), key)
            RedisUtils.delete_async(self.__db, key)
    def need_cancel(self, code, buy_single_index, buy_exec_index, start_index, end_index, total_data,
                    local_today_num_operate_map,
@@ -1277,7 +1277,7 @@
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
        # 获取最大封单额
        max_buy1_volume = self._thsBuy1VolumnManager.get_max_buy1_volume(code)
        max_buy1_volume = self._thsBuy1VolumnManager.get_max_buy1_volume_cache(code)
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
l2/huaxin/huaxin_delegate_postion_manager.py
@@ -38,7 +38,7 @@
    exec_index = order_info[2]
    # 获取量
    for d in datas:
        if d["val"]["num"] != volume:
        if d["val"]["num"] != volume//100:
            continue
        if abs(float(price) - float(d["val"]["price"])) >= 0.01:
            continue
l2/safe_count_manager.py
@@ -81,8 +81,8 @@
        tool.CodeDataCacheUtil.set_cache(self.latest_place_order_info_cache, code,
                                         (buy_single_index, buy_exec_index, cancel_index))
        key = "latest_place_order_info-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(),
                         json.dumps((buy_single_index, buy_exec_index, cancel_index)))
        RedisUtils.setex_async(self.__db, key, tool.get_expire(),
                               json.dumps((buy_single_index, buy_exec_index, cancel_index)))
    def __get_latest_place_order_info(self, code):
        key = "latest_place_order_info-{}".format(code)
output/code_info_output.py
@@ -123,7 +123,7 @@
                                           "money": (score_source_list[2] if score_source_list[2] else 0)}
        # 资金力度
        deal_indexes = trade.deal_big_money_manager.get_traded_indexes(code)
        deal_indexes = trade.deal_big_money_manager.DealComputeProgressManager().get_traded_indexes(code)
        deal_info = ""
        params["score_data"]["deal_big_money"] = {"score": score_list[8], "money": score_source_list[8][0] // 10000,
                                                  "base_money": score_source_list[8][1] // 10000,
output/limit_up_data_filter.py
@@ -24,7 +24,7 @@
    def ignore_code(self, type, code):
        RedisUtils.sadd(self.__get_redis(), f"kp_ignore_codes_{type}", code)
        RedisUtils.expire( self.__get_redis(), f"kp_ignore_codes_{type}", tool.get_expire())
        RedisUtils.expire(self.__get_redis(), f"kp_ignore_codes_{type}", tool.get_expire())
    def list_ignore_codes(self, type):
        return RedisUtils.smembers(self.__get_redis(), f"kp_ignore_codes_{type}")
@@ -46,7 +46,9 @@
# 获取涨停信息
def get_limit_up_info(codes):
    limit_up_data = __kplDataManager.get_data(KPLDataType.LIMIT_UP)
    limit_up_codes = set([val[0] for val in limit_up_data])
    limit_up_codes = []
    if limit_up_data:
        limit_up_codes = set([val[0] for val in limit_up_data])
    open_limit_up_data = __kplDataManager.get_data(KPLDataType.OPEN_LIMIT_UP)
    open_limit_up_codes = set()
    if open_limit_up_data:
server.py
@@ -440,7 +440,7 @@
                                                                           buy_progress_index,
                                                                           json.dumps(buy_queue_result_list))
                                            # 计算大单成交额
                                            deal_big_money_manager.set_trade_progress(code, buy_progress_index,
                                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress(code, buy_progress_index,
                                                                                      l2.l2_data_util.local_today_datas.get(
                                                                                          code),
                                                                                      l2.l2_data_util.local_today_num_operate_map.get(
third_data/code_plate_key_manager.py
@@ -323,8 +323,8 @@
        k1 = set()
        if code in LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict:
            k1 = {LimitUpCodesPlateKeyManager.today_total_limit_up_reason_dict[code]}
        # 加载今日历史原因
        k11 = RedisUtils.smembers(self.__get_redis(), f"kpl_limit_up_reason_his-{code}")
        # 加载今日历史原因,暂时不需要历史原因了
        k11 = set()  # RedisUtils.smembers(self.__get_redis(), f"kpl_limit_up_reason_his-{code}")
        k2 = self.__CodesPlateKeysManager.get_history_limit_up_reason(code)
        if k2 is None:
            k2 = set()
@@ -361,7 +361,6 @@
    __TargetCodePlateKeyManager = TargetCodePlateKeyManager()
    __LimitUpCodesPlateKeyManager = LimitUpCodesPlateKeyManager()
    __CodesHisReasonAndBlocksManager = CodesHisReasonAndBlocksManager()
    # 获取可以买的板块
    # current_limit_up_datas: 今日实时涨停
@@ -426,9 +425,11 @@
            # 获取主板实时身位,剔除高位板
            current_shsz_rank = kpl_block_util.get_code_current_rank(code, block, current_limit_up_datas,
                                                                     code_limit_up_reason_dict, yesterday_current_limit_up_codes, shsz=True)
                                                                     code_limit_up_reason_dict,
                                                                     yesterday_current_limit_up_codes, shsz=True)
            record_shsz_rank = kpl_block_util.get_code_record_rank(code, block, limit_up_record_datas,
                                                                   code_limit_up_reason_dict, yesterday_current_limit_up_codes, shsz=True)
                                                                   code_limit_up_reason_dict,
                                                                   yesterday_current_limit_up_codes, shsz=True)
            # 获取主板历史身位
            if is_top_4:
                pen_limit_up_codes = kpl_block_util.get_shsz_open_limit_up_codes(code, block, limit_up_record_datas,
@@ -552,7 +553,7 @@
        for c in codes:
            keys_, k1_, k11_, k2_, k3_, k4_ = cls.__TargetCodePlateKeyManager.get_plate_keys(c)
            # 实时涨停原因
            trade_codes_blocks_dict[c] = k1_|k4_
            trade_codes_blocks_dict[c] = k1_ | k4_
        # 统计板块中的代码
        trade_block_codes_dict = {}
        for c in trade_codes_blocks_dict:
trade/deal_big_money_manager.py
@@ -8,96 +8,112 @@
from db import redis_manager
from l2 import l2_data_util, l2_data_source_util
__last_progress = {}
__redisManager = redis_manager.RedisManager(2)
class DealComputeProgressManager:
    __db = 2
    __redisManager = redis_manager.RedisManager(2)
    __deal_compute_progress_cache = {}
    __last_progress = {}
    __instance = None
def __get_redis():
    return __redisManager.getRedis()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(DealComputeProgressManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
def __get_cancel_data(code, buy_data, local_today_num_operate_map):
    val = buy_data['val']
    cancel_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(val["num"], "1", val["price"]))
    if cancel_datas:
        for cancel_data in cancel_datas:
            buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                             local_today_num_operate_map)
            if buy_index == buy_data["index"]:
                return cancel_data
    return None
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "deal_compute_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__deal_compute_progress_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
def __save_traded_index(code, index):
    RedisUtils.sadd(__get_redis(), f"deal_indexes-{code}", index)
    RedisUtils.expire(__get_redis(), f"deal_indexes-{code}", tool.get_expire())
    # 获取成交计算进度
    def __get_deal_compute_progress(self, code):
        val = RedisUtils.get(self.__get_redis(), f"deal_compute_info-{code}")
        if val is None:
            return -1, 0
        val = json.loads(val)
        return val[0], val[1]
def __get_traded_indexes(code):
    return RedisUtils.smembers(__get_redis(), f"deal_indexes-{code}")
# 获取成交的索引
def get_traded_indexes(code):
    return __get_traded_indexes(code)
__deal_compute_progress_cache = {}
# 获取成交计算进度
def __get_deal_compute_progress(code):
    val = RedisUtils.get(__get_redis(), f"deal_compute_info-{code}")
    if val is None:
    # 获取成交计算进度
    def get_deal_compute_progress_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__deal_compute_progress_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1, 0
    val = json.loads(val)
    return val[0], val[1]
    # 设置成交进度
    def __set_deal_compute_progress(self, code, index, money):
        tool.CodeDataCacheUtil.set_cache(self.__deal_compute_progress_cache, code, (index, money))
        RedisUtils.setex_async(self.__db, f"deal_compute_info-{code}", tool.get_expire(), json.dumps((index, money)))
# 获取成交计算进度
def __get_deal_compute_progress_cache(code):
    cache_result = tool.CodeDataCacheUtil.get_cache(__deal_compute_progress_cache, code)
    if cache_result[0]:
        return cache_result[1]
    val = __get_deal_compute_progress(code)
    tool.CodeDataCacheUtil.set_cache(__deal_compute_progress_cache, code, val)
    return val
    # 设置成交进度
    def set_trade_progress(self, code, progress, total_data, local_today_num_operate_map):
        if self.__last_progress.get(code) == progress:
            return
        self.__last_progress[code] = progress
        # 计算从开始位置到成交位置
        c_index, deal_num = self.get_deal_compute_progress_cache(code)
        process_index = c_index
        for i in range(c_index + 1, progress):
            data = total_data[i]
            val = data['val']
            process_index = i
            # 是否有大单
            if not l2_data_util.is_big_money(val):
                continue
            if l2_data_util.L2DataUtil.is_limit_up_price_buy(val):
                # 是否已经取消
                cancel_data = self.__get_cancel_data(code, data, local_today_num_operate_map)
                if cancel_data is None:
                    deal_num += val["num"] * data["re"]
                    self.__save_traded_index(code, data["index"])
        self.__set_deal_compute_progress(code, process_index, deal_num)
# 设置成交进度
def __set_deal_compute_progress(code, index, money):
    tool.CodeDataCacheUtil.set_cache(__deal_compute_progress_cache, code, (index, money))
    RedisUtils.setex(__get_redis(), f"deal_compute_info-{code}", tool.get_expire(), json.dumps((index, money)))
    def get_deal_big_money_num(self, code):
        if code in self.__deal_compute_progress_cache:
            return self.__deal_compute_progress_cache.get(code)[1]
        compute_index, num = self.get_deal_compute_progress_cache(code)
        return num
    def __get_cancel_data(self, code, buy_data, local_today_num_operate_map):
        val = buy_data['val']
        cancel_datas = local_today_num_operate_map.get(
            "{}-{}-{}".format(val["num"], "1", val["price"]))
        if cancel_datas:
            for cancel_data in cancel_datas:
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data(code, cancel_data,
                                                                                                 local_today_num_operate_map)
                if buy_index == buy_data["index"]:
                    return cancel_data
        return None
# 设置成交进度
def set_trade_progress(code, progress, total_data, local_today_num_operate_map):
    if __last_progress.get(code) == progress:
        return
    __last_progress[code] = progress
    # 计算从开始位置到成交位置
    c_index, deal_num = __get_deal_compute_progress_cache(code)
    process_index = c_index
    for i in range(c_index + 1, progress):
        data = total_data[i]
        val = data['val']
        process_index = i
        # 是否有大单
        if not l2_data_util.is_big_money(val):
            continue
        if l2_data_util.L2DataUtil.is_limit_up_price_buy(val):
            # 是否已经取消
            cancel_data = __get_cancel_data(code, data, local_today_num_operate_map)
            if cancel_data is None:
                deal_num += val["num"] * data["re"]
                __save_traded_index(code, data["index"])
    def __save_traded_index(self, code, index):
        RedisUtils.sadd(self.__get_redis(), f"deal_indexes-{code}", index)
        RedisUtils.expire(self.__get_redis(), f"deal_indexes-{code}", tool.get_expire())
    __set_deal_compute_progress(code, process_index, deal_num)
    def __get_traded_indexes(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"deal_indexes-{code}")
    # 获取成交的索引
    def get_traded_indexes(self, code):
        return self.__get_traded_indexes(code)
def get_deal_big_money_num(code):
    if code in __deal_compute_progress_cache:
        return __deal_compute_progress_cache.get(code)[1]
    compute_index, num = __get_deal_compute_progress_cache(code)
    return num
    val = DealComputeProgressManager().get_deal_compute_progress_cache(code)
    return val[1]
trade/huaxin/trade_server.py
@@ -212,13 +212,13 @@
                                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                                        if buy_exec_index:
                                            need_cancel, msg = DCancelBigNumComputer().set_trade_progress(code,
                                                                                                        buy_progress_index,
                                                                                                        buy_exec_index,
                                                                                                        total_datas,
                                                                                                        num_operate_map,
                                                                                                        num * 100 * float(
                                                                                                            limit_up_price),
                                                                                                        limit_up_price)
                                                                                                          buy_progress_index,
                                                                                                          buy_exec_index,
                                                                                                          total_datas,
                                                                                                          num_operate_map,
                                                                                                          num * 100 * float(
                                                                                                              limit_up_price),
                                                                                                          limit_up_price)
                                            if need_cancel:
                                                L2TradeDataProcessor.cancel_buy(code, f"D撤:{msg}", source="d_cancel")
@@ -228,12 +228,14 @@
                                                                                                         total_datas,
                                                                                                         num_operate_map)
                                        f2 = dask.delayed(LCancelBigNumComputer().set_trade_progress)(code,
                                                                                                    buy_progress_index,
                                                                                                    total_datas)
                                        f3 = dask.delayed(deal_big_money_manager.set_trade_progress)(code,
                                                                                                     buy_progress_index,
                                                                                                     total_datas,
                                                                                                     num_operate_map)
                                                                                                      buy_progress_index,
                                                                                                      total_datas)
                                        f3 = dask.delayed(
                                            deal_big_money_manager.DealComputeProgressManager().set_trade_progress)(
                                            code,
                                            buy_progress_index,
                                            total_datas,
                                            num_operate_map)
                                        dask.compute(f1, f2, f3)
                            except Exception as e:
                                hx_logger_l2_transaction.exception(e)
trade/huaxin_trade.py
File was deleted
trade/trade_data_manager.py
@@ -52,71 +52,107 @@
class TradeBuyDataManager:
    __db = 0
    redisManager = redis_manager.RedisManager(0)
    buy_sure_position_dict = {}
    __buy_position_info_cache = {}
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradeBuyDataManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.redisManager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "buy_position_info-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                val = json.loads(val)
                tool.CodeDataCacheUtil.set_cache(cls.__buy_position_info_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    # 设置买入点的信息
    # trade_time: 买入点截图时间与下单提交时间差值
    # capture_time: 买入点截图时间
    # last_data: 买入点最后一条数据
    @classmethod
    def set_buy_position_info(cls, code, capture_time, trade_time, last_data, last_data_index):
        RedisUtils.setex(cls.redisManager.getRedis(), "buy_position_info-{}".format(code), tool.get_expire(),
                         json.dumps((capture_time, trade_time, last_data, last_data_index)))
    def set_buy_position_info(self, code, capture_time, trade_time, last_data, last_data_index):
        val = (capture_time, trade_time, last_data, last_data_index)
        tool.CodeDataCacheUtil.set_cache(self.__buy_position_info_cache, code, val)
        RedisUtils.setex_async(self.__db, "buy_position_info-{}".format(code), tool.get_expire(),
                               json.dumps(val))
    # 获取买入点信息
    @classmethod
    def get_buy_position_info(cls, code):
        val_str = RedisUtils.get(cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
    def get_buy_position_info(self, code):
        val_str = RedisUtils.get(self.redisManager.getRedis(), "buy_position_info-{}".format(code))
        if val_str is None:
            return None, None, None, None
        else:
            val = json.loads(val_str)
            return val[0], val[1], val[2], val[3]
    def get_buy_position_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_position_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None, None, None
    # 删除买入点信息
    @classmethod
    def remove_buy_position_info(cls, code):
        RedisUtils.delete(cls.redisManager.getRedis(), "buy_position_info-{}".format(code))
    def remove_buy_position_info(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__buy_position_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_position_info-{}".format(code))
    # 设置买入确认点信息
    @classmethod
    def __set_buy_sure_position(cls, code, index, data):
    def __set_buy_sure_position(self, code, index, data):
        logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data)
        key = "buy_sure_position-{}".format(code)
        RedisUtils.setex(cls.redisManager.getRedis(), key, tool.get_expire(), json.dumps((index, data)))
        cls.buy_sure_position_dict[code] = (index, data)
        RedisUtils.setex(self.redisManager.getRedis(), key, tool.get_expire(), json.dumps((index, data)))
        self.buy_sure_position_dict[code] = (index, data)
        # 移除下单信号的详细信息
        cls.remove_buy_position_info(code)
        self.remove_buy_position_info(code)
    # 清除买入确认点信息
    @classmethod
    def __clear_buy_sure_position(cls, code):
    def __clear_buy_sure_position(self, code):
        key = "buy_sure_position-{}".format(code)
        RedisUtils.delete(cls.redisManager.getRedis(), key)
        if code in cls.buy_sure_position_dict:
            cls.buy_sure_position_dict.pop(code)
        RedisUtils.delete(self.redisManager.getRedis(), key)
        if code in self.buy_sure_position_dict:
            self.buy_sure_position_dict.pop(code)
    # 获取买入确认点信息
    @classmethod
    def get_buy_sure_position(cls, code):
        temp = cls.buy_sure_position_dict.get(code)
    def get_buy_sure_position(self, code):
        temp = self.buy_sure_position_dict.get(code)
        if temp is not None:
            return temp[0], temp[1]
        key = "buy_sure_position-{}".format(code)
        val = RedisUtils.get(cls.redisManager.getRedis(), key)
        val = RedisUtils.get(self.redisManager.getRedis(), key)
        if val is None:
            return None, None
        else:
            val = json.loads(val)
            cls.buy_sure_position_dict[code] = (val[0], val[1])
            self.buy_sure_position_dict[code] = (val[0], val[1])
            return val[0], val[1]
    # 处理买入确认点信息
    @classmethod
    def process_buy_sure_position_info(cls, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas):
        buy_capture_time, trade_time, l2_data, l2_data_index = cls.get_buy_position_info(code)
    def process_buy_sure_position_info(self, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas):
        buy_capture_time, trade_time, l2_data, l2_data_index = self.get_buy_position_info_cache(code)
        if buy_capture_time is None:
            # 没有购买者信息
            return None
@@ -138,15 +174,15 @@
                    if l2_data_util.get_time_as_seconds(_time) - old_time_int >= 2:
                        index = i - 1
                        data = l2_today_datas[index]
                        cls.__set_buy_sure_position(code, index, data)
                        self.__set_buy_sure_position(code, index, data)
                        break
            else:
                cls.__set_buy_sure_position(code, l2_data_index, l2_data)
                self.__set_buy_sure_position(code, l2_data_index, l2_data)
        elif new_time_int - old_time_int >= 0:
            # 间隔2s内表示数据正常,将其位置设置为新增数据的中间位置
            index = len(l2_today_datas) - 1 - (len(l2_add_datas)) // 2
            data = l2_today_datas[index]
            cls.__set_buy_sure_position(code, index, data)
            self.__set_buy_sure_position(code, index, data)
        else:
            # 间隔时间小于0 ,一般产生原因是数据回溯产生,故不做处理
            logger_trade.warning("预估委托位置错误:数据间隔时间小于0 code-{}", code)
trade/trade_huaxin.py
@@ -16,27 +16,58 @@
# 交易订单号管理
class TradeOrderIdManager:
    __db = 2
    __redisManager = RedisManager(2)
    __instance = None
    __huaxin_order_id_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(TradeOrderIdManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    # 添加订单ID
    @classmethod
    def add_order_id(cls, code, account_id, order_id):
        RedisUtils.sadd( cls.__get_redis(), f"huaxin_order_id-{code}", json.dumps((account_id, order_id)))
        RedisUtils.expire(cls.__get_redis(), f"huaxin_order_id-{code}", tool.get_expire())
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "huaxin_order_id-*")
            for k in keys:
                code = k.split("-")[-1]
                vals = RedisUtils.smembers(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__huaxin_order_id_cache, code, vals)
        finally:
            RedisUtils.realse(__redis)
    # 添加订单ID
    def add_order_id(self, code, account_id, order_id):
        val = json.dumps((account_id, order_id))
        if code not in self.__huaxin_order_id_cache:
            self.__huaxin_order_id_cache[code] = set()
        self.__huaxin_order_id_cache[code].add(val)
        RedisUtils.sadd_async(self.__db, f"huaxin_order_id-{code}", val)
        RedisUtils.expire_async(self.__db, f"huaxin_order_id-{code}", tool.get_expire())
    # 删除订单ID
    @classmethod
    def remove_order_id(cls, code, account_id, order_id):
        RedisUtils.srem(cls.__get_redis(), f"huaxin_order_id-{code}", json.dumps((account_id, order_id)))
    def remove_order_id(self, code, account_id, order_id):
        val = json.dumps((account_id, order_id))
        if code in self.__huaxin_order_id_cache:
            self.__huaxin_order_id_cache[code].discard(val)
        RedisUtils.srem_async(self.__get_redis(), f"huaxin_order_id-{code}", val)
    # 查询所有的订单号
    @classmethod
    def list_order_ids(cls, code):
        return RedisUtils.smembers(cls.__get_redis(), f"huaxin_order_id-{code}")
    def list_order_ids(self, code):
        return RedisUtils.smembers(self.__get_redis(), f"huaxin_order_id-{code}")
    def list_order_ids_cache(self, code):
        if code in self.__huaxin_order_id_cache:
            return self.__huaxin_order_id_cache[code]
        return set()
def init(context):
@@ -82,7 +113,7 @@
                logger_juejin_trade.info(f"{code}:下单失败:{result.get('statusMsg')}")
                raise Exception(result.get('statusMsg'))
            else:
                TradeOrderIdManager.add_order_id(code, result["accountID"], result["orderSysID"])
                TradeOrderIdManager().add_order_id(code, result["accountID"], result["orderSysID"])
                logger_juejin_trade.info(f"{code}:下单成功 orderSysID:{result['orderSysID']}")
                return result["securityId"], result["accountID"], result["orderSysID"]
        else:
@@ -92,16 +123,16 @@
def order_success(code, accountId, orderSysID):
    TradeOrderIdManager.add_order_id(code, accountId, orderSysID)
    TradeOrderIdManager().add_order_id(code, accountId, orderSysID)
def cancel_order_success(code, accountId, orderSysID):
    TradeOrderIdManager.remove_order_id(code, accountId, orderSysID)
    TradeOrderIdManager().remove_order_id(code, accountId, orderSysID)
# 撤单
def cancel_order(code):
    orders_info = TradeOrderIdManager.list_order_ids(code)
    orders_info = TradeOrderIdManager().list_order_ids_cache(code)
    orders = []
    if orders_info:
        for order in orders_info:
@@ -112,7 +143,7 @@
        logger_juejin_trade.info(f"{code}:撤单成功,撤单数量:{len(orders)}")
        for order in orders:
            huaxin_trade_api.cancel_order(1, code, order["orderSysID"])
            TradeOrderIdManager.remove_order_id(code, order["accountId"], order["orderSysID"])
            TradeOrderIdManager().remove_order_id(code, order["accountId"], order["orderSysID"])
if __name__ == "__main__":
trade/trade_juejin.py
@@ -86,7 +86,7 @@
            logger_juejin_trade.info(f"{code}:下单失败:{result['ord_rej_reason_detail']}")
            raise Exception(result["ord_rej_reason_detail"])
        else:
            TradeOrderIdManager.add_order_id(code, result["account_id"], result["cl_ord_id"])
            TradeOrderIdManager().add_order_id(code, result["account_id"], result["cl_ord_id"])
            logger_juejin_trade.info(f"{code}:下单成功 ord_id:{result['cl_ord_id']}")
            return result["symbol"].split(".")[1], result["account_id"], result["cl_ord_id"]
    else:
@@ -95,7 +95,7 @@
# 撤单
def cancel_order(code):
    orders_info = TradeOrderIdManager.list_order_ids(code)
    orders_info = TradeOrderIdManager().list_order_ids(code)
    orders = []
    if orders_info:
        for order in orders_info:
@@ -108,7 +108,7 @@
            gmapi.order_cancel(orders)
        logger_juejin_trade.info(f"{code}:撤单成功,撤单数量:{len(orders)}")
        for order in orders:
            TradeOrderIdManager.remove_order_id(code, order["account_id"], order["cl_ord_id"])
            TradeOrderIdManager().remove_order_id(code, order["account_id"], order["cl_ord_id"])
# 撤单
trade/trade_manager.py
@@ -456,7 +456,7 @@
# 中断买入
def break_buy(code, reason):
    trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
    trade_data_manager.TradeBuyDataManager().remove_buy_position_info(code)
# 购买
@@ -490,7 +490,7 @@
    # 下单成功,加入固定代码库
    l2_data_manager.add_to_l2_fixed_codes(code)
    # 记录下单的那一帧图片的截图时间与交易用时
    trade_data_manager.TradeBuyDataManager.set_buy_position_info(code, capture_timestamp, use_time, last_data,
    trade_data_manager.TradeBuyDataManager().set_buy_position_info(code, capture_timestamp, use_time, last_data,
                                                                 last_data_index)
    print("买入结束")
@@ -561,7 +561,7 @@
# 取消委托成功
def __cancel_success(code):
    trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
    trade_data_manager.TradeBuyDataManager().remove_buy_position_info(code)
    # 下单成功,加入固定代码库
    l2_data_manager.remove_from_l2_fixed_codes(code)
    logger_trade.info("{}撤单成功".format(code))
trade/trade_queue_manager.py
@@ -10,17 +10,41 @@
class THSBuy1VolumnManager:
    __db = 1
    __redisManager = redis_manager.RedisManager(1)
    __last_data = {}
    __code_time_volumn_dict = {}
    __max_buy1_volumn_cache = {}
    __instance = None
    def __get_redis(self):
        return self.__redisManager.getRedis()
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(THSBuy1VolumnManager, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redisManager.getRedis()
    @classmethod
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            keys = RedisUtils.keys(__redis, "max_buy1_volumn-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.get(__redis, k)
                tool.CodeDataCacheUtil.set_cache(cls.__max_buy1_volumn_cache, code, int(val))
        finally:
            RedisUtils.realse(__redis)
    # 保存最大量
    def __save_max_buy1_volume(self, code, volume):
        tool.CodeDataCacheUtil.set_cache(self.__max_buy1_volumn_cache, code, volume)
        key = "max_buy1_volumn-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volume)
        RedisUtils.setex_async(self.__db, key, tool.get_expire(), volume)
    def __get_max_buy1_volume(self, code):
        key = "max_buy1_volumn-{}".format(code)
@@ -30,11 +54,11 @@
        return None
    def __del_max_buy1_volume(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__max_buy1_volumn_cache, code)
        key = "max_buy1_volumn-{}".format(code)
        val = RedisUtils.delete(self.__get_redis(), key)
        RedisUtils.delete_async(self.__db, key)
    def __save_recod(self, code, time_str, volumn):
        # 保存每一次的
        key = "buy1_volumn-{}-{}".format(code, time_str)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), volumn)
@@ -67,7 +91,7 @@
    # 添加记录
    def __add_recod(self, code):
        key = "buy1_volumn_codes"
        RedisUtils.sadd( self.__get_redis(), key, code)
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)
    # 获取当前正在监听的代码
@@ -160,9 +184,14 @@
            return -1
        return val
    def get_max_buy1_volume_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__max_buy1_volumn_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return -1
    def clear_max_buy1_volume(self, code):
        self.__del_max_buy1_volume(code)
class JueJinBuy1VolumnManager:
@@ -209,8 +238,6 @@
        return time_str, volumn
class thsl2tradequeuemanager:
    __redisManager = redis_manager.RedisManager(0)
    __filter_dict = {}
@@ -234,7 +261,7 @@
    def __add_buy1_code(self, code):
        key = "buy1_volumn_codes"
        RedisUtils.sadd( self.__get_redis(), key, code)
        RedisUtils.sadd(self.__get_redis(), key, code)
        RedisUtils.expire(self.__get_redis(), key, 10)
        # 获取当前正在监听的代码
utils/data_export_util.py
@@ -203,7 +203,7 @@
    num_operate_map = {}
    l2.l2_data_util.load_num_operate_map(num_operate_map, code, datas)
    for progress in progresses:
        deal_big_money_manager.set_trade_progress(code, progress, datas, num_operate_map[code])
        deal_big_money_manager.DealComputeProgressManager().set_trade_progress(code, progress, datas, num_operate_map[code])
if __name__ == "__main__":