admin
2025-06-10 568c763084b926a6f2d632b7ac65b9ec8280752f
gui_wx.py
@@ -1,13 +1,15 @@
import array
import json
import random
import os
import socket
import sys
import struct
import time
import win32con
import win32gui
import constant
from utils import ths_ocr_util
from utils import ths_ocr_util, invalid_hwnds_manager
import setting
from utils import socket_util
from utils import ths_util
@@ -20,6 +22,8 @@
import wx
import wx.html2 as webview
import concurrent.futures
# Application name; mainApp.OnInit passes it to SetAppName.
APP_TITLE = "卖票看板"
# Icon setting, empty here.
# NOTE(review): where this value is consumed is not visible in this section — confirm.
APP_ICON = ""
@@ -260,32 +264,13 @@
class mainApp(wx.App):
    """wx application object that also starts a background code-recognition thread."""

    def __refresh(self):
        """
        Poll for a recognised code and forward it through the shared pipe.

        Runs forever. Recognition is attempted only while
        ``ocr_settings["expire_time"]`` lies in the future; every recognised
        code is sent on ``global_datas["pipe"]`` as the JSON document
        ``{"code": <code>, "type": "code"}``.
        :return: never returns
        """
        while True:
            if "expire_time" in ocr_settings and ocr_settings["expire_time"] > time.time():
                print("识别")
                try:
                    code = ths_ocr_util.ocr_ths_code()
                    if not code:
                        # Nothing recognised: back off before the next attempt.
                        time.sleep(0.1)
                        continue
                    global_datas["pipe"].send(json.dumps({"code": code, "type": "code"}))
                except Exception as e:
                    # Log and keep polling so one failure does not end the thread.
                    logging.exception(e)
            time.sleep(0.005)

    def __init_data(self):
        """
        Placeholder initialisation hook; currently does nothing.
        :return: None
        """

    def OnInit(self):
        """
        Set the application name and start the polling thread.

        No frame is created or shown by this method.
        :return: True
        """
        self.SetAppName(APP_TITLE)
        self.__init_data()
        # Daemon thread, so the endless polling loop never blocks interpreter exit.
        # ``daemon=True`` replaces the deprecated ``Thread.setDaemon`` call.
        refresh_thread = threading.Thread(target=self.__refresh, daemon=True)
        refresh_thread.start()
        return True
@@ -349,9 +334,7 @@
# Backend-availability check for the Edge WebView. The guard compares against
# "__main__1" rather than "__main__", so it is not triggered when this file is
# run as a script.
if __name__ == "__main__1":
    print(webview.WebView.IsBackendAvailable(webview.WebViewBackendEdge))
# Module-wide shared state. Keys seen in this file include "pipe" and "app"
# (stored by run) and "floatFrame" / "tickFrame" (read by the message handler).
global_datas = {
}
# Worker pool (at most 10 threads) to which set_xfdp_codes calls are submitted.
set_codes_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# 接受来自其他进程的数据
@@ -366,13 +349,17 @@
                wx.CallAfter(lambda: JueJinSettingFrame(data["pos"]).Show())
            elif type_ == "manage_ths_pos":
                wx.CallAfter(lambda: THSPositionSettingFrame(data["pos"]).Show())
            elif type_ == "show_float_callback":
                wx.CallAfter(lambda: global_datas["floatFrame"].Show())
            elif type_ == "show_main_callback":
                wx.CallAfter(lambda: global_datas["tickFrame"].Show())
            elif type_ == "set_code":
                code = data["code"]
                set_codes_thread_pool.submit(set_xfdp_codes, code)
            elif type_ == "show_dead_hwnds":
                hwnds = win32_util.text_hwnds_doing
                print("死亡窗口", hwnds)
                if hwnds:
                    for hwnd in hwnds:
                        invalid_hwnds_manager.add_hwnd(hwnd)
            elif type_ == "exit":
                pass
                wx.CallAfter(lambda: sys.exit())
                os._exit(0)
        except Exception as e:
            logging.exception(e)
