Administrator
2022-08-25 34491829675033e41715648b1e92f339bf2f35d1
Changes
11个文件已修改
2个文件已添加
725 ■■■■ 已修改文件
data_export_util.py 90 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_process.py 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gpcode_manager.py 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py 183 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_code_operate.py 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
l2_data_manager.py 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
log.py 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 68 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ths_util.py 175 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tool.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_gui.py 81 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_manager.py 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
data_export_util.py
New file
@@ -0,0 +1,90 @@
# 数据导出工具
import os
import time
import xlwt
def export_l2_data(code, datas, dest_dir="D:/export/l2"):
    local_time = time.strftime("%Y%m%dT%H%M%S", time.localtime(time.time()))
    file_name = "{}/{}_{}.xls".format(dest_dir, code, local_time)
    wb = xlwt.Workbook()
    ws = wb.add_sheet('sheet1')
    ws.write(0, 0, '序号')
    ws.write(0, 1, '时间')
    ws.write(0, 2, '买撤间隔')
    ws.write(0, 3, '价格')
    ws.write(0, 4, '手数')
    ws.write(0, 5, '类型')
    ws.write(0, 6, '重复数量')
    index = 0
    for data in datas:
        index += 1
        ws.write(index, 0, data["index"])
        ws.write(index, 1, data["val"]["time"])
        cancel_time = data["val"]["cancelTime"]
        if cancel_time == '0':
            cancel_time = ''
        if len(cancel_time) > 0:
            if int(data["val"]["cancelTimeUnit"]) == 0:
                cancel_time += "s";
            elif int(data["val"]["cancelTimeUnit"]) == 1:
                cancel_time += "m";
            elif int(data["val"]["cancelTimeUnit"]) == 2:
                cancel_time += "h";
        ws.write(index, 2, cancel_time)
        ws.write(index, 3, data["val"]["price"])
        ws.write(index, 4, data["val"]["num"])
        if int(data["val"]["operateType"]) == 0:
            ws.write(index, 5, '买')
        elif int(data["val"]["operateType"]) == 1:
            ws.write(index, 5, '买撤')
        ws.write(index, 6, data["re"])
    wb.save(file_name)
    return file_name
def export_l2_data_origin(code, datas, key, dest_dir="D:/export/l2_origin"):
    file_dir = "{}/{}".format(dest_dir, code)
    if not os.path.exists(file_dir):
        os.makedirs(file_dir)
    file_name = "{}/{}/{}.xls".format(dest_dir, code, key)
    wb = xlwt.Workbook()
    ws = wb.add_sheet('sheet1')
    ws.write(0, 0, '序号')
    ws.write(0, 1, '时间')
    ws.write(0, 2, '买撤间隔')
    ws.write(0, 3, '价格')
    ws.write(0, 4, '手数')
    ws.write(0, 5, '类型')
    index = 0
    for data in datas:
        index += 1
        ws.write(index, 0, index)
        ws.write(index, 1, data["time"])
        cancel_time = data["cancelTime"]
        if cancel_time > 0:
            cancel_time = "{}".format(cancel_time)
            if data["cancelTimeUnit"] == 0:
                cancel_time += "s";
            elif data["cancelTimeUnit"] == 1:
                cancel_time += "m";
            elif data["cancelTimeUnit"] == 2:
                cancel_time += "h";
        ws.write(index, 2, cancel_time)
        ws.write(index, 3, data["price"])
        ws.write(index, 4, data["num"])
        if data["operateType"] == 0:
            ws.write(index, 5, '买')
        elif data["operateType"] == 1:
            ws.write(index, 5, '买撤')
    wb.save(file_name)
    return file_name
if __name__ == "__main__":
    _t = "1661391666562"
    print(_t[-3:])
data_process.py
@@ -14,7 +14,7 @@
# 统计今日买入
import tool
redisManager = redis_manager.RedisManager()
__redisManager = redis_manager.RedisManager(0)
def _mysql_insert_data(day, code, item, conn):
@@ -94,7 +94,7 @@
# 保存L2交易队列
def saveL2TradeQueueData(code, data):
    redis = redisManager.getRedis()
    redis = __redisManager.getRedis()
    data_str = json.dumps(data)
    key = "trade-queue-{}".format(code)
    # 保存5s的数据
@@ -103,7 +103,7 @@
# 获取L2交易队列
def getL2TradeQueueData(code):
    redis = redisManager.getRedis()
    redis = __redisManager.getRedis()
    key = "trade-queue-{}".format(code)
    data_str = redis.get(key)
    if data_str is None or len(data_str) <= 0:
@@ -160,12 +160,12 @@
def saveClientActive(client_id, host):
    if client_id <= 0:
        return
    redis = redisManager.getRedis();
    redis = __redisManager.getRedis();
    redis.setex("client-active-{}".format(client_id), 10, host)
def getValidClients():
    redis = redisManager.getRedis();
    redis = __redisManager.getRedis();
    keys = redis.keys("client-active-*")
    client_ids = []
    for k in keys:
@@ -175,13 +175,13 @@
def getActiveClientIP(client_id):
    redis = redisManager.getRedis();
    redis = __redisManager.getRedis();
    return redis.get("client-active-{}".format(client_id))
