Administrator
2022-09-23 21d753614ea7bbe936b8560cbf466c4e438821b2
l2数据计算优化
11个文件已修改
1个文件已添加
385 ■■■■ 已修改文件
data_export_util.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 195 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
limit_up_time_manager.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
redis_manager.py 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py
@@ -36,9 +36,22 @@
        ws.write(index, 2, cancel_time)
        ws.write(index, 3, data["val"]["price"])
        ws.write(index, 4, data["val"]["num"])
        limit_price=""
        if int(data["val"]["limitPrice"]) == 1:
            limit_price="涨停"
        elif int(data["val"]["limitPrice"]) == 2:
            limit_price="跌停"
        if int(data["val"]["operateType"]) == 0:
            if len(limit_price)>0:
                ws.write(index, 5, '买 ({})'.format(limit_price))
            else:
            ws.write(index, 5, '买')
        elif int(data["val"]["operateType"]) == 1:
            if len(limit_price) > 0:
                ws.write(index, 5, '买撤 ({})'.format(limit_price))
            else:
            ws.write(index, 5, '买撤')
        ws.write(index, 6, data["re"])
    wb.save(file_name)
global_util.py
@@ -1,10 +1,10 @@
# 代码行业映射
import pymongo
import code_volumn_manager
import ths_industry_util
import gpcode_manager
import mongo_data
import code_volumn_manager
import gpcode_manager
import ths_industry_util
TEST = False
gui.py
@@ -11,18 +11,15 @@
import win32gui
import data_export_util
import data_process
import juejin
import multiprocessing
import l2_code_operate
import l2_trade_factor
import redis_manager
import mongo_data
import server
import trade_gui
import authority
from l2_code_operate import L2CodeOperate
from l2_trade_factor import L2TradeFactorUtil
from server import *
@@ -629,7 +626,7 @@
            showinfo("提示", "导出完成")
        def compute_m(code):
            m = l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
            m = L2TradeFactorUtil.compute_m_value(code)
            showinfo("提示", "{}".format(m))
        frame = Frame(root, {"height": 280, "width": 300, "bg": "#DDDDDD"})
juejin.py
@@ -9,6 +9,7 @@
import big_money_num_manager
import code_volumn_manager
import data_process
import global_util
import gpcode_manager
import threading
@@ -165,13 +166,15 @@
def on_tick(context, tick):
    if global_util.TEST:
        return
    # print(tick["created_at"])
    relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60
    # Accept ticks only between 09:31-11:35 and 12:50-15:05
    start1 = 60 * 60 * 9 + 24 * 60;
    end1 = 60 * 60 * 11 + 35 * 60;
    start2 = 60 * 60 * 12 + 50 * 60;
    end2 = 60 * 60 * 15 + 5 * 60;
    start1 = 60 * 60 * 9 + 31 * 60
    end1 = 60 * 60 * 11 + 35 * 60
    start2 = 60 * 60 * 12 + 50 * 60
    end2 = 60 * 60 * 15 + 5 * 60
    # TODO 测试
    if (start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2) or global_util.TEST:
        symbol = tick['symbol']
@@ -205,6 +208,54 @@
            # 移除监控
            if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
# 获取到现价
def accpt_prices(prices):
    """Handle a batch of latest prices.

    ``prices`` is an iterable of dicts carrying ``"code"`` and ``"price"``.

    After 09:31:00 (local clock) every entry is forwarded to ``accpt_price``.
    Up to and including that moment each price is stored through
    ``gpcode_manager.set_price`` and the codes are ranked by their percentage
    change against ``gpcode_manager.get_price_pre``: non-negative movers, best
    first, are queued with operate type 1 up to ``8 * number of valid L2
    clients``; all remaining ranked codes are queued with operate type 0.
    Codes without a previous price are stored but not queued.
    """
    now = datetime.datetime.now()
    seconds_of_day = now.hour * 60 * 60 + now.minute * 60 + now.second
    threshold = 60 * 60 * 9 + 31 * 60

    if seconds_of_day > threshold:
        for item in prices:
            accpt_price(item["code"], float(item["price"]))
        return

    rising = []
    falling = []
    for item in prices:
        code = item["code"]
        price = float(item["price"])
        gpcode_manager.set_price(code, price)
        # Previous close price (per the original comment on this lookup)
        pre_price = gpcode_manager.get_price_pre(code)
        if pre_price is None:
            continue
        rate = round((price - pre_price) * 100 / pre_price, 1)
        bucket = rising if rate >= 0 else falling
        bucket.append((rate, code))

    # Highest change first; the sort is stable, so ties keep arrival order.
    rising.sort(key=lambda pair: pair[0], reverse=True)
    max_count = len(data_process.getValidL2Clients()) * 8

    add_code_list = [pair[1] for pair in rising[:max_count]]
    del_list = [pair[1] for pair in falling + rising[max_count:]]

    for code in add_code_list:
        L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
    for code in del_list:
        L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
    print(add_code_list, del_list)
