Administrator
2023-08-07 f366d71c79fa6a17ecc6d994ad6f57e645b84209
redis异步数据提交
12个文件已修改
326 ■■■■■ 已修改文件
db/redis_manager.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/code_plate_key_manager.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_data_update.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 183 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_queue_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py
@@ -88,8 +88,21 @@
        return cls.exec("expire", key, lambda: redis_.expire(key, expire))
    @classmethod
    def expire_async(cls, db, key, expire, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "expire", (key, expire))
        logger_redis_debug.info("expire_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def sadd(cls, redis_, key, val, auto_free=True):
        return cls.exec("sadd", key, lambda: redis_.sadd(key, val))
    @classmethod
    def sadd_async(cls, db, key, val, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "sadd", (key, val))
        logger_redis_debug.info("sadd_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def sismember(cls, redis_, key, val, auto_free=True):
@@ -104,6 +117,12 @@
        return cls.exec("srem", key, lambda: redis_.srem(key, val))
    @classmethod
    def srem_async(cls, db, key, val, auto_free=True):
        __start_time = time.time()
        cls.add_async_task(db, "srem", (key, val))
        logger_redis_debug.info("srem_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        return cls.exec("incrby", key, lambda: redis_.incrby(key, num))
gui.py
@@ -723,7 +723,7 @@
                time.sleep(1)
        def refresh_data():
            money = trade_manager.get_available_money()
            money = trade_manager.AccountAvailableMoneyManager().get_available_money()
            if money is not None:
                sv_trade_money.set(money)
            else:
l2/cancel_buy_strategy.py
@@ -42,7 +42,8 @@
        CodeDataCacheUtil.set_cache(cls.__s_big_num_cancel_compute_data_cache, code,
                                    (process_index, buy_num, cancel_num))
        key = "s_big_num_cancel_compute_data-{}".format(code)
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((process_index, buy_num, cancel_num)))
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
                               json.dumps((process_index, buy_num, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
@@ -250,6 +251,7 @@
# --------------------------------H撤-------------------------------
class HourCancelBigNumComputer:
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    __tradeBuyQueue = TradeBuyQueue()
    __buyL2SafeCountManager = BuyL2SafeCountManager()
@@ -270,7 +272,8 @@
    def __save_watch_index_set(cls, code, datas, process_index, finish):
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_cache, code, (list(datas), process_index, finish))
        key = f"h_cancel_watch_indexs-{code}"
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((list(datas), process_index, finish)))
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
                               json.dumps((list(datas), process_index, finish)))
    # 保存成交进度
    @classmethod
@@ -297,8 +300,8 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_watch_indexs_exec_cache, code,
                                    (list(datas), process_index, total_count, big_num_count, finished))
        key = f"h_cancel_watch_indexs_exec-{code}"
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(),
                         json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
                               json.dumps((list(datas), process_index, total_count, big_num_count, finished)))
    # 保存成交进度
    @classmethod
@@ -321,11 +324,14 @@
    # 保存已经撤单的监听位置
    @classmethod
    def __add_watch_canceled_index(cls, redis, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_canceled_indexs_cache, code)
    def __add_watch_canceled_indexes(cls, code, indexes):
        if code not in cls.__cancel_watch_canceled_indexs_cache:
            cls.__cancel_watch_canceled_indexs_cache[code] = set()
        key = f"h_cancel_watch_canceled_indexs-{code}"
        RedisUtils.sadd(redis, key, index)
        RedisUtils.expire(redis, key, tool.get_expire())
        for index in indexes:
            cls.__cancel_watch_canceled_indexs_cache[code].add(index)
            RedisUtils.sadd_async(cls.__db, key, index)
        RedisUtils.expire_async(cls.__db, key, tool.get_expire())
    @classmethod
    def __get_watch_canceled_index(cls, code):
@@ -353,8 +359,8 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_traded_progress_cache, code,
                                    (origin_process_index, latest_process_index))
        key = "h_cancel_traded_progress-{}".format(code)
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(),
                         json.dumps((origin_process_index, latest_process_index)))
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(),
                               json.dumps((origin_process_index, latest_process_index)))
    @classmethod
    def __get_traded_progress(cls, code):
