Administrator
2023-10-26 97cc7f2d7428ea890c0a0ada76e5bffafd2463e4
L2总卖实现
17个文件已修改
838 ■■■■ 已修改文件
constant.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
inited_data.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/cancel_buy_strategy.py 180 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager.py 107 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_data_manager_new.py 386 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_log.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_sell_manager.py 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2/l2_transaction_data_manager.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log_module/log_export.py 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
output/code_info_output.py 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/l2_trade_test.py 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_order_processor.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/huaxin/huaxin_trade_server.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_manager.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade/trade_result_manager.py 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utils/tool.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
constant.py
@@ -123,6 +123,14 @@
# Small amount
L_CANCEL_MIN_MONEY = 50
# F-cancel ("F撤")
# Maximum number of watched order indexes collected per code by the F-cancel computer
F_CANCEL_WATCH_COUNT = 5
# Cancel is triggered when (cancelled watched orders / watched orders) reaches this ratio
F_CANCEL_CACEL_RATE = 0.69
# Maximum number of codes (slots) for Huaxin L2
HUAXIN_L2_MAX_CODES_COUNT = 50
inited_data.py
@@ -11,6 +11,7 @@
from code_attribute import big_money_num_manager, global_data_loader, gpcode_manager, gpcode_first_screen_manager
from code_attribute.code_nature_analyse import LatestMaxVolumeManager
from db.redis_manager_delegate import RedisUtils
from l2.l2_sell_manager import L2MarketSellManager
from ths import client_manager
import constant
from trade.deal_big_money_manager import DealOrderNoManager
@@ -100,6 +101,8 @@
        DealOrderNoManager().clear()
        # 最近是否有最大量
        LatestMaxVolumeManager().clear()
        # L2卖行情清除
        L2MarketSellManager().clear()
# 每日初始化
l2/cancel_buy_strategy.py
@@ -14,6 +14,7 @@
import l2_data_util
from db import redis_manager_delegate as redis_manager
from db.redis_manager_delegate import RedisUtils
from l2.l2_data_manager import OrderBeginPosInfo
from utils import tool
from l2.transaction_progress import TradeBuyQueue
from trade import trade_queue_manager, l2_trade_factor, trade_record_log_util
@@ -1304,6 +1305,185 @@
        self.clear(code)
