Administrator
2022-09-18 b946684114d097e937b766f986d12c7eea8edce8
l2数据计算优化
6个文件已修改
2个文件已添加
663 ■■■■■ 已修改文件
big_money_num_manager.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_volumn_manager.py 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 426 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_industry_util.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
big_money_num_manager.py
New file
@@ -0,0 +1,29 @@
# 大单管理
import redis_manager
import tool
__redisManager = redis_manager.RedisManager(0)
def add_num(code, num):
    """Increment the Redis counter ``big_money-<code>`` by ``num``."""
    client = __redisManager.getRedis()
    counter_key = "big_money-{}".format(code)
    client.incrby(counter_key, num)
# Set the expiry time
def expire(code):
    """Apply the TTL returned by ``tool.get_expire()`` to the ``big_money-<code>`` key."""
    client = __redisManager.getRedis()
    counter_key = "big_money-{}".format(code)
    ttl = tool.get_expire()
    client.expire(counter_key, ttl)
def get_num(code):
    """Return the value of the Redis counter ``big_money-<code>`` as an ``int``.

    Returns 0 when the key does not exist. The counter is only ever written
    through ``incrby`` (see ``add_num``), so the stored value is an integer;
    it is converted here so that the hit path and the miss path return the
    same type.
    """
    raw = __redisManager.getRedis().get("big_money-{}".format(code))
    if raw is None:
        return 0
    # The client hands the value back as str or bytes; int() accepts both.
    return int(raw)
if __name__ == "__main__":
    # Manual run: increment the counter of a sample code by 0, then set the key's expiry.
    add_num("000332",0)
    expire("000332")
code_volumn_manager.py
New file
@@ -0,0 +1,62 @@
# 成交量管理
# 设置历史量
# max60 60天最大量
# yesterday 昨天的量
import global_util
import redis_manager
import tool
__redis_manager = redis_manager.RedisManager(0)
# Set the historical volumes
def set_histry_volumn(code, max60, yesterday):
    """Store the historical volumes of ``code`` in memory and in Redis.

    ``max60`` goes to ``global_util.max60_volumn`` / ``volumn_max60-<code>``,
    ``yesterday`` to ``global_util.yesterday_volumn`` / ``volumn_yes-<code>``.
    Each Redis key is written with the TTL from ``tool.get_expire()``.
    """
    client = __redis_manager.getRedis()
    global_util.max60_volumn[code], global_util.yesterday_volumn[code] = max60, yesterday
    for key_template, value in (("volumn_max60-{}", max60), ("volumn_yes-{}", yesterday)):
        client.setex(key_template.format(code), tool.get_expire(), value)
# Get the historical volumes
def get_histry_volumn(code):
    """Return ``(max60, yesterday)`` for ``code`` as ints (``None`` where unknown).

    The in-memory caches in ``global_util`` are consulted first; Redis is only
    queried for a value missing from memory. Values coming back from Redis (or
    placed in memory by ``load()``) may be strings, while ``set_histry_volumn``
    stores whatever it was given, so both are normalized to ``int`` here.
    """
    max60 = global_util.max60_volumn.get(code)
    yesterday = global_util.yesterday_volumn.get(code)
    if max60 is None or yesterday is None:
        redis = __redis_manager.getRedis()
        if max60 is None:
            max60 = redis.get("volumn_max60-{}".format(code))
        if yesterday is None:
            yesterday = redis.get("volumn_yes-{}".format(code))
    # Normalize so callers never have to care where the value came from.
    if max60 is not None:
        max60 = int(max60)
    if yesterday is not None:
        yesterday = int(yesterday)
    return max60, yesterday
# Set today's volume
def set_today_volumn(code, volumn):
    """Record today's volume of ``code`` in ``global_util.today_volumn`` and in Redis.

    The Redis key ``volumn_today-<code>`` is written with the TTL from
    ``tool.get_expire()``.
    """
    client = __redis_manager.getRedis()
    global_util.today_volumn[code] = volumn
    today_key = "volumn_today-{}".format(code)
    client.setex(today_key, tool.get_expire(), volumn)
# Get today's volume
def get_today_volumn(code):
    """Return today's volume of ``code``.

    The value cached in ``global_util.today_volumn`` wins; otherwise whatever
    Redis holds under ``volumn_today-<code>`` is returned (``None`` if absent).
    """
    client = __redis_manager.getRedis()
    cached = global_util.today_volumn.get(code)
    if cached is not None:
        return cached
    return client.get("volumn_today-{}".format(code))
