"""Tonghuashun (THS) trading-client GUI automation helpers.

The functions in this module drive the THS desktop client by locating its
dialogs with ``win32gui`` and sending window messages (key presses, mouse
clicks, text queries) to their controls.
"""
import array
import functools
import logging
import random
import threading
import time

import win32con
import win32gui

# Window class name of the dialogs this module looks for.
_DIALOG_CLASS = '#32770'

# Virtual-key codes sent to the client's controls.
_VK_BACKSPACE = 8
_VK_DECIMAL = 110
_VK_B = 66

# Characters that input_number() types using their own code point as key code.
_DIGITS = "0123456789"


def async_call(fn):
    """Decorate *fn* so that each call runs in a newly started thread.

    The wrapper returns ``None`` immediately; the return value of *fn* and
    any exception it raises stay inside the worker thread.
    """

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        threading.Thread(target=fn, args=args, kwargs=kwargs).start()

    return wrapper


def _iter_candidate_dialogs(landscape_only=False):
    """Yield handles of visible top-level dialogs that pass the size filter.

    A window qualifies when its class name is ``#32770``, it is visible, its
    width is strictly between 100 and 500 and its height is strictly between
    50 and 500.  With ``landscape_only`` it must also be wider than tall.
    """
    handles = []
    win32gui.EnumWindows(lambda hwnd, acc: acc.append(hwnd), handles)
    for hwnd in handles:
        if win32gui.GetClassName(hwnd) != _DIALOG_CLASS:
            continue
        if not win32gui.IsWindowVisible(hwnd):
            continue
        left, top, right, bottom = win32gui.GetWindowRect(hwnd)
        width = right - left
        height = bottom - top
        if not (100 < width < 500 and 50 < height < 500):
            continue
        if landscape_only and width <= height:
            continue
        yield hwnd


def _read_window_text(hwnd, buf_size):
    """Fetch the text of *hwnd* via WM_GETTEXT into a *buf_size*-char buffer.

    The buffer is allocated with two bytes per character.  NUL characters
    and surrounding whitespace are removed from the result.
    """
    buffer = array.array('b', b'\x00\x00' * buf_size)
    win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, buf_size, buffer)
    text = win32gui.PyGetString(buffer.buffer_info()[0], buf_size - 1)
    return text.replace("\x00", "").strip()


def _press_key(hwnd, key_code):
    """Send a key-down (synchronously) and post a key-up for *key_code*."""
    win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, key_code, 0)
    win32gui.PostMessage(hwnd, win32con.WM_KEYUP, key_code, 0)


def _click(control):
    """Send a left-button down/up message pair to *control*."""
    win32gui.SendMessage(control, win32con.WM_LBUTTONDOWN, 0, 0)
    win32gui.SendMessage(control, win32con.WM_LBUTTONUP, 0, 0)


class THSGuiTrade(object):
    """Singleton that places buy orders through the THS buy dialogs.

    Attributes set on the single instance when it is first created:
        buy_lock: ``threading.RLock`` guarding buy-window allocation.
        buy_cancel_locks: empty dict (not used by the code in this module).
        buy_win_list: buy-dialog handles found by :meth:`get_buy_wins`.
        using_buy_wins: set of buy-dialog handles currently handed out.
    """

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating and initialising it once."""
        if cls.__instance is None:
            # object.__new__ raises TypeError when given extra arguments, so
            # the constructor arguments are accepted but not forwarded.
            instance = super(THSGuiTrade, cls).__new__(cls)
            instance.buy_lock = threading.RLock()
            instance.buy_cancel_locks = {}
            instance.buy_win_list = cls.get_buy_wins()
            print("交易窗口", instance.buy_win_list)
            instance.using_buy_wins = set()
            # Publish only a fully initialised instance, so a failure above
            # cannot leave a half-built singleton behind.
            cls.__instance = instance
        return cls.__instance

    def refresh_hwnds(self):
        """Re-scan the desktop and replace the cached buy-dialog handles."""
        self.buy_win_list = self.get_buy_wins()

    def get_available_buy_win(self):
        """Reserve and return a buy dialog that is not currently in use.

        The cached handle list is refreshed first when it is empty.  The
        returned handle is added to ``using_buy_wins``; :meth:`buy` removes
        it again when it finishes.

        Returns:
            The reserved window handle, or ``0`` when none is free.
        """
        with self.buy_lock:
            if not self.buy_win_list:
                self.refresh_hwnds()
            for win in self.buy_win_list:
                if win not in self.using_buy_wins:
                    self.using_buy_wins.add(win)
                    return win
        return 0

    @classmethod
    def getText(cls, hwnd):
        """Return the stripped text of window/control *hwnd*."""
        buf_size = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        return _read_window_text(hwnd, buf_size)

    @classmethod
    def get_buy_wins(cls):
        """Return the handles of all open buy dialogs.

        A buy dialog is a candidate dialog (see the size filter in
        ``_iter_candidate_dialogs``) whose control ``0x3EE`` carries the text
        ``一键买入[B]`` (the one-click buy button).  Errors raised while
        probing a single dialog are printed and that dialog is skipped.
        """
        buy_wins = []
        for hwnd in _iter_candidate_dialogs():
            try:
                button = win32gui.GetDlgItem(hwnd, 0x000003EE)
                if button > 0 and cls.getText(button) == '一键买入[B]':
                    buy_wins.append(hwnd)
            except Exception as e:
                print(e)
        return buy_wins

    def input_number(self, hwnd, num_str):
        """Clear the control *hwnd* and type *num_str* into it as key presses.

        Ten backspace key presses are sent first, then one key press per
        character: digits use their own code point as key code and ``.``
        uses key code 110.

        Args:
            hwnd: handle of the control that receives the key messages.
            num_str: text to type; may be empty to only clear the control.

        Raises:
            ValueError: if *num_str* contains a character other than a digit
                or ``.``.  Validation happens before any message is sent, so
                the control is left untouched in that case.
        """
        key_codes = []
        for ch in num_str:
            if ch == '.':
                key_codes.append(_VK_DECIMAL)
            elif ch in _DIGITS:
                key_codes.append(ord(ch))
            else:
                # Previously an unknown character was sent as key code -1.
                raise ValueError(
                    "unsupported character in numeric input: {!r}".format(ch))

        # NOTE(review): the original source had a
        # `win32gui.SendMessage(hwnd, 258, 8, 0);` line here that appears to
        # have been commented out; it is not reproduced - confirm.
        for _ in range(10):
            _press_key(hwnd, _VK_BACKSPACE)
        for key_code in key_codes:
            _press_key(hwnd, key_code)

    def getLimitUpPrice(self, win):
        """Return the text of control ``0x6C8`` of *win* without ``涨停:``.

        ``涨停`` means "limit up"; the remaining text is returned as a string
        and is not parsed.
        """
        label = win32gui.GetDlgItem(win, 0x000006C8)
        return self.getText(label).replace("涨停:", "")

    def getTradeResultWin(self):
        """Return the handle of the trade-result dialog, or ``0`` if absent.

        The dialog is the first landscape candidate dialog whose control
        with id 2 carries the text ``确定`` (OK).  Errors raised while
        probing a single dialog are ignored and that dialog is skipped.
        """
        for hwnd in _iter_candidate_dialogs(landscape_only=True):
            try:
                ok_button = win32gui.GetDlgItem(hwnd, 0x00000002)
                if ok_button > 0 and self.getText(ok_button) == '确定':
                    return hwnd
            except Exception:
                pass
        return 0

    def closeTradeResultDialog(self, win):
        """Click the control with id 2 (the OK button) of dialog *win*."""
        _click(win32gui.GetDlgItem(win, 0x00000002))

    def getTradeSuccessCode(self, win):
        """Extract the contract number shown in the trade-result dialog.

        Reads the text of control ``0x3EC`` and returns the digits found in
        the segment that follows the first ``合同编号:`` ("contract number")
        marker, up to the next marker if there is one.

        Returns:
            The digits as a string; ``""`` when *win* is not a positive
            handle or the text does not contain the marker.
        """
        if win <= 0:
            return ""
        label = win32gui.GetDlgItem(win, 0x000003EC)
        parts = self.getText(label).split("合同编号:")
        if len(parts) < 2:
            # The dialog carries no contract number (for example it is not a
            # success message); previously this raised IndexError.
            return ""
        code_str = "".join(ch for ch in parts[1] if ch in _DIGITS)
        print(code_str)
        return code_str

    def buy(self, code, limit_up_price, win=0):
        """Type *code* into buy dialog *win* and trigger the buy shortcut.

        After typing the code into control ``0x408`` the method polls
        control ``0x40C`` (the name field) every 4 ms, at most 500 times
        (about 2 s), until its text is non-empty.  It then posts a key-down
        for ``B`` to the dialog and calls :meth:`refresh_data`.

        Args:
            code: security code to type; digits and ``.`` only.
            limit_up_price: accepted for interface compatibility; unused.
            win: handle of the buy dialog to use.

        Returns:
            The tuple ``(code, "")``.

        Raises:
            Exception: if the name field is still empty after polling.
        """
        try:
            print("使用窗口", win)
            code_edit = win32gui.GetDlgItem(win, 0x00000408)
            name_label = win32gui.GetDlgItem(win, 0x0000040C)
            self.input_number(code_edit, code)

            for attempt in range(500):
                buf_size = win32gui.SendMessage(
                    name_label, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
                print(attempt, buf_size)
                if buf_size > 1:
                    break
                time.sleep(0.004)
            else:
                raise Exception("代码输入填充出错")

            time.sleep(0.001)
            # Buy via the dialog's keyboard shortcut "B".
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, _VK_B, 0)
            self.refresh_data()
            return code, ""
        finally:
            # Release the window whether or not the order was submitted.
            self.using_buy_wins.discard(win)

    @async_call
    def close_delegate_success_dialog(self):
        """Wait for the trade-result dialog in a worker thread and close it.

        Polls :meth:`getTradeResultWin` up to 50 times, 20 ms apart.  Once
        the dialog appears it waits 200 ms, reads the contract number (which
        :meth:`getTradeSuccessCode` prints), prints the current timestamp
        and clicks the dialog's OK button.
        """
        for _ in range(50):
            dialog = self.getTradeResultWin()
            if dialog > 0:
                time.sleep(0.2)
                self.getTradeSuccessCode(dialog)
                print(time.time())
                self.closeTradeResultDialog(dialog)
                break
            time.sleep(0.02)

    def getCancelBuySureWin(self):
        """Return the handle of the cancel-confirmation dialog, or ``0``.

        The dialog is the first landscape candidate dialog that has a
        ``Static`` child window titled ``撤单确认`` ("cancel order
        confirmation").  Errors raised while probing a dialog are ignored.
        """
        for hwnd in _iter_candidate_dialogs(landscape_only=True):
            try:
                title = win32gui.FindWindowEx(hwnd, 0, "Static", "撤单确认")
                if title > 0:
                    return hwnd
            except Exception:
                pass
        return 0

    def __get_code_input(self, code=None):
        """Locate the code edit box of the cancel-order window.

        Returns:
            The tuple ``(edit_handle, window_handle)``.

        Raises:
            Exception: if the cancel-order window cannot be found.
        """
        # NOTE(review): getCancelBuyWin is not defined in the visible part of
        # this class; calling this method fails unless it exists elsewhere.
        win = self.getCancelBuyWin(code)
        if win <= 0:
            raise Exception("无法找到取消委托窗口")
        print(time.time())
        print(win)
        # NOTE(review): the original comment named control id 0x3E9, but the
        # code looks up 0x996 - confirm which is intended.
        code_input = win32gui.GetDlgItem(win, 0x00000996)
        code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
        if code_input <= 0:
            # Refresh the cached buy-window handles, then retry the lookup
            # on the same window.
            self.refresh_hwnds()
            code_input = win32gui.GetDlgItem(win, 0x00000996)
            code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
        return code_input, win

    @async_call
    def refresh_data(self):
        """Placeholder for refreshing the trading window data (no-op)."""
        pass


class THSGuiUtil:
    """Class-level helpers for inspecting and manipulating THS buy dialogs."""

    @classmethod
    def getText(cls, hwnd):
        """Return the stripped text of *hwnd*, or ``""`` if reading fails."""
        buf_size = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        try:
            return _read_window_text(hwnd, buf_size)
        except Exception:
            return ""

    @classmethod
    def add_buy_win(cls):
        """Open one more buy dialog and return its window handle.

        Clicks control ``0x5ED`` (the add button) of the last existing buy
        dialog and then polls up to 30 times, 10 ms apart, for a buy dialog
        whose handle was not present before the click.

        Raises:
            Exception: if no buy dialog exists, if 10 or more already
                exist, if the add button is missing, or if no new dialog
                appears while polling.
        """
        known = THSGuiTrade.get_buy_wins()
        if not known:
            raise Exception("没有买入窗口")
        if len(known) >= 10:
            raise Exception("最多只能添加10个下单框")
        win = known[-1]
        add_btn = win32gui.GetDlgItem(win, 0x000005ED)
        if add_btn <= 0:
            raise Exception("没有找到添加按钮")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            # Best effort only; the click below is still attempted.
            pass
        cls.click(add_btn)

        known_set = set(known)
        for _ in range(30):
            # Compare handles rather than counts so that a dialog closing at
            # the same moment cannot hide the newly opened one.
            added = [w for w in THSGuiTrade.get_buy_wins() if w not in known_set]
            if added:
                return added[0]
            time.sleep(0.01)
        raise Exception("未添加成功")

    @classmethod
    def is_win_exist(cls, win):
        """Return ``True`` if *win* is reported visible, ``False`` otherwise.

        Any error raised by the visibility query counts as ``False``.
        """
        try:
            return bool(win32gui.IsWindowVisible(win))
        except Exception:
            return False

    @classmethod
    def is_win_show(cls, win):
        """Return ``True`` if the rectangle of *win* has positive width and height.

        Any error raised while querying the rectangle counts as ``False``.
        """
        try:
            left, top, right, bottom = win32gui.GetWindowRect(win)
        except Exception:
            return False
        return right - left > 0 and bottom - top > 0

    @classmethod
    def click(cls, control):
        """Send a left-button down/up message pair to *control*."""
        _click(control)

    @classmethod
    def clear_buy_window_code(cls, win):
        """Clear the code edit box (control ``0x408``) of buy dialog *win*.

        Raises:
            Exception: if the window is not visible or the edit control
                cannot be found.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")
        code_edit = win32gui.GetDlgItem(win, 0x00000408)
        if code_edit <= 0:
            raise Exception("编辑控件没找到")
        THSGuiTrade().input_number(code_edit, "")

    @classmethod
    def set_buy_window_code(cls, win, code):
        """Type *code* into the code edit box (control ``0x408``) of *win*.

        The dialog is brought to the foreground first on a best-effort basis.

        Raises:
            Exception: if the window is not visible or the edit control
                cannot be found.
            ValueError: if *code* contains characters other than digits
                or ``.``.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            pass
        code_edit = win32gui.GetDlgItem(win, 0x00000408)
        if code_edit <= 0:
            raise Exception("编辑控件没找到")
        THSGuiTrade().input_number(code_edit, code)


def start_buy(code):
    """Buy *code* through the first open buy dialog and dismiss the result.

    Raises:
        Exception: if no buy dialog is open.
    """
    buy_wins = THSGuiTrade.get_buy_wins()
    if not buy_wins:
        raise Exception("沒有没有闪电买入窗口")
    trader = THSGuiTrade()
    trader.buy(code, None, buy_wins[0])
    trader.close_delegate_success_dialog()