@@ -380,7 +386,7 @@
        CodeDataCacheUtil.set_cache(cls.__cancel_compute_data_cache, code,
                                    (process_index, cancel_num))
        key = "h_cancel_compute_data-{}".format(code)
        RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), json.dumps((process_index, cancel_num)))
        RedisUtils.setex_async(cls.__get_redis(), key, tool.get_expire(), json.dumps((process_index, cancel_num)))
    @classmethod
    def __get_compute_data(cls, code):
@@ -489,7 +495,7 @@
                cancel_num += total_data[nx]["re"] * num
        try:
            pipe = cls.__get_redis().pipeline()
            temp_watch_canceled_index = set()
            for i in range(start_index, end_index + 1):
                if i <= processed_index:
                    # 已经处理过了
@@ -505,7 +511,7 @@
                        has_watch_canceled = True
                        cancel_num += data["re"] * val["num"]
                        # 加入
                        cls.__add_watch_canceled_index(pipe, code, f"{buy_index}-{val['num']}")
                        temp_watch_canceled_index.add(f"{buy_index}-{val['num']}")
                        rate__ = round(cancel_num / total_nums, 4)
                        if rate__ > cancel_rate_threshold:
                            indexs__ = list(watch_indexs_dict.keys())
@@ -515,9 +521,9 @@
                                                len(watch_indexs_dict.keys()))
                            l2_log.trade_record(code, "H撤", "'index':{} , 'rate':{} ,'target_rate':{}", i, rate__,
                                                cancel_rate_threshold)
                            pipe.execute()
                            cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
                            return True, data
            pipe.execute()
            cls.__add_watch_canceled_indexes(code, temp_watch_canceled_index)
            rate__ = round(cancel_num / total_nums, 4)
            if rate__ > cancel_rate_threshold:
@@ -761,7 +767,7 @@
    @classmethod
    def __set_real_order_index(cls, code, index):
        CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex(cls.__get_redis(), f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
        RedisUtils.setex_async(cls.__get_redis(), f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    @classmethod
    def __del_real_order_index(cls, code):
@@ -858,15 +864,24 @@
        return cls.__redis_manager.getRedis()
    @classmethod
    def __add_watch_index(cls, redis, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.sadd(redis, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire(redis, f"l_cancel_watch_index-{code}", tool.get_expire())
    def __add_watch_indexes(cls, code, indexes):
        if not indexes:
            return
        if code not in cls.__cancel_watch_index_cache:
            cls.__cancel_watch_index_cache[code] = set()
        for index in indexes:
            cls.__cancel_watch_index_cache[code].add(index)
            RedisUtils.sadd_async(cls.__db, f"l_cancel_watch_index-{code}", index)
        RedisUtils.expire_async(cls.__db, f"l_cancel_watch_index-{code}", tool.get_expire())
    @classmethod
    def __del_watch_index(cls, pipe, code, index):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.srem(pipe, f"l_cancel_watch_index-{code}", index)
    def __del_watch_indexes(cls, code, indexes):
        if not indexes:
            return
        for index in indexes:
            if code in cls.__cancel_watch_index_cache:
                cls.__cancel_watch_index_cache[code].discard(index)
            RedisUtils.srem_async(cls.__db, f"l_cancel_watch_index-{code}", index)
    @classmethod
    def __get_watch_indexes(cls, code):
@@ -932,13 +947,8 @@
        # 数据维护
        add_indexes = watch_indexes - old_watch_indexes
        delete_indexes = old_watch_indexes - watch_indexes
        pipe = cls.__get_redis().pipeline()
        for i in add_indexes:
            cls.__add_watch_index(pipe, code, i)
        for i in delete_indexes:
            cls.__del_watch_index(pipe, code, i)
        if add_indexes or delete_indexes:
            pipe.execute()
        cls.__add_watch_indexes(code, add_indexes)
        cls.__del_watch_indexes(code, delete_indexes)
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index, total_data, local_today_num_operate_map,
l2/l2_data_manager_new.py
@@ -57,8 +57,10 @@
# m值大单处理
m_big_money_begin_cache={}
m_big_money_process_index_cache={}
m_big_money_begin_cache = {}
m_big_money_process_index_cache = {}
class L2BigNumForMProcessor:
    def __init__(self):
@@ -73,24 +75,23 @@
            tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, index)
            # 保存位置
            key = "m_big_money_begin-{}".format(code)
            RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), index)
            RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index)
    # 获取计算开始位置
    def __get_begin_pos(self, code):
        key = "m_big_money_begin-{}".format(code)
        val = RedisUtils.get(self.__get_redis(),key)
        val = RedisUtils.get(self.__get_redis(), key)
        if val is None:
            return None
        return int(val)
    def __get_begin_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_begin_cache,code)
        cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_begin_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val =   self.__get_begin_pos(code)
        tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache,code,val)
        val = self.__get_begin_pos(code)
        tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, val)
        return val
    # 清除已经处理的数据
    def clear_processed_end_index(self, code):