# Load the volumes from the database into memory
def load():
    """Populate the in-memory historical-volume caches from Redis.

    Every ``volumn_max60-<code>`` key is copied into
    ``global_util.max60_volumn`` and every ``volumn_yes-<code>`` key into
    ``global_util.yesterday_volumn``. Values are stored as ``int`` so that the
    caches hold the same type regardless of whether they were filled here or
    by ``set_histry_volumn``.
    """
    redis = __redis_manager.getRedis()
    targets = (
        ("volumn_max60-*", global_util.max60_volumn),
        ("volumn_yes-*", global_util.yesterday_volumn),
    )
    for pattern, cache in targets:
        for k in redis.keys(pattern) or ():
            value = redis.get(k)
            if value is None:
                # The key expired between keys() and get(); nothing to cache.
                continue
            # Keys are bytes when the client does not decode responses.
            key_text = k.decode() if isinstance(k, bytes) else k
            cache[key_text.split("-", 1)[1]] = int(value)
global_util.py
@@ -12,6 +12,18 @@
zyltgb_map = {}
# Mapping of today's limit-up codes
today_limit_up_codes = {}
# Industry hotness index (looked up by industry name)
industry_hot_num = {}
# Today's volume, keyed by stock code
today_volumn = {}
# 60-day maximum volume, keyed by stock code
max60_volumn = {}
# Yesterday's volume, keyed by stock code
yesterday_volumn = {}
# Big orders
big_money_num = {}
# Limit-up time
limit_up_time = {}
# 加载行业数据
@@ -31,7 +43,11 @@
        if results is not None:
            results = [doc for doc in results]
            if len(results) > 0:
                zyltgb_map[code] = results[0]
                result = results[0]
                if result["zyltgb_unit"] == 0:
                    zyltgb_map[code] = round(float(result["zyltgb"]) * 100000000)
                else:
                    zyltgb_map[code] = round(float(result["zyltgb"]) * 10000)
# 添加今日涨停数据
@@ -47,4 +63,5 @@
if __name__ == "__main__":
    # Manual check: load the industry and zyltgb data, then print the zyltgb entry of one sample code.
    load_industry()
    load_zyltgb()
    print(zyltgb_map["002819"])
juejin.py
@@ -7,6 +7,8 @@
import gm.api as gmapi
import big_money_num_manager
import code_volumn_manager
import global_util
import gpcode_manager
import threading
@@ -41,6 +43,7 @@
# 每日初始化
def everyday_init():
    codes = gpcode_manager.get_gp_list()
    logger_system.info("每日初始化")
    # 载入行业股票代码
    global_util.load_industry()
@@ -50,7 +53,21 @@
    global_util.add_limit_up_codes([], True)
    # 主要获取收盘价
    get_latest_info(None)
    # 获取60天最大量与昨日量
    global_util.today_volumn.clear()
    global_util.max60_volumn.clear()
    global_util.yesterday_volumn.clear()
    volumn_dict = get_volumns(codes)
    for key in volumn_dict:
        code_volumn_manager.set_histry_volumn(key, volumn_dict[key][0], volumn_dict[key][1])
    # 清除大单数据
    global_util.big_money_num.clear()
    # 初始化大单数据
    for code in codes:
        big_money_num_manager.add_num(code, 0)
        big_money_num_manager.expire(code)
    # 清除涨停时间
    global_util.limit_up_time.clear()
def __run_schedule():
    while True:
@@ -170,15 +187,15 @@
    if pricePre is not None:
        rate = round((price - pricePre) * 100 / pricePre, 1)
        if rate >= 7:
            logger_juejin_tick.info("{}-{}-{}",code, price, rate)
            logger_juejin_tick.info("{}-{}-{}", code, price, rate)
            if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(
                    code) and not gpcode_manager.is_listen_full():
                L2CodeOperate.get_instance().add_operate(1, code,"现价变化")
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
            # 进入监控
        elif rate < 5:
            # 移除监控
            if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                L2CodeOperate.get_instance().add_operate(0, code,"现价变化")
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
def on_bar(context, bars):
@@ -290,14 +307,14 @@
        volumn = int(result["volume"])
        day = "{:%Y-%m-%d}".format(result["eob"])
        if _fresult.get(code) is None:
            _fresult[code] = {"max_volumn": volumn, "latest_volumn": volumn}
            _fresult[code] = (volumn, volumn)
        if volumn > _fresult[code]["max_volumn"]:
            _fresult[code]["max_volumn"] = volumn;
        _fresult[code]["latest_volumn"] = volumn;
        if volumn > _fresult[code][0]:
            _fresult[code][0] = volumn;
        _fresult[code][1] = volumn;
    return _fresult