# F撤
class FastCancelBigNumComputer:
    """F-cancel ("F撤") computer used for orders placed in fast mode.

    The class is a process-wide singleton: the first instantiation loads the
    per-code caches from Redis (keys ``f_cancel_real_order_index-<code>`` and
    ``f_cancel_watch_index-<code>``); later instantiations return the same
    object.

    Per code it keeps:
      * the real order index (set through ``set_real_order_index``),
      * a set of "watched" L2 data indexes (limit-up buys located after the
        computed start position),
      * the last trade progress index (set through ``set_trade_progress``).
    """
    __db = 0
    __redis_manager = redis_manager.RedisManager(0)
    # code -> real order index
    __cancel_real_order_index_cache = {}
    # code -> set of watched L2 data indexes
    __watch_indexes_cache = {}
    # code -> last trade progress index
    __last_trade_progress_dict = {}
    __instance = None

    def __new__(cls, *args, **kwargs):
        # Singleton: create the instance and warm the caches only once.
        if not cls.__instance:
            cls.__instance = super(FastCancelBigNumComputer, cls).__new__(cls, *args, **kwargs)
            cls.__load_datas()
        return cls.__instance

    @classmethod
    def __load_datas(cls):
        """Fill the in-memory caches from the values persisted in Redis."""
        redis_ = cls.__get_redis()
        try:
            for k in RedisUtils.keys(redis_, "f_cancel_real_order_index-*"):
                code = k.split("-")[-1]
                val = RedisUtils.get(redis_, k)
                CodeDataCacheUtil.set_cache(cls.__cancel_real_order_index_cache, code, int(val))
            for k in RedisUtils.keys(redis_, "f_cancel_watch_index-*"):
                code = k.split("-")[-1]
                val = RedisUtils.get(redis_, k)
                # Stored as a JSON list; kept in memory as a set
                CodeDataCacheUtil.set_cache(cls.__watch_indexes_cache, code, set(json.loads(val)))
        finally:
            RedisUtils.realse(redis_)

    @classmethod
    def __get_redis(cls):
        """Return a Redis connection obtained from the class-level manager."""
        return cls.__redis_manager.getRedis()

    def __set_real_order_index(self, code, index):
        """Cache the real order index for ``code`` and persist it asynchronously."""
        CodeDataCacheUtil.set_cache(self.__cancel_real_order_index_cache, code, index)
        RedisUtils.setex_async(self.__db, f"f_cancel_real_order_index-{code}", tool.get_expire(), f"{index}")

    def __del_real_order_index(self, code):
        """Drop the real order index of ``code`` from the cache and from Redis."""
        CodeDataCacheUtil.clear_cache(self.__cancel_real_order_index_cache, code)
        RedisUtils.delete_async(self.__db, f"f_cancel_real_order_index-{code}")

    def __get_real_order_index(self, code):
        """Read the real order index of ``code`` from Redis; ``None`` when absent."""
        # The synchronous getter takes a Redis connection (as in __load_datas);
        # only the *_async helpers take the db number.
        val = RedisUtils.get(self.__get_redis(), f"f_cancel_real_order_index-{code}")
        if val:
            return int(val)
        return None

    def __get_real_order_index_cache(self, code):
        """Read the real order index of ``code`` from the cache; ``None`` when absent."""
        cache_result = CodeDataCacheUtil.get_cache(self.__cancel_real_order_index_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None

    def __set_watch_indexes(self, code, indexes):
        """Cache the watched indexes of ``code`` and persist them as a JSON list."""
        CodeDataCacheUtil.set_cache(self.__watch_indexes_cache, code, indexes)
        RedisUtils.setex_async(self.__db, f"f_cancel_watch_index-{code}", tool.get_expire(),
                               json.dumps(list(indexes)))

    def __get_watch_indexes(self, code):
        """Return the cached watched indexes of ``code``, or a new empty set."""
        watch_indexes = self.__watch_indexes_cache.get(code)
        if watch_indexes:
            return watch_indexes
        return set()

    def __compute_watch_indexes(self, code, begin_pos_info: OrderBeginPosInfo, total_datas):
        """Collect up to ``constant.F_CANCEL_WATCH_COUNT`` watched indexes for ``code``.

        Does nothing when enough indexes are already cached. Otherwise it first
        locates the start position, then adds the indexes of the following
        limit-up buys whose ``num * price`` is at least 5000, and stores the
        result.
        """
        MAX_COUNT = constant.F_CANCEL_WATCH_COUNT
        watch_indexes = self.__get_watch_indexes(code)
        if watch_indexes and len(watch_indexes) >= MAX_COUNT:
            return
        last_index = total_datas[-1]["index"]
        # Locate the start position: accumulate limit-up buys from the buy signal
        # until the total reaches sell_info[1]; fall back to the buy exec index.
        total_money = 0
        c_start_index = begin_pos_info.buy_exec_index
        for i in range(begin_pos_info.buy_single_index, last_index + 1):
            val = total_datas[i]["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            money = val["num"] * float(val["price"])
            if money < 5000:
                continue
            # presumably num is in lots of 100 shares, hence the factor - TODO confirm
            total_money += int(money * 100)
            if total_money >= begin_pos_info.sell_info[1]:
                c_start_index = i
                break
        # Watch the big limit-up buys that come after the start position
        for i in range(c_start_index + 1, last_index + 1):
            val = total_datas[i]["val"]
            if not L2DataUtil.is_limit_up_price_buy(val):
                continue
            if val["num"] * float(val["price"]) < 5000:
                continue
            watch_indexes.add(i)
            if len(watch_indexes) >= MAX_COUNT:
                break
        # Save the data
        self.__set_watch_indexes(code, watch_indexes)

    def set_trade_progress(self, code, index):
        """Remember the latest trade progress index of ``code``."""
        if self.__last_trade_progress_dict.get(code) == index:
            return
        self.__last_trade_progress_dict[code] = index

    def need_cancel(self, code, start_index, end_index, begin_pos_info: OrderBeginPosInfo):
        """Decide whether the fast-mode order of ``code`` has to be cancelled.

        :param code: security code
        :param start_index: not used by this method
        :param end_index: only cancels located at or before this index are counted
        :param begin_pos_info: order begin position; only ``MODE_FAST`` is handled
        :return: ``(True, data)`` with the L2 record of the latest counted cancel
                 when the share of cancelled watched orders reaches
                 ``constant.F_CANCEL_CACEL_RATE``; otherwise ``(False, None)``
        """
        if begin_pos_info.mode != OrderBeginPosInfo.MODE_FAST:
            return False, None
        if code in self.__cancel_real_order_index_cache:
            # The real order position is known, no guarding needed
            return False, None
        # Compute the limit-up buys to watch (at most constant.F_CANCEL_WATCH_COUNT)
        total_datas = local_today_datas.get(code)
        self.__compute_watch_indexes(code, begin_pos_info, total_datas)
        watch_indexes = self.__get_watch_indexes(code)
        if not watch_indexes:
            return False, None
        # Collect the cancel position of every watched order that has been cancelled
        cancel_indexes = []
        canceled_buyno_map = local_today_canceled_buyno_map.get(code)
        for i in watch_indexes:
            cancel_data = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_canceled_data_v2(
                code, i, total_datas, canceled_buyno_map)
            if cancel_data and cancel_data["index"] <= end_index:
                cancel_indexes.append(cancel_data["index"])
        if len(cancel_indexes) / len(watch_indexes) >= constant.F_CANCEL_CACEL_RATE:
            return True, total_datas[max(cancel_indexes)]
        return False, None

    def clear(self, code=None):
        """Remove the real order index of ``code``, or of every code when omitted."""
        # NOTE(review): the watched indexes are not cleared here, so they survive
        # place_order_success/cancel_success - confirm this is intended.
        if code:
            self.__del_real_order_index(code)
        else:
            keys = RedisUtils.keys(self.__get_redis(), "f_cancel_real_order_index-*")
            if keys:
                for k in keys:
                    self.__del_real_order_index(k.split("-")[-1])

    # Set the real order position; returns whether the order needs to be cancelled
    def set_real_order_index(self, code, index):
        """Store the real order index and tell whether a cancel is needed.

        :return: ``False`` when the counts returned by
                 ``get_limit_up_buy_no_canceled_count_v2`` for the big limit-up
                 buys strictly between the last trade progress and ``index``
                 add up to at least 2; ``True`` otherwise, including when no
                 trade progress is known for ``code``
        """
        self.__set_real_order_index(code, index)
        l2_log.f_cancel_debug(code, f"下单位置设置:{index}")
        trade_index = self.__last_trade_progress_dict.get(code)
        # Index 0 is a valid progress position, so compare against None explicitly
        if trade_index is not None:
            total_datas = local_today_datas.get(code)
            canceled_buyno_map = local_today_canceled_buyno_map.get(code)
            total_count = 0
            for i in range(trade_index + 1, index):
                val = total_datas[i]["val"]
                if not L2DataUtil.is_limit_up_price_buy(val):
                    continue
                if val["num"] * float(val["price"]) < 5000:
                    continue
                left_count = l2_data_source_util.L2DataSourceUtils.get_limit_up_buy_no_canceled_count_v2(
                    code, i, total_datas, canceled_buyno_map)
                if left_count > 0:
                    total_count += left_count
                    if total_count >= 2:
                        return False
        return True

    def place_order_success(self, code):
        """Hook called after an order was placed: reset the state of ``code``."""
        self.clear(code)

    def cancel_success(self, code):
        """Hook called after an order was cancelled: reset the state of ``code``."""
        self.clear(code)
# ---------------------------------G撤-------------------------------
class GCancelBigNumComputer:
    __SecondCancelBigNumComputer = SecondCancelBigNumComputer()
l2/l2_data_manager.py
@@ -12,6 +12,45 @@
_redisManager = redis_manager.RedisManager(1)
# 下单临时信息
class OrderBeginPosInfo(object):
    """Temporary information about the begin position of an order.

    Instances are plain attribute holders that can be serialised to JSON with
    ``to_json_str`` and rebuilt with ``to_object``.
    """
    # Values accepted by ``mode``: 0 - normal trade, 1 - fast trade
    MODE_NORMAL = 0
    MODE_FAST = 1

    def __init__(self, buy_single_index=None, buy_exec_index=-1, buy_compute_index=None, num=0, count=0,
                 max_num_set=None, buy_volume_rate=None, sell_info=None, threshold_money=None, mode=0):
        """Store the given values as attributes.

        ``max_num_set`` may be any iterable (or ``None``); it is kept as a list
        so that the instance stays JSON serialisable.
        """
        # NOTE: the assignment order below is the key order of the JSON produced
        # by to_json_str, keep it stable.
        self.buy_single_index = buy_single_index
        self.buy_exec_index = buy_exec_index
        self.buy_compute_index = buy_compute_index
        self.num = num
        self.count = count
        self.threshold_money = threshold_money
        self.max_num_set = list(max_num_set) if max_num_set else []
        self.buy_volume_rate = buy_volume_rate
        self.sell_info = sell_info
        self.mode = mode

    def get_max_num_set(self):
        """Return ``max_num_set`` as a set, or ``None`` when it is empty."""
        if self.max_num_set:
            return set(self.max_num_set)
        return None

    def to_json_str(self):
        """Return the attributes serialised as a JSON object string."""
        return json.dumps(vars(self))

    def to_dict(self):
        """Return the instance attribute dictionary (the live one, not a copy)."""
        return vars(self)

    @classmethod
    def to_object(cls, json_str: str):
        """Build an instance from a string produced by ``to_json_str``.

        Uses ``cls`` so that subclasses get an instance of their own type.
        """
        return cls(**json.loads(json_str))
class L2DataException(Exception):
    # 价格不匹配
    CODE_PRICE_ERROR = 1
@@ -56,8 +95,8 @@
        for k in keys:
            code = k.split("-")[-1]
            val = RedisUtils.get(redis_, k)
            val = json.loads(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, val)
            order = OrderBeginPosInfo.to_object(val)
            CodeDataCacheUtil.set_cache(cls.__buy_compute_index_info_cache, code, order)
        keys = RedisUtils.keys(redis_, "buy_cancel_single_pos-*")
        for k in keys:
@@ -66,7 +105,6 @@
            CodeDataCacheUtil.set_cache(cls.__buy_cancel_single_pos_cache, code, int(val))
    # 删除买入点数据
    def delete_buy_point(self, code):
        CodeDataCacheUtil.clear_cache(self.__buy_compute_index_info_cache, code)
        RedisUtils.delete_async(self.__db, "buy_compute_index_info-{}".format(code))
@@ -82,11 +120,12 @@
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2], _data[3], _data[4], _data[5], _data[6]
    def get_buy_compute_start_data_cache(self, code):
    def get_buy_compute_start_data_cache(self, code) -> OrderBeginPosInfo:
        cache_result = CodeDataCacheUtil.get_cache(self.__buy_compute_index_info_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None, None, None, 0, 0, [], 0
        return OrderBeginPosInfo()
    # 设置买入点的值
    # buy_single_index 买入信号位
@@ -94,51 +133,18 @@
    # compute_index 计算位置
    # nums 累计纯买额
    def set_buy_compute_start_data(self, code, buy_single_index, buy_exec_index, compute_index, nums, count,
                                   max_num_sets,
                                   volume_rate):
    def set_buy_compute_start_data_v2(self, code, order: OrderBeginPosInfo):
        expire = tool.get_expire()
        _key = "buy_compute_index_info-{}".format(code)
        data_ = None
        if buy_single_index is not None:
            data_ = (buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
        if order.buy_single_index is not None:
            data_ = order
        else:
            _buy_single_index, _buy_exec_index, _compute_index, _nums, _count, _max_num_index, _volume_rate = self.get_buy_compute_start_data_cache(
                code)
            data_ = (_buy_single_index, buy_exec_index, compute_index, nums, count, list(max_num_sets),
                     volume_rate)
            old_order = self.get_buy_compute_start_data_cache(code)
            order.buy_single_index = old_order.buy_single_index
            data_ = order
        CodeDataCacheUtil.set_cache(self.__buy_compute_index_info_cache, code, data_)
        RedisUtils.setex_async(self.__db, _key, expire, json.dumps(data_))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    def get_buy_cancel_single_pos(self, code):
        info = RedisUtils.get(self.__get_redis(), "buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None
        else:
            return int(info)
    def get_buy_cancel_single_pos_cache(self, code):
        cache_result = tool.CodeDataCacheUtil.get_cache(self.__buy_cancel_single_pos_cache, code)
        if cache_result[0]:
            return cache_result[1]
        return None
    # 设置买撤点信息
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    def set_buy_cancel_single_pos(self, code, index):
        tool.CodeDataCacheUtil.set_cache(self.__buy_cancel_single_pos_cache, code, index)
        expire = tool.get_expire()
        RedisUtils.setex_async(self.__db, "buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    def delete_buy_cancel_point(self, code):
        tool.CodeDataCacheUtil.clear_cache(self.__buy_cancel_single_pos_cache, code)
        RedisUtils.delete_async(self.__db, "buy_cancel_single_pos-{}".format(code))
        RedisUtils.setex_async(self.__db, _key, expire, data_.to_json_str())
# 清除l2数据
@@ -191,4 +197,13 @@
if __name__ == "__main__":
    TradePointManager().get_buy_compute_start_data_cache("603912")
    code = "002886"
    TradePointManager().set_buy_compute_start_data_v2(code, OrderBeginPosInfo(buy_single_index=10,
                                                                              buy_exec_index=30,
                                                                              buy_compute_index=40,
                                                                              num=20000, count=10,
                                                                              buy_volume_rate=0.6,
                                                                              mode=OrderBeginPosInfo.MODE_NORMAL,
                                                                              ))
    print( TradePointManager().get_buy_compute_start_data_cache(code).max_num_set)
    RedisUtils.run_loop()
l2/l2_data_manager_new.py
@@ -1,6 +1,4 @@
import io
import logging
import threading
import time as t
from code_attribute import big_money_num_manager, code_volumn_manager, code_data_util, industry_codes_sort, \
@@ -8,6 +6,7 @@
import constant
from db.redis_manager_delegate import RedisUtils
from l2.huaxin import l2_huaxin_util, huaxin_delegate_postion_manager
from l2.l2_sell_manager import L2MarketSellManager
from log_module import async_log_util, log_export
from third_data import kpl_data_manager, block_info
from utils import global_util, ths_industry_util, tool
@@ -16,21 +15,17 @@
from third_data.code_plate_key_manager import CodePlateKeyBuyManager, KPLCodeJXBlockManager
from trade import trade_manager, trade_queue_manager, l2_trade_factor, l2_trade_util, \
    trade_result_manager, current_price_process_manager, trade_data_manager, trade_huaxin, trade_record_log_util
from l2 import safe_count_manager, l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \
from l2 import l2_data_manager, l2_log, l2_data_source_util, code_price_manager, \
    transaction_progress, cancel_buy_strategy, l2_data_log
from l2.cancel_buy_strategy import SecondCancelBigNumComputer, HourCancelBigNumComputer, DCancelBigNumComputer, \
    LCancelBigNumComputer, LatestCancelIndexManager
from l2.l2_data_manager import L2DataException
    LCancelBigNumComputer, LatestCancelIndexManager, FastCancelBigNumComputer
from l2.l2_data_manager import L2DataException, OrderBeginPosInfo
from l2.l2_data_util import local_today_datas, L2DataUtil, local_today_num_operate_map, local_today_buyno_map, \
    local_latest_datas, local_today_canceled_buyno_map
import l2.l2_data_util
from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_profile, logger_debug
from log_module.log import logger_l2_trade_buy, logger_l2_process, logger_l2_error, logger_debug
from trade.trade_data_manager import CodeActualPriceProcessor
from line_profiler import LineProfiler
import dask
from trade.trade_manager import TradeTargetCodeModeManager, AccountAvailableMoneyManager
@@ -234,6 +229,7 @@
    __TradeTargetCodeModeManager = TradeTargetCodeModeManager()
    __TradeOrderIdManager = trade_huaxin.TradeOrderIdManager()
    __LatestCancelIndexManager = LatestCancelIndexManager()
    __L2MarketSellManager = L2MarketSellManager()
    # 获取代码评分
    @classmethod
@@ -289,7 +285,11 @@
    @classmethod
    def set_real_place_order_index(cls, code, index, buy_single_index):
        trade_record_log_util.add_real_place_order_position_log(code, index, buy_single_index)
        cancel_buy_strategy.set_real_place_position(code, index, buy_single_index)
        need_cancel = FastCancelBigNumComputer().set_real_order_index(code, index)
        if need_cancel:
            cls.cancel_buy(code, msg="F撤不够2笔触发撤单")
        else:
            cancel_buy_strategy.set_real_place_position(code, index, buy_single_index)
    # 处理华鑫L2数据
    @classmethod
@@ -336,14 +336,21 @@
            l2_data_log.l2_time_log(code, "process_add_datas 加载完数据")
            if constant.L2_SOURCE_TYPE == constant.L2_SOURCE_TYPE_HUAXIN:
                try:
                    # 获取下单位置
                    place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, float(
                        gpcode_manager.get_limit_up_price(code)), add_datas)
                    if place_order_index:
                        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                            code)
                        cls.set_real_place_order_index(code, place_order_index, buy_single_index)
                        async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
                    if constant.TEST:
                        pass
                        # order_begin_pos = cls.__get_order_begin_pos(code)
                        # if order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index>=0:
                        #     place_order_index = add_datas[-1]["index"]
                        #     cls.set_real_place_order_index(code, place_order_index, order_begin_pos.buy_single_index)
                    else:
                        # 获取下单位置
                        place_order_index = huaxin_delegate_postion_manager.get_l2_place_order_position(code, float(
                            gpcode_manager.get_limit_up_price(code)), add_datas)
                        if place_order_index:
                            order_begin_pos = cls.__get_order_begin_pos(
                                code)
                            cls.set_real_place_order_index(code, place_order_index, order_begin_pos.buy_single_index)
                            async_log_util.info(logger_l2_process, "code:{} 获取到下单真实位置:{}", code, place_order_index)
                except:
                    async_log_util.error(logger_l2_error, f"{code} 处理真实下单位置出错")
            # 第1条数据是否为09:30:00
@@ -462,18 +469,35 @@
                                                                                          _buy_exec_index, start_index,
                                                                                          end_index, total_data,
                                                                                          code_volumn_manager.get_volume_rate_index(
                                                                                              buy_volume_rate),
                                                                                              order_begin_pos.buy_volume_rate),
                                                                                          cls.volume_rate_info[code][1],
                                                                                          is_first_code)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, "H撤销比例触发阈值"
            except Exception as e:
                if constant.TEST:
                    logging.exception(e)
                async_log_util.error(logger_l2_error,
                                     f"H撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} {str(e)}")
                async_log_util.exception(logger_l2_error, e)
            finally:
                # l2_data_log.l2_time(code, round(t.time() * 1000) - _start_time, "已下单-H撤大单计算")
                pass
            return None, ""
        # F撤
        def f_cancel(_buy_single_index, _buy_exec_index):
            try:
                b_need_cancel, b_cancel_data = FastCancelBigNumComputer().need_cancel(code, start_index, end_index,
                                                                                      order_begin_pos)
                if b_need_cancel and b_cancel_data:
                    return b_cancel_data, f"F撤"
            except Exception as e:
                if constant.TEST:
                    logging.exception(e)
                async_log_util.error(logger_l2_error,
                                     f"F撤出错 参数:buy_single_index-{_buy_single_index} buy_exec_index-{_buy_exec_index} 错误原因:{str(e)}")
                async_log_util.exception(logger_l2_error, e)
            return None, ""
        # L撤
@@ -505,19 +529,23 @@
        total_data = local_today_datas.get(code)
        _start_time = tool.get_now_timestamp()
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
        order_begin_pos = cls.__get_order_begin_pos(
            code)
        # 默认量为0.2
        if buy_volume_rate is None:
        if order_begin_pos.buy_volume_rate is None:
            buy_volume_rate = 0.2
        cancel_data, cancel_msg = None, ""
        if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST:
            cancel_data, cancel_msg = f_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
        # 依次处理
        cancel_data, cancel_msg = l_cancel(buy_single_index, buy_exec_index)
        if not cancel_data:
            cancel_data, cancel_msg = l_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
        # 暂时取消S撤
        # if not cancel_data:
        #     cancel_data, cancel_msg = s_cancel(buy_single_index, buy_exec_index)
        if not cancel_data:
            cancel_data, cancel_msg = h_cancel(buy_single_index, buy_exec_index)
            cancel_data, cancel_msg = h_cancel(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
        if cancel_data:
            l2_log.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
@@ -542,13 +570,14 @@
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
        order_begin_pos = cls.__get_order_begin_pos(
            code)
        if not can:
            l2_log.debug(code, "不可以下单,原因:{}", reason)
            trade_record_log_util.add_cant_place_order_log(code, reason)
            if need_clear_data:
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index,
                trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index,
                                                         order_begin_pos.buy_exec_index,
                                                         local_today_datas.get(code))
            return False
        else:
@@ -556,12 +585,12 @@
            try:
                l2_log.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                                        last_data_index, order_begin_pos.mode)
                l2_log.debug(code, "执行买入成功")
                ################下单成功处理################
                trade_result_manager.real_buy_success(code, cls.__TradePointManager)
                cancel_buy_strategy.set_real_place_position(code, local_today_datas.get(code)[-1]["index"],
                                                            buy_single_index)
                                                            order_begin_pos.buy_single_index)
                l2_log.debug(code, "处理买入成功")
                params_desc = cls.__l2PlaceOrderParamsManagerDict[code].get_buy_rank_desc()
                l2_log.debug(code, params_desc)