# 保存量能
def saveCodeVolumn(datas):
    redis = redisManager.getRedis()
    redis = __redisManager.getRedis()
    for key in datas:
        k = "volumn-max-{}".format(key)
        redis.setex(k, tool.get_expire(), datas[key]["max_volumn"])
gpcode_manager.py
@@ -79,16 +79,14 @@
    result = redis_instance.get("price-pre-{}".format(code))
    if result is not None:
        return float(result)
    # 从网络上拉取收盘价
    juejin.set_price_pre(code)
    result = redis_instance.get("price-pre-{}".format(code))
    if result is not None:
        return float(result)
    return None
# 设置收盘价
def set_price_pre(code, price):
    codes= get_listen_codes()
    if code not in codes:
        return
    redis_instance = __redisManager.getRedis()
    redis_instance.setex("price-pre-{}".format(code), tool.get_expire(), str(price))
gui.py
@@ -10,6 +10,7 @@
import win32gui
import data_export_util
import data_process
import juejin
import multiprocessing
@@ -54,6 +55,7 @@
                self.l2_codes[client_id].append(code)
    def run(self):
        # TODO
        self.jueJinProcess.start()
        self.serverProcess.start()
        L2CodeOperate.get_instance()
@@ -69,13 +71,15 @@
        self.create_gui()
    def _draw_check(self, root):
        def _set_error_color(text, line, content):
            for i in range(0, len(content)):
                text.tag_add('error', "{}.{}".format(line, i))
        def sync_target_codes():
            server.sync_target_codes_to_ths()
            print(result)
        def click():
            text.delete('1.0', END)
@@ -139,22 +143,24 @@
        btn = Button(frame, text="运行环境检测", command=click)
        btn.place(x=5, y=5)
        btn = Button(frame, text="同步THS目标标的", command=sync_target_codes)
        btn.place(x=100, y=5)
        frame.grid(row=1, column=2)
    #绘制开盘前的数据准备情况
    def __draw_pre_data_check(self,frame):
    # 绘制开盘前的数据准备情况
    def __draw_pre_data_check(self, frame):
        def refresh_close_price_data():
            redis = redis_manager.RedisManager(0).getRedis()
            count= len(redis.keys("price-pre-*"))
            count = len(redis.keys("price-pre-*"))
            sv_num.set("获取到收盘价数量:{}".format(count))
        def re_get_close_price():
            juejin.set_price_pres(gpcode_manager.get_gp_list())
            juejin.re_set_price_pres(gpcode_manager.get_gp_list())
        def get_limit_up_codes_win():
            width=400
            height=800
            width = 400
            height = 800
            win = Tk()
            win.title("今日涨停")
            win.resizable(height=False, width=False)
@@ -169,37 +175,37 @@
            cl = Label(win, textvariable=limit_up_datas_time, bg="#DDDDDD", fg="#666666")
            cl.place(x=100, y=10)
            table_height = height-100
            table_width = width -20
            table_height = height - 100
            table_width = width - 20
            table_frame = Frame(win, {"height": table_height, "width": table_width, "bg": "#DDDDDD"})
            table_frame.place(x=10, y=45)
            table_limit_up = tkintertable.TableCanvas(table_frame, data=limit_up_datas, read_only=True, width=table_width,
                                             height=table_height, thefont=('微软雅黑', 10), cellwidth=90,
                                             rowheaderwidth=20)
            table_limit_up = tkintertable.TableCanvas(table_frame, data=limit_up_datas, read_only=True,
                                                      width=table_width,
                                                      height=table_height, thefont=('微软雅黑', 10), cellwidth=90,
                                                      rowheaderwidth=20)
            table_limit_up.show()
            # 获取数据
            time_str,datas = gpcode_manager.get_limit_up_list();
            time_str, datas = gpcode_manager.get_limit_up_list();
            limit_up_datas_time.set(time_str)
            # 删除所有的行
            #table_limit_up.model.deleteRows()
            # table_limit_up.model.deleteRows()
            # 增加数据
            index = 0
            for data in datas:
                data=json.loads(data)
                data = json.loads(data)
                table_limit_up.model.addRow()
                table_limit_up.model.setValueAt(data["code"], index, 0)
                table_limit_up.model.setValueAt(data["time"], index, 1)
                table_limit_up.model.setValueAt(float(data["price"]), index, 2)
                table_limit_up.model.setValueAt("{}{}".format(float( data["limitMoney"]),("亿" if data["limitMoneyUnit"]==0 else "万") ), index, 3)
                table_limit_up.model.setValueAt(
                    "{}{}".format(float(data["limitMoney"]), ("亿" if data["limitMoneyUnit"] == 0 else "万")), index, 3)
                index += 1
            table_limit_up.redraw()
            win.geometry("{}x{}".format(width,height))
            win.geometry("{}x{}".format(width, height))
            win.mainloop()
        btn = Button(frame, text="刷新收盘价", command=refresh_close_price_data)
        btn.place(x=5, y=150)
@@ -213,12 +219,6 @@
        btn = Button(frame, text="今日涨停", command=get_limit_up_codes_win)
        btn.place(x=300, y=150)
    # 绘制l2数据状态
    def __draw_l2_state(self, root):