if __name__ == '__main__':
    _fresult=get_volumns(["000333","002531"])
    _fresult = get_volumns(["000333", "002531"])
    print(_fresult)
l2_data_manager.py
@@ -4,10 +4,13 @@
import time as t
from datetime import datetime
import big_money_num_manager
import data_process
import global_util
import l2_data_util
import gpcode_manager
import l2_trade_factor
import redis_manager
import tool
@@ -54,68 +57,67 @@
    @staticmethod
    def delete_buy_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_compute_index-{}".format(code))
        redis.delete("buy_compute_num-{}".format(code))
    # 删除买撤点数据
    @staticmethod
    def delete_buy_cancel_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_compute_index-{}".format(code))
        redis.delete("buy_cancel_compute_num-{}".format(code))
        redis.delete("buy_compute_index_info-{}".format(code))
    # 获取买入点信息
    # 返回数据为:买入点 累计纯买额 已经计算的数据索引
    @staticmethod
    def get_buy_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        index = redis.get("buy_compute_index-{}".format(code))
        total_num = redis.get("buy_compute_num-{}".format(code))
        if index is None:
            return None, 0
        else:
            return int(index), int(total_num)
        _key = "buy_compute_index_info-{}".format(code)
        _data_json = redis.get(_key)
        if _data_json is None:
            return None, 0, None
        _data = json.loads(_data_json)
        return _data[0], _data[1], _data[2]
    # 设置买入点的值
    @staticmethod
    def set_buy_compute_start_data(code, num_add, index=None):
    def set_buy_compute_start_data(code, nums, compute_index, buy_index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        if index is not None:
            redis.setex("buy_compute_index-{}".format(code), expire, index)
        key = "buy_compute_num-{}".format(code)
        if redis.get(key) is None:
            redis.setex(key, expire, num_add)
        _key = "buy_compute_index_info-{}".format(code)
        if buy_index is not None:
            redis.setex(_key, expire, json.dumps((buy_index, nums, compute_index)))
        else:
            redis.incrby(key, num_add)
            _buy_index, _nums, _compute_index = TradePointManager.get_buy_compute_start_data(code)
            redis.setex(_key, expire, json.dumps((_buy_index, nums, compute_index)))
    # 获取撤买入开始计算的信息
    # 返回数据的内容为:撤销点索引 撤买纯买额 计算的数据索引
    @staticmethod
    def get_buy_cancel_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        info = redis.get("buy_cancel_compute_info-{}".format(code))
        if info is None:
            return None, None , None
            return None, None, None
        else:
            info=json.loads(info)
            return info[0],info[1],info[2]
            info = json.loads(info)
            return info[0], info[1], info[2]
    # 设置买撤点信息
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    @classmethod
    def set_buy_cancel_compute_start_data(cls,code, buy_num,computed_index, index):
    def set_buy_cancel_compute_start_data(cls, code, buy_num, computed_index, index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index,buy_num,computed_index)))
        redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index, buy_num, computed_index)))
    # 增加撤买的纯买额
    @classmethod
    def add_buy_nums_for_cancel(cls,code,num_add,computed_index):
        cancel_index,nums,c_index= cls.get_buy_cancel_compute_start_data(code)
    def add_buy_nums_for_cancel(cls, code, num_add, computed_index):
        cancel_index, nums, c_index = cls.get_buy_cancel_compute_start_data(code)
        if cancel_index is None:
            raise Exception("无撤买信号记录")
        nums+=num_add
        cls.set_buy_cancel_compute_start_data(code,nums,computed_index)
        nums += num_add
        cls.set_buy_cancel_compute_start_data(code, nums, computed_index)
    # 删除买撤点数据
    @staticmethod
    def delete_buy_cancel_point(code):
        redis = TradePointManager.__get_redis()
        redis.delete("buy_cancel_compute_info-{}".format(code))
def load_l2_data(code, force=False):
@@ -360,6 +362,13 @@
            return False
        return True
    @staticmethod
    def is_index_end(code, index):
        if index >= len(local_today_datas[code]) - 1:
            return True
        else:
            return False
# L2交易数据处理器
class L2TradeDataProcessor:
@@ -396,6 +405,9 @@
                                                                   total_datas[-1],
                                                                   add_datas)
                if len(add_datas) > 0:
                    # 计算大单数量
                    cls.__compute_big_money_data(code, add_datas)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
