Administrator
2022-10-12 be73e2b78857adaf006063275726b69c4c60f0d7
买撤策略修改;加入报警功能
16个文件已修改
3个文件已添加
553 ■■■■ 已修改文件
alert.mp3 补丁 | 查看 | 原始文档 | blame | 历史
alert_util.py 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
big_money_num_manager.py 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
code_data_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
global_util.py 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 99 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 134 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_test.py 49 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_util.py 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_trade_factor.py 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
limit_up_time_manager.py 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_industry_util.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_util.py 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
alert.mp3
Binary files differ
alert_util.py
New file
@@ -0,0 +1,34 @@
import time
import pygame
# Sound the alarm
def alarm():
    """Restart the alert sound: stop any playback in progress, then play again."""
    player = AlertUtil()
    player.stop_audio()
    player.play_audio()
class AlertUtil:
    """Singleton wrapper around ``pygame.mixer.music`` for the alert sound.

    The first successful construction initialises the mixer, loads
    ``'alert.mp3'`` (a relative path) and sets the volume to 1. Every later
    construction returns that same instance without repeating the setup.
    """

    __instance = None

    # Singleton
    def __new__(cls, *args, **kwargs):
        """Return the shared instance, performing the mixer setup on first use.

        Extra positional/keyword arguments are accepted for signature
        compatibility but ignored.
        """
        if cls.__instance is None:
            # object.__new__ raises TypeError when given extra arguments, so
            # args/kwargs are deliberately not forwarded.
            instance = super().__new__(cls)
            # One-time mixer setup
            pygame.mixer.init()
            pygame.mixer.music.load('alert.mp3')
            pygame.mixer.music.set_volume(1)
            # Cache the instance only after setup succeeded, so that a failed
            # initialisation is retried on the next call instead of handing
            # out an instance whose mixer was never set up.
            cls.__instance = instance
        return cls.__instance

    def play_audio(self):
        """Start playing the loaded alert sound."""
        pygame.mixer.music.play()

    def stop_audio(self):
        """Stop the alert sound."""
        pygame.mixer.music.stop()
# Manual smoke test: trigger the alert when this module is run as a script.
if __name__ == '__main__':
    alarm()
    # presumably playback is asynchronous, so the process is kept alive
    # for 2 seconds to let the sound be heard — TODO confirm
    time.sleep(2)
big_money_num_manager.py
@@ -20,12 +20,17 @@
    redis.expire("big_money-{}".format(code), tool.get_expire())
def reset(code):
    redis = __redisManager.getRedis()
    redis.set("big_money-{}".format(code), 0)
def get_num(code):
    redis = __redisManager.getRedis()
    num = redis.get("big_money-{}".format(code))
    if num is None:
        return 0
    return num
    return int(num)
if __name__ == "__main__":
code_data_util.py
@@ -20,7 +20,7 @@
    @classmethod
    def get(cls, code):
        redis = _redisManager.getRedis()
        val = redis.get(code)
        val = redis.get("zyltgb-{}".format(code))
        if val is not None:
            return int(val)
        return None
data_process.py
@@ -11,6 +11,7 @@
# 统计今日卖出
# 统计今日买入
import ths_util
import tool
from code_data_util import ZYLTGBUtil
@@ -161,7 +162,7 @@
        if float(data["zyltgb"]) > 0:
            _list.append(_dict)
            # 保存10天
            ZYLTGBUtil.save(data["code"],data["zyltgb"],data["zyltgb_unit"])
            ZYLTGBUtil.save(data["code"], data["zyltgb"], data["zyltgb_unit"])
    mongo_data.save("ths-zylt", _list)
@@ -170,6 +171,7 @@
        return
    redis = __redisManager.getRedis();
    redis.setex("client-active-{}".format(client_id), 10, json.dumps((host, thsDead)))
    ths_util.set_ths_dead_state(client_id, thsDead)
def getValidL2Clients():
@@ -178,7 +180,9 @@
    client_ids = []
    for k in keys:
        _id = k.split("client-active-")[1]
        client_ids.append(_id)
        # 客户端同花顺没卡死才能加入
        if not ths_util.is_ths_dead(_id):
            client_ids.append(_id)
    l2_clients = authority.get_l2_clients()
    return list(set(client_ids).intersection(set(l2_clients)))
global_util.py
@@ -56,7 +56,7 @@
    for code in codes:
        result = ZYLTGBUtil.get(code)
        if result is not None:
            zyltgb_map[code]=result
            zyltgb_map[code] = result
# 加载量
gui.py
@@ -5,13 +5,13 @@
from tkinter.messagebox import *
import tkintertable
import win32gui
import alert_util
import data_export_util
import multiprocessing
import global_util
import redis_manager
import mongo_data
import server
@@ -163,7 +163,7 @@
            win.resizable(height=False, width=False)
            limit_up_datas = {}
            limit_up_datas["row{}".format(0)] = {'代码': '', '首次涨停时间': '', '现价': '','涨幅':'', '涨停封单额': ''}
            limit_up_datas["row{}".format(0)] = {'代码': '', '首次涨停时间': '', '现价': '', '涨幅': '', '涨停封单额': ''}
            cl = Label(win, text="更新时间:", bg="#DDDDDD", fg="#666666")
            cl.place(x=10, y=10)