def on_bar(context, bars):
@@ -325,4 +376,5 @@
if __name__ == '__main__':
    everyday_init()
    accpt_prices_before_open([{"code": "001332", "price": "82.37"}, {"code": "002246", "price": "10.10"},
                              {"code": "600537", "price": "6.65"}])
l2_code_operate.py
@@ -74,6 +74,12 @@
                    data = json.loads(data)
                    logger_code_operate.info("读取操作队列:{}", data)
                    type, code = data["type"], data["code"]
                    create_time = data.get("create_time")
                    if create_time is not None:
                        # Skip operations that were queued more than 20s ago
                        if round(time.time() * 1000) - create_time > 20 * 1000:
                            logger_code_operate.debug("读取操作超时:{}", data)
                            continue
                    if type == 0:
                        # 是否在固定库
@@ -131,7 +137,8 @@
    def add_operate(self, type, code, msg="", client=None, pos=None):
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue",
                    json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos}))
                    json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos,
                                "create_time": round(time.time() * 1000)}))
    def repaire_operate(self, client, pos, code):
        # 如果本来该位置代码为空则不用修复
@@ -153,7 +160,7 @@
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
            redis = self.redis_manager_.getRedis()
            redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data}))
            redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data,  "create_time": round(time.time() * 1000)}))
    # 移除监控
    def remove_l2_listen(self, code, msg):
