Administrator
2022-09-16 b7000cbf5e67e90abe53e96a4ea931afbf906e24
l2数据计算优化
2个文件已添加
10个文件已修改
1216 ■■■■ 已修改文件
data_process.py 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 742 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 53 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 123 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_data_manager.py 158 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py
@@ -12,7 +12,6 @@
import gpcode_manager
import mongo_data
# 统计今日卖出
# 统计今日买入
import tool
@@ -87,6 +86,7 @@
    code = data["code"]
    trade_data = data["data"]
    return code, trade_data
# 代码对应的价格是否正确
def is_same_code_with_price(code, price):
@@ -164,11 +164,11 @@
    mongo_data.save("ths-zylt", _list)
def saveClientActive(client_id, host):
def saveClientActive(client_id, host, thsDead):
    if client_id <= 0:
        return
    redis = __redisManager.getRedis();
    redis.setex("client-active-{}".format(client_id), 10, host)
    redis.setex("client-active-{}".format(client_id), 10, json.dumps((host, thsDead)))
def getValidL2Clients():
@@ -183,9 +183,24 @@
    return list(set(client_ids).intersection(set(l2_clients)))
# 获取客户端IP
def getActiveClientIP(client_id):
    redis = __redisManager.getRedis();
    return redis.get("client-active-{}".format(client_id))
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val=json.loads(val)
    return val[0]
# 获取客户端同花顺状态
def getTHSState(client_id):
    redis = __redisManager.getRedis();
    val = redis.get("client-active-{}".format(client_id))
    if val is None:
        return None
    val = json.loads(val)
    return val[1]
# 保存量能
gui.py
@@ -237,7 +237,11 @@
            print("refresh-l2-data")
            for client_id in code_sv_map:
                ip = data_process.getActiveClientIP(client_id)
                ths_dead=data_process.getTHSState(client_id)
                if ip is not None and len(ip) > 0:
                    if ths_dead:
                        client_state[client_id].configure(text="(在线:{})".format(ip), foreground="#FF7F27")
                    else:
                    client_state[client_id].configure(text="(在线:{})".format(ip), foreground="#008000")
                else:
                    client_state[client_id].configure(text="(离线:未知IP)", foreground="#999999")
juejin.py
@@ -157,7 +157,7 @@
        # 保存最新价
        symbol = symbol.split(".")[1]
        logger_juejin_tick.info("{}   {}    {}".format(symbol, price, tick["created_at"]))
        accpt_price(symbol, price)
        __prices_now[symbol] = price
@@ -170,15 +170,15 @@
    if pricePre is not None:
        rate = round((price - pricePre) * 100 / pricePre, 1)
        if rate >= 7:
            print(code, price, rate)
            logger_juejin_tick.info("{}-{}-{}",code, price, rate)
            if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(
                    code) and not gpcode_manager.is_listen_full():
                L2CodeOperate.get_instance().add_operate(1, code)
                L2CodeOperate.get_instance().add_operate(1, code,"现价变化")
            # 进入监控
        elif rate < 5:
            # 移除监控
            if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                L2CodeOperate.get_instance().add_operate(0, code)
                L2CodeOperate.get_instance().add_operate(0, code,"现价变化")
def on_bar(context, bars):
l2_code_operate.py
@@ -72,7 +72,9 @@
                print("读取操作队列", data, redis.llen("code_operate_queue"))
                if data is not None:
                    data = json.loads(data)
                    logger_code_operate.info("读取操作队列:{}", data)
                    type, code = data["type"], data["code"]
                    if type == 0:
                        # 是否在固定库
                        if l2_data_manager.is_in_l2_fixed_codes(code):
@@ -126,10 +128,10 @@
                logging.exception(e)
                print("发送操作异常:", str(e))
    def add_operate(self, type, code, client=None, pos=None):
    def add_operate(self, type, code, msg="", client=None, pos=None):
        redis = self.redis_manager_.getRedis()
        print("add_operate", type, code)
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code, "client": client, "pos": pos}))
        redis.rpush("code_operate_queue",
                    json.dumps({"type": type, "msg": msg, "code": code, "client": client, "pos": pos}))
    def repaire_operate(self, client, pos, code):
        # 如果本来该位置代码为空则不用修复
@@ -137,9 +139,7 @@
        if code_ == "" or code_ is None:
            return
        logger_code_operate.info("客户端位置代码修复:client-{},pos-{},code-{}", client, pos, code)
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue", json.dumps({"type": 2, "client": client, "pos": pos, "code": code}))
        self.add_operate(2, code, "", client, pos)
    # 修复l2的数据错误
    def repaire_l2_data(self, code):
@@ -156,15 +156,15 @@
            redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data}))
    # 移除监控
    def remove_l2_listen(self, code):
    def remove_l2_listen(self, code, msg):
        # 是否正在监听
        if gpcode_manager.is_listen(code):
            self.add_operate(0, code)
            self.add_operate(0, code, msg=msg)
    # 设置代码操作状态,服务器保存的代码是否与实际设置的代码保持一致
    @classmethod
    def set_operate_code_state(cls, client_id, channel, state):
        cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), tool.get_expire(), state)
        cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), 10, state)
    def get_operate_code_state(self, client_id, channel):
        value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel))
@@ -182,11 +182,12 @@
        data = json.loads(result["data"])
        codes = data["data"]
        result_list = {}
        if codes is not None:
        for d in codes:
            result_list[d["index"]]=d["code"]
        return result_list
    else:
        raise Exception("获取客户端监听代码出错")
        raise Exception("获取客户端监听代码出错:{}".format(result))
# 矫正客户端代码
@@ -198,11 +199,13 @@
            for index in range(0, 8):
                code = gpcode_manager.get_listen_code_by_pos(client_id, index)
                if code is not None and len(code) > 0 and index_codes.get(index) != code:
                    # 修复代码
                    # 交易时间才修复代码
                    if tool.is_trade_time():
                    L2CodeOperate().repaire_operate(client_id, index, code)
                elif code is None or len(code) == 0 and index_codes.get(index) is not None:
                    # 删除前端代码位
                    L2CodeOperate().add_operate(4, "", client_id, index)
                    # L2CodeOperate().add_operate(4, "", client_id, index)
                    pass
        except Exception as e:
            logger_code_operate.error("client:{} msg:{}".format(client_id, str(e)))
