Administrator
2023-02-08 3ec79004bd769828c8dc18ed35280f81cfb473ff
交易结果整理
1 文件已重命名
10个文件已修改
1个文件已添加
1200 ■■■■■ 已修改文件
constant.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 104 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_log.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 469 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_util.py 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/safe_count_manager.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/transaction_progress.py 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 167 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 262 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -1,5 +1,5 @@
# 是否为测试
TEST = False
TEST = True
# 是否允许交易
TRADE_ENABLE = False
# 水下捞累计连续水下时间最小值
@@ -19,6 +19,8 @@
H_CANCEL_FIRST_RATE = 0.79
H_CANCEL_SECOND_RATE = 0.69
H_CANCEL_THIRD_RATE = 0.59
H_CANCEL_MIN_MONEY = 10000000
H_CANCEL_MIN_COUNT=8
# h撤大单笔数
H_CANCEL_BUY_COUNT = 40
data_export_util.py
@@ -193,6 +193,6 @@
if __name__ == "__main__":
    codes = ["603660"]
    codes = ["002792"]
    for code in codes:
        export_l2_excel(code)
l2/cancel_buy_strategy.py
@@ -12,12 +12,12 @@
import big_money_num_manager
import constant
import gpcode_manager
import l2_data_log
import l2_data_util
from db import redis_manager
import tool
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager, trade_queue_manager, l2_trade_factor
from l2 import l2_log
from l2 import l2_log, l2_data_log
from l2.l2_data_util import L2DataUtil, local_today_num_operate_map, local_today_datas
from log import logger_buy_1_volumn
@@ -60,9 +60,15 @@
    # 计算净大单
    @classmethod
    def __compute_left_big_num(cls, code, start_index, end_index, total_data):
    def __compute_left_big_num(cls, code, buy_single_index, start_index, end_index, total_data, place_order_count):
        # 获取大单的最小手数
        left_big_num = 0
        # 点火大单数量
        fire_count = 5
        if place_order_count <= 4:
            fire_count = 6 - place_order_count
        else:
            fire_count = 2
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
@@ -71,14 +77,21 @@
                continue
            if L2DataUtil.is_limit_up_price_buy(val):
                left_big_num += val["num"] * data["re"]
                if i - buy_single_index < fire_count:
                    # 点火大单不算
                    left_big_num += 0
                else:
                    left_big_num += val["num"] * data["re"]
            elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 查询买入位置
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and start_index <= buy_index <= end_index:
                    left_big_num -= val["num"] * data["re"]
                    if buy_index - buy_single_index < fire_count:
                        left_big_num -= 0
                    else:
                        left_big_num -= val["num"] * data["re"]
                elif buy_index is None:
                    # 有部分撤销从而导致的无法溯源,这时就需要判断预估买入时间是否在a_start_index到a_end_index的时间区间
                    min_space, max_space = l2_data_util.compute_time_space_as_second(val["cancelTime"],
@@ -112,10 +125,14 @@
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        process_index = -1
        process_index = process_index_old
        # 下单次数
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if buy_single_index == start_index:
            # 第1次计算需要计算买入信号-执行位的净值
            left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_exec_index, total_data)
            left_big_num = cls.__compute_left_big_num(code, buy_single_index, buy_single_index, buy_exec_index,
                                                      total_data, place_order_count)
            buy_num += left_big_num
            # 设置买入信号-买入执行位的数据不需要处理
            start_index = end_index + 1
@@ -139,13 +156,16 @@
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                process_index = i
                if process_index_old >= i:
                    # 已经处理过的数据不需要处理
                    continue
                if not l2_data_util.is_big_money(val):
                    continue
                process_index = i
                if L2DataUtil.is_limit_up_price_buy_cancel(val):
                if L2DataUtil.is_limit_up_price_buy(val):
                    buy_num += data["re"] * int(val["num"])
                elif L2DataUtil.is_limit_up_price_buy_cancel(val):
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
@@ -161,56 +181,42 @@
                            buy_time = tool.trade_time_add_second(val["time"], 0 - min_space)
                            if int(total_data[buy_single_index]["val"]["time"].replace(":", "")) <= int(
                                    buy_time.replace(":", "")):
                                cancel_num += buy_data["re"] * int(buy_data["val"]["num"])
                                cancel_num += data["re"] * int(val["num"])
                    # 保存数据
                    if need_cancel:
                        cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
                        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
                        if place_order_count <= 1:
                            cancel_rate_threshold = constant.S_CANCEL_FIRST_RATE
                        elif place_order_count <= 2:
                            cancel_rate_threshold = constant.S_CANCEL_SECOND_RATE
                        else:
                            cancel_rate_threshold = constant.S_CANCEL_THIRD_RATE
                        if cancel_num / buy_num > cancel_rate_threshold:
                        if cancel_num / max(buy_num, 1) > cancel_rate_threshold:
                            return True, total_data[i]
        finally:
            l2_log.cancel_debug(threadId, code, "S级大单 范围:{}-{} 取消计算结果:{}/{}", start_index, end_index, cancel_num,
                                buy_num)
            l2_log.cancel_debug(threadId, code, "S级大单 范围:{}-{} 取消计算结果:{}/{},比例:{}", start_index, end_index, cancel_num,
                                buy_num, round(cancel_num / buy_num, 2))
            # 保存处理进度与数据
            cls.__save_compute_data(code, process_index, buy_num, cancel_num)
        return False, None
    # 下单成功
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
    def cancel_success(cls, code):
        cls.__clear_data(code)
# --------------------------------H撤-------------------------------
class HourCancelBigNumComputer:
    __redis_manager = redis_manager.RedisManager(0)
    __tradeBuyQueue = TradeBuyQueue()
    @classmethod
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()
    # 保存成交进度
    @classmethod
    def __save_trade_progress(cls, code, index):
        key = f"trade_progress_index-{code}"
        cls.__getRedis().setex(key, tool.get_expire(), index)
    # 保存成交进度
    @classmethod
    def __get_trade_progress(cls, code):
        key = f"trade_progress_index-{code}"
        val = cls.__getRedis().get(key)
        if val is None:
            return None
        return int(val)
    @classmethod
    def __save_watch_index_set(cls, code, datas):