@@ -403,45 +415,63 @@
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.process_order(code, add_datas)
                            cls.__process_order(code, len(total_datas) - len(add_datas) - 3)
                        else:
                            # 未挂单
                            cls.process_not_order(code, add_datas)
                            cls.__process_not_order(code, add_datas, capture_timestamp)
                # 保存数据
                save_l2_data(code, datas, add_datas)
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def __compute_big_money_data(cls, code, add_datas):
        # 计算大单
        num = 0
        for data in add_datas:
            if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
                if int(data["val"]["operateType"]) == 0:
                    num += data["re"]
                elif int(data["val"]["operateType"]) == 1:
                    num -= data["re"]
        big_money_num_manager.add_num(code, num)
    # 处理未挂单
    @classmethod
    def process_not_order(cls, code, add_datas):
    def __process_not_order(cls, code, add_datas, capture_time):
        # 获取阈值
        threshold_money = cls.__get_threshmoney(code)
        cls.__start_compute_buy(code, len(local_today_datas[code]) - len(add_datas), threshold_money, capture_time)
    # 处理已挂单
    @classmethod
    def process_order(cls, code, add_datas):
    def __process_order(cls, code, start_index, capture_time):
        if start_index < 0:
            start_index = 0
        # 获取之前是否有记录的撤买信号
        cancel_index, buy_num_for_cancel,computed_index= cls.has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.get_order_begin_pos(code)
        cancel_index, buy_num_for_cancel, computed_index = cls.__has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.__get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3)
            cancel_index = cls.__compute_order_cancel_begin_single(code, start_index, 3)
            buy_num_for_cancel = 0
            computed_index=buy_index
            computed_index = buy_index
        if cancel_index is not None:
            # 获取阈值 有买撤信号,统计撤买纯买额
            threshold_money=10000000
            cls.start_compute_cancel(code,cancel_index,computed_index,buy_num_for_cancel,threshold_money)
            threshold_money = cls.__get_threshmoney(code)
            cls.__start_compute_cancel(code, cancel_index, computed_index, buy_num_for_cancel, threshold_money,
                                       capture_time)
        else:
            # 无买撤信号,终止执行
            pass
    #开始计算撤的信号
    # 开始计算撤的信号
    @classmethod
    def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money):
    def __start_compute_cancel(cls, code, cancel_index, compute_start_index, origin_num, threshold_money, capture_time):
        # sure_type 0-虚拟挂买位  1-真实挂买位
        computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money)
        computed_index, buy_num_for_cancel, sure_type = cls.__sum_buy_num_for_cancel_order(code, compute_start_index,
                                                                                           origin_num, threshold_money)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            # 发出撤买信号,需要撤买