l2_data_manager.py
@@ -1,18 +1,19 @@
import decimal
import json
import os
import time as t
from datetime import datetime
import data_process
import l2_data_util
import mysql
import gpcode_manager
import mongo_data
import redis_manager
import tool
import trade_manager
from log import logger_l2_trade
from trade_data_manager import TradeBuyDataManager
_redisManager = redis_manager.RedisManager(1)
# l2数据管理
@@ -91,25 +92,30 @@
    @staticmethod
    def get_buy_cancel_compute_start_data(code):
        redis = TradePointManager.__get_redis()
        index = redis.get("buy_cancel_compute_index-{}".format(code))
        total_num = redis.get("buy_cancel_compute_num-{}".format(code))
        if index is None:
            return None, 0
        info = redis.get("buy_cancel_compute_info-{}".format(code))
        if info is None:
            return None, None , None
        else:
            return int(index), int(total_num)
            info=json.loads(info)
            return info[0],info[1],info[2]
    # 设置买撤点信息
    @staticmethod
    def set_buy_cancel_compute_start_data(code, num_add, index=None):
    # buy_num 纯买额  computed_index计算到的下标  index撤买信号起点
    @classmethod
    def set_buy_cancel_compute_start_data(cls,code, buy_num,computed_index, index):
        redis = TradePointManager.__get_redis()
        expire = tool.get_expire()
        if index is not None:
            redis.setex("buy_cancel_compute_index-{}".format(code), expire, index)
        key = "buy_cancel_compute_num-{}".format(code)
        if redis.get(key) is None:
            redis.setex(key, expire, num_add)
        else:
            redis.incrby(key, num_add)
        redis.setex("buy_cancel_compute_info-{}".format(code), expire, json.dumps((index,buy_num,computed_index)))
    # 增加撤买的纯买额
    @classmethod
    def add_buy_nums_for_cancel(cls,code,num_add,computed_index):
        cancel_index,nums,c_index= cls.get_buy_cancel_compute_start_data(code)
        if cancel_index is None:
            raise Exception("无撤买信号记录")
        nums+=num_add
        cls.set_buy_cancel_compute_start_data(code,nums,computed_index)
def load_l2_data(code, force=False):
@@ -140,7 +146,8 @@
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
def saveL2Data(code, datas):
def saveL2Data(code, datas, msg=""):
    start_time = round(t.time() * 1000)
    # 查询票是否在待监听的票里面
    if not gpcode_manager.is_in_gp_pool(code):
        return None
@@ -152,22 +159,21 @@
            # 计算保留的时间
            expire = tool.get_expire()
            index = 0
            start_index = redis_instance.get("l2-maxindex-{}".format(code))
            if start_index is None:
                start_index = 0
                start_index = -1
            else:
                start_index = int(start_index)
            max_index = start_index
            i = 0
            for _data in datas:
                index = index + 1
                i += 1
                key = "l2-" + _data["key"]
                value = redis_instance.get(key)
                if value is None:
                    # 新增
                    max_index = start_index + index
                    value = {"index": start_index + index, "re": _data['re']}
                    max_index = start_index + i
                    value = {"index": start_index + i, "re": _data["re"]}
                    redis_instance.setex(key, expire, json.dumps(value))
                else:
                    json_value = json.loads(value)
@@ -179,23 +185,117 @@
    finally:
        redis_instance.delete("l2-save-{}".format(code))
    print("保存新数据用时:", msg, round(t.time() * 1000) - start_time)
    return datas
# TODO 获取l2的数据
def get_l2_data_index(code, key):
    pass
def parseL2Data(str):
    now = int(t.time())
    day = datetime.now().strftime("%Y%m%d")
    dict = json.loads(str)
    data = dict["data"]
    client = dict["client"]
    code = data["code"]
    channel = data["channel"]
    capture_time = data["captureTime"]
    process_time = data["processTime"]
    data = data["data"]
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    datas = L2DataUtil.format_l2_data(data, code, limit_up_price)
    # 获取涨停价
    return day, client, channel, code, capture_time, process_time, datas
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    redis = _redisManager.getRedis()
    # 保存最近的数据
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    # 设置进内存
    if code in local_latest_datas:
        local_latest_datas[code] = datas
    else:
        local_latest_datas.setdefault(code, datas)
    __set_l2_data_latest_count(code, len(datas))
    if len(add_datas) > 0:
        saveL2Data(code, add_datas)
