""" 同花顺软件工具 """ # 打开交易环境 import time import win32api import win32con import win32gui from db import redis_manager_delegate as redis_manager from db.redis_manager_delegate import RedisUtils from utils import tool from trade import trade_gui __redisManager = redis_manager.RedisManager(2) def __click(hwnd): win32gui.SendMessage(hwnd, win32con.WM_LBUTTONDOWN, 0, 0) win32gui.SendMessage(hwnd, win32con.WM_LBUTTONUP, 0, 0) def __input_number(hwnd, num_str): # delete for c in num_str: code = -1 lp = 0 if c == '.': code = 110 win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0) win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0) continue elif c == '0': code = 48 elif c == '1': code = 49 elif c == '2': code = 50 elif c == '3': code = 51 elif c == '4': code = 52 elif c == '5': code = 53 elif c == '6': code = 54 elif c == '7': code = 55 elif c == '8': code = 56 elif c == '9': code = 57 win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0) win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0) def show_window(hwnd): win32gui.ShowWindow(hwnd, win32con.SW_SHOWNORMAL) def open_trade_gui(): main_win = __open_main_win() show_window(main_win) win_delegate = __open_trade_delegate_win(main_win) # 最大化委托 win_temp = win32gui.FindWindowEx(win_delegate, None, None, "HexinScrollWnd") left, top, right, bottom = win32gui.GetWindowRect(win_temp) height_1 = bottom - top win_temp = win32gui.FindWindowEx(win_delegate, win_temp, None, "HexinScrollWnd") left, top, right, bottom = win32gui.GetWindowRect(win_temp) height_2 = bottom - top if height_1 > 10 and height_2 > 10: maxwin = win32gui.GetDlgItem(win_delegate, 0x00002EF1) # 点击按钮 __click(maxwin) # 打开闪电买入 __open_buy_win(main_win) def __open_main_win(): # 打开同花顺主页 hWndList = [] win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList) for hwnd in hWndList: _title = win32gui.GetWindowText(hwnd) if _title.__contains__("同花顺("): if not win32gui.IsWindowVisible(hwnd): show_window(hwnd) win32gui.SetForegroundWindow(hwnd) win32gui.SetFocus(hwnd) return hwnd raise Exception("尚未打开同花顺") # 打开闪电买入窗口 def __open_buy_win(hwnd): show_window(hwnd) wins = trade_gui.THSGuiTrade.get_buy_wins() if len(wins) < 1: try: win32gui.SetForegroundWindow(hwnd) except: pass left, top, right, bottom = win32gui.GetWindowRect(hwnd) x = int(round((left + right) / 2, 0)) y = int(round(top + 2, 0)) win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN | win32con.MOUSEEVENTF_LEFTUP, x, y, 0, 0) # 输入21 time.sleep(0.1) __input_number(hwnd, "21") # 输入enter win32api.keybd_event(win32con.VK_RETURN, 0, 0, 0) time.sleep(1) wins = trade_gui.THSGuiTrade.get_buy_wins() if len(wins) < 1: raise Exception("打开闪电买入失败") if len(wins) < 3: for i in range(0, 3): btn = win32gui.GetDlgItem(wins[len(wins) - 1], 0x000005ED) # 点击事件添加 __click(btn) time.sleep(0.5) wins = trade_gui.THSGuiTrade.get_buy_wins() pass # 获取专业版下单句柄 def __get__trade_delegate_win_profession(): hWndList = [] win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList) for hwnd in hWndList: _title = win32gui.GetWindowText(hwnd) if _title.__contains__("专业版下单"): return hwnd return None # 打开交易委托 def __open_trade_delegate_win(hwnd1): win32gui.SetForegroundWindow(hwnd1) # 按下F12 win32api.keybd_event(win32con.VK_F12, 0, 0, 0) # 检测交易窗口是否打开 for i in range(0, 5): hWndList = [] win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList) for hwnd in hWndList: _title = win32gui.GetWindowText(hwnd) if _title.__contains__("专业版下单"): if not win32gui.IsWindowVisible(hwnd): win32gui.SetForegroundWindow(hwnd) return hwnd elif _title.__contains__("网上股票交易系统"): show_window(hwnd) time.sleep(0.2) hwnd = win32gui.FindWindowEx(hwnd, None, "ToolbarWindow32", None) hwnd = win32gui.FindWindowEx(hwnd, None, "#32770", None) hwnd = win32gui.FindWindowEx(hwnd, None, "Button", "专业") # 点击按钮 __click(hwnd) win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList) for j in range(0, 5): win = __get__trade_delegate_win_profession() if win is not None: return win time.sleep(1) time.sleep(1) raise Exception("专业版下单打开失败") def set_ths_dead_state(client_id, dead): redis = __redisManager.getRedis() try: key = "ths_state_dead_count-{}".format(client_id) if not dead: RedisUtils.setex(redis, key, tool.get_expire(), 0, auto_free=False) else: RedisUtils.incrby(redis, key, 1, auto_free=False) RedisUtils.expire(redis, key, tool.get_expire(), auto_free=False) finally: RedisUtils.realse(redis) # 同花顺是否卡死 def is_ths_dead(client_id): key = "ths_state_dead_count-{}".format(client_id) val = RedisUtils.get( __redisManager.getRedis(), key) if val is not None and int(val) >= 5: return True else: return False if __name__ == "__main__": open_trade_gui() pass