@@ -100,9 +101,9 @@
    # 添加已经处理过的单
    def __set_processed_end_index(self, code, index):
        tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache,code,index)
        tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, index)
        key = "m_big_money_process_index-{}".format(code)
        RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), index)
        RedisUtils.setex_async(self.__get_redis(), key, tool.get_expire(), index)
    # 是否已经处理过
    def __get_processed_end_index(self, code):
@@ -119,7 +120,6 @@
        val = self.__get_processed_end_index(code)
        tool.CodeDataCacheUtil.set_cache(m_big_money_process_index_cache, code, val)
        return val
    # 处理大单
    def process(self, code, start_index, end_index, limit_up_price):
@@ -343,7 +343,7 @@
            # 时间差不能太大才能处理
            if not l2_trade_util.is_in_forbidden_trade_codes(code):
                # 判断是否已经挂单
                state = trade_manager.get_trade_state_cache(code)
                state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
@@ -828,7 +828,9 @@
                for i in range(trade_index + 1, total_data[-1]["index"] + 1):
                    if L2DataUtil.is_limit_up_price_buy(total_data[i]["val"]):
                        left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count(code,
                                                                                                              total_data[i]["index"],
                                                                                                              total_data[
                                                                                                                  i][
                                                                                                                  "index"],
                                                                                                              total_data,
                                                                                                              num_operate_map)
                        if left_count > 0:
output/code_info_output.py
@@ -345,7 +345,7 @@
                                                "money": round(data['val']['num'] * float(
                                                    data['val']['price']) * 100 / 10000, 1)}
        params["trade_data"]["trade_state"] = {}
        trade_state = trade_manager.get_trade_state_cache(code)
        trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
        if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED:
            params["trade_data"]["trade_state"]["order"] = True
            params["trade_data"]["trade_state"]["desc"] = "已下单"
server.py
@@ -314,7 +314,7 @@
                                apply_time = item["apply_time"]
                                if apply_time and len(apply_time) >= 8:
                                    code = item["code"]
                                    trade_state = trade_manager.get_trade_state_cache(code)
                                    trade_state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                                    # 设置下单状态的代码为已委托
                                    if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                        origin_apply_time = apply_time
@@ -361,7 +361,7 @@
                    client = datas["client"]
                    money = datas["money"]
                    # TODO存入缓存文件
                    trade_manager.set_available_money(client, money)
                    trade_manager.AccountAvailableMoneyManager().set_available_money(client, money)
                # l2交易队列
                elif type == 10:
                    # 可用金额
@@ -634,7 +634,7 @@
                    data = json.loads(_str)
                    code = data["data"]["code"]
                    if code:
                        state = trade_manager.get_trade_state_cache(code)
                        state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                            try:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
@@ -811,7 +811,7 @@
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    code = codes[0]
                    state = trade_manager.get_trade_state_cache(code)
                    state = trade_manager.CodesTradeStateManager().get_trade_state(code)
                    if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS:
                        return_str = json.dumps({"code": 0, "msg": "可以取消"})
                    else:
third_data/code_plate_key_manager.py
@@ -538,9 +538,9 @@
        # ---------------------------------加载已经下单/成交的代码信息------------start-------------
        # match_reasons = match_limit_up_result.keys()
        # 判断匹配到的原因是否已经有下单/买入成功的代码
        codes_delegate = set(trade_manager.get_codes_by_trade_states(
        codes_delegate = set(trade_manager.CodesTradeStateManager().get_codes_by_trade_states_cache(
            {trade_manager.TRADE_STATE_BUY_DELEGATED, trade_manager.TRADE_STATE_BUY_PLACE_ORDER}))
        codes_success = set(trade_manager.get_codes_by_trade_states(
        codes_success = set(trade_manager.CodesTradeStateManager().get_codes_by_trade_states_cache(
            {trade_manager.TRADE_STATE_BUY_SUCCESS}))
        codes = codes_delegate | codes_success
third_data/data_server.py
@@ -378,7 +378,7 @@
            for code_info in codes_info:
                code_info[4] = 1 if code_info[0] in want_codes else 0
                # 获取代码状态
                if trade_manager.get_trade_state_cache(code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE:
                if trade_manager.CodesTradeStateManager().get_trade_state_cache(code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE:
                    code_info[5] = 1
            response_data = json.dumps({"code": 0, "data": codes_info})
@@ -391,7 +391,7 @@
                    l2_data_util.load_l2_data(code)
                    total_datas = l2_data_util.local_today_datas.get(code)
                trade_state = trade_manager.get_trade_state_cache(code)
                trade_state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
                    # 根据日志读取实时的计算数据
trade/huaxin/huaxin_trade_data_update.py
@@ -52,7 +52,7 @@
                            if data:
                                usefulMoney = data[0]["usefulMoney"]
                                # 设置可用资金
                                trade_manager.set_available_money(0, usefulMoney)
                                trade_manager.AccountAvailableMoneyManager().set_available_money(0, usefulMoney)
                            # 设置可用资金
                    elif type_ == "deal_list":
                        dataJSON = huaxin_trade_api.get_deal_list()
trade/huaxin/trade_api_server.py
@@ -153,7 +153,7 @@
                                raise Exception(result["msg"])
                        elif code:
                            state = trade_manager.get_trade_state_cache(code)
                            state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
                            if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                                try:
                                    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
trade/trade_manager.py
@@ -173,72 +173,121 @@
        RedisUtils.delete(self.__get_redis(), self.__key)
__trade_state_cache = {}
# 代码的交易状态管理
class CodesTradeStateManager:
    __trade_state_cache = {}
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(CodesTradeStateManager, cls).__new__(cls, *args, **kwargs)
            __redis = cls.__get_redis()
            # 初始化数据
            keys = RedisUtils.keys(__redis, "trade-state-*", auto_free=False)
            if keys:
                for key in keys:
                    code = key.replace("trade-state-", '')
                    cls.__trade_state_cache[code] = int(RedisUtils.get(__redis, key, auto_free=False))
# 获取交易状态
def get_trade_state(code):
    state = RedisUtils.get(__redis_manager.getRedis(), "trade-state-{}".format(code))
    if state is None:
        return TRADE_STATE_NOT_TRADE
    return int(state)
        return cls.__instance
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
def get_trade_state_cache(code):
    cache_result = tool.CodeDataCacheUtil.get_cache(__trade_state_cache, code)
    if cache_result[0]:
        return cache_result[1]
    val = get_trade_state(code)
    tool.CodeDataCacheUtil.set_cache(__trade_state_cache, code, val)
    return val
    # 获取交易状态
    def get_trade_state(self, code):
        state = RedisUtils.get(self.__get_redis(), "trade-state-{}".format(code))
        if state is None:
            return TRADE_STATE_NOT_TRADE
        return int(state)
    def get_trade_state_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__trade_state_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.get_trade_state(code)
        tool.CodeDataCacheUtil.set_cache(self.__trade_state_cache, code, val)
        return val
# 设置交易状态
def set_trade_state(code, state):
    logger_trade.info("set_trade_state {}-{}".format(code, state))
    tool.CodeDataCacheUtil.set_cache(__trade_state_cache, code, state)
    RedisUtils.setex_async(__db, "trade-state-{}".format(code), tool.get_expire(), state)
    # 设置交易状态
    def set_trade_state(self, code, state):
        logger_trade.info("set_trade_state {}-{}".format(code, state))
        tool.CodeDataCacheUtil.set_cache(self.__trade_state_cache, code, state)
        RedisUtils.setex_async(self.__db, "trade-state-{}".format(code), tool.get_expire(), state)
    def get_codes_by_trade_state(self, state):
        redis = self.__get_redis()
        try:
            keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
            codes = []
            if keys is not None:
                for key in keys:
                    if int(RedisUtils.get(redis, key, auto_free=False)) == state:
                        codes.append(key.replace("trade-state-", ''))
            return codes
        finally:
            RedisUtils.realse(redis)
def get_codes_by_trade_state(state):
    redis = __redis_manager.getRedis()
    try:
        keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
    def get_codes_by_trade_states(self, states):
        redis = self.__get_redis()
        try:
            keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
            codes = []
            if keys is not None:
                for key in keys:
                    if int(RedisUtils.get(redis, key, auto_free=False)) in states:
                        codes.append(key.replace("trade-state-", ''))
            return codes
        finally:
            RedisUtils.realse(redis)
    def get_codes_by_trade_states_cache(self, states):
        # 获取
        codes = []
        if keys is not None:
            for key in keys:
                if int(RedisUtils.get(redis, key, auto_free=False)) == state:
                    codes.append(key.replace("trade-state-", ''))
        for code in self.__trade_state_cache:
            if self.__trade_state_cache[code] in states:
                codes.append(code)
        return codes
    finally:
        RedisUtils.realse(redis)
    # 设置交易账户的可用金额
def get_codes_by_trade_states(states):
    redis = __redis_manager.getRedis()
    try:
        keys = RedisUtils.keys(redis, "trade-state-*", auto_free=False)
        codes = []
        if keys is not None:
            for key in keys:
                if int(RedisUtils.get(redis, key, auto_free=False)) in states:
                    codes.append(key.replace("trade-state-", ''))
        return codes
    finally:
        RedisUtils.realse(redis)
# 账户可用资金管理
class AccountAvailableMoneyManager:
    __db = 2
    __redis_manager = redis_manager.RedisManager(2)
    __available_money_cache = None
    __instance = None
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
            cls.__instance = super(AccountAvailableMoneyManager, cls).__new__(cls, *args, **kwargs)
            __redis = cls.__get_redis()
            result = RedisUtils.get(cls.__get_redis(), "trade-account-canuse-money")
            if result:
                cls.__available_money_cache = round(float(result), 2)
        return cls.__instance
# 设置交易账户的可用金额
def set_available_money(client_id, money):
    RedisUtils.set(__redis_manager.getRedis(), "trade-account-canuse-money", money)
    @classmethod
    def __get_redis(cls):
        return cls.__redis_manager.getRedis()
    def set_available_money(self, client_id, money):
        self.__available_money_cache = round(float(money), 2)
        RedisUtils.set(self.__get_redis(), "trade-account-canuse-money", money)
# 获取交易账户的可用金额
def get_available_money():
    result = RedisUtils.get(__redis_manager.getRedis(), "trade-account-canuse-money")
    if result is None:
        return None
    return round(float(result), 2)
    # 获取交易账户的可用金额
    def get_available_money(self):
        result = RedisUtils.get(self.__get_redis(), "trade-account-canuse-money")
        if result is None:
            return None
        return round(float(result), 2)
    def get_available_money_cache(self):
        return self.__available_money_cache
# 保存交易成功的数据
@@ -340,14 +389,14 @@
    @dask.delayed
    def is_state_right(code):
        trade_state = get_trade_state_cache(code)
        trade_state = CodesTradeStateManager().get_trade_state_cache(code)
        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
            return Exception("代码处于不可交易状态"), trade_state
        return None, trade_state
    @dask.delayed
    def is_money_enough(code):
        money = get_available_money()
        money = AccountAvailableMoneyManager().get_available_money_cache()
        if money is None:
            return Exception("未获取到账户可用资金"), None
        price = gpcode_manager.get_limit_up_price(code)
@@ -394,7 +443,7 @@
    print("开始买入")
    logger_trade.info("{}开始买入".format(code))
    set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "买入判断时间", force=True)
    __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
    l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "异步买入时间", force=True)
@@ -448,12 +497,12 @@
def __place_order_fail(code, trade_state):
    print("买入异常")
    # 状态还原
    set_trade_state(code, trade_state)
    CodesTradeStateManager().set_trade_state(code, trade_state)
# 开始取消买入
def start_cancel_buy(code, force=False):
    trade_state = get_trade_state_cache(code)
    trade_state = CodesTradeStateManager().get_trade_state_cache(code)
    if trade_state == TRADE_STATE_BUY_SUCCESS:
        return None
    if not force:
@@ -461,7 +510,7 @@
            return None
    try:
        logger_trade.info("{}开始撤单".format(code))
        set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        logger_trade.info("{}撤单方法开始".format(code))
        if constant.API_TRADE_ENABLE:
            if constant.TRADE_WAY == constant.TRADE_WAY_JUEJIN:
@@ -479,7 +528,7 @@
        #     pass
    except Exception as e:
        # 状态还原
        set_trade_state(code, trade_state)
        CodesTradeStateManager().set_trade_state(code, trade_state)
        logger_trade.error("{}撤单异常:{}".format(code, str(e)))
        raise e
    logger_trade.info("{}撤单完毕".format(code))
@@ -491,7 +540,7 @@
    time.sleep(0.02)
    for i in range(0, 5):
        # 如果时
        trade_state = get_trade_state_cache(code)
        trade_state = CodesTradeStateManager().get_trade_state_cache(code)
        if trade_state != TRADE_STATE_BUY_CANCEL_ING and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS:
            return
        try:
@@ -526,9 +575,9 @@
        # 买入成功
        if code is not None and int(data["type"]) == 0:
            l2_trade_util.forbidden_trade(code)
            state = get_trade_state_cache(code)
            state = CodesTradeStateManager().get_trade_state_cache(code)
            if state != TRADE_STATE_BUY_SUCCESS:
                set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
                CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
                # 删除买撤记录的临时信息
                kp_client_msg_manager.add_msg(code, "买入成交")
                l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
@@ -558,18 +607,18 @@
        code = data["code"]
        if code is not None:
            codes.append(code)
            trade_state = get_trade_state_cache(code)
            trade_state = CodesTradeStateManager().get_trade_state_cache(code)
            # 设置下单状态的代码为已委托
            if trade_state == TRADE_STATE_BUY_PLACE_ORDER:
                set_trade_state(code, TRADE_STATE_BUY_DELEGATED)
    ing_codes = get_codes_by_trade_state(TRADE_STATE_BUY_CANCEL_ING)
                CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_DELEGATED)
    ing_codes = CodesTradeStateManager().get_codes_by_trade_state(TRADE_STATE_BUY_CANCEL_ING)
    if ing_codes is not None:
        for code in ing_codes:
            if code in codes:
                # 强制重新取消
                start_cancel_buy(code, True)
            else:
                set_trade_state(code, TRADE_STATE_BUY_CANCEL_SUCCESS)
                CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_CANCEL_SUCCESS)
                l2_data_manager.remove_from_l2_fixed_codes(code)
@@ -610,6 +659,6 @@
if __name__ == "__main__":
    price = 5
    r = (5000 // int(round(float(price) * 100))) * 100
    print(r)
    print(CodesTradeStateManager().get_codes_by_trade_states_cache([0, 1]))
    print(CodesTradeStateManager().get_trade_state_cache("002235"))
    print(CodesTradeStateManager().get_trade_state_cache("002235"))
trade/trade_queue_manager.py
@@ -126,7 +126,7 @@
        self.__save_recod(code, time_str, volumn)
        # 如果当前已挂单
        state = trade_manager.get_trade_state_cache(code)
        state = trade_manager.CodesTradeStateManager().get_trade_state_cache(code)
        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
            # 判断本次与上一次的封单额是否小于5000w
            limit_up_price = gpcode_manager.get_limit_up_price(code)