@@ -320,30 +326,33 @@
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index, total_data, local_today_num_operate_map):
        cls.__clear_data(code)
        cls.set_trade_progress(code, buy_exec_index, total_data, local_today_num_operate_map)
        cls.set_trade_progress(code, buy_exec_index, total_data, local_today_num_operate_map, True)
    # 设置成交进度
    @classmethod
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map):
    def set_trade_progress(cls, code, index, total_data, local_today_num_operate_map, is_default=False):
        l2_log.cancel_debug(0, code, "成交进度:{}", index)
        last_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        # 成交进度
        cls.__save_trade_progress(code, index)
        cls.compute_watch_end_index(code, total_data, local_today_num_operate_map)
        if is_default:
            cls.__tradeBuyQueue.set_default_traded_index(code, index)
        if last_index is None or last_index != index:
            cls.compute_watch_end_index(code, total_data, local_today_num_operate_map)
    @classmethod
    def compute_watch_end_index(cls, code, total_data, local_today_num_operate_map):
        trade_progress_index = cls.__get_trade_progress(code)
        threshold_money = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code) * 2
        # 最小值1500万
        if threshold_money < 15000000:
            threshold_money = 15000000
        trade_progress_index, is_default = cls.__tradeBuyQueue.get_traded_index(code)
        threshold_money, msg = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
        if threshold_money < constant.H_CANCEL_MIN_MONEY:
            threshold_money = constant.H_CANCEL_MIN_MONEY
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        threshold_num = round(threshold_money / (limit_up_price * 100))
        if trade_progress_index is None:
            raise Exception("尚未获取到成交进度")
        total_num = 0
        watch_set = set()
        for i in range(trade_progress_index + 1, total_data[-1]["index"] + 1):
        total_count = 0
        for i in range(trade_progress_index, total_data[-1]["index"] + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
@@ -360,15 +369,20 @@
                            # 已经买撤
                            total_num -= buy_data["val"]["num"] * cancel_data["re"]
                            canceled = True
                            if data["re"] - cancel_data["re"] > 0:
                                watch_set.add((i, data["re"] - cancel_data["re"]))
                            count = data["re"] - cancel_data["re"]
                            if count > 0:
                                total_count += count
                                watch_set.add((i, count))
                            break
                if not canceled:
                    watch_set.add((i, data["re"]))
                    count = data["re"]
                    total_count += count
                    watch_set.add((i, count))
                # 判断是否达到阈值
                if total_num >= threshold_num:
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{}", json.dumps(watch_set))
                if total_num >= threshold_num and total_count >= constant.H_CANCEL_MIN_COUNT:
                    # 最小8笔
                    l2_log.cancel_debug(0, code, "获取到H撤监听数据:{}", json.dumps(list(watch_set)))
                    break
        # 保存计算范围
        cls.__save_watch_index_set(code, watch_set)
l2/l2_data_log.py
File was renamed from l2_data_log.py
@@ -4,10 +4,10 @@
import log
def l2_time(code, do_id, time_, description, new_line=False,force=False):
def l2_time(code, do_id, time_, description, new_line=False, force=False):
    timestamp = int(time.time() * 1000)
    # 只记录耗时较长的信息
    if time_ > 50 or force:
    if time_ > 1 or force:
        log.logger_l2_process_time.info("{}-{} {}: {}-{}{}", do_id, timestamp, description, code, time_,
                                        "\n" if new_line else "")
    return timestamp
l2/l2_data_manager_new.py
@@ -8,23 +8,26 @@
import global_util
import gpcode_manager
import industry_codes_sort
import l2_data_log
import l2_data_util
import l2_trade_test
import limit_up_time_manager
from db import redis_manager
import ths_industry_util
import tool
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util
from l2 import safe_count_manager, l2_data_manager, l2_data_util
from trade import trade_data_manager, trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager
from l2 import safe_count_manager, l2_data_manager, l2_data_log
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil, \
    L2LimitUpSellStatisticUtil
from l2.l2_data_manager import L2DataException, TradePointManager
from l2.l2_data_util import local_today_datas, L2DataUtil, load_l2_data, local_today_num_operate_map, local_latest_datas
import l2.l2_data_util
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_l2_error
# TODO l2数据管理
from trade.trade_data_manager import CodeActualPriceProcessor
import dask
class L2DataManager:
@@ -185,21 +188,22 @@
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                l2_data_util.load_l2_data(code)
                l2.l2_data_util.load_l2_data(code)
                # 纠正数据
                datas = l2_data_util.L2DataUtil.correct_data(code,local_latest_datas.get(code), datas)
                datas = l2.l2_data_util.L2DataUtil.correct_data(code, local_latest_datas.get(code), datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_util.L2DataUtil.get_add_data(code,local_latest_datas.get(code), datas, _start_index)
                add_datas = l2.l2_data_util.L2DataUtil.get_add_data(code, local_latest_datas.get(code), datas,
                                                                    _start_index)
                # -------------数据增量处理------------
                try:
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # 保存数据
                    __start_time = round(t.time() * 1000)
                    l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    l2.l2_data_util.save_l2_data(code, datas, add_datas, cls.random_key[code])
                    __start_time = l2_data_log.l2_time(code, cls.random_key[code],
                                                       round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
@@ -211,9 +215,10 @@
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        now_time_str = tool.get_now_time_str()
        if len(add_datas) > 0:
            print(id(local_today_datas))
            # 拼接数据
            local_today_datas[code].extend(add_datas)
            l2_data_util.load_num_operate_map(l2_data_util.local_today_num_operate_map, code, add_datas)
            l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, add_datas)
            # 第1条数据是否为09:30:00
            if add_datas[0]["val"]["time"] == "09:30:00":
@@ -232,7 +237,9 @@
        if len(add_datas) > 0:
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
            # 时间差不能太大才能处理
            if l2_data_util.L2DataUtil.is_same_time(now_time_str, latest_time) and not l2_trade_util.is_in_forbidden_trade_codes(code):
            if l2.l2_data_util.L2DataUtil.is_same_time(now_time_str,
                                                       latest_time) and not l2_trade_util.is_in_forbidden_trade_codes(
                code):
                # 判断是否已经挂单
                state = trade_manager.get_trade_state(code)
                start_index = len(total_datas) - len(add_datas)
@@ -259,6 +266,7 @@
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, cls.random_key.get(code), round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
@@ -269,6 +277,115 @@
    # 处理已挂单
    @classmethod
    def __process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        # 计算安全笔数
        @dask.delayed
        def compute_safe_count():
            _start_time = round(t.time() * 1000)
            # 处理安全笔数
            cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                          local_today_num_operate_map.get(code))
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                "已下单-获取买入信息耗时")
            return None, ""
        @dask.delayed
        # m值大单计算
        def compute_m_big_num():
            _start_time = round(t.time() * 1000)
            # 计算m值大单
            cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                              gpcode_manager.get_limit_up_price(code))
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                "已下单-m值大单计算")
            return None, ""
        # 买1撤计算
        @dask.delayed
        def buy_1_cancel():
            _start_time = round(t.time() * 1000)
            # 撤单计算,只看买1
            cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, start_index,
                                                                               end_index,
                                                                               buy_single_index, buy_exec_index)
            l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                "已下单-买1统计耗时")
            return cancel_data, cancel_msg
        # S撤
        @dask.delayed
        def s_cancel():
            _start_time = round(t.time() * 1000)
            # S撤单计算,看秒级大单撤单
            try:
                b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                      buy_exec_index, start_index,
                                                                                      end_index, total_data,
                                                                                      cls.random_key[code])
                if b_need_cancel:
                    return b_cancel_data, "S大单撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                    "已下单-s级大单估算")
            return None, ""
        # H撤
        @dask.delayed
        def h_cancel():
            _start_time = round(t.time() * 1000)
            try:
                b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                    end_index, total_data,
                                                                                    cls.random_key[code])
                if b_need_cancel and not cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
            return None, ""
        # 板上卖撤
        @dask.delayed
        def sell_cancel():
            _start_time = round(t.time() * 1000)
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(cls.random_key[code], code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
                return cancel_data, cancel_msg
            except Exception as e:
                logging.exception(e)
            finally:
                l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time, "已下单-板上卖耗时")
            return None, ""
        # 是否需要撤销
        @dask.delayed
        def is_need_cancel(*args):
            f_cancel_data, f_cancel_msg = None, ""
            try:
                for i in range(0, len(args)):
                    _cancel_data, _cancel_msg = args[i]
                    if _cancel_data:
                        if not f_cancel_data:
                            f_cancel_data, f_cancel_msg = _cancel_data, _cancel_msg
                        else:
                            if _cancel_data["index"] < f_cancel_data["index"]:
                                # 取较早的撤销数据
                                f_cancel_data, f_cancel_msg = _cancel_data, _cancel_msg
            except Exception as e:
                logging.exception(e)
            finally:
                pass
            return f_cancel_data, f_cancel_msg
        if start_index < 0:
            start_index = 0
