Administrator
2022-12-18 86e0061f9cf211b98252a9e6b71d6c9801e4a16b
撤单策略再次修改
19个文件已修改
2个文件已添加
1069 ■■■■ 已修改文件
data_export_util.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
industry_codes_sort.py 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager_new.py 567 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_test.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_test.py 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_util.py 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
limit_up_time_manager.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 55 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
settings.py 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_data_manager.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py
@@ -35,13 +35,15 @@
        cancel_time = data["val"]["cancelTime"]
        if cancel_time == '0':
            cancel_time = ''
        else:
            cancel_time= "{}".format(cancel_time)
        if len(cancel_time) > 0:
            if int(data["val"]["cancelTimeUnit"]) == 0:
                cancel_time += "s";
                cancel_time += "s"
            elif int(data["val"]["cancelTimeUnit"]) == 1:
                cancel_time += "m";
                cancel_time += "m"
            elif int(data["val"]["cancelTimeUnit"]) == 2:
                cancel_time += "h";
                cancel_time += "h"
        ws.write(index, 2, cancel_time)
        ws.write(index, 3, data["val"]["price"])
data_process.py
@@ -3,9 +3,7 @@
import logging
import redis_manager
__redisManager = redis_manager.RedisManager(0)
def parse(str):
@@ -30,7 +28,8 @@
def parseGPCode(str):
    dict = json.loads(str)
    data = dict["data"]
    return data
    add = dict.get("add")
    return data, add
def parseList(str):
@@ -46,7 +45,6 @@
    _dict = json.loads(str)
    data = _dict["data"]
    return data
if __name__ == '__main__':
gpcode_manager.py
@@ -14,7 +14,7 @@
__redisManager = redis_manager.RedisManager(0)
def set_gp_list(code_datas):
def __parse_codes_data(code_datas):
    codes = []
    name_codes = {}
    for _data in code_datas:
@@ -26,7 +26,11 @@
                codes.append(code)
                # 保存代码对应的名称
                name_codes[name] = code
    return codes, name_codes
def set_gp_list(code_datas):
    codes, name_codes = __parse_codes_data(code_datas)
    redis_instance = __redisManager.getRedis()
    # 删除之前的
    redis_instance.delete("gp_list")
@@ -36,6 +40,21 @@
    redis_instance.set("gp_list_names", json.dumps(name_codes))
# 新增代码
def add_gp_list(code_datas):
    redis_instance = __redisManager.getRedis()
    codes, name_codes = __parse_codes_data(code_datas)
    for d in codes:
        redis_instance.sadd("gp_list", d)
    old_name_codes = get_name_codes()
    if old_name_codes is None:
        old_name_codes = name_codes
    else:
        for key in name_codes:
            old_name_codes[key] = name_codes[key]
    redis_instance.set("gp_list_names", json.dumps(old_name_codes))
# 获取名称对应的代码
def get_name_code(name):
    redis_instance = __redisManager.getRedis()
gui.py
@@ -14,6 +14,7 @@
import mysql_data
import redis_manager
import server
import settings
import trade_gui
from juejin import JueJinManager
from l2_code_operate import L2CodeOperate
@@ -498,12 +499,26 @@
        def init():
            juejin.everyday_init()
        def set_accept_l2():
            settings.set_accept_l2(accept_l2.get())
        width = 800
        height = 290
        frame = Frame(root, {"height": height, "width": width, "bg": "#DDDDDD"})
        cl = Label(frame, text="L2采集状态", bg="#DDDDDD")
        cl.place(x=5, y=5)
        accept_l2 = IntVar()
        ch_accept_l2 = Checkbutton(frame, text='接受l2数据', variable=accept_l2, onvalue=1, offvalue=0, background="#DDDDDD",activebackground="#DDDDDD",command=set_accept_l2)
        ch_accept_l2.place(x=width - 350, y=5)
        if settings.is_accept_l2_data():
            accept_l2.set(1)
        else:
            accept_l2.set(0)
        btn = Button(frame, text="每日初始化", command=init)
        btn.place(x=width - 250, y=5)
industry_codes_sort.py
New file
@@ -0,0 +1,41 @@
# 板块中的代码强度排序
# 板块强度排序,根据当前时间是否涨停,涨停时间排序
import functools
import global_util
import limit_up_time_manager
import trade_data_manager
__codeActualPriceProcessor = trade_data_manager.CodeActualPriceProcessor()
# 同一板块的数据排序
def sort_codes(codes, target_code):
    def cmp(a, b):
        la = 1 if a[2] else 0
        lb = 1 if b[2] else 0
        if la != lb:
            return lb - la
        return int(a[1].replace(":", "")) - int(b[1].replace(":", ""))
    if not global_util.limit_up_time:
        limit_up_time_manager.load_limit_up_time()
    list = []
    for code in codes:
        limit_up_time = global_util.limit_up_time.get(code)
        if limit_up_time is not None:
            is_limit_up = __codeActualPriceProcessor.current_is_limit_up(code)
            if code == target_code:
                is_limit_up = True
            list.append((code, limit_up_time,is_limit_up))
    new_s = sorted(list, key=functools.cmp_to_key(cmp))
    dict_ = {}
    # 相同值为同一排序
    sort_index = 0
    for i in range(0, len(new_s)):
        if new_s[i - 1][1] != new_s[i][1] and i > 0:
            sort_index += 1
        dict_[new_s[i][0]] = sort_index
    return dict_
juejin.py
@@ -31,7 +31,7 @@
import trade_gui
from l2_code_operate import L2CodeOperate
from l2_data_manager import L2DataUtil
import l2_data_manager_new
from log import logger_juejin_tick, logger_system
from trade_data_manager import CodeActualPriceProcessor
from trade_queue_manager import JueJinBuy1VolumnManager
@@ -58,6 +58,8 @@
def init_data():
    # 删除所有的涨停卖数据
    l2_data_manager_new.L2LimitUpSellStatisticUtil.clear()
    # 重置所有的大单数据
    big_money_num_manager.reset_all()
    # 清除水下捞数据
l2_code_operate.py
@@ -80,7 +80,7 @@
                if data is not None:
                    data = json.loads(data)
                    # logger_code_operate.info("读取操作队列:{}", data)
                    type, code = data["type"], data["code"]
                    type, code = data["type"], data.get("code")
                    create_time = data.get("create_time")
                    if create_time is not None:
                        # 设置10s超时时间
@@ -105,6 +105,14 @@
                            client_id, pos = gpcode_manager.get_can_listen_pos()
                            if pos is not None and client_id is not None:
                                L2CodeOperate.setGPCode(client_id, pos, code)
                    elif type == 10:
                        # 批量设置代码,通常在9:25-9:27期间设置
                        client_id = data.get("client_id")
                        codes = data[codes]
                        # TODO 需要完善分配
                    # 强制设置
                    elif type == 2:
                        client_id = data["client"]
@@ -262,4 +270,14 @@
if __name__ == "__main__":
    correct_client_codes()
    codes = [(0, "002210"), (1, "600056"), (2, "002591"), (3, "002193"), (4, "603186"), (5, "600833"), (6, "000736"),
             (7, "603000")]
    datas = []
    for item in codes:
        datas.append({"index": item[0], "code": item[1]})
    data = {"action": "betchSetGPCodes", "data": datas}
    try:
        result = server.send_msg(3, data)
        print("设置结果:", result)
    except Exception as e:
        logging.exception(e)
l2_data_manager.py
@@ -11,20 +11,21 @@
import big_money_num_manager
import code_data_util
import constant
import data_process
import global_data_loader
import global_util
import industry_codes_sort
import l2_data_log
import l2_data_util
import gpcode_manager
import l2_trade_factor
import log
import redis_manager
import ths_industry_util
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process,logger_l2_data
import trade_data_manager
import limit_up_time_manager
@@ -187,16 +188,23 @@
        # 获取今日的数据
    if local_today_datas.get(code) is None or force:
        datas = log.load_l2_from_log()
        datas = datas.get(code)
        if datas is None:
        datas = []
        keys = redis.keys("l2-{}-*".format(code))
        for k in keys:
            value = redis.get(k)
            _data = l2_data_util.l2_data_key_2_obj(k, value)
            datas.append(_data)
        # 排序
        new_datas = sorted(datas,
                           key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        local_today_datas[code] = new_datas
        local_today_datas[code] = datas
        # 从数据库加载
        # datas = []
        # keys = redis.keys("l2-{}-*".format(code))
        # for k in keys:
        #     value = redis.get(k)
        #     _data = l2_data_util.l2_data_key_2_obj(k, value)
        #     datas.append(_data)
        # # 排序
        # new_datas = sorted(datas,
        #                    key=lambda e: (int(e.__getitem__('val')["time"].replace(":", "")), e.__getitem__('index')))
        # local_today_datas[code] = new_datas
    # 根据今日数据加载
    l2_data_util.load_num_operate_map(local_today_num_operate_map, code, local_today_datas.get(code), force)
@@ -268,6 +276,10 @@
        # 设置进内存
        local_latest_datas[code] = datas
        __set_l2_data_latest_count(code, len(datas))
        try:
            logger_l2_data.info("{}-{}",code,add_datas)
        except Exception as e:
            logging.exception(e)
        saveL2Data(code, add_datas)
@@ -784,7 +796,7 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        codes_index = industry_codes_sort.sort_codes(codes,code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
l2_data_manager_new.py
@@ -8,6 +8,7 @@
import code_data_util
import global_util
import gpcode_manager
import industry_codes_sort
import l2_data_log
import l2_data_manager
import l2_data_util
@@ -17,12 +18,12 @@
import redis_manager
import ths_industry_util
import tool
import trade_data_manager
import trade_manager
import trade_queue_manager
from l2_data_manager import L2DataException, TradePointManager, local_today_datas, L2DataUtil, load_l2_data, \
    local_today_num_operate_map
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process, logger_buy_1_volumn, \
    logger_l2_error
# TODO l2数据管理
from trade_data_manager import CodeActualPriceProcessor
@@ -154,6 +155,7 @@
    l2BigNumForMProcessor = L2BigNumForMProcessor()
    __codeActualPriceProcessor = CodeActualPriceProcessor()
    buy1PriceManager = trade_queue_manager.Buy1PriceManager()
    __ths_l2_trade_queue_manager = trade_queue_manager.thsl2tradequeuemanager()
    @classmethod
    def debug(cls, code, content, *args):
@@ -192,6 +194,18 @@
                        local_today_datas[code]) > 0:
                    _start_index = local_today_datas[code][-1]["index"] + 1
                add_datas = l2_data_manager.L2DataUtil.get_add_data(code, datas, _start_index)
                # ---------- 判断是否需要计算大单 -----------
                try:
                    average_need, buy_single_index, buy_exec_index = AverageBigNumComputer.is_need_compute_average(code)
                    # 计算平均大单
                    if average_need:
                        end_index = local_today_datas[code][-1]["index"]
                        if len(add_datas) > 0:
                            end_index = add_datas[-1]["index"]
                        AverageBigNumComputer.compute_average_big_num(code, buy_exec_index, buy_single_index, end_index)
                except Exception as e:
                    logging.exception(e)
                # -------------数据增量处理------------
                if len(add_datas) > 0:
                    # 拼接数据
                    local_today_datas[code].extend(add_datas)
@@ -208,6 +222,7 @@
                total_datas = local_today_datas[code]
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据预处理时间")
                try:
                if len(add_datas) > 0:
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
@@ -223,10 +238,12 @@
                        else:
                            # 未挂单
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{} 截图时间戳:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time,
                                           capture_timestamp)
                    __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time, "l2数据处理时间")
                finally:
                # 保存数据
                l2_data_manager.save_l2_data(code, datas, add_datas)
                __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
@@ -239,13 +256,18 @@
    # 处理未挂单
    @classmethod
    def __process_not_order(cls, code, start_index, end_index, capture_time):
        __start_time = t.time()
        __start_time = round(t.time() * 1000)
        # 获取阈值
        threshold_money, msg = cls.__get_threshmoney(code)
        if round(t.time() * 1000) - __start_time > 10:
            __start_time = l2_data_log.l2_time(code, round(t.time() * 1000) - __start_time,
                                               "获取m值数据耗时")
        cls.__start_compute_buy(code, start_index, end_index, threshold_money, capture_time)
    # 测试专用
    @classmethod
    def process_order(cls, code, start_index, end_index, capture_time, new_add=True):
        cls.__process_order(code, start_index, end_index, capture_time, new_add)
    # 处理已挂单
    @classmethod
@@ -260,6 +282,14 @@
        # 撤单计算,只看买1
        cancel_data, cancel_msg = L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                                           buy_single_index, buy_exec_index)
        if not cancel_data:
            # 统计板上卖
            try:
                cancel_data, cancel_msg = L2LimitUpSellStatisticUtil.process(code, start_index, end_index,
                                                                             buy_exec_index)
            except Exception as e:
                logging.exception(e)
        # 计算m值大单
        cls.l2BigNumForMProcessor.process(code, max(buy_single_index, start_index), end_index,
@@ -281,6 +311,12 @@
                cls.__buy(code, unreal_buy_info[1], local_today_datas[code][unreal_buy_info[0]],
                          unreal_buy_info[0])
        # 判断是否需要计算长大单的信息
        try:
            LongAverageBigNumComputer.compute_average_big_num(code, buy_single_index, buy_exec_index)
        except Exception as e:
            logging.exception(e)
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
@@ -300,6 +336,14 @@
                cls.debug(code, "开始执行买入")
                trade_manager.start_buy(code, capture_timestamp, last_data,
                                        last_data_index)
                # 获取买入位置信息
                try:
                    buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                    AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                    LongAverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
                except Exception as e:
                    logging.exception(e)
                    logger_l2_error.exception(e)
                l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
                cls.debug(code, "执行买入成功")
            except Exception as e:
@@ -317,7 +361,7 @@
            industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
            if industry is None:
                return True, "没有获取到行业"
            codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
            codes_index = industry_codes_sort.sort_codes(codes, code)
            if codes_index is not None and codes_index.get(code) is not None:
                # 同一板块中老二后面的不能买
                if codes_index.get(code) == 0:
@@ -348,11 +392,33 @@
        #     return False, "尚未获取到涨停价"
        # if abs(float(buy1_price) - float(limit_up_price)) >= 0.01:
        #     return False, "买1价不为涨停价,买1价-{} 涨停价-{}".format(buy1_price, limit_up_price)
        # 从买入信号起始点到当前数据末尾的纯买手数与当前的卖1做比较,如果比卖1小则不能买入
        try:
            sell1_time, sell1_price, sell1_volumn = cls.__ths_l2_trade_queue_manager.get_sell1_info(code)
            cls.buy_debug(code, "卖1信息为:({},{},{})", sell1_time, sell1_price, sell1_volumn)
            if sell1_time is not None and sell1_volumn > 0:
                # 获取执行位信息
                total_datas = local_today_datas[code]
                buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
                buy_nums = num
                for i in range(buy_exec_index + 1, total_datas[-1]["index"] + 1):
                    _val = total_datas[i]["val"]
                    # 涨停买
                    if L2DataUtil.is_limit_up_price_buy(_val):
                        # 涨停买
                        buy_nums += int(_val["num"]) * int(total_datas[i]["re"])
                    elif L2DataUtil.is_limit_up_price_buy_cancel(_val):
                        buy_nums -= int(_val["num"]) * int(total_datas[i]["re"])
                if buy_nums < sell1_volumn:
                    return False, "纯买量({})小于卖1量{} 卖1时间:{}".format(buy_nums, sell1_volumn, sell1_time)
        except Exception as e:
            logging.exception(e)
        # 量比超过1.3的不能买
        # 量比超过1.1的不能买
        volumn_rate = l2_trade_factor.L2TradeFactorUtil.get_volumn_rate_by_code(code)
        if volumn_rate >= 1.3:
            return False, "最大量比超过1.3不能买"
        if volumn_rate >= 1.1:
            return False, "最大量比超过1.1不能买"
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and l2_data_manager.L2DataUtil.get_time_as_second(
@@ -364,8 +430,11 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            # 当老大老二当前没涨停
            return False, "同一板块中老三,老四,...不能买"
        if cls.__codeActualPriceProcessor.is_under_water(code):
@@ -378,14 +447,13 @@
                return False, "水下捞,不是老大,是老{}".format(codes_index.get(code))
        # 13:30后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 133000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 16:
                    return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # 如果老大已经买成功了,老二就不需要买了
            # 如果老大已经买成功了, 老二就不需要买了
            first_codes = []
            for key in codes_index:
                if codes_index.get(key) == 0:
@@ -396,15 +464,15 @@
                if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 老大已经买成功了
                    return False, "老大{}已经买成功,老二无需购买".format(key)
            # 有9点半涨停的老大才能买老二,不然不能买
            # 获取老大的涨停时间
            for key in first_codes:
                # 找到了老大
                time_ = limit_up_time_manager.get_limit_up_time(key)
                if time_ == "09:30:00":
                    return True, "9:30涨停的老大,老二可以下单"
            return False, "老大非9:30涨停,老二不能下单"
            #
            # # 有9点半涨停的老大才能买老二,不然不能买
            # # 获取老大的涨停时间
            # for key in first_codes:
            #     # 找到了老大
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
        # 过时  老二,本板块中涨停票数<29 不能买
        # if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
@@ -452,12 +520,18 @@
        l2_data_manager.L2BigNumProcessor.del_big_num_pos(code)
        cls.debug(code, "执行撤单成功,原因:{}", msg)
    # 虚拟下单
    @classmethod
    def __virtual_buy(cls, code, buy_single_index, buy_exec_index, capture_time):
        cls.unreal_buy_dict[code] = (buy_exec_index, capture_time)
        AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
                            new_add=True):
        if compute_end_index < compute_start_index:
            return
        _start_time = t.time()
        _start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        # 获取买入信号计算起始位置
        buy_single_index, buy_exec_index, buy_compute_index, num, count = cls.__get_order_begin_pos(code)
@@ -514,7 +588,7 @@
            # 如果是今天第一次有下单执行信号,涨停时间(买入执行位时间)
            limit_up_time_manager.save_limit_up_time(code, total_datas[compute_index]["val"]["time"])
            # 虚拟下单
            cls.unreal_buy_dict[code] = (compute_index, capture_time)
            cls.__virtual_buy(code, buy_single_index, compute_index, capture_time)
            # 删除之前的所有撤单信号
            l2_data_manager.TradePointManager.delete_buy_cancel_point(code)
@@ -632,8 +706,8 @@
                count = 3
            count = round(count * buy1_factor)
            # 最高30笔,最低8笔
            if count > 30:
                count = 30
            if count > 21:
                count = 21
            if count < 8:
                count = 8
            return count
@@ -812,7 +886,7 @@
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        codes_index = industry_codes_sort.sort_codes(codes, code)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
@@ -833,37 +907,52 @@
                    return False, "13:30后涨停,本板块中涨停票数<16不能买"
        if codes_index.get(code) is not None and codes_index.get(code) == 1:
            # ----此条注释-----
            # 如果老大已经买成功了,老二就不需要买了
            first_codes = []
            for key in codes_index:
                if codes_index.get(key) == 0:
                    first_codes.append(key)
            # first_codes = []
            # for key in codes_index:
            #     if codes_index.get(key) == 0:
            #         first_codes.append(key)
            #
            # for key in first_codes:
            #     state = trade_manager.get_trade_state(key)
            #     if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
            #         # 老大已经买成功了
            #         return False, "老大{}已经买成功,老二无需购买".format(key)
            # ----此条注释-----
            for key in first_codes:
                state = trade_manager.get_trade_state(key)
                if state == trade_manager.TRADE_STATE_BUY_SUCCESS:
                    # 老大已经买成功了
                    return False, "老大{}已经买成功,老二无需购买".format(key)
            # ----此条注释-----
            # 有9点半涨停的老大才能买老二,不然不能买
            # 获取老大的涨停时间
            for key in first_codes:
                # 找到了老大
                time_ = limit_up_time_manager.get_limit_up_time(key)
                if time_ == "09:30:00":
                    return True, "9:30涨停的老大,老二可以下单"
            return False, "老大非9:30涨停,老二不能下单"
            # for key in first_codes:
            #     # 找到了老大
            #     time_ = limit_up_time_manager.get_limit_up_time(key)
            #     if time_ == "09:30:00":
            #         return True, "9:30涨停的老大,老二可以下单"
            # return False, "老大非9:30涨停,老二不能下单"
            # ----此条注释-----
            return True, "老二可以下单"
    @classmethod
    def test3(cls):
        code = "002693"
        code = "002094"
        load_l2_data(code, True)
        start_index = 334
        end_index = 341
        buy_single_index = 152
        cls.random_key[code] = random.randint(0, 100000)
        L2LimitUpMoneyStatisticUtil.process_data(code, start_index, end_index,
                                                 buy_single_index)
        buy_single_begin_index, buy_exec_index = 426, 479
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
        L2LimitUpMoneyStatisticUtil.process_data(code, 480, 519,
                                                 buy_single_begin_index, buy_exec_index, False)
    @classmethod
    def test_can_buy(cls):
        code = "002923"
        load_l2_data(code, True)
        limit_up_time_manager.load_limit_up_time()
        can, msg = cls.__can_buy(code)
        print(can, msg)
# 涨停封单额统计
@@ -989,6 +1078,8 @@
    # with_cancel 是否需要判断是否撤销
    @classmethod
    def process_data(cls, code, start_index, end_index, buy_single_begin_index, buy_exec_index, with_cancel=True):
        if buy_single_begin_index is None or buy_exec_index is None:
            return None, None
        start_time = round(t.time() * 1000)
        total_datas = local_today_datas[code]
        time_dict_num = {}
@@ -1036,6 +1127,7 @@
        # 待计算量
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        min_volumn = round(10000000 / (limit_up_price * 100))
        min_volumn_big = min_volumn * 5
        # 不同时间的数据开始坐标
        time_start_index_dict = {}
        # 数据时间分布
@@ -1045,18 +1137,19 @@
        # 大单撤销笔数
        cancel_big_num_count = 0
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index])
        buy_exec_time = tool.get_time_as_second(total_datas[buy_exec_index]["val"]["time"])
        # 从同花顺买1矫正过后的位置开始计算,到end_index结束
        for i in range(index + 1, end_index + 1):
            data = total_datas[i]
            # 统计撤销数量
            try:
            if big_money_num_manager.is_big_num(data["val"]):
                if L2DataUtil.is_limit_up_price_buy_cancel(data["val"]):
                    cancel_big_num_count += int(data["re"])
                    # TODO 大量重复的工作需要处理,可以暂存在内存中,从而减少计算
                    # 获取是否在买入执行信号周围2s
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data["val"],
                        buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and buy_data is not None:
@@ -1067,6 +1160,8 @@
                elif L2DataUtil.is_limit_up_price_buy(data["val"]):
                    cancel_big_num_count -= int(data["re"])
            except Exception as e:
                logging.exception(e)
            threshold_rate = 0.5
            if cancel_big_num_count >= 0:
@@ -1084,14 +1179,20 @@
                # 上一段时间的总数
                time_total_num_dict[time_] = total_num
            exec_time_offset = tool.trade_time_sub(data["val"]["time"], total_datas[buy_exec_index]["val"]["time"])
            val = num_dict.get(i)
            if val is None:
                val = cls.__compute_num(code, data, total_datas[buy_single_begin_index])
            total_num += val
            # 如果是减小项,且在处理数据的范围内,就需要判断是否要撤单了
            if val < 0 and start_index <= i <= end_index:
            # 在处理数据的范围内,就需要判断是否要撤单了
            if start_index <= i <= end_index:
                # 如果是减小项
                if val < 0:
                # 累计封单金额小于1000万
                if total_num < min_volumn:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                    cancel_index = i
                    cancel_msg = "封单金额小于1000万"
                    break
@@ -1100,6 +1201,8 @@
                last_second_total_volumn = time_total_num_dict.get(time_list[-1])
                if last_second_total_volumn > 0 and (
                        last_second_total_volumn - total_num) / last_second_total_volumn >= threshold_rate:
                        # 与执行位相隔>=5s时规则生效
                        if exec_time_offset >= 5:
                    # 相邻2s内的数据减小50%
                    cancel_index = i
                    cancel_msg = "相邻2s({})内的封单量减小50%({}->{})".format(time_, last_second_total_volumn,
@@ -1113,13 +1216,38 @@
                        if last_2_second_total_volumn > last_second_total_volumn > total_num:
                            dif = last_2_second_total_volumn - total_num
                            if dif / last_2_second_total_volumn >= threshold_rate:
                                    # 与执行位相隔>=5s时规则生效
                                    if exec_time_offset >= 5:
                                cancel_index = i
                                cancel_msg = "相邻3s({})内的封单量(第3秒 与 第1的 减小比例)减小50%({}->{}->{})".format(time_,
                                                                                                     last_2_second_total_volumn,
                                                                                                     last_second_total_volumn,
                                                                                                     total_num)
                                break
                # ------大单撤处理-------
                # if total_num < min_volumn_big:
                if exec_time_offset < 1800:
                    try:
                        b_need_cancel, b_cancel_index = AverageBigNumComputer.need_cancel(code, buy_exec_index, i, i)
                        if b_need_cancel:
                            cancel_index = b_cancel_index
                            cancel_msg = "1分钟内大单撤销比例触发阈值"
                            break
                    except Exception as e:
                        logging.exception(e)
                # 30分钟外才执行
                elif 1800 <= exec_time_offset <= 5400:
                    try:
                        b_need_cancel, b_cancel_index = LongAverageBigNumComputer.need_cancel(code, buy_exec_index, i,
                                                                                              i)
                        if b_need_cancel:
                            cancel_index = b_cancel_index
                            cancel_msg = "60s-1h内大单撤销比例触发阈值"
                            break
                    except Exception as e:
                        logging.exception(e)
                # ------大单撤处理结束-------
        if not with_cancel:
            cancel_index = None
@@ -1173,7 +1301,7 @@
            return -1
        return int(val)
    # 清除数据
    # 清除数据,当取消成功与买入之前需要清除数据
    @classmethod
    def delete(cls, code):
        key = "limit_up_sell_num-{}".format(code)
@@ -1181,22 +1309,29 @@
        key = "limit_up_sell_index-{}".format(code)
        cls.__get_redis().delete(key)
    @classmethod
    def clear(cls):
        keys = cls.__get_redis().keys("limit_up_sell_num-*")
        for k in keys:
            cls.__get_redis().delete(k)
    # 处理数据,返回是否需要撤单
    # 处理范围:买入执行位-当前最新位置
    @classmethod
    def process(cls, code, start_index, end_index, buy_exec_index):
        # 获取涨停卖的阈值
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        zyltgb = l2_trade_factor.L2TradeFactorUtil.get_zyltgb(code)
        threshold_num = zyltgb * 0.015 // (limit_up_price * 100)
        threshold_num = int(zyltgb * 0.015) // (limit_up_price * 100)
        total_num = cls.__get_sell_data(code)
        cancel_index = None
        process_index = cls.__get_process_index(code)
        total_datas = local_today_datas.get(code)
        for i in range(start_index, end_index + 1):
            if i < buy_exec_index:
                continue
            if i <= process_index:
                continue
            total_datas = local_today_datas.get(code)
            if L2DataUtil.is_limit_up_price_sell(total_datas[i]["val"]):
                num = int(total_datas[i]["val"]["num"])
                cls.__incre_sell_data(code, num)
@@ -1210,10 +1345,330 @@
            process_index = end_index
        # 保存处理的位置
        cls.__save_process_index(code, process_index)
        return cancel_index
        if cancel_index is not None:
            return total_datas[cancel_index], "板上卖的手数{} 超过{}".format(total_num, threshold_num)
        return None, ""
    @classmethod
    def test(cls):
        code = "003005"
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        cls.process(code, 126, 171, 126)
# 平均大单计算
class AverageBigNumComputer:
    __redis_manager = redis_manager.RedisManager(0)
    __place_order_time_dict = {}
    @classmethod
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        key = "average_big_num-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((average_num, average_up_count, start_index, end_index)))
        L2TradeDataProcessor.cancel_debug(code, "保存短大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                average_up_count,
                                                                                                start_index,
                                                                                                end_index))
    @classmethod
    def __get_average_data(cls, code):
        key = "average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
        key = "average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, 2000, json.dumps((cancel_count, process_index)))
    @classmethod
    def __get_compute_info(cls, code):
        key = "average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
    @classmethod
    def __clear_data(cls, code):
        key = "average_big_num_comput_info-{}".format(code)
        cls.__getRedis().delete(key)
        key = "average_big_num-{}".format(code)
        cls.__getRedis().delete(key)
    # 计算平均手数
    # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止
    @classmethod
    def compute_average_big_num(cls, code, buy_exec_index, start_index, end_index):
        L2TradeDataProcessor.cancel_debug(code, "开始计算短大单位置")
        total_data = local_today_datas[code]
        num = 0
        count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                count += data["re"]
                num += int(val["num"])
        average_num = num // count
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                if int(val["num"]) >= average_num:
                    average_up_count += data["re"]
        print("平均大单:", average_num, average_up_count)
        # 保存数据
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        total_data = local_today_datas[code]
        try:
            for i in range(start_index, end_index + 1):
                if i <= buy_exec_index:
                    continue
                if process_index >= i:
                    continue
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num:
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        # 买入位置要在平均值计算范围内
                        cancel_count += data["re"]
                        process_index = i
                        print("撤销大单", cancel_count)
                        if cancel_count / average_up_count >= 0.49:
                            return True, i
        finally:
            cls.__save_compute_info(code, cancel_count, process_index)
        return False, None
    # 是否需要计算
    @classmethod
    def is_need_compute_average(cls, code):
        data = cls.__place_order_time_dict.get(code)
        if data is None:
            return False, None, None
        elif t.time() - data[0] < 0.5:
            # 500ms内的数据才需要计算average
            cls.__place_order_time_dict.pop(code)
            return True, data[1], data[2]
        return False, None, None
    # 下单成功
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        cls.__clear_data(code)
        cls.__place_order_time_dict[code] = (t.time(), buy_single_index, buy_exec_index)
        # 以防万一,先保存下单信息
        total_data = local_today_datas[code]
        cls.compute_average_big_num(code, buy_exec_index, buy_single_index, total_data[-1]["index"])
    @classmethod
    def __test(cls, datas):
        code = datas[0]
        load_l2_data(code)
        L2TradeDataProcessor.random_key[code] = 123123
        # 先执行下单
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        local_today_datas[code] = local_today_datas[code][0:datas[4]]
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # 执行是否需要计算average
        if cls.is_need_compute_average(code):
            cls.compute_average_big_num(code, buy_exec_index, buy_single_index, datas[3])
        for i in range(buy_exec_index, datas[4]):
            cancel, index = cls.need_cancel(code, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, index)
                break
    @classmethod
    def test(cls):
        # cls.__test(("601579", 311, 319, 347, 404))
        cls.__test(("601579", 311, 319, 327, 404))
        # 执行是否需要撤销
# 平均大单计算
class LongAverageBigNumComputer:
    __redis_manager = redis_manager.RedisManager(0)
    @classmethod
    def __getRedis(cls):
        return cls.__redis_manager.getRedis()
    @classmethod
    def __save_average_data(cls, code, average_num, average_up_count, start_index, end_index):
        L2TradeDataProcessor.cancel_debug(code, "获取到长大单位置信息:平均手数-{} 大单数量-{} 计算开始范围-{}:{}".format(average_num,
                                                                                                 average_up_count,
                                                                                                 start_index,
                                                                                                 end_index))
        key = "l_average_big_num-{}".format(code)
        cls.__getRedis().setex(key, 3600, json.dumps((average_num, average_up_count, start_index, end_index)))
    @classmethod
    def __get_average_data(cls, code):
        key = "l_average_big_num-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None, None, None
        val = json.loads(val)
        return val[0], val[1], val[2], val[3]
    @classmethod
    def __save_compute_info(cls, code, cancel_count, process_index):
        key = "l_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().setex(key, 3600, json.dumps((cancel_count, process_index)))
    @classmethod
    def __get_compute_info(cls, code):
        key = "l_average_big_num_comput_info-{}".format(code)
        val = cls.__getRedis().get(key)
        if val is None:
            return None, None
        val = json.loads(val)
        return val[0], val[1]
    @classmethod
    def __clear_data(cls, code):
        key = "l_average_big_num_comput_info-{}".format(code)
        cls.__getRedis().delete(key)
        key = "l_average_big_num-{}".format(code)
        cls.__getRedis().delete(key)
    # 计算平均手数
    # 计算范围:买入信号起始点到买入执行位的下一张图结束点数据为止
    @classmethod
    def compute_average_big_num(cls, code, buy_single_index, buy_exec_index):
        total_data = local_today_datas[code]
        end_index = total_data[-1]["index"]
        start_index = buy_single_index
        if tool.trade_time_sub(total_data[end_index]["val"]["time"], total_data[buy_exec_index]["val"]["time"]) < 3:
            return
        exec_time = total_data[buy_exec_index]["val"]["time"]
        o_average_num, o_average_up_count, o_start_index, o_start_index = cls.__get_average_data(code)
        if o_average_num is not None:
            return
        # 获取买入执行位后2s的数据末位
        for i in range(end_index, buy_exec_index, - 1):
            time_ = total_data[i]["val"]["time"]
            if tool.trade_time_sub(time_, exec_time) <= 2:
                end_index = i
                break
        num = 0
        count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                count += data["re"]
                num += int(val["num"])
        average_num = num / count
        average_num = round(average_num)
        average_up_count = 0
        for i in range(start_index, end_index + 1):
            data = total_data[i]
            val = data["val"]
            if L2DataUtil.is_limit_up_price_buy(val):
                if int(val["num"]) >= average_num:
                    average_up_count += data["re"]
        # 保存数据
        cls.__save_average_data(code, average_num, average_up_count, start_index, end_index)
        cls.__save_compute_info(code, 0, buy_exec_index)
    # 是否需要撤单
    @classmethod
    def need_cancel(cls, code, buy_exec_index, start_index, end_index):
        average_num, average_up_count, a_start_index, a_end_index = cls.__get_average_data(code)
        if average_num is None:
            return False, None
        cancel_count, process_index = cls.__get_compute_info(code)
        total_data = local_today_datas[code]
        try:
            for i in range(start_index, end_index + 1):
                if i <= buy_exec_index:
                    continue
                if process_index >= i:
                    continue
                data = total_data[i]
                val = data["val"]
                if L2DataUtil.is_limit_up_price_buy_cancel(val) and int(val["num"]) >= average_num:
                    # 查询买入位置
                    buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(data,
                                                                                     local_today_num_operate_map.get(
                                                                                         code))
                    if buy_index is not None and a_start_index <= buy_index <= a_end_index:
                        # 买入位置要在平均值计算范围内
                        cancel_count += data["re"]
                        process_index = i
                        if tool.trade_time_sub(val["time"], total_data[buy_exec_index]["val"]["time"]) > 3600:
                            continue
                        sj = 0  # 5 * tool.trade_time_sub(val["time"],total_data[buy_exec_index]["val"]["time"])
                        print("计算结果", cancel_count, average_up_count, sj)
                        if cancel_count / (average_up_count - sj) >= 0.79:
                            return True, i
        finally:
            cls.__save_compute_info(code, cancel_count, process_index)
        return False, None
    # 下单成功
    @classmethod
    def place_order_success(cls, code, buy_single_index, buy_exec_index):
        cls.__clear_data(code)
    @classmethod
    def __test(cls, datas):
        code = datas[0]
        load_l2_data(code)
        # 先执行下单
        buy_single_index = datas[1]
        buy_exec_index = datas[2]
        cls.__clear_data(code)
        cls.place_order_success(code, buy_single_index, buy_exec_index)
        # 执行是否需要计算average
        cls.compute_average_big_num(code, buy_single_index, buy_exec_index)
        for i in range(buy_exec_index + 1, datas[4]):
            cancel, index = cls.need_cancel(code, buy_exec_index, i, i)
            if cancel:
                print("需要撤单", cancel, index)
                break
    @classmethod
    def test(cls):
        cls.__test(("002528", 212, 219, 372, 601))
        cls.__test(("003005", 212, 219, 372, 601))
        # 执行是否需要撤销
if __name__ == "__main__":
    L2TradeDataProcessor.test3()
    print("----------------------")
    L2LimitUpSellStatisticUtil.test()
    print(t.time())
    # L2TradeDataProcessor.test()
l2_data_test.py
@@ -1,3 +1,4 @@
import datetime
import json
import l2_data_manager
@@ -44,6 +45,29 @@
            l2_data_manager.local_today_datas[code].extend(add_datas)
            print(l2_data_manager.local_today_datas[code])
    def get_space_position(self, code):
        date = datetime.datetime.now().strftime("%Y-%m-%d")
        path = "D:/logs/gp/l2/l2_process.{}.log".format(date)
        list = []
        with open(path, encoding="utf-8") as f:
            while True:
                line = f.readline()
                if line:
                    if line.find(code) > -1:
                        start = line.find("处理数据范围:")
                        end = line.find("处理时间:")
                        line = (line[start:end])
                        line = line[line.find(":") + 1:len(line)]
                        line = line.strip()
                        print(line)
                        list.append((int(line.split("-")[0]), int(line.split("-")[1])))
                else:
                    break
        return list
if __name__ == '__main__':
    L2DataTest().test_concat_l2_data()
    L2DataTest().get_space_position("002094")
l2_trade_factor.py
@@ -3,10 +3,13 @@
"""
# l2交易因子
import functools
import big_money_num_manager
import global_data_loader
import global_util
import limit_up_time_manager
import trade_data_manager
class L2TradeFactorUtil:
@@ -220,22 +223,24 @@
    def get_safe_buy_count(cls, code):
        gb = cls.get_zyltgb(code)
        if not gb:
            # 默认10笔
            # 默认8笔
            return 8
        count = gb // 100000000 - 2
        if count > 30:
            count = 30
        elif count < 3:
            count = 3
        count = gb // 100000000
        if count <= 6:
            count = 8
        elif count < 32:
            count = round(8 + 0.5 * (count - 6))
        else:
            count = 21
        volumn_day60_max, volumn_yest, volumn_today = cls.__get_volumns(code)
        rate = cls.get_volumn_rate(volumn_day60_max, volumn_yest, volumn_today)
        # 取大单影响值与行业影响值的较大值
        count = round(count * (1 - rate))
        if count < 3:
            count = 3
        elif count > 30:
            count = 30
        if count < 8:
            count = 8
        elif count > 21:
            count = 21
        return count
@@ -256,10 +261,10 @@
if __name__ == "__main__":
    print(L2TradeFactorUtil.get_industry_rate(10))
    print(L2TradeFactorUtil.get_safe_buy_count("003005"))
    # print(L2TradeFactorUtil.get_rate_factors("003004"))
    # print(L2TradeFactorUtil.factors_to_string("003004"))
    # print(L2TradeFactorUtil.get_safe_buy_count("002864"))
    print(L2TradeFactorUtil.get_safe_buy_count("002864"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("11:30:00"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:00:00"))
    # print(L2TradeFactorUtil.get_limit_up_time_rate("13:48:00"))
l2_trade_test.py
@@ -1,8 +1,14 @@
# 交易测试
# 清除交易数据
import unittest
from unittest import mock
import big_money_num_manager
import l2_data_manager
import redis_manager
import trade_manager
from l2_data_manager import TradePointManager
# from l2_data_manager_new import L2TradeDataProcessor, L2LimitUpMoneyStatisticUtil, AverageBigNumComputer
def clear_trade_data(code):
@@ -26,4 +32,31 @@
        redis_info.delete(k)
# class TestTrade(unittest.TestCase):
#     processor = L2TradeDataProcessor()
#     code = "002094"
#     l2_data_manager.load_l2_data(code)
#     l2_data_manager.local_today_datas[code] = l2_data_manager.local_today_datas[code][0:520]
#     buy_single_index = 426
#     buy_exec_index = 479
#     processor.random_key[code] = mock.Mock(return_value=123123)
#     L2LimitUpMoneyStatisticUtil._L2LimitUpMoneyStatisticUtil__get_l2_latest_money_record = mock.Mock(
#         return_value=(0, -1))
#
#     AverageBigNumComputer.place_order_success(code, buy_single_index, buy_exec_index)
#
#     L2LimitUpMoneyStatisticUtil.process_data(code, buy_single_index, buy_exec_index, buy_single_index,
#                                              buy_exec_index, False)
#
#     l2_data_manager.TradePointManager.get_buy_compute_start_data = mock.Mock(return_value=(426, 479, 479, 0, 100))
#     buy_single_index, buy_exec_index, compute_index, num, count = l2_data_manager.TradePointManager.get_buy_compute_start_data(
#         code)
#     processor.unreal_buy_dict[code] = mock.Mock(return_value=(479, 167234623))
#
#
#     processor.process_order(code, 480, 519, 167234623, False)
#     print(buy_single_index, buy_exec_index, compute_index, num, count)
# if __name__ == "__main__":
#     unittest.main()
l2_trade_util.py
@@ -5,6 +5,7 @@
__redis_manager = redis_manager.RedisManager(2)
#  初始化禁止交易代码库
def init_forbidden_trade_codes():
    key = "forbidden-trade-codes"
@@ -14,6 +15,7 @@
        redis.delete(key)
    redis.sadd(key, "000000")
    redis.expire(key, tool.get_expire())
# 移除禁止交易代码
def remove_from_forbidden_trade_codes(code):
@@ -29,6 +31,7 @@
    redis.sadd(key, code)
    redis.expire(key, tool.get_expire())
# 禁止代码交易
def forbidden_trade(code):
    add_to_forbidden_trade_codes(code)
@@ -40,3 +43,5 @@
    key = "forbidden-trade-codes"
    redis = __redis_manager.getRedis()
    return redis.sismember(key, code)
limit_up_time_manager.py
@@ -5,8 +5,12 @@
import redis_manager
import tool
import global_util
import functools
_redisManager = redis_manager.RedisManager(0)
def save_limit_up_time(code, time):
@@ -36,34 +40,5 @@
        global_util.limit_up_time[code] = redis.get(key)
# 板块强度排序
def sort_code_by_limit_time(codes):
    if not global_util.limit_up_time:
        load_limit_up_time()
    list = []
    for code in codes:
        limit_up_time = global_util.limit_up_time.get(code)
        if limit_up_time is not None:
            list.append((code, limit_up_time))
    new_s = sorted(list, key=lambda e: int(e[1].replace(":", "")))
    dict_ = {}
    # 相同值为同一排序
    sort_index = 0
    for i in range(0, len(new_s)):
        if new_s[i - 1][1] != new_s[i][1] and i > 0:
            sort_index += 1
        dict_[new_s[i][0]] = sort_index
    return dict_
if __name__ == "__main__":
    list = [("1234578", "09:00:03"), ("12345", "09:00:00"), ("123456", "09:00:00"), ("123457", "09:00:03")]
    new_s = sorted(list, key=lambda e: int(e[1].replace(":", "")))
    dict_ = {}
    # 相同值为同一排序
    sort_index = 0
    for i in range(0, len(new_s)):
        if new_s[i - 1][1] != new_s[i][1] and i > 0:
            sort_index += 1
        dict_[new_s[i][0]] = sort_index
    print(dict_)
    list = [("1234578", "09:00:03",None), ("12345", "09:00:01",True), ("123456", "09:00:00",True), ("123457", "09:00:04",False)]
log.py
@@ -35,6 +35,13 @@
        logger.add(self.get_path("l2", "l2_trade"), filter=lambda record: record["extra"].get("name") == "l2_trade",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_data"), filter=lambda record: record["extra"].get("name") == "l2_data",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_latest_data"), filter=lambda record: record["extra"].get("name") == "l2_latest_data",
                   rotation="00:00", compression="zip", enqueue=True)
        # 显示在控制台
        logger.add(sys.stdout,
                   filter=lambda record: record["extra"].get("name") == "l2_trade")
@@ -48,6 +55,10 @@
        logger.add(self.get_path("l2", "l2_big_data"),
                   filter=lambda record: record["extra"].get("name") == "l2_big_data",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("l2", "l2_trade_queue"),
                   filter=lambda record: record["extra"].get("name") == "l2_trade_queue",
                   rotation="00:00", compression="zip", enqueue=True)
        logger.add(self.get_path("juejin", "juejin_tick"),
@@ -90,10 +101,13 @@
logger_l2_error = __mylogger.get_logger("l2_error")
logger_l2_process = __mylogger.get_logger("l2_process")
logger_l2_process_time = __mylogger.get_logger("l2_process_time")
logger_l2_data = __mylogger.get_logger("l2_data")
logger_l2_latest_data = __mylogger.get_logger("l2_latest_data")
logger_l2_trade = __mylogger.get_logger("l2_trade")
logger_l2_trade_cancel = __mylogger.get_logger("l2_trade_cancel")
logger_l2_trade_buy = __mylogger.get_logger("l2_trade_buy")
logger_l2_trade_queue = __mylogger.get_logger("l2_trade_queue")
logger_l2_big_data = __mylogger.get_logger("l2_big_data")
logger_juejin_tick = __mylogger.get_logger("juejin_tick")
@@ -169,21 +183,36 @@
    return tool.time_seconds_format(s - 2 - cha)
def load_l2_from_log():
    today_data = {}
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    with open("D:/logs/gp/l2/l2_data.{}.log".format(date), mode='r') as f:
        while True:
            data = f.readline()
            if not data:
                break
            index = data.find('save_l2_data:')
            index = data.find('-', index)
            data = data[index + 1:].strip()
            code = data[0:6]
            data = data[7:]
            dict_ = eval(data)
            if code not in today_data:
                today_data[code] = dict_
            else:
                today_data[code].extend(dict_)
    for key in today_data:
        news = sorted(today_data[key], key=lambda x: x["index"])
        today_data[key] = news
        print(key, len(today_data[key]) - 1, today_data[key][-1]["index"])
    return today_data
if __name__ == '__main__':
    # logger_l2_process_time.info("test123")
    date = datetime.datetime.now().strftime("%Y-%m-%d")
    LogUtil.extract_log_from_key("000666", "D:/logs/gp/ths/buy_1_volumn_record.{}.log".format(date),
                                 "D:/logs/gp/ths/buy_1_volumn_record{}.{}.log".format("000666", date))
    # __analyse_pricess_time()
    # with open("D:\\logs\\gp\\ths\\buy_1_volumn_record002911.2022-12-01.log",encoding="utf-8") as f:
    #     line = "1"
    #     while line:
    #         line = f.readline()
    #         line = (line.split("-")[-1].replace("'","\""))
    #         data = json.loads(line)
    #         print(compute_space_time(data["time"]),data["volumn"])
    #
    # print( compute_space_time("10:00:06"))
    LogUtil.extract_log_from_key("002193", "D:/logs/gp/l2/l2_trade_queue.{}.log".format(date),
                                 "D:/logs/gp/l2/l2_trade_queue{}.{}.log".format("002193", date))
    # parse_l2_data()
server.py
@@ -31,8 +31,9 @@
import l2_code_operate
from code_data_util import ZYLTGBUtil
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager
from log import logger_l2_error, logger_device, logger_trade_delegate, logger_buy_1_volumn_record, \
    logger_l2_trade_queue, logger_l2_latest_data
from trade_queue_manager import THSBuy1VolumnManager, Buy1PriceManager, thsl2tradequeuemanager
class MyTCPServer(socketserver.TCPServer):
@@ -50,8 +51,12 @@
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    ths_l2_trade_queue_manager = thsl2tradequeuemanager()
    latest_buy1_volumn_dict = {}
    buy1_price_manager = Buy1PriceManager()
    l2_trade_queue_time_dict = {}
    l2_save_time_dict = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -90,8 +95,14 @@
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                            _str)
                        # 间隔1s保存一条l2的最后一条数据
                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[code] >= 1000 and len(datas) > 0:
                            self.l2_save_time_dict[code] = origin_start_time
                            logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # print("截图时间:", process_time)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
@@ -149,23 +160,26 @@
                        logging.exception(e)
                elif type == 1:
                    # 设置股票代码
                    data_list = data_process.parseGPCode(_str)
                    data_list, is_add = data_process.parseGPCode(_str)
                    ZYLTGBUtil.save_list(data_list)
                    code_list = []
                    for data in data_list:
                        code_list.append(data["code"])
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    if is_add:
                        gpcode_manager.add_gp_list(code_datas)
                    else:
                    gpcode_manager.set_gp_list(code_datas)
                    if not is_add:
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
                    t1.start()
                elif type == 2:
                    # 涨停代码
                    dataList = data_process.parseGPCode(_str)
                    dataList, is_add = data_process.parseGPCode(_str)
                    # 设置涨停时间
                    gpcode_manager.set_limit_up_list(dataList)
                    # 保存到内存中
@@ -184,8 +198,8 @@
                            continue
                        # 获取是否有涨停时间
                        if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                            limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                        # if limit_up_time_manager.get_limit_up_time(d["code"]) is None:
                        #     limit_up_time_manager.save_limit_up_time(d["code"], d["time"])
                elif type == 3:
                    # 交易成功信息
@@ -223,6 +237,32 @@
                    money = datas["money"]
                    # TODO存入缓存文件
                    trade_manager.set_available_money(client, money)
                # l2交易队列
                elif type == 10:
                    # 可用金额
                    datas = data_process.parseData(_str)
                    channel = datas["channel"]
                    code = datas["code"]
                    data = datas["data"]
                    buy_time = data["buyTime"]
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    # 保存最近的记录
                    if self.ths_l2_trade_queue_manager.save_recod(code, data):
                        if buy_time != "00:00:00":
                            logger_l2_trade_queue.info("{}-{}", code, data)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, buy_time,
                                                                                               int(buy_one_volumn),
                                                                                               buy_one_price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                            if need_sync:
                                # 同步数据
                                l2_data_manager_new.L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn),
                                                                                           buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)
                elif type == 20:
                    # 登录
                    data = data_process.parse(_str)["data"]
@@ -287,6 +327,12 @@
                        l2_clients = authority.get_l2_clients()
                        if client_id in l2_clients:
                            alert_util.alarm()
                elif type == 60:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    print("L2自启动成功", client_id)
                    # print("心跳:", client_id)
                sk.send(return_str.encode())
@@ -368,6 +414,7 @@
if __name__ == "__main__":
    try:
        repair_ths_main_site(2)
        thsl2tradequeuemanager().test()
        # repair_ths_main_site(2)
    except Exception as e:
        print(str(e))
settings.py
New file
@@ -0,0 +1,26 @@
# 设置模块
import redis_manager
__redis_manager = redis_manager.RedisManager(2)
def __get_redis():
    return __redis_manager.getRedis()
def is_accept_l2_data():
    val = __get_redis().get("not_accpt_l2_data")
    if val is None:
        return True
    else:
        if int(val) == 1:
            return False
        else:
            return True
def set_accept_l2_data(val):
    if val:
        __get_redis().set("not_accpt_l2_data", 0)
    else:
        __get_redis().set("not_accpt_l2_data", 1)
trade_data_manager.py
@@ -210,9 +210,6 @@
        count = self.__get_redis().get(key)
        return 0 if count is None else count
    def process_rate(self, code, rate, time_str):
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
@@ -238,6 +235,10 @@
        global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time()))
        pass
    # 获取现价
    def get_current_price(self, code):
        return global_util.cuurent_prices.get(code)
    # 现价代码数量
    def save_current_price_codes_count(self, count):
        self.__save_current_price_codes_count(count)
@@ -253,6 +254,13 @@
        else:
            return time_seconds >= constant.UNDER_WATER_PRICE_TIME_AS_SECONDS
    # 当前代码是否涨停
    def current_is_limit_up(self, code):
        data = self.get_current_price(code)
        if data is None:
            return None
        return data[1]
if __name__ == "__main__":
    processor = CodeActualPriceProcessor()
trade_gui.py
@@ -385,6 +385,8 @@
            self.buy_cancel_lock.release()
            # 清空代码框
            self.input_number(code_input, "")
            # 再次清除代码框
            self.input_number(code_input, "")
    # 刷新交易窗口数据
    @async_call
trade_manager.py
@@ -3,16 +3,13 @@
对一系列的代码交易变量,下单,撤单进行管理
"""
# 交易管理器
import json
import time
import gpcode_manager
import l2_trade_util
import mysql_data
import tool
from trade_data_manager import TradeBuyDataManager
from trade_gui import THSGuiTrade, THSBuyWinManagerNew
import trade_data_manager
from trade_gui import THSBuyWinManagerNew,THSGuiTrade
import time as t
import l2_data_manager
@@ -183,7 +180,7 @@
# 中断买入
def break_buy(code, reason):
    TradeBuyDataManager.remove_buy_position_info(code)
    trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
