admin
2025-06-10 568c763084b926a6f2d632b7ac65b9ec8280752f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
"""
同花顺交易操作工具
"""
 
import array
import logging
import threading
import time
import random
 
import win32gui
import win32con
 
 
def async_call(fn):
    """Decorator that runs ``fn`` on a freshly started thread.

    Calling the decorated function returns immediately; ``fn`` executes in
    the background with the same positional and keyword arguments.

    Args:
        fn: The callable to run asynchronously.

    Returns:
        A wrapper that starts the worker thread and returns the started
        ``threading.Thread`` so callers may ``join()`` it if they need to
        wait. Callers that ignore the return value are unaffected. Any
        value returned by ``fn`` itself is discarded.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import list.
    import functools

    @functools.wraps(fn)  # keep fn's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        worker = threading.Thread(target=fn, args=args, kwargs=kwargs)
        worker.start()
        return worker

    return wrapper
 
 
class THSGuiTrade(object):
    """Drives the trading client's quick-buy dialogs through Win32 messages.

    The class is a process-wide singleton: every ``THSGuiTrade()`` call
    returns the same object. The instance carries:

    * ``buy_lock`` - re-entrant lock guarding buy-window reservation.
    * ``buy_cancel_locks`` - dict created empty; not used by the code in
      this class.
    * ``buy_win_list`` - handles returned by :meth:`get_buy_wins`.
    * ``using_buy_wins`` - set of handles currently reserved.
    """
    __instance = None

    # Singleton construction.
    def __new__(cls, *args, **kwargs):
        """Return the shared instance, building it on first use.

        Extra arguments are accepted for call-site compatibility but are
        not forwarded to ``object.__new__``, which rejects them.
        """
        if cls.__instance is None:
            instance = super(THSGuiTrade, cls).__new__(cls)
            # Initial state.
            # Lock used when handing out trading windows.
            instance.buy_lock = threading.RLock()
            instance.buy_cancel_locks = {}
            instance.buy_win_list = cls.get_buy_wins()
            print("交易窗口", instance.buy_win_list)
            instance.using_buy_wins = set()
            # Publish only after every attribute exists, so a failure above
            # cannot leave a half-initialised singleton behind.
            cls.__instance = instance
        return cls.__instance

    # Refresh the cached window handles.
    def refresh_hwnds(self):
        """Re-enumerate the buy dialogs and replace ``buy_win_list``."""
        self.buy_win_list = self.get_buy_wins()

    def get_available_buy_win(self):
        """Reserve and return a buy-window handle that is not in use.

        If no handles are cached the list is refreshed first. The chosen
        handle is added to ``using_buy_wins``; :meth:`buy` removes it again.

        Returns:
            The reserved window handle, or ``0`` when every known window
            is already reserved (or none exists).
        """
        with self.buy_lock:
            if not self.buy_win_list:
                self.refresh_hwnds()
            for win in self.buy_win_list:
                if win not in self.using_buy_wins:
                    self.using_buy_wins.add(win)
                    return win
        return 0

    @classmethod
    def getText(cls, hwnd):
        """Return the text of control ``hwnd`` via ``WM_GETTEXT``.

        The length is queried with ``WM_GETTEXTLENGTH``; the receive buffer
        reserves two bytes per character. Embedded NULs and surrounding
        whitespace are removed from the result.
        """
        bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        buffer = array.array('b', b'\x00\x00' * bufSize)
        win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
        text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
        return text.replace("\x00", "").strip()

    @classmethod
    def _iter_dialogs(cls, wider_than_tall=False):
        """Yield visible top-level ``#32770`` windows within the size bounds.

        A window qualifies when its width is strictly between 100 and 500
        and its height strictly between 50 and 500. With
        ``wider_than_tall=True`` the width must also exceed the height.
        Windows are examined lazily, in enumeration order.
        """
        handles = []
        win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), handles)
        for hwnd in handles:
            if win32gui.GetClassName(hwnd) != '#32770' or not win32gui.IsWindowVisible(hwnd):
                continue
            left, top, right, bottom = win32gui.GetWindowRect(hwnd)
            width = right - left
            height = bottom - top
            if not (100 < width < 500 and 50 < height < 500):
                continue
            if wider_than_tall and width <= height:
                continue
            yield hwnd

    @classmethod
    def get_buy_wins(cls):
        """Return handles of all dialogs that look like a quick-buy window.

        A candidate dialog is accepted when its control ``0x3EE`` exists and
        carries the text ``'一键买入[B]'``. Errors while probing a single
        dialog are printed and that dialog is skipped.
        """
        buy_win_list = []
        for hwnd in cls._iter_dialogs():
            # Look for the buy button.
            try:
                buy_btn = win32gui.GetDlgItem(hwnd, 0x000003EE)
                if buy_btn > 0 and cls.getText(buy_btn) == '一键买入[B]':
                    buy_win_list.append(hwnd)
            except Exception as e:
                print(e)
        return buy_win_list

    def input_number(self, hwnd, num_str):
        """Clear edit control ``hwnd`` and type ``num_str`` into it.

        Ten backspace key presses (virtual key 8) are sent first, then one
        key press per character: digits use their ASCII codes 48-57 and
        ``'.'`` uses virtual key 110.

        Args:
            hwnd: Handle of the target control.
            num_str: Text made of ASCII digits and ``'.'`` only; may be
                empty, in which case the control is only cleared.

        Raises:
            ValueError: If ``num_str`` contains any other character. The
                check runs before any message is sent, so the control is
                left untouched in that case.
        """
        key_codes = []
        for c in num_str:
            if c == '.':
                key_codes.append(110)
            elif c in '0123456789':
                key_codes.append(ord(c))
            else:
                # Previously an unknown character was sent as key code -1.
                raise ValueError(
                    "unsupported character {!r} in {!r}".format(c, num_str))

        # Delete whatever is currently in the control.
        for _ in range(10):
            win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, 8, 0)
            win32gui.PostMessage(hwnd, win32con.WM_KEYUP, 8, 0)

        for code in key_codes:
            win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0)
            win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)

    def getLimitUpPrice(self, win):
        """Return the text of control ``0x6C8`` of ``win`` without ``'涨停:'``."""
        hwnd = win32gui.GetDlgItem(win, 0x000006C8)
        text_ = self.getText(hwnd)
        return text_.replace("涨停:", "")

    # Locate the trade-result dialog.
    def getTradeResultWin(self):
        """Return the first landscape dialog whose control 2 reads ``'确定'``.

        Returns:
            The dialog handle, or ``0`` if no such dialog is found.
        """
        for hwnd in self._iter_dialogs(wider_than_tall=True):
            # Look for the confirm button.
            try:
                sure = win32gui.GetDlgItem(hwnd, 0x00000002)
                if sure > 0 and self.getText(sure) == '确定':
                    return hwnd
            except Exception:
                # Best effort: a dialog that cannot be probed is skipped.
                continue
        return 0

    def closeTradeResultDialog(self, win):
        """Click control 2 of ``win`` by sending button-down and button-up."""
        sure = win32gui.GetDlgItem(win, 0x00000002)
        # Click the confirm button.
        win32gui.SendMessage(sure, win32con.WM_LBUTTONDOWN, 0, 0)
        win32gui.SendMessage(sure, win32con.WM_LBUTTONUP, 0, 0)

    def getTradeSuccessCode(self, win):
        """Extract the digits that follow ``'合同编号:'`` in control ``0x3EC``.

        Args:
            win: Handle of the result dialog; values ``<= 0`` mean "none".

        Returns:
            The ASCII digits found after the marker (also printed), or
            ``""`` when ``win <= 0`` or the marker is absent from the text.
        """
        if win <= 0:
            return ""
        code_hwnd = win32gui.GetDlgItem(win, 0x000003EC)
        parts = self.getText(code_hwnd).split("合同编号:")
        if len(parts) < 2:
            # Marker missing: report "no contract number" instead of
            # raising IndexError.
            return ""
        code_str = "".join(ch for ch in parts[1] if '0' <= ch <= '9')
        print(code_str)
        return code_str

    def buy(self, code, limit_up_price, win=0):
        """Type ``code`` into buy window ``win`` and trigger the buy hotkey.

        After typing, control ``0x40C`` is polled until it holds text
        (500 polls, 4 ms apart); then a ``WM_KEYDOWN`` for key 66 (``B``)
        is posted to the window. ``win`` is always removed from
        ``using_buy_wins`` on exit, whether or not the call succeeded.

        Args:
            code: Text to type; see :meth:`input_number` for accepted
                characters.
            limit_up_price: Not used by this method.
            win: Handle of the buy window to operate.

        Returns:
            The tuple ``(code, "")``.

        Raises:
            Exception: If control ``0x40C`` is still empty after polling.
            ValueError: If ``code`` contains unsupported characters.
        """
        try:
            print("使用窗口", win)
            # Code input box, control ID 0x00000408.
            code_input = win32gui.GetDlgItem(win, 0x00000408)
            # Name field, control ID 0x0000040C.
            name_field = win32gui.GetDlgItem(win, 0x0000040C)
            self.input_number(code_input, code)
            # Wait for the name field to be filled, roughly 2 s at most.
            for i in range(0, 500):
                bufSize = win32gui.SendMessage(name_field, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
                print(i, bufSize)
                if bufSize > 1:
                    break
                time.sleep(0.004)
            else:
                raise Exception("代码输入填充出错")
            time.sleep(0.001)
            # Buy via the B hotkey.
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0)

            self.refresh_data()
            return code, ""
        finally:
            self.using_buy_wins.discard(win)

    @async_call
    def close_delegate_success_dialog(self):
        """Wait for the trade-result dialog in the background and close it.

        Polls :meth:`getTradeResultWin` up to 50 times, 20 ms apart. Once a
        dialog is found it waits 0.2 s, reads the contract number, prints
        the current timestamp and closes the dialog.
        """
        for _ in range(0, 50):
            hwnd = self.getTradeResultWin()
            if hwnd > 0:
                time.sleep(0.2)
                # Result is only printed by the callee; nothing is returned
                # from this background task.
                self.getTradeSuccessCode(hwnd)
                print(time.time())
                # Close the trade-result popup.
                self.closeTradeResultDialog(hwnd)
                break
            time.sleep(0.02)

    # Locate the cancel-order confirmation dialog.
    def getCancelBuySureWin(self):
        """Return the first landscape dialog with a ``Static`` child titled ``'撤单确认'``.

        Returns:
            The dialog handle, or ``0`` if no such dialog is found.
        """
        for hwnd in self._iter_dialogs(wider_than_tall=True):
            try:
                title = win32gui.FindWindowEx(hwnd, 0, "Static", "撤单确认")
                if title > 0:
                    return hwnd
            except Exception:
                # Best effort: a dialog that cannot be probed is skipped.
                continue
        return 0

    def __get_code_input(self, code=None):
        """Return ``(edit_handle, window)`` for the cancel-order code input.

        NOTE(review): ``getCancelBuyWin`` is not defined in this class; as
        written this call raises ``AttributeError`` unless the method is
        supplied elsewhere - confirm before relying on this helper.

        Raises:
            Exception: If no cancel-order window is found.
        """
        win = self.getCancelBuyWin(code)
        if win <= 0:
            raise Exception("无法找到取消委托窗口")
        print(time.time())
        print(win)
        # The edit box is looked up as an "Edit" child of control 0x00000996.
        # NOTE(review): an older comment named control ID 0x000003E9 - confirm.
        code_input = win32gui.GetDlgItem(win, 0x00000996)
        code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
        # Refresh handles and retry once.
        if code_input <= 0:
            self.refresh_hwnds()
            code_input = win32gui.GetDlgItem(win, 0x00000996)
            code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
        return code_input, win

    # Refresh the trading window's data.
    @async_call
    def refresh_data(self):
        """Placeholder background task; currently does nothing."""
        pass
 
 
class THSGuiUtil:
    """Stateless Win32 helpers for inspecting and driving buy windows."""

    @classmethod
    def getText(cls, hwnd):
        """Return the text of control ``hwnd``, or ``""`` if reading fails.

        The length query itself is not guarded; only the buffer read is.
        Embedded NULs and surrounding whitespace are removed.
        """
        bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        try:
            buffer = array.array('b', b'\x00\x00' * bufSize)
            win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
            text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
            return text.replace("\x00", "").strip()
        except Exception:
            return ""

    # Add an order window.
    @classmethod
    def add_buy_win(cls):
        """Click the "add" button of the last buy window and return the new one.

        After clicking control ``0x5ED`` the buy windows are re-enumerated
        up to 30 times, 10 ms apart, until a handle appears that was not
        present before the click.

        Returns:
            The handle of the first newly seen buy window, in enumeration
            order.

        Raises:
            Exception: If there is no buy window, ten or more already
                exist, the add button is missing, or no new window shows
                up in time.
        """
        buy_wins = THSGuiTrade.get_buy_wins()
        if len(buy_wins) < 1:
            raise Exception("没有买入窗口")
        if len(buy_wins) >= 10:
            raise Exception("最多只能添加10个下单框")
        # The add-window button has control ID 0x5ED.
        win = buy_wins[-1]
        add_btn = win32gui.GetDlgItem(win, 0x000005ED)
        if add_btn <= 0:
            raise Exception("没有找到添加按钮")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            # Best effort: the click is attempted even if focusing fails.
            pass
        cls.click(add_btn)
        known = set(buy_wins)
        for _ in range(0, 30):
            for candidate in THSGuiTrade.get_buy_wins():
                if candidate not in known:
                    return candidate
            time.sleep(0.01)
        raise Exception("未添加成功")

    # Whether the window exists.
    @classmethod
    def is_win_exist(cls, win):
        """Return ``True`` if ``IsWindowVisible(win)`` is truthy.

        Any error raised by the query is treated as "does not exist".
        """
        try:
            return bool(win32gui.IsWindowVisible(win))
        except Exception:
            return False

    # Whether the window is currently shown.
    @classmethod
    def is_win_show(cls, win):
        """Return ``True`` if the window rectangle has positive width and height.

        Any error raised by the query yields ``False``.
        """
        try:
            left, top, right, bottom = win32gui.GetWindowRect(win)
            return right - left > 0 and bottom - top > 0
        except Exception:
            return False

    @classmethod
    def click(cls, control):
        """Send left-button-down then left-button-up to ``control``."""
        win32gui.SendMessage(control, win32con.WM_LBUTTONDOWN, 0, 0)
        win32gui.SendMessage(control, win32con.WM_LBUTTONUP, 0, 0)

    @classmethod
    def _require_code_edit(cls, win):
        """Return the handle of control ``0x408`` in ``win`` or raise."""
        hwnd = win32gui.GetDlgItem(win, 0x00000408)
        if hwnd <= 0:
            raise Exception("编辑控件没找到")
        return hwnd

    # Clear the code typed into a buy window.
    @classmethod
    def clear_buy_window_code(cls, win):
        """Erase the contents of the code edit control of ``win``.

        Raises:
            Exception: If the window does not exist or the edit control is
                not found.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")

        code_edit = cls._require_code_edit(win)
        # Typing an empty string only performs the clearing backspaces.
        THSGuiTrade().input_number(code_edit, "")

    # Set the code of a buy window.
    @classmethod
    def set_buy_window_code(cls, win, code):
        """Replace the contents of the code edit control of ``win`` with ``code``.

        The window is brought to the foreground first when possible.

        Raises:
            Exception: If the window does not exist or the edit control is
                not found.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            # Best effort: typing proceeds even if focusing fails.
            pass
        code_edit = cls._require_code_edit(win)
        THSGuiTrade().input_number(code_edit, code)
 
 
def start_buy(code):
    """Buy ``code`` through the first available quick-buy window.

    The buy windows are enumerated, the order is placed in the first one,
    and a background task is started to close the result dialog.

    Args:
        code: The code to type into the buy window.

    Raises:
        Exception: If no quick-buy window is open, or if the buy itself
            fails.
    """
    buy_wins = THSGuiTrade.get_buy_wins()
    if not buy_wins:
        # Message previously read "沒有没有..." (the word "no" doubled).
        raise Exception("没有闪电买入窗口")
    # THSGuiTrade is a singleton, so one reference serves both calls.
    trader = THSGuiTrade()
    trader.buy(code, None, buy_wins[0])
    trader.close_delegate_success_dialog()