@@ -570,7 +599,7 @@
                    jx_blocks, jx_blocks_by = KPLCodeJXBlockManager().get_jx_blocks_cache(
                        code), KPLCodeJXBlockManager().get_jx_blocks_cache(code, by=True)
                    info = cls.__trade_log_placr_order_info_dict[code]
                    info.set_buy_index(buy_single_index, buy_exec_index)
                    info.set_buy_index(order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
                    if jx_blocks:
                        info.set_kpl_blocks(list(jx_blocks))
                    elif jx_blocks_by:
@@ -666,10 +695,9 @@
                if sell1_time is not None and sell1_volumn > 0:
                    # 获取执行位信息
                    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                        code)
                    buy_nums = num
                    for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                    order_begin_pos = cls.__get_order_begin_pos(code)
                    buy_nums = order_begin_pos.num
                    for i in range(order_begin_pos.buy_exec_index + 1, total_datas[-1]["index"] + 1):
                        _val = total_datas[i]["val"]
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy(_val):
@@ -783,11 +811,11 @@
                return False, True, f"尚未获取到当前成交价"
            if float(limit_up_price) - float(trade_price) > 0.00001:
                # 计算信号起始位置到当前的手数
                buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
                order_begin_pos = cls.__get_order_begin_pos(
                    code)
                num_operate_map = local_today_num_operate_map.get(code)
                total_num = 0
                for i in range(buy_single_index, total_data[-1]["index"] + 1):
                for i in range(order_begin_pos.buy_single_index, total_data[-1]["index"] + 1):
                    data = total_data[i]
                    val = data["val"]
                    if not L2DataUtil.is_limit_up_price_buy(val):
@@ -934,20 +962,21 @@
    @classmethod
    def cancel_buy(cls, code, msg=None, source="l2", cancel_index=None):
        # 是否是交易队列触发
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
        order_begin_pos = cls.__get_order_begin_pos(
            code)
        total_datas = local_today_datas[code]
        if source == "trade_queue":
            # 交易队列触发的需要下单后5s
            if buy_exec_index is not None and buy_exec_index > 0:
            if order_begin_pos.buy_exec_index is not None and order_begin_pos.buy_exec_index > 0:
                now_time_str = tool.get_now_time_str()
                if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5:
                if tool.trade_time_sub(now_time_str, total_datas[order_begin_pos.buy_exec_index]["val"]["time"]) < 5:
                    return False
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
            # 取消买入标识
            trade_result_manager.virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
            trade_result_manager.virtual_cancel_success(code, order_begin_pos.buy_single_index,
                                                        order_begin_pos.buy_exec_index, total_datas)
        else:
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
@@ -962,7 +991,8 @@
            trade_record_log_util.add_cancel_msg_log(code, msg)
            cancel_result = cls.__cancel_buy(code)
            if cancel_result:
                trade_result_manager.real_cancel_success(code, buy_single_index, buy_exec_index, total_datas)
                trade_result_manager.real_cancel_success(code, order_begin_pos.buy_single_index,
                                                         order_begin_pos.buy_exec_index, total_datas)
        l2_log.debug(code, "执行撤单结束,原因:{}", msg)
        return True