@@ -254,6 +254,81 @@
                    else:
                        code_labels[client_id][i].configure(foreground="#999999")
        def check(event):
            client = (event.widget["command"])
            msg_list = []
            try:
                result = get_client_env_state(client)
                if result["ths_l2_win"]:
                    msg_list.append(("同花顺L2屏正常!", 0))
                else:
                    msg_list.append(("同花顺L2屏未打开...", 1))
                if result["ths_fp_1"]:
                    msg_list.append(("同花顺副屏1正常!", 0))
                else:
                    msg_list.append(("同花顺副屏1未打开...", 1))
                if result["ths_fp_2"]:
                    msg_list.append(("同花顺副屏2正常!", 0))
                else:
                    msg_list.append(("同花顺副屏2未打开...", 1))
                if result["ths_trade_success"]:
                    msg_list.append(("交易成功页面正常!", 0))
                else:
                    msg_list.append(("交易成功页面未打开...", 1))
                if result["l2_channel_invalid_count"] <= 0:
                    msg_list.append(("L2监控线程正常数:{} 异常数:{}...".format(result["l2_channel_valid_count"],
                                                                     result["l2_channel_invalid_count"]), 0))
                else:
                    msg_list.append(("L2监控线程正常数:{} 异常数:{}!".format(result["l2_channel_valid_count"],
                                                                   result["l2_channel_invalid_count"]), 1))
                if result["limitUp"]:
                    msg_list.append(("涨停监控线程正常!", 0))
                else:
                    msg_list.append(("涨停监控线程异常...", 1))
                if result["tradeSuccess"]:
                    msg_list.append(("当日成交监控线程正常!", 0))
                else:
                    msg_list.append(("当日成交监控线程异常...", 1))
            except Exception as e:
                msg_list.append((str(e), 1))
            def repair():
                try:
                    server.repair_client_env(client)
                    showinfo("提示", "修复完成")
                except Exception as e:
                    showerror("修复出错", str(e))
            # 创建界面
            win = Tk()
            win.title("检测结果")
            win.resizable(height=False, width=False)
            text = Text(win, height=100, undo=True)
            text.place(x=0, y=30)
            btn = Button(win, text="一键修复", command=repair)
            btn.place(x=0, y=0)
            line = 0
            for msg in msg_list:
                line += 1
                if msg[1] == 0:
                    text.insert(END, "{}\n".format(msg[0]))
                else:
                    text.insert(END, "{}\n".format(msg[0]))
                    for i in range(0, len(msg[0])):
                        text.tag_add('error', "{}.{}".format(line, i))
            text.tag_config('error', foreground='red')
            win.geometry("300x300")
            win.mainloop()
        width = 800
        height = 290
        frame = Frame(root, {"height": height, "width": width, "bg": "#DDDDDD"})
@@ -273,11 +348,14 @@
        client_state = {}
        for key in self.l2_codes:
            client_lb = Label(frame, text="设备号:{}".format(key), background="#DDDDDD")
            client_lb.place(x=5, y=40 + l2_client_count * 30)
            client_lb = Label(frame, text="设备:{}".format(key), background="#DDDDDD")
            client_lb.place(x=30, y=40 + l2_client_count * 30)
            btn = Button(frame, text="检测", command=key)
            btn.bind('<Button-1>', check)
            btn.place(x=0, y=35 + l2_client_count * 30)
            client_state_lb = Label(frame, text="(未知)", background="#DDDDDD", font=('微软雅黑', 8))
            client_state_lb.place(x=60, y=40 + l2_client_count * 30)
            client_state_lb.place(x=75, y=40 + l2_client_count * 30)
            client_state[key] = client_state_lb
            code_sv_map[key] = []
@@ -290,7 +368,7 @@
                code_labels[key].append(code_label)
                code_label.place(x=0, y=0)
                cframe.place(x=180 + i * 75, y=40 + l2_client_count * 30)
                cframe.place(x=200 + i * 75, y=40 + l2_client_count * 30)
            l2_client_count += 1
        # 添加更新线程
        t1 = threading.Thread(target=lambda: update_data())
@@ -503,7 +581,8 @@
                return
            if gpcode is None or len(gpcode) < 6:
                return
            l2_code_operate.L2CodeOperate.setGPCode(client,position,gpcode)
            l2_code_operate.L2CodeOperate.setGPCode(client, position, gpcode)
        def resub():
            self.p1.send("resub")
@@ -516,13 +595,40 @@
                print(e)
                showwarning('警告', e)
        def export_l2_data(code):
            if code not in  l2_data_manager.local_today_datas:
                l2_data_manager.load_l2_data(code)
            datas = l2_data_manager.local_today_datas[code]
            try:
                path=data_export_util.export_l2_data(code,datas)
                showinfo("提示","导出成功,路径为:"+path)
            except Exception as e1:
                showerror("导出失败",str(e1))
        def export_l2_data_origin(code):
            redis = redis_manager.RedisManager(1).getRedis()
            keys = redis.keys("big_data-{}-*".format(code))
            try:
                for k in keys:
                    datas=redis.get(k)
                    datas=json.loads(datas)
                    datas=datas["data"]["data"]
                    _t = k.split("-")[2]
                    k = time.strftime("%Y_%m_%d_%H_%M_%S_",time.localtime(float(_t)/1000))
                    k = "{}{}".format(k,_t[-3:])
                    data_export_util.export_l2_data_origin(code, datas,k)
            except Exception as e1:
                    showerror("导出失败", str(e1))
            showinfo("提示", "导出完成")
        frame = Frame(root, {"height": 280, "width": 300, "bg": "#DDDDDD"})
        frame.grid(row=2, column=2, rowspan=2, pady=5)
        btntext = StringVar()
        el = Label(frame, text="测试区域", bg="#DDDDDD",fg="#A0A000")
        el = Label(frame, text="测试区域", bg="#DDDDDD", fg="#A0A000")
        el.place(x=240, y=10)
        el = Label(frame, text="客户端ID:", bg="#DDDDDD")
        el.place(x=10, y=10)