class L2DataUtil:
    @classmethod
    def is_same_time(cls, time1, time2):
        # TODO 测试
        # if 1 > 0:
        #     return True
        time1_s = time1.split(":")
        time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
        time2_s = time2.split(":")
        time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
        if abs(time2_second - time1_second) < 3:
            return True
        else:
            return False
    # 获取增量数据
    @classmethod
    def get_add_data(cls, code, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_key = ""
        __latest_datas = local_latest_datas.get(code)
        if __latest_datas is not None and len(__latest_datas) > 0:
            last_key = __latest_datas[-1]["key"]
        count = 0
        start_index = -1
        # 如果原来没有数据
        # TODO 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == last_key:
                start_index = len(datas) - count
                break
        _add_datas = []
        if len(last_key) > 0:
            if start_index < 0 or start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
        else:
            _add_datas = datas[start_index + 1:]
        for i in range(0, len(_add_datas)):
            _add_datas[i]["index"] = _start_index + i
        return _add_datas
    # 纠正数据,将re字段替换为较大值
    @classmethod
    def correct_data(cls, code, _datas):
        latest_data = local_latest_datas.get(code)
        if latest_data is None:
            latest_data = []
        save_list = []
        for data in _datas:
            for _ldata in latest_data:
                if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
                    max_re = max(_ldata["re"], data["re"])
                    _ldata["re"] = max_re
                    data["re"] = max_re
                    # 保存到数据库,更新re的数据
                    save_list.append(_ldata)
        if len(save_list) > 0:
            saveL2Data(code, save_list, "保存纠正数据")
        return _datas
    # 处理l2数据
    @classmethod
    def format_l2_data(cls, data, code, limit_up_price):
    datas = []
    dataIndexs = {}
    # 获取涨停价
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    same_time_num = {}
    for item in data:
        # 解析数据
@@ -224,98 +324,444 @@
            # 数据重复次数默认为1
            datas.append({"key": key, "val": item, "re": 1})
            dataIndexs.setdefault(key, len(datas) - 1)
    for key in same_time_num:
        if same_time_num[key] > 50:
            # 只能保存近3s的数据
            ts1 = l2_data_util.get_time_as_seconds(datas[-1]["val"]["time"])
            ts_now = l2_data_util.get_time_as_seconds(datetime.now().strftime("%H:%M:%S"))
            if abs(ts1 - ts_now) <= 3:
                # TODO 保存数据
                redis = _redisManager.getRedis()
                redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
        l2_data_util.save_big_data(code, same_time_num, data)
        return datas
    return day, client, channel, code, datas
    @classmethod
    def get_time_as_second(time_str):
        ts = time_str.split(":")
        return int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
# 纠正数据,将re字段替换为较大值
def correct_data(code, _datas):
    latest_data = local_latest_datas.get(code)
    if latest_data is None:
        latest_data = []
    for data in _datas:
        for _ldata in latest_data:
            if _ldata["key"] == data["key"] and _ldata["re"] != data["re"]:
                max_re = max(_ldata["re"], data["re"])
                _ldata["re"] = max_re
                data["re"] = max_re
    return _datas
# 保存l2数据
def save_l2_data(code, datas, add_datas):
    redis = _redisManager.getRedis()
    # 保存最近的数据
    redis.setex("l2-data-latest-{}".format(code), tool.get_expire(), json.dumps(datas))
    # 设置进内存
    if code in local_latest_datas:
        local_latest_datas[code] = datas
    else:
        local_latest_datas.setdefault(code, datas)
    __set_l2_data_latest_count(code, len(datas))
    if len(add_datas) > 0:
        saveL2Data(code, add_datas)
# 获取增量数据
def get_add_data(code, datas):
    if datas is not None and len(datas) < 1:
        return []
    last_key = ""
    __latest_datas = local_latest_datas.get(code)
    if __latest_datas is not None and len(__latest_datas) > 0:
        last_key = __latest_datas[-1]["key"]
    count = 0
    start_index = -1
    # 如果原来没有数据
    for n in reversed(datas):
        count += 1
        if n["key"] == last_key:
            start_index = len(datas) - count
            break
    if len(last_key) > 0:
        if start_index < 0 or start_index + 1 >= len(datas):
            return []
        else:
            return datas[start_index + 1:]
    else:
        return datas[start_index + 1:]
def __is_same_time(time1, time2):
    # TODO 测试
    # if 1 > 0:
    #     return True
    time1_s = time1.split(":")
    time1_second = int(time1_s[0]) * 3600 + int(time1_s[1]) * 60 + int(time1_s[2])
    time2_s = time2.split(":")
    time2_second = int(time2_s[0]) * 3600 + int(time2_s[1]) * 60 + int(time2_s[2])
    if abs(time2_second - time1_second) < 3:
        return True
    else:
    # 是否是涨停价买
    def is_limit_up_price_buy(val):
        if int(val["limitPrice"]) != 1:
        return False
        if int(val["operateType"]) != 0:
            return False
def process_data(code, datas):
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
    # 是否涨停买撤
    def is_limit_up_price_buy_cancel(val):
        if int(val["limitPrice"]) != 1:
            return False
        if int(val["operateType"]) != 1:
            return False
        price = float(val["price"])
        num = int(val["num"])
        if price * num * 100 < 50 * 10000:
            return False
        return True
# L2交易数据处理器
class L2TradeDataProcessor:
    unreal_buy_dict = {}
    @classmethod
    # 数据处理入口
    # datas: 本次截图数据
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
    now_time_str = datetime.now().strftime("%H:%M:%S")
    __start_time = round(t.time() * 1000)
    try:
        if len(datas) > 0:
            # 判断价格区间是否正确
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                    raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                          "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
                # 加载历史数据
                load_l2_data(code)
                # 纠正数据
                datas = L2DataUtil.correct_data(code, datas)
                _start_index = 0
                if local_today_datas.get(code) is not None and len(local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"]
                add_datas = L2DataUtil.get_add_data(code, datas, _start_index)
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
                total_datas = local_today_datas[code]
                # 买入确认点处理
                TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas,
                                                                   total_datas[-1],
                                                                   add_datas)
                if len(add_datas) > 0:
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    if L2DataUtil.is_same_time(now_time_str, latest_time):
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.process_order(code, add_datas)
                        else:
                            # 未挂单
                            cls.process_not_order(code, add_datas)
                # 保存数据
                save_l2_data(code, datas, add_datas)
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
                raise L2DataException(L2DataException.CODE_PRICE_ERROR, "股价不匹配 code-{} price-{}".format(code,datas[0]["val"]["price"]))
    # 处理未挂单
    @classmethod
    def process_not_order(cls, code, add_datas):
    # 处理已挂单
    @classmethod
    def process_order(cls, code, add_datas):
        # 获取之前是否有记录的撤买信号
        cancel_index, buy_num_for_cancel,computed_index= cls.has_order_cancel_begin_pos(code)
        buy_index, buy_num = cls.get_order_begin_pos(code)
        if cancel_index is None:
            # 无撤单信号起始点记录
            cancel_index = cls.compute_order_cancel_begin_single(code, len(add_datas) + 3, 3)
            buy_num_for_cancel = 0
            computed_index=buy_index
        if cancel_index is not None:
            # 获取阈值 有买撤信号,统计撤买纯买额
            threshold_money=10000000
            cls.start_compute_cancel(code,cancel_index,computed_index,buy_num_for_cancel,threshold_money)
        else:
            # 无买撤信号,终止执行
            pass
    #开始计算撤的信号
    @classmethod
    def start_compute_cancel(cls,code,cancel_index, compute_start_index,origin_num,threshold_money):
        # sure_type 0-虚拟挂买位  1-真实挂买位
        computed_index , buy_num_for_cancel,sure_type = cls.sum_buy_num_for_cancel_order(code,compute_start_index,origin_num,threshold_money)
        total_datas = local_today_datas[code]
        if computed_index is not None:
            # 发出撤买信号,需要撤买
            if cls.unreal_buy_dict.get(code) is not None:
                # 有虚拟下单
                # 删除虚拟下单标记
                cls.unreal_buy_dict.pop(code)
                # TODO 删除下单标记位置
                pass
            else:
                # 无虚拟下单,需要执行撤单
                logger_l2_trade.info(
                    "执行撤销:{} - {}".format(code, json.dumps(total_datas[computed_index])))
                try:
                    trade_manager.start_cancel_buy(code)
                    # 取消买入标识
                    TradePointManager.delete_buy_point(code)
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
            if computed_index < len(local_today_datas[code])-1:
                # TODO数据尚未处理完,重新进入下单计算流程
                cls.start_compute_buy(code,computed_index+1,0,threshold_money)
                pass
        else:
            #无需撤买,记录撤买信号
            TradePointManager.set_buy_cancel_compute_start_data(code,buy_num_for_cancel,len(total_datas)-1,cancel_index)
            # 判断是否有虚拟下单
            unreal_buy_info=cls.unreal_buy_dict.get(code)
            if unreal_buy_info is not None:
                # unreal_buy_info 的内容格式为:(触法买操作下标,截图时间)
                # 真实下单
                logger_l2_trade.info(
                    "执行买入:{} ".format(code))
                try:
                    trade_manager.start_buy(code, unreal_buy_info[1], total_datas[unreal_buy_info[0]],
                                            unreal_buy_info[0])
                    TradePointManager.delete_buy_cancel_point(code)
                except Exception as e:
                    pass
                pass
            else:
                #终止执行
                pass
    @classmethod
    def start_compute_buy(cls,code,compute_start_index,origin_num,threshold_money):
        total_datas=local_today_datas[code]
        # 获取买入信号计算起始位置
        index, num = cls.get_order_begin_pos(code)
        # 是否为新获取到的位置
        new_get_pos = False
        if index is None:
            # 有买入信号
            has_single, index = cls.compute_order_begin_pos(code, len(total_datas) - compute_start_index , 3)
            if has_single:
                num = 0
                new_get_pos = True
                # TODO 记录买入信号位置
        if index is None:
            # 未获取到买入信号,终止程序
            return None
        # 买入纯买额统计
        # TODO 获取阈值
        threshold_money=10000000
        compute_index,buy_nums = cls.sum_buy_num_for_order(code,compute_start_index,num,threshold_money)
        if compute_index is not None:
            # 达到下单条件
            # 虚拟下单
            cls.unreal_buy_dict[code]=(compute_index,capture_time)
        else:
            # TODO 未达到下单条件,保存纯买额,设置纯买额
        pass
    # 获取下单起始信号
    @classmethod
    def get_order_begin_pos(cls, code):
        index, num = TradePointManager.get_buy_compute_start_data(code)
        return index, num
    # 获取撤单起始位置
    @classmethod
    def has_order_cancel_begin_pos(cls):
        # cancel_index:撤单信号起点
        # buy_num_for_cancel:从挂入点计算的纯买额
        # computed_index 计算的最后位置
        cancel_index, buy_num_for_cancel,computed_index = TradePointManager.get_buy_cancel_compute_start_data(code)
        return cancel_index, buy_num_for_cancel,computed_index
    # 计算下单起始信号
    # compute_data_count 用于计算的l2数据数量
    def compute_order_begin_pos(self, code, compute_data_count, continue_count):
        # 倒数100条数据查询
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        __time = None
        _limit_up_count_1s = 0
        _limit_up_count_1s_start_index = -1
        for i in range(start_index, __len - (continue_count - 1)):
            _val = datas[i]["val"]
            # 时间要>=09:30:00
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                index_0 = i
                index_1 = -1
                index_2 = -1
                # index_3 = -1
                for j in range(index_0 + 1, __len):
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
                        index_1 = j
                        break
                if index_1 > 0:
                    for j in range(index_1 + 1, __len):
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy(datas[j]["val"]):
                            index_2 = j
                            break
                # if index_2 > 0:
                #     for j in range(index_2 + 1, __len):
                #         # 涨停买
                #         if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0:
                #             index_3 = j
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:  # and index_3 - index_2 == 1
                    logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                    return i
            # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                if __time is None:
                    _time = datas[i]["val"]["time"]
                    _limit_up_count_1s = 1
                    _limit_up_count_1s_start_index = i
                elif _time == _val["time"]:
                    _limit_up_count_1s += 1
                else:
                    _time = datas[i]["val"]["time"]
                    _limit_up_count_1s = 1
                    _limit_up_count_1s_start_index = i
            elif _val["operateType"] == 1:
                # 买撤
                _time = None
                _limit_up_count_1s = 0
                _limit_up_count_1s_start_index = -1
            if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
                logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
                return _limit_up_count_1s_start_index
        return None
    # 是否有撤销信号
    @classmethod
    def compute_order_cancel_begin_single(cls, code, compute_data_count, continue_count):
        datas = local_today_datas[code]
        __len = len(datas)
        if __len < continue_count:
            return None
        start_index = 0
        if compute_data_count > __len:
            compute_data_count = __len
        if __len > compute_data_count:
            start_index = __len - compute_data_count
        for i in range(start_index, __len - (continue_count - 1)):
            _val = datas[i]["val"]
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            # 有连续3个买撤
            if L2DataUtil.is_limit_up_price_buy_cancel(_val):
                index_0 = i
                index_1 = -1
                index_2 = -1
                for j in range(index_0 + 1, __len):
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
                        index_1 = j
                        break
                if index_1 > 0:
                    for j in range(index_1 + 1, __len):
                        # 涨停买
                        if L2DataUtil.is_limit_up_price_buy_cancel(datas[j]["val"]):
                            index_2 = j
                            break
                if index_1 - index_0 == 1 and index_2 - index_1 == 1:
                    logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                    return i
        return None
    # 保存下单位置
    def save_order_pos(self):
        pass
    # 是否可以下单
    def is_can_order(self):
        pass
    # 虚拟下单
    def unreal_order(self):
        pass
    # 设置虚拟挂买位
    def set_unreal_sure_order_pos(self):
        pass
    # 获取预估挂买位
    @classmethod
    def get_sure_order_pos(cls, code):
        index, data = TradeBuyDataManager.get_buy_sure_position(code)
        if index is None:
            return 0, len(local_today_datas[code]) - 1, local_today_datas[code][-1]
        else:
            return 1, index, data
    # 统计买入净买量
    @classmethod
    def sum_buy_num_for_order(cls, code, compute_start_index, origin_num, threshold_money):
        total_datas = local_today_datas[code]
        buy_nums = origin_num
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        for i in range(compute_start_index, len(total_datas)):
            _val = total_datas[i]["val"]
            # 有连续4个涨停买就标记计算起始点
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums >= threshold_num:
                    return i, buy_nums
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停买撤
                buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
        return None, buy_nums
    # 同一时间买入的概率计算
    @classmethod
    def get_same_time_property(cls, code):
        # TODO 与板块热度有关
        return 0.5
    # 统计买撤净买量
    @classmethod
    def sum_buy_num_for_cancel_order(cls, code, start_index, origin_num, threshold_money):
        buy_nums = origin_num
        total_datas = local_today_datas[code]
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price is None:
            raise Exception("涨停价无法获取")
        threshold_num = threshold_money / (limit_up_price * 100)
        # 获取预估挂买位 sure_type:0 虚拟挂买 1 实际挂买
        sure_type, sure_pos, sure_data = cls.get_sure_order_pos(code)
        same_time_property = cls.get_same_time_property(code)
        # 同一秒,在预估买入位之后的数据之和
        property_buy_num_count = 0
        for i in range(start_index, len(total_datas)):
            data = total_datas[i]
            _val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(_val):
                # 涨停买
                if i < sure_pos:
                    buy_nums += int(_val["num"]) * int(data["re"])
                elif sure_data["val"]["time"] == _val["time"]:
                    # 同一秒买入,而且还在预估买入位之后
                    property_buy_num_count += int(_val["num"]) * int(data["re"])
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                # 涨停撤买
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data, total_datas)
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index < sure_pos:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                    elif sure_data["val"]["time"] == _val["time"]:
                        # 同一秒,而且还在预估买入位之后按概率计算
                        property_buy_num_count -= int(_val["num"]) * int(data["re"])
                else:
                    # TODO 未找到买撤数据的买入点
                    pass
            property_buy_num = round(property_buy_num_count * same_time_property)
            if buy_nums + property_buy_num <= threshold_num:
                return i, buy_nums + property_buy_num,sure_type
        return None, buy_nums + round(property_buy_num_count * same_time_property),sure_type
def process_data(code, datas, capture_timestamp):
    now_time_str = datetime.now().strftime("%H:%M:%S")
    __start_time = round(t.time() * 1000)
    try:
        if len(datas) > 0:
            # 判断价格区间是否正确
            if not data_process.is_same_code_with_price(code, float(datas[0]["val"]["price"])):
                raise L2DataException(L2DataException.CODE_PRICE_ERROR,
                                      "股价不匹配 code-{} price-{}".format(code, datas[0]["val"]["price"]))
            # 加载历史数据
            load_l2_data(code)
            # 纠正数据
@@ -325,7 +771,11 @@
                # 拼接数据
                local_today_datas[code].extend(add_datas)
                l2_data_util.load_num_operate_map(local_today_num_operate_map, code, add_datas)
            total_datas = local_today_datas[code]
            # 买入确认点处理
            TradeBuyDataManager.process_buy_sure_position_info(code, capture_timestamp, total_datas, total_datas[-1],
                                                               add_datas)
            if len(add_datas) > 0:
                latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                # 时间差不能太大才能处理
                if __is_same_time(now_time_str, latest_time):
@@ -342,11 +792,14 @@
                        # 没有计算开始点
                        c_index = __get_limit_up_buy_start(code, len(add_datas) + 3, 3)
                        if c_index is not None:
                            total_datas = local_today_datas[code]
                            logger_l2_trade.info("找到买点:{} - {}".format(code, json.dumps(total_datas[c_index])))
                            # 触发数据分析 ,获取连续涨停标记数据
                            buy_nums = 0
                            # 获取涨停价
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            last_data_index = -1
                            for i in range(c_index, len(total_datas)):
                                _val = total_datas[i]["val"]
                                # 有连续4个涨停买就标记计算起始点
@@ -356,24 +809,32 @@
                                elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                    # 涨停买撤
                                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                    last_data_index = i
                                    break
                            TradePointManager.set_buy_compute_start_data(code, buy_nums, c_index)
                            # 获取涨停价
                            limit_up_price = gpcode_manager.get_limit_up_price(code)
                            if limit_up_price is not None:
                                if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                if last_data_index > -1:
                                    # 大于1000w就买
                                    logger_l2_trade.info(
                                        "执行买入:{} - 计算结束点: {}".format(code, json.dumps(total_datas[-1])))
                                    try:
                                        trade_manager.start_buy(code)
                                        trade_manager.start_buy(code, capture_timestamp, total_datas[last_data_index],
                                                                last_data_index)
                                        TradePointManager.delete_buy_cancel_point(code)
                                    except Exception as e:
                                        pass
                    else:
                        # 有计算开始点,计算新增的数据
                        buy_nums = 0
                        buy_nums = c_num
                        last_data = None
                        last_data_index = len(total_datas) - len(add_datas) - 1
                        # 获取涨停价
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        for data in add_datas:
                            last_data_index += 1
                            _val = data["val"]
                            if int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 0:
                                # 涨停买
@@ -381,16 +842,17 @@
                            elif int(_val["limitPrice"]) == 1 and int(_val["operateType"]) == 1:
                                # 涨停买撤
                                buy_nums -= int(_val["num"]) * int(data["re"])
                            if buy_nums * limit_up_price * 100 > 1000 * 10000:
                                last_data = data
                                break
                        TradePointManager.set_buy_compute_start_data(code, buy_nums)
                        latest_num = c_num + buy_nums
                        # 获取涨停价
                        limit_up_price = gpcode_manager.get_limit_up_price(code)
                        if limit_up_price is not None:
                            if latest_num * limit_up_price * 100 > 1000 * 10000:
                            if last_data is not None:
                                # 大于1000w就买
                                logger_l2_trade.info("执行买入:{} - 计算结束点: {}".format(code, json.dumps(add_datas[-1])))
                                try:
                                    trade_manager.start_buy(code)
                                    trade_manager.start_buy(code, capture_timestamp, last_data, last_data_index)
                                    TradePointManager.delete_buy_cancel_point(code)
                                except Exception as e:
                                    pass
@@ -697,25 +1159,25 @@
if __name__ == "__main__":
    # 删除大数据
    redis = redis_manager.RedisManager(1).getRedis()
    keys = redis.keys("big_data*")
    for key in keys:
        redis.delete(key)
    # print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
    # load_l2_data("002868")
    # keys= local_today_num_operate_map["002868"]
    # for k in keys:
    #     print(len( local_today_num_operate_map["002868"][k]))
    # pass
    # __set_buy_compute_start_data("000000", 100, 1)
    # __set_buy_compute_start_data("000000", 100)
    # __set_l2_data_latest_count("000333", 20)
    # print(type(get_l2_data_latest_count("000333")))
    # datas = ["2", "3", "4", "5"]
    # print(datas[4:])
    # print(decimal.Decimal("19.294").quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP))
    # 获取增量数据
    # 保存数据
    code = "000868"
    local_today_datas.setdefault(code, [])
    path = "C:/Users/Administrator/Desktop/demo/000868/"
    for file_name in os.listdir(path):
        p = "{}{}".format(path, file_name)
        f = open(p)
        for line in f.readlines():  # 依次读取每行
            line = line.strip()
            data = json.loads(line)
            result = __format_l2_data(data, code, 10.00)
            add_datas = get_add_data(code, result)
            print("增加的数量:", len(add_datas))
            if len(add_datas) > 0:
    # 拼接数据
                local_today_datas[code].extend(add_datas)
            if code in local_latest_datas:
                local_latest_datas[code] = result
            else:
                local_latest_datas.setdefault(code, result)
        f.close()
    for d in local_today_datas[code]:
        print(d["val"]["time"], d["val"]["num"], d["val"]["operateType"], d["re"])
l2_data_util.py
@@ -1,6 +1,13 @@
# l2数据工具
# 比较时间的大小
import hashlib
import json
import time
import l2_data_manager
import tool
from log import logger_l2_trade, logger_l2_big_data
from trade_gui import async_call
def compare_time(time1, time2):
@@ -56,7 +63,6 @@
    return time_seconds
# 计算时间的区间
def __compute_time_space_as_second(cancel_time, cancel_time_unit):
    __time = int(cancel_time)
@@ -75,21 +81,51 @@
# 根据买撤数据(与今日总的数据)计算买入数据
def get_buy_data_with_cancel_data(cancel_data, today_datas):
def get_buy_data_with_cancel_data(cancel_data):
    # 计算时间区间
    min_space, max_space = __compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                          cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
    for data in today_datas:
    buy_datas = l2_data_manager.local_today_num_operate_map.get("{}-{}".format(cancel_data["val"]["num"], "0"))
    if buy_datas is None:
        # 无数据
        return None, None
    for i in range(0, len(buy_datas)):
        data = buy_datas[i]
        if int(data["val"]["operateType"]) != 0:
            continue
        if int(data["val"]["num"]) != int(cancel_data["val"]["num"]):
            continue
        if compare_time(data["val"]["time"], min_time) > 0 and compare_time(data["val"]["time"], max_time) <= 0:
            return data
            return data["index"], data
    return None, None
__last_big_data = {}
@async_call
def save_big_data(code, same_time_nums, datas):
    latest_datas = __last_big_data.get(code)
    d1 = json.dumps(datas)
    d2 = json.dumps(latest_datas)
    if latest_datas is not None and d1.strip() == d2.strip():
    return None
    __last_big_data[code] = datas
    # 获取不一样的快照
    if latest_datas is not None:
        for i in range(len(d1)):
            if d1[i] != d2[i]:
                # 保存快照
                logger_l2_big_data.debug("code:{} d1:{}  d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30])
                break
    for key in same_time_nums:
        if same_time_nums[key] > 20:
            redis = l2_data_manager._redisManager.getRedis()
            redis.setex("big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), d1)
            break
def test(datas):
@@ -101,6 +137,7 @@
    # today_datas=[{"val": {"operateType": 1, "num": 1520, "cancelTime": 1, "cancelTimeUnit": 0, "time": "09:32:30"}},{"val": {"operateType": 0, "num": 1520, "cancelTime": 0, "cancelTimeUnit": 0, "time": "09:31:31"}}]
    # result= get_buy_data_with_cancel_data(cancel_data,today_datas)
    # print(result)
    __datas = {}
    test(__datas)
    print(__datas)
    redis = l2_data_manager._redisManager.getRedis()
    keys = redis.keys("big_data-*")
    for k in keys:
        redis.delete(k)
l2_trade_factor.py
New file
@@ -0,0 +1,123 @@
# l2交易因子
class L2TradeFactorUtil:
    # 获取基础m值,返回单位为元
    @classmethod
    def get_base_safe_val(cls, zyltgb):
        yi = round(zyltgb / 100000000)
        if yi < 1:
            yi = 1
        return 6000000 + (yi - 1) * 500000
    # 自由流通市值影响比例
    @classmethod
    def get_zylt_rate(cls, zyltgb):
        yi = round(zyltgb / 100000000)
        if yi < 1:
            yi = 1
        if yi <= 30:
            rate = -0.04 + 0.01 * (yi - 1)
            if rate > 0.1:
                rate = 0.1
        else:
            rate = 0.09 - (yi - 31) * 0.002
            if rate < -0.1:
                rate = -0.1
        return round(rate, 4)
    # 获取行业影响比例
    # total_limit_percent为统计的比例之和乘以100
    @classmethod
    def get_industry_rate(cls, total_limit_percent):
        t = total_limit_percent / 10
        rate = t / 0.5 * 0.04
        if rate > 0.52:
            rate = 0.52
        return round(rate, 2)
    # 获取量影响比例
    @classmethod
    def get_volumn_rate(cls, day60_max, yest, today):
        old_volumn = yest
        base_rate = 0.25
        if day60_max > yest:
            old_volumn = day60_max
            base_rate = 0.26
        r = round(today / old_volumn, 2)
        print("比例:", r)
        rate = 0
        if r < 0.11:
            rate = base_rate - (r - 0.01)
        elif r < 0.45:
            rate = base_rate - r
        elif r < 0.75:
            rate = (base_rate - 0.2049) + (r - 0.74) * 0.4
        elif r < 1.38:
            rate = base_rate - (r - 0.75) * 0.8
        else:
            rate = base_rate - 0.5
        return round(rate, 4)
    # 当前股票首次涨停时间的影响比例
    @classmethod
    def get_limit_up_time_rate(cls, time_str):
        times = time_str.split(":")
        start_m = 9 * 60 + 30
        m = int(times[0]) * 60 + int(times[1])
        dif = m - start_m
        base_rate = 0.15
        rate = 0
        if dif < 1:
            rate = base_rate
        elif dif <= 5:
            rate = base_rate - dif * 0.01
        elif dif <= 120:
            # 11:30之前
            rate = 0.0985 - (dif - 6) * 0.0015
        else:
            rate = 0.0985 - (dif - 89 - 6) * 0.0015
            if rate < -0.15:
                rate = -0.15
        return round(rate, 4)
    # 纯万手哥影响值(手数》=9000 OR 金额》=500w)
    @classmethod
    def get_big_money_rate(cls, num):
        if num >= 8:
            return 0.08
        else:
            return num * 0.01
    @classmethod
    def compute_rate(cls, zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
                     limit_up_time, big_money_num):
        # 自由流通股本影响比例
        zyltgb_rate = cls.get_zylt_rate(zyltgb)
        # 行业涨停影响比例
        industry_rate = cls.get_industry_rate(total_industry_limit_percent)
        # 量影响比例
        volumn_rate=cls.get_volumn_rate(volumn_day60_max,volumn_yest,volumn_today)
        # 涨停时间影响比例
        limit_up_time_rate=cls.get_limit_up_time_rate(limit_up_time)
        # 万手哥影响
        big_money_rate=cls.get_big_money_rate(big_money_num)
        return 1-(zyltgb_rate+industry_rate+volumn_rate+limit_up_time_rate+big_money_rate);
# l2因子归因数据
class L2TradeFactorSourceDataUtil:
    # 是否为大单
    @classmethod
    def is_big_money(cls, data):
        if int(data["val"]["num"]) >= 9000:
            return True
        money = round(float(data["val"]["price"]) * int(data["val"]["num"]) * 100)
        if money >= 5000000:
            return True
        return False
if __name__ == "__main__":
    print(L2TradeFactorUtil.get_big_money_rate(32))
    print(L2TradeFactorUtil.get_big_money_rate(8))
    print(L2TradeFactorUtil.get_big_money_rate(0))
log.py
@@ -22,6 +22,9 @@
logger.add(get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("l2", "l2_big_data"), filter=lambda record: record["extra"].get("name") == "l2_big_data",
           rotation="00:00", compression="zip", enqueue=True)
logger.add(get_path("juejin", "juejin_tick"), filter=lambda record: record["extra"].get("name") == "juejin_tick",
           rotation="00:00", compression="zip", enqueue=True)
@@ -40,6 +43,7 @@
logger_l2_error = logger.bind(name="l2_error")
logger_l2_process = logger.bind(name="l2_process")
logger_l2_trade = logger.bind(name="l2_trade")
logger_l2_big_data = logger.bind(name="l2_big_data")
logger_juejin_tick = logger.bind(name="juejin_tick")
logger_code_operate = logger.bind(name="code_operate")
logger_device = logger.bind(name="device")
server.py
@@ -10,11 +10,13 @@
import authority
import juejin
import l2_data_manager
import l2_data_util
import tool
import trade_manager
import l2_code_operate
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate
from trade_data_manager import TradeCancelDataManager
class MyTCPServer(socketserver.TCPServer):
@@ -56,7 +58,7 @@
            if len(data) == 0:
                # print("客户端断开连接")
                break;
            _str = data.decode()
            _str = str(data, encoding="gb2312")
            if len(_str) > 0:
                # print("结果:",_str)
                type = data_process.parseType(_str)
@@ -65,8 +67,14 @@
                    try:
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, datas = l2_data_manager.parseL2Data(_str)
                        day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data(
                            _str)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # 保存l2截图时间
                        TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
@@ -95,7 +103,7 @@
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                if gpcode_manager.is_listen(code):
                                    l2_data_manager.process_data(code, datas)
                                    l2_data_manager.process_data(code, datas, capture_timestamp)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -106,7 +114,7 @@
                                        # todo 太敏感移除代码
                                        logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                        # 单价不一致时需要移除代码重新添加
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code)
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                        self.l2_data_error_dict[key] = round(time.time() * 1000)
                            except Exception as e:
@@ -199,10 +207,12 @@
                            juejin.accpt_price(item["code"], float(item["price"]))
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    data_process.saveClientActive(int(client_id), host)
                    data_process.saveClientActive(int(client_id), host, thsDead)
                    # print("心跳:", client_id)
                sk.send(return_str.encode())
trade_data_manager.py
New file
@@ -0,0 +1,158 @@
import json
import time
# 交易撤销数据管理器
import l2_data_util
import redis_manager
import tool
from log import logger_trade
class TradeCancelDataManager:
    capture_time_dict = {}
    # 保存截图时间
    @classmethod
    def save_l2_capture_time(cls, client_id, pos, code, capture_time):
        cls.capture_time_dict["{}-{}-{}".format(client_id, pos, code)] = {"create_time": round(time.time() * 1000),
                                                                          "capture_time": capture_time}
    # 获取最近一次的截图时间
    @classmethod
    def get_latest_l2_capture_time(cls, client_id, pos, code):
        val = cls.capture_time_dict.get("{}-{}-{}".format(client_id, pos, code))
        if val is None:
            return -1
        # 间隔时间不能大于1s
        if round(time.time() * 1000) - val["create_time"] > 1000:
            return -1
        return val["capture_time"]
    # 获取l2数据的增长速度
    @classmethod
    def get_l2_data_grow_speed(cls, client_id, pos, code, add_datas, capture_time):
        count = 0
        for data in add_datas:
            count += data["re"]
        lastest_capture_time = cls.get_latest_l2_capture_time(client_id, pos, code)
        if lastest_capture_time < 0:
            raise Exception("获取上次l2数据截图时间出错")
        return count / (capture_time - lastest_capture_time)
    # 获取买入确认点的位置
    @classmethod
    def get_buy_sure_position(cls, index, speed, trade_time):
        return index + round(speed * trade_time)
class TradeBuyDataManager:
    redisManager = redis_manager.RedisManager(0)
    buy_sure_position_dict = {}
    # 设置买入点的信息
    # trade_time: 买入点截图时间与下单提交时间差值
    # capture_time: 买入点截图时间
    # last_data: 买入点最后一条数据
    @classmethod
    def set_buy_position_info(cls, code, capture_time, trade_time, last_data, last_data_index):
        redis = cls.redisManager.getRedis()
        redis.setex("buy_position_info-{}".format(code), tool.get_expire(),
                    json.dumps((capture_time, trade_time, last_data, last_data_index)))
    # 获取买入点信息
    @classmethod
    def get_buy_position_info(cls, code):
        redis = cls.redisManager.getRedis()
        val_str = redis.get("buy_position_info-{}".format(code))
        if val_str is None:
            return None, None, None,None
        else:
            val = json.loads(val_str)
            return val[0], val[1], val[2], val[3]
    # 删除买入点信息
    @classmethod
    def remove_buy_position_info(cls, code):
        redis = cls.redisManager.getRedis()
        redis.delete("buy_position_info-{}".format(code))
    # 设置买入确认点信息
    @classmethod
    def __set_buy_sure_position(cls, code, index, data):
        logger_trade.debug("买入确认点信息: code:{} index:{} data:{}", code, index, data)
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        redis.setex(key, tool.get_expire(), json.dumps((index, data)))
        cls.buy_sure_position_dict[code] = (index, data)
        # 移除下单信号的详细信息
        cls.remove_buy_position_info(code)
    # 清除买入确认点信息
    @classmethod
    def __clear_buy_sure_position(cls, code):
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        redis.delete(key)
        if code in cls.buy_sure_position_dict:
            cls.buy_sure_position_dict.pop(code)
    # 获取买入确认点信息
    @classmethod
    def get_buy_sure_position(cls, code):
        temp = cls.buy_sure_position_dict.get(code)
        if temp is not None:
            return temp[0], temp[1]
        redis = cls.redisManager.getRedis()
        key = "buy_sure_position-{}".format(code)
        val = redis.get(key)
        if val is None:
            return None, None
        else:
            val = json.loads(val)
            cls.buy_sure_position_dict[code] = (val[0], val[1])
            return val[0], val[1]
    # 处理买入确认点信息
    @classmethod
    def process_buy_sure_position_info(cls, code, capture_time, l2_today_datas, l2_latest_data, l2_add_datas):
        buy_capture_time, trade_time, l2_data, l2_data_index = cls.get_buy_position_info(code)
        if buy_capture_time is None:
            # 没有购买者信息
            return None
        if capture_time - buy_capture_time < trade_time:
            # 时间未等待足够
            return None
        # 时间差是否相差2s及以上
        old_time = l2_data["val"]["time"]
        new_time = l2_latest_data["val"]["time"]
        old_time_int = l2_data_util.get_time_as_seconds(old_time)
        new_time_int = l2_data_util.get_time_as_seconds(new_time)
        if new_time_int - old_time_int >= 2:
            # 间隔2s及其以上表示数据异常
            # 间隔2s以上的就以下单时间下一秒末尾作为确认点
            start_index = l2_data_index
            if len(l2_today_datas)-1 > start_index:
                for i in range(start_index + 1, len(l2_today_datas)):
                    _time = l2_today_datas[i]["val"]["time"]
                    if l2_data_util.get_time_as_seconds(_time) - old_time_int >= 2:
                        index = i - 1
                        data = l2_today_datas[index]
                        cls.__set_buy_sure_position(code, index, data)
                        break
            else:
                cls.__set_buy_sure_position(code, l2_data_index, l2_data)
        elif new_time_int - old_time_int >= 0:
            # 间隔2s内表示数据正常,将其位置设置为新增数据的中间位置
            index = len(l2_today_datas)-1 - (len(l2_add_datas)) // 2
            data = l2_today_datas[index]
            cls.__set_buy_sure_position(code, index, data)
        else:
            # 间隔时间小于0 ,一般产生原因是数据回溯产生,故不做处理
            logger_trade.warning("预估委托位置错误:数据间隔时间小于0 code-{}", code)
            pass
if __name__ == "__main__":
    TradeBuyDataManager.set_buy_capture_time("123456", 178938828, 1232)
    print(TradeBuyDataManager.get_buy_capture_time("123456"))
trade_gui.py
@@ -8,11 +8,13 @@
from log import *
from threading import Thread
def async_call(fn):
    def wrapper(*args, **kwargs):
        Thread(target=fn, args=args, kwargs=kwargs).start()
    return wrapper
class THSGuiTrade(object):
    __instance = None
@@ -307,7 +309,7 @@
    # 撤买
    def cancel_buy(self, code):
        self.buy_cancel_lock.acquire()
        global code_input
        code_input = 0
        try:
            logger_trade_gui.info("开始撤单:code-{}".format(code))
            win = self.cancel_win
trade_manager.py
@@ -4,7 +4,8 @@
import l2_code_operate
import mongo_data
import tool
from trade_gui import THSGuiTrade
from trade_data_manager import TradeBuyDataManager
from trade_gui import THSGuiTrade, async_call
import time as t
from l2_code_operate import *
import l2_data_manager
@@ -156,7 +157,7 @@
def forbidden_trade(code):
    add_to_forbidden_trade_codes(code)
    l2_data_manager.remove_from_l2_fixed_codes(code)
    l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code)
    l2_code_operate.L2CodeOperate.get_instance().remove_l2_listen(code, "禁止代码交易")
