Administrator
2022-09-16 b7000cbf5e67e90abe53e96a4ea931afbf906e24
l2_data_manager.py
@@ -1,18 +1,19 @@
import decimal
import json
import os
import time as t
from datetime import datetime
import data_process
import l2_data_util
import mysql
import gpcode_manager
import mongo_data
import redis_manager
import tool
import trade_manager
from log import logger_l2_trade
from trade_data_manager import TradeBuyDataManager
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
@@ -91,25 +92,30 @@
    @staticmethod
    def get_buy_cancel_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        index = redis.get("buy_cancel_compute_index-{}".format(code))
        total_num = redis.get("buy_cancel_compute_num-{}".format(code))
        if index is None:
            return None, 0
        info = redis.get("buy_cancel_compute_info-{}".format(code))
        if info is None:
            return None, None , None
        else:
            return int(index), int(total_num)
            info=json.loads(info)
            return info[0],info[1],info[2]
    # 设置买撤点信息
    @staticmethod
    def set_buy_cancel_compute_start_data(code, num_add, index=None):
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    @classmethod
    def set_buy_cancel_compute_start_data(cls,code, buy_num,computed_index, index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        if index is not None:
            redis.setex("buy_cancel_compute_index-{}".format(code), expire, index)
        key = "buy_cancel_compute_num-{}".format(code)
        if redis.get(key) is None:
            redis.setex(key, expire, num_add)
        else:
            redis.incrby(key, num_add)
        redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index,buy_num,computed_index)))
    # 增加撤买的纯买额
    @classmethod
    def add_buy_nums_for_cancel(cls,code,num_add,computed_index):
        cancel_index,nums,c_index= cls.get_buy_cancel_compute_start_data(code)
        if cancel_index is None:
            raise Exception("无撤买信号记录")
        nums+=num_add
        cls.set_buy_cancel_compute_start_data(code,nums,computed_index)
def load_l2_data(code, force=False):
@@ -140,7 +146,8 @@
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
def saveL2Data(code, datas):
def saveL2Data(code, datas, msg=""):
    start_time = round(t.time() * 1000)
    # 查询票是否在待监听的票里面
    if not gpcode_manager.is_in_gp_pool(code):
        return None
@@ -152,22 +159,21 @@
            # 计算保留的时间
            expire = tool.get_expire()
            index = 0
            start_index = redis_instance.get("l2-maxindex-{}".format(code))
            if start_index is None:
                start_index = 0
                start_index = -1
            else:
                start_index = int(start_index)
            max_index = start_index
            i = 0
            for _data in datas:
                index = index + 1
                i += 1
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # 新增
                    max_index = start_index + index
                    value = {"index": start_index + index, "re": _data['re']}
                    max_index = start_index + i
                    value = {"index": start_index + i, "re": _data["re"]}
                    redis_instance.setex(key, expire, json.dumps(value))
                else:
                    json_value = json.loads(value)
@@ -179,77 +185,29 @@
    finally:
        redis_instance.delete("l2-save-{}".format(code))
    print("保存新数据用时:", msg, round(t.time() * 1000) - start_time)
    return datas
# TODO 获取l2的数据
def get_l2_data_index(code, key):
    """Placeholder for looking up the index of an L2 record; not implemented yet."""
    return None
def parseL2Data(str):
    """Parse one raw L2 JSON message into de-duplicated records.

    ``str`` is a JSON document with a ``client`` field and a ``data`` object
    carrying ``code``, ``channel``, ``captureTime``, ``processTime`` and a
    nested ``data`` list of items.

    Returns:
        ``(day, client, channel, code, datas)`` where ``day`` is today's date
        as ``YYYYMMDD`` and ``datas`` is a list of
        ``{"key": ..., "val": item, "re": repeat_count}`` dicts.

    NOTE(review): ``now``, ``capture_time`` and ``process_time`` are assigned
    but not used in this function body.
    """
    now = int(t.time())
    day = datetime.now().strftime("%Y%m%d")
    dict = json.loads(str)
    data = dict["data"]
    client = dict["client"]
    code = data["code"]
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    data = data["data"]
    datas = []
    # Maps a record key to its position in ``datas``
    dataIndexs = {}
    # Fetch the limit-up price
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    # Number of items seen per timestamp string
    same_time_num = {}
    for item in data:
        # Parse the item
        time = item["time"]
        if time in same_time_num:
            same_time_num[time] = same_time_num[time] + 1
        else:
            same_time_num[time] = 1
        price = float(item["price"])
        num = item["num"]
        limitPrice = item["limitPrice"]
        # Flag the item when its price equals the limit-up price
        if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
            limitPrice = 1
            item["limitPrice"] = "{}".format(limitPrice)
        operateType = item["operateType"]
        cancelTime = item["cancelTime"]
        cancelTimeUnit = item["cancelTimeUnit"]
        key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                               cancelTimeUnit)
        if key in dataIndexs:
            # Identical record seen again: bump its repeat count
            datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
        else:
            # First occurrence: repeat count starts at 1
            datas.append({"key": key, "val": item, "re": 1})
            dataIndexs.setdefault(key, len(datas) - 1)
    for key in same_time_num:
        if same_time_num[key] > 50:
            # Only keep the raw message when its last record is within 3s of now
            ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"])
            ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S"))
            if abs(ts1 - ts_now) <= 3:
                # TODO save the data
                redis = _redisManager.getRedis()
                redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
    return day, client, channel, code, datas
# 纠正数据,将re字段替换为较大值
def correct_data(code, _datas):
    """Align repeat counts between ``_datas`` and the cached latest snapshot.

    For every record in ``_datas`` whose key also appears in
    ``local_latest_datas[code]`` with a different ``re`` value, both records
    are updated in place to the larger of the two values.

    Returns:
        The same ``_datas`` list that was passed in.
    """
    latest_data = local_latest_datas.get(code)
    if latest_data is None:
        latest_data = []
    for data in _datas:
        for _ldata in latest_data:
            if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
                max_re = max(_ldata["re"], data["re"])
                _ldata["re"] = max_re
                data["re"] = max_re
    return _datas
    # NOTE(review): the statements below come after the return above, so they
    # never run here; they also use names this function does not define
    # (e.g. ``limit_up_price``, ``day``). They look like they belong to
    # parseL2Data — confirm and relocate.
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # Fetch the limit-up price
    return day, client, channel, code, capture_time, process_time, datas
# 保存l2数据
@@ -267,55 +225,543 @@
        saveL2Data(code, add_datas)
# 获取增量数据
def get_add_data(code, datas):
    """Return the records in ``datas`` that are newer than the cached snapshot.

    The last record key of ``local_latest_datas[code]`` is searched for in
    ``datas`` from the end; everything after that position is new.

    Args:
        code: Stock code used to look up the cached snapshot.
        datas: Records (dicts with a ``"key"`` entry) of the current snapshot.

    Returns:
        A list slice of ``datas``. It is empty when ``datas`` is empty/None, or
        when a cached key exists but is not found in ``datas`` or is already
        the last record. When there is no cached snapshot all of ``datas`` is
        returned.
    """
    if not datas:
        return []
    latest = local_latest_datas.get(code)
    last_key = ""
    if latest is not None and len(latest) > 0:
        last_key = latest[-1]["key"]
    # Position of the last occurrence of the cached key, or -1 if absent.
    start_index = -1
    for pos in range(len(datas) - 1, -1, -1):
        if datas[pos]["key"] == last_key:
            start_index = pos
            break
    if len(last_key) > 0 and (start_index < 0 or start_index + 1 >= len(datas)):
        return []
    return datas[start_index + 1:]
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        # TODO 测试
        # if 1 > 0:
        #     return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
        if abs(time2_second - time1_second) < 3:
            return True
        else:
            return datas[start_index + 1:]
    else:
        return datas[start_index + 1:]
            return False
    # 获取增量数据
    @classmethod
    def get_add_data(cls, code, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_key = ""
        __latest_datas = local_latest_datas.get(code)
        if __latest_datas is not None and len(__latest_datas) > 0:
            last_key = __latest_datas[-1]["key"]
        count = 0
        start_index = -1
        # 如果原来没有数据
        # TODO 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == last_key:
                start_index = len(datas) - count
                break
def __is_same_time(time1, time2):
    # TODO 测试
    # if 1 > 0:
    #     return True
    time1_s = time1.split(":")
    time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
    time2_s = time2.split(":")
    time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
    if abs(time2_second - time1_second) < 3:
        _add_datas = []
        if len(last_key) > 0:
            if start_index < 0 or start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
        else:
            _add_datas = datas[start_index + 1:]
        for i in range(0, len(_add_datas)):
            _add_datas[i]["index"] = _start_index + i
        return _add_datas
    # 纠正数据,将re字段替换为较大值
    @classmethod
    def correct_data(cls, code, _datas):
        latest_data = local_latest_datas.get(code)
        if latest_data is None:
            latest_data = []
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
        return _datas
    # 处理l2数据
    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
        datas = []
        dataIndexs = {}
        same_time_num = {}
        for item in data:
            # 解析数据
            time = item["time"]
            if time in same_time_num:
                same_time_num[time] = same_time_num[time] + 1
            else:
                same_time_num[time] = 1
            price = float(item["price"])
            num = item["num"]
            limitPrice = item["limitPrice"]
            # 涨停价
            if limit_up_price is not None and limit_up_price == tool.to_price(decimal.Decimal(price)):
                limitPrice = 1
                item["limitPrice"] = "{}".format(limitPrice)
            operateType = item["operateType"]
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
            key = "{}-{}-{}-{}-{}-{}-{}-{}".format(code, operateType, time, num, price, limitPrice, cancelTime,
                                                   cancelTimeUnit)
            if key in dataIndexs:
                # 数据重复次数+1
                datas[dataIndexs[key]]['re'] = datas[dataIndexs[key]]['re'] + 1
            else:
                # 数据重复次数默认为1
                datas.append({"key": key, "val": item, "re": 1})
                dataIndexs.setdefault(key, len(datas) - 1)
        l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    @classmethod
    def get_time_as_second(time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    def is_limit_up_price_buy(val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 0:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    else:
        return False
    # 是否涨停买撤
    def is_limit_up_price_buy_cancel(val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 1:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
def process_data(code, datas):
# L2交易数据处理器
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        now_time_str = datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
                # 判断价格区间是否正确
                if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                load_l2_data(code)
                # 纠正数据
                datas = L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"]
                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                # 买入确认点处理
                TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
                                                                   total_datas[-1],
                                                                   add_datas)
                if len(add_datas) > 0:
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.process_order(code, add_datas)
                        else:
                            # 未挂单
                            cls.process_not_order(code, add_datas)
                # 保存数据
                save_l2_data(code, datas, add_datas)
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    # 处理未挂单
    @classmethod
    def process_not_order(cls, code, add_datas):
    # 处理已挂单
    @classmethod
    def process_order(cls, code, add_datas):
        # 获取之前是否有记录的撤买信号
        cancel_index, buy_num_for_cancel,computed_index= cls.has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3)
            buy_num_for_cancel = 0
            computed_index=buy_index
        if cancel_index is not None:
            # 获取阈值 有买撤信号,统计撤买纯买额
            threshold_money=10000000
            cls.start_compute_cancel(code,cancel_index,computed_index,buy_num_for_cancel,threshold_money)
        else:
            # 无买撤信号,终止执行
            pass
    #开始计算撤的信号
    @classmethod
    def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money):
        # sure_type 0-虚拟挂买位  1-真实挂买位
        computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            # 发出撤买信号,需要撤买
            if cls.unreal_buy_dict.get(code) is not None:
                # 有虚拟下单
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # TODO 删除下单标记位置
                pass
            else:
                # 无虚拟下单,需要执行撤单
                logger_l2_trade.info(
                    "执行撤销:{} - {}".format(code, json.dumps(total_datas[computed_index])))
                try:
                    trade_manager.start_cancel_buy(code)
                    # 取消买入标识
                    TradePointManager.delete_buy_point(code)
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
            if computed_index < len(local_today_datas[code])-1:
                # TODO数据尚未处理完,重新进入下单计算流程
                cls.start_compute_buy(code,computed_index+1,0,threshold_money)
                pass
        else:
            #无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index)
            # 判断是否有虚拟下单
            unreal_buy_info=cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                logger_l2_trade.info(
                    "执行买入:{} ".format(code))
                try:
                    trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                                            unreal_buy_info[0])
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
                pass
            else:
                #终止执行
                pass
    @classmethod
    def start_compute_buy(cls,code,compute_start_index,origin_num,threshold_money):
        total_datas=local_today_datas[code]
        # 获取买入信号计算起始位置
        index, num = cls.get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if index is None:
            # 有买入信号
            has_single, index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index , 3)
            if has_single:
                num = 0
                new_get_pos = True
                # TODO 记录买入信号位置
        if index is None:
            # 未获取到买入信号,终止程序
            return None
        # 买入纯买额统计
        # TODO 获取阈值
        threshold_money=10000000
        compute_index,buy_nums = cls.sum_buy_num_for_order(code,compute_start_index,num,threshold_money)
        if compute_index is not None:
            # 达到下单条件
            # 虚拟下单
            cls.unreal_buy_dict[code]=(compute_index,capture_time)
        else:
            # TODO 未达到下单条件,保存纯买额,设置纯买额
        pass
    # 获取下单起始信号
    @classmethod
    def get_order_begin_pos(cls, code):
        index, num = TradePointManager.get_buy_compute_start_data(code)
        return index, num
    # 获取撤单起始位置
    @classmethod
    def has_order_cancel_begin_pos(cls):
        # cancel_index:撤单信号起点
        # buy_num_for_cancel:从挂入点计算的纯买额
        # computed_index 计算的最后位置
        cancel_index, buy_num_for_cancel,computed_index = TradePointManager.get_buy_cancel_compute_start_data(code)
        return cancel_index, buy_num_for_cancel,computed_index
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    def compute_order_begin_pos(self, code, compute_data_count, continue_count):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        __time = None
        _limit_up_count_1s = 0
        _limit_up_count_1s_start_index = -1
        for i in range(start_index, __len - (continue_count - 1)):
            _val = datas[i]["val"]
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                index_0 = i
                index_1 = -1
                index_2 = -1
                # index_3 = -1
                for j in range(index_0 + 1, __len):
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
                        index_1 = j
                        break
                if index_1 > 0:
                    for j in range(index_1 + 1, __len):
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
                            index_2 = j
                            break
                # if index_2 > 0:
                #     for j in range(index_2 + 1, __len):
                #         # 涨停买
                #         if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0:
                #             index_3 = j
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:  # and index_3 - index_2 == 1
                    logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                    return i
            # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                if __time is None:
                    _time = datas[i]["val"]["time"]
                    _limit_up_count_1s = 1
                    _limit_up_count_1s_start_index = i
                elif _time == _val["time"]:
                    _limit_up_count_1s += 1
                else:
                    _time = datas[i]["val"]["time"]
                    _limit_up_count_1s = 1
                    _limit_up_count_1s_start_index = i
            elif _val["operateType"] == 1:
                # 买撤
                _time = None
                _limit_up_count_1s = 0
                _limit_up_count_1s_start_index = -1
            if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
                logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
                return _limit_up_count_1s_start_index
        return None
    # 是否有撤销信号
    @classmethod
    def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count):
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        for i in range(start_index, __len - (continue_count - 1)):
            _val = datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            # 有连续3个买撤
            if L2DataUtil.is_limit_up_price_buy_cancel(_val):
                index_0 = i
                index_1 = -1
                index_2 = -1
                for j in range(index_0 + 1, __len):
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
                        index_1 = j
                        break
                if index_1 > 0:
                    for j in range(index_1 + 1, __len):
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
                            index_2 = j
                            break
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:
                    logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                    return i
        return None
    # 保存下单位置
    def save_order_pos(self):
        pass
    # 是否可以下单
    def is_can_order(self):
        pass
    # 虚拟下单
    def unreal_order(self):
        pass
    # 设置虚拟挂买位
    def set_unreal_sure_order_pos(self):
        pass
    # 获取预估挂买位
    @classmethod
    def get_sure_order_pos(cls, code):
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        else:
            return 1, index, data
    # 统计买入净买量
    @classmethod
    def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        for i in range(compute_start_index, len(total_datas)):
            _val = total_datas[i]["val"]
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    return i, buy_nums
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
        return None, buy_nums
    # 同一时间买入的概率计算
    @classmethod
    def get_same_time_property(cls, code):
        # TODO 与板块热度有关
        return 0.5
    # 统计买撤净买量
    @classmethod
    def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code)
        same_time_property = cls.get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                if i < sure_pos:
                    buy_nums += int(_val["num"]) * int(data["re"])
                elif sure_data["val"]["time"] == _val["time"]:
                    # 同一秒买入,而且还在预估买入位之后
                    property_buy_num_count += int(_val["num"]) * int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停撤买
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas)
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                    elif sure_data["val"]["time"] == _val["time"]:
                        # 同一秒,而且还在预估买入位之后按概率计算
                        property_buy_num_count -= int(_val["num"]) * int(data["re"])
                else:
                    # TODO 未找到买撤数据的买入点
                    pass
            property_buy_num = round(property_buy_num_count * same_time_property)
            if buy_nums + property_buy_num <= threshold_num:
                return i, buy_nums + property_buy_num,sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type
def process_data(code, datas, capture_timestamp):
    now_time_str = datetime.now().strftime("%H:%M:%S")
    __start_time = round(t.time() * 1000)
    try:
        if len(datas) > 0:
            # 判断价格区间是否正确
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"]))
                raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                      "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
            # 加载历史数据
            load_l2_data(code)
            # 纠正数据
@@ -325,7 +771,11 @@
                # 拼接数据
                local_today_datas[code].extend(add_datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
            total_datas = local_today_datas[code]
            # 买入确认点处理
            TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1],
                                                               add_datas)
            if len(add_datas) > 0:
                latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                # 时间差不能太大才能处理
                if __is_same_time(now_time_str, latest_time):
@@ -342,11 +792,14 @@
                        # 没有计算开始点
                        c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3)
                        if c_index is not None:
                            total_datas = local_today_datas[code]
                            logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index])))
                            # 触发数据分析 ,获取连续涨停标记数据
                            buy_nums = 0
                            # 获取涨停价
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            last_data_index = -1
                            for i in range(c_index, len(total_datas)):
                                _val = total_datas[i]["val"]
                                # 有连续4个涨停买就标记计算起始点
@@ -356,24 +809,32 @@
                                elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                    # 涨停买撤
                                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                    last_data_index = i
                                    break
                            TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index)
                            # 获取涨停价
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if limit_up_price is not None:
                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                if last_data_index > -1:
                                    # 大于1000w就买
                                    logger_l2_trade.info(
                                        "执行买入:{} - 计算结束点: {}".format(code, json.dumps(total_datas[-1])))
                                    try:
                                        trade_manager.start_buy(code)
                                        trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index],
                                                                last_data_index)
                                        TradePointManager.delete_buy_cancel_point(code)
                                    except Exception as e:
                                        pass
                    else:
                        # 有计算开始点,计算新增的数据
                        buy_nums = 0
                        buy_nums = c_num
                        last_data = None
                        last_data_index = len(total_datas) - len(add_datas) - 1
                        # 获取涨停价
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        for data in add_datas:
                            last_data_index += 1
                            _val = data["val"]
                            if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                # 涨停买
@@ -381,16 +842,17 @@
                            elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                # 涨停买撤
                                buy_nums -= int(_val["num"]) * int(data["re"])
                            if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                last_data = data
                                break
                        TradePointManager.set_buy_compute_start_data(code, buy_nums)
                        latest_num = c_num + buy_nums
                        # 获取涨停价
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            if latest_num * limit_up_price * 100 > 1000 * 10000:
                            if last_data is not None:
                                # 大于1000w就买
                                logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(add_datas[-1])))
                                try:
                                    trade_manager.start_buy(code)
                                    trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index)
                                    TradePointManager.delete_buy_cancel_point(code)
                                except Exception as e:
                                    pass
@@ -697,25 +1159,25 @@
if __name__ == "__main__":
    # Manual debugging script.
    # Step 1: delete the stored "big data" snapshots from redis.
    redis = redis_manager.RedisManager(1).getRedis()
    for key in redis.keys("big_data*"):
        redis.delete(key)
    # Step 2: replay captured snapshots from disk and print the merged result.
    code = "000868"
    local_today_datas.setdefault(code, [])
    path = "C:/Users/Administrator/Desktop/demo/000868/"
    for file_name in os.listdir(path):
        p = "{}{}".format(path, file_name)
        # ``with`` guarantees the file is closed even if a line fails to parse
        with open(p) as f:
            for line in f:
                data = json.loads(line.strip())
                result = __format_l2_data(data, code, 10.00)
                add_datas = get_add_data(code, result)
                print("增加的数量:", len(add_datas))
                if len(add_datas) > 0:
                    # Append the new records to today's data
                    local_today_datas[code].extend(add_datas)
                # Remember this snapshot as the latest one
                local_latest_datas[code] = result
    for d in local_today_datas[code]:
        print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"])