Administrator
2023-01-06 59fba698b03a51a8da5b56a919ebbf94d4784f74
l2_data_manager.py
@@ -9,19 +9,24 @@
from datetime import datetime
import big_money_num_manager
import data_process
import code_data_util
import constant
import global_data_loader
import global_util
import industry_codes_sort
import l2_data_log
import l2_data_util
import gpcode_manager
import l2_trade_factor
import log
import redis_manager
import ths_industry_util
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
from trade_data_manager import TradeBuyDataManager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process,logger_l2_data
import trade_data_manager
import limit_up_time_manager
_redisManager = redis_manager.RedisManager(1)
@@ -73,9 +78,9 @@
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, None, None, 0
            return None, None, None, 0, 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3]
        return _data[0], _data[1], _data[2], _data[3], _data[4]
    # 设置买入点的值
    # buy_single_index 买入信号位
@@ -83,16 +88,16 @@
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums, count):
        # Stores the buy-computation state for `code` in Redis as a JSON array under
        # "buy_compute_index_info-<code>", with the TTL returned by tool.get_expire().
        # NOTE(review): two signatures and paired 4-/5-element writes appear back to back
        # in this block; it reads like the before/after lines of one change shown together.
        # Confirm which line of each pair is current before relying on it.
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_single_index is not None:
            # A buy-signal index was supplied: write every field as given.
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums)))
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums, count)))
        else:
            # No buy-signal index supplied: reuse the stored one and overwrite the other fields.
            _buy_single_index, _buy_exec_index, _compute_index, _nums = TradePointManager.get_buy_compute_start_data(
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums)))
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums, count)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
@@ -183,20 +188,28 @@
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = []
        keys = redis.keys("l2-{}-*".format(code))
        for k in keys:
            value = redis.get(k)
            _data = l2_data_util.l2_data_key_2_obj(k, value)
            datas.append(_data)
        # 排序
        new_datas = sorted(datas,
                           key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        local_today_datas[code] = new_datas
    # 根据今日数据加载
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
        datas = log.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
            datas= []
        local_today_datas[code] = datas
        # 从数据库加载
        # datas = []
        # keys = redis.keys("l2-{}-*".format(code))
        # for k in keys:
        #     value = redis.get(k)
        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
        #     datas.append(_data)
        # # 排序
        # new_datas = sorted(datas,
        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        # local_today_datas[code] = new_datas
        # 根据今日数据加载
        l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
@tool.async_call
def saveL2Data(code, datas, msg=""):
    start_time = round(t.time() * 1000)
    # 查询票是否在待监听的票里面
@@ -245,30 +258,45 @@
    process_time = data["processTime"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas
    return day, client, channel, code, capture_time, process_time, datas, data
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    """Cache the latest L2 snapshot for ``code`` and pass newly added rows to ``saveL2Data``.

    Args:
        code: Stock code; used to build the Redis key and to index ``local_latest_datas``.
        datas: Latest L2 snapshot; JSON-serialised into ``l2-data-latest-<code>``.
        add_datas: Rows that are new in this snapshot; logged and forwarded to ``saveL2Data``.

    NOTE(review): the statements before the ``len(add_datas)`` check perform, unconditionally,
    the same Redis write and in-memory update that the guarded branch repeats. This reads like
    the before and after of one change shown together; confirm which version is intended.
    """
    redis = _redisManager.getRedis()
    # Save the latest snapshot
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    # Mirror it into the in-memory cache
    if code in local_latest_datas:
        local_latest_datas[code] = datas
    else:
        local_latest_datas.setdefault(code, datas)
    __set_l2_data_latest_count(code, len(datas))
    # Only persist when there is newly added data
    if len(add_datas) > 0:
        # Save the latest snapshot (the write is timed and reported via l2_data_log.l2_time)
        __start_time = round(t.time() * 1000)
        redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
        l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "保存最近l2数据用时")
        # Mirror it into the in-memory cache
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        try:
            # Logging is best-effort: a failure here must not prevent saveL2Data below.
            logger_l2_data.info("{}-{}",code,add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
# 清除l2数据
def clear_l2_data(code):
    """Delete every cached L2 entry for ``code`` from Redis.

    Removes all per-record keys matching ``l2-<code>-*`` together with the
    ``l2-data-latest-<code>`` snapshot key.

    Args:
        code: Stock code whose L2 cache should be dropped.
    """
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    doomed_keys = list(redis_l2.keys("l2-{}-*".format(code)))
    doomed_keys.append("l2-data-latest-{}".format(code))
    # A single DEL carrying every key replaces one network round trip per key.
    # The snapshot key is always present in the list, so DEL never runs with no arguments.
    redis_l2.delete(*doomed_keys)
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        if global_util.TEST:
        if constant.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
@@ -363,10 +391,11 @@
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            # 不需要非涨停数据/非跌停数据
            if int(item["limitPrice"]) == 0:
                continue
            operateType = item["operateType"]
            # 不需要非涨停买与买撤
            if int(item["limitPrice"]) != 1 and (int(operateType) == 0 or int(operateType) == 1):
                continue
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
@@ -378,7 +407,8 @@
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        l2_data_util.save_big_data(code, same_time_num, data)
        # TODO 测试的时候开启,方便记录大单数据
        # l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
@@ -406,6 +436,21 @@
            return False
        return True
    # 是否为涨停卖
    @classmethod
    def is_limit_up_price_sell(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 2:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否涨停买撤
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
@@ -420,6 +465,20 @@
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否卖撤
    @classmethod
    def is_sell_cancel(cls, val):
        if int(val["operateType"]) == 3:
            return True
        return False
    # 是否为卖
    @classmethod
    def is_sell(cls, val):
        if int(val["operateType"]) == 2:
            return True
        return False
# L2交易数据处理器
@@ -459,7 +518,7 @@
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                if not code_data_util.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
@@ -484,17 +543,17 @@
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    # if L2DataUtil.is_same_time(now_time_str, latest_time):
                    #     # 判断是否已经挂单
                    #     state = trade_manager.get_trade_state(code)
                    #     start_index = len(total_datas) - len(add_datas)
                    #     end_index = len(total_datas) - 1
                    #     if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                    #         # 已挂单
                    #         cls.__process_order(code, start_index, end_index, capture_timestamp)
                    #     else:
                    #         # 未挂单
                    #         cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                # 保存数据
@@ -722,6 +781,8 @@
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
        finally:
            cls.debug(code, "m值影响因子:", l2_trade_factor.L2TradeFactorUtil.factors_to_string(code))
    # 是否可以买
    @classmethod
@@ -735,7 +796,7 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        codes_index = industry_codes_sort.sort_codes(codes,code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
@@ -767,6 +828,7 @@
            L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
@@ -774,10 +836,19 @@
        # 删除大群撤事件的大单
        L2BetchCancelBigNumProcessor.del_recod(code)
        L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            # 删除大群撤事件的大单
            L2BetchCancelBigNumProcessor.del_recod(code)
        else:
            cls.__cancel_buy(code)
        L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
@@ -831,7 +902,7 @@
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            TradePointManager.delete_count_info_for_cancel_buy(code)
            TradeBuyDataManager.remove_buy_position_info(code)
            trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
            # 已过时 为买撤保存基础纯买额
            # TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            b_buy_count, b_buy_cancel_count = cls.__count_l2_data_before_for_cancel(code, buy_single_index)
@@ -905,7 +976,7 @@
                count += datas[i]["re"]
                if count >= continue_count:
                    return True, start
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                last_index = None
                count = 0
                start = None
@@ -931,7 +1002,7 @@
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += datas[i]["re"]
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
@@ -967,7 +1038,7 @@
                    start = i
                    start_time = L2DataUtil.get_time_as_second(_val["time"])
                count += int(datas[i]["re"])
            else:
            elif not L2DataUtil.is_limit_up_price_sell(_val):
                if count >= continue_count:
                    return start, i - 1
                start = -1
@@ -984,12 +1055,13 @@
    @classmethod
    def __get_threshmoney(cls, code):
        # Returns the threshold amount for `code` as computed by L2TradeFactorUtil.compute_m_value.
        # NOTE(review): the first return makes the unpack-and-return below unreachable, so as
        # written the raw compute_m_value result is returned. The two forms look like the
        # before/after of one change (compute_m_value yielding (money, msg)) — confirm which
        # is current; if the helper returns a pair, callers here would receive the pair.
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return money
    # 获取预估挂买位
    @classmethod
    def __get_sure_order_pos(cls, code):
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        index, data = trade_data_manager.TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        else:
@@ -1323,9 +1395,9 @@
    @classmethod
    def test_can_order(cls):
        # Manual check: loads industry and limit-up-time data, then prints the result of
        # cls.__can_buy for a hard-coded stock code.
        # NOTE(review): `code` is assigned twice (the second value wins) and industry data is
        # loaded through both global_util and global_data_loader. This looks like the
        # before/after of one change shown together — confirm which lines are current.
        code = "002393"
        code = "000948"
        global_util.load_industry()
        global_data_loader.load_industry()
        limit_up_time_manager.load_limit_up_time()
        print(cls.__can_buy(code))
@@ -1618,8 +1690,9 @@
                if need_cancel:
                    # 需要撤单
                    # 撤单
                    cls.__cancel_buy(code, max_num_data["index"])
                    L2TradeDataProcessor.cancel_debug(code, "跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index, max_num_data["index"])
                    cls.__cancel_buy(code, max_num_data["index"] if cancel_data is None else cancel_data)
                    L2TradeDataProcessor.cancel_debug(code, "原来跟踪到大单无撤买信号-{},新跟踪的大单需要撤买-{}", index,
                                                      max_num_data["index"])
                    return True, cancel_data
                else:
                    # 无需撤单
@@ -1695,8 +1768,8 @@
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)
        # 大单小于5笔无脑撤
        if total_count <= 5:
        # 大单小于5笔无脑撤,后修改为无大单无脑撤
        if total_count <= 0:
            return True
        # 大单撤单笔数大于总大单笔数的1/5就撤单
@@ -1788,6 +1861,72 @@
                    index_set.add(d[1])
                    big_nums_info_new.append(d)
            cls.__save_recod(code, max_big_num_info, big_nums_info_new)
# 卖跟踪
class L2SellProcessor:
    """Accumulates the volume of limit-up sell orders seen for each stock code.

    State is kept in Redis under ``sell_num-<code>`` as the JSON pair
    ``(process_index, count)``: the highest row index already processed and the
    running sum of ``num`` over the limit-up sell rows counted so far.
    """

    @classmethod
    def __get_recod(cls, code):
        """Return the stored ``(process_index, count)`` pair, or ``(None, None)`` if absent."""
        redis = _redisManager.getRedis()
        _val = redis.get("sell_num-{}".format(code))
        if _val is None:
            return None, None
        datas = json.loads(_val)
        return datas[0], datas[1]

    @classmethod
    def del_recod(cls, code):
        """Remove the stored sell record for ``code``."""
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.delete(key)

    @classmethod
    def __save_recod(cls, code, process_index, count):
        """Store ``(process_index, count)`` for ``code`` with the TTL from ``tool.get_expire()``."""
        redis = _redisManager.getRedis()
        key = "sell_num-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((process_index, count)))

    # Temporarily unused
    @classmethod
    def need_cancel(cls, code, start_index, end_index):
        """Report whether the accumulated limit-up sell amount calls for cancelling the order.

        Returns True when ``limit_up_price * count * 100`` reaches the value of
        ``L2TradeFactorUtil.get_base_safe_val`` for the code's ``zyltgb_map`` entry.
        Returns False when no sell record or no limit-up price is available.

        Args:
            code: Stock code to check.
            start_index: Unused; kept for interface compatibility.
            end_index: Unused; kept for interface compatibility.
        """
        process_index, count = cls.__get_recod(code)
        if process_index is None:
            # No sell information recorded yet.
            return False
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            return False
        sold_amount = float(limit_up_price) * (count or 0) * 100
        safe_val = l2_trade_factor.L2TradeFactorUtil.get_base_safe_val(
            global_util.zyltgb_map[code])
        return sold_amount >= safe_val

    @classmethod
    def process(cls, code, start_index, end_index):
        """Add the limit-up sell volume found in rows ``start_index..end_index`` to the record.

        Rows at or below the stored ``process_index`` are skipped so that a range
        delivered more than once is only counted the first time.

        Args:
            code: Stock code whose rows in ``local_today_datas`` are scanned.
            start_index: First row index to scan (inclusive).
            end_index: Last row index to scan (inclusive).
        """
        total_datas = local_today_datas[code]
        process_index, count = cls.__get_recod(code)
        # Begin just past the high-water mark so already-counted rows are never revisited.
        first_index = start_index
        if process_index is not None:
            first_index = max(start_index, process_index + 1)
        for index in range(first_index, end_index + 1):
            val = total_datas[index]["val"]
            # Only limit-up sell rows contribute.
            if not L2DataUtil.is_limit_up_price_sell(val):
                continue
            if count is None:
                count = 0
            # NOTE(review): rows also carry a "re" repeat count that other code in this file
            # sums; it is not applied here — confirm whether it should multiply num.
            count += int(val["num"])
        # Advance the high-water mark on every call. Previously it was set only on the first
        # call, so any later range overlapping rows above that first mark was counted again.
        if process_index is None or end_index > process_index:
            process_index = end_index
        cls.__save_recod(code, process_index, count)
def __get_time_second(time_str):
@@ -2035,4 +2174,4 @@
if __name__ == "__main__":
    # Script entry point: runs the buy-eligibility check, then clears the cached L2 data
    # for the hard-coded stock code 603912.
    # NOTE(review): these two calls may be the before/after of one change shown together —
    # confirm whether both are meant to run.
    L2TradeDataProcessor.test_can_order()
    clear_l2_data("603912")