| | |
| | | import array |
| | | import json |
| | | import random |
| | | import os |
| | | import socket |
| | | import sys |
| | | import struct |
| | | import time |
| | | |
| | | import win32con |
| | | import win32gui |
| | | |
| | | import constant |
| | | from utils import ths_ocr_util |
| | | from utils import ths_ocr_util, invalid_hwnds_manager |
| | | import setting |
| | | from utils import socket_util |
| | | from utils import ths_util |
| | |
| | | |
| | | import wx |
| | | import wx.html2 as webview |
| | | |
| | | import concurrent.futures |
| | | |
| | | APP_TITLE = "卖票看板" |
| | | APP_ICON = "" |
| | |
| | | |
| | | class mainApp(wx.App): |
| | | |
| | | def __refresh(self): |
| | | while True: |
| | | if "expire_time" in ocr_settings and ocr_settings["expire_time"] > time.time(): |
| | | print("识别") |
| | | try: |
| | | code = ths_ocr_util.ocr_ths_code() |
| | | if not code: |
| | | time.sleep(0.1) |
| | | continue |
| | | global_datas["pipe"].send(json.dumps({"code": code, "type": "code"})) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | # print(str(e)) |
| | | pass |
| | | time.sleep(0.005) |
| | | |
| | | def __init_data(self): |
| | | pass |
| | | |
| | | def OnInit(self): |
| | | self.SetAppName(APP_TITLE) |
| | | self.__init_data() |
| | | t1 = threading.Thread(target=lambda: self.__refresh()) |
| | | # 后台运行 |
| | | t1.setDaemon(True) |
| | | t1.start() |
| | | |
| | | # self.__show_main_frame() |
| | | |
| | | return True |
| | |
| | | if __name__ == "__main__1": |
| | | print(webview.WebView.IsBackendAvailable(webview.WebViewBackendEdge)) |
| | | |
| | | global_datas = { |
| | | |
| | | } |
| | | set_codes_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10) |
| | | |
| | | |
| | | # 接受来自其他进程的数据 |
| | |
| | | wx.CallAfter(lambda: JueJinSettingFrame(data["pos"]).Show()) |
| | | elif type_ == "manage_ths_pos": |
| | | wx.CallAfter(lambda: THSPositionSettingFrame(data["pos"]).Show()) |
| | | elif type_ == "show_float_callback": |
| | | wx.CallAfter(lambda: global_datas["floatFrame"].Show()) |
| | | elif type_ == "show_main_callback": |
| | | wx.CallAfter(lambda: global_datas["tickFrame"].Show()) |
| | | elif type_ == "set_code": |
| | | code = data["code"] |
| | | set_codes_thread_pool.submit(set_xfdp_codes, code) |
| | | elif type_ == "show_dead_hwnds": |
| | | hwnds = win32_util.text_hwnds_doing |
| | | print("死亡窗口", hwnds) |
| | | if hwnds: |
| | | for hwnd in hwnds: |
| | | invalid_hwnds_manager.add_hwnd(hwnd) |
| | | elif type_ == "exit": |
| | | pass |
| | | wx.CallAfter(lambda: sys.exit()) |
| | | os._exit(0) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | |
| | |
| | | from pynput.mouse import Listener |
| | | |
| | | ocr_settings = {} |
| | | |
| | | |
| | | def set_xfdp_codes(code): |
| | | """ |
| | | 设置悬浮盯盘中的代码 |
| | | :param code: |
| | | :return: |
| | | """ |
| | | |
| | | def focus(hwnd, code_): |
| | | try: |
| | | int_buffer = array.array("L", [0]) |
| | | char_buffer = array.array('b', |
| | | json.dumps({"type": "set_code", |
| | | "data": {"code": code_, "time": int(time.time() * 1000)}}).encode()) |
| | | int_buffer_address = int_buffer.buffer_info()[0] |
| | | char_buffer_address, char_buffer_size = char_buffer.buffer_info() |
| | | copy_struct = struct.pack("PLP", int_buffer_address, char_buffer_size, char_buffer_address) |
| | | win32gui.SendMessage(hwnd, win32con.WM_COPYDATA, None, copy_struct) |
| | | |
| | | # pythoncom.CoInitialize() |
| | | # shell = client.Dispatch("WScript.Shell") |
| | | # shell.SendKeys('%') |
| | | # # win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) |
| | | # win32gui.SetForegroundWindow(hwnd) |
| | | # win32gui.PumpMessages() |
| | | # win32gui.SetFocus(hwnd) |
| | | # lib = CDLL("D:/workspace/GP/dll/Dll/x64/Debug/Dll.dll") |
| | | # win32gui.SendMessage(hwnd, win32con.WM_SETFOCUS, 0, 0) |
| | | # print("执行结果", lib.focus(hwnd)) |
| | | |
| | | # remote_thread = win32process.GetWindowThreadProcessId(hwnd)[0] |
| | | # tid = win32api.GetCurrentThreadId() |
| | | # win32process.AttachThreadInput(tid, remote_thread, True) |
| | | # prev_handle = win32gui.SetFocus(hwnd) |
| | | # print("之前的焦点句柄", prev_handle,hwnd) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | pass |
| | | |
| | | # 获取代码属性描述 |
| | | # auto_focus = setting.get_float_frame_auto_focus() |
| | | if True: |
| | | # 窗口显示在最前面 |
| | | hwnds = win32_util.search_window("悬浮盯盘") |
| | | if hwnds: |
| | | print("句柄", hwnds[0]) |
| | | time.sleep(0.1) |
| | | focus(hwnds[0], code) |
| | | |
| | | |
| | | def on_mouse_click(x, y, button, pressed): |
| | |
| | | if pressed: |
| | | # 点击事件过后1s内可截图识别代码 |
| | | ocr_settings["expire_time"] = time.time() + 1 |
| | | |
| | | print(f"鼠标在 ({x}, {y}) 位置按下,按钮: {button}") |
| | | # print(f"鼠标在 ({x}, {y}) 位置按下,按钮: {button}") |
| | | else: |
| | | print(f"鼠标在 ({x}, {y}) 位置释放,按钮: {button}") |
| | | # print(f"鼠标在 ({x}, {y}) 位置释放,按钮: {button}") |
| | | pass |
| | | |
| | | |
| | | def __rec_code(pipe): |
| | | time.sleep(5) |
| | | while True: |
| | | if "expire_time" in ocr_settings and ocr_settings["expire_time"] > time.time(): |
| | | try: |
| | | code = ths_ocr_util.ocr_ths_code() |
| | | if not code: |
| | | time.sleep(0.1) |
| | | continue |
| | | pipe.send(json.dumps({"code": code, "type": "code"})) |
| | | except Exception as e: |
| | | # logging.exception(e) |
| | | # print(str(e)) |
| | | pass |
| | | time.sleep(0.005) |
| | | |
| | | |
| | | def run(pipe): |
| | | global_datas["pipe"] = pipe |
| | | global p2 |
| | | p1, p2 = multiprocessing.Pipe() |
| | | |
| | | t1 = threading.Thread(target=lambda: recieve_data(pipe)) |
| | | # 后台运行 |
| | | t1.setDaemon(True) |
| | | t1.start() |
| | | |
| | | t2 = threading.Thread(target=lambda: ths_auto_click()) |
| | | # 后台运行 |
| | | t2.setDaemon(True) |
| | | t2.start() |
| | | |
| | | t3 = threading.Thread(target=lambda: ths_auto_refresh()) |
| | | # 后台运行 |
| | | t3.setDaemon(True) |
| | | t3.start() |
| | | threading.Thread(target=lambda: __rec_code(pipe), daemon=True).start() |
| | | threading.Thread(target=lambda: recieve_data(pipe), daemon=True).start() |
| | | threading.Thread(target=lambda: ths_auto_click(), daemon=True).start() |
| | | threading.Thread(target=lambda: ths_auto_refresh(), daemon=True).start() |
| | | app = mainApp() |
| | | global_datas["app"] = app |
| | | app.MainLoop() |
| | | |
| | | with Listener(on_click=on_mouse_click) as listener: |