@@ -993,42 +1023,55 @@
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, buy_volume_rate = cls.__get_order_begin_pos(
        order_begin_pos = cls.__get_order_begin_pos(
            code)
        # 是否为新获取到的位置
        new_get_single = False
        buy_single_index = order_begin_pos.buy_single_index
        if buy_single_index is None:
            continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count()
            # 有买入信号
            has_single, _index = cls.__compute_order_begin_pos(code, max(
                (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
                                                               compute_end_index)
            # 尝试计算快速成交信号
            has_single, _index, sell_info = cls.__compute_fast_order_begin_pos(code, compute_start_index,
                                                                               compute_end_index)
            if has_single:
                order_begin_pos.mode = OrderBeginPosInfo.MODE_FAST
                order_begin_pos.sell_info = sell_info
            elif _index is not None and _index < 0:
                continue_count = cls.__l2PlaceOrderParamsManagerDict[code].get_begin_continue_buy_count()
                # 有买入信号
                has_single, _index = cls.__compute_order_begin_pos(code, max(
                    (compute_start_index - continue_count - 1) if new_add else compute_start_index, 0), continue_count,
                                                                   compute_end_index)
                order_begin_pos.mode = OrderBeginPosInfo.MODE_NORMAL
            # 如果买入信号与上次的买入信号一样就不能算新的信号
            if cls.__last_buy_single_dict.get(code) == _index:
                has_single = None
                _index = None
            if _index == 106:
                print("进入调试")
            buy_single_index = _index
            if has_single:
                cls.__last_buy_single_dict[code] = buy_single_index
                new_get_single = True
                num = 0
                count = 0
                l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,量比:{},数据:{}", buy_single_index, compute_start_index,
                             compute_end_index, cls.volume_rate_info[code], total_datas[buy_single_index])
                order_begin_pos.num = 0
                order_begin_pos.count = 0
                order_begin_pos.buy_single_index = buy_single_index
                if sell_info:
                    order_begin_pos.threshold_money = sell_info[1]
                l2_log.debug(code, "获取到买入信号起始点:{} ,计算范围:{}-{} ,量比:{},数据:{} 模式:{}", buy_single_index,
                             compute_start_index,
                             compute_end_index, cls.volume_rate_info[code], total_datas[buy_single_index],
                             order_begin_pos.mode)
        # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "下单信号计算时间")
        if buy_single_index is None:
        if order_begin_pos.buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # 开始计算的位置
        start_process_index = max(buy_single_index, compute_start_index)
        start_process_index = max(order_begin_pos.buy_single_index, compute_start_index)
        if new_get_single:
            start_process_index = buy_single_index
            start_process_index = order_begin_pos.buy_single_index
        # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "计算m值大单")
@@ -1037,16 +1080,30 @@
        # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "m值阈值计算")
        # 买入纯买额统计
        new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(code,
                                                                                                                  start_process_index,
                                                                                                                  compute_end_index,
                                                                                                                  num,
                                                                                                                  count,
                                                                                                                  threshold_money,
                                                                                                                  buy_single_index,
                                                                                                                  max_num_set)
        # _start_time = l2_data_log.l2_time(code, tool.get_now_timestamp() - _start_time, "纯买额统计时间")
        new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = None, None, None, None, []
        if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST:
            threshold_money = order_begin_pos.threshold_money
            new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, threshold_money_new = cls.__sum_buy_num_for_order_fast(
                code,
                start_process_index,
                compute_end_index,
                order_begin_pos.num,
                order_begin_pos.count,
                threshold_money,
                order_begin_pos.buy_single_index)
            threshold_money = threshold_money_new
            order_begin_pos.threshold_money = threshold_money
        else:
            new_buy_exec_index, buy_nums, buy_count, rebegin_buy_pos, max_num_set_new = cls.__sum_buy_num_for_order_3(
                code,
                start_process_index,
                compute_end_index,
                order_begin_pos.num,
                order_begin_pos.count,
                threshold_money,
                order_begin_pos.buy_single_index,
                order_begin_pos.max_num_set)
        l2_log.debug(code, "m值-{} 量比:{} rebegin_buy_pos:{}", threshold_money, cls.volume_rate_info[code][0],
                     rebegin_buy_pos)
@@ -1058,14 +1115,20 @@
            return
        if new_buy_exec_index is not None:
            l2_log.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{} ,量比:{} ", new_buy_exec_index, threshold_money,
            l2_log.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 纯买单数:{} 数据:{} ,量比:{} ,下单模式:{}", new_buy_exec_index,
                         threshold_money,
                         buy_nums,
                         buy_count, total_datas[new_buy_exec_index], cls.volume_rate_info[code])
            cls.__save_order_begin_data(code, buy_single_index, new_buy_exec_index, new_buy_exec_index,
                                        buy_nums, buy_count, max_num_set_new,
                                        cls.volume_rate_info[code][0])
                         buy_count, total_datas[new_buy_exec_index], cls.volume_rate_info[code], order_begin_pos.mode)
            cls.__save_order_begin_data(code, OrderBeginPosInfo(buy_single_index=buy_single_index,
                                                                buy_exec_index=new_buy_exec_index,
                                                                buy_compute_index=new_buy_exec_index,
                                                                num=buy_nums, count=buy_count,
                                                                max_num_set=max_num_set_new,
                                                                buy_volume_rate=cls.volume_rate_info[code][0],
                                                                mode=order_begin_pos.mode,
                                                                sell_info=order_begin_pos.sell_info,
                                                                threshold_money=threshold_money))
            cls.__LimitUpTimeManager.save_limit_up_time(code, total_datas[new_buy_exec_index]["val"]["time"])
            cls.__TradePointManager.delete_buy_cancel_point(code)
            l2_log.debug(code, "delete_buy_cancel_point")
            # 直接下单
            ordered = cls.__buy(code, capture_time, total_datas[-1], total_datas[-1]["index"], is_first_code)
@@ -1082,25 +1145,26 @@
        else:
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, -1, compute_end_index, buy_nums, buy_count,
                                        max_num_set_new, None)
            cls.__save_order_begin_data(code, OrderBeginPosInfo(buy_single_index=buy_single_index, buy_exec_index=-1,
                                                                buy_compute_index=compute_end_index, num=buy_nums,
                                                                count=buy_count,
                                                                max_num_set=max_num_set_new, mode=order_begin_pos.mode,
                                                                sell_info=order_begin_pos.sell_info,
                                                                threshold_money=threshold_money))
            _start_time = t.time()
        l2_data_log.l2_time_log(code, "__start_compute_buy 结束")
    # 获取下单起始信号
    @classmethod
    def __get_order_begin_pos(cls, code):
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = cls.__TradePointManager.get_buy_compute_start_data_cache(
    def __get_order_begin_pos(cls, code) -> OrderBeginPosInfo:
        order_begin_pos = cls.__TradePointManager.get_buy_compute_start_data_cache(
            code)
        return buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate
        return order_begin_pos
    # 保存下单起始信号
    @classmethod
    def __save_order_begin_data(cls, code, buy_single_index, buy_exec_index, compute_index, num, count, max_num_set,
                                volume_rate):
        cls.__TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num,
                                                           count,
                                                           max_num_set, volume_rate)
    def __save_order_begin_data(cls, code, info: OrderBeginPosInfo):
        cls.__TradePointManager.set_buy_compute_start_data_v2(code, info)
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
@@ -1157,6 +1221,63 @@
                start = None
        return False, None
    # 快速买入法的信号位置查找
    @classmethod
    def __compute_fast_order_begin_pos(cls, code, start_index, end_index):
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if float(limit_up_price) >= 10:
            return False, -1, "股价大于10块"
        total_datas = local_today_datas[code]
        start_time_str = total_datas[start_index]["val"]["time"]
        if tool.trade_time_sub(start_time_str, "10:00:00") > 0:
            return False, -1, "超过规定时间"
        refer_sell_data = cls.__L2MarketSellManager.get_refer_sell_data(code, start_time_str)
        if refer_sell_data is None:
            return False, -1, "总卖为空"
        if cls.__L2MarketSellManager.is_refer_sell_time_used(code, refer_sell_data[0]):
            return False, -1, "总卖统计时间已被使用"
        # 是否大于500万
        if refer_sell_data[1] < 500 * 10000:
            return False, -1, "总卖小于指定金额"
        # 统计之前的卖
        threshold_money = refer_sell_data[1]
        for i in range(start_index - 1, -1, -1):
            val = total_datas[i]["val"]
            if tool.compare_time(val["time"], refer_sell_data[0]) <= 0:
                break
            if L2DataUtil.is_sell(val):
                threshold_money += val["num"] * int(float(val["price"]) * 100)
            elif L2DataUtil.is_sell_cancel(val):
                threshold_money -= val["num"] * int(float(val["price"]) * 100)
        # 是否为本秒的第一个涨停买
        for i in range(start_index, end_index + 1):
            data = total_datas[i]
            val = data['val']
            if not L2DataUtil.is_limit_up_price_buy(val):
                # 要统计卖与卖撤
                if L2DataUtil.is_sell(val):
                    threshold_money += val["num"] * int(float(val["price"]) * 100)
                elif L2DataUtil.is_sell_cancel(val):
                    threshold_money -= val["num"] * int(float(val["price"]) * 100)
                continue
            # 50 万以下的不需要
            if val["num"] * float(val["price"]) < 5000:
                continue
            # 是否为本s的第一次涨停
            is_first_limit_up = True
            for j in range(i - 1, -1, -1):
                temp_val = total_datas[j]["val"]
                if temp_val["time"] == val["time"]:
                    if L2DataUtil.is_limit_up_price_buy(temp_val) and temp_val["num"] * float(
                            temp_val["price"]) >= 5000:
                        is_first_limit_up = True
                        break
                else:
                    break
            if is_first_limit_up:
                return True, i, [refer_sell_data[0], threshold_money]
        return False, None, None
    @classmethod
    def __get_threshmoney(cls, code):
@@ -1296,6 +1417,105 @@
        return None, buy_nums, buy_count, None, max_buy_num_set
    # Returns (buy exec index, total hands, total order count, restart index, net-buy money threshold)
    # Fast-buy accumulation
    @classmethod
    def __sum_buy_num_for_order_fast(cls, code, compute_start_index, compute_end_index, origin_num, origin_count,
                                     threshold_money_origin, buy_single_index):
        """Accumulate limit-up buys after a fast-buy signal until the threshold is met.

        Walks ``total_datas[compute_start_index..compute_end_index]``, starting
        from the carried-over ``origin_num`` / ``origin_count``. Sell records
        raise the money threshold, sell-cancel records lower it, limit-up buys
        add to the running totals and limit-up buy cancels subtract from them.

        Returns a 5-tuple
        ``(exec_index, buy_nums, buy_count, rebegin_index, threshold_money)``:
        ``exec_index`` is the index that met the threshold (else ``None``);
        ``rebegin_index`` is set only when the time window was exceeded and a
        later record with a different timestamp than the signal was found.

        Raises ``Exception`` when the limit-up price cannot be obtained.
        Side effect: deletes the stored buy point when the window is exceeded.
        """
        _start_time = t.time()
        total_datas = local_today_datas[code]
        # is_first_code = gpcode_manager.FirstCodeManager().is_in_first_record_cache(code)
        buy_nums = origin_num
        buy_count = origin_count
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        limit_up_price = float(limit_up_price)
        threshold_money = threshold_money_origin
        # Target number of hands derived from the money threshold
        threshold_num = round(threshold_money / (limit_up_price * 100))
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # Whether the current record may trigger a buy; only a limit-up buy record sets it
        # (this initial value is overwritten at the top of every iteration)
        trigger_buy = True
        # Maximum time span, in seconds, counted inclusively from the signal second
        max_space_time = 3
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            trigger_buy = False
            # Records must fall within max_space_time seconds of the signal (signal second included)
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds + 1 > max_space_time:
                cls.__TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # All data has been processed
                    return None, buy_nums, buy_count, None, threshold_money
                else:
                    # Restart the signal search, but not from the same timestamp as the old signal
                    # NOTE(review): if no later record has a different timestamp, execution falls
                    # through and keeps processing record i - confirm this is intended
                    for ii in range(buy_single_index + 1, compute_end_index + 1):
                        if total_datas[buy_single_index]["val"]["time"] != total_datas[ii]["val"]["time"]:
                            return None, buy_nums, buy_count, ii, threshold_money
            if L2DataUtil.is_sell(_val):
                threshold_money += _val["num"] * int(float(_val["price"]) * 100)
                threshold_num = round(threshold_money / (limit_up_price * 100))
            elif L2DataUtil.is_sell_cancel(_val):
                threshold_money -= _val["num"] * int(float(_val["price"]) * 100)
                threshold_num = round(threshold_money / (limit_up_price * 100))
            # Limit-up buy
            elif L2DataUtil.is_limit_up_price_buy(_val):
                trigger_buy = True
                # NOTE(review): the original comment said only amounts above 590k are counted,
                # but no amount filter is applied here - every limit-up buy is accumulated
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    async_log_util.info(logger_l2_trade_buy,
                                        f"{code}获取到买入执行点(快速买入):{i} 统计纯买手数:{buy_nums} 目标纯买手数:{threshold_num} 统计纯买单数:{buy_count}")
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # Check whether the cancelled order was placed before the buy signal
                buy_index = l2_data_source_util.L2DataSourceUtils.get_buy_index_with_cancel_data_v2(total_datas[i],
                                                                                                    local_today_buyno_map.get(
                                                                                                        code))
                if buy_index is not None:
                    # Found the buy record this cancel refers to
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        l2_log.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        l2_log.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == total_datas[buy_index]["val"]["time"]:
                            # Same second as the signal: treat it as placed after the signal
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            # Cancel of a buy placed in the signal's second
                            l2_log.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # The buy record for this cancel was not found; subtract it anyway
                    l2_log.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count -= int(total_datas[i]["re"])
            l2_log.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                             buy_nums, threshold_num)
            # Execute when the accumulated hands reach the target and this record is a limit-up buy
            if buy_nums >= threshold_num and trigger_buy:
                try:
                    info = cls.__trade_log_placr_order_info_dict[code]
                    info.set_trade_factor(threshold_money, 0, [])
                except Exception as e:
                    # Recording the trade factor is best-effort; log and still return the exec point
                    async_log_util.error(logger_l2_error, f"记录交易因子出错:{str(e)}")
                return i, buy_nums, buy_count, None, threshold_money
        l2_log.buy_debug(code, "尚未获取到买入执行点(快速买入),起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{}",
                         compute_start_index,
                         buy_nums,
                         threshold_num, buy_count)
        return None, buy_nums, buy_count, None, threshold_money
def test_trade_record():
    code = "000333"
@@ -1328,8 +1548,10 @@
    if datas is None:
        datas = []
    l2.l2_data_util.local_today_datas[code] = datas[:191]
    l2.l2_data_util.load_buy_no_map(l2.l2_data_util.local_today_buyno_map,code, l2.l2_data_util.local_today_datas[code])
    l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code, l2.l2_data_util.local_today_datas[code])
    l2.l2_data_util.load_buy_no_map(l2.l2_data_util.local_today_buyno_map, code,
                                    l2.l2_data_util.local_today_datas[code])
    l2.l2_data_util.load_canceled_buy_no_map(l2.l2_data_util.local_today_canceled_buyno_map, code,
                                             l2.l2_data_util.local_today_datas[code])
    start_index = 73
    end_index = 190
    LCancelBigNumComputer().compute_watch_index(code, start_index, end_index)
l2/l2_log.py
@@ -1,6 +1,6 @@
from log_module import async_log_util
from log_module.log import logger_l2_trade_cancel, logger_l2_trade_buy, logger_trade_record, logger_l2_trade, \
    logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel
    logger_l2_s_cancel, logger_l2_h_cancel, logger_l2_l_cancel, logger_l2_error, logger_l2_d_cancel, logger_l2_f_cancel
threadIds = {}
@@ -44,6 +44,8 @@
def d_cancel_debug(code, content, *args):
    """Forward ``content`` and ``args`` for ``code`` to the D-cancel logger."""
    target_logger = logger_l2_d_cancel
    __add_async_log(target_logger, code, content, *args)
def f_cancel_debug(code, content, *args):
    """Forward ``content`` and ``args`` for ``code`` to the F-cancel logger."""
    target_logger = logger_l2_f_cancel
    __add_async_log(target_logger, code, content, *args)
# 交易记录
def trade_record(code, type, content, *args):
l2/l2_sell_manager.py
@@ -7,6 +7,8 @@
from db.redis_manager import RedisUtils
from log_module import async_log_util
from log_module.log import logger_l2_market_sell
from utils import tool
from utils.tool import CodeDataCacheUtil
class L2MarketSellManager:
@@ -15,6 +17,7 @@
    __instance = None
    __current_total_sell_data_cache = {}
    __last_total_sell_data_cache = {}
    __used_refer_sell_data_cache = {}
    def __new__(cls, *args, **kwargs):
        if not cls.__instance:
@@ -30,9 +33,31 @@
    def __load_datas(cls):
        __redis = cls.__get_redis()
        try:
            pass
            keys = RedisUtils.keys(__redis, "fast_buy_used_sell_data-*")
            for k in keys:
                code = k.split("-")[-1]
                val = RedisUtils.smembers(__redis, k)
                CodeDataCacheUtil.set_cache(cls.__used_refer_sell_data_cache, code, val)
        finally:
            RedisUtils.realse(__redis)
    def set_sell_time_used(self, code, time_str):
        if code not in self.__used_refer_sell_data_cache:
            self.__used_refer_sell_data_cache[code] = set()
        self.__used_refer_sell_data_cache[code].add(time_str)
        RedisUtils.sadd_async(self.__db, f"fast_buy_used_sell_data-{code}", time_str)
        RedisUtils.expire_async(self.__db, f"fast_buy_used_sell_data-{code}", tool.get_expire())
    def is_refer_sell_time_used(self, code, time_str):
        if code not in self.__used_refer_sell_data_cache:
            return False
        return time_str in self.__used_refer_sell_data_cache[code]
    def clear(self):
        self.__used_refer_sell_data_cache.clear()
        keys = RedisUtils.keys(self.__get_redis(), "fast_buy_used_sell_data-*")
        for k in keys:
            RedisUtils.delete_async(self.__db, k)
    # 设置当前的总卖
    def set_current_total_sell_data(self, code, time_str, money):
@@ -40,14 +65,20 @@
        async_log_util.info(logger_l2_market_sell, f"{code}: {time_str}-{money}")
        if code in self.__current_total_sell_data_cache:
            self.__last_total_sell_data_cache[code] = self.__current_total_sell_data_cache.get(code)
        self.__current_total_sell_data_cache[code] = (time_str, money)
        self.__current_total_sell_data_cache[code] = (time_str, round(money))
    # 获取参考卖的数据
    def get_refer_sell_data(self, code, time_str):
        cuurent = self.__current_total_sell_data_cache.get(code)
        if cuurent is None:
            return None
        if int(time_str.replace(":", "")) > int(cuurent[0].replace(":", "")):
            return cuurent
        last = self.__last_total_sell_data_cache.get(code)
        if int(time_str.replace(":", "")) > int(last[0].replace(":", "")):
            return last
        return None
if __name__ == "__main__":
    pass
l2/l2_transaction_data_manager.py
@@ -73,7 +73,7 @@
                    LCancelRateManager().set_big_num_deal_rate(code, rate)
                    # 获取执行位时间
            buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
            order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(code)
            if buy_progress_index is not None:
                cls.__TradeBuyQueue.set_traded_index(code, buy_progress_index)
                async_log_util.info(logger_l2_trade_buy_queue, "获取成交位置成功: code-{} index-{}", code,
@@ -95,11 +95,11 @@
                SecondCancelBigNumComputer().set_transaction_index(
                    code,
                    buy_progress_index)
                if buy_exec_index and buy_exec_index > -1:
                if order_begin_pos and order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
                    HourCancelBigNumComputer().set_transaction_index(code, buy_progress_index)
            else:
                pass
            if buy_exec_index and buy_exec_index > -1:
            if order_begin_pos and order_begin_pos.buy_exec_index and order_begin_pos.buy_exec_index > -1:
                # 触发L撤上重新计算
                LCancelBigNumComputer().re_compute_l_up_watch_indexes(code)
log_module/log.py
@@ -72,6 +72,10 @@
                   filter=lambda record: record["extra"].get("name") == "d_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "cancel/f_cancel"),
                   filter=lambda record: record["extra"].get("name") == "f_cancel",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_trade_buy"),
                   filter=lambda record: record["extra"].get("name") == "l2_trade_buy",
                   rotation="00:00", compression="zip", enqueue=True)
@@ -297,6 +301,7 @@
logger_l2_s_cancel = __mylogger.get_logger("s_cancel")
logger_l2_h_cancel = __mylogger.get_logger("h_cancel")
logger_l2_d_cancel = __mylogger.get_logger("d_cancel")
logger_l2_f_cancel = __mylogger.get_logger("f_cancel")
logger_l2_l_cancel = __mylogger.get_logger("l_cancel")
logger_l2_g_cancel = __mylogger.get_logger("g_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
log_module/log_export.py
@@ -350,6 +350,27 @@
    return fdatas
# Load today's Huaxin L2 market-data snapshots from the log file
def load_l2_market_data():
    """Parse today's Huaxin L2 market-data log into ``{code: [snapshot, ...]}``.

    Each useful line has the form ``<prefix> - [<tag>] <code>#<payload>``, where
    the bracketed tag is optional and the payload is a Python literal.
    Snapshots keep file order. Blank lines and lines without the `` - `` or
    ``#`` separators are skipped instead of raising ``IndexError``. Returns an
    empty dict when the log file does not exist.
    """
    path = f"{constant.get_path_prefix()}/logs/huaxin/l2/marketdata.{tool.get_now_date_str()}.log"
    fdatas = {}
    if not os.path.exists(path):
        return fdatas
    with open(path, 'r', encoding="utf-8") as f:
        # Stream the file line by line instead of loading it whole
        for line in f:
            _, sep, data = line.partition(" - ")
            if not sep:
                continue
            data = data.strip()
            if data.startswith("["):
                data = data[data.find("]") + 1:].strip()
            # Split on the first '#' only so a '#' inside the payload survives
            code, sep, payload = data.partition("#")
            if not sep:
                continue
            # NOTE(review): eval executes whatever the log line contains. It is
            # kept because the payload format is not visible here; switch to
            # ast.literal_eval once the payload is confirmed to be pure literals.
            d = eval(payload.strip())
            fdatas.setdefault(code, []).append(d)
    return fdatas
# 读取系统日志
def load_system_log():
    path = f"{constant.get_path_prefix()}/logs/gp/system/system.{tool.get_now_date_str()}.log"
@@ -372,7 +393,7 @@
if __name__ == '__main__':
    datas = load_buy_score_recod("000333")
    datas = load_l2_market_data()
    for d in datas:
        print(d)
    # print(get_h_cancel_compute_info("603912"))
output/code_info_output.py
@@ -175,23 +175,23 @@
                params["trade_data"]["trade_progress"] = trade_progress_datas
        # 买入信号
        buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
        order_begin_pos = l2_data_manager.TradePointManager().get_buy_compute_start_data_cache(
            code)
        if buy_single_index is None:
        if order_begin_pos.buy_single_index is None:
            # buy_params_info.append("无信号")
            pass
        else:
            data = total_datas[buy_single_index]
            data = total_datas[order_begin_pos.buy_single_index]
            params["trade_data"]["buy_single"] = {"time": data['val']['time'], "num": data['val']['num'],
                                                  "money": round(data['val']['num'] * float(
                                                      data['val']['price']) * 100 / 10000, 1)}
        if buy_exec_index is None or buy_exec_index < 0:
        if order_begin_pos.buy_exec_index is None or order_begin_pos.buy_exec_index < 0:
            # buy_params_info.append("未下单")
            pass
        else:
            data = total_datas[buy_exec_index]
            data = total_datas[order_begin_pos.buy_exec_index]
            params["trade_data"]["buy_exec"] = {"time": data['val']['time'], "num": data['val']['num'],
                                                "money": round(data['val']['num'] * float(
                                                    data['val']['price']) * 100 / 10000, 1)}
test/l2_trade_test.py
@@ -13,6 +13,7 @@
from code_attribute import big_money_num_manager, gpcode_manager
from db.redis_manager_delegate import RedisUtils
from l2.huaxin import huaxin_delegate_postion_manager
from l2.l2_sell_manager import L2MarketSellManager
from log_module import log, log_export, async_log_util
from trade.huaxin import huaxin_trade_api
from utils import tool
@@ -84,10 +85,10 @@
                except Exception as e:
                    pass
    @unittest.skip("跳过此单元测试")
    def test_trade(self):
        threading.Thread(target=async_log_util.run_sync, daemon=True).start()
        code = "002771"
        code = "600203"
        clear_trade_data(code)
        l2.l2_data_util.load_l2_data(code)
        total_datas = deepcopy(l2.l2_data_util.local_today_datas[code])
@@ -101,6 +102,11 @@
                data = total_datas[0].copy()
                data["index"] = i
                total_datas.insert(i, data)
        l2_market_datas = log_export.load_l2_market_data()
        l2_market_datas = l2_market_datas.get(code)
        if l2_market_datas:
            l2_market_datas.reverse()
        pos_list = log_export.get_l2_process_position(code)
        # pos_list.insert(108,(375,448))
@@ -170,8 +176,16 @@
                    break
            print("----------------处理位置", indexs)
            if indexs[0] >= 661:
                print("进入调试")
            for l2_m in l2_market_datas:
                if l2_m["dataTimeStamp"] < int(total_datas[indexs[0]]["val"]["time"].replace(":","")):
                    time_str = f"{l2_m['dataTimeStamp']}"
                    if time_str.startswith("9"):
                        time_str = "0" + time_str
                    time_str = time_str[:6]
                    time_str = f"{time_str[0:2]}:{time_str[2:4]}:{time_str[4:6]}"
                    L2MarketSellManager().set_current_total_sell_data(code,time_str,l2_m["totalAskVolume"]*l2_m["avgAskPrice"])
                    break
            l2.l2_data_manager_new.L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0,
                                                                          0)
@@ -192,6 +206,7 @@
        l2.l2_data_util.local_today_datas[code] = total_datas
        l2.l2_data_util.load_num_operate_map(l2.l2_data_util.local_today_num_operate_map, code, total_datas, True)
    @unittest.skip("跳过此单元测试")
    def test_block(self):
        code = "000628"
        KPLCodeJXBlockManager().load_jx_blocks(code, 17.96, 17.96)
trade/huaxin/huaxin_trade_order_processor.py
@@ -120,14 +120,14 @@
                time.sleep(delay_s)
            new_place_order_index = cls.order_success(order_)
            if new_place_order_index:
                buy_single_index, buy_exec_index, compute_index, num, count, max_num_set, volume_rate = TradePointManager().get_buy_compute_start_data_cache(
                order_begin_pos = TradePointManager().get_buy_compute_start_data_cache(
                    order.code)
                cancel_buy_strategy.set_real_place_position(order.code,
                                                            new_place_order_index,
                                                            buy_single_index)
                                                            order_begin_pos.buy_single_index)
                trade_record_log_util.add_real_place_order_position_log(order.code,
                                                                        new_place_order_index,
                                                                        buy_single_index)
                                                                        order_begin_pos.buy_single_index)
            return new_place_order_index
        # 只处理买入单
trade/huaxin/huaxin_trade_server.py
@@ -337,7 +337,7 @@
        async_log_util.info(hx_logger_l2_market_data, f"{code}#{data}")
        L2MarketSellManager().set_current_total_sell_data(code, time_str,
                                                          data["L2MarketSellManager"] * data["L2MarketSellManager"])
                                                          data["totalAskVolume"] * data["avgAskPrice"])
    @classmethod
    def trading_order_canceled(cls, code, order_no):
trade/trade_manager.py
@@ -42,7 +42,7 @@
# 买成功
TRADE_STATE_BUY_SUCCESS = 12
guiTrade = None#trade_gui.THSGuiTrade() if trade_gui is not None else None
guiTrade = None  # trade_gui.THSGuiTrade() if trade_gui is not None else None
latest_trade_delegate_data = []
@@ -392,7 +392,7 @@
# 开始交易
def start_buy(code, capture_timestamp, last_data, last_data_index):
def start_buy(code, capture_timestamp, last_data, last_data_index, mode=0):
    def is_forbidden(code):
        if l2_trade_util.is_in_forbidden_trade_codes(code):
            return Exception("禁止交易")
@@ -432,7 +432,7 @@
            async_log_util.info(logger_trade, "{} trade.manager.start_buy 判断是否可买".format(code))
        __CodesTradeStateManager.set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
        # 状态改变过后必须要有本地下单编号
        __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index)
        __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index, mode)
    finally:
        async_log_util.info(logger_trade, "{} trade.manager.start_buy 结束".format(code))
@@ -444,12 +444,14 @@
# 购买
# @tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index, mode=0):
    async_log_util.info(logger_trade, "{} trade_manager.__buy 开始".format(code))
    try:
        if constant.API_TRADE_ENABLE:
            count = (constant.BUY_MONEY_PER_CODE // int(round(float(price) * 100))) * 100
            if mode != 0:
                count = 100
            # 最低下单1手
            if count < 100:
                count = 100
@@ -584,7 +586,6 @@
                CodesTradeStateManager().set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
                # 删除买撤记录的临时信息
                kp_client_msg_manager.add_msg(code, "买入成交")
                l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
                l2_data_manager.TradePointManager().delete_buy_point(code)
                # 移除交易窗口分配
                if trade_gui is not None:
trade/trade_result_manager.py
@@ -3,8 +3,10 @@
from l2 import l2_data_manager
from l2.cancel_buy_strategy import HourCancelBigNumComputer, SecondCancelBigNumComputer, \
    LCancelBigNumComputer, DCancelBigNumComputer
    LCancelBigNumComputer, DCancelBigNumComputer, FastCancelBigNumComputer
from l2.l2_data_manager import OrderBeginPosInfo
from l2.l2_data_util import local_today_datas, local_today_num_operate_map
from l2.l2_sell_manager import L2MarketSellManager
from log_module.log import logger_l2_error
from trade.trade_queue_manager import THSBuy1VolumnManager
@@ -23,7 +25,6 @@
# 虚拟撤成功
def virtual_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    l2_data_manager.TradePointManager().delete_buy_point(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
    SecondCancelBigNumComputer().cancel_success(code)
    DCancelBigNumComputer().cancel_success(code)
    LCancelBigNumComputer().cancel_success(code)
@@ -58,25 +59,36 @@
            logging.exception(e)
            logger_l2_error.exception(e)
    buy_single_index, buy_exec_index, buy_compute_index, num, count, max_num_set, volume_rate = tradePointManager.get_buy_compute_start_data_cache(
    def f_cancel(code):
        try:
            FastCancelBigNumComputer().place_order_success(code)
        except Exception as e:
            logging.exception(e)
            logger_l2_error.exception(e)
    order_begin_pos = tradePointManager.get_buy_compute_start_data_cache(
        code)
    clear_max_buy1_volume(code)
    s_cancel(code)
    # H撤暂时不生效
    h_cancel(code, buy_single_index, buy_exec_index)
    h_cancel(code, order_begin_pos.buy_single_index, order_begin_pos.buy_exec_index)
    l_cancel(code)
    tradePointManager.delete_buy_cancel_point(code)
    if order_begin_pos.mode == OrderBeginPosInfo.MODE_FAST:
        f_cancel(code)
        # 记录卖盘统计时间被用
        L2MarketSellManager().set_sell_time_used(code, order_begin_pos.sell_info[0])
        FastCancelBigNumComputer().set_trade_progress(code, order_begin_pos.buy_exec_index)
# Real cancel succeeded
def real_cancel_success(code, buy_single_index, buy_exec_index, total_datas):
    """Clean up per-code trade state after a cancel has succeeded.

    Deletes the stored buy point and buy-cancel point for ``code`` through
    ``TradePointManager``, then calls ``cancel_success(code)`` on
    ``SecondCancelBigNumComputer``, ``DCancelBigNumComputer``,
    ``LCancelBigNumComputer`` and ``FastCancelBigNumComputer`` in that order.

    :param code: identifier forwarded to every cleanup call.
    :param buy_single_index: not used in this body.
    :param buy_exec_index: not used in this body.
    :param total_datas: not used in this body.
    """
    # Clear the buy marker
    l2_data_manager.TradePointManager().delete_buy_point(code)
    l2_data_manager.TradePointManager().delete_buy_cancel_point(code)
    SecondCancelBigNumComputer().cancel_success(code)
    DCancelBigNumComputer().cancel_success(code)
    LCancelBigNumComputer().cancel_success(code)
    FastCancelBigNumComputer().cancel_success(code)
if __name__ == "__main__":
utils/tool.py
@@ -178,6 +178,10 @@
    return time_1 - time_2
def compare_time(time_1: str, time_2: str):
    """Compare two colon-separated time strings by their digit value.

    Each argument has its ":" characters removed and the remainder is parsed
    as an integer (e.g. "09:30:00" becomes 93000). The result is the first
    number minus the second: positive when ``time_1`` is the larger value,
    negative when it is the smaller one, and zero when they are equal.

    The result is a difference of those packed numbers, not an elapsed
    duration in seconds, so only its sign is meaningful for ordering.
    """
    first, second = (int(value.replace(":", "")) for value in (time_1, time_2))
    return first - second
# 交易时间加几s
def trade_time_add_second(time_str, second):
    ts = time_str.split(":")
@@ -274,4 +278,3 @@
    except:
        pass
    return None