Administrator
2023-01-02 ed9e2367eea9baa6c8bea82e0f81c209ffb2a56f
撤单策略再次修改
18个文件已修改
1374 ■■■■ 已修改文件
code_volumn_manager.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py 128 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 49 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 771 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_util.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_data_manager.py 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_volumn_manager.py
@@ -8,6 +8,7 @@
import global_util
import redis_manager
import tool
from log import logger_day_volumn
__redis_manager = redis_manager.RedisManager(0)
@@ -35,6 +36,7 @@
# 设置今日量
def set_today_volumn(code, volumn):
    logger_day_volumn.info("code:{} volumn:{}".format(code, volumn))
    redis = __redis_manager.getRedis()
    global_util.today_volumn[code] = volumn
    redis.setex("volumn_today-{}".format(code), tool.get_expire(), volumn)
data_export_util.py
@@ -7,15 +7,51 @@
import xlwt
import l2_data_util
import log
def export_l2_data(code, datas, dest_dir="D:/export/l2"):
import l2_data_manager
def export_l2_excel(code, date=None):
    """Export the logged L2 rows of ``code`` together with its position logs.

    The L2 data is loaded from the log for ``date`` and subscripted by
    ``code`` (a missing code surfaces as whatever that lookup raises).  The
    process-position and trade-position records are read for the same
    ``code``/``date`` and everything is handed to ``export_l2_data``.

    :param code: stock code whose rows are exported.
    :param date: forwarded unchanged to the ``log`` helpers.
    :return: None; the value returned by ``export_l2_data`` is not propagated.
    """
    # L2 rows of this code, taken from the log
    code_datas = log.load_l2_from_log(date)[code]
    # The position records are read in the same order as before:
    # process positions first, trade positions second.
    export_l2_data(
        code,
        code_datas,
        log.get_l2_process_position(code, date),
        log.get_l2_trade_position(code, date),
    )
def export_l2_data(code, datas, process_indexs, trade_indexs, dest_dir="D:/export/l2"):
    def find_process_index(index):
        for i in range(0, len(process_indexs)):
            if process_indexs[i][0] <= index <= process_indexs[i][1]:
                return i
        return len(process_indexs)
    def find_trade_index(index):
        for i in range(0, len(trade_indexs)):
            if trade_indexs[i][1] == index:
                return trade_indexs[i]
        return None
    # 数据预处理
    num_operate_map = {}
    l2_data_util.load_num_operate_map(num_operate_map, code, datas)
    num_dict = {}
    for data in datas:
        if data["val"]["num"] not in num_dict:
            num_dict[data["val"]["num"]] = []
        num_dict[data["val"]["num"]].append(data)
    local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time()))
    file_name = "{}/{}_{}.xls".format(dest_dir, code, local_time)
    file_name_txt = "{}/{}_{}.txt".format(dest_dir, code, local_time)
    openfile = open(file_name_txt,'w')
    openfile = open(file_name_txt, 'w')
    try:
        for data in datas:
            openfile.write(json.dumps(data)+"\n")
            openfile.write(json.dumps(data) + "\n")
    finally:
        openfile.close()
    wb = xlwt.Workbook()
@@ -23,20 +59,41 @@
    ws.write(0, 0, '序号')
    ws.write(0, 1, '时间')
    ws.write(0, 2, '买撤间隔')
    ws.write(0, 3, '价格')
    ws.write(0, 4, '手数')
    ws.write(0, 5, '类型')
    ws.write(0, 6, '重复数量')
    ws.write(0, 3, '金额')
    ws.write(0, 4, '价格')
    ws.write(0, 5, '手数')
    ws.write(0, 6, '类型')
    ws.write(0, 7, '重复数量')
    ws.write(0, 8, '撤单时间')
    index = 0
    for data in datas:
        index += 1
        ws.write(index, 0, data["index"])
        ws.write(index, 1, data["val"]["time"])
        trade_info = find_trade_index(data["index"])
        font = xlwt.Font()
        if trade_info:
            if trade_info[0] == 0:
                font.colour_index = 53
            elif trade_info[0] == 1:
                font.colour_index = 17
            elif trade_info[0] == 2:
                font.colour_index = 10
                ws.write(index, 8, trade_info[2])
        style = None
        if find_process_index(data["index"]) % 2 == 0:
            style = xlwt.easyxf('pattern: pattern solid')
        else:
            style = xlwt.easyxf('pattern: pattern solid, fore_colour light_yellow')
        style.font = font
        cancel_style = xlwt.easyxf('pattern: pattern solid, fore_colour gray25')
        ws.write(index, 0, data["index"], style)
        ws.write(index, 1, data["val"]["time"], style)
        cancel_time = data["val"]["cancelTime"]
        if cancel_time == '0':
            cancel_time = ''
        else:
            cancel_time= "{}".format(cancel_time)
            cancel_time = "{}".format(cancel_time)
        if len(cancel_time) > 0:
            if int(data["val"]["cancelTimeUnit"]) == 0:
                cancel_time += "s"
@@ -45,40 +102,52 @@
            elif int(data["val"]["cancelTimeUnit"]) == 2:
                cancel_time += "h"
        ws.write(index, 2, cancel_time)
        ws.write(index, 3, data["val"]["price"])
        ws.write(index, 2, cancel_time, style)
        ws.write(index, 4, data["val"]["price"], style)
        if int(data["val"]["operateType"]) == 1 or int(data["val"]["operateType"]) == 2:
            ws.write(index, 4, 0-int(data["val"]["num"]))
            ws.write(index, 5, 0 - int(data["val"]["num"]), style)
        else:
            ws.write(index, 4, int(data["val"]["num"]))
            ws.write(index, 5, int(data["val"]["num"]), style)
        limit_price=""
        limit_price = ""
        if int(data["val"]["limitPrice"]) == 1:
            limit_price="涨停"
            limit_price = "涨停"
        elif int(data["val"]["limitPrice"]) == 2:
            limit_price="跌停"
            limit_price = "跌停"
        if int(data["val"]["operateType"]) == 0:
            if len(limit_price)>0:
                ws.write(index, 5, '买 ({})'.format(limit_price))
            if len(limit_price) > 0:
                ws.write(index, 6, '买 ({})'.format(limit_price), style)
            else:
                ws.write(index, 5, '买')
                ws.write(index, 6, '买', style)
        elif int(data["val"]["operateType"]) == 1:
            if len(limit_price) > 0:
                ws.write(index, 5, '买撤 ({})'.format(limit_price))
                ws.write(index, 6, '买撤 ({})'.format(limit_price), style)
            else:
                ws.write(index, 5, '买撤')
                ws.write(index, 6, '买撤', style)
        elif int(data["val"]["operateType"]) == 2:
            if len(limit_price) > 0:
                ws.write(index, 5, '卖 ({})'.format(limit_price))
                ws.write(index, 6, '卖 ({})'.format(limit_price), style)
            else:
                ws.write(index, 5, '卖')
                ws.write(index, 6, '卖', style)
        elif int(data["val"]["operateType"]) == 3:
            if len(limit_price) > 0:
                ws.write(index, 5, '卖撤 ({})'.format(limit_price))
                ws.write(index, 6, '卖撤 ({})'.format(limit_price), style)
            else:
                ws.write(index, 5, '卖撤')
        ws.write(index, 6, data["re"])
                ws.write(index, 6, '卖撤', style)
        ws.write(index, 7, data["re"], style)
        # 查询是否撤单
        if int(data["val"]["operateType"]) == 0:
            cancel = False
            # 买
            for d in num_dict[data["val"]["num"]]:
                if int(d["val"]["operateType"]) == 1:
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(d, num_operate_map[code])
                    if buy_index == data["index"]:
                        ws.write(index, 8, "{}-{}".format(d["index"], d["val"]["time"]), cancel_style)
                        break
        ws.write(index, 3, "{}万".format(round(int(data["val"]["num"]) * float(data["val"]["price"]) / 100, 2)), style)
    wb.save(file_name)
    return file_name
@@ -124,5 +193,6 @@
if __name__ == "__main__":
    _t = "1661391666562"
    print(_t[-3:])
    codes = ["000610"]
    for code in codes:
        export_l2_excel(code,"2022-12-27")
data_process.py
@@ -2,6 +2,7 @@
import json
import logging
import redis_manager
from log import logger_l2_error
__redisManager = redis_manager.RedisManager(0)
@@ -21,6 +22,8 @@
        return dict["type"]
    except Exception as e:
        logging.exception(e)
        logger_l2_error.error(str)
        print(str)
        return -1
gpcode_manager.py
@@ -65,6 +65,18 @@
    return val.get(name)
def get_code_name(code):
    """Return the first key of the "gp_list_names" Redis entry whose value is ``code``.

    The entry is read from Redis and decoded as JSON.  ``None`` is returned
    when the entry is missing/empty or when no value equals ``code``.
    """
    raw = __redisManager.getRedis().get("gp_list_names")
    if not raw:
        return None
    names = json.loads(raw)
    # Reverse lookup: scan the keys in stored order and stop at the first match.
    return next((name for name in names if names[name] == code), None)
def get_name_codes():
    redis_instance = __redisManager.getRedis()
    val = redis_instance.get("gp_list_names")
@@ -332,12 +344,26 @@
    redis_instance.setex("gp_operate-{}".format(code), 30, "1")
# Mark a batch of codes as currently being operated on
def set_operates(codes):
    """Store a "gp_operate-<code>" key with value "1" for every code in ``codes``.

    Each key is written with ``setex`` and an expiry argument of 30.
    """
    client = __redisManager.getRedis()
    # map() is lazy, so every key is still formatted right before it is written.
    for operate_key in map("gp_operate-{}".format, codes):
        client.setex(operate_key, 30, "1")
# Remove the "being operated on" marker of one code
def rm_operate(code):
    """Delete the "gp_operate-<code>" key from Redis; returns None."""
    client = __redisManager.getRedis()
    operate_key = "gp_operate-{}".format(code)
    client.delete(operate_key)
# Remove the "being operated on" marker of a batch of codes
def rm_operates(codes):
    """Delete the "gp_operate-<code>" key of every code in ``codes``, one call per key."""
    client = __redisManager.getRedis()
    # map() is lazy, so every key is still formatted right before it is deleted.
    for operate_key in map("gp_operate-{}".format, codes):
        client.delete(operate_key)
if __name__ == '__main__':
    _start = time.time()
    redis_instance = __redisManager.getRedis()
gui.py
@@ -323,6 +323,11 @@
            except:
                pass
            # 获取有效的L2客户端数量
            l2_client_count = client_manager.getValidL2Clients()
            if len(l2_client_count) < 2:
                normal = False
            # 状态有问题,需要报警
            if not normal:
                alert_util.alarm()
@@ -502,7 +507,6 @@
        def set_accept_l2():
            settings.set_accept_l2(accept_l2.get())
        width = 800
        height = 290
        frame = Frame(root, {"height": height, "width": width, "bg": "#DDDDDD"})
@@ -510,14 +514,13 @@
        cl.place(x=5, y=5)
        accept_l2 = IntVar()
        ch_accept_l2 = Checkbutton(frame, text='接受l2数据', variable=accept_l2, onvalue=1, offvalue=0, background="#DDDDDD",activebackground="#DDDDDD",command=set_accept_l2)
        ch_accept_l2 = Checkbutton(frame, text='接受l2数据', variable=accept_l2, onvalue=1, offvalue=0,
                                   background="#DDDDDD", activebackground="#DDDDDD", command=set_accept_l2)
        ch_accept_l2.place(x=width - 350, y=5)
        if settings.is_accept_l2_data():
            accept_l2.set(1)
        else:
            accept_l2.set(0)
        btn = Button(frame, text="每日初始化", command=init)
        btn.place(x=width - 250, y=5)
@@ -595,14 +598,15 @@
                table_delegate.model.addRow()
                table_delegate.model.setValueAt(data["time"], index, 0)
                table_delegate.model.setValueAt(data["code"], index, 1)
                table_delegate.model.setValueAt(data["num"], index, 2)
                table_delegate.model.setValueAt(data.get("price"), index, 3)
                table_delegate.model.setValueAt(data.get("trade_price"), index, 4)
                table_delegate.model.setValueAt(data.get("trade_num"), index, 5)
                table_delegate.model.setValueAt(data["apply_time"], index, 2)
                table_delegate.model.setValueAt(data["num"], index, 3)
                table_delegate.model.setValueAt(data.get("price"), index, 4)
                table_delegate.model.setValueAt(data.get("trade_price"), index, 5)
                table_delegate.model.setValueAt(data.get("trade_num"), index, 6)
                if int(data["type"]) > 0:
                    table_delegate.model.setValueAt("卖出", index, 6)
                    table_delegate.model.setValueAt("卖出", index, 7)
                else:
                    table_delegate.model.setValueAt("买入", index, 6)
                    table_delegate.model.setValueAt("买入", index, 7)
                index += 1
            table_delegate.redraw()
@@ -630,7 +634,7 @@
            pass
        def create_table(_frame, data, cell_width=70):
        def create_table(_frame, data, cell_width=75):
            table = tkintertable.TableCanvas(_frame, data=data, read_only=True, width=table_width,
                                             height=table_height, thefont=('微软雅黑', 10), cellwidth=cell_width,
                                             rowheaderwidth=20)
@@ -665,7 +669,7 @@
        cl = Label(frame, text="今日委托:", bg="#DDDDDD", fg="#666666")
        cl.place(x=5, y=30)
        delegate_datas = {}
        delegate_datas["row{}".format(0)] = {'委托时间': '', '代码': '', '委托数量': '', '委托价格': '', '成交均价': '', '成交数量': '',
        delegate_datas["row{}".format(0)] = {'委托时间': '', '代码': '', '申报时间': '', '委托数量': '', '委托价格': '', '成交均价': '', '成交数量': '',
                                             '操作': ''}
        cl = Label(frame, text="更新时间:", bg="#DDDDDD", fg="#666666")
juejin.py
@@ -58,6 +58,9 @@
def init_data():
    # 删除之前的分钟级大单撤单数据
    l2_data_manager_new.SecondAverageBigNumComputer.clear_data()
    l2_data_manager_new.AverageBigNumComputer.clear_data()
    # 删除所有的涨停卖数据
    l2_data_manager_new.L2LimitUpSellStatisticUtil.clear()
    # 重置所有的大单数据
l2_code_operate.py
@@ -58,6 +58,35 @@
        finally:
            gpcode_manager.rm_operate(gpcode)
    @staticmethod
    def betchSetGPCode(client_id, codes_info):
        # codes_info 格式[(0,"000333")]
        datas = []
        for info in codes_info:
            datas.append({"index": info[0], "code": info[1]})
        data = {"action": "betchSetGPCodes", "data": datas, "force": True}
        logger_code_operate.info("betchSetGPCodes:clientid-{}  info-{}".format(client_id, codes_info))
        codes = []
        for item in codes_info:
            codes.append(item[1])
        gpcode_manager.set_operates(codes)
        try:
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "betchSetGPCodes结束({}):clientid-{}  info-{}".format(result, client_id, codes_info))
            jsonData = json.loads(result)
            if jsonData["code"] == 0:
                for item in codes_info:
                    gpcode_manager.set_listen_code_by_pos(client_id, item[0], item[1])
                    L2CodeOperate.set_operate_code_state(client_id, item[0], 1)
        except Exception as e:
            logging.exception(e)
            logger_code_operate.error("setGPCode出错:{}", str(e))
        finally:
            gpcode_manager.rm_operates(codes)
    @classmethod
    def run(cls):
        cls.__lock.acquire()
@@ -269,15 +298,13 @@
            logger_code_operate.error("client:{} msg:{}".format(client_id, str(e)))
# Set the codes of a client in one batch
def betch_set_client_codes(client_id, codes_info):
    """Forward ``codes_info`` for ``client_id`` to ``L2CodeOperate.betchSetGPCode``.

    Module-level convenience wrapper; nothing is returned.
    """
    L2CodeOperate.betchSetGPCode(client_id=client_id, codes_info=codes_info)
if __name__ == "__main__":
    codes = [(0, "002210"), (1, "600056"), (2, "002591"), (3, "002193"), (4, "603186"), (5, "600833"), (6, "000736"),
             (7, "603000")]
    datas = []
    for item in codes:
        datas.append({"index": item[0], "code": item[1]})
    data = {"action": "betchSetGPCodes", "data": datas}
    try:
        result = server.send_msg(3, data)
        print("设置结果:", result)
    except Exception as e:
        logging.exception(e)
    codes = [(0, "000615"), (1, "002264"), (2, "600225"), (3, "002495"), (4, "600572"), (5, "002279"), (6, "002591"),
             (7, "002880")]
    L2CodeOperate.betchSetGPCode(3, codes)
l2_data_manager.py
@@ -828,6 +828,7 @@
            L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
@@ -2173,7 +2174,4 @@
if __name__ == "__main__":
    # 处理数据
    code = "002898"
    load_l2_data(code)
    L2LimitUpMoneyStatisticUtil.verify_num(code, 70582, "09:42:00")
    clear_l2_data("603912")
l2_data_manager_new.py
@@ -6,6 +6,7 @@
import big_money_num_manager
import code_data_util
import constant
import global_util
import gpcode_manager
import industry_codes_sort
@@ -14,12 +15,14 @@
import l2_data_util
import l2_trade_factor
import l2_trade_test
import l2_trade_util
import limit_up_time_manager
import redis_manager
import ths_industry_util
import tool
import trade_manager
import trade_queue_manager
import trade_data_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \
@@ -177,7 +180,6 @@
    # capture_timestamp:截图时间戳
    def process(cls, code, datas, capture_timestamp):
        cls.random_key[code] = random.randint(0, 100000)
        now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
        __start_time = round(t.time() * 1000)
        try:
            if len(datas) > 0:
@@ -194,64 +196,86 @@
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
                # ---------- 判断是否需要计算大单 -----------
                try:
                    average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(code)
                    # 计算平均大单
                    if average_need:
                        end_index = local_today_datas[code][-1]["index"]
                        if len(add_datas) > 0:
                            end_index = add_datas[-1]["index"]
                        AverageBigNumComputer.compute_average_big_num(code, buy_exec_index, buy_single_index, end_index)
                except Exception as e:
                    logging.exception(e)
                # -------------数据增量处理------------
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
                    l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
                    # 第1条数据是否为09:30:00
                    if add_datas[0]["val"]["time"] == "09:30:00":
                        if global_util.cuurent_prices.get(code):
                            price_data = global_util.cuurent_prices.get(code)
                            if price_data[1]:
                                # 当前涨停价,设置涨停时间
                                logger_l2_process.info("开盘涨停:{}", code)
                                # 保存涨停时间
                                limit_up_time_manager.save_limit_up_time(code, "09:30:00")
                total_datas = local_today_datas[code]
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
                try:
                    if len(add_datas) > 0:
                        latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                        # 时间差不能太大才能处理
                        # TODO 暂时关闭处理
                        if l2_data_manager.L2DataUtil.is_same_time(now_time_str, latest_time):
                            # 判断是否已经挂单
                            state = trade_manager.get_trade_state(code)
                            start_index = len(total_datas) - len(add_datas)
                            end_index = len(total_datas) - 1
                            if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                                # 已挂单
                                cls.__process_order(code, start_index, end_index, capture_timestamp)
                            else:
                                # 未挂单
                                cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                        logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                               add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                               capture_timestamp)
                        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                    cls.process_add_datas(code, add_datas, capture_timestamp, __start_time)
                finally:
                    # 保存数据
                    l2_data_manager.save_l2_data(code, datas, add_datas)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                                       "保存数据时间({})".format(len(add_datas)))
        finally:
            if code in cls.unreal_buy_dict:
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def process_add_datas(cls, code, add_datas, capture_timestamp, __start_time):
        """Merge newly arrived L2 rows for ``code`` and run the trade logic on them.

        Steps, in order (the first three only when ``add_datas`` is non-empty):

        1. Append ``add_datas`` to ``local_today_datas[code]`` and update the
           shared num-operate map.
        2. Ask ``AverageBigNumComputer`` and then ``SecondAverageBigNumComputer``
           whether an average-big-order computation is needed and run it;
           exceptions raised there are logged and swallowed.
        3. If the first new row is stamped "09:30:00" and the cached price entry
           of the code has a truthy second element, store "09:30:00" as the
           limit-up time.
        4. Report the preprocessing time through ``l2_data_log.l2_time``.
        5. If the newest row's time matches the current wall-clock time (per
           ``L2DataUtil.is_same_time``) and the code is not in the forbidden
           trade codes, dispatch the new index range to ``__process_order`` or
           ``__process_not_order`` depending on the current trade state.

        :param code: stock code being processed.
        :param add_datas: list of new L2 rows; each row is read through
            ``row["index"]`` and ``row["val"]["time"]``.
        :param capture_timestamp: capture timestamp, forwarded to the order
            processing and written to the log.
        :param __start_time: millisecond timestamp used as the base of the
            timing logs.  The name is private-mangled inside the class body, so
            callers pass it positionally.  It is rebound locally only; the
            caller's variable is not updated.
        :return: None.
        """
        if len(add_datas) > 0:
            now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
            # Append the new rows to the in-memory data of the day
            local_today_datas[code].extend(add_datas)
            l2_data_util.load_num_operate_map(l2_data_manager.local_today_num_operate_map, code, add_datas)
            # ---------- Decide whether the big-order average has to be computed -----------
            try:
                average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # Compute the average big order
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    # Always true here: we are already inside the non-empty branch
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    # NOTE(review): buy_single_index is passed twice and buy_exec_index is
                    # never used - confirm against compute_average_big_num's signature.
                    AverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                  end_index)
            except Exception as e:
                # Best effort: a failure here must not stop the processing below
                logging.exception(e)
            try:
                average_need, buy_single_index, buy_exec_index = SecondAverageBigNumComputer.is_need_compute_average(
                    code, local_today_datas[code][-1])
                # Compute the average big order
                if average_need:
                    end_index = local_today_datas[code][-1]["index"]
                    # Always true here: we are already inside the non-empty branch
                    if len(add_datas) > 0:
                        end_index = add_datas[-1]["index"]
                    # NOTE(review): same duplicated buy_single_index argument as above - confirm.
                    SecondAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_single_index,
                                                                        end_index)
            except Exception as e:
                # Best effort: a failure here must not stop the processing below
                logging.exception(e)
            # Is the first new row stamped 09:30:00?
            if add_datas[0]["val"]["time"] == "09:30:00":
                if global_util.cuurent_prices.get(code):
                    price_data = global_util.cuurent_prices.get(code)
                    if price_data[1]:
                        # Currently at the limit-up price: record the limit-up time
                        logger_l2_process.info("开盘涨停:{}", code)
                        # Save the limit-up time
                        limit_up_time_manager.save_limit_up_time(code, "09:30:00")
        total_datas = local_today_datas[code]
        # The value returned by l2_time becomes the base of the next timing log
        __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
        if len(add_datas) > 0:
            latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
            # Only process when the data is not too far behind the current time
            if l2_data_manager.L2DataUtil.is_same_time(now_time_str,
                                                       latest_time) and not l2_trade_util.is_in_forbidden_trade_codes(
                code):
                # Check whether an order has already been placed
                state = trade_manager.get_trade_state(code)
                # Positions of the new rows inside total_datas
                start_index = len(total_datas) - len(add_datas)
                end_index = len(total_datas) - 1
                if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER or state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # An order is already placed
                    cls.__process_order(code, start_index, end_index, capture_timestamp)
                else:
                    # No order placed yet
                    cls.__process_not_order(code, start_index, end_index, capture_timestamp)
            logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                   add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                   capture_timestamp)
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
    # 处理未挂单
    @classmethod
@@ -260,8 +284,7 @@
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
@@ -279,9 +302,31 @@
            return
        # 获取买入信号起始点
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index, buy_exec_index)
        # 撤单计算,看秒级大单撤单
        try:
            b_need_cancel, b_cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   buy_exec_index, start_index,
                                                                                   end_index)
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "申报时间截至大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        # 撤单计算,看分钟级大单撤单
        try:
            b_need_cancel, b_cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, buy_exec_index,
                                                                             start_index, end_index)
            if b_need_cancel and not cancel_data:
                cancel_data = b_cancel_data
                cancel_msg = "1分钟内大单撤销比例触发阈值"
        except Exception as e:
            logging.exception(e)
        if not cancel_data:
            # 统计板上卖
@@ -294,13 +339,18 @@
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
                                          gpcode_manager.get_limit_up_price(code))
        if cancel_data:
            if cancel_data["index"] == 175:
                print("进入调试")
            cls.debug(code, "触发撤单,撤单位置:{} ,撤单原因:{}", cancel_data["index"], cancel_msg)
            # 撤单
            cls.cancel_buy(code, cancel_msg)
            # 继续计算下单
            cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
            if cls.cancel_buy(code, cancel_msg):
                # 撤单成功,继续计算下单
                cls.__process_not_order(code, cancel_data["index"] + 1, end_index, capture_time)
            else:
                # 撤单尚未成功
                pass
        else:
            # 如果有虚拟下单需要真实下单
            unreal_buy_info = cls.unreal_buy_dict.get(code)
@@ -336,9 +386,11 @@
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                trade_data_manager.placeordercountmanager.place_order(code)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                    SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                except Exception as e:
@@ -355,28 +407,31 @@
    # 是否可以取消
    @classmethod
    def __can_cancel(cls, code):
        if constant.TEST:
            return True, ""
        # 暂时注释掉
        # 14点后如果是板块老大就不需要取消了
        now_time_str = tool.get_now_time_str()
        if int(now_time_str.replace(":", "")) >= 140000:
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, "没有获取到行业"
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None:
                # 同一板块中老二后面的不能买
                if codes_index.get(code) == 0:
                    return False, "14:00后老大不能撤单"
                elif codes_index.get(code) == 1:
                    # 判断老大是否都是09:30:00涨停的
                    # 同1板块老大是09:30:00涨停,老二14:00砸开的不撤
                    first_count = 0
                    for key in codes_index:
                        if codes_index[key] == 0:
                            first_count += 1
                            if limit_up_time_manager.get_limit_up_time(key) == "09:30:00":
                                first_count -= 1
                    if first_count == 0:
                        return False, "14:00后老大都开盘涨停,老二不能撤单"
        # now_time_str = tool.get_now_time_str()
        # if int(now_time_str.replace(":", "")) >= 140000:
        #     industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        #     if industry is None:
        #         return True, "没有获取到行业"
        #     codes_index = industry_codes_sort.sort_codes(codes, code)
        #     if codes_index is not None and codes_index.get(code) is not None:
        #         # 同一板块中老二后面的不能买
        #         if codes_index.get(code) == 0:
        #             return False, "14:00后老大不能撤单"
        #         elif codes_index.get(code) == 1:
        #             # 判断老大是否都是09:30:00涨停的
        #             # 同1板块老大是09:30:00涨停,老二14:00砸开的不撤
        #             first_count = 0
        #             for key in codes_index:
        #                 if codes_index[key] == 0:
        #                     first_count += 1
        #                     if limit_up_time_manager.get_limit_up_time(key) == "09:30:00":
        #                         first_count -= 1
        #             if first_count == 0:
        #                 return False, "14:00后老大都开盘涨停,老二不能撤单"
        return True, ""
@@ -414,11 +469,10 @@
        except Exception as e:
            logging.exception(e)
        # 量比超过1.1的不能买
        volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
        if volumn_rate >= 1.1:
            return False, "最大量比超过1.1不能买"
        if volumn_rate >= 1.3:
            return False, "最大量比超过1.3不能买"
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
@@ -496,15 +550,20 @@
            l2_data_manager.L2BetchCancelBigNumProcessor.del_recod(code)
            cls.debug(code, "执行撤单成功")
        except Exception as e:
            logging.exception(e)
            cls.debug(code, "执行撤单异常:{}", str(e))
    @classmethod
    def cancel_buy(cls, code, msg=None):
        can_cancel, reason = cls.__can_cancel(code)
        if not can_cancel:
            # 不能取消
            cls.cancel_debug(code, "撤单中断,原因:{}", reason)
            return
    def cancel_buy(cls, code, msg=None, source="l2"):
        # 是否是交易队列触发
        if source == "trade_queue":
            # 交易队列触发的需要下单后5s
            buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
            total_datas = local_today_datas[code]
            if buy_exec_index is not None and buy_exec_index > 0:
                now_time_str = datetime.datetime.now().strftime("%H:%M:%S")
                if tool.trade_time_sub(now_time_str, total_datas[buy_exec_index]["val"]["time"]) < 5:
                    return False
        l2_data_manager.L2ContinueLimitUpCountManager.del_data(code)
        if code in cls.unreal_buy_dict:
@@ -515,16 +574,26 @@
            l2_data_manager.TradePointManager.delete_compute_info_for_cancel_buy(code)
            l2_data_manager.TradePointManager.delete_count_info_for_cancel_buy(code)
        else:
            can_cancel, reason = cls.__can_cancel(code)
            if not can_cancel:
                # 不能取消
                cls.cancel_debug(code, "撤单中断,原因:{}", reason)
                cls.debug(code, "撤单中断,原因:{}", reason)
                return False
            cls.__cancel_buy(code)
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
        cls.debug(code, "执行撤单成功,原因:{}", msg)
        return True
    # 虚拟下单
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        SecondAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
        # 删除之前的板上卖信息
        L2LimitUpSellStatisticUtil.delete(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
@@ -600,10 +669,27 @@
            # 数据是否处理完毕
            if compute_index >= compute_end_index:
                need_cancel, cancel_data = SecondAverageBigNumComputer.need_cancel(code, buy_single_index,
                                                                                   compute_index,
                                                                                   buy_single_index, compute_index,
                                                                                   True)
                # 分钟级大单计算
                # need_cancel, cancel_data = AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                                              buy_single_index, compute_index, True)
                cls.debug(code, "数据处理完毕,下单, 数据截图时间-{}", capture_time)
                # 数据已经处理完毕,如果还没撤单就实际下单
                cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
                if need_cancel:
                    if cls.cancel_buy(code, "分钟级大单撤销"):
                        # 执行撤单成功
                        pass
                else:
                    cls.__buy(code, capture_time, total_datas[compute_index], compute_index)
            else:
                # AverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                #                                   buy_single_index, compute_index, False)
                SecondAverageBigNumComputer.need_cancel(code, buy_single_index, compute_index,
                                                        buy_single_index, compute_index, False)
                # 数据尚未处理完毕,进行下一步处理
                cls.debug(code, "数据尚未处理完毕,进行下一步处理,处理进度:{}", compute_index)
                # 处理撤单步骤
@@ -743,11 +829,23 @@
        buy_single_time_seconds = L2DataUtil.get_time_as_second(total_datas[buy_single_index]["val"]["time"])
        # 可以触发买
        trigger_buy = True
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        max_space_time = 2
        if place_order_count <= 0:
            max_space_time = 2
        elif place_order_count <= 1:
            max_space_time = 6 - 1
        else:
            max_space_time = 9 - 1
        for i in range(compute_start_index, compute_end_index + 1):
            data = total_datas[i]
            _val = total_datas[i]["val"]
            trigger_buy = False
            # 必须为连续3秒内的数据
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > 2:
            if L2DataUtil.get_time_as_second(_val["time"]) - buy_single_time_seconds > max_space_time:
                TradePointManager.delete_buy_point(code)
                if i == compute_end_index:
                    # 数据处理完毕
@@ -761,42 +859,48 @@
            if L2DataUtil.is_limit_up_price_buy(_val):
                if cls.__is_big_money(limit_up_price, _val):
                    sub_threshold_count += int(total_datas[i]["re"])
                # 涨停买
                buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                buy_count += int(total_datas[i]["re"])
                if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                    logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code, i,
                                             buy_nums,
                                             threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    trigger_buy = True
                    # 只统计59万以上的金额
                    buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count += int(total_datas[i]["re"])
                    if buy_nums >= threshold_num and buy_count >= get_threshold_count():
                        logger_l2_trade_buy.info("{}获取到买入执行点:{} 统计纯买手数:{} 目标纯买手数:{} 统计纯买单数:{} 目标纯买单数:{}, 大单数量:{}", code,
                                                 i,
                                                 buy_nums,
                                                 threshold_num, buy_count, get_threshold_count(), sub_threshold_count)
            elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                if cls.__is_big_money(limit_up_price, _val):
                    sub_threshold_count -= int(total_datas[i]["re"])
                # 涨停买撤
                # 判断买入位置是否在买入信号之前
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                 local_today_num_operate_map.get(code))
                if buy_index is not None:
                    # 找到买撤数据的买入点
                    if buy_index >= buy_single_index:
                        buy_nums -= int(_val["num"]) * int(data["re"])
                        buy_count -= int(data["re"])
                        cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                    else:
                        cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                        if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                            # 同一秒,当作买入信号之后处理
                if round(int(_val["num"]) * float(_val["price"])) >= 5900:
                    # 只统计59万以上的金额
                    # 涨停买撤
                    # 判断买入位置是否在买入信号之前
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(total_datas[i],
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None:
                        # 找到买撤数据的买入点
                        if buy_index >= buy_single_index:
                            buy_nums -= int(_val["num"]) * int(data["re"])
                            buy_count -= int(data["re"])
                            cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                else:
                    # 未找到买撤数据的买入点
                    cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                    buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                    buy_count -= int(total_datas[i]["re"])
                            cls.buy_debug(code, "{}数据在买入信号之后 撤买纯买手数:{} 目标手数:{}", i, buy_nums, threshold_num)
                        else:
                            cls.buy_debug(code, "{}数据在买入信号之前,买入位:{}", i, buy_index)
                            if total_datas[buy_single_index]["val"]["time"] == buy_data["val"]["time"]:
                                # 同一秒,当作买入信号之后处理
                                buy_nums -= int(_val["num"]) * int(data["re"])
                                buy_count -= int(data["re"])
                                cls.buy_debug(code, "{}数据买入位与预估买入位在同一秒", i)
                    else:
                        # 未找到买撤数据的买入点
                        cls.buy_debug(code, "未找到买撤数据的买入点: 位置-{} 数据-{}", i, data)
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                        buy_count -= int(total_datas[i]["re"])
            cls.buy_debug(code, "位置-{},总手数:{},目标手数:{}", i,
                          buy_nums, threshold_num)
            # 有撤单信号,且小于阈值
            if buy_nums >= threshold_num and buy_count >= get_threshold_count():
            if buy_nums >= threshold_num and buy_count >= get_threshold_count() and trigger_buy:
                return i, buy_nums, buy_count, None
        cls.buy_debug(code, "尚未获取到买入执行点,起始计算位置:{} 统计纯买手数:{} 目标纯买手数:{}  统计纯买单数:{} 目标纯买单数:{} 大单数量:{}",
@@ -1194,7 +1298,7 @@
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                            cancel_index = i
                            cancel_msg = "封单金额小于1000万"
                            cancel_msg = "封单金额小于1000万,为{}".format(total_num)
                            break
                    # 相邻2s内的数据减小50%
                    # 上1s的总数
@@ -1227,14 +1331,15 @@
                # ------大单撤处理-------
                # if total_num < min_volumn_big:
                if exec_time_offset < 1800:
                    try:
                        b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, buy_exec_index, i, i)
                        if b_need_cancel:
                            cancel_index = b_cancel_index
                            cancel_msg = "1分钟内大单撤销比例触发阈值"
                            break
                    except Exception as e:
                        logging.exception(e)
                    pass
                    # try:
                    #     b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, i, i)
                    #     if b_need_cancel:
                    #         cancel_index = b_cancel_index
                    #         cancel_msg = "1分钟内大单撤销比例触发阈值"
                    #         break
                    # except Exception as e:
                    #     logging.exception(e)
                # 30分钟外才执行
                elif 1800 <= exec_time_offset <= 5400:
                    try:
@@ -1322,7 +1427,8 @@
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        threshold_num = int(zyltgb * 0.015) // (limit_up_price * 100)
        # 大于自由流通市值的4.8%
        threshold_num = int(zyltgb * 0.048) // (limit_up_price * 100)
        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
@@ -1332,7 +1438,7 @@
                continue
            if i <= process_index:
                continue
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]):
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]) or L2DataUtil.is_sell(total_datas[i]["val"]):
                num = int(total_datas[i]["val"]["num"])
                cls.__incre_sell_data(code, num)
                total_num += num
@@ -1343,7 +1449,9 @@
            process_index = cancel_index
        else:
            process_index = end_index
        # 保存处理的位置
        L2TradeDataProcessor.cancel_debug(code, "板上卖信息:计算位置:{}-{} 板上卖数据{}/{}", start_index, end_index, total_num,
                                          threshold_num)
        cls.__save_process_index(code, process_index)
        if cancel_index is not None:
            return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
@@ -1355,6 +1463,259 @@
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        cls.process(code, 126, 171, 126)
# Second-level ("s-level") big-order cancel-ratio computation.
# The statistics range runs from the buy signal up to (and including) the
# second in which the order was applied (declared).
class SecondAverageBigNumComputer:
    """Track big limit-up buy orders around an order placement and decide
    whether enough of them were cancelled to warrant cancelling our own order.

    State is kept in Redis under three key families:
      * ``s_average_big_num-<code>``: JSON tuple
        ``(average_num, average_up_count, start_index, end_index)``
      * ``s_average_big_num_comput_info-<code>``: set of indexes of cancel
        records whose originating buy lies inside the statistics range
      * ``s_average_big_num_apply_time-<code>``: apply time string ("HH:MM:SS")
    """
    __redis_manager = redis_manager.RedisManager(0)
    # code -> (wall-clock time of placement, buy_single_index, buy_exec_index)
    __place_order_time_dict = {}

    @classmethod
    def __getRedis(cls):
        """Return the Redis client used by this class."""
        return cls.__redis_manager.getRedis()

    @staticmethod
    def __time_as_int(time_str):
        """Turn "HH:MM:SS" into the integer HHMMSS so times can be ordered."""
        return int(time_str.replace(":", ""))

    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        """Persist the big-order statistics for `code` (expires after 2000s) and log them."""
        key = "s_average_big_num-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((average_num, average_up_count, start_index, end_index)))
        L2TradeDataProcessor.cancel_debug(code, "保存秒级大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                 average_up_count,
                                                                                                 start_index,
                                                                                                 end_index))

    @classmethod
    def __get_average_data(cls, code):
        """Return ``(average_num, average_up_count, start_index, end_index)``,
        or four ``None`` values when nothing is stored for `code`."""
        key = "s_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]

    # Record the index of a buy-cancel entry.
    @classmethod
    def __save_cancel_data(cls, code, cancel_index):
        key = "s_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().sadd(key, cancel_index)

    # Fetch the recorded buy-cancel indexes (a Redis set).
    @classmethod
    def __get_cancel_datas(cls, code):
        key = "s_average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().smembers(key)
        return val

    # Persist the apply (declaration) time of the order.
    @classmethod
    def __save_apply_time(cls, code, time_str):
        key = "s_average_big_num_apply_time-{}".format(code)
        cls.__getRedis().setex(key, tool.get_expire(), time_str)

    # Read the stored apply (declaration) time; None when absent.
    @classmethod
    def __get_apply_time(cls, code):
        key = "s_average_big_num_apply_time-{}".format(code)
        val = cls.__getRedis().get(key)
        return val

    @classmethod
    def __clear_data(cls, code):
        """Delete the cancel-index set and the statistics of one code.

        The apply-time key is intentionally left alone here (as before).
        """
        key = "s_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().delete(key)
        key = "s_average_big_num-{}".format(code)
        cls.__getRedis().delete(key)

    @classmethod
    def clear_data(cls):
        """Delete the cancel-index sets and statistics of every code."""
        for pattern in ("s_average_big_num_comput_info-*", "s_average_big_num-*"):
            for k in cls.__getRedis().keys(pattern):
                cls.__getRedis().delete(k)

    # Compute the big-order statistics.
    # Range: from the buy signal to the last record whose time does not exceed
    # the apply time.
    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
        """Count big limit-up buy orders in ``[start_index, end_index]`` and store the result.

        The range is truncated at the first record whose time is later than the
        stored apply time. A "big" order is one whose lot count is at least
        ``round(5900 / limit_up_price)``.
        NOTE(review): 5900 presumably means 590,000 yuan with `num` expressed
        in lots of 100 shares (see the "59万" notes elsewhere in this file).

        The former averaging pass (``num // count``) was removed: its result was
        overwritten immediately and it raised ZeroDivisionError when the range
        held no limit-up buy.

        :param buy_single_index: index of the buy signal (only printed here)
        :param start_index: first index of the statistics range
        :param end_index: last index of the statistics range (inclusive)
        """
        print("compute_average_big_num", code, buy_single_index, start_index, end_index)
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        apply_time_second = cls.__time_as_int(cls.get_apply_time(code))
        average_num = round(5900 / gpcode_manager.get_limit_up_price(code))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if cls.__time_as_int(val["time"]) > apply_time_second:
                # Everything from here on is after the apply time: shrink the range.
                end_index = i - 1
                break
            if L2DataUtil.is_limit_up_price_buy(val) and int(val["num"]) >= average_num:
                average_up_count += data["re"]
        print("平均手数:", average_num, "大单总数:", average_up_count)
        # Persist the statistics together with the (possibly truncated) range.
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)

    @classmethod
    def __record_cancel_if_in_range(cls, code, index, data, a_start_index, a_end_index):
        """Save `index` when the buy that `data` cancels lies inside the statistics range."""
        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                         local_today_num_operate_map.get(code))
        if buy_index is not None and a_start_index <= buy_index <= a_end_index:
            cls.__save_cancel_data(code, index)

    # Decide whether the order has to be cancelled.
    @classmethod
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        """Collect buy-cancel records in ``[start_index, end_index]`` and, when
        `need_cancel` is true, check the cancelled-big-order ratio.

        :param buy_single_index: index of the buy signal
        :param buy_exec_index: index of the buy execution point (unused here,
            kept for interface compatibility)
        :param need_cancel: when False only the cancel records are collected
        :return: ``(True, record)`` with the record that pushed the ratio over
            the threshold, otherwise ``(False, None)``
        """
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        total_data = local_today_datas[code]
        # start_index == buy_single_index means this is the first pass after
        # placing the order: also look backwards, within the same second as the
        # buy signal, for limit-up cancels with a zero cancel interval.
        if buy_single_index == start_index:
            signal_time = total_data[buy_single_index]["val"]["time"]
            # NOTE(review): the range stops before index 0, as in the original.
            for i in range(buy_single_index - 1, 0, -1):
                data = total_data[i]
                val = data["val"]
                if val["time"] != signal_time:
                    break
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                    cls.__record_cancel_if_in_range(code, i, data, a_start_index, a_end_index)

        for i in range(start_index, end_index + 1):
            data = total_data[i]
            if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                cls.__record_cancel_if_in_range(code, i, data, a_start_index, a_end_index)

        if not need_cancel:
            return False, None
        cancel_datas = cls.__get_cancel_datas(code)
        if cancel_datas is None or len(cancel_datas) == 0:
            return False, None
        # The more often this code has been ordered, the higher the ratio
        # needed to trigger a cancel. (The two upper tiers used to be assigned
        # to a misspelled variable and were therefore never applied.)
        place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
        if place_order_count <= 1:
            cancel_rate_threshold = 0.49
        elif place_order_count <= 2:
            cancel_rate_threshold = 0.549
        else:
            cancel_rate_threshold = 0.59
        if average_up_count <= 0:
            # No big buy order was counted, so a ratio cannot be formed;
            # previously this raised ZeroDivisionError.
            return False, None
        cancel_indexs = sorted(int(index) for index in cancel_datas)
        cancel_count = 0
        for index in cancel_indexs:
            data = total_data[index]
            if int(data["val"]["num"]) >= average_num:
                cancel_count += data["re"]
                if cancel_count / average_up_count > cancel_rate_threshold:
                    return True, total_data[index]
        return False, None

    # Whether the statistics still have to be (re)computed.
    @classmethod
    def is_need_compute_average(cls, code, latest_data):
        """Return ``(True, buy_single_index, buy_exec_index)`` while `latest_data`
        is less than 5s after the apply time; otherwise forget the placement
        and return ``(False, None, None)``."""
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        if tool.trade_time_sub(latest_data["val"]["time"], cls.get_apply_time(code)) < 5:
            # The apply time may be uploaded up to 5s after the order.
            return True, data[1], data[2]
        cls.__place_order_time_dict.pop(code)
        return False, None, None

    # Set the apply (declaration) time.
    @classmethod
    def set_apply_time(cls, code, time_str, force=False):
        """Store `time_str` as the apply time.

        Unless `force` is true, a new value is ignored when an apply time is
        already stored and the new one is not 1-4 seconds later than it.
        """
        old_time_str = cls.get_apply_time(code)
        if not force and old_time_str is not None:
            sub_time = tool.trade_time_sub(time_str, old_time_str)
            if sub_time <= 0 or sub_time > 4:
                # The apply time must be later than the order time by at most 4s.
                return
        cls.__save_apply_time(code, time_str)

    @classmethod
    def get_apply_time(cls, code):
        """Return the stored apply time of `code`, or None."""
        return cls.__get_apply_time(code)

    # Called once an order has been placed.
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        """Reset the state of `code` and compute the initial statistics."""
        cls.__clear_data(code)
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        # As a fallback, use the time of the execution record as the apply time
        # until the real one is uploaded.
        total_data = local_today_datas[code]
        cls.set_apply_time(code, total_data[buy_exec_index]["val"]["time"], True)
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"])

    @classmethod
    def __test(cls, datas):
        """Replay one scenario: ``(code, buy_single_index, buy_exec_index,
        statistics end index, farthest index to replay)``."""
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        # Place the order first.
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # Recompute the statistics on the requested range.
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3])
        for i in range(buy_single_index, datas[4]):
            # Feed the records one by one (the old call omitted two required arguments).
            cancel, cancel_data = cls.need_cancel(code, buy_single_index, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, cancel_data["index"])
                break

    @classmethod
    def test(cls):
        cls.__test(("000716", 410, 420, 461, 536))
        # code, buy signal index, buy execution index, statistics end index, farthest replay index
        # cls.__test(("002793", 292, 308, 314, 410))
        # Check whether a cancel is required.
# 平均大单计算
@@ -1384,19 +1745,18 @@
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    # 保存买撤数据
    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
    def __save_cancel_data(cls, code, cancel_index):
        key = "average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((cancel_count, process_index)))
        cls.__getRedis().sadd(key, cancel_index)
    # 获取买撤的数据
    @classmethod
    def __get_compute_info(cls, code):
    def __get_cancel_datas(cls, code):
        key = "average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
        val = cls.__getRedis().smembers(key)
        return val
    @classmethod
    def __clear_data(cls, code):
@@ -1405,10 +1765,22 @@
        key = "average_big_num-{}".format(code)
        cls.__getRedis().delete(key)
    @classmethod
    def clear_data(cls):
        key = "average_big_num_comput_info-*"
        keys = cls.__getRedis().keys(key)
        for k in keys:
            cls.__getRedis().delete(k)
        key = "average_big_num-*"
        keys = cls.__getRedis().keys(key)
        for k in keys:
            cls.__getRedis().delete(k)
    # 计算平均手数
    # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止
    @classmethod
    def compute_average_big_num(cls, code, buy_exec_index, start_index, end_index):
    def compute_average_big_num(cls, code, buy_single_index, start_index, end_index):
        print("compute_average_big_num", code, buy_single_index, start_index, end_index)
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        num = 0
@@ -1416,11 +1788,23 @@
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
            if L2DataUtil.is_limit_up_price_buy(val) and float(val["price"]) * int(val["num"]) >= 5000:
                # 75万以上的才参与计算平均大单
                count += data["re"]
                num += int(val["num"])
        average_num = num // count
        # 如果没有找到75万以上的单就不添加75w的筛选条件
        if count == 0:
            for i in range(start_index, end_index + 1):
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy(val):
                    # 75万以上的才参与计算平均大单
                    count += data["re"]
                    num += int(val["num"])
        average_num = num // count
        #average_num = 0
        average_num = round(5900 / gpcode_manager.get_limit_up_price(code))
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
@@ -1428,53 +1812,87 @@
            if L2DataUtil.is_limit_up_price_buy(val):
                if int(val["num"]) >= average_num:
                    average_up_count += data["re"]
        print("平均大单:", average_num, average_up_count)
        print("平均手数:", average_num, "大单总数:", average_up_count)
        # 保存数据
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
    def need_cancel(cls, code, buy_single_index, buy_exec_index, start_index, end_index, need_cancel=True):
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        total_data = local_today_datas[code]
        try:
            for i in range(start_index, end_index + 1):
                if i <= buy_exec_index:
                    continue
                if process_index >= i:
                    continue
        # 如果start_index与buy_single_index相同,即是下单后的第一次计算
        # 需要查询买入信号之前的同1s是否有涨停撤的数据
        if buy_single_index == start_index:
            for i in range(buy_single_index - 1, 0, -1):
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num:
                    # 查询买入位置
                if val["time"] != total_data[buy_single_index]["val"]["time"]:
                    break
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["cancelTime"]) == 0:
                    # 涨停买撤销且撤销的间隔时间为0
                    # 查询买入信号,如果无法查询到或者是买入位置比买入信号小就不算
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        # 买入位置要在平均值计算范围内
                        # 在买入信号之后
                        cls.__save_cancel_data(code, i)
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            # print("处理进度", i)
            if L2DataUtil.is_limit_up_price_buy_cancel(val):
                # 查询买入位置
                buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                 local_today_num_operate_map.get(
                                                                                     code))
                if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                    cls.__save_cancel_data(code, i)
        if need_cancel:
            # 计算买撤大单暂比
            cancel_datas = cls.__get_cancel_datas(code)
            if cancel_datas is not None and len(cancel_datas) > 0:
                cancel_rate_threshold = 0.49
                place_order_count = trade_data_manager.placeordercountmanager.get_place_order_count(code)
                if place_order_count <=1:
                    cancel_rate_threshold=0.49
                elif place_order_count <=2:
                    ancel_rate_threshold = 0.549
                else:
                    ancel_rate_threshold = 0.59
                cancel_indexs = []
                for index in cancel_datas:
                    cancel_indexs.append(int(index))
                cancel_indexs.sort()
                # print("取消的数据", cancel_indexs)
                cancel_count = 0
                for index in cancel_indexs:
                    data = total_data[index]
                    if int(data["val"]["num"]) >= average_num:
                        cancel_count += data["re"]
                        process_index = i
                        print("撤销大单", cancel_count)
                        if cancel_count / average_up_count >= 0.49:
                            return True, i
        finally:
            cls.__save_compute_info(code, cancel_count, process_index)
                        if cancel_count / average_up_count > cancel_rate_threshold:
                            return True, total_data[index]
        return False, None
    # 是否需要计算
    @classmethod
    def is_need_compute_average(cls, code):
    def is_need_compute_average(cls, code, latest_data):
        total_datas = local_today_datas[code]
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        elif t.time() - data[0] < 0.5:
            # 500ms内的数据才需要计算average
            cls.__place_order_time_dict.pop(code)
        elif tool.trade_time_sub(latest_data["val"]["time"], total_datas[data[2]]["val"]["time"]) < 3:
            # 3s内的数据才需要计算average
            return True, data[1], data[2]
        else:
            cls.__place_order_time_dict.pop(code)
        return False, None, None
    # 下单成功
@@ -1484,7 +1902,7 @@
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        # 以防万一,先保存下单信息
        total_data = local_today_datas[code]
        cls.compute_average_big_num(code, buy_exec_index, buy_single_index, total_data[-1]["index"])
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, total_data[-1]["index"])
    @classmethod
    def __test(cls, datas):
@@ -1497,18 +1915,18 @@
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # 执行是否需要计算average
        if cls.is_need_compute_average(code):
            cls.compute_average_big_num(code, buy_exec_index, buy_single_index, datas[3])
        for i in range(buy_exec_index, datas[4]):
            cancel, index = cls.need_cancel(code, buy_exec_index, i, i)
        cls.compute_average_big_num(code, buy_single_index, buy_single_index, datas[3])
        for i in range(buy_single_index, datas[4]):
            cancel, cancel_data = cls.need_cancel(code, i, i)
            if cancel:
                print("需要撤单", cancel, index)
                print("需要撤单", cancel, cancel_data["index"])
                break
    @classmethod
    def test(cls):
        # cls.__test(("601579", 311, 319, 347, 404))
        cls.__test(("601579", 311, 319, 327, 404))
        cls.__test(("000716", 410, 420, 461, 536))
        # 代码 买入信号起始点  买入信息执行位置  计算末位 最远计算位置
        # cls.__test(("002793", 292, 308, 314, 410))
        # 执行是否需要撤销
@@ -1645,6 +2063,7 @@
    def __test(cls, datas):
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = random.randint(0, 100000)
        # 先执行下单
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
@@ -1662,13 +2081,19 @@
    @classmethod
    def test(cls):
        cls.__test(("002528", 212, 219, 372, 601))
        cls.__test(("003005", 212, 219, 372, 601))
        # 代码 买入信号起始点  买入信息执行位置  计算末位 最远计算位置
        cls.__test(("002793", 292, 308, 332, 410))
        # 执行是否需要撤销
if __name__ == "__main__":
    L2LimitUpSellStatisticUtil.test()
    print(t.time())
    # AverageBigNumComputer.test()
    # LongAverageBigNumComputer.test()
    # L2TradeDataProcessor.test()
    load_l2_data("600213")
    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(local_today_datas["600213"][84],
                                                                     local_today_num_operate_map.get(
                                                                         "600213"))
    print(buy_index, buy_data)
l2_trade_test.py
@@ -1,14 +1,21 @@
# 交易测试
# 清除交易数据
import random
import unittest
from unittest import mock
import big_money_num_manager
import l2_data_manager
import l2_data_manager_new
import l2_trade_factor
import log
import redis_manager
import tool
import trade_manager
from l2_data_manager import TradePointManager
# from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer
# from trade_queue_manager import THSBuy1VolumnManager
def clear_trade_data(code):
@@ -30,6 +37,43 @@
            continue
        redis_info.delete(k)
#
# class VirtualTrade(unittest.TestCase):
#     code = "002419"
#     clear_trade_data(code)
#     l2_data_manager.load_l2_data(code)
#     total_datas = l2_data_manager.local_today_datas[code]
#     pos_list = log.get_l2_process_position(code)
#     del pos_list[-1]
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         # 剩下的数据根据秒来分
#         start_index = -1
#         for i in range(pos_list[-1][1] + 1, total_datas[-1]["index"] + 1):
#             if total_datas[i]["val"]["time"] != total_datas[i - 1]["val"]["time"]:
#                 if start_index < 0:
#                     start_index = i
#                 else:
#                     pos_list.append((start_index, i - 1))
#                     start_index = i
#     if pos_list[-1][1] < total_datas[-1]["index"]:
#         pos_list.append((pos_list[-1][1] + 1, total_datas[-1]["index"]))
#     l2_data_manager_new.local_today_datas = {code: []}
#     l2_trade_factor.L2TradeFactorUtil.get_safe_buy_count = mock.Mock(return_value=30)
#     for indexs in pos_list:
#         L2TradeDataProcessor.random_key[code] = mock.Mock(return_value=random.randint(0, 100000))
#         # 设置封单额,获取买1量
#         for i in range(0, 100):
#             time_ = total_datas[indexs[0]]["val"]["time"]
#             time_s = tool.get_time_as_second(time_) - i - 1
#             volumn = THSBuy1VolumnManager().get_buy_1_volumn(code, tool.time_seconds_format(time_s))
#             if volumn is not None:
#                 l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(volumn),
#                                                                            tool.time_seconds_format(time_s))
#                 break
#
#         print("----------------处理位置", indexs)
#         L2TradeDataProcessor.process_add_datas(code, total_datas[indexs[0]:indexs[1] + 1], 0, 0)
# class TestTrade(unittest.TestCase):
@@ -58,5 +102,5 @@
#     print(buy_single_index, buy_exec_index, compute_index, num, count)
# if __name__ == "__main__":
#     unittest.main()
if __name__ == "__main__":
    unittest.main()
l2_trade_util.py
@@ -5,7 +5,6 @@
__redis_manager = redis_manager.RedisManager(2)
#  初始化禁止交易代码库
def init_forbidden_trade_codes():
    key = "forbidden-trade-codes"
@@ -43,5 +42,3 @@
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    return redis.sismember(key, code)
log.py
@@ -8,6 +8,7 @@
from loguru import logger
import gpcode_manager
import tool
@@ -39,7 +40,8 @@
        logger.add(self.get_path("l2", "l2_data"), filter=lambda record: record["extra"].get("name") == "l2_data",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_latest_data"), filter=lambda record: record["extra"].get("name") == "l2_latest_data",
        logger.add(self.get_path("l2", "l2_latest_data"),
                   filter=lambda record: record["extra"].get("name") == "l2_latest_data",
                   rotation="00:00", compression="zip", enqueue=True)
        # 显示在控制台
@@ -86,6 +88,10 @@
                   filter=lambda record: record["extra"].get("name") == "buy_1_volumn_record",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("ths", "day_volumn"),
                   filter=lambda record: record["extra"].get("name") == "day_volumn",
                   rotation="00:00", compression="zip", enqueue=True)
    def get_path(self, dir_name, log_name):
        return "D:/logs/gp/{}/{}".format(dir_name, log_name) + ".{time:YYYY-MM-DD}.log"
@@ -118,6 +124,8 @@
logger_buy_1_volumn = __mylogger.get_logger("buy_1_volumn")
logger_buy_1_volumn_record = __mylogger.get_logger("buy_1_volumn_record")
logger_day_volumn = __mylogger.get_logger("day_volumn")
class LogUtil:
@@ -183,9 +191,10 @@
    return tool.time_seconds_format(s - 2 - cha)
def load_l2_from_log():
def load_l2_from_log(date=None):
    today_data = {}
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    if  date is None:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    with open("D:/logs/gp/l2/l2_data.{}.log".format(date), mode='r') as f:
        while True:
            data = f.readline()
@@ -208,11 +217,78 @@
    return today_data
# 获取L2每次批量处理数据的位置范围
def get_l2_process_position(code, date=None):
    """Read the L2 process log and return the processed index ranges for ``code``.

    :param code: stock code; only log lines containing ``code:<code>`` are used.
    :param date: log date as ``YYYY-MM-DD``; today's date when falsy.
    :return: list of ``(start, end)`` int tuples in file order. A range is kept
        only when its start is greater than the end of the previously kept range.
    """
    day = date if date else datetime.datetime.now().strftime("%Y-%m-%d")
    code_tag = "code:{}".format(code)
    range_label = "处理数据范围"
    ranges = []
    with open("D:/logs/gp/l2/l2_process.{}.log".format(day), mode='r', encoding="utf-8") as log_file:
        for row in log_file:
            if code_tag not in row:
                continue
            # The range text sits between the label (plus one separator character)
            # and the processing-time label, formatted as "<start>-<end>".
            begin = row.find(range_label) + len(range_label) + 1
            bounds = row[begin:row.find("处理时间")].strip().split("-")
            if ranges and ranges[-1][1] >= int(bounds[0]):
                # Starts inside an already recorded range: ignore it.
                continue
            ranges.append((int(bounds[0]), int(bounds[1])))
    return ranges
# Get the trade positions logged for a code: buy-signal start, buy execution and cancel positions
def get_l2_trade_position(code, date=None):
    """Read the L2 trade log and return the trade positions recorded for ``code``.

    Only lines containing ``code=<code>`` are considered. Each recognised line
    yields one ``(kind, index, reason)`` tuple, in file order:

    * ``kind == 0`` - buy-signal start position, ``reason`` is ``""``
    * ``kind == 1`` - buy execution position, ``reason`` is ``""``
    * ``kind == 2`` - cancel position, ``reason`` is the text following
      ``撤单原因:`` on that line (including the trailing newline, if any)

    :param code: stock code to filter on.
    :param date: log date as ``YYYY-MM-DD``; today's date when falsy.
    :return: list of ``(kind, index, reason)`` tuples.
    """
    if not date:
        date = datetime.datetime.now().strftime("%Y-%m-%d")
    code_key = "code={}".format(code)
    pos_list = []
    with open("D:/logs/gp/l2/l2_trade.{}.log".format(date), mode='r', encoding="utf-8") as f:
        for line in f:
            if code_key not in line:
                continue
            # Membership tests (rather than ``find(...) > 0``) so that a marker at
            # the very start of the line is recognised as well.
            if "获取到买入信号起始点" in line:
                pos_list.append((0, _leading_int(line.split("获取到买入信号起始点:")[1]), ""))
            elif "获取到买入执行位置" in line:
                pos_list.append((1, _leading_int(line.split("获取到买入执行位置:")[1]), ""))
            elif "触发撤单" in line:
                index = _leading_int(line.split("触发撤单,撤单位置:")[1])
                pos_list.append((2, index, line.split("撤单原因:")[1]))
    return pos_list


def _leading_int(text):
    """Return the integer made of the characters of ``text`` before its first space.

    Unlike slicing with ``text.find(" ")``, this keeps the last digit when the
    number is the final token (``find`` returns -1 in that case and the slice
    would silently drop one character).
    """
    return int(text.strip().split(" ", 1)[0])
def export_logs(code):
    """Extract today's log lines for ``code`` into per-code log files.

    Three logs are processed in order: the trade log and the trade-cancel log
    (both filtered on ``code=<code>``) and the process log (filtered on the bare
    code). Each output file sits next to its source and carries the code and the
    code name (looked up through ``gpcode_manager``) in its file name.

    :param code: stock code whose log lines are exported.
    """
    code_name = gpcode_manager.get_code_name(code)
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    exports = (
        ("code={}".format(code), "l2_trade"),
        ("code={}".format(code), "l2_trade_cancel"),
        ("{}".format(code), "l2_process"),
    )
    for key, log_name in exports:
        source_path = "D:/logs/gp/l2/{}.{}.log".format(log_name, date)
        target_path = "D:/logs/gp/l2/{}.{}_{}.{}.log".format(log_name, code, code_name, date)
        LogUtil.extract_log_from_key(key, source_path, target_path)
if __name__ == '__main__':
    # logger_l2_process_time.info("test123")
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    LogUtil.extract_log_from_key("002193", "D:/logs/gp/l2/l2_trade_queue.{}.log".format(date),
                                 "D:/logs/gp/l2/l2_trade_queue{}.{}.log".format("002193", date))
    codes = ["603255", "600853", "000620", "002044", "001256"]
    for code in codes:
        export_logs(code)
    # parse_l2_data()
server.py
@@ -26,6 +26,7 @@
import ths_industry_util
import ths_util
import tool
import trade_data_manager
import trade_gui
import trade_manager
import l2_code_operate
@@ -96,7 +97,8 @@
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                            _str)
                        # 间隔1s保存一条l2的最后一条数据
                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[code] >= 1000 and len(datas) > 0:
                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                            code] >= 1000 and len(datas) > 0:
                            self.l2_save_time_dict[code] = origin_start_time
                            logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
@@ -157,7 +159,7 @@
                                                        "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
                        logging.exception(e)
                        logger_l2_error.exception(e)
                elif type == 1:
                    # 设置股票代码
                    data_list, is_add = data_process.parseGPCode(_str)
@@ -219,6 +221,28 @@
                        # 保存委托信息
                        logger_trade_delegate.info(dataList)
                    try:
                        # 设置申报时间
                        for item in dataList:
                            apply_time = item["apply_time"]
                            if apply_time and len(apply_time) >= 8:
                                code = item["code"]
                                trade_state = trade_manager.get_trade_state(code)
                                # 设置下单状态的代码为已委托
                                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                    origin_apply_time = apply_time
                                    apply_time = apply_time[0:6]
                                    apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6])
                                    ms = origin_apply_time[6:9]
                                    if int(ms) > 500:
                                        # 时间+1s
                                        apply_time = tool.trade_time_add_second(apply_time, 1)
                                    print(apply_time)
                                    l2_data_manager_new.SecondAverageBigNumComputer.set_apply_time(code, apply_time)
                    except Exception as e:
                        logging.exception(e)
                    try:
                        trade_manager.process_trade_delegate_data(dataList)
                    except Exception as e:
                        logging.exception(e)
@@ -251,11 +275,12 @@
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
                            logger_l2_trade_queue.info("{}-{}", code, data)
                            self.buy1_price_manager.save(code, buy_one_price)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                               int(buy_one_volumn),
                                                                                               buy_one_price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn),
@@ -285,7 +310,6 @@
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        print(data)
                        index = data["index"]
                        code_name = data["codeName"].replace(" ", "")
                        volumn = data["volumn"]
@@ -309,7 +333,7 @@
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
@@ -332,6 +356,29 @@
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    print("L2自启动成功", client_id)
                    now_str = datetime.datetime.now().strftime("%H:%M:%S")
                    ts = tool.get_time_as_second(now_str)
                    # 9点25到9点28之间的自启动就需要批量设置代码
                    if tool.get_time_as_second("09:24:50") <= ts <= tool.get_time_as_second("09:28:00"):
                        # 准备批量设置代码
                        return_json = {"code": 1, "msg": "等待批量设置代码"}
                        return_str = json.dumps(return_json)
                        # 获取排名前16位的代码
                        codes = trade_data_manager.CodeActualPriceProcessor().get_top_rate_codes(16)
                        codes = sorted(codes)
                        if client_id == 2:
                            codes = codes[:8]
                        else:
                            codes = codes[8:]
                        codes_datas = []
                        for i in range(0, 8):
                            if i >= len(codes):
                                break
                            codes_datas.append((i, codes[i]))
                        l2_code_operate.betch_set_client_codes(client_id, codes_datas)
                    else:
                        return_json = {"code": 0, "msg": "开启在线状态"}
                        return_str = json.dumps(return_json)
                    # print("心跳:", client_id)
                sk.send(return_str.encode())
tool.py
@@ -157,6 +157,17 @@
    return time_1 - time_2
# 交易时间加几s
def trade_time_add_second(time_str, second):
    """Add ``second`` seconds to an ``HH:MM:SS`` time, skipping over the midday gap.

    When the original time is before 11:30:00 and the shifted time is at or after
    11:30:00, a further 90 minutes are added so the result lands at 13:00:00 or
    later.

    :param time_str: time formatted as ``HH:MM:SS``.
    :param second: number of seconds to add.
    :return: the shifted time as formatted by ``time_seconds_format``.
    """
    fields = time_str.split(":")
    before = int(fields[0]) * 3600 + int(fields[1]) * 60 + int(fields[2])
    after = before + second
    morning_close = 11 * 3600 + 30 * 60
    crosses_close = before < morning_close <= after
    if crosses_close:
        after = after + 90 * 60
    return time_seconds_format(after)
def compute_buy1_real_time(time_):
    ts = time_.split(":")
    s = int(ts[0]) * 3600 + int(ts[1]) * 60 + int(ts[2])
@@ -164,8 +175,21 @@
    return time_seconds_format(s - 2 - cha)
# 全角转半角
def strQ2B(ustring):
    """Convert full-width characters in ``ustring`` to their half-width forms.

    The ideographic space (U+3000) becomes an ASCII space, and the full-width
    forms U+FF01..U+FF5E are shifted down by 0xFEE0 onto ASCII ``!``..``~``.
    Every other character is kept unchanged.

    :param ustring: text to convert, or ``None``.
    :return: the converted string; ``None`` when ``ustring`` is ``None`` (callers
        may hand over a missing name, which previously raised ``TypeError``).
    """
    if ustring is None:
        return None
    converted = []
    for uchar in ustring:
        inside_code = ord(uchar)
        if inside_code == 12288:  # full-width (ideographic) space maps straight to ASCII space
            inside_code = 32
        elif 65281 <= inside_code <= 65374:  # other full-width forms sit at a fixed offset from ASCII
            inside_code -= 65248
        converted.append(chr(inside_code))
    # Join once instead of growing a string with += inside the loop.
    return "".join(converted)
if __name__ == "__main__":
    print(trade_time_sub("11:29:59", "13:00:00"))
    print(trade_time_sub("11:29:59", "14:00:00"))
    print(trade_time_sub("10:29:59", "11:29:59"))
    print(trade_time_sub("13:29:59", "14:29:59"))
    print(trade_time_add_second("11:29:59", 1))
    print(trade_time_add_second("11:29:59", 5))
    print(trade_time_add_second("10:29:59", 10))
    print(trade_time_add_second("13:29:59", 60))
trade_data_manager.py
@@ -210,7 +210,22 @@
        count = self.__get_redis().get(key)
        return 0 if count is None else count
    # 保存当前涨幅
    def __save_current_rate(self, code, rate):
        key = "code_current_rate-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), rate)
    # 获取当前涨幅
    def __get_current_rate(self, code):
        key = "code_current_rate-{}".format(code)
        rate = self.__get_redis().get(key)
        if rate is not None:
            return float(rate)
        return None
    def process_rate(self, code, rate, time_str):
        # 保存目前的代码涨幅
        self.__save_current_rate(code, rate)
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
            return
@@ -261,11 +276,55 @@
            return None
        return data[1]
    # 获取涨幅前几的代码
    def get_top_rate_codes(self, top_n):
        keys = "code_current_rate-*"
        keys = self.__get_redis().keys(keys)
        infos = []
        for k in keys:
            code = k.split("-")[1]
            rate = self.__get_current_rate(code)
            infos.append((code, rate))
        # 排序信息
        sorted_infos = sorted(infos, key=lambda tup: tup[1], reverse=True)
        sorted_infos = sorted_infos[:top_n]
        codes = []
        for data in sorted_infos:
            codes.append(data[0])
        return codes
# Place-order count management (per-code counter of placed orders)
class placeordercountmanager:
    """Per-code counter of placed orders, stored in Redis.

    Counts live under ``place_order_count-<code>`` and get the expiry returned by
    ``tool.get_expire()`` each time they are incremented.
    """
    # Shared manager, created with argument 0 (presumably the Redis db index - confirm).
    __redisManager = redis_manager.RedisManager(0)

    @classmethod
    def __get_redis(cls):
        """Return a Redis client from the shared manager."""
        return cls.__redisManager.getRedis()

    @classmethod
    def __incre_place_order_count(cls, code):
        """Increase the stored count for ``code`` by one and refresh its expiry."""
        counter_key = "place_order_count-{}".format(code)
        cls.__get_redis().incrby(counter_key, 1)
        cls.__get_redis().expire(counter_key, tool.get_expire())

    @classmethod
    def __get_place_order_count(cls, code):
        """Return the stored count for ``code`` as an int, 0 when nothing is stored."""
        stored = cls.__get_redis().get("place_order_count-{}".format(code))
        return 0 if stored is None else int(stored)

    @classmethod
    def place_order(cls, code):
        """Record one more placed order for ``code``."""
        cls.__incre_place_order_count(code)

    @classmethod
    def get_place_order_count(cls, code):
        """Return how many placed orders are recorded for ``code``."""
        return cls.__get_place_order_count(code)
if __name__ == "__main__":
    processor = CodeActualPriceProcessor()
    processor.process_rate("123456", -0.2, "09:30:00")
    processor.process_rate("123456", -0.3, "09:40:00")
    processor.process_rate("123456", 0.3, "09:50:00")
    processor.is_under_water("123456")
    print(processor.get_top_rate_codes(30))
trade_gui.py
@@ -328,12 +328,14 @@
    # 撤买
    def cancel_buy(self, code):
        if constant.TEST:
            return
        self.buy_cancel_lock.acquire()
        code_input = 0
        try:
            logger_trade_gui.info("开始撤单:code-{}".format(code))
            win = self.cancel_win
            if win <= 0:
            if win <= 0 or not win32gui.IsWindowVisible(win):
                self.cancel_win = self.getCancelBuyWin()
                win = self.cancel_win
                if win <= 0:
@@ -776,7 +778,7 @@
        name = THSGuiUtil.getText(code_name_win)
        if name is not None:
            name=name.replace(" ","")
        return name
        return tool.strQ2B(name)
    @classmethod
    def fill_codes(cls, codes):
@@ -814,6 +816,7 @@
                    cls.cancel_distribute_win_for_code(code)
                else:
                    code_name = cls.__get_code_name(win)
                    #'深振业A'
                    if name_codes.get(code_name) != code:
                        cls.cancel_distribute_win_for_code(code)
                continue
@@ -848,7 +851,7 @@
if __name__ == '__main__':
    THSGuiTrade().buy("002900", "16.18")
    THSGuiTrade().cancel_buy("000419")
    # GUITest().test_distribute()
    # try:
    #     THSGuiUtil.set_buy_window_code(0x000112D0, "000333")
trade_manager.py
@@ -5,11 +5,12 @@
# 交易管理器
import time
import constant
import gpcode_manager
import l2_trade_util
import mysql_data
import trade_data_manager
from trade_gui import THSBuyWinManagerNew,THSGuiTrade
from trade_gui import THSBuyWinManagerNew, THSGuiTrade
import time as t
import l2_data_manager
@@ -83,12 +84,24 @@
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    redis.setex("trade-success-latest-time", tool.get_expire(), time_str)
    mysqldb = mysql_data.Mysqldb()
    # 合并同一合同编号
    dict_ = {}
    for data in datas:
        trade_num = data["trade_num"]
        if trade_num not in dict_:
            dict_[trade_num] = data
        else:
            # 合并成交数量与成交金额
            dict_[trade_num]["num"] = int(dict_[trade_num]["num"]) + int(data["num"])
            dict_[trade_num]["money"] = round(float(dict_[trade_num]["money"]) + float(data["money"]), 3)
    for key in dict_:
        data= dict_[key]
        _time = data["time"]
        # 过滤错误数据
        if _time == "00:00:00":
            continue
        data["_id"] = data["trade_num"]
        data["_id"] = "{}_{}".format(day, data["trade_num"])
        data["day"] = day
        data["create_time"] = int(round(t.time() * 1000))
        counts = mysqldb.select_one("select count(*) from ths_trade_success_record where _id='{}'".format(data["_id"]))
@@ -186,6 +199,8 @@
# 购买
@tool.async_call
def __buy(code, price, trade_state, capture_timestamp, last_data, last_data_index):
    if constant.TEST:
        return
    try:
        guiTrade.buy(code, price)
        __place_order_success(code, capture_timestamp, last_data, last_data_index)
@@ -203,7 +218,8 @@
    # 下单成功,加入固定代码库
    l2_data_manager.add_to_l2_fixed_codes(code)
    # 记录下单的那一帧图片的截图时间与交易用时
    trade_data_manager.TradeBuyDataManager.set_buy_position_info(code, capture_timestamp, use_time, last_data, last_data_index)
    trade_data_manager.TradeBuyDataManager.set_buy_position_info(code, capture_timestamp, use_time, last_data,
                                                                 last_data_index)
    print("买入结束")
    logger_trade.info("{}买入成功".format(code))
@@ -264,8 +280,7 @@
                l2_data_manager.TradePointManager.delete_buy_point(code)
                # 移除交易窗口分配
                THSBuyWinManagerNew.cancel_distribute_win_for_code(code)
                #TODO 完全成交后移除L2
                # TODO 完全成交后移除L2
# 处理委托成功数据
@@ -324,6 +339,5 @@
if __name__ == "__main__":
    # time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # print(time_str)
    __clear_data("002388")
    # __clear_big_data()
    pass
trade_queue_manager.py
@@ -58,6 +58,10 @@
        key = "buy1_volumn_codes"
        return self.__get_redis().smembers(key)
    def get_buy_1_volumn(self, code, time_str):
        key = "buy1_volumn-{}-{}".format(code, time_str)
        return self.__get_redis().get(key)
    # 返回是否需要更新数据,是否需要撤单,撤单原因
    def save(self, code, time_str, volumn, price):
        # 客户端数据未加载出来过滤
@@ -107,11 +111,12 @@
                # 下降趋势
                if volumn < last_volumn:
                    if (last_volumn - volumn) / last_volumn > 0.5:
                        return True, True, "连续两次封单量降幅达50%以上,时间:{} 封单量:{}-{}".format(time_str, last_volumn, volumn)
                        return True, True, "买1主动触发,连续两次封单量降幅达50%以上,时间:{} 封单量:{}-{}".format(time_str, last_volumn,
                                                                                           volumn)
                    # 当封单额小于1000万需要撤单
                    min_num = 10000000 // (limit_up_price * 100)
                    if volumn < min_num:
                        return True, True, "最新封单额小于1000万,时间:{} 封单量:{}".format(time_str, volumn)
                        return True, True, "买1主动触发,最新封单额小于1000万,时间:{} 封单量:{}".format(time_str, volumn)
        return True, False, None
@@ -219,10 +224,25 @@
            return None
        return json.loads(val)
        # 添加记录
    def __add_buy1_code(self, code):
        key = "buy1_volumn_codes"
        self.__get_redis().sadd(key, code)
        self.__get_redis().expire(key, 10)
        # 获取当前正在监听的代码
    def get_current_codes(self):
        key = "buy1_volumn_codes"
        return self.__get_redis().smembers(key)
    def save_recod(self, code, data):
        _str = json.dumps(data)
        if code in self.__filter_dict and self.__filter_dict[code] == _str:
            return False
        # 添加买1记录
        self.__add_buy1_code(code)
        self.__filter_dict[code] = _str
        self.__save_latest_recod(code, data)
        buy_time = data["buyTime"]