@@ -449,8 +479,8 @@
                # 有虚拟下单
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # TODO 删除下单标记位置
                pass
                # 删除下单标记位置
                TradePointManager.delete_buy_point(code)
            else:
                # 无虚拟下单,需要执行撤单
                logger_l2_trade.info(
@@ -463,86 +493,101 @@
                except Exception as e:
                    pass
            if computed_index < len(local_today_datas[code])-1:
                # TODO数据尚未处理完,重新进入下单计算流程
                cls.start_compute_buy(code,computed_index+1,0,threshold_money)
            if computed_index < len(local_today_datas[code]) - 1:
                # 数据尚未处理完,重新进入下单计算流程
                cls.__start_compute_buy(code, computed_index + 1, 0, threshold_money, capture_time)
                pass
        else:
            #无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index)
            # 无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code, buy_num_for_cancel, len(total_datas) - 1,
                                                                cancel_index)
            # 判断是否有虚拟下单
            unreal_buy_info=cls.unreal_buy_dict.get(code)
            unreal_buy_info = cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                logger_l2_trade.info(
                    "执行买入:{} ".format(code))
                try:
                    trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                                            unreal_buy_info[0])
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
                cls.__buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                          unreal_buy_info[0])
                pass
            else:
                #终止执行
                # 终止执行
                pass
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        logger_l2_trade.info(
            "执行买入:{} ".format(code))
        try:
            trade_manager.start_buy(code, capture_timestamp, last_data,
                                    last_data_index)
            TradePointManager.delete_buy_cancel_point(code)
        except Exception as e:
            pass
    @classmethod
    def start_compute_buy(cls,code,compute_start_index,origin_num,threshold_money):
        total_datas=local_today_datas[code]
    def __start_compute_buy(cls, code, compute_start_index, threshold_money, capture_time):
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        index, num = cls.get_order_begin_pos(code)
        index, num = cls.__get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if index is None:
            # 有买入信号
            has_single, index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index , 3)
            has_single, _index = cls.__compute_order_begin_pos(code, len(total_datas) - compute_start_index, 3)
            index = _index
            if has_single:
                num = 0
                new_get_pos = True
                # TODO 记录买入信号位置
        if index is None:
            # 未获取到买入信号,终止程序
            return None
        # 买入纯买额统计
        # TODO 获取阈值
        threshold_money=10000000
        compute_index,buy_nums = cls.sum_buy_num_for_order(code,compute_start_index,num,threshold_money)
        compute_index, buy_nums = cls.sum_buy_num_for_order(code, compute_start_index, num, threshold_money)
        if compute_index is not None:
            # 达到下单条件
            # 记录买入信号位置
            cls.__save_order_begin_data(code, compute_index, buy_nums, index)
            # 虚拟下单
            cls.unreal_buy_dict[code]=(compute_index,capture_time)
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            # 删除之前的所有撤单信号
            TradePointManager.delete_buy_cancel_point(code)
            # 数据是否处理完毕
            if L2DataUtil.is_index_end(code, compute_index):
                # 数据已经处理完毕,下单
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # 数据尚未处理完毕,进行下一步处理
                cls.__process_order(code, compute_index + 1, capture_time)
        else:
            # TODO 未达到下单条件,保存纯买额,设置纯买额
            # 未达到下单条件,保存纯买额,设置纯买额
            # 记录买入信号位置
            cls.__save_order_begin_data(code, len(total_datas) - 1, buy_nums, index)
        pass
    # 获取下单起始信号
    @classmethod
    def get_order_begin_pos(cls, code):
        index, num = TradePointManager.get_buy_compute_start_data(code)
    def __get_order_begin_pos(cls, code):
        index, num, compute_index = TradePointManager.get_buy_compute_start_data(code)
        return index, num
    @classmethod
    def __save_order_begin_data(self, code, compute_index, num, buy_index=None):
        TradePointManager.set_buy_compute_start_data(code, num, compute_index, buy_index)
    # 获取撤单起始位置
    @classmethod
    def has_order_cancel_begin_pos(cls):
    def __has_order_cancel_begin_pos(cls, code):
        # cancel_index:撤单信号起点
        # buy_num_for_cancel:从挂入点计算的纯买额
        # computed_index 计算的最后位置
        cancel_index, buy_num_for_cancel,computed_index = TradePointManager.get_buy_cancel_compute_start_data(code)
        return cancel_index, buy_num_for_cancel,computed_index
        cancel_index, buy_num_for_cancel, computed_index = TradePointManager.get_buy_cancel_compute_start_data(code)
        return cancel_index, buy_num_for_cancel, computed_index
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    def compute_order_begin_pos(self, code, compute_data_count, continue_count):
    def __compute_order_begin_pos(self, code, compute_data_count, continue_count):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        __len = len(datas)
@@ -617,17 +662,11 @@
    # 是否有撤销信号
    @classmethod
    def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count):
    def __compute_order_cancel_begin_single(cls, code, start_index, continue_count):
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
        if len(datas) - start_index < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        for i in range(start_index, __len - (continue_count - 1)):
            _val = datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
@@ -654,25 +693,20 @@
                    return i
        return None
    # 保存下单位置
    def save_order_pos(self):
        pass
    # 是否可以下单
    def is_can_order(self):
    def __is_can_order(self):
        pass
    # 虚拟下单
    def unreal_order(self):
    def __unreal_order(self):
        pass
    # 设置虚拟挂买位
    def set_unreal_sure_order_pos(self):
        pass
    def __get_threshmoney(cls, code):
        l2_trade_factor.L2TradeFactorUtil.compute_m_value(code)
    # 获取预估挂买位
    @classmethod
    def get_sure_order_pos(cls, code):
    def __get_sure_order_pos(cls, code):
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
@@ -681,7 +715,7 @@
    # 统计买入净买量
    @classmethod
    def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
    def __sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -703,13 +737,18 @@
    # 同一时间买入的概率计算
    @classmethod
    def get_same_time_property(cls, code):
        # TODO 与板块热度有关
    def __get_same_time_property(cls, code):
        # 计算板块热度
        industry = global_util.code_industry_map.get(code)
        if industry is not None:
            hot_num = global_util.industry_hot_num.get(industry)
            if hot_num is not None:
                return 1 - l2_trade_factor.L2TradeFactorUtil.get_industry_rate(hot_num)
        return 0.5
    # 统计买撤净买量
    @classmethod
    def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
    def __sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