@@ -542,6 +648,15 @@
        btn = Button(frame, text="设置代码", command=lambda: setGPCode(ep_client.get(), ep.get(), code.get()), )
        btn.place(x=10, y=100)
        btn = Button(frame, text="修复L2数据", command=lambda: L2CodeOperate.get_instance().repaire_l2_data(code.get()))
        btn.place(x=100, y=100)
        btn = Button(frame, text="导出L2数据", command=lambda: export_l2_data(code.get()))
        btn.place(x=200, y=100)
        btn = Button(frame, text="导出L2原始数据", command=lambda: export_l2_data_origin(code.get()))
        btn.place(x=260, y=100)
        # 交易按钮
        btn = Button(frame, textvariable=btntext, command=startJueJinGui)
        btn.place(x=10, y=150)
juejin.py
@@ -44,9 +44,9 @@
    # 多个时间点获取收盘价
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:30:00')
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='08:50:00')
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='09:15:00')
    gmapi.schedule(schedule_func=get_latest_info, date_rule='1d', time_rule='09:28:00')
    gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:25:00')
    gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:29:00')
    gmapi.schedule(schedule_func=get_current_info, date_rule='1d', time_rule='09:29:35')
    re_subscribe_tick()
    # re_subscribe_bar()
@@ -63,15 +63,16 @@
    for c in clients:
        for i in range(0, 8):
            gpcode_manager.init_listen_code_by_pos(int(c), i)
    data = gpcode_manager.get_gp_list();
    result = JueJinManager.get_gp_latest_info(data);
    codes = gpcode_manager.get_gp_list();
    result = JueJinManager.get_gp_latest_info(codes);
    for item in result:
        sec_level = item['sec_level']
        symbol = item['symbol']
        symbol = symbol.split(".")[1]
        pre_close = tool.to_price(decimal.Decimal(str(item['pre_close'])))
        if sec_level == 1:
            gpcode_manager.set_price_pre(symbol, pre_close)
            if symbol in codes:
                gpcode_manager.set_price_pre(symbol, pre_close)
        else:
            gpcode_manager.rm_gp(symbol)
@@ -90,12 +91,12 @@
# 设置收盘价
def set_price_pre(code):
def re_set_price_pre(code):
    codes = [code]
    set_price_pres(codes)
    re_set_price_pres(codes)
def set_price_pres(codes):
def re_set_price_pres(codes):
    result = JueJinManager.get_gp_latest_info(codes);
    for item in result:
        symbol = item['symbol']
@@ -175,9 +176,20 @@
def recieve_msg(pipe):
    while True:
        value = pipe.recv()
        print(value)
        if value == 'resub':
        print("跨进程通信:",value)
        jsonValue= json.loads(value)
        action=jsonValue["type"]
        if action == 'resub':
            re_subscribe_tick()
        elif action == 'accpt_price':
            try:
                datas=jsonValue["data"]
                for data in datas:
                    accpt_price(data["code"],float(data["price"]))
            except Exception as e:
                print(str(e))
class JueJinManager:
l2_code_operate.py
@@ -1,5 +1,6 @@
# 操作l2的代码
import json
import logging
import os
import queue
import threading
@@ -40,8 +41,10 @@
            result = server.send_msg(client_id, data)
            logger_code_operate.info(
                "setGPCode结束({}):clientid-{}  position-{} code-{}".format(result, client_id, position, gpcode))
            if result.__contains__('OK'):
            jsonData = json.loads(result)
            if jsonData["code"] == 0:
                gpcode_manager.set_listen_code_by_pos(client_id, position, gpcode)
                L2CodeOperate.set_operate_code_state(client_id, position, 1)
        except Exception as e:
            logger_code_operate.error("setGPCode出错:{}", str(e))
@@ -97,19 +100,20 @@
                        if code_ == "" or code_ is None:
                            continue
                        logger_code_operate.info("修复代码一致:{}-{}-{}", client, pos, code)
                        logger_code_operate.info("修复代码一致:{}-{}-{}", client_id, pos, code)
                        L2CodeOperate.setGPCode(client_id, pos, code)
                    # 修复l2的数据错误
                    elif type == 3:
                        data = data["data"]
                        client = data["client"]
                        server.send_msg(client, json.dumps(data))
                        data=data["data"]
                        result = server.send_msg(client, data)
                        print("L2數據修復結果:",result)
                else:
                    time.sleep(1)
            except:
                print("发送操作异常")
            except Exception as e:
                logging.exception(e)
                print("发送操作异常:",str(e))
    def add_operate(self, type, code):
        redis = self.redis_manager_.getRedis()