# 购买
@@ -206,7 +203,7 @@
    # 下单成功,加入固定代码库
    l2_data_manager.add_to_l2_fixed_codes(code)
    # 记录下单的那一帧图片的截图时间与交易用时
    TradeBuyDataManager.set_buy_position_info(code, capture_timestamp, use_time, last_data, last_data_index)
    trade_data_manager.TradeBuyDataManager.set_buy_position_info(code, capture_timestamp, use_time, last_data, last_data_index)
    print("买入结束")
    logger_trade.info("{}买入成功".format(code))
@@ -241,7 +238,7 @@
# 取消委托成功
def __cancel_success(code):
    TradeBuyDataManager.remove_buy_position_info(code)
    trade_data_manager.TradeBuyDataManager.remove_buy_position_info(code)
    # 下单成功,加入固定代码库
    l2_data_manager.remove_from_l2_fixed_codes(code)
    logger_trade.info("{}撤单成功".format(code))
@@ -267,6 +264,8 @@
                l2_data_manager.TradePointManager.delete_buy_point(code)
                # 移除交易窗口分配
                THSBuyWinManagerNew.cancel_distribute_win_for_code(code)
                #TODO 完全成交后移除L2
# 处理委托成功数据
trade_queue_manager.py
@@ -200,5 +200,49 @@
        return self.__get_record(code)
class thsl2tradequeuemanager:
    __redisManager = redis_manager.RedisManager(0)
    __filter_dict = {}
    def __get_redis(self):
        return self.__redisManager.getRedis()
    def __save_latest_recod(self, code, info):
        # 保存每一次的
        key = "ths_l2_latest_trade_info-{}".format(code)
        self.__get_redis().setex(key, tool.get_expire(), json.dumps(info))
    def __get_latest_record(self, code):
        key = "ths_l2_latest_trade_info-{}".format(code)
        val = self.__get_redis().get(key)
        if val is None:
            return None
        return json.loads(val)
    def save_recod(self, code, data):
        _str = json.dumps(data)
        if code in self.__filter_dict and self.__filter_dict[code] == _str:
            return False
        self.__filter_dict[code] = _str
        self.__save_latest_recod(code, data)
        buy_time = data["buyTime"]
        buy_one_price = data["buyOnePrice"]
        buy_one_volumn = data["buyOneVolumn"]
        sell_time = data["sellTime"]
        sell_one_price = data["sellOnePrice"]
        sell_one_volumn = data["sellOneVolumn"]
        return True
    def get_sell1_info(self, code):
        data = self.__get_latest_record(code)
        if data is None:
            return None, None, None
        else:
            sell_time = data["sellTime"]
            sell_one_price = data["sellOnePrice"]
            sell_one_volumn = data["sellOneVolumn"]
            return sell_time, sell_one_price, int(sell_one_volumn)
if __name__ == '__main__':
    print( Buy1PriceManager().get_price("002644"))
    thsl2tradequeuemanager().test()