Administrator
2022-09-20 5ae8b19fdc000fc719f3ad45fa5f7462fdbffbdf
l2_data_manager.py
@@ -1,6 +1,9 @@
import decimal
import json
import logging
import os
import random
import threading
import time as t
from datetime import datetime
@@ -15,7 +18,7 @@
import redis_manager
import tool
import trade_manager
from log import logger_l2_trade
from log import logger_l2_trade, logger_l2_trade_cancel
from trade_data_manager import TradeBuyDataManager
_redisManager = redis_manager.RedisManager(1)
@@ -67,21 +70,26 @@
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, 0, None
            return None, None, None, 0
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2]
        return _data[0], _data[1], _data[2], _data[3]
    # 设置买入点的值
    # buy_single_index 买入信号位
    # buy_exec_index 买入执行位
    # compute_index 计算位置
    # nums 累计纯买额
    @staticmethod
    def set_buy_compute_start_data(code, nums, compute_index, buy_index):
    def set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, nums):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        if buy_index is not None:
            redis.setex(_key, expire, json.dumps((buy_index, nums, compute_index)))
        if buy_single_index is not None:
            redis.setex(_key, expire, json.dumps((buy_single_index, buy_exec_index, compute_index, nums)))
        else:
            _buy_index, _nums, _compute_index = TradePointManager.get_buy_compute_start_data(code)
            redis.setex(_key, expire, json.dumps((_buy_index, nums, compute_index)))
            _buy_single_index, _buy_exec_index, _compute_index, _nums = TradePointManager.get_buy_compute_start_data(
                code)
            redis.setex(_key, expire, json.dumps((_buy_single_index, buy_exec_index, compute_index, nums)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
@@ -161,12 +169,6 @@
            # 计算保留的时间
            expire = tool.get_expire()
            start_index = redis_instance.get("l2-maxindex-{}".format(code))
            if start_index is None:
                start_index = -1
            else:
                start_index = int(start_index)
            max_index = start_index
            i = 0
            for _data in datas:
                i += 1
@@ -174,20 +176,20 @@
                value = redis_instance.get(key)
                if value is None:
                    # 新增
                    max_index = start_index + i
                    value = {"index": start_index + i, "re": _data["re"]}
                    redis_instance.setex(key, expire, json.dumps(value))
                    try:
                        value = {"index": _data["index"], "re": _data["re"]}
                        redis_instance.setex(key, expire, json.dumps(value))
                    except:
                        logging.error("更正L2数据出错:{} key:{}".format(code, key))
                else:
                    json_value = json.loads(value)
                    if json_value["re"] != _data["re"]:
                        json_value["re"] = _data["re"]
                        redis_instance.setex(key, expire, json.dumps(json_value))
            redis_instance.setex("l2-maxindex-{}".format(code), expire, max_index)
    finally:
        redis_instance.delete("l2-save-{}".format(code))
    print("保存新数据用时:", msg, round(t.time() * 1000) - start_time)
    print("保存新数据用时:", msg, "耗时:{}".format(round(t.time() * 1000) - start_time))
    return datas
@@ -231,8 +233,8 @@
    @classmethod
    def is_same_time(cls, time1, time2):
        # TODO 测试
        # if 1 > 0:
        #     return True
        if global_util.TEST:
            return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
@@ -251,10 +253,11 @@
        __latest_datas = local_latest_datas.get(code)
        if __latest_datas is not None and len(__latest_datas) > 0:
            last_key = __latest_datas[-1]["key"]
        count = 0
        start_index = -1
        # 如果原来没有数据
        # TODO 设置add_data的序号
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == last_key:
@@ -283,7 +286,8 @@
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
                # 新数据条数比旧数据多才保存
                if _ldata["key"] == data["key"] and _ldata["re"] < data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
@@ -291,6 +295,7 @@
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
            local_latest_datas[code] = latest_data
        return _datas
    # 处理l2数据
@@ -330,12 +335,13 @@
        return datas
    @classmethod
    def get_time_as_second(time_str):
    def get_time_as_second(cls, time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
    # 是否是涨停价买
    def is_limit_up_price_buy(val):
    @classmethod
    def is_limit_up_price_buy(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
@@ -349,7 +355,8 @@
        return True
    # 是否涨停买撤
    def is_limit_up_price_buy_cancel(val):
    @classmethod
    def is_limit_up_price_buy_cancel(cls, val):
        if int(val["limitPrice"]) != 1:
            return False
@@ -371,14 +378,31 @@
# L2交易数据处理器
# 一些常见的概念:
# 买入信号位置(出现下单信号的第一条数据的位置):buy_single_index
# 买入执行位置(符合下单信号的最后一条数据):buy_exec_index
# 计算位置(当前计算的整个计算的位置):compute_index
#
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    random_key = {}
    @classmethod
    def debug(cls, code, content, *args):
        logger_l2_trade.debug(("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def cancel_debug(cls, code, content, *args):
        logger_l2_trade_cancel.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
@@ -393,7 +417,7 @@
                datas = L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"]
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # 拼接数据
@@ -415,7 +439,7 @@
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3)
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code, add_datas, capture_timestamp)
@@ -451,20 +475,30 @@
            start_index = 0
        # 获取之前是否有记录的撤买信号
        cancel_index, buy_num_for_cancel, computed_index = cls.__has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.__compute_order_cancel_begin_single(code, start_index, 3)
            buy_num_for_cancel = 0
            computed_index = buy_index
            cancel_index = cls.__compute_order_cancel_begin_single(code, max(start_index - 3, 0), 3)
            buy_num_for_cancel = buy_num
            computed_index = buy_single_index
            if cancel_index is not None:
                cls.debug(code, "找到撤单信号,数据处理起始点:{} 数据:{}", start_index, local_today_datas[code][start_index])
        if cancel_index is not None:
            # 获取阈值 有买撤信号,统计撤买纯买额
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_cancel(code, cancel_index, computed_index, buy_num_for_cancel, threshold_money,
            cls.__start_compute_cancel(code, cancel_index, max(computed_index, buy_exec_index + 1), buy_num_for_cancel,
                                       threshold_money,
                                       capture_time)
        else:
            # 无买撤信号,终止执行
            pass
            # 无买撤信号,是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入")
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
    # 开始计算撤的信号
    @classmethod
@@ -474,30 +508,27 @@
                                                                                           origin_num, threshold_money)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money,
                      total_datas[computed_index])
            # 发出撤买信号,需要撤买
            if cls.unreal_buy_dict.get(code) is not None:
                # 有虚拟下单
                cls.debug(code, "之前有虚拟下单,执行虚拟撤买")
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # 删除下单标记位置
                TradePointManager.delete_buy_point(code)
            else:
                # 无虚拟下单,需要执行撤单
                logger_l2_trade.info(
                    "执行撤销:{} - {}".format(code, json.dumps(total_datas[computed_index])))
                try:
                    trade_manager.start_cancel_buy(code)
                    # 取消买入标识
                    TradePointManager.delete_buy_point(code)
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
                cls.debug(code, "之前无虚拟下单,执行真实撤单")
                cls.__cancel_buy(code)
            if computed_index < len(local_today_datas[code]) - 1:
                # 数据尚未处理完,重新进入下单计算流程
                cls.__start_compute_buy(code, computed_index + 1, 0, threshold_money, capture_time)
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time)
                pass
        else:
            cls.debug(code, "未获取到撤单执行信号,计算开始位置:{}, 纯买额:{}", compute_start_index, buy_num_for_cancel)
            # 无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code, buy_num_for_cancel, len(total_datas) - 1,
                                                                cancel_index)
@@ -506,6 +537,7 @@
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.debug(code, "无撤单执行信号,有虚拟下单,执行真实下单")
                cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
@@ -515,66 +547,85 @@
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        logger_l2_trade.info(
            "执行买入:{} ".format(code))
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行买入成功")
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, threshold_money, capture_time):
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        index, num = cls.__get_order_begin_pos(code)
        # index, num, finish_index = cls.__get_order_begin_pos(code)
        buy_single_index, buy_exec_index, buy_compute_index, num = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if index is None:
        if buy_single_index is None:
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, len(total_datas) - compute_start_index, 3)
            index = _index
            has_single, _index = cls.__compute_order_begin_pos(code, max(compute_start_index - 3, 0), 3)
            buy_single_index = _index
            if has_single:
                num = 0
                new_get_pos = True
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
        if index is None:
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # 买入纯买额统计
        compute_index, buy_nums = cls.sum_buy_num_for_order(code, compute_start_index, num, threshold_money)
        compute_index, buy_nums = cls.__sum_buy_num_for_order(code, max(buy_single_index, compute_start_index), num,
                                                              threshold_money)
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 数据:{}", compute_index, threshold_money, total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, compute_index, buy_nums, index)
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums)
            # 虚拟下单
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            # 数据是否处理完毕
            if L2DataUtil.is_index_end(code, compute_index):
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,下单
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                cls.__process_order(code, compute_index + 1, capture_time)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, len(total_datas) - 1, buy_nums, index)
            cls.__save_order_begin_data(code, buy_single_index, -1, len(total_datas) - 1, buy_nums)
        pass
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        index, num, compute_index = TradePointManager.get_buy_compute_start_data(code)
        return index, num
        buy_single_index, buy_exec_index, compute_index, num = TradePointManager.get_buy_compute_start_data(code)
        return buy_single_index, buy_exec_index, compute_index, num
    @classmethod
    def __save_order_begin_data(self, code, compute_index, num, buy_index=None):
        TradePointManager.set_buy_compute_start_data(code, num, compute_index, buy_index)
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num)
    # 获取撤单起始位置
    @classmethod
@@ -587,18 +638,13 @@
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    def __compute_order_begin_pos(self, code, compute_data_count, continue_count):
    @classmethod
    def __compute_order_begin_pos(cls, code, start_index, continue_count):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        if len(datas) - start_index < continue_count:
            return False, None
        __time = None
        _limit_up_count_1s = 0
        _limit_up_count_1s_start_index = -1
@@ -634,7 +680,7 @@
                #             index_3 = j
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:  # and index_3 - index_2 == 1
                    logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                    return i
                    return True, i
            # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
@@ -656,9 +702,9 @@
            if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
                logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
                return _limit_up_count_1s_start_index
                return True, _limit_up_count_1s_start_index
        return None
        return False, None
    # 是否有撤销信号
    @classmethod
@@ -701,8 +747,9 @@
    def __unreal_order(self):
        pass
    @classmethod
    def __get_threshmoney(cls, code):
        l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        return l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # 获取预估挂买位
    @classmethod
@@ -760,6 +807,7 @@
        same_time_property = cls.__get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{}", start_index, len(total_datas) - 1, sure_pos)
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
@@ -774,22 +822,40 @@
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停撤买
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas)
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                    elif sure_data["val"]["time"] == _val["time"]:
                        # 同一秒,而且还在预估买入位之后按概率计算
                        property_buy_num_count -= int(_val["num"]) * int(data["re"])
                        cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买额:{}", i, buy_nums * limit_up_price)
                    else:
                        cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index)
                        if sure_data["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,而且还在预估买入位之后按概率计算
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # TODO 未找到买撤数据的买入点
                    pass
                    # 未找到买撤数据的买入点
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{}", property_buy_num, i, buy_nums + property_buy_num)
            if buy_nums + property_buy_num <= threshold_num:
                return i, buy_nums + property_buy_num, sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property), sure_type
    @classmethod
    def test(cls):
        code = "002336"
        cls.random_key[code] = random.randint(0, 100000)
        load_l2_data(code)
        try:
            # cls.__sum_buy_num_for_cancel_order(code, 112, 100000, 10000000)
            has_single, _index = cls.__compute_order_begin_pos(code, max(9, 0), 3)
            print(has_single, _index)
        except Exception as e:
            logging.exception(e)
def __get_time_second(time_str):
@@ -1037,25 +1103,4 @@
if __name__ == "__main__":
    code = "000868"
    local_today_datas.setdefault(code, [])
    path = "C:/Users/Administrator/Desktop/demo/000868/"
    for file_name in os.listdir(path):
        p = "{}{}".format(path, file_name)
        f = open(p)
        for line in f.readlines():  # 依次读取每行
            line = line.strip()
            data = json.loads(line)
            result = L2DataUtil.format_l2_data(data, code, 10.00)
            add_datas = L2DataUtil.get_add_data(code, result)
            print("增加的数量:", len(add_datas))
            if len(add_datas) > 0:
                # 拼接数据
                local_today_datas[code].extend(add_datas)
            if code in local_latest_datas:
                local_latest_datas[code] = result
            else:
                local_latest_datas.setdefault(code, result)
        f.close()
    for d in local_today_datas[code]:
        print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"])
    L2TradeDataProcessor.test()