l2_data_manager.py
@@ -18,8 +18,9 @@
import redis_manager
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy
from trade_data_manager import TradeBuyDataManager
import limit_up_time_manager
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
@@ -94,38 +95,52 @@
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    @staticmethod
    def get_buy_cancel_compute_start_data(code):
    def get_buy_cancel_single_pos(code):
        redis = TradePointManager.__get_redis()
        info = redis.get("buy_cancel_compute_info-{}".format(code))
        info = redis.get("buy_cancel_single_pos-{}".format(code))
        if info is None:
            return None, None, None
            return None
        else:
            info = json.loads(info)
            return info[0], info[1], info[2]
            return int(info)
    # 设置买撤点信息
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    @classmethod
    def set_buy_cancel_compute_start_data(cls, code, buy_num, computed_index, index):
    def set_buy_cancel_single_pos(cls, code, index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index, buy_num, computed_index)))
    # 增加撤买的纯买额
    @classmethod
    def add_buy_nums_for_cancel(cls, code, num_add, computed_index):
        cancel_index, nums, c_index = cls.get_buy_cancel_compute_start_data(code)
        if cancel_index is None:
            raise Exception("无撤买信号记录")
        nums += num_add
        cls.set_buy_cancel_compute_start_data(code, nums, computed_index)
        redis.setex("buy_cancel_single_pos-{}".format(code), expire, index)
    # 删除买撤点数据
    @staticmethod
    def delete_buy_cancel_point(code):
    @classmethod
    def delete_buy_cancel_point(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_compute_info-{}".format(code))
        redis.delete("buy_cancel_single_pos-{}".format(code))
    # 设置买撤纯买额
    @classmethod
    def set_compute_info_for_cancel_buy(cls, code, index, nums):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("compute_info_for_cancel_buy-{}".format(code), expire, json.dumps((index, nums)))
        logger_l2_trade_buy.info("{}保存撤单纯买额信息:{},{}", code, index, nums)
    # 获取买撤纯买额计算信息
    @classmethod
    def get_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        info = redis.get("compute_info_for_cancel_buy-{}".format(code))
        if info is None:
            return None, 0
        else:
            info = json.loads(info)
            return info[0], info[1]
    @classmethod
    def delete_compute_info_for_cancel_buy(cls, code):
        redis = TradePointManager.__get_redis()
        redis.delete("compute_info_for_cancel_buy-{}".format(code))
def load_l2_data(code, force=False):
@@ -398,6 +413,11 @@
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    def buy_debug(cls, code, content, *args):
        logger_l2_trade_buy.debug(
            ("thread-id={} code={}  ".format(cls.random_key[code], code) + content).format(*args))
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
@@ -474,38 +494,45 @@
        if start_index < 0:
            start_index = 0
        # 获取之前是否有记录的撤买信号
        cancel_index, buy_num_for_cancel, computed_index = cls.__has_order_cancel_begin_pos(code)
        cancel_index = TradePointManager.get_buy_cancel_single_pos(code)
        cancel_computed_index, cancel_buy_num = TradePointManager.get_compute_info_for_cancel_buy(code)
        if cancel_computed_index is None:
            logger_l2_trade.error("{} 未获取到买撤纯买额,起始计算位:{}", code, start_index)
        buy_single_index, buy_exec_index, buy_compute_index, buy_num = cls.__get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.__compute_order_cancel_begin_single(code, max(start_index - 3, 0), 3)
            buy_num_for_cancel = buy_num
            computed_index = buy_single_index
            if cancel_index is not None:
                cls.debug(code, "找到撤单信号,数据处理起始点:{} 数据:{}", start_index, local_today_datas[code][start_index])
        if cancel_index is not None:
                cls.debug(code, "找到撤单信号-{},买入信号为 ,数据处理起始点:{}", cancel_index, start_index)
                # 保存撤单信号
                TradePointManager.set_buy_cancel_single_pos(code, cancel_index)
            # 获取阈值 有买撤信号,统计撤买纯买额
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_cancel(code, cancel_index, max(computed_index, buy_exec_index + 1), buy_num_for_cancel,
        cls.__start_compute_cancel(code, cancel_index, cancel_computed_index + 1,
                                   cancel_buy_num,
                                       threshold_money,
                                       capture_time)
        else:
            # 无买撤信号,是否有虚拟下单
        # 是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入")
            cls.debug(code, "有虚拟下单,无买撤信号,开始执行买入,截图时间:{}", capture_time)
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
    # 开始计算撤的信号
    @classmethod
    def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
        # sure_type 0-虚拟挂买位  1-真实挂买位
        cancel_single = cancel_index is not None
        computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index,
                                                                                           origin_num, threshold_money)
                                                                                           origin_num, threshold_money,
                                                                                           cancel_single)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            cls.debug(code, "获取到撤单执行信号,信号位置:{},m2:{} 数据:{}", computed_index, threshold_money,
@@ -528,10 +555,10 @@
                cls.__start_compute_buy(code, computed_index + 1, threshold_money, capture_time)
                pass
        else:
            cls.debug(code, "未获取到撤单执行信号,计算开始位置:{}, 纯买额:{}", compute_start_index, buy_num_for_cancel)
            # 无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code, buy_num_for_cancel, len(total_datas) - 1,
                                                                cancel_index)
            cls.debug(code, "撤买纯买额计算,计算位置:{}-{},目前为止纯买手数:{}", compute_start_index, total_datas[-1]["index"],
                      buy_num_for_cancel)
            # 无需撤买,设置计算信息
            TradePointManager.set_compute_info_for_cancel_buy(code, int(total_datas[-1]["index"]), buy_num_for_cancel)
            # 判断是否有虚拟下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
@@ -547,6 +574,9 @@
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
        cls.debug(code, "开始执行买入")
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
@@ -565,6 +595,7 @@
            # 取消买入标识
            TradePointManager.delete_buy_point(code)
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            cls.debug(code, "执行撤单异常:{}", str(e))
@@ -585,22 +616,29 @@
                num = 0
                new_get_pos = True
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
                limit_up_time_manager.save_limit_up_time(code, total_datas[buy_single_index]["val"]["time"])
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # 买入纯买额统计
        compute_index, buy_nums = cls.__sum_buy_num_for_order(code, max(buy_single_index, compute_start_index), num,
                                                              threshold_money)
        compute_index, buy_nums = cls.__sum_buy_num_for_order_2(code, max(buy_single_index, compute_start_index), num,threshold_money,buy_single_index)
        #cls.__sum_buy_num_for_order(code, max(buy_single_index, compute_start_index), num,threshold_money)
        if compute_index is not None:
            cls.debug(code, "获取到买入执行位置:{} m值:{} 数据:{}", compute_index, threshold_money, total_datas[compute_index])
            cls.debug(code, "获取到买入执行位置:{} m值:{} 纯买手数:{} 数据:{}", compute_index, threshold_money, buy_nums,
                      total_datas[compute_index])
            # 记录买入信号位置
            cls.__save_order_begin_data(code, buy_single_index, compute_index, compute_index, buy_nums)
            # 虚拟下单
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            TradePointManager.delete_compute_info_for_cancel_buy(code)
            # 为买撤保存基础纯买额
            TradePointManager.set_compute_info_for_cancel_buy(code, compute_index, buy_nums)
            # 数据是否处理完毕
            if L2DataUtil.is_index_end(code, compute_index):
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
@@ -626,15 +664,6 @@
    @classmethod
    def __save_order_begin_data(self, code, buy_single_index, buy_exec_index, compute_index, num):
        TradePointManager.set_buy_compute_start_data(code, buy_single_index, buy_exec_index, compute_index, num)
    # 获取撤单起始位置
    @classmethod
    def __has_order_cancel_begin_pos(cls, code):
        # cancel_index:撤单信号起点
        # buy_num_for_cancel:从挂入点计算的纯买额
        # computed_index 计算的最后位置
        cancel_index, buy_num_for_cancel, computed_index = TradePointManager.get_buy_cancel_compute_start_data(code)
        return cancel_index, buy_num_for_cancel, computed_index
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
@@ -776,11 +805,66 @@
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
                    return i, buy_nums
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
        logger_l2_trade_buy.info("{}尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", code, compute_start_index, buy_nums,
                                 threshold_num)
        return None, buy_nums
    # 统计买入净买量,不计算在买入信号之前的买撤单
    @classmethod
    def __sum_buy_num_for_order_2(cls, code, compute_start_index, origin_num, threshold_money, buy_single_index):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        property_buy_num_count = 0
        same_time_property = cls.__get_same_time_property(code)
        for i in range(compute_start_index, len(total_datas)):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{}", code, i, buy_nums, threshold_num)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,而且还在预估买入位之后按概率计算
                            property_buy_num_count -= int(_val["num"]) * int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.buy_debug(code, "买入信号点之前同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
                          buy_nums + property_buy_num, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums + property_buy_num >= threshold_num:
                return i, buy_nums + property_buy_num
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}", compute_start_index,
                      buy_nums + property_buy_num,
                      threshold_num)
        return None, buy_nums + property_buy_num
    # 同一时间买入的概率计算
    @classmethod
@@ -795,7 +879,7 @@
    # 统计买撤净买量
    @classmethod
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money, cancel_single=True):
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -807,7 +891,8 @@
        same_time_property = cls.__get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{}", start_index, len(total_datas) - 1, sure_pos)
        cls.cancel_debug(code, "撤单纯买额计算位置:{}-{} 预估挂买位:{} 是否有撤单信号:{}", start_index, len(total_datas) - 1, sure_pos,
                         cancel_single)
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
@@ -828,7 +913,7 @@
                    # 找到买撤数据的买入点
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买额:{}", i, buy_nums * limit_up_price)
                        cls.cancel_debug(code, "{}数据在预估买入位之前 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.cancel_debug(code, "{}数据在预估买入位之后,买入位:{}", i, buy_index)
                        if sure_data["val"]["time"] == buy_data["val"]["time"]:
@@ -840,10 +925,14 @@
                    cls.cancel_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
            property_buy_num = round(property_buy_num_count * same_time_property)
            cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{}", property_buy_num, i, buy_nums + property_buy_num)
            if buy_nums + property_buy_num <= threshold_num:
            cls.cancel_debug(code, "预估买入点之后同一秒买入手数-{},位置-{},总手数:{},目标手数:{}", property_buy_num, i,
                             buy_nums + property_buy_num, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums + property_buy_num <= threshold_num and cancel_single:
                return i, buy_nums + property_buy_num, sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property), sure_type
        buy_num_news = buy_nums + round(property_buy_num_count * same_time_property)
        cls.cancel_debug(code, "处理起始位置:{} 最终纯买额:{}", start_index, buy_num_news)
        return None, buy_num_news, sure_type
    @classmethod
    def test(cls):
l2_data_util.py
@@ -42,7 +42,7 @@
        local_today_num_operate_map[code] = {}
    for data in source_datas:
        key = "{}-{}".format(data["val"]["num"], data["val"]["operateType"])
        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"],data["val"]["price"])
        if local_today_num_operate_map[code].get(key) is None:
            local_today_num_operate_map[code].setdefault(key, [])
        local_today_num_operate_map[code].get(key).append(data)
@@ -87,7 +87,7 @@
                                                          cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
    buy_datas = local_today_num_operate_map.get("{}-{}".format(cancel_data["val"]["num"], "0"))
    buy_datas = local_today_num_operate_map.get("{}-{}-{}".format(cancel_data["val"]["num"], "0",cancel_data["val"]["price"]))
    if buy_datas is None:
        # 无数据
        return None, None
@@ -97,7 +97,11 @@
            continue
        if int(data["val"]["num"]) != int(cancel_data["val"]["num"]):
            continue
        if compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0:
        if min_space == 0 and max_space == 0:
            if compare_time(data["val"]["time"], min_time) == 0:
                return data["index"], data
        elif compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0:
            return data["index"], data
    return None, None
@@ -107,6 +111,7 @@
@async_call
def save_big_data(code, same_time_nums, datas):
    return None
    latest_datas = __last_big_data.get(code)
    d1 = json.dumps(datas)
    d2 = json.dumps(latest_datas)
@@ -118,7 +123,7 @@
        for i in range(len(d1)):
            if d1[i] != d2[i]:
                # 保存快照
                logger_l2_big_data.debug("code:{} d1:{}  d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30])
                # logger_l2_big_data.debug("code:{} d1:{}  d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30])
                break
    for key in same_time_nums:
@@ -137,7 +142,8 @@
    # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}]
    # result= get_buy_data_with_cancel_data(cancel_data,today_datas)
    # print(result)
    redis = l2_data_manager._redisManager.getRedis()
    keys = redis.keys("big_data-*")
    for k in keys:
        redis.delete(k)
    code = "001209"
    l2_data_manager.load_l2_data(code)
    total_datas = l2_data_manager.local_today_datas[code]
    index, data = get_buy_data_with_cancel_data(total_datas[118], l2_data_manager.local_today_num_operate_map.get(code))
    print(index, data)
limit_up_time_manager.py
New file
@@ -0,0 +1,26 @@
# 涨停时间管理器
import redis_manager
import tool
import global_util
_redisManager = redis_manager.RedisManager(0)
def save_limit_up_time(code, time):
    """Record the limit-up time of ``code`` unless one is already known.

    When ``get_limit_up_time`` returns nothing, the value is written to Redis
    under ``limit_up_time-<code>`` (expiring after ``tool.get_expire()``) and to
    the in-memory ``global_util.limit_up_time`` map. Otherwise nothing changes.
    """
    if get_limit_up_time(code) is not None:
        return
    key = "limit_up_time-{}".format(code)
    _redisManager.getRedis().setex(key, tool.get_expire(), time)
    global_util.limit_up_time[code] = time
def get_limit_up_time(code):
    """Return the recorded limit-up time of ``code``, or ``None`` if unknown.

    The in-memory ``global_util.limit_up_time`` map is consulted first. On a
    miss the value is read from Redis (``limit_up_time-<code>``); a value found
    there is stored in the in-memory map so later lookups do not query Redis
    again.
    """
    time = global_util.limit_up_time.get(code)
    if time is None:
        redis = _redisManager.getRedis()
        key = "limit_up_time-{}".format(code)
        time = redis.get(key)
        if time is not None:
            # Kept from the original: re-write the entry with the current expiry.
            redis.setex(key, tool.get_expire(), time)
            # Fill the local cache, which the original never did on this path.
            global_util.limit_up_time[code] = time
    return time
log.py
@@ -25,6 +25,8 @@
logger.add(get_path("l2", "l2_trade_cancel"), filter=lambda record: record["extra"].get("name") == "l2_trade_cancel",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_trade_buy"), filter=lambda record: record["extra"].get("name") == "l2_trade_buy",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_big_data"), filter=lambda record: record["extra"].get("name") == "l2_big_data",
           rotation="00:00", compression="zip", enqueue=True)
@@ -48,6 +50,7 @@
logger_l2_process = logger.bind(name="l2_process")
logger_l2_trade = logger.bind(name="l2_trade")
logger_l2_trade_cancel = logger.bind(name="l2_trade_cancel")
logger_l2_trade_buy = logger.bind(name="l2_trade_buy")
logger_l2_big_data = logger.bind(name="l2_big_data")
redis_manager.py
@@ -20,10 +20,7 @@
if __name__ == "__main__":
    _redisManager = RedisManager(1)
    redis = _redisManager.getRedis()
    keys = redis.keys("l2-*")
    for k in keys:
        redis.delete(k)
    keys = redis.keys("l2-data-latest-*")
    keys = redis.keys("*601975*")
    for k in keys:
        redis.delete(k)
server.py
@@ -12,7 +12,6 @@
import authority
import juejin
import l2_data_manager
import l2_data_util
import ths_industry_util
import tool
import trade_manager
@@ -159,10 +158,6 @@
                    # 涨停代码
                    dataList = data_process.parseGPCode(_str)
                    # 设置涨停时间
                    for d in dataList:
                        _time = d["time"]
                        if _time != "00:00:00":
                            global_util.limit_up_time[d["code"]] = _time
                    gpcode_manager.set_limit_up_list(dataList)
                    ths_industry_util.set_industry_hot_num(dataList)
                elif type == 3:
@@ -216,8 +211,7 @@
                            volumn = item["volumn"]
                            volumnUnit = item["volumnUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                            juejin.accpt_price(item["code"], float(item["price"]))
                        juejin.accpt_prices(data)
                elif type == 30:
                    # 心跳信息
trade_manager.py
@@ -1,13 +1,15 @@
# 交易管理器
import datetime
import json
import time
import gpcode_manager
import l2_code_operate
import mongo_data
import tool
from trade_data_manager import TradeBuyDataManager
from trade_gui import THSGuiTrade, async_call
import time as t
from l2_code_operate import *
import l2_data_manager
from log import *
@@ -263,7 +265,13 @@
            continue
        if code is not None and int(data["type"]) == 0:
            forbidden_trade(code)
            state = get_trade_state(code)
            if state != TRADE_STATE_BUY_SUCCESS:
            set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
                # 删除买撤记录的临时信息
                l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                l2_data_manager.TradePointManager.delete_buy_point(code)
# 处理委托成功数据
@@ -293,20 +301,21 @@
def __clear_data(code):
    """Delete the Redis entries held for ``code``.

    * DB 1: the buy-compute keys, every key whose name contains ``code``,
      and the l2 bookkeeping keys.
    * DB 2: the ``trade-state-<code>`` key.
    * DB 0: every key whose name contains ``code``, except keys that also
      contain ``"pre"``.
    """
    redis_l2 = redis_manager.RedisManager(1).getRedis()
    redis_l2.delete("buy_compute_index-{}".format(code))
    redis_l2.delete("buy_compute_num-{}".format(code))
    # A preceding query for "l2-<code>-*" was dropped: its result was
    # overwritten before use, and this wider pattern covers those keys too.
    for k in redis_l2.keys("*{}*".format(code)):
        redis_l2.delete(k)
    redis_l2.delete("l2-data-latest-{}".format(code))
    redis_l2.delete("l2-maxindex-{}".format(code))
    redis_l2.delete("latest-l2-count-{}".format(code))

    redis_trade = redis_manager.RedisManager(2).getRedis()
    redis_trade.delete("trade-state-{}".format(code))

    redis_info = redis_manager.RedisManager(0).getRedis()
    for k in redis_info.keys("*{}*".format(code)):
        # str.find never returns None, so a plain containment test is the
        # equivalent check; keys containing "pre" are kept.
        if "pre" not in k:
            redis_info.delete(k)
if __name__ == "__main__":
    # time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # print(time_str)
    __clear_data("000503")
    __clear_data("000068")