@@ -117,6 +121,11 @@
        redis.rpush("code_operate_queue", json.dumps({"type": type, "code": code}))
    def repaire_operate(self, client, pos, code):
        # 如果本来该位置代码为空则不用修复
        code_ = gpcode_manager.get_listen_code_by_pos(client, pos)
        if code_ == "" or code_ is None:
            return
        logger_code_operate.info("客户端位置代码修复:client-{},pos-{},code-{}", client, pos, code)
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue", json.dumps({"type": 2, "client": client, "pos": pos, "code": code}))
@@ -133,7 +142,7 @@
                    "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                             "max_price": float(max_price)}}
        redis = self.redis_manager_.getRedis()
        redis.rpush("code_operate_queue", json.dumps({"type": 3, "client": client_id, "data": data}))
        redis.rpush("code_operate_queue", json.dumps({"type": 3,"code":code, "client": client_id, "data": data}))
    # 移除监控
    def remove_l2_listen(self, code):
@@ -142,8 +151,9 @@
            self.add_operate(0, code)
    # 设置代码操作状态,服务器保存的代码是否与实际设置的代码保持一致
    def set_operate_code_state(self, client_id, channel, state):
        self.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), tool.get_expire(), state)
    @classmethod
    def set_operate_code_state(cls, client_id, channel, state):
        cls.getRedis().setex("code-operate_state-{}-{}".format(client_id, channel), tool.get_expire(), state)
    def get_operate_code_state(self, client_id, channel):
        value = self.getRedis().get("code-operate_state-{}-{}".format(client_id, channel))
l2_data_manager.py
@@ -203,9 +203,15 @@
    # 获取涨停价
    limit_up_price = gpcode_manager.get_limit_up_price(code)
    same_time_num = {}
    for item in data:
        # 解析数据
        time = item["time"]
        if time in same_time_num:
            same_time_num[time] = same_time_num[time] + 1
        else:
            same_time_num[time] = 1
        price = float(item["price"])
        num = item["num"]
        limitPrice = item["limitPrice"]
@@ -225,6 +231,11 @@
            # 数据重复次数默认为1
            datas.append({"key": key, "val": item, "re": 1})
            dataIndexs.setdefault(key, len(datas) - 1)
    for key in same_time_num:
        if same_time_num[key] > 50:
            # TODO 保存数据
            redis = _redisManager.getRedis()
            redis.set("big_data-{}-{}".format(code, int(round(t.time() * 1000))), str)
    return day, client, channel, code, datas
@@ -423,7 +434,7 @@
                                if limit_up_price is not None:
                                    if latest_num * limit_up_price * 100 > 1000 * 10000:
                                        # 大于1000w就买
                                        #print("执行撤销")
                                        # print("执行撤销")
                                        logger_l2_trade.info(
                                            "执行撤销:{} - {}".format(code, json.dumps(add_datas[-1])))
                                        try:
@@ -526,7 +537,7 @@
            #         if datas[j]["val"]["limitPrice"] == 1 and datas[j]["val"]["operateType"] == 0:
            #             index_3 = j
            if index_1 - index_0 == 1 and index_2 - index_1 == 1:  # and index_3 - index_2 == 1
                logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i,datas[i]))
                logger_l2_trade.info("找到物理连续涨停买 {},{},{}".format(code, i, datas[i]))
                return i
        # 同1s内有不连续的4个涨停买(如果遇买撤就重新计算,中间可间隔不涨停买)标记计算起始点
        if __is_limit_up_price_buy(_val):
@@ -548,7 +559,7 @@
            _limit_up_count_1s_start_index = -1
        if _limit_up_count_1s >= 4 and _limit_up_count_1s_start_index > -1:
            logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index,datas[i]))
            logger_l2_trade.info("找到同一秒连续涨停买 {},{},{}".format(code, _limit_up_count_1s_start_index, datas[i]))
            return _limit_up_count_1s_start_index
    return None
@@ -590,7 +601,7 @@
                        index_2 = j
                        break
            if index_1 - index_0 == 1 and index_2 - index_1 == 1:
                logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i,json.dumps(datas[i])))
                logger_l2_trade.info("连续3个涨停买撤 {},{},{}".format(code, i, json.dumps(datas[i])))
                return i
    return None
@@ -675,6 +686,7 @@
    key = "l2-fixed-codes"
    redis = _redisManager.getRedis()
    redis.sadd(key, code)
    redis.expire(key, tool.get_expire())
# 是否在l2固定监控代码中
@@ -685,6 +697,7 @@
if __name__ == "__main__":
    print("big_data-{}-{}".format("123", int(round(t.time() * 1000))))
    pass
    # __set_buy_compute_start_data("000000", 100, 1)
    # __set_buy_compute_start_data("000000", 100)
log.py
@@ -10,6 +10,8 @@
           rotation="00:00", compression="zip")
logger.add(get_path("trade", "trade"), filter=lambda record: record["extra"].get("name") == "trade", rotation="00:00",
           compression="zip")
logger.add(get_path("trade", "delegate"), filter=lambda record: record["extra"].get("name") == "delegate", rotation="00:00",
           compression="zip")
logger.add(get_path("l2", "l2_error"), filter=lambda record: record["extra"].get("name") == "l2_error",
           rotation="00:00", compression="zip")