# 是否在禁止交易代码中
@@ -167,7 +168,7 @@
# 开始交易
def start_buy(code):
def start_buy(code, capture_timestamp,last_data,last_data_index):
    # 是否禁止交易
    if is_in_forbidden_trade_codes(code):
        raise Exception("禁止交易")
@@ -183,21 +184,44 @@
    # 买一手的资金是否足够
    if price * 100 > money:
        raise Exception("账户可用资金不足")
    try:
        print("开始买入")
        logger_trade.info("{}开始买入".format(code))
        set_trade_state(code, TRADE_STATE_BUY_PLACE_ORDER)
    __buy(code, price, trade_state, capture_timestamp,last_data,last_data_index)
# 购买
@async_call
def __buy(code, price, trade_state, capture_timestamp, last_data,last_data_index):
    try:
        guiTrade.buy(code, price)
        __place_order_success(code, capture_timestamp, last_data,last_data_index)
    except Exception as e:
        __place_order_fail(code, trade_state)
        logger_trade.error("{}买入异常{}".format(code, str(e)))
        raise e
# 下单成功
def __place_order_success(code, capture_timestamp, last_data,last_data_index):
    # 买入结束点
    use_time = round(time.time() * 1000) - capture_timestamp
    logger_trade.info("{}-从截图到下单成功总费时:{}".format(code, use_time))
        # 下单成功,加入固定代码库
        l2_data_manager.add_to_l2_fixed_codes(code)
    # 记录下单的那一帧图片的截图时间与交易用时
    TradeBuyDataManager.set_buy_position_info(code, capture_timestamp, use_time, last_data,last_data_index)
        print("买入结束")
        logger_trade.info("{}买入成功".format(code))
    except Exception as e:
# 下单失败
def __place_order_fail(code, trade_state):
        print("买入异常")
        logger_trade.error("{}买入异常{}".format(code, str(e)))
        # 状态还原
        set_trade_state(code, trade_state)
        raise e
# 开始取消买入
@@ -212,9 +236,7 @@
        logger_trade.info("{}开始撤单".format(code))
        set_trade_state(code, TRADE_STATE_BUY_CANCEL_ING)
        guiTrade.cancel_buy(code)
        # 下单成功,加入固定代码库
        l2_data_manager.remove_from_l2_fixed_codes(code)
        logger_trade.info("{}撤单成功".format(code))
        __cancel_success(code)
    except Exception as e:
        # 状态还原
        set_trade_state(code, trade_state)
@@ -222,6 +244,14 @@
        raise e
# 取消委托成功
def __cancel_success(code):
    TradeBuyDataManager.remove_buy_capture_time(code)
    # 下单成功,加入固定代码库
    l2_data_manager.remove_from_l2_fixed_codes(code)
    logger_trade.info("{}撤单成功".format(code))
# 处理交易成功数据
def process_trade_success_data(datas):
    if datas is None: