Administrator
2022-10-12 be73e2b78857adaf006063275726b69c4c60f0d7
l2_data_manager.py
@@ -17,6 +17,7 @@
import l2_trade_factor
import redis_manager
import ths_industry_util
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
@@ -283,10 +284,10 @@
    def get_add_data(cls, code, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_key = ""
        __latest_datas = local_latest_datas.get(code)
        if __latest_datas is not None and len(__latest_datas) > 0:
            last_key = __latest_datas[-1]["key"]
        last_data = None
        latest_datas_ = local_latest_datas.get(code)
        if latest_datas_ is not None and len(latest_datas_) > 0:
            last_data = latest_datas_[-1]
        count = 0
        start_index = -1
@@ -294,13 +295,19 @@
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == last_key:
            if n["key"] == (last_data["key"] if last_data is not None else ""):
                start_index = len(datas) - count
                break
        _add_datas = []
        if len(last_key) > 0:
            if start_index < 0 or start_index + 1 >= len(datas):
        if last_data is not None:
            if start_index < 0:
                if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
                        last_data["val"]["time"]):
                    _add_datas = datas
                else:
                    _add_datas = []
            elif start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
@@ -356,6 +363,9 @@
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            # 不需要非涨停数据/非跌停数据
            if int(item["limitPrice"]) == 0:
                continue
            operateType = item["operateType"]
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
@@ -471,9 +481,6 @@
                #                                                    add_datas)
                if len(add_datas) > 0:
                    _start_time = round(t.time() * 1000)
                    # 计算大单数量
                    cls.__compute_big_money_data(code, add_datas)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
@@ -497,10 +504,12 @@
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def __compute_big_money_data(cls, code, add_datas):
    def __compute_big_money_data(cls, code, start_index, end_index):
        # 计算大单
        total_datas = local_today_datas[code]
        num = 0
        for data in add_datas:
        for index in range(start_index, end_index + 1):
            data = total_datas[index]
            if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
                if int(data["val"]["operateType"]) == 0:
                    num += data["re"]
@@ -693,6 +702,14 @@
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
        # 不能购买
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
@@ -705,6 +722,36 @@
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and L2DataUtil.get_time_as_second(limit_up_time) >= L2DataUtil.get_time_as_second(
                "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        # 13:00后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # 老二,本板块中涨停票数<29 不能买
        if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
                industry) is not None:
            if global_util.industry_hot_num.get(industry) < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
    @classmethod
    def __cancel_buy(cls, code):
@@ -731,6 +778,7 @@
            cls.unreal_buy_dict.pop(code)
        else:
            cls.__cancel_buy(code)
        L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
@@ -753,11 +801,14 @@
                new_get_pos = True
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
                limit_up_time_manager.save_limit_up_time(code, total_datas[buy_single_index]["val"]["time"])
                # 重置大单计算
                big_money_num_manager.reset(code)
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # TODO 可能存在问题 计算大单数量
        cls.__compute_big_money_data(code, max(compute_start_index, buy_single_index), compute_end_index)
        # 买入纯买额统计
        compute_index, buy_nums, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                           compute_start_index),
@@ -792,7 +843,7 @@
                                                                                       compute_index)
            # 计算大群撤的大单
            L2BetchCancelBigNumProcessor.process_new(code, buy_single_index, compute_index)
            # 连续涨停数计算
            L2ContinueLimitUpCountManager.process(code, buy_single_index, compute_index)
            # 数据是否处理完毕
@@ -846,7 +897,8 @@
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
            if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (
                    i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
                if start is None:
                    start = i
                last_index = i
@@ -925,10 +977,6 @@
            return start, end_index
        else:
            return None, None
    # Whether an order can be placed
    def __is_can_order(self):
        """Placeholder with no implementation; always returns ``None``."""
        pass
    # 虚拟下单
    def __unreal_order(self):
@@ -1268,10 +1316,18 @@
    @classmethod
    def test2(cls):
        """Ad-hoc manual check of the big-order cancel processors.

        Loads L2 data for a hard-coded code, stores a random key for it in
        ``cls.random_key``, then runs ``L2BetchCancelBigNumProcessor.process_new``
        and ``L2BigNumProcessor.process_cancel_with_big_num`` over hard-coded
        index ranges. Nothing is asserted or returned.
        """
        # NOTE(review): `code` is assigned twice in a row, so only the second
        # value is ever used — the first looks like a leftover; confirm.
        code = "000677"
        code = "600082"
        load_l2_data(code, True)
        cls.random_key[code] = random.randint(0, 100000)
        L2BetchCancelBigNumProcessor.process_new(code, 57, 150)
        # The returned pair is bound but not used afterwards.
        need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, 121, 123)
    @classmethod
    def test_can_order(cls):
        code = "002393"
        global_util.load_industry()
        limit_up_time_manager.load_limit_up_time()
        print(cls.__can_buy(code))
# 连续涨停买单数最大值管理器
@@ -1396,7 +1452,7 @@
class L2BigNumProcessor:
    # 是否需要根据大单撤单,返回是否需要撤单与撤单信号的数据
    @classmethod
    def __need_cancel_with_max_num(cls, code, max_num_info):
    def __need_cancel_with_max_num(cls, code, max_num_info, start_index, end_index):
        if max_num_info is None:
            return False, None
        # 如果是买入单,需要看他前面同一秒是否有撤单
@@ -1409,6 +1465,9 @@
                if cancel_datas is not None:
                    for cancel_data in cancel_datas:
                        # 只能在当前规定的数据范围查找,以防出现重复查找
                        if cancel_data["index"] < start_index or cancel_data["index"] > end_index:
                            continue
                        if cancel_data["index"] > max_num_info["index"]:
                            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                             local_today_num_operate_map[
@@ -1417,6 +1476,7 @@
                                continue
                            if buy_data["val"]["time"] != max_num_info["val"]["time"]:
                                continue
                            min_space, max_space = l2_data_util.compute_time_space_as_second(
                                cancel_data["val"]["cancelTime"],
                                cancel_data["val"][
@@ -1491,7 +1551,7 @@
        return index
    @classmethod
    def __del_big_num_pos(cls, code):
    def del_big_num_pos(cls, code):
        redis = _redisManager.getRedis()
        redis.delete("big_num_pos-{}".format(code))
@@ -1499,7 +1559,6 @@
    def __cancel_buy(cls, code, index):
        L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index])
        L2TradeDataProcessor.cancel_buy(code)
        cls.__del_big_num_pos(code)
    # 处理数据中的大单,返回是否已经撤单和撤单数据的时间
    @classmethod
@@ -1521,12 +1580,12 @@
            L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info))
            index = new_max_info["index"]
            # 大单是否有撤单信号
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info)
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info, start_index, end_index)
            if need_cancel:
                # 需要撤单
                # 撤单
                L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"])
                cls.__cancel_buy(code, index)
                cls.__cancel_buy(code, new_max_info["index"])
                return True, cancel_data,
            else:
@@ -1538,7 +1597,7 @@
            # 有大单记录
            need_cancel = False
            cancel_index = -1
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index])
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index], start_index, end_index)
            # 需要撤单
            if need_cancel:
                # 撤单
@@ -1554,7 +1613,8 @@
                L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data))
                # 大单是否有撤单信号
                need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data)
                need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data, max_num_data["index"],
                                                                          end_index)
                if need_cancel:
                    # 需要撤单
                    # 撤单
@@ -1566,6 +1626,13 @@
                    # 保存大单记录
                    cls.__save_big_num_pos(code, max_num_data["index"])
                    return False, cancel_data
    @classmethod
    def test(cls):
        code = "000036"
        load_l2_data(code, True)
        new_max_info = cls.__compute_max_num(code, 470, 476, None, "09:32:59")
        print(new_max_info)
# 大群撤大单跟踪
@@ -1627,6 +1694,7 @@
        for i in index_set:
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)
        # 大单小于5笔无脑撤
        if total_count <= 5:
            return True
@@ -1967,4 +2035,4 @@
# Script entry point: runs the ad-hoc test helpers on L2TradeDataProcessor.
# NOTE(review): two helpers are invoked back to back; confirm both are meant to run.
if __name__ == "__main__":
    L2TradeDataProcessor.test1()
    L2TradeDataProcessor.test_can_order()