@@ -32,6 +34,7 @@
logger_trade_gui = logger.bind(name="trade_gui")
logger_trade = logger.bind(name="trade")
logger_trade_delegate = logger.bind(name="delegate")
logger_l2_error = logger.bind(name="l2_error")
logger_l2_process = logger.bind(name="l2_process")
logger_l2_trade = logger.bind(name="l2_trade")
server.py
@@ -7,11 +7,13 @@
import data_process
import gpcode_manager
import authority
import juejin
import l2_data_manager
import tool
import trade_manager
import l2_code_operate
from log import logger_l2_error, logger_l2_process, logger_device
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate
class MyTCPServer(socketserver.TCPServer):
@@ -28,6 +30,7 @@
    reset_code_dict = {}
    set_operate_code_state_dict = {}
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -74,11 +77,13 @@
                                # 间隔2s
                                if key not in self.reset_code_dict or round(
                                        time.time() * 1000) - self.reset_code_dict[key] > 2000:
                                    self.l2CodeOperate.set_operate_code_state(client, channel, 0)
                                    self.reset_code_dict[key] = round(time.time() * 1000)
                                    if code_ is None:
                                        code_ = ""
                                    self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
                                    if tool.is_trade_time():
                                        self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
                            else:
                                key = "{}-{}".format(client, channel)
                                if key not in self.set_operate_code_state_dict or round(
@@ -132,7 +137,8 @@
                    gpcode_manager.set_gp_list(code_list)
                    # 重新订阅
                    self.server.pipe.send("resub")
                    self.server.pipe.send(json.dumps({"type": "resub"}))
                    sync_target_codes_to_ths()
                elif type == 2:
                    # 涨停代码
                    codeList = data_process.parseGPCode(_str)
@@ -149,6 +155,10 @@
                elif type == 5:
                    # 交易委托信息
                    dataList = data_process.parseList(_str)
                    if self.last_trade_delegate_data != _str:
                        self.last_trade_delegate_data = _str
                        # 保存委托信息
                        logger_trade_delegate.info(dataList)
                    try:
                        trade_manager.process_trade_delegate_data(dataList)
                    except Exception as e:
@@ -175,6 +185,11 @@
                            {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                    except Exception as e:
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                elif type == 40:
                    data = data_process.parse(_str)["data"]
                    for item in data:
                        juejin.accpt_price(item["code"], float(item["price"]))
                elif type == 30:
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
@@ -195,6 +210,7 @@
def send_msg(client_id, data):
    _ip = data_process.getActiveClientIP(client_id)
    print("ip", client_id, _ip)
    if _ip is None or len(_ip) <= 0:
        raise Exception("客户端IP为空")
    socketClient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -223,16 +239,38 @@
        time.sleep(5)
# 获取采集客户端的状态
def get_client_env_state(client):
    result = send_msg(client, {"action": "getEnvState"})
    result = json.loads(result)
    if result["code"] == 0:
        return json.loads(result["data"])
    else:
        raise Exception(result["msg"])
# 修复采集客户端
def repair_client_env(client):
    result = send_msg(client, {"action": "repairEnv"})
    result = json.loads(result)
    if result["code"] != 0:
        raise Exception(result["msg"])
# 同步目标标的到同花顺
def sync_target_codes_to_ths():
    codes = gpcode_manager.get_gp_list()
    code_list = []
    for code in codes:
        code_list.append(code)
    client = authority._get_client_ids_by_rule("client-industry")
    result = send_msg(client[0], {"action": "syncTargetCodes", "data": code_list})
    return result
if __name__ == "__main__":
    code = "000503"
    client_id, pos = gpcode_manager.get_listen_code_pos(code)
    if client_id is not None and pos is not None:
        # 获取涨停价与跌停价
        max_price = gpcode_manager.get_limit_up_price(code)
        min_price = gpcode_manager.get_limit_down_price(code)
        data = {"action": "repairL2Data",
                "data": {"index": int(pos), "code": code, "min_price": float(min_price),
                         "max_price": float(max_price)}}
        send_msg(client_id, data)
        # 有值
        pass
    try:
        result = get_client_env_state(3)
        print(result)
    except Exception as e:
        print(str(e))
ths_util.py
New file
@@ -0,0 +1,175 @@
# 同花顺工具
# 打开交易环境
import time
import win32api
import win32con
import win32gui
import trade_gui
def __click(hwnd):
    win32gui.SendMessage(hwnd, win32con.WM_LBUTTONDOWN, 0, 0);
    win32gui.SendMessage(hwnd, win32con.WM_LBUTTONUP, 0, 0);
def __input_number(hwnd, num_str):
    # delete
    for c in num_str:
        code = -1
        lp = 0
        if c == '.':
            code = 110
            win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0);
            win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0);
            continue
        elif c == '0':
            code = 48
        elif c == '1':
            code = 49
        elif c == '2':
            code = 50
        elif c == '3':
            code = 51
        elif c == '4':
            code = 52
        elif c == '5':
            code = 53
        elif c == '6':
            code = 54
        elif c == '7':
            code = 55
        elif c == '8':
            code = 56
        elif c == '9':
            code = 57
        win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0);
        win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0);
def show_window(hwnd):
    win32gui.ShowWindow(hwnd, win32con.SW_SHOWNORMAL)
def open_trade_gui():
    main_win = __open_main_win()
    show_window(main_win)
    win_delegate = __open_trade_delegate_win(main_win)
    # 最大化委托
    win_temp = win32gui.FindWindowEx(win_delegate, None, None, "HexinScrollWnd")
    left, top, right, bottom = win32gui.GetWindowRect(win_temp)
    height_1 = bottom - top
    win_temp = win32gui.FindWindowEx(win_delegate, win_temp, None, "HexinScrollWnd")
    left, top, right, bottom = win32gui.GetWindowRect(win_temp)
    height_2 = bottom - top
    if height_1 > 10 and height_2 > 10:
        maxwin = win32gui.GetDlgItem(win_delegate, 0x00002EF1)
        # 点击按钮
        __click(maxwin)
    # 打开闪电买入
    __open_buy_win(main_win)
def __open_main_win():
    # 打开同花顺主页
    hWndList = []
    win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
    for hwnd in hWndList:
        _title = win32gui.GetWindowText(hwnd)
        if _title.__contains__("同花顺("):
            if not win32gui.IsWindowVisible(hwnd):
                show_window(hwnd)
                win32gui.SetForegroundWindow(hwnd)
                win32gui.SetFocus(hwnd)
            return hwnd
    raise Exception("尚未打开同花顺")
# 打开闪电买入窗口
def __open_buy_win(hwnd):
    show_window(hwnd)
    wins = trade_gui.THSGuiTrade.get_buy_wins()
    if len(wins) < 1:
        try:
            win32gui.SetForegroundWindow(hwnd)
        except:
            pass
        left, top, right, bottom = win32gui.GetWindowRect(hwnd)
        x = int(round((left + right) / 2, 0))
        y = int(round(top + 2, 0))
        win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN | win32con.MOUSEEVENTF_LEFTUP, x, y, 0, 0)
        # 输入21
        time.sleep(0.1)
        __input_number(hwnd, "21")
        # 输入enter
        win32api.keybd_event(win32con.VK_RETURN, 0, 0, 0)
        time.sleep(1)
        wins = trade_gui.THSGuiTrade.get_buy_wins()
    if len(wins) < 1:
        raise Exception("打开闪电买入失败")
    if len(wins) < 3:
        for i in range(0, 3):
            btn = win32gui.GetDlgItem(wins[len(wins)-1], 0x000005ED)
            # 点击事件添加
            __click(btn)
            time.sleep(0.5)
            wins = trade_gui.THSGuiTrade.get_buy_wins()
    pass
# 获取专业版下单句柄
def __get__trade_delegate_win_profession():
    hWndList = []
    win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
    for hwnd in hWndList:
        _title = win32gui.GetWindowText(hwnd)
        if _title.__contains__("专业版下单"):
            return hwnd
    return None
# 打开交易委托
def __open_trade_delegate_win(hwnd1):
    win32gui.SetForegroundWindow(hwnd1)
    # 按下F12
    win32api.keybd_event(win32con.VK_F12, 0, 0, 0)
    # 检测交易窗口是否打开
    for i in range(0, 5):
        hWndList = []
        win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
        for hwnd in hWndList:
            _title = win32gui.GetWindowText(hwnd)
            if _title.__contains__("专业版下单"):
                if not win32gui.IsWindowVisible(hwnd):
                    win32gui.SetForegroundWindow(hwnd)
                return hwnd
            elif _title.__contains__("网上股票交易系统"):
                show_window(hwnd)
                time.sleep(0.2)
                hwnd = win32gui.FindWindowEx(hwnd, None, "ToolbarWindow32", None)
                hwnd = win32gui.FindWindowEx(hwnd, None, "#32770", None)
                hwnd = win32gui.FindWindowEx(hwnd, None, "Button", "专业")
                # 点击按钮
                __click(hwnd)
                win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
                for j in range(0, 5):
                    win = __get__trade_delegate_win_profession()
                    if win is not None:
                        return win
                    time.sleep(1)
        time.sleep(1)
    raise Exception("专业版下单打开失败")
if __name__ == "__main__":
    open_trade_gui()
    pass
tool.py
@@ -25,6 +25,19 @@
    return _decimal.quantize(decimal.Decimal("0.00"), decimal.ROUND_HALF_UP)
# 是否为交易时间
def is_trade_time():
    relative_timestamp = t.time() % (24 * 60 * 60) + 8 * 60 * 60
    start1 = 60 * 60 * 9 + 24 * 60;
    end1 = 60 * 60 * 11 + 35 * 60;
    start2 = 60 * 60 * 12 + 50 * 60;
    end2 = 60 * 60 * 15 + 5 * 60;
    if start1 < relative_timestamp < end1 or start2 < relative_timestamp < end2:
        return True
    else:
        return False
if __name__=="__main__":
     d1 = decimal.Decimal("0.12")
     d2 = decimal.Decimal("0.12")
trade_gui.py
@@ -6,7 +6,13 @@
import win32api
import win32con
from log import *
from threading import Thread
def async_call(fn):
    def wrapper(*args, **kwargs):
        Thread(target=fn, args=args, kwargs=kwargs).start()
    return wrapper
class THSGuiTrade(object):
    __instance = None
@@ -29,6 +35,12 @@
    def refresh_hwnds(self):
        self.cancel_win = self.__instance.getCancelBuyWin()
        self.buy_win_list = self.get_buy_wins();
    # 打开交易环境
    def open_trade_env(self):
        # 打开交易界面(F12)
        pass
    def get_available_buy_win(self):
        self.buy_lock.acquire()
@@ -234,24 +246,43 @@
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0);
            logger_trade_gui.info("执行买入结束:code-{} 耗时:{}".format(code, int(round(time.time() * 1000)) - start))
            self.close_delegate_success_dialog()
            # 循环读取下单结果,最多等待10s
            for i in range(0, 50):
                hwnd = self.getTradeResultWin()
                if hwnd > 0:
                    time.sleep(0.2)
                    code_str = self.getTradeSuccessCode(hwnd)
                    t = time.time()
                    print(t)
                    end = int(round(t * 1000))
                    print("买入耗时:", end - start)
                    logger_trade_gui.info("获取委托单号:code-{} 单号-{} 整体耗时:{}".format(code, code_str, end - start))
                    # 关闭交易结果弹框
                    self.closeTradeResultDialog(hwnd)
                    return code, code_str
                time.sleep(0.02)
            raise Exception("获取交易结果出错")
            # for i in range(0, 50):
            #     hwnd = self.getTradeResultWin()
            #     if hwnd > 0:
            #         time.sleep(0.2)
            #         code_str = self.getTradeSuccessCode(hwnd)
            #         t = time.time()
            #         print(t)
            #         end = int(round(t * 1000))
            #         print("买入耗时:", end - start)
            #         logger_trade_gui.info("获取委托单号:code-{} 单号-{} 整体耗时:{}".format(code, code_str, end - start))
            #         # 关闭交易结果弹框
            #         self.closeTradeResultDialog(hwnd)
            #         return code, code_str
            #     time.sleep(0.02)
            return code,""
            # raise Exception("获取交易结果出错")
        finally:
            self.using_buy_wins.discard(win)
    @async_call
    def close_delegate_success_dialog(self):
        for i in range(0, 50):
            hwnd = self.getTradeResultWin()
            if hwnd > 0:
                time.sleep(0.2)
                code_str = self.getTradeSuccessCode(hwnd)
                t = time.time()
                print(t)
                end = int(round(t * 1000))
                # logger_trade_gui.info("获取委托单号:code-{} 单号-{} 整体耗时:{}".format(code, code_str, end - start))
                # 关闭交易结果弹框
                self.closeTradeResultDialog(hwnd)
                break
                # return code, code_str
            time.sleep(0.02)
    # 撤销确认框
    def getCancelBuySureWin(self):
@@ -275,8 +306,8 @@
    # 撤买
    def cancel_buy(self, code):
        self.buy_cancel_lock.acquire()
        global code_input
        try:
            logger_trade_gui.info("开始撤单:code-{}".format(code))
            win = self.cancel_win
@@ -289,6 +320,11 @@
            # 输入框控件ID 0x000003E9
            code_input = win32gui.GetDlgItem(win, 0x00000996)
            code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
            # 刷新句柄
            if code_input <= 0:
                self.refresh_hwnds()
                code_input = win32gui.GetDlgItem(win, 0x00000996)
                code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
            code_result = "-"
            retry_count = 0
@@ -310,7 +346,10 @@
            # 撤单快捷键X
            time.sleep(0.01)
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 0x00000058, 0);
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 0x00000058, 0x002D001);
            win32gui.PostMessage(win, win32con.WM_CHAR, 0x00000078, 0x002D001);
            win32gui.PostMessage(win, win32con.WM_KEYUP, 0x00000058, 0x002D001);
            #win32gui.PostMessage(win, win32con.WM_KEYUP, 0x00000058, 0);
            t = time.time()
            print(t)
            end = int(round(t * 1000))
@@ -325,16 +364,16 @@
if __name__ == '__main__':
    try:
        THSGuiTrade.checkEnv();
        print("环境正常")
        # THSGuiTrade.checkEnv();
        # print("环境正常")
        trade = THSGuiTrade();
        print(id(trade))
        # win = trade.get_available_buy_win()
        # if win < 1:
        #     raise Exception("无可用的交易窗口")
        result = trade.buy("002564", "7.26")
        # result = trade.buy("002564", "7.26")
        # # print("交易成功")
        # time.sleep(0.2)
        trade.cancel_buy("002564")
        trade.cancel_buy("000716")
    except Exception as e:
        print(e)
trade_manager.py
@@ -79,6 +79,10 @@
    time_str = datetime.datetime.now().strftime("%H:%M:%S")
    redis.setex("trade-success-latest-time", tool.get_expire(), time_str)
    for data in datas:
        _time = data["time"]
        # 过滤错误数据
        if _time == "00:00:00":
            continue
        data["_id"] = data["trade_num"]
        data["day"] = day
        data["create_time"] = int(round(t.time() * 1000))
@@ -224,6 +228,9 @@
        return None
    for data in datas:
        code = data["code"]
        _time = data["time"]
        if _time == "00:00:00":
            continue
        if code is not None:
            set_trade_state(code, TRADE_STATE_BUY_SUCCESS)
            forbidden_trade(code)