@@ -276,69 +393,17 @@
            return
        total_data = local_today_datas.get(code)
        _start_time = round(t.time() * 1000)
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, start_index, end_index, total_data,
                                                      local_today_num_operate_map.get(code))
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(code)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-获取买入信息耗时")
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(cls.random_key[code], code, start_index,
                                                                           end_index,
                                                                           buy_single_index, buy_exec_index)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-买1统计耗时")
        # S撤单计算,看秒级大单撤单
        try:
            b_need_cancel, b_cancel_data = SecondCancelBigNumComputer.need_cancel(code, buy_single_index,
                                                                                  buy_exec_index, start_index,
                                                                                  end_index, total_data,
                                                                                  cls.random_key[code])
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "S大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-s级大单估算")
        # H撤
        try:
            b_need_cancel, b_cancel_data = HourCancelBigNumComputer.need_cancel(code, buy_exec_index, start_index,
                                                                                end_index, total_data,
                                                                                cls.random_key[code])
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "H撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-H撤大单计算")
        if not cancel_data:
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(cls.random_key[code], code, start_index,
                                                                             end_index,
                                                                             buy_exec_index)
            except Exception as e:
                logging.exception(e)
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-板上卖耗时")
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                          "已下单-m值大单计算")
        f1 = compute_safe_count()
        f2 = compute_m_big_num()
        f3 = buy_1_cancel()
        f4 = s_cancel()
        f5 = h_cancel()
        f6 = sell_cancel()
        dask_result = is_need_cancel(f1, f2, f3, f4, f5, f6)
        cancel_data, cancel_msg = dask_result.compute()
        if cancel_data:
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
@@ -383,24 +448,8 @@
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                trade_data_manager.placeordercountmanager.place_order(code)
                # 下单成功,需要删除最大买1
                cls.__thsBuy1VolumnManager.clear_max_buy1_volume(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                        code)
                    cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
                    SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
                                                                 local_today_datas.get(code),
                                                                 local_today_num_operate_map.get(code))
                except Exception as e:
                    logging.exception(e)
                    logger_l2_error.exception(e)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                ################下单成功处理################
                trade_result_manager.real_buy_success(code)
                cls.debug(code, "执行买入成功")
            except Exception as e:
                cls.debug(code, "执行买入异常:{}", str(e))
@@ -442,122 +491,123 @@
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        # 买1价格必须为涨停价才能买
        # buy1_price = cls.buy1PriceManager.get_price(code)
        # if buy1_price is None:
        #     return False, "买1价尚未获取到"
        # limit_up_price = gpcode_manager.get_limit_up_price(code)
        # if limit_up_price is None:
        #     return False, "尚未获取到涨停价"
        # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
        #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
        # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入
        total_datas = local_today_datas[code]
        __start_time = t.time()
        try:
            sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
            cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
            if sell1_time is not None and sell1_volumn > 0:
                # 获取执行位信息
            # 买1价格必须为涨停价才能买
            # buy1_price = cls.buy1PriceManager.get_price(code)
            # if buy1_price is None:
            #     return False, "买1价尚未获取到"
            # limit_up_price = gpcode_manager.get_limit_up_price(code)
            # if limit_up_price is None:
            #     return False, "尚未获取到涨停价"
            # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
            #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
            # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入
            total_datas = local_today_datas[code]
            try:
                sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
                cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
                if sell1_time is not None and sell1_volumn > 0:
                    # 获取执行位信息
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                    code)
                buy_nums = num
                for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                    _val = total_datas[i]["val"]
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(_val):
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
                        code)
                    buy_nums = num
                    for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                        _val = total_datas[i]["val"]
                        # 涨停买
                        buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums < sell1_volumn * 0.49:
                    return False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
        except Exception as e:
            logging.exception(e)
                        if L2DataUtil.is_limit_up_price_buy(_val):
                            # 涨停买
                            buy_nums += _val["num"] * total_datas[i]["re"]
                        elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                            buy_nums -= _val["num"] * total_datas[i]["re"]
                    if buy_nums < sell1_volumn * 0.49:
                        return False, "纯买量({})小于卖1量的49%{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
            except Exception as e:
                logging.exception(e)
        # 量比超过1.3的不能买
        volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
        if volumn_rate >= 1.3:
            return False, "最大量比超过1.3不能买"
            # 量比超过1.3的不能买
            volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
            if volumn_rate >= 1.3:
                return False, "最大量比超过1.3不能买"
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_util.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
            if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                    limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
                "14:30:00"):
                return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
            # 同一板块中老二后面的不能买
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, "没有获取到行业"
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            # 当老大老二当前没涨停
            return False, "同一板块中老三,老四,...不能买"
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
                # 当老大老二当前没涨停
                return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
            # 水下捞且板块中的票小于16不能买
            # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
            #         industry) <= 16:
            #     return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
            # 水下捞自由流通市值大于老大的不要买
            if codes_index.get(code) != 0:
                # 获取老大的市值
                for c in codes_index:
                    if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                        return False, "水下捞,不是老大,且自由流通市值大于老大"
            if cls.__codeActualPriceProcessor.is_under_water(code, total_datas[-1]["val"]["time"]):
                # 水下捞且板块中的票小于16不能买
                # if global_util.industry_hot_num.get(industry) is not None and global_util.industry_hot_num.get(
                #         industry) <= 16:
                #     return False, "水下捞,板块中的票小于2只,为{}".format(global_util.industry_hot_num.get(industry))
                # 水下捞自由流通市值大于老大的不要买
                if codes_index.get(code) != 0:
                    # 获取老大的市值
                    for c in codes_index:
                        if codes_index.get(c) == 0 and global_util.zyltgb_map.get(code) > global_util.zyltgb_map.get(c):
                            return False, "水下捞,不是老大,且自由流通市值大于老大"
        # 13:30后涨停,本板块中涨停票数<29不能买
        # if limit_up_time is not None:
        #     if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
        #         if global_util.industry_hot_num.get(industry) < 16:
        #             return False, "13:30后涨停,本板块中涨停票数<16不能买"
            # 13:30后涨停,本板块中涨停票数<29不能买
            # if limit_up_time is not None:
            #     if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
            #         if global_util.industry_hot_num.get(industry) < 16:
            #             return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # 如果老大已经买成功了, 老二就不需要买了
            first_codes = []
            for key in codes_index:
                if codes_index.get(key) == 0:
                    first_codes.append(key)
            # 暂时注释掉
            # for key in first_codes:
            #     state = trade_manager.get_trade_state(key)
            #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            #         # 老大已经买成功了
            #         return False, "老大{}已经买成功,老二无需购买".format(key)
            #
            # # 有9点半涨停的老大才能买老二,不然不能买
            # # 获取老大的涨停时间
            # for key in first_codes:
            #     # 找到了老大
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
            if codes_index.get(code) is not None and codes_index.get(code) == 1:
                # 如果老大已经买成功了, 老二就不需要买了
                first_codes = []
                for key in codes_index:
                    if codes_index.get(key) == 0:
                        first_codes.append(key)
                # 暂时注释掉
                # for key in first_codes:
                #     state = trade_manager.get_trade_state(key)
                #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                #         # 老大已经买成功了
                #         return False, "老大{}已经买成功,老二无需购买".format(key)
                #
                # # 有9点半涨停的老大才能买老二,不然不能买
                # # 获取老大的涨停时间
                # for key in first_codes:
                #     # 找到了老大
                #     time_ = limit_up_time_manager.get_limit_up_time(key)
                #     if time_ == "09:30:00":
                #         return True, "9:30涨停的老大,老二可以下单"
                # return False, "老大非9:30涨停,老二不能下单"
        # 过时  老二,本板块中涨停票数<29 不能买
        # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
        #         industry) is not None:
        #     if global_util.industry_hot_num.get(industry) < 29:
        #         return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
            # 过时  老二,本板块中涨停票数<29 不能买
            # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
            #         industry) is not None:
            #     if global_util.industry_hot_num.get(industry) < 29:
            #         return False, "老二,本板块中涨停票数<29不能买"
            # 可以下单
            return True, None
        finally:
            l2_data_log.l2_time(code, cls.random_key[code], round((t.time() - __start_time) * 1000), "是否可以下单计算")
    @classmethod
    def __cancel_buy(cls, code):
        try:
            cls.debug(code, "开始执行撤单")
            trade_manager.start_cancel_buy(code)
            # 取消买入标识
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            cls.debug(code, "执行撤单成功")
            return True
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
            return False
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2"):
@@ -575,10 +625,7 @@
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            l2_data_manager.TradePointManager.delete_buy_point(code)
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
            trade_result_manager.virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
        else:
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
@@ -586,21 +633,17 @@
                cls.cancel_debug(code, "撤单中断,原因:{}", reason)
                cls.debug(code, "撤单中断,原因:{}", reason)
                return False
            cls.__cancel_buy(code)
            # 撤单成功
            cls.__buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index,
                                                              total_datas[-1]["index"])
        cls.debug(code, "执行撤单成功,原因:{}", msg)
            cancel_result = cls.__cancel_buy(code)
            if cancel_result:
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
        cls.debug(code, "执行撤单结束,原因:{}", msg)
        return True
    # 虚拟下单
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        SecondCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        # 删除之前的板上卖信息
        L2LimitUpSellStatisticUtil.delete(code)
        trade_result_manager.virtual_buy_success(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
@@ -609,6 +652,10 @@
            return
        _start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # 处理安全笔数
        cls.__buyL2SafeCountManager.compute_left_rate(code, compute_start_index, compute_end_index, total_datas,
                                                      local_today_num_operate_map.get(code))
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = cls.__get_order_begin_pos(
            code)
@@ -710,7 +757,7 @@
                # 处理撤单步骤
                cls.__process_order(code, compute_index + 1, compute_end_index, capture_time, False)
                _start_time = l2_data_log.l2_time(code, cls.random_key[code], round(t.time() * 1000) - _start_time,
                                                  "处理撤单步骤耗时", force=True)
                                                  f"处理撤单步骤耗时,范围:{compute_index + 1}-{compute_end_index}", force=True)
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
@@ -823,7 +870,7 @@
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        # 目标手数
        threshold_num = threshold_money / (limit_up_price * 100)
        threshold_num = round(threshold_money / (limit_up_price * 100))
        buy1_factor = 1
        # 获取买1是否为涨停价
@@ -835,7 +882,7 @@
            print("买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price))
            buy1_factor = 1.3
        # 目标订单数量
        threshold_count = safe_count_manager.BuyL2SafeCountManager.get_safe_count(code)
        threshold_count = cls.__buyL2SafeCountManager.get_safe_count(code)
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
@@ -849,6 +896,13 @@
        # 最大买量
        max_buy_num = 0
        max_buy_num_set = set(max_num_set)
        # 需要的最小大单笔数
        big_num_count = 2
        if place_order_count > 1:
            # 第一次下单需要大单最少2笔,以后只需要1笔
            big_num_count = 1
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
@@ -878,7 +932,7 @@
                        logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code,
                                                 i,
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count, )
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if l2_data_util.is_big_money(_val):
                    sub_threshold_count -= int(total_datas[i]["re"])
@@ -910,20 +964,15 @@
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 需要的最小大单笔数
            big_num_count = 2
            if place_order_count > 1:
                # 第一次下单需要大单最少2笔,以后只需要1笔
                big_num_count = 1
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy and len(
                    max_buy_num_set) >= big_num_count:
                return i, buy_nums, buy_count, None, max_buy_num_set
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{} 目标大单数量:{}",
                      compute_start_index,
                      buy_nums,
                      threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
                      threshold_num, buy_count, get_threshold_count(), len(max_buy_num_set), big_num_count)
        return None, buy_nums, buy_count, None, max_buy_num_set
@@ -988,7 +1037,7 @@
        load_l2_data(code, True)
        _start = t.time()
        capture_timestamp = 1999999999
        cls.process(code, l2_data_util.local_today_datas[code][1552:1641], capture_timestamp)
        cls.process(code, local_today_datas[code][1552:1641], capture_timestamp)
        print("时间花费:", round((t.time() - _start) * 1000))
        pass