@@ -380,6 +367,55 @@
from pynput.mouse import Listener
# Shared recognition state. on_mouse_click stores "expire_time" (time.time() + 1)
# on each press; the polling loops only attempt recognition while that
# timestamp is still in the future.
ocr_settings = {}
def set_xfdp_codes(code):
    """
    Send a code to the window found under the title "悬浮盯盘".

    Windows are looked up with ``win32_util.search_window``. If at least one
    is found, the first receives a ``WM_COPYDATA`` message whose payload is the
    JSON document ``{"type": "set_code", "data": {"code": ..., "time": ...}}``,
    where ``time`` is the current time in milliseconds.
    :param code: value placed under ``data.code`` in the payload
    :return: None
    """

    def send_code_message(target_hwnd, code_value):
        # Any failure while building or sending the message is logged, not raised.
        try:
            marker = array.array("L", [0])
            message = json.dumps(
                {"type": "set_code", "data": {"code": code_value, "time": int(time.time() * 1000)}}
            ).encode()
            payload = array.array('b', message)
            marker_address = marker.buffer_info()[0]
            payload_address, payload_length = payload.buffer_info()
            # Packed as pointer / unsigned long / pointer; presumably mirrors the
            # Win32 COPYDATASTRUCT layout (dwData, cbData, lpData) — confirm.
            packed = struct.pack("PLP", marker_address, payload_length, payload_address)
            win32gui.SendMessage(target_hwnd, win32con.WM_COPYDATA, None, packed)
        except Exception as exc:
            logging.exception(exc)

    matches = win32_util.search_window("悬浮盯盘")
    if not matches:
        return
    print("句柄", matches[0])
    # Short pause before the message is sent.
    time.sleep(0.1)
    send_code_message(matches[0], code)
def on_mouse_click(x, y, button, pressed):
@@ -394,33 +430,37 @@
    if pressed:
        # 点击事件过后1s内可截图识别代码
        ocr_settings["expire_time"] = time.time() + 1
        print(f"鼠标在 ({x}, {y}) 位置按下,按钮: {button}")
        # print(f"鼠标在 ({x}, {y}) 位置按下,按钮: {button}")
    else:
        print(f"鼠标在 ({x}, {y}) 位置释放,按钮: {button}")
        # print(f"鼠标在 ({x}, {y}) 位置释放,按钮: {button}")
        pass
def __rec_code(pipe):
    """
    Poll for a recognised code and send it through ``pipe``.

    Waits 5 seconds, then loops forever. Recognition is attempted only while
    ``ocr_settings["expire_time"]`` lies in the future; every recognised code
    is sent as the JSON document ``{"code": <code>, "type": "code"}``.
    :param pipe: object whose ``send`` method receives the JSON string
    :return: never returns
    """
    time.sleep(5)
    while True:
        window_open = "expire_time" in ocr_settings and ocr_settings["expire_time"] > time.time()
        if window_open:
            try:
                recognized = ths_ocr_util.ocr_ths_code()
                if recognized:
                    pipe.send(json.dumps({"code": recognized, "type": "code"}))
                else:
                    # Nothing recognised: back off, then re-check the window at once.
                    time.sleep(0.1)
                    continue
            except Exception:
                # Best effort: failures are ignored and polling continues.
                pass
        time.sleep(0.005)
def run(pipe):
    global_datas["pipe"] = pipe
    global p2
    p1, p2 = multiprocessing.Pipe()
    t1 = threading.Thread(target=lambda: recieve_data(pipe))
    # 后台运行
    t1.setDaemon(True)
    t1.start()
    t2 = threading.Thread(target=lambda: ths_auto_click())
    # 后台运行
    t2.setDaemon(True)
    t2.start()
    t3 = threading.Thread(target=lambda: ths_auto_refresh())
    # 后台运行
    t3.setDaemon(True)
    t3.start()
    threading.Thread(target=lambda: __rec_code(pipe), daemon=True).start()
    threading.Thread(target=lambda: recieve_data(pipe), daemon=True).start()
    threading.Thread(target=lambda: ths_auto_click(), daemon=True).start()
    threading.Thread(target=lambda: ths_auto_refresh(), daemon=True).start()
    app = mainApp()
    global_datas["app"] = app
    app.MainLoop()
    with Listener(on_click=on_mouse_click) as listener: