"""Helper process for the ticket-selling dashboard.

Hosts small wx setting dialogs, auto-click / auto-refresh worker loops for the
THS (同花顺) client windows, a mouse-triggered OCR loop, and a pipe listener that
receives JSON commands from another process.
"""
import array
import json
import os
import socket
import struct
import time

import win32con
import win32gui

import constant
from utils import ths_ocr_util, invalid_hwnds_manager
import setting
from utils import socket_util
from utils import ths_util
import win32_util

# freeze_support()
import logging
import multiprocessing
import threading

import wx
import wx.html2 as webview
import concurrent.futures

APP_TITLE = "卖票看板"
APP_ICON = ""
SOCKET_PORT = 11008
HTTP_PORT = 9004


def _center_frame_on(frame, position):
    """Move *frame* so that its centre sits on ``position`` (an ``(x, y)`` pair).

    The coordinates are coerced to ``int`` because ``wx.Point`` does not accept
    floats on recent Python / wxPython combinations.
    """
    size = frame.GetSize()
    frame.SetPosition(wx.Point(int(position[0] - size[0] / 2),
                               int(position[1] - size[1] / 2)))


def _show_success_and_close(frame, message):
    """Show an information dialog with *message*; close *frame* on ``wx.ID_YES``.

    NOTE(review): the dialog is created without ``wx.YES_NO``, so ShowModal
    probably returns ``wx.ID_OK`` and the frame is never closed here - confirm
    the intended behaviour before changing the comparison.
    """
    toastone = wx.MessageDialog(None, message, "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
    try:
        if toastone.ShowModal() == wx.ID_YES:
            frame.Close()
    finally:
        toastone.Destroy()


def _show_frame(frame_cls, position):
    """Create ``frame_cls(position)`` and show it (meant to run on the GUI thread)."""
    frame_cls(position).Show()


def show_warning(content, click):
    """Show a modal warning dialog.

    :param content: message text
    :param click: optional zero-argument callback invoked when ShowModal
        returns ``wx.ID_YES``; may be ``None``
    """
    toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_WARNING)
    try:
        # NOTE(review): without wx.YES_NO this comparison likely never matches
        # (ShowModal would return wx.ID_OK) - confirm.
        if toastone.ShowModal() == wx.ID_YES and click is not None:
            click()
    finally:
        # Destroy the dialog even if the callback raises.
        toastone.Destroy()


def show_info(content, click):
    """Show a modal information dialog.

    :param content: message text
    :param click: optional zero-argument callback invoked when ShowModal
        returns ``wx.ID_YES``; may be ``None`` (same contract as show_warning)
    """
    toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
    try:
        if toastone.ShowModal() == wx.ID_YES and click is not None:
            click()
    finally:
        toastone.Destroy()


def show_sure(content, callback):
    """Ask a yes/no question and report the answer.

    :param content: question text
    :param callback: called with ``True`` for "yes" and ``False`` for "no";
        not called for any other dialog result
    """
    toastone = wx.MessageDialog(None, content, "提示", wx.YES_NO | wx.ICON_INFORMATION)
    try:
        result = toastone.ShowModal()
        if result == wx.ID_YES:
            callback(True)
        elif result == wx.ID_NO:
            callback(False)
    finally:
        toastone.Destroy()


class JueJinSettingFrame(wx.Frame):
    """Dialog-like frame for editing the JueJin strategy id and token."""

    def __init__(self, position):
        """
        :param position: ``(x, y)`` screen point the frame is centred on
        """
        wx.Frame.__init__(self, None, -1, "掘金参数设置",
                          style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
                          size=(450, 200))
        self.SetBackgroundColour(wx.Colour(224, 224, 224))
        _center_frame_on(self, position)

        boxsier = wx.BoxSizer()
        flex = wx.FlexGridSizer(rows=3, cols=2, vgap=10, hgap=10)

        # Strategy id
        label = wx.StaticText(self, label="策略ID:")
        flex.Add(label, flag=wx.ALIGN_RIGHT)
        self.edit_celue = wx.TextCtrl(self, size=(300, -1))
        flex.Add(self.edit_celue, flag=wx.EXPAND)

        # Token
        label = wx.StaticText(self, label="Token:")
        flex.Add(label)
        self.edit_token = wx.TextCtrl(self, size=(300, -1), style=wx.TE_MULTILINE)
        flex.Add(self.edit_token)

        # Placeholder so the button lands in the second column
        flex.Add(wx.StaticText(self, label=""))

        # Confirm button
        ID_SURE = wx.NewId()
        self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
        self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
        flex.Add(self.btn_sure, 1, wx.TOP | wx.LEFT, 20)

        boxsier.Add(flex, 1, wx.TOP | wx.LEFT, 20)
        self.SetSizer(boxsier)

        # Populate the controls from the stored settings
        self.__init_data()

    def __init_data(self):
        """Load the stored strategy id / token into the text controls."""
        strategy_id, token = setting.get_juejin_params()
        # SetValue (not SetLabelText) is the text-control content setter;
        # fall back to "" when nothing has been stored yet.
        self.edit_celue.SetValue(strategy_id or "")
        self.edit_token.SetValue(token or "")

    def __sure_btn(self, event):
        """Persist the edited parameters and confirm to the user."""
        strategy_id = self.edit_celue.GetValue()
        token = self.edit_token.GetValue()
        setting.set_juejin_params(strategy_id, token)
        _show_success_and_close(self, "更改成功")


class FobiddenCodesFrame(wx.Frame):
    """Frame for submitting a list of codes that must not be traded."""

    def __init__(self, position):
        """
        :param position: ``(x, y)`` screen point the frame is centred on
        """
        wx.Frame.__init__(self, None, -1, "添加禁止交易代码",
                          style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
                          size=(170, 200))
        self.SetBackgroundColour(wx.Colour(224, 224, 224))
        _center_frame_on(self, position)

        boxsier = wx.BoxSizer(wx.VERTICAL)

        # Codes, one per line
        label = wx.StaticText(self, label="代码:")
        boxsier.Add(label, flag=wx.ALIGN_LEFT, border=10)
        self.edit_codes = wx.TextCtrl(self, size=(150, 100), style=wx.TE_MULTILINE)
        boxsier.Add(self.edit_codes)

        # Confirm button
        ID_SURE = wx.NewId()
        self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
        self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
        boxsier.Add(self.btn_sure)

        root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
        # The original passed ``wx.TOP | wx.TOP``, which equals ``wx.TOP``.
        root_boxsier.Add(boxsier, 1, wx.TOP, 10)
        self.SetSizer(root_boxsier)

    def __request(self, codes):
        """Send the forbidden *codes* to the server as a type-201 JSON message.

        :param codes: list of code strings
        :raises OSError: if connecting or sending fails
        """
        data = {"type": 201, "data": {"codes": codes}}
        # The context manager closes the socket even when connect/send fails.
        with socket.socket() as client:
            client.connect((constant.SERVER_HOST, SOCKET_PORT))
            client.sendall(json.dumps(data).encode("utf-8"))

    def __sure_btn(self, event):
        """Validate the entered codes (6 characters each) and submit them."""
        codes_str = self.edit_codes.GetValue()
        if codes_str:
            codes_str = codes_str.strip()
        codes_result = []
        for code in codes_str.split("\n"):
            code = code.strip()
            if len(code) == 6:
                codes_result.append(code)
        if not codes_result:
            show_warning("请填写正确的代码!", self.Close)
            return
        try:
            self.__request(codes_result)
        except Exception as e:
            show_warning("添加出错:" + str(e), None)
            return
        _show_success_and_close(self, "添加成功")


class THSPositionSettingFrame(wx.Frame):
    """Frame for editing THS auto-click positions and the two time intervals."""

    def __init__(self, position):
        """
        :param position: ``(x, y)`` screen point the frame is centred on
        """
        wx.Frame.__init__(self, None, -1, "同花顺坐标设置",
                          style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
                          size=(170, 400))
        self.SetBackgroundColour(wx.Colour(224, 224, 224))
        _center_frame_on(self, position)

        boxsier = wx.BoxSizer(wx.VERTICAL)

        label = wx.StaticText(self, label="交易刷新事件间隔(ms):")
        boxsier.Add(label)
        self.edit_refresh_time_space = wx.TextCtrl(self, size=(150, -1))
        boxsier.Add(self.edit_refresh_time_space)

        label = wx.StaticText(self, label="点击时间间隔(ms):")
        boxsier.Add(label)
        self.edit_time_space = wx.TextCtrl(self, size=(150, -1))
        boxsier.Add(self.edit_time_space)

        # Click positions, one per line
        label = wx.StaticText(self, label="坐标:")
        boxsier.Add(label)
        self.edit_codes = wx.TextCtrl(self, size=(150, 200), style=wx.TE_MULTILINE)
        boxsier.Add(self.edit_codes)

        # Confirm button
        ID_SURE = wx.NewId()
        self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
        self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
        boxsier.Add(self.btn_sure)

        root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
        # The original passed ``wx.TOP | wx.TOP``, which equals ``wx.TOP``.
        root_boxsier.Add(boxsier, 1, wx.TOP, 10)
        self.SetSizer(root_boxsier)

        # Populate the controls from the stored settings
        self.__init_data()

    def __init_data(self):
        """Load stored positions and intervals; intervals default to 500."""
        pos = setting.get_ths_auto_click_positions()
        # Tolerate "nothing stored yet" instead of crashing in str.join.
        self.edit_codes.SetValue("\n".join(pos or []))

        space = setting.get_ths_auto_click_time_space()
        if space is None:
            space = 500
        self.edit_time_space.SetValue(f"{space}")

        space = setting.get_ths_auto_refresh_time_space()
        if space is None:
            space = 500
        self.edit_refresh_time_space.SetValue(f"{space}")

    def __sure_btn(self, event):
        """Validate every field first, then persist all of them together."""
        time_space = self.edit_time_space.GetValue().strip()
        # The worker loops divide the interval by 1000, so it must be numeric.
        if not time_space.isdigit():
            show_warning("点击时间间隔不正确", None)
            return
        refresh_time_space = self.edit_refresh_time_space.GetValue().strip()
        if not refresh_time_space.isdigit():
            show_warning("刷新时间间隔不正确", None)
            return

        positions = []
        for line in self.edit_codes.GetValue().split("\n"):
            line = line.strip()
            if line:
                positions.append(line)

        # Nothing is saved unless the whole form is valid (no partial update).
        setting.set_ths_auto_click_positions(positions)
        setting.set_ths_auto_click_time_space(time_space)
        setting.set_ths_auto_refresh_time_space(refresh_time_space)
        _show_success_and_close(self, "更改成功")


class SocketApiUtil:
    """Small client for the JSON-over-socket API on port 11009."""

    @classmethod
    def __request(cls, data_json):
        """Sign and send *data_json*, returning the response payload.

        :param data_json: request dict; passed to
            ``socket_util.encryp_client_params_sign`` before sending
        :return: first element of the pair returned by ``socket_util.recv_data``
        :raises OSError: if the connection fails
        """
        socket_util.encryp_client_params_sign(data_json)
        # Close the connection once the response has been read.
        with socket.socket() as client:
            client.connect((constant.SERVER_HOST, 11009))
            client.sendall(socket_util.load_header(json.dumps(data_json).encode("utf-8")))
            result_str, header = socket_util.recv_data(client)
        return result_str

    @classmethod
    def get_cost_price(cls, code):
        """Return the cost price the server reports for *code*.

        :raises Exception: carrying the server's ``msg`` when the response
            ``code`` field is not 0
        """
        result_str = cls.__request({"type": "get_cost_price", "data": {"code": code}})
        print(result_str)
        result = json.loads(result_str)
        if result['code'] == 0:
            return result['data']['price']
        raise Exception(result['msg'])


class mainApp(wx.App):
    """wx application object; creates no frames on start-up."""

    def __init_data(self):
        pass

    def OnInit(self):
        self.SetAppName(APP_TITLE)
        self.__init_data()
        # self.__show_main_frame()
        return True


def ths_auto_click():
    """Loop forever, batch-clicking the configured positions on the THS window.

    Each iteration sleeps 20 ms; errors are treated as best-effort and only
    logged at debug level so the loop keeps running.
    """
    hwnd = ths_util.get_ths_second_screen_menu_hwnd()
    while True:
        try:
            if hwnd is None or not win32gui.IsWindowVisible(hwnd):
                # Handle missing or hidden: look it up again.
                hwnd = ths_util.get_ths_second_screen_menu_hwnd()
            if hwnd is None:
                continue
            if not setting.is_ths_auto_click():
                continue
            ps = setting.get_ths_auto_click_positions()
            if not ps:
                continue
            ps_new = []
            for p in ps:
                # SECURITY NOTE(review): eval() executes whatever text was saved
                # in the settings; if these are plain coordinate literals,
                # ast.literal_eval would be the safe replacement - confirm.
                ps_new.append(eval(p))
            space = setting.get_ths_auto_click_time_space()
            if space is None:
                space = 500
            ths_util.betch_click(hwnd, ps_new, round(space / 1000, 4))
        except Exception:
            logging.debug("ths_auto_click iteration failed", exc_info=True)
        finally:
            # Runs on `continue` as well, so every iteration is throttled.
            time.sleep(0.02)


def ths_auto_refresh():
    """Loop forever, clicking the THS trade-refresh control at the set interval.

    The window handle is looked up again after every 10 iterations. Errors are
    best-effort and only logged at debug level.
    """
    hwnd = ths_util.get_trade_refesh_hwnd()
    count = 0
    while True:
        count += 1
        try:
            if count > 10:
                count = 0
                # Inside the try so a failed lookup cannot kill the thread.
                hwnd = ths_util.get_trade_refesh_hwnd()
            if hwnd is None:
                continue
            if not setting.is_ths_trade_auto_refresh():
                continue
            rect = win32gui.GetWindowRect(hwnd)
            # Click at x=160, vertically centred in the window.
            win32_util.visual_click(hwnd, (160, (rect[3] - rect[1]) // 2))
            time_space = setting.get_ths_auto_refresh_time_space()
            if time_space is None:
                time_space = 500
            time.sleep(round(time_space / 1000, 4))
        except Exception:
            logging.debug("ths_auto_refresh iteration failed", exc_info=True)
        finally:
            time.sleep(0.02)


# Never true as written ("__main__1"): kept as a disabled debugging hook.
if __name__ == "__main__1":
    print(webview.WebView.IsBackendAvailable(webview.WebViewBackendEdge))

set_codes_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)


def recieve_data(pipe):
    """Receive JSON commands from another process and dispatch them.

    Handled ``type`` values: ``juejin_setting``, ``manage_ths_pos``,
    ``set_code``, ``show_dead_hwnds`` and ``exit``. The loop ends when the
    pipe can no longer be read.

    :param pipe: connection object whose ``recv()`` yields JSON strings
    """
    while True:
        try:
            value = pipe.recv()
        except (EOFError, OSError) as e:
            # The other end is gone; retrying would spin forever.
            logging.exception(e)
            return
        try:
            data = json.loads(value)
            print("接受到数据:", data)
            type_ = data["type"]
            if type_ == "juejin_setting":
                # Bind the position now; a lambda would read `data` later,
                # possibly after the next message has replaced it.
                wx.CallAfter(_show_frame, JueJinSettingFrame, data["pos"])
            elif type_ == "manage_ths_pos":
                wx.CallAfter(_show_frame, THSPositionSettingFrame, data["pos"])
            elif type_ == "set_code":
                set_codes_thread_pool.submit(set_xfdp_codes, data["code"])
            elif type_ == "show_dead_hwnds":
                hwnds = win32_util.text_hwnds_doing
                print("死亡窗口", hwnds)
                if hwnds:
                    for hwnd in hwnds:
                        invalid_hwnds_manager.add_hwnd(hwnd)
            elif type_ == "exit":
                os._exit(0)
        except Exception as e:
            logging.exception(e)


from pynput.mouse import Listener

# Shared state between the mouse listener and the OCR loop.
ocr_settings = {}


def set_xfdp_codes(code):
    """Send *code* to the "悬浮盯盘" window through a WM_COPYDATA message.

    :param code: the code to set
    :return: None
    """

    def focus(hwnd, code_):
        try:
            payload = json.dumps(
                {"type": "set_code", "data": {"code": code_, "time": int(time.time() * 1000)}}
            ).encode()
            int_buffer = array.array("L", [0])
            char_buffer = array.array('b', payload)
            int_buffer_address = int_buffer.buffer_info()[0]
            char_buffer_address, char_buffer_size = char_buffer.buffer_info()
            # Pointer / length / pointer triple handed to WM_COPYDATA.
            copy_struct = struct.pack("PLP", int_buffer_address, char_buffer_size,
                                      char_buffer_address)
            win32gui.SendMessage(hwnd, win32con.WM_COPYDATA, None, copy_struct)
        except Exception as e:
            logging.exception(e)

    hwnds = win32_util.search_window("悬浮盯盘")
    if hwnds:
        print("句柄", hwnds[0])
        time.sleep(0.1)
        focus(hwnds[0], code)


def on_mouse_click(x, y, button, pressed):
    """Mouse listener callback: open a 1-second OCR window on button press.

    :param x: pointer x coordinate (unused)
    :param y: pointer y coordinate (unused)
    :param button: mouse button (unused)
    :param pressed: True on press, False on release
    :return: None
    """
    if pressed:
        # The OCR loop may capture and recognise for one second after a press.
        ocr_settings["expire_time"] = time.time() + 1


def __rec_code(pipe):
    """OCR loop: while the click window is open, send recognised codes to *pipe*.

    Starts after a 5-second delay. Recognition errors are best-effort and only
    logged at debug level.
    """
    time.sleep(5)
    while True:
        if ocr_settings.get("expire_time", 0) > time.time():
            try:
                code = ths_ocr_util.ocr_ths_code()
                if not code:
                    time.sleep(0.1)
                    continue
                pipe.send(json.dumps({"code": code, "type": "code"}))
            except Exception:
                logging.debug("code recognition failed", exc_info=True)
        time.sleep(0.005)


def run(pipe):
    """Process entry point: start the worker threads, the wx app and the mouse listener.

    :param pipe: connection shared with the parent process; used both to
        receive commands and to send recognised codes
    """
    global p2
    p1, p2 = multiprocessing.Pipe()
    threading.Thread(target=__rec_code, args=(pipe,), daemon=True).start()
    threading.Thread(target=recieve_data, args=(pipe,), daemon=True).start()
    threading.Thread(target=ths_auto_click, daemon=True).start()
    threading.Thread(target=ths_auto_refresh, daemon=True).start()
    app = mainApp()
    app.MainLoop()
    with Listener(on_click=on_mouse_click) as listener:
        listener.join()
    while True:
        time.sleep(2)


if __name__ == "__main__":
    app = mainApp()
    app.MainLoop()