@@ -717,8 +756,8 @@
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code)
        same_time_property = cls.get_same_time_property(code)
        sure_type, sure_pos, sure_data = cls.__get_sure_order_pos(code)
        same_time_property = cls.__get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        for i in range(start_index, len(total_datas)):
@@ -749,169 +788,8 @@
            property_buy_num = round(property_buy_num_count * same_time_property)
            if buy_nums + property_buy_num <= threshold_num:
                return i, buy_nums + property_buy_num,sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type
def process_data(code, datas, capture_timestamp):
    """Process one captured batch of L2 rows for ``code``.

    Steps, in order: validate the price of the first row, load and correct the
    data, append the rows that are new to ``local_today_datas[code]``, update
    the buy-confirmation position, then (only when the newest row's time
    matches the current wall-clock time) look for a buy start point,
    accumulate the net limit-up buy amount and place or cancel a buy order.
    Finally the batch is saved with ``save_l2_data``.

    Raises ``L2DataException`` when the first row's price does not match the
    code. Exceptions raised while placing or cancelling an order are swallowed.

    NOTE(review): ``limit_up_price`` is multiplied inside the accumulation
    loops before the later ``is not None`` checks — confirm it cannot be None
    at that point.
    """
    now_time_str = datetime.now().strftime("%H:%M:%S")
    # Start timestamp in milliseconds; not read again within this function.
    __start_time = round(t.time() * 1000)
    try:
        if len(datas) > 0:
            # Check that the price range is correct
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                      "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
            # Load the historical data
            load_l2_data(code)
            # Correct the data
            datas = correct_data(code, datas)
            add_datas = get_add_data(code, datas)
            if len(add_datas) > 0:
                # Append the new data
                local_today_datas[code].extend(add_datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
            total_datas = local_today_datas[code]
            # Handle the buy confirmation point
            TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1],
                                                               add_datas)
            if len(add_datas) > 0:
                latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                # Only process when the time difference is not too large
                if __is_same_time(now_time_str, latest_time):
                    # logger.info("timely data, number of new rows {}".format(len(add_datas)))
                    # Is there already a buy computation start point
                    c_index, c_num = TradePointManager.get_buy_compute_start_data(code)
                    if c_index is None:
                        # Check whether a forbidden-trade signal has appeared
                        forbidden = __is_have_forbidden_feature(code, len(add_datas) + 6, 6)
                        if forbidden:
                            trade_manager.forbidden_trade(code)
                        # No computation start point yet
                        c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3)
                        if c_index is not None:
                            logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index])))
                            # Trigger the data analysis; collect the consecutive limit-up flagged data
                            buy_nums = 0
                            # Get the limit-up price
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            last_data_index = -1
                            for i in range(c_index, len(total_datas)):
                                _val = total_datas[i]["val"]
                                # Mark the computation start point once there are 4 consecutive limit-up buys
                                if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                    # Limit-up buy
                                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                                elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                    # Limit-up buy cancel
                                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                    last_data_index = i
                                    break
                            TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index)
                            if limit_up_price is not None:
                                if last_data_index > -1:
                                    # Buy once the amount exceeds 10 million
                                    logger_l2_trade.info(
                                        "执行买入:{} - 计算结束点: {}".format(code, json.dumps(total_datas[-1])))
                                    try:
                                        trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index],
                                                                last_data_index)
                                        TradePointManager.delete_buy_cancel_point(code)
                                    except Exception as e:
                                        pass
                    else:
                        # A computation start point exists; process only the newly added data
                        buy_nums = c_num
                        last_data = None
                        last_data_index = len(total_datas) - len(add_datas) - 1
                        # Get the limit-up price
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        for data in add_datas:
                            last_data_index += 1
                            _val = data["val"]
                            if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                # Limit-up buy
                                buy_nums += int(_val["num"]) * int(data["re"])
                            elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                # Limit-up buy cancel
                                buy_nums -= int(_val["num"]) * int(data["re"])
                            if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                last_data = data
                                break
                        TradePointManager.set_buy_compute_start_data(code, buy_nums)
                        if limit_up_price is not None:
                            if last_data is not None:
                                # Buy once the amount exceeds 10 million
                                logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(add_datas[-1])))
                                try:
                                    trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index)
                                    TradePointManager.delete_buy_cancel_point(code)
                                except Exception as e:
                                    pass
                    if c_index is not None:
                        # Whether the order is delegated and waiting to be filled
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # Already delegated; check for a cancel
                            cancel_index, cancel_num = TradePointManager.get_buy_cancel_compute_start_data(code)
                            if cancel_index is None:
                                # No buy-cancel start point detected so far
                                cancel_index = __get_limit_up_buy_cancel_start(code, len(add_datas) + 3, 3)
                                if cancel_index is not None:
                                    total_datas = local_today_datas[code]
                                    # print("buy-cancel point found", cancel_index, total_datas[cancel_index])
                                    logger_l2_trade.info(
                                        "找到买撤点:{} - {}".format(code, json.dumps(total_datas[cancel_index])))
                                    # Trigger the data analysis; collect the consecutive limit-up flagged data
                                    nums = 0
                                    for i in range(c_index, len(total_datas)):
                                        _val = total_datas[i]["val"]
                                        if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                            # Limit-up buy cancel
                                            nums += int(_val["num"]) * int(total_datas[i]["re"])
                                    TradePointManager.set_buy_cancel_compute_start_data(code, nums, cancel_index)
                            else:
                                # A buy-cancel start point was detected earlier
                                cancel_nums_add = 0
                                for data in add_datas:
                                    _val = data["val"]
                                    if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                        # Limit-up buy cancel
                                        cancel_nums_add += int(_val["num"]) * int(data["re"])
                                TradePointManager.set_buy_cancel_compute_start_data(code, cancel_nums_add)
                                latest_num = cancel_num + cancel_nums_add
                                # Get the limit-up price
                                limit_up_price = gpcode_manager.get_limit_up_price(code)
                                if limit_up_price is not None:
                                    if latest_num * limit_up_price * 100 > 1000 * 10000:
                                        # Above 10 million (the original comment says "buy"; this branch cancels the buy)
                                        # print("executing cancel")
                                        logger_l2_trade.info(
                                            "执行撤销:{} - {}".format(code, json.dumps(add_datas[-1])))
                                        try:
                                            trade_manager.start_cancel_buy(code)
                                            # Clear the buy marker
                                            TradePointManager.delete_buy_point(code)
                                            TradePointManager.delete_buy_cancel_point(code)
                                        except Exception as e:
                                            pass
                            pass
            # Save the data
            save_l2_data(code, datas, add_datas)
    finally:
        pass
                return i, buy_nums + property_buy_num, sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property), sure_type
def __get_time_second(time_str):
@@ -1168,8 +1046,8 @@
        for line in f.readlines():  # 依次读取每行
            line = line.strip()
            data = json.loads(line)
            result = __format_l2_data(data, code, 10.00)
            add_datas = get_add_data(code, result)
            result = L2DataUtil.format_l2_data(data, code, 10.00)
            add_datas = L2DataUtil.get_add_data(code, result)
            print("增加的数量:", len(add_datas))
            if len(add_datas) > 0:
                # 拼接数据
l2_trade_factor.py
@@ -1,4 +1,7 @@
# l2交易因子
import global_util
class L2TradeFactorUtil:
    # 获取基础m值,返回单位为元
    @classmethod
@@ -93,15 +96,45 @@
        # 自由流通股本影响比例
        zyltgb_rate = cls.get_zylt_rate(zyltgb)
        # 行业涨停影响比例
        industry_rate = cls.get_industry_rate(total_industry_limit_percent)
        industry_rate=0
        if total_industry_limit_percent is not None:
            industry_rate = cls.get_industry_rate(total_industry_limit_percent)
        # 量影响比例
        volumn_rate=cls.get_volumn_rate(volumn_day60_max,volumn_yest,volumn_today)
        volumn_rate = 0
        if volumn_day60_max is not None and volumn_yest is not None and volumn_today is not None:
            volumn_rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
        # 涨停时间影响比例
        limit_up_time_rate=cls.get_limit_up_time_rate(limit_up_time)
        limit_up_time_rate=0
        if limit_up_time is not None:
            limit_up_time_rate = cls.get_limit_up_time_rate(limit_up_time)
        # 万手哥影响
        big_money_rate=cls.get_big_money_rate(big_money_num)
        big_money_rate = 0
        if big_money_num is not None:
            big_money_rate = cls.get_big_money_rate(big_money_num)
        print("zyltgb_rate:{} industry_rate:{} volumn_rate:{} limit_up_time_rate:{} big_money_rate:{}",zyltgb_rate,industry_rate,volumn_rate,limit_up_time_rate,big_money_rate)
        return 1-(zyltgb_rate+industry_rate+volumn_rate+limit_up_time_rate+big_money_rate);
        return 1 - (zyltgb_rate + industry_rate + volumn_rate + limit_up_time_rate + big_money_rate);
    @classmethod
    def compute_rate_by_code(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        total_industry_limit_percent = global_util.industry_hot_num.get(code)
        volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
            code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
        limit_up_time = global_util.limit_up_time.get(code)
        big_money_num = global_util.big_money_num.get(code)
        return cls.compute_rate(zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
                                limit_up_time, big_money_num)
    @classmethod
    def compute_m_value(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        if zyltgb is None:
            print("没有获取到自由流通市值")
            return 10000000
        rate = cls.compute_rate_by_code(code)
        print("m值获取:",code,round(zyltgb*rate))
        return round(zyltgb*rate)
# l2因子归因数据
@@ -109,6 +142,9 @@
    # 是否为大单
    @classmethod
    def is_big_money(cls, data):
        if int(data["val"]["limitPrice"]) != 1:
            return False
        if int(data["val"]["num"]) >= 9000:
            return True
        money = round(float(data["val"]["price"]) * int(data["val"]["num"]) * 100)
server.py
@@ -6,11 +6,13 @@
import time
import data_process
import global_util
import gpcode_manager
import authority
import juejin
import l2_data_manager
import l2_data_util
import ths_industry_util
import tool
import trade_manager
import l2_code_operate
@@ -103,7 +105,7 @@
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                if gpcode_manager.is_listen(code):
                                    l2_data_manager.process_data(code, datas, capture_timestamp)
                                    l2_data_manager.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -154,8 +156,14 @@
                    t1.start()
                elif type == 2:
                    # 涨停代码
                    codeList = data_process.parseGPCode(_str)
                    gpcode_manager.set_limit_up_list(codeList)
                    dataList = data_process.parseGPCode(_str)
                    # 设置涨停时间
                    for d in dataList:
                        _time = d["time"]
                        if _time != "00:00:00":
                            global_util.limit_up_time[d["code"]] = _time
                    gpcode_manager.set_limit_up_list(dataList)
                    ths_industry_util.set_industry_hot_num(dataList)
                elif type == 3:
                    # 交易成功信息
                    dataList = data_process.parseList(_str)
ths_industry_util.py
@@ -1,4 +1,5 @@
# 同花顺行业
import global_util
import mongo_data
@@ -17,6 +18,33 @@
    return __code_map, __industry_map
# 设置行业热度
def set_industry_hot_num(limit_up_datas):
    """Rebuild ``global_util.industry_hot_num`` from ``limit_up_datas``.

    Each record's ``"code"`` is mapped to an industry through
    ``global_util.code_industry_map`` (loaded via
    ``global_util.load_industry()`` when it is missing or empty). The
    record's ``"limitUpPercent"``, capped at 21, is added to that industry's
    running total, which is rounded to two decimals after every addition.
    Records whose code has no industry are skipped.

    :param limit_up_datas: iterable of dict-like records with ``"code"`` and
        ``"limitUpPercent"`` entries.
    :raises Exception: if the code-to-industry map is still ``None`` after
        the load attempt.
    """
    industry_by_code = global_util.code_industry_map
    if not industry_by_code:
        # Map missing or empty: load it, then read it again.
        global_util.load_industry()
        industry_by_code = global_util.code_industry_map
    if industry_by_code is None:
        raise Exception("获取代码对应的行业出错")

    hot_by_industry = {}
    for record in limit_up_datas:
        industry = industry_by_code.get(record["code"])
        if industry is None:
            # No industry known for this code; it adds nothing.
            continue
        capped_percent = min(float(record["limitUpPercent"]), 21)
        hot_by_industry[industry] = round(hot_by_industry.get(industry, 0) + capped_percent, 2)
    global_util.industry_hot_num = hot_by_industry
if __name__ == "__main__":
    # Manual check when the module is run as a script: fetch the two maps
    # returned by get_code_industry_maps() and print them.
    _code_map, _industry_map = get_code_industry_maps()
    # NOTE(review): the two print calls below differ only in spacing; this
    # looks like the old and new line of a diff shown together — confirm.
    print(_code_map,_industry_map)
    print(_code_map, _industry_map)