Administrator
2023-08-07 e8ea6340db725a3b36a78e090fc6f97a90900264
redis批量提交数据
11个文件已修改
147 ■■■■ 已修改文件
db/redis_manager.py 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
third_data/data_server.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/trade_api_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_queue_manager.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
db/redis_manager.py
@@ -54,15 +54,22 @@
                pass
    @classmethod
    def delete(cls, redis_, key, auto_free=True):
    def delete(cls, redis_, key, auto_free=True, _async=False):
        __start_time = time.time()
        try:
            return redis_.delete(key)
        finally:
            logger_redis_debug.info("delete({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if _async:
                logger_redis_debug.info("delete_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            else:
                logger_redis_debug.info("delete({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
    @classmethod
    def delete_async(cls, redis_, key, auto_free=True):
        Thread(target=lambda: cls.delete(redis_, key, auto_free,_async=True)).start()
    @classmethod
    def keys(cls, redis_, key, auto_free=True):
@@ -88,19 +95,22 @@
                pass
    @classmethod
    def setex(cls, redis_, key, expire, val, auto_free=True):
    def setex(cls, redis_, key, expire, val, auto_free=True, _async = False):
        __start_time = time.time()
        try:
            return redis_.setex(key, expire, val)
        finally:
            logger_redis_debug.info("setex({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if _async:
                logger_redis_debug.info("setex_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            else:
                logger_redis_debug.info("setex({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
    @classmethod
    def setex_async(cls, redis_, key, expire, val, auto_free=True):
        Thread(target=lambda: cls.setex(redis_, key, expire, val, auto_free)).start()
        Thread(target=lambda: cls.setex(redis_, key, expire, val, auto_free,_async=True)).start()
    @classmethod
    def setnx(cls, redis_, key, val, auto_free=True):
@@ -169,17 +179,24 @@
                pass
    @classmethod
    def incrby(cls, redis_, key, num, auto_free=True):
    def incrby(cls, redis_, key, num, auto_free=True, _async=False):
        __start_time = time.time()
        try:
            return redis_.incrby(key, num)
        finally:
            logger_redis_debug.info("incrby({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if _async:
                logger_redis_debug.info("incrby_async({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            else:
                logger_redis_debug.info("incrby({}):{}", round((time.time() - __start_time) * 1000, 3), key)
            if auto_free:
                # redis_.connection_pool.disconnect()
                pass
    @classmethod
    def incrby_async(cls, redis_, key, num, auto_free=True):
        Thread(target=lambda: cls.incrby(redis_, key, num, auto_free)).start()
    @classmethod
    def lpush(cls, redis_, key, val, auto_free=True):
        __start_time = time.time()
        try:
l2/cancel_buy_strategy.py
@@ -66,7 +66,7 @@
        CodeDataCacheUtil.clear_cache(cls.__s_big_num_cancel_compute_data_cache, code)
        ks = ["s_big_num_cancel_compute_data-{}".format(code)]
        for key in ks:
            RedisUtils.delete(cls.__get_redis(), key)
            RedisUtils.delete_async(cls.__get_redis(), key)
    @classmethod
    def clear_data(cls):
@@ -762,6 +762,11 @@
        RedisUtils.setex(cls.__get_redis(), f"d_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")
    @classmethod
    def __del_real_order_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(cls.__get_redis(), f"d_cancel_real_order_index-{code}")
    @classmethod
    def __get_real_order_index(cls, code):
        val = RedisUtils.get(cls.__get_redis(), f"d_cancel_real_order_index-{code}")
        if val:
@@ -780,14 +785,13 @@
    @classmethod
    def clear(cls, code=None):
        if code:
            RedisUtils.delete(cls.__get_redis(), f"d_cancel_real_order_index-{code}")
            cls.__del_real_order_index(code)
        else:
            keys = RedisUtils.keys(cls.__get_redis(), "d_cancel_real_order_index-*")
            if keys:
                for k in keys:
                    code = k.replace("d_cancel_real_order_index-", "")
                    CodeDataCacheUtil.clear_cache(cls.__cancel_real_order_index_cache, code)
                    RedisUtils.delete(cls.__get_redis(), k)
                    cls.__del_real_order_index(code)
    # 设置成交位
    @classmethod
@@ -878,7 +882,7 @@
    @classmethod
    def del_watch_index(cls, code):
        CodeDataCacheUtil.clear_cache(cls.__cancel_watch_index_cache, code)
        RedisUtils.delete(cls.__get_redis(), f"l_cancel_watch_index-{code}")
        RedisUtils.delete_async(cls.__get_redis(), f"l_cancel_watch_index-{code}")
    @classmethod
    def clear(cls, code=None):
l2/l2_data_manager.py
@@ -42,7 +42,7 @@
    @staticmethod
    def delete_buy_point(code):
        CodeDataCacheUtil.clear_cache(TradePointManager.__buy_compute_index_info_cache, code)
        RedisUtils.delete(TradePointManager.__get_redis(), "buy_compute_index_info-{}".format(code))
        RedisUtils.delete_async(TradePointManager.__get_redis(), "buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
@@ -110,7 +110,7 @@
    # 删除买撤点数据
    @classmethod
    def delete_buy_cancel_point(cls, code):
        RedisUtils.delete(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        RedisUtils.delete_async(TradePointManager.__get_redis(), "buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
@@ -131,7 +131,7 @@
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        RedisUtils.delete(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code))
        RedisUtils.delete_async(TradePointManager.__get_redis(), "compute_info_for_cancel_buy-{}".format(code))
    # 从买入信号开始设置涨停买与涨停撤的单数
    @classmethod
@@ -153,7 +153,7 @@
    @classmethod
    def delete_count_info_for_cancel_buy(cls, code):
        RedisUtils.delete(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code))
        RedisUtils.delete_async(TradePointManager.__get_redis(), "count_info_for_cancel_buy-{}".format(code))
# 清除l2数据
l2/l2_data_manager_new.py
@@ -57,6 +57,7 @@
# m值大单处理
m_big_money_begin_cache={}
class L2BigNumForMProcessor:
    def __init__(self):
@@ -67,7 +68,8 @@
    # 保存计算开始位置
    def set_begin_pos(self, code, index):
        if self.__get_begin_pos(code) is None:
        if self.__get_begin_pos_cache(code) is None:
            tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache, code, index)
            # 保存位置
            key = "m_big_money_begin-{}".format(code)
            RedisUtils.setex(self.__get_redis(), key, tool.get_expire(), index)
@@ -79,6 +81,15 @@
        if val is None:
            return None
        return int(val)
    def __get_begin_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(m_big_money_begin_cache,code)
        if cache_result[0]:
            return cache_result[1]
        val =   self.__get_begin_pos(code)
        tool.CodeDataCacheUtil.set_cache(m_big_money_begin_cache,code,val)
        return val
    # 清除已经处理的数据
    def clear_processed_end_index(self, code):
@@ -101,7 +112,7 @@
    # 处理大单
    def process(self, code, start_index, end_index, limit_up_price):
        begin_pos = self.__get_begin_pos(code)
        begin_pos = self.__get_begin_pos_cache(code)
        if begin_pos is None:
            # 没有获取到开始买入信号
            return
@@ -320,7 +331,7 @@
            # 时间差不能太大才能处理
            if not l2_trade_util.is_in_forbidden_trade_codes(code):
                # 判断是否已经挂单
                state = trade_manager.get_trade_state(code)
                state = trade_manager.get_trade_state_cache(code)
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
l2/safe_count_manager.py
@@ -11,6 +11,9 @@
from utils import tool
from l2.l2_data_util import L2DataUtil
latest_place_order_info_cache = {}
safe_count_l2_cache = {}
class BuyL2SafeCountManager(object):
    __redis_manager = redis_manager.RedisManager(0)
@@ -24,8 +27,10 @@
    # 记录每一次的处理进度
    def __save_compute_progress(self, code, last_buy_single_index, process_index, buy_num, cancel_num):
        key = "safe_count_l2-{}-{}".format(code, last_buy_single_index)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(),
                         json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
        tool.CodeDataCacheUtil.set_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}",
                                         (last_buy_single_index, process_index, buy_num, cancel_num))
        RedisUtils.setex_async(self.__getRedis(), key, tool.get_expire(),
                               json.dumps((last_buy_single_index, process_index, buy_num, cancel_num)))
    # 返回数据与更新时间
    def __get_compute_progress(self, code, last_buy_single_index):
@@ -36,8 +41,18 @@
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    def __get_compute_progress_cache(self, code, last_buy_single_index):
        cache_result = tool.CodeDataCacheUtil.get_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}")
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_compute_progress(code, last_buy_single_index)
        tool.CodeDataCacheUtil.set_cache(safe_count_l2_cache, f"{code}-{last_buy_single_index}", val)
        return val
    # 保存最近的下单信息
    def __save_latest_place_order_info(self, code, buy_single_index, buy_exec_index, cancel_index):
        tool.CodeDataCacheUtil.set_cache(latest_place_order_info_cache, code,
                                         (buy_single_index, buy_exec_index, cancel_index))
        key = "latest_place_order_info-{}".format(code)
        RedisUtils.setex(self.__getRedis(), key, tool.get_expire(),
                         json.dumps((buy_single_index, buy_exec_index, cancel_index)))
@@ -49,6 +64,14 @@
            return None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2]
    def __get_latest_place_order_info_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(latest_place_order_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        val = self.__get_latest_place_order_info(code)
        tool.CodeDataCacheUtil.set_cache(latest_place_order_info_cache, code, val)
        return val
    def __get_all_compute_progress(self, code):
        key_regex = f"safe_count_l2-{code}-*"
@@ -66,8 +89,9 @@
        for k in keys:
            RedisUtils.delete(self.__getRedis(), k)
        tool.CodeDataCacheUtil.clear_cache(latest_place_order_info_cache, code)
        key = f"latest_place_order_info-{code}"
        RedisUtils.delete(self.__getRedis(), key)
        RedisUtils.delete_async(self.__getRedis(), key)
    # 获取基础的安全笔数
    def __get_base_save_count(self, code, is_first):
@@ -98,15 +122,15 @@
    # end_index 数据结束位置
    def compute_left_rate(self, code, start_index, end_index, total_datas,
                          local_today_num_operate_map):
        last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info(code)
        last_buy_single_index, buy_exec_index, cancel_index = self.__get_latest_place_order_info_cache(code)
        if last_buy_single_index is None:
            return
        cancel_time = None
        if cancel_index is not None:
            cancel_time = total_datas[cancel_index]["val"]["time"]
        # 获取处理的进度
        last_buy_single_index_, process_index, buy_num, cancel_num = self.__get_compute_progress(code,
                                                                                                 last_buy_single_index)
        last_buy_single_index_, process_index, buy_num, cancel_num = self.__get_compute_progress_cache(code,
                                                                                                       last_buy_single_index)
        break_index = -1
        for i in range(start_index, end_index):
output/code_info_output.py
@@ -345,7 +345,7 @@
                                                "money": round(data['val']['num'] * float(
                                                    data['val']['price']) * 100 / 10000, 1)}
        params["trade_data"]["trade_state"] = {}
        trade_state = trade_manager.get_trade_state(code)
        trade_state = trade_manager.get_trade_state_cache(code)
        if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED:
            params["trade_data"]["trade_state"]["order"] = True
            params["trade_data"]["trade_state"]["desc"] = "已下单"
server.py
@@ -314,7 +314,7 @@
                                apply_time = item["apply_time"]
                                if apply_time and len(apply_time) >= 8:
                                    code = item["code"]
                                    trade_state = trade_manager.get_trade_state(code)
                                    trade_state = trade_manager.get_trade_state_cache(code)
                                    # 设置下单状态的代码为已委托
                                    if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                        origin_apply_time = apply_time
@@ -634,7 +634,7 @@
                    data = json.loads(_str)
                    code = data["data"]["code"]
                    if code:
                        state = trade_manager.get_trade_state(code)
                        state = trade_manager.get_trade_state_cache(code)
                        if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                            try:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
@@ -811,7 +811,7 @@
                    data = json.loads(_str)
                    codes = data["data"]["codes"]
                    code = codes[0]
                    state = trade_manager.get_trade_state(code)
                    state = trade_manager.get_trade_state_cache(code)
                    if state != trade_manager.TRADE_STATE_BUY_CANCEL_SUCCESS and state != trade_manager.TRADE_STATE_BUY_SUCCESS:
                        return_str = json.dumps({"code": 0, "msg": "可以取消"})
                    else:
third_data/data_server.py
@@ -378,7 +378,7 @@
            for code_info in codes_info:
                code_info[4] = 1 if code_info[0] in want_codes else 0
                # 获取代码状态
                if trade_manager.get_trade_state(code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE:
                if trade_manager.get_trade_state_cache(code_info[0]) != trade_manager.TRADE_STATE_NOT_TRADE:
                    code_info[5] = 1
            response_data = json.dumps({"code": 0, "data": codes_info})
@@ -391,7 +391,7 @@
                    l2_data_util.load_l2_data(code)
                    total_datas = l2_data_util.local_today_datas.get(code)
                trade_state = trade_manager.get_trade_state(code)
                trade_state = trade_manager.get_trade_state_cache(code)
                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or trade_state == trade_manager.TRADE_STATE_BUY_DELEGATED or trade_state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    hcancel_datas_dict, cancel_indexes_set = HourCancelBigNumComputer.get_watch_index_dict(code)
                    # 根据日志读取实时的计算数据
trade/huaxin/trade_api_server.py
@@ -153,7 +153,7 @@
                                raise Exception(result["msg"])
                        elif code:
                            state = trade_manager.get_trade_state(code)
                            state = trade_manager.get_trade_state_cache(code)
                            if state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_CANCEL_ING:
                                try:
                                    l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, "手动撤销")
trade/trade_manager.py
@@ -172,6 +172,9 @@
        RedisUtils.delete(self.__get_redis(), self.__key)
__trade_state_cache = {}
# 获取交易状态
def get_trade_state(code):
    state = RedisUtils.get(__redis_manager.getRedis(), "trade-state-{}".format(code))
@@ -180,10 +183,20 @@
    return int(state)
def get_trade_state_cache(code):
    cache_result = tool.CodeDataCacheUtil.get_cache(__trade_state_cache, code)
    if cache_result[0]:
        return cache_result[1]
    val = get_trade_state(code)
    tool.CodeDataCacheUtil.set_cache(__trade_state_cache, code, val)
    return val
# 设置交易状态
def set_trade_state(code, state):
    logger_trade.info("set_trade_state {}-{}".format(code, state))
    RedisUtils.setex(__redis_manager.getRedis(), "trade-state-{}".format(code), tool.get_expire(), state)
    tool.CodeDataCacheUtil.set_cache(__trade_state_cache, code, state)
    RedisUtils.setex_async(__redis_manager.getRedis(), "trade-state-{}".format(code), tool.get_expire(), state)
def get_codes_by_trade_state(state):
@@ -326,7 +339,7 @@
    @dask.delayed
    def is_state_right(code):
        trade_state = get_trade_state(code)
        trade_state = get_trade_state_cache(code)
        if trade_state != TRADE_STATE_NOT_TRADE and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS and trade_state != TRADE_STATE_BUY_CANCEL_ING:
            return Exception("代码处于不可交易状态"), trade_state
        return None, trade_state
@@ -439,7 +452,7 @@
# 开始取消买入
def start_cancel_buy(code, force=False):
    trade_state = get_trade_state(code)
    trade_state = get_trade_state_cache(code)
    if trade_state == TRADE_STATE_BUY_SUCCESS:
        return None
    if not force:
@@ -477,7 +490,7 @@
    time.sleep(0.02)
    for i in range(0, 5):
        # 如果时
        trade_state = get_trade_state(code)
        trade_state = get_trade_state_cache(code)
        if trade_state != TRADE_STATE_BUY_CANCEL_ING and trade_state != TRADE_STATE_BUY_CANCEL_SUCCESS:
            return
        try:
@@ -512,7 +525,7 @@
        # 买入成功
        if code is not None and int(data["type"]) == 0:
            l2_trade_util.forbidden_trade(code)
            state = get_trade_state(code)
            state = get_trade_state_cache(code)
            if state != TRADE_STATE_BUY_SUCCESS:
                set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
                # 删除买撤记录的临时信息
@@ -544,7 +557,7 @@
        code = data["code"]
        if code is not None:
            codes.append(code)
            trade_state = get_trade_state(code)
            trade_state = get_trade_state_cache(code)
            # 设置下单状态的代码为已委托
            if trade_state == TRADE_STATE_BUY_PLACE_ORDER:
                set_trade_state(code, TRADE_STATE_BUY_DELEGATED)
trade/trade_queue_manager.py
@@ -126,7 +126,7 @@
        self.__save_recod(code, time_str, volumn)
        # 如果当前已挂单
        state = trade_manager.get_trade_state(code)
        state = trade_manager.get_trade_state_cache(code)
        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
            # 判断本次与上一次的封单额是否小于5000w
            limit_up_price = gpcode_manager.get_limit_up_price(code)