@@ -998,8 +1047,8 @@
        load_l2_data(code)
        limit_up_time_manager.load_limit_up_time()
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2_data_util.L2DataUtil.get_time_as_second(
        if limit_up_time is not None and l2.l2_data_util.L2DataUtil.get_time_as_second(
                limit_up_time) >= l2.l2_data_util.L2DataUtil.get_time_as_second(
            "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
l2/l2_data_util.py
@@ -11,7 +11,7 @@
import constant
import gpcode_manager
import l2_data_log
from l2 import l2_data_log
import log
from db import redis_manager
import tool
@@ -170,6 +170,23 @@
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas, data
# 是否为大单
def is_big_money(val):
    price = float(val["price"])
    money = price * int(val["num"])
    if price > 3.0:
        if money >= 30000:
            return True
        else:
            return False
    else:
        max_money = price * 10000
        if money >= max_money * 0.95:
            return True
        else:
            return False
class L2DataUtil:
@@ -363,7 +380,7 @@
class L2TradeQueueUtils(object):
    # 获取成交进度索引
    @classmethod
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,
    def find_traded_progress_index(cls, buy_1_price, total_datas, local_today_num_operate_map, queueList,last_index,
                                   latest_not_limit_up_time=None):
        if len(queueList) == 0:
            return None
@@ -380,7 +397,8 @@
                    # 在最近一次非涨停买1更新的时间之后才有效
                    if latest_not_limit_up_time is None or tool.trade_time_sub(data["val"]["time"],
                                                                               latest_not_limit_up_time) >= 0:
                        index_set.add(data["index"])
                        if data["index"]>=last_index:
                            index_set.add(data["index"])
        index_list = list(index_set)
        index_list.sort()
        num_list = []
@@ -396,7 +414,8 @@
            temp_str = index_list_str[0:find_index]
            if temp_str.endswith(","):
                temp_str = temp_str[:-1]
            if temp_str == "":
                return new_index_list[0]
            return new_index_list[len(temp_str.split(","))]
        raise Exception("尚未找到成交进度")
l2/safe_count_manager.py
@@ -59,7 +59,13 @@
        return vals
    def clear_data(self, code):
        pass
        key_regex = f"safe_count_l2-{code}-*"
        keys = self.__getRedis().keys(key_regex)
        for k in keys:
            self.__getRedis().delete(k)
        key = f"latest_place_order_info-{code}"
        self.__getRedis().delete(key)
    # 获取基础的安全笔数
    def __get_base_save_count(self, code):
@@ -68,6 +74,9 @@
    # 获取最后的安全笔数
    def get_safe_count(self, code):
        rate = self.__get_rate(code)
        print("--------------------------------")
        print("安全笔数比例:", rate)
        print("--------------------------------")
        count = self.__get_base_save_count(code)
        count = round(count * rate)
        if count < 8:
@@ -98,13 +107,17 @@
            data = total_datas[i]
            val = data["val"]
            # 如果没有取消位置就一直计算下去, 计算截至时间不能大于取消时间
            if cancel_time and int(cancel_time.split(":", "")) < int(val["time"].split(":", "")):
            if cancel_time and int(cancel_time.replace(":", "")) < int(val["time"].replace(":", "")):
                break_index = i
                break
        if break_index >= 0:
            end_index = break_index - 1
        # 获取开始计算的位置
        start_compute_index = min(start_index, last_buy_single_index)
        if start_compute_index <= process_index:
            start_compute_index = process_index + 1
        for i in range(start_index, end_index):
        for i in range(start_compute_index, end_index):
            data = total_datas[i]
            val = data["val"]
            if process_index >= i:
@@ -128,7 +141,10 @@
        vals = self.__get_all_compute_progress(code)
        rate = (1 - 0)
        for val in vals:
            rate *= (1 - round((val[2] - val[3]) / val[2], 4))
            temp_rate = (1 - round((val[2] - val[3]) / val[2], 4))
            if temp_rate > 1:
                temp_rate = 1
            rate *= temp_rate
        return rate
    # 下单成功
l2/transaction_progress.py
@@ -9,6 +9,7 @@
from db import redis_manager
import tool
import l2.l2_data_util
from log import logger_l2_trade_buy_queue
class TradeBuyQueue:
@@ -33,17 +34,18 @@
        val = json.loads(val)
        return val[0], [1]
    def __save_buy_progress_index(self, code, index):
    def __save_buy_progress_index(self, code, index, is_default):
        key = "trade_buy_progress_index-{}".format(code)
        self.__getRedis().setex(key, tool.get_expire(), index)
        self.__getRedis().setex(key, tool.get_expire(), json.dumps((index, is_default)))
        # 返回数据与更新时间
    def __get_buy_progress_index(self, code):
        key = "trade_buy_progress_index-{}".format(code)
        val = self.__getRedis().get(key)
        if val is None:
            return None
        return int(val)
            return None, True
        val = json.loads(val)
        return int(val[0]), bool(val[1])
    # 最近的非涨停买1的时间
    def __save_latest_not_limit_up_time(self, code, time_str):
@@ -67,7 +69,9 @@
        self.last_buy_queue_data[code] = queues
        min_num = round(constant.L2_MIN_MONEY / (limit_up_price * 100))
        num_list = []
        for num in queues:
        # 忽略第一条数据
        for i in range(1, len(queues)):
            num = queues[i]
            if num > min_num:
                num_list.append(num)
        # 保存列表
@@ -78,21 +82,35 @@
    def save_traded_index(self, code, buy1_price, buyQueueBig):
        total_datas = l2.l2_data_util.local_today_datas.get(code)
        today_num_operate_map = l2.l2_data_util.local_today_num_operate_map.get(code)
        index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas,
                                                                             today_num_operate_map, buyQueueBig,self.__get_latest_not_limit_up_time(code))
        if index is not None:
            # 保存成交进度
            self.__save_buy_progress_index(code, index)
            return index
        for i in range(0, len(buyQueueBig)):
            buyQueueBigTemp = buyQueueBig[i:]
            if i > 0 and len(buyQueueBigTemp) < 2:
                # 已经执行过一次,且数据量小于2条就终止计算
                break
            last_index, is_default = self.get_traded_index(code)
            index = l2.l2_data_util.L2TradeQueueUtils.find_traded_progress_index(buy1_price, total_datas,
                                                                                 today_num_operate_map, buyQueueBigTemp,
                                                                                 (
                                                                                     last_index if last_index is not is_default else 0),
                                                                                 self.__get_latest_not_limit_up_time(
                                                                                     code))
            if index is not None:
                logger_l2_trade_buy_queue.info(f"确定交易进度:code-{code} index-{index}")
                # 保存成交进度
                self.__save_buy_progress_index(code, index, False)
                return index
        return None
    # 获取成交进度索引
    def get_traded_index(self, code):
        index = self.__get_buy_progress_index(code)
        return index
        index, is_default = self.__get_buy_progress_index(code)
        return index, is_default
    def set_default_traded_index(self, code, index):
        self.__save_buy_progress_index(code, index, True)
if __name__ == '__main':
    pass
l2_trade_test.py
@@ -1,13 +1,23 @@
# 交易测试
# 清除交易数据
import decimal
import json
import logging
import random
import unittest
from copy import deepcopy
from unittest import mock
import big_money_num_manager
import gpcode_manager
import log
import tool
from db import redis_manager
from l2.safe_count_manager import BuyL2SafeCountManager
from l2.transaction_progress import TradeBuyQueue
from trade import trade_data_manager
from l2.l2_data_manager import TradePointManager
# from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer
from trade.trade_queue_manager import THSBuy1VolumnManager
import l2.l2_data_manager_new, l2.l2_data_manager, l2.l2_data_util, l2.cancel_buy_strategy
def clear_trade_data(code):
@@ -15,7 +25,7 @@
    keys = ["buy1_volumn_latest_info-{}", "m_big_money_begin-{}", "m_big_money_process_index-{}"]
    for k in keys:
        redis_l2.delete(k.format(code))
    TradePointManager.delete_buy_point(code)
    l2.l2_data_manager.TradePointManager.delete_buy_point(code)
    big_money_num_manager.reset(code)
    redis_trade = redis_manager.RedisManager(2).getRedis()
    redis_trade.delete("trade-state-{}".format(code))
@@ -27,54 +37,102 @@
            continue
        if k.find("zyltgb") is not None:
            continue
        redis_info.delete(k)
    BuyL2SafeCountManager().clear_data(code)
#
# class VirtualTrade(unittest.TestCase):
#     code = "000701"
#     clear_trade_data(code)
#     l2_data_manager.load_l2_data(code)
#     total_datas = l2_data_manager.local_today_datas[code]
#     if total_datas[0]["index"] > 0:
#         # 拼接数据
#         for i in range(0, total_datas[0]["index"]):
#             data = total_datas[0].copy()
#             data["index"] = i
#             total_datas.insert(i, data)
#
#     pos_list = log.get_l2_process_position(code)
#     if pos_list[0][0] > 0:
#         pos_list.insert(0, (0, pos_list[0][0] - 1))
#     del pos_list[-1]
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         # 剩下的数据根据秒来分
#         start_index = -1
#         for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
#             if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
#                 if start_index < 0:
#                     start_index = i
#                 else:
#                     pos_list.append((start_index, i - 1))
#                     start_index = i
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
#     l2_data_manager_new.local_today_datas = {code: []}
#     l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
#     for indexs in pos_list:
#         L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
#         # 设置封单额,获取买1量
#         for i in range(0, 100):
#             time_ = total_datas[indexs[0]]["val"]["time"]
#             time_s = tool.get_time_as_second(time_) - i - 1
#             volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
#             if volumn is not None:
#                 l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
#                                                                            tool.time_seconds_format(time_s))
#                 break
#
#         print("----------------处理位置", indexs)
#         L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
class VirtualTrade(unittest.TestCase):
    def __process_buy_queue(code, buy_queue, time_):
        if time_ == "09:32:37":
            print("进入调试")
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        buy_one_price = limit_up_price
        if limit_up_price is not None:
            buy_queue_result_list = TradeBuyQueue().save(code, limit_up_price, limit_up_price, time_,
                                                         buy_queue)
            if buy_queue_result_list:
                # 有数据
                try:
                    buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                        decimal.Decimal("0.00"))
                    buy_progress_index = TradeBuyQueue().save_traded_index(code, buy_one_price_, buy_queue_result_list)
                    if buy_progress_index is not None:
                        l2.cancel_buy_strategy.HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                                           l2.l2_data_util.local_today_datas.get(
                                                                                               code),
                                                                                           l2.l2_data_util.local_today_num_operate_map.get(
                                                                                               code))
                    log.logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                       buy_progress_index,
                                                       json.dumps(buy_queue_result_list))
                except Exception as e:
                    pass
    code = "002792"
    clear_trade_data(code)
    l2.l2_data_util.load_l2_data(code)
    total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
    if total_datas[0]["index"] > 0:
        # 拼接数据
        for i in range(0, total_datas[0]["index"]):
            data = total_datas[0].copy()
            data["index"] = i
            total_datas.insert(i, data)
    pos_list = log.get_l2_process_position(code)
    if pos_list[0][0] > 0:
        pos_list.insert(0, (0, pos_list[0][0] - 1))
    del pos_list[-1]
    if pos_list[-1][1] < total_datas[-1]["index"]:
        # 剩下的数据根据秒来分
        start_index = -1
        for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
            if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
                if start_index < 0:
                    start_index = i
                else:
                    pos_list.append((start_index, i - 1))
                    start_index = i
    if pos_list[-1][1] < total_datas[-1]["index"]:
        pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
    l2.l2_data_util.local_today_datas[code].clear()
    print("id:", id(l2.l2_data_util.local_today_datas))
    # l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=12)
    # pos_list.insert(41,(225,306))
    # pos_list.insert(63, (345, 423))
    # pos_list.insert(66, (440, 447))
    # pos_list.insert(75, (472, 488))
    # pos_list.insert(84, (516, 532))
    # 获取交易进度
    trade_progress_list, buy_queues = log.get_trade_progress(code)
    for indexs in pos_list:
        l2.l2_data_manager_new.L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
        # 设置封单额,获取买1量
        for i in range(0, 100):
            time_ = total_datas[indexs[0]]["val"]["time"]
            time_s = tool.get_time_as_second(time_) - i - 1
            volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
            if volumn is not None:
                l2.cancel_buy_strategy.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
                                                                              tool.time_seconds_format(time_s))
                break
        # 设置委买队列
        for i in range(0, len(buy_queues)):
            if tool.trade_time_sub(buy_queues[i][1], total_datas[indexs[0]]["val"]["time"]) > 0:
                print("委买队列", buy_queues[i])
                try:
                    __process_buy_queue(code, buy_queues[i - 1][0], buy_queues[i - 1][1])
                except:
                    pass
                break
        print("----------------处理位置", indexs)
        if indexs[0] >= 224:
            print("进入调试")
        l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