@@ -218,6 +218,66 @@
        btn = Button(frame, text="今日涨停", command=get_limit_up_codes_win)
        btn.place(x=300, y=150)
    # 绘制交易状态
    def __draw_trade_state(self, frame):
        def refresh_data():
            normal=True
            if l2_code_operate.L2CodeOperate.is_read_queue_valid():
                cl_queue.configure(text="正常", foreground="#008000")
            else:
                cl_queue.configure(text="异常", foreground="#FF7F27")
                normal=False
            try:
                trade_gui.THSGuiTrade.checkEnv()
                cl_win.configure(text="正常", foreground="#008000")
            except Exception as e:
                normal = False
                cl_win.configure(text="异常:{}".format(str(e)),foreground="#FF7F27")
            # 状态有问题,需要报警
            if not normal:
                alert_util.alarm()
        def update_data():
            while True:
                # 刷新数据
                try:
                    if auo_refresh.get() > 0:
                        refresh_data()
                except:
                    pass
                time.sleep(2)
        start_y=230
        btn = Button(frame, text="刷新状态", command=refresh_data)
        btn.place(x=10, y=start_y)
        auo_refresh = IntVar()
        ch1 = Checkbutton(frame, text='自动刷新', variable=auo_refresh, onvalue=1, offvalue=0, background="#DDDDDD",
                          activebackground="#DDDDDD")
        # 默认自动刷新
        auo_refresh.set(1)
        ch1.place(x=100, y=start_y)
        y_=start_y+30
        cl = Label(frame, text="操作队列状态:", bg="#DDDDDD")
        cl.place(x=10, y=y_)
        cl_queue = Label(frame, text="未知", bg="#DDDDDD")
        cl_queue.place(x=100, y=y_)
        cl = Label(frame, text="交易窗口状态:", bg="#DDDDDD")
        cl.place(x=200, y=y_)
        cl_win = Label(frame, text="未知", bg="#DDDDDD")
        cl_win.place(x=300, y=y_)
        refresh_data()
        # 添加更新线程
        t1 = threading.Thread(target=lambda: update_data())
        # 后台运行
        t1.setDaemon(True)
        t1.start()
    # 绘制l2数据状态
    def __draw_l2_state(self, root):
        def update_data():
@@ -233,7 +293,7 @@
        def refresh_data():
            for client_id in code_sv_map:
                ip = data_process.getActiveClientIP(client_id)
                ths_dead=data_process.getTHSState(client_id)
                ths_dead = data_process.getTHSState(client_id)
                if ip is not None and len(ip) > 0:
                    if ths_dead:
                        client_state[client_id].configure(text="(在线:{})".format(ip), foreground="#FF7F27")
@@ -342,6 +402,7 @@
        ch1 = Checkbutton(frame, text='自动刷新', variable=auo_refresh, onvalue=1, offvalue=0, background="#DDDDDD",
                          activebackground="#DDDDDD")
        ch1.place(x=width - 80, y=5)
        auo_refresh.set(1)
        l2_client_count = 0
        code_sv_map = {}
@@ -350,13 +411,13 @@
        for key in self.l2_codes:
            client_lb = Label(frame, text="设备:{}".format(key), background="#DDDDDD")
            client_lb.place(x=30, y=40 + l2_client_count * 30)
            client_lb.place(x=38, y=40 + l2_client_count * 30)
            btn = Button(frame, text="检测", command=key)
            btn.bind('<Button-1>', check)
            btn.place(x=0, y=35 + l2_client_count * 30)
            btn.place(x=5, y=35 + l2_client_count * 30)
            client_state_lb = Label(frame, text="(未知)", background="#DDDDDD", font=('微软雅黑', 8))
            client_state_lb.place(x=75, y=40 + l2_client_count * 30)
            client_state_lb.place(x=80, y=40 + l2_client_count * 30)
            client_state[key] = client_state_lb
            code_sv_map[key] = []
@@ -471,7 +532,7 @@
        ch1 = Checkbutton(frame, text='自动刷新', variable=auo_refresh, onvalue=1, offvalue=0, background="#DDDDDD",
                          activebackground="#DDDDDD")
        ch1.place(x=width - 80, y=5)
        auo_refresh.set(1)
        # ------表头结束------
        # 委托表格
@@ -597,29 +658,28 @@
                showwarning('警告', e)
        def export_l2_data(code):
            if code not in  l2_data_manager.local_today_datas:
            if code not in l2_data_manager.local_today_datas:
                l2_data_manager.load_l2_data(code)
            datas = l2_data_manager.local_today_datas[code]
            try:
                path=data_export_util.export_l2_data(code,datas)
                showinfo("提示","导出成功,路径为:"+path)
                path = data_export_util.export_l2_data(code, datas)
                showinfo("提示", "导出成功,路径为:" + path)
            except Exception as e1:
                showerror("导出失败",str(e1))
                showerror("导出失败", str(e1))
        def export_l2_data_origin(code):
            redis = redis_manager.RedisManager(1).getRedis()
            keys = redis.keys("big_data-{}-*".format(code))
            try:
                for k in keys:
                    datas=redis.get(k)
                    datas=json.loads(datas)
                    datas=datas["data"]["data"]
                    datas = redis.get(k)
                    datas = json.loads(datas)
                    _t = k.split("-")[2]
                    k = time.strftime("%Y_%m_%d_%H_%M_%S_",time.localtime(float(_t)/1000))
                    k = "{}{}".format(k,_t[-3:])
                    data_export_util.export_l2_data_origin(code, datas,k)
                    k = time.strftime("%Y_%m_%d_%H_%M_%S_", time.localtime(float(_t) / 1000))
                    k = "{}{}".format(k, _t[-3:])
                    data_export_util.export_l2_data_origin(code, datas, k)
            except Exception as e1:
                    showerror("导出失败", str(e1))
                showerror("导出失败", str(e1))
            showinfo("提示", "导出完成")
@@ -685,6 +745,7 @@
        self.__draw_juejin(root)
        self._draw_check(root)
        self.__draw_l2_state(root)
        self.__draw_trade_state(root)
        self.__draw_trade_data(root)
        self.__draw_test(root)
juejin.py
@@ -86,9 +86,6 @@
        server.repair_ths_main_site(client)
def __run_schedule():
    while True:
        schedule.run_pending()
@@ -202,7 +199,8 @@
# 获取到现价
def accpt_price(code, price):
def accpt_price(code, price, price_from="juejin"):
    return
    gpcode_manager.set_price(code, price)
    # 获取收盘价
    pricePre = gpcode_manager.get_price_pre(code)
@@ -212,12 +210,12 @@
            logger_juejin_tick.info("{}-{}-{}", code, price, rate)
            if not gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(
                    code) and not gpcode_manager.is_listen_full():
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化,rate-{} from-{}".format(rate, price_from))
            # 进入监控
        elif rate < 5:
            # 移除监控
            if gpcode_manager.is_listen(code) and not gpcode_manager.is_operate(code):
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化,rate-{} from-{}".format(rate, price_from))
# 获取到现价
@@ -226,10 +224,10 @@
    now_strs = now_str.split(":")
    now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2])
    start = 60 * 60 * 9 + 31 * 60
    if now_second > start:
    if False:
        for d in prices:
            code, price = d["code"], float(d["price"])
            accpt_price(code, price)
            accpt_price(code, price, "ths")
    else:
        _code_list = []
        _delete_list = []
@@ -241,14 +239,19 @@
            if pricePre is not None:
                rate = round((price - pricePre) * 100 / pricePre, 1)
                if rate >= 0:
                    # 暂存涨幅为正的代码
                    _code_list.append((rate, code))
                else:
                    # 暂存涨幅为负的代码
                    _delete_list.append((rate, code))
        # 排序
        new_code_list = sorted(_code_list, key=lambda e: e.__getitem__(0), reverse=True)
        client_ids = data_process.getValidL2Clients()
        # 最多填充的代码数量
        max_count = len(client_ids) * 8
        # 截取前几个代码填充
        add_list = new_code_list[:max_count]
        # 后面的代码全部删除
        _delete_list.extend(new_code_list[max_count:])
        add_code_list = []
@@ -259,11 +262,18 @@
        for d in _delete_list:
            del_list.append(d[1])
        for code in add_code_list:
            L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
        # 后面的代码数量
        # 先删除应该删除的代码
        for code in del_list:
            L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
            if gpcode_manager.is_listen(code):
                # 判断是否在监听里面
                L2CodeOperate.get_instance().add_operate(0, code, "现价变化")
        # 增加应该增加的代码
        for code in add_code_list:
            if not gpcode_manager.is_listen(code):
                L2CodeOperate.get_instance().add_operate(1, code, "现价变化")
        print(add_code_list, del_list)
@@ -386,4 +396,4 @@
if __name__ == '__main__':
    pass
    everyday_init()
l2_code_operate.py
@@ -65,10 +65,11 @@
        finally:
            cls.__lock.release()
    @staticmethod
    def send_operate():
    @classmethod
    def send_operate(cls):
        redis = L2CodeOperate.getRedis()
        while True:
            cls.set_read_queue_valid()
            try:
                data = redis.lpop("code_operate_queue")
                # print("读取操作队列", data, redis.llen("code_operate_queue"))
@@ -162,7 +163,8 @@
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
            redis = self.redis_manager_.getRedis()
            redis.rpush("code_operate_queue", json.dumps({"type": 3, "code": code, "client": client_id, "data": data,  "create_time": round(time.time() * 1000)}))
            redis.rpush("code_operate_queue", json.dumps(
                {"type": 3, "code": code, "client": client_id, "data": data, "create_time": round(time.time() * 1000)}))
    # 移除监控
    def remove_l2_listen(self, code, msg):
@@ -181,6 +183,18 @@
            return int(value)
        return value
    # 设置读取队列有效
    @classmethod
    def set_read_queue_valid(cls):
        redis = cls.getRedis()
        redis.setex("operate_queue_read_state", 20, 1)
    @classmethod
    def is_read_queue_valid(cls):
        redis = cls.getRedis()
        return redis.get("operate_queue_read_state") is not None
# 获取客户端正在监听的代码
def get_listen_codes_from_client(client_id):
l2_data_manager.py
@@ -17,6 +17,7 @@
import l2_trade_factor
import redis_manager
import ths_industry_util
import tool
import trade_manager
from log import logger_l2_trade, logger_l2_trade_cancel, logger_l2_trade_buy, logger_l2_process
@@ -283,10 +284,10 @@
    def get_add_data(cls, code, datas, _start_index):
        if datas is not None and len(datas) < 1:
            return []
        last_key = ""
        __latest_datas = local_latest_datas.get(code)
        if __latest_datas is not None and len(__latest_datas) > 0:
            last_key = __latest_datas[-1]["key"]
        last_data = None
        latest_datas_ = local_latest_datas.get(code)
        if latest_datas_ is not None and len(latest_datas_) > 0:
            last_data = latest_datas_[-1]
        count = 0
        start_index = -1
@@ -294,13 +295,19 @@
        # 设置add_data的序号
        for n in reversed(datas):
            count += 1
            if n["key"] == last_key:
            if n["key"] == (last_data["key"] if last_data is not None else ""):
                start_index = len(datas) - count
                break
        _add_datas = []
        if len(last_key) > 0:
            if start_index < 0 or start_index + 1 >= len(datas):
        if last_data is not None:
            if start_index < 0:
                if L2DataUtil.get_time_as_second(datas[0]["val"]["time"]) >= L2DataUtil.get_time_as_second(
                        last_data["val"]["time"]):
                    _add_datas = datas
                else:
                    _add_datas = []
            elif start_index + 1 >= len(datas):
                _add_datas = []
            else:
                _add_datas = datas[start_index + 1:]
@@ -356,6 +363,9 @@
                else:
                    limitPrice = 0
                item["limitPrice"] = "{}".format(limitPrice)
            # 不需要非涨停数据/非跌停数据
            if int(item["limitPrice"]) == 0:
                continue
            operateType = item["operateType"]
            cancelTime = item["cancelTime"]
            cancelTimeUnit = item["cancelTimeUnit"]
@@ -471,9 +481,6 @@
                #                                                    add_datas)
                if len(add_datas) > 0:
                    _start_time = round(t.time() * 1000)
                    # 计算大单数量
                    cls.__compute_big_money_data(code, add_datas)
                    latest_time = add_datas[len(add_datas) - 1]["val"]["time"]
                    # 时间差不能太大才能处理
                    # TODO 暂时关闭处理
@@ -481,13 +488,13 @@
                        # 判断是否已经挂单
                        state = trade_manager.get_trade_state(code)
                        start_index = len(total_datas) - len(add_datas)
                        end_index = len(total_datas)-1
                        end_index = len(total_datas) - 1
                        if state == trade_manager.TRADE_STATE_BUY_DELEGATED or state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                            # 已挂单
                            cls.__process_order(code, start_index,end_index, capture_timestamp)
                            cls.__process_order(code, start_index, end_index, capture_timestamp)
                        else:
                            # 未挂单
                            cls.__process_not_order(code,start_index,end_index,capture_timestamp)
                            cls.__process_not_order(code, start_index, end_index, capture_timestamp)
                    logger_l2_process.info("code:{} 处理数据范围: {}-{} 处理时间:{}", code, add_datas[0]["index"],
                                           add_datas[-1]["index"], round(t.time() * 1000) - __start_time)
                # 保存数据
@@ -497,10 +504,12 @@
                cls.unreal_buy_dict.pop(code)
    @classmethod
    def __compute_big_money_data(cls, code, add_datas):
    def __compute_big_money_data(cls, code, start_index, end_index):
        # 计算大单
        total_datas = local_today_datas[code]
        num = 0
        for data in add_datas:
        for index in range(start_index, end_index + 1):
            data = total_datas[index]
            if l2_trade_factor.L2TradeFactorSourceDataUtil.is_big_money(data):
                if int(data["val"]["operateType"]) == 0:
                    num += data["re"]
@@ -693,6 +702,14 @@
    @classmethod
    def __buy(cls, code, capture_timestamp, last_data, last_data_index):
        can, reason = cls.__can_buy(code)
        # 不能购买
        if not can:
            cls.debug(code, "不可以下单,原因:{}", reason)
            return
        else:
            cls.debug(code, "可以下单,原因:{}", reason)
        # 删除虚拟下单
        if code in cls.unreal_buy_dict:
            cls.unreal_buy_dict.pop(code)
@@ -705,6 +722,36 @@
        except Exception as e:
            cls.debug(code, "执行买入异常:{}", str(e))
            pass
    # 是否可以买
    @classmethod
    def __can_buy(cls, code):
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None and L2DataUtil.get_time_as_second(limit_up_time) >= L2DataUtil.get_time_as_second(
                "14:30:00"):
            return False, "14:30后涨停的不能买,涨停时间为{}".format(limit_up_time)
        # 同一板块中老二后面的不能买
        industry, codes = ths_industry_util.get_same_industry_codes(code, gpcode_manager.get_gp_list())
        if industry is None:
            return True, "没有获取到行业"
        codes_index = limit_up_time_manager.sort_code_by_limit_time(codes)
        if codes_index is not None and codes_index.get(code) is not None and codes_index.get(code) > 1:
            return False, "同一板块中老三,老四,...不能买"
        # 13:00后涨停,本板块中涨停票数<29不能买
        limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        if limit_up_time is not None:
            if int(limit_up_time.replace(":", "")) >= 130000 and global_util.industry_hot_num.get(industry) is not None:
                if global_util.industry_hot_num.get(industry) < 29:
                    return False, "13:00后涨停,本板块中涨停票数<29不能买"
        # 老二,本板块中涨停票数<29 不能买
        if codes_index.get(code) is not None and codes_index.get(code) == 1 and global_util.industry_hot_num.get(
                industry) is not None:
            if global_util.industry_hot_num.get(industry) < 29:
                return False, "老二,本板块中涨停票数<29不能买"
        # 可以下单
        return True, None
    @classmethod
    def __cancel_buy(cls, code):
@@ -731,6 +778,7 @@
            cls.unreal_buy_dict.pop(code)
        else:
            cls.__cancel_buy(code)
        L2BigNumProcessor.del_big_num_pos(code)
    @classmethod
    def __start_compute_buy(cls, code, compute_start_index, compute_end_index, threshold_money, capture_time,
@@ -753,11 +801,14 @@
                new_get_pos = True
                cls.debug(code, "获取到买入信号起始点:{}  数据:{}", buy_single_index, total_datas[buy_single_index])
                limit_up_time_manager.save_limit_up_time(code, total_datas[buy_single_index]["val"]["time"])
                # 重置大单计算
                big_money_num_manager.reset(code)
        if buy_single_index is None:
            # 未获取到买入信号,终止程序
            return None
        # TODO 可能存在问题 计算大单数量
        cls.__compute_big_money_data(code, max(compute_start_index, buy_single_index), compute_end_index)
        # 买入纯买额统计
        compute_index, buy_nums, rebegin_buy_pos = cls.__sum_buy_num_for_order_3(code, max(buy_single_index,
                                                                                           compute_start_index),
@@ -792,7 +843,7 @@
                                                                                       compute_index)
            # 计算大群撤的大单
            L2BetchCancelBigNumProcessor.process_new(code, buy_single_index, compute_index)
            # 连续涨停数计算
            L2ContinueLimitUpCountManager.process(code, buy_single_index, compute_index)
            # 数据是否处理完毕
@@ -846,7 +897,8 @@
            if L2DataUtil.get_time_as_second(_val["time"]) < second_930:
                continue
            if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
            if L2DataUtil.is_limit_up_price_buy(_val) and (last_index is None or (
                    i - last_index == 1 and datas[last_index]["val"]["time"] == datas[i]["val"]["time"])):
                if start is None:
                    start = i
                last_index = i
@@ -925,10 +977,6 @@
            return start, end_index
        else:
            return None, None
    # 是否可以下单
    def __is_can_order(self):
        pass
    # 虚拟下单
    def __unreal_order(self):
@@ -1264,14 +1312,22 @@
    def test1(cls):
        code = "000593"
        load_l2_data(code, True)
        print( cls.__compute_order_begin_pos(code,232,3,239))
        print(cls.__compute_order_begin_pos(code, 232, 3, 239))
    @classmethod
    def test2(cls):
        code = "000677"
        code = "600082"
        load_l2_data(code, True)
        cls.random_key[code] = random.randint(0, 100000)
        L2BetchCancelBigNumProcessor.process_new(code, 57, 150)
        need_cancel, cancel_data = L2BigNumProcessor.process_cancel_with_big_num(code, 121, 123)
    @classmethod
    def test_can_order(cls):
        code = "002393"
        global_util.load_industry()
        limit_up_time_manager.load_limit_up_time()
        print(cls.__can_buy(code))
# 连续涨停买单数最大值管理器
@@ -1396,7 +1452,7 @@
class L2BigNumProcessor:
    # 是否需要根据大单撤单,返回是否需要撤单与撤单信号的数据
    @classmethod
    def __need_cancel_with_max_num(cls, code, max_num_info):
    def __need_cancel_with_max_num(cls, code, max_num_info, start_index, end_index):
        if max_num_info is None:
            return False, None
        # 如果是买入单,需要看他前面同一秒是否有撤单
@@ -1409,6 +1465,9 @@
                if cancel_datas is not None:
                    for cancel_data in cancel_datas:
                        # 只能在当前规定的数据范围查找,以防出现重复查找
                        if cancel_data["index"] < start_index or cancel_data["index"] > end_index:
                            continue
                        if cancel_data["index"] > max_num_info["index"]:
                            buy_index, buy_data = l2_data_util.get_buy_data_with_cancel_data(cancel_data,
                                                                                             local_today_num_operate_map[
@@ -1417,6 +1476,7 @@
                                continue
                            if buy_data["val"]["time"] != max_num_info["val"]["time"]:
                                continue
                            min_space, max_space = l2_data_util.compute_time_space_as_second(
                                cancel_data["val"]["cancelTime"],
                                cancel_data["val"][
@@ -1491,7 +1551,7 @@
        return index
    @classmethod
    def __del_big_num_pos(cls, code):
    def del_big_num_pos(cls, code):
        redis = _redisManager.getRedis()
        redis.delete("big_num_pos-{}".format(code))
@@ -1499,7 +1559,6 @@
    def __cancel_buy(cls, code, index):
        L2TradeDataProcessor.debug(code, "撤买,触发位置-{},触发条件:大单,数据:{}", index, local_today_datas[code][index])
        L2TradeDataProcessor.cancel_buy(code)
        cls.__del_big_num_pos(code)
    # 处理数据中的大单,返回是否已经撤单和撤单数据的时间
    @classmethod
@@ -1521,12 +1580,12 @@
            L2TradeDataProcessor.debug(code, "获取到大单位置信息:{}", json.dumps(new_max_info))
            index = new_max_info["index"]
            # 大单是否有撤单信号
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info)
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, new_max_info, start_index, end_index)
            if need_cancel:
                # 需要撤单
                # 撤单
                L2TradeDataProcessor.cancel_debug(code, "新找到大单-{},需要撤买", new_max_info["index"])
                cls.__cancel_buy(code, index)
                cls.__cancel_buy(code, new_max_info["index"])
                return True, cancel_data,
            else:
@@ -1538,7 +1597,7 @@
            # 有大单记录
            need_cancel = False
            cancel_index = -1
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index])
            need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, total_data[index], start_index, end_index)
            # 需要撤单
            if need_cancel:
                # 撤单
@@ -1554,7 +1613,8 @@
                L2TradeDataProcessor.debug(code, "找到大单位置信息:{}", json.dumps(max_num_data))
                # 大单是否有撤单信号
                need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data)
                need_cancel, cancel_data = cls.__need_cancel_with_max_num(code, max_num_data, max_num_data["index"],
                                                                          end_index)
                if need_cancel:
                    # 需要撤单
                    # 撤单
@@ -1566,6 +1626,13 @@
                    # 保存大单记录
                    cls.__save_big_num_pos(code, max_num_data["index"])
                    return False, cancel_data
    @classmethod
    def test(cls):
        code = "000036"
        load_l2_data(code, True)
        new_max_info = cls.__compute_max_num(code, 470, 476, None, "09:32:59")
        print(new_max_info)
# 大群撤大单跟踪
@@ -1627,6 +1694,7 @@
        for i in index_set:
            if i <= latest_buy_index:
                total_count += total_datas[i]["re"]
        L2TradeDataProcessor.debug(code, "大群撤大单数量:{}/{}", count, total_count)
        # 大单小于5笔无脑撤
        if total_count <= 5:
            return True
@@ -1967,4 +2035,4 @@
if __name__ == "__main__":
    L2TradeDataProcessor.test1()
    L2TradeDataProcessor.test_can_order()
l2_data_test.py
New file
@@ -0,0 +1,49 @@
import json
import l2_data_manager
class L2DataTest:
    """Manual, file-driven check of how consecutive L2 snapshots are concatenated."""

    def test_concat_l2_data(self):
        """Feed three saved snapshots through the L2 pipeline and print the merged data.

        Each snapshot is the first line of a JSON text file under ``D:/test``.
        The first one seeds ``local_latest_datas`` and ``local_today_datas``.
        The second and third are formatted, corrected, reduced to their new
        rows with ``get_add_data`` and appended to ``local_today_datas``.
        """
        code = "002750"
        util = l2_data_manager.L2DataUtil

        # Snapshot 1: seed both caches with the formatted data as-is.
        with open("D:/test/2750_1.txt", 'r') as fp:
            first_raw = json.loads(fp.readline())
        seed_rows = util.format_l2_data(first_raw, code, 10.92)
        util.get_add_data(code, seed_rows, 0)
        l2_data_manager.local_latest_datas[code] = seed_rows
        l2_data_manager.local_today_datas[code] = seed_rows

        # Snapshots 2 and 3: append only the rows not seen before.
        for snapshot_path in ("D:/test/2750_2.txt", "D:/test/2750_3.txt"):
            with open(snapshot_path, 'r') as fp:
                raw = json.loads(fp.readline())
                rows = util.format_l2_data(raw, code, 10.92)
                rows = util.correct_data(code, rows)
                # New rows continue numbering after the last stored index.
                today_rows = l2_data_manager.local_today_datas.get(code)
                if today_rows is not None and len(today_rows) > 0:
                    next_index = today_rows[-1]["index"] + 1
                else:
                    next_index = 0
                new_rows = util.get_add_data(code, rows, next_index)
                l2_data_manager.local_latest_datas[code] = new_rows
                l2_data_manager.local_today_datas[code].extend(new_rows)

        print(l2_data_manager.local_today_datas[code])
# Run the manual concatenation check when this module is executed as a script.
if __name__ == '__main__':
    L2DataTest().test_concat_l2_data()
l2_data_util.py
@@ -4,6 +4,7 @@
"""
# 比较时间的大小
import datetime
import json
import time
@@ -44,7 +45,7 @@
        local_today_num_operate_map[code] = {}
    for data in source_datas:
        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"],data["val"]["price"])
        key = "{}-{}-{}".format(data["val"]["num"], data["val"]["operateType"], data["val"]["price"])
        if local_today_num_operate_map[code].get(key) is None:
            local_today_num_operate_map[code].setdefault(key, [])
        local_today_num_operate_map[code].get(key).append(data)
@@ -86,10 +87,11 @@
def get_buy_data_with_cancel_data(cancel_data, local_today_num_operate_map):
    # 计算时间区间
    min_space, max_space = compute_time_space_as_second(cancel_data["val"]["cancelTime"],
                                                          cancel_data["val"]["cancelTimeUnit"])
                                                        cancel_data["val"]["cancelTimeUnit"])
    max_time = __sub_time(cancel_data["val"]["time"], min_space)
    min_time = __sub_time(cancel_data["val"]["time"], max_space)
    buy_datas = local_today_num_operate_map.get("{}-{}-{}".format(cancel_data["val"]["num"], "0",cancel_data["val"]["price"]))
    buy_datas = local_today_num_operate_map.get(
        "{}-{}-{}".format(cancel_data["val"]["num"], "0", cancel_data["val"]["price"]))
    if buy_datas is None:
        # 无数据
        return None, None
@@ -126,9 +128,13 @@
                # 保存快照
                # logger_l2_big_data.debug("code:{} d1:{}  d2:{}", code, d1[i - 60: i + 30], d2[i - 60: i + 30])
                break
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    for key in same_time_nums:
        if same_time_nums[key] > 20:
    for time_ in same_time_nums:
        # 只保留最近3s内的大数据
        if abs(get_time_as_seconds(time_str) - get_time_as_seconds(time_)) > 3:
            continue
        if same_time_nums[time_] > 20:
            redis = l2_data_manager._redisManager.getRedis()
            redis.setex("big_data-{}-{}".format(code, int(round(time.time() * 1000))), tool.get_expire(), d1)
            break
l2_trade_factor.py
@@ -3,9 +3,10 @@
"""
# l2交易因子
import big_money_num_manager
import global_util
import limit_up_time_manager
import log
class L2TradeFactorUtil:
@@ -91,6 +92,8 @@
    # 纯万手哥影响值(手数》=9000 OR 金额》=500w)
    @classmethod
    def get_big_money_rate(cls, num):
        if num < 0:
            num = 0
        if num >= 10:
            return 0.5
        else:
@@ -129,11 +132,28 @@
    @classmethod
    def compute_rate_by_code(cls, code):
        zyltgb = global_util.zyltgb_map.get(code)
        total_industry_limit_percent = global_util.industry_hot_num.get(code)
        # 获取行业热度
        industry = global_util.code_industry_map.get(code)
        if industry is None:
            global_util.load_industry()
            industry = global_util.code_industry_map.get(code)
        total_industry_limit_percent = global_util.industry_hot_num.get(industry) if industry is not None else None
        # 获取量
        volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
            code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
        if volumn_day60_max is None or volumn_yest is None:
            global_util.load_volumn()
            volumn_day60_max, volumn_yest, volumn_today = global_util.max60_volumn.get(
                code), global_util.yesterday_volumn.get(code), global_util.today_volumn.get(code)
        # 首次涨停时间
        limit_up_time = global_util.limit_up_time.get(code)
        if limit_up_time is None:
            limit_up_time = limit_up_time_manager.get_limit_up_time(code)
        big_money_num = global_util.big_money_num.get(code)
        if big_money_num is None:
            big_money_num = big_money_num_manager.get_num(code)
        return cls.compute_rate(zyltgb, total_industry_limit_percent, volumn_day60_max, volumn_yest, volumn_today,
                                limit_up_time, big_money_num)
@@ -146,6 +166,8 @@
            if zyltgb is None:
                print("没有获取到自由流通市值")
                return 10000000
        if code == '002476':
            print("")
        zyltgb = cls.get_base_safe_val(zyltgb)
        rate = cls.compute_rate_by_code(code)
        # print("m值获取:", code, round(zyltgb * rate))
@@ -169,6 +191,7 @@
if __name__ == "__main__":
    print(L2TradeFactorUtil.get_big_money_rate(1))
    print(L2TradeFactorUtil.get_big_money_rate(2))
    print(L2TradeFactorUtil.get_big_money_rate(3))
    L2TradeFactorUtil.compute_m_value("000036")
    # print(L2TradeFactorUtil.get_big_money_rate(1))
    # print(L2TradeFactorUtil.get_big_money_rate(2))
    # print(L2TradeFactorUtil.get_big_money_rate(3))
limit_up_time_manager.py
@@ -2,8 +2,7 @@
"""
涨停时间管理器
"""
import l2_data_util
import redis_manager
import tool
import global_util
@@ -25,7 +24,33 @@
        redis = _redisManager.getRedis()
        time = redis.get("limit_up_time-{}".format(code))
        if time is not None:
            redis = _redisManager.getRedis()
            redis.setex("limit_up_time-{}".format(code), tool.get_expire(), time)
            global_util.limit_up_time[code] = time
    return time
def load_limit_up_time():
    """Fill ``global_util.limit_up_time`` from all ``limit_up_time-*`` keys in Redis.

    The stock code is the key name with the ``limit_up_time-`` prefix removed;
    the stored value is copied into the cache unchanged.
    """
    client = _redisManager.getRedis()
    # NOTE(review): str.replace on the key assumes the client returns str keys
    # (decoded responses) - confirm against the RedisManager configuration.
    for redis_key in client.keys("limit_up_time-*"):
        stock_code = redis_key.replace("limit_up_time-", "")
        global_util.limit_up_time[stock_code] = client.get(redis_key)
def sort_code_by_limit_time(codes):
    """Rank ``codes`` by their cached limit-up time, earliest first.

    The cache ``global_util.limit_up_time`` is loaded from Redis first if it is
    empty. Codes without a cached time are left out of the result.

    :param codes: iterable of stock codes to rank.
    :return: dict mapping each code that has a limit-up time to its 0-based
        position in ascending time order.
    """
    if not global_util.limit_up_time:
        load_limit_up_time()

    timed_codes = []
    for code in codes:
        limit_up_time = global_util.limit_up_time.get(code)
        if limit_up_time is not None:
            timed_codes.append((code, limit_up_time))

    # Times are compared numerically after dropping the ":" separators
    # (e.g. "09:30:05" -> 93005).
    ranked = sorted(timed_codes, key=lambda item: int(item[1].replace(":", "")))
    return {code: rank for rank, (code, _) in enumerate(ranked)}
# Script entry point: print the limit-up ranking produced by
# sort_code_by_limit_time for a fixed list of stock codes.
if __name__ == "__main__":
    print(sort_code_by_limit_time(["002393", "002476", "002614", "002750", "600082", "002751"]))
server.py
@@ -9,6 +9,7 @@
import threading
import time
import alert_util
import code_volumn_manager
import data_process
import gpcode_manager
@@ -16,6 +17,7 @@
import juejin
import l2_data_manager
import ths_industry_util
import ths_util
import tool
import trade_manager
import l2_code_operate
@@ -223,6 +225,10 @@
                    thsDead = data.get("thsDead")
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    data_process.saveClientActive(int(client_id), host, thsDead)
                    if ths_util.is_ths_dead(client_id):
                        # TODO 重启同花顺
                        # 报警
                        alert_util.alarm()
                    # print("心跳:", client_id)
                sk.send(return_str.encode())
ths_industry_util.py
@@ -49,6 +49,23 @@
    global_util.industry_hot_num = industry_hot_dict
# 获取相同行业的代码
# 返回:行业,同行业代码
def get_same_industry_codes(code, codes):
    """Find which of ``codes`` belong to the same industry as ``code``.

    The industry is read from ``global_util.code_industry_map``; on a miss the
    map is reloaded once via ``global_util.load_industry()`` and looked up again.

    :param code: stock code whose industry is the reference.
    :param codes: iterable of candidate stock codes.
    :return: ``(industry, set_of_matching_codes)``, or ``(None, None)`` when the
        industry of ``code`` is still unknown after the reload.
    """
    target_industry = global_util.code_industry_map.get(code)
    if target_industry is None:
        global_util.load_industry()
        target_industry = global_util.code_industry_map.get(code)
        if target_industry is None:
            return None, None

    # Keep only the candidates mapped to the same industry.
    peers = {
        candidate
        for candidate in codes
        if global_util.code_industry_map.get(candidate) == target_industry
    }
    return target_industry, peers
# Script entry point: fetch the two maps returned by get_code_industry_maps()
# and print them for manual inspection.
if __name__ == "__main__":
    _code_map, _industry_map = get_code_industry_maps()
    print(_code_map, _industry_map)
ths_util.py
@@ -8,7 +8,11 @@
import win32con
import win32gui
import redis_manager
import tool
import trade_gui
__redisManager = redis_manager.RedisManager(2)
def __click(hwnd):
@@ -118,7 +122,7 @@
    if len(wins) < 3:
        for i in range(0, 3):
            btn = win32gui.GetDlgItem(wins[len(wins)-1], 0x000005ED)
            btn = win32gui.GetDlgItem(wins[len(wins) - 1], 0x000005ED)
            # 点击事件添加
            __click(btn)
            time.sleep(0.5)
@@ -171,6 +175,27 @@
    raise Exception("专业版下单打开失败")
def set_ths_dead_state(client_id, dead):
    """Update the per-client "dead" counter stored in Redis.

    A truthy ``dead`` increments the counter by one and refreshes its expiry;
    a falsy ``dead`` resets the counter to 0 (also with a fresh expiry).

    :param client_id: client identifier used in the Redis key.
    :param dead: whether the client reported the dead state this time.
    """
    counter_key = "ths_state_dead_count-{}".format(client_id)
    conn = __redisManager.getRedis()
    if dead:
        conn.incrby(counter_key, 1)
        conn.expire(counter_key, tool.get_expire())
        return
    conn.setex(counter_key, tool.get_expire(), 0)
# 同花顺是否卡死
def is_ths_dead(client_id):
    """Report whether the client's dead counter in Redis has reached 5.

    :param client_id: client identifier used in the Redis key.
    :return: ``True`` when a counter exists and its integer value is >= 5,
        otherwise ``False``.
    """
    raw_count = __redisManager.getRedis().get("ths_state_dead_count-{}".format(client_id))
    if raw_count is None:
        # No counter recorded for this client.
        return False
    return int(raw_count) >= 5
# Script entry point: call open_trade_gui() when the module is run directly.
if __name__ == "__main__":
    open_trade_gui()
trade_gui.py
@@ -74,6 +74,12 @@
        cancel_trade_win = cls.getCancelBuyWin()
        if cancel_trade_win <= 0:
            raise Exception("委托撤销窗口未打开")
        else:
            pos = win32gui.GetWindowRect(cancel_trade_win)
            width = pos[2] - pos[0]
            height = pos[3] - pos[1]
            if width <= 0 or height <= 0:
                raise Exception("委托撤销窗口被最小化")
    @classmethod
    def getText(cls, hwnd):
@@ -321,7 +327,10 @@
            logger_trade_gui.info("开始撤单:code-{}".format(code))
            win = self.cancel_win
            if win <= 0:
                raise Exception("无法找到取消委托窗口")
                self.cancel_win = self.getCancelBuyWin()
                win = self.cancel_win
                if win <= 0:
                    raise Exception("无法找到取消委托窗口")
            t = time.time()
            print(t)
            start = int(round(t * 1000))
trade_manager.py
@@ -307,8 +307,8 @@
    keys = redis_l2.keys("*{}*".format(code))
    for k in keys:
        if (k.find("l2-") is None or k.find("l2-") < 0) and (k.find("big_data-") is None or k.find("big_data-") < 0):
            redis_l2.delete(k)
        # if (k.find("l2-") is None or k.find("l2-") < 0) and (k.find("big_data-") is None or k.find("big_data-") < 0):
        redis_l2.delete(k)
    redis_trade = redis_manager.RedisManager(2).getRedis()
    redis_trade.delete("trade-state-{}".format(code))
@@ -316,11 +316,20 @@
    redis_info = redis_manager.RedisManager(0).getRedis()
    keys = redis_info.keys("*{}*".format(code))
    for k in keys:
        if k.find("pre") is None or k.find("pre") < 0:
        if k.find("pre") is None or k.find("pre") or k.find("zyltgb") < 0:
            redis_info.delete(k)
def __clear_big_data():
    """Delete every key matching ``big_data-*`` from the ``RedisManager(1)`` connection."""
    client = redis_manager.RedisManager(1).getRedis()
    # Keys are removed one at a time, so an empty match list is a no-op.
    for stale_key in client.keys("big_data-*"):
        client.delete(stale_key)
# Script entry point for manual maintenance: calls __clear_data for one
# hard-coded stock code, then __clear_big_data() to drop all big_data-* keys.
if __name__ == "__main__":
    # time_str = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # print(time_str)
    __clear_data("000593")
    # __clear_data("002093")
    __clear_big_data()
    pass