# class TestTrade(unittest.TestCase):
@@ -103,11 +161,10 @@
#     print(buy_single_index, buy_exec_index, compute_index, num, count)
class TestData(unittest.TestCase):
    code = "002103"
    # l2_data_manager.load_l2_data(code)
    # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796])
# class TestData(unittest.TestCase):
#     code = "002103"
#     # l2_data_manager.load_l2_data(code)
#     # TradeBuyQueue().save_traded_index(code, "6.94", [1511, 888, 796])
if __name__ == "__main__":
log.py
@@ -291,6 +291,31 @@
    return pos_list
# 获取交易进度
def get_trade_progress(code, date=None):
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    index_list = []
    buy_queues = []
    with open("D:/logs/gp/l2/l2_trade_buy_queue.{}.log".format(date), mode='r', encoding="utf-8") as f:
        while True:
            line = f.readline()
            if not line:
                break
            time_ = __get_log_time(line).strip()
            if int(time_.replace(":", "")) > int("150000"):
                continue
            if line.find(f"{code}-[") >= 0:
                buy_queues.append((eval(line.split(f"{code}-")[1]), time_))
            if line.find("获取成交位置成功: code-{}".format(code)) < 0:
                continue
            index = int(line.split("index-")[1].split(" ")[0])
            index_list.append((index, time_))
    return index_list, buy_queues
def export_logs(code):
    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
@@ -310,7 +335,7 @@
if __name__ == '__main__':
    # logger_l2_process_time.info("test123")
    codes = ["002842"]
    codes = ["002963"]
    for code in codes:
        export_logs(code)
server.py
@@ -19,10 +19,10 @@
import gpcode_manager
import authority
import juejin
import l2_data_log
from l2 import l2_data_manager_new, l2_data_manager
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
import l2.l2_data_util
import ths_industry_util
import ths_util
@@ -60,6 +60,7 @@
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -80,90 +81,101 @@
        # print("- " * 30)
        sk: socket.socket = self.request
        while True:
            data = sk.recv(1024 * 1024 * 20)
            data = sk.recv(1024 * 100)
            if len(data) == 0:
                # print("客户端断开连接")
                break
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # print("结果:",_str)
                type = data_process.parseType(_str)
                type = -1
                try:
                    type = data_process.parseType(_str)
                except:
                    print(_str)
                return_str = "OK"
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        do_id = random.randint(0, 100000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                            _str)
                        # 间隔1s保存一条l2的最后一条数据
                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                            code] >= 1000 and len(datas) > 0:
                            self.l2_save_time_dict[code] = origin_start_time
                            logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                        if channel == 0:
                            now_time = round(time.time() * 1000)
                            if self.last_time.get(channel) is not None:
                                #print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time)
                                pass
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # print("截图时间:", process_time)
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
                            self.last_time[channel] = now_time
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "l2获取代码位置耗时")
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                            try:
                                # 校验客户端代码
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                __start_time = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                     do_id)
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2数据有效处理外部耗时",
                                                                       False)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - __start_time > 20:
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                            "异步保存原始数据条数耗时",
                                                            False)
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
                            # print("截图时间:", process_time)
                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                               "截图时间:{} 数据解析时间".format(process_time))
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                    key = "{}-{}-{}".format(client, channel, code)
                                    if key not in self.l2_data_error_dict or round(
                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                        # self.l2CodeOperate.repaire_l2_data(code)
                                        # todo 太敏感移除代码
                                        logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                        # 单价不一致时需要移除代码重新添加
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                        self.l2_data_error_dict[key] = round(time.time() * 1000)
                            cid, pid = gpcode_manager.get_listen_code_pos(code)
                            except Exception as e:
                                print("异常", str(e), code)
                                logging.exception(e)
                                logger_l2_error.error("出错:{}".format(str(e)))
                                logger_l2_error.error("内容:{}".format(_str))
                            finally:
                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                    __start_time = round(time.time() * 1000)
                                    if gpcode_manager.is_listen(code):
                                        __start_time = l2_data_log.l2_time(code, do_id,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2外部数据预处理耗时")
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                         do_id)
                                        __start_time = l2_data_log.l2_time(code, do_id,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        if round(time.time() * 1000) - __start_time > 20:
                                            l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                                "异步保存原始数据条数耗时",
                                                                False)
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
                                    if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                        key = "{}-{}-{}".format(client, channel, code)
                                        if key not in self.l2_data_error_dict or round(
                                                time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                            # self.l2CodeOperate.repaire_l2_data(code)
                                            # todo 太敏感移除代码
                                            logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                            # 单价不一致时需要移除代码重新添加
                                            l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                            self.l2_data_error_dict[key] = round(time.time() * 1000)
                                except Exception as e:
                                    print("异常", str(e), code)
                                    logging.exception(e)
                                    logger_l2_error.error("出错:{}".format(str(e)))
                                    logger_l2_error.error("内容:{}".format(_str))
                                finally:
                                    __end_time = round(time.time() * 1000)
                                    # 只记录大于40ms的数据
                                    if __end_time - origin_start_time > 100:
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        logger_l2_error.exception(e)
                elif type == 1:
@@ -221,40 +233,45 @@
                elif type == 5:
                    logger_trade_delegate.debug("接收到委托信息")
                    # 交易委托信息
                    dataList = data_process.parseList(_str)
                    if self.last_trade_delegate_data != _str:
                        self.last_trade_delegate_data = _str
                        # 保存委托信息
                        logger_trade_delegate.info(dataList)
                    __start_time = round(time.time() * 1000)
                    try:
                        # 设置申报时间
                        for item in dataList:
                            apply_time = item["apply_time"]
                            if apply_time and len(apply_time) >= 8:
                                code = item["code"]
                                trade_state = trade_manager.get_trade_state(code)
                                # 设置下单状态的代码为已委托
                                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                    origin_apply_time = apply_time
                                    apply_time = apply_time[0:6]
                                    apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6])
                                    ms = origin_apply_time[6:9]
                                    if int(ms) > 500:
                                        # 时间+1s
                                        apply_time = tool.trade_time_add_second(apply_time, 1)
                        # 交易委托信息
                        dataList = data_process.parseList(_str)
                        if self.last_trade_delegate_data != _str:
                            self.last_trade_delegate_data = _str
                            # 保存委托信息
                            logger_trade_delegate.info(dataList)
                        try:
                            # 设置申报时间
                            for item in dataList:
                                apply_time = item["apply_time"]
                                if apply_time and len(apply_time) >= 8:
                                    code = item["code"]
                                    trade_state = trade_manager.get_trade_state(code)
                                    # 设置下单状态的代码为已委托
                                    if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                        origin_apply_time = apply_time
                                        apply_time = apply_time[0:6]
                                        apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4],
                                                                       apply_time[4:6])
                                        ms = origin_apply_time[6:9]
                                        if int(ms) > 500:
                                            # 时间+1s
                                            apply_time = tool.trade_time_add_second(apply_time, 1)
                                    print(apply_time)
                    except Exception as e:
                        logging.exception(e)
                                        print(apply_time)
                        except Exception as e:
                            logging.exception(e)
                    try:
                        trade_manager.process_trade_delegate_data(dataList)
                    except Exception as e:
                        logging.exception(e)
                    trade_manager.save_trade_delegate_data(dataList)
                    # 刷新交易界面
                    trade_gui.THSGuiTrade().refresh_data()
                        try:
                            trade_manager.process_trade_delegate_data(dataList)
                        except Exception as e:
                            logging.exception(e)
                        trade_manager.save_trade_delegate_data(dataList)
                        # 刷新交易界面
                        trade_gui.THSGuiTrade().refresh_data()
                    finally:
                        pass
                elif type == 4:
                    # 行业代码信息
@@ -278,28 +295,33 @@
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code),
                                                                    buy_one_price, buy_time,
                                                                    buy_queue)
                    if buy_queue_result_list:
                        # 有数据
                        try:
                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                decimal.Decimal("0.00"))
                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
                                                                                      buy_queue_result_list)
                            if buy_progress_index is not None:
                                HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                            l2_data_manager.local_today_datas.get(code),
                                                                            l2_data_manager.local_today_num_operate_map.get(
                                                                                code))
                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.dumps(buy_queue_result_list))
                        except Exception as e:
                            print("买入队列", code, buy_queue_result_list)
                            logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                              json.dumps(buy_queue_result_list))
                    if buy_one_price is None:
                        print('买1价没有,', code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price is not None:
                        buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time,
                                                                        buy_queue)
                        if buy_queue_result_list:
                            # 有数据
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                    decimal.Decimal("0.00"))
                                buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
                                                                                          buy_queue_result_list)
                                if buy_progress_index is not None:
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                                l2.l2_data_util.local_today_datas.get(
                                                                                    code),
                                                                                l2.l2_data_util.local_today_num_operate_map.get(
                                                                                    code))
                                logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                               buy_progress_index,
                                                               json.dumps(buy_queue_result_list))
                            except Exception as e:
                                logging.exception(e)
                                print("买入队列", code, buy_queue_result_list)
                                logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                                  json.dumps(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
@@ -318,7 +340,7 @@
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(0, code, int(buy_one_volumn), buy_time)
                                L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
trade/trade_result_manager.py
New file
@@ -0,0 +1,62 @@
# 虚拟买成功
import logging
from l2 import l2_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, L2LimitUpSellStatisticUtil
from l2.l2_data_util import local_today_datas, local_today_num_operate_map
from l2.safe_count_manager import BuyL2SafeCountManager
from log import logger_l2_error
from trade import trade_data_manager
from trade.trade_queue_manager import THSBuy1VolumnManager
__thsBuy1VolumnManager = THSBuy1VolumnManager()
__buyL2SafeCountManager = BuyL2SafeCountManager()
def virtual_buy_success(code):
    # 增加下单计算
    trade_data_manager.placeordercountmanager.place_order(code)
    # 删除之前的板上卖信息
    L2LimitUpSellStatisticUtil.delete(code)
# 虚拟撤成功
def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    l2_data_manager.TradePointManager.delete_buy_point(code)
    l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
    l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
    l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
    # 安全笔数计算
    __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"])
    SecondCancelBigNumComputer.cancel_success(code)
# 真实买成功
def real_buy_success(code):
    # 下单成功,需要删除最大买1
    __thsBuy1VolumnManager.clear_max_buy1_volume(code)
    # 获取买入位置信息
    try:
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set = l2_data_manager.TradePointManager.get_buy_compute_start_data(
            code)
        __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, None)
        HourCancelBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index,
                                                     local_today_datas.get(code),
                                                     local_today_num_operate_map.get(code))
    except Exception as e:
        logging.exception(e)
        logger_l2_error.exception(e)
    l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
# 真实撤成功
def real_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    # 安全笔数计算
    __buyL2SafeCountManager.save_place_order_info(code, buy_single_index, buy_exec_index, total_datas[-1]["index"])
    # 取消买入标识
    l2_data_manager.TradePointManager.delete_buy_point(code)
    l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
    l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
    l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
    SecondCancelBigNumComputer.cancel_success(code)