"""
|
同花顺交易操作工具
|
"""
|
|
import array
|
import logging
|
import threading
|
import time
|
import random
|
|
import win32gui
|
import win32con
|
|
|
def async_call(fn):
|
def wrapper(*args, **kwargs):
|
threading.Thread(target=fn, args=args, kwargs=kwargs).start()
|
|
return wrapper
|
|
|
class THSGuiTrade(object):
|
__instance = None
|
|
# 单例模式
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(THSGuiTrade, cls).__new__(cls, *args, **kwargs)
|
# 初始化设置
|
# 获取交易窗口的锁
|
cls.__instance.buy_lock = threading.RLock()
|
cls.__instance.buy_cancel_locks = {}
|
cls.__instance.buy_win_list = cls.get_buy_wins()
|
print("交易窗口", cls.__instance.buy_win_list)
|
cls.__instance.using_buy_wins = set()
|
return cls.__instance
|
|
# 刷新窗口句柄
|
def refresh_hwnds(self):
|
self.buy_win_list = self.get_buy_wins()
|
|
def get_available_buy_win(self):
|
self.buy_lock.acquire()
|
try:
|
if len(self.buy_win_list) == 0:
|
self.refresh_hwnds()
|
for win in self.buy_win_list:
|
if win not in self.using_buy_wins:
|
self.using_buy_wins.add(win)
|
return win
|
finally:
|
self.buy_lock.release()
|
return 0
|
|
@classmethod
|
def getText(cls, hwnd):
|
bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
|
buffer = array.array('b', b'\x00\x00' * bufSize)
|
win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
|
text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
|
return text.replace("\x00", "").strip()
|
|
@classmethod
|
def get_buy_wins(cls):
|
buy_win_list = []
|
hWndList = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
clsname = win32gui.GetClassName(hwnd)
|
if clsname == '#32770' and win32gui.IsWindowVisible(hwnd):
|
pos = win32gui.GetWindowRect(hwnd)
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if 500 > width > 100 and 500 > height > 50:
|
# 查找确定按钮
|
try:
|
buy_win = win32gui.GetDlgItem(hwnd, 0x000003EE)
|
if buy_win > 0 and cls.getText(buy_win) == '一键买入[B]':
|
buy_win_list.append(hwnd)
|
except Exception as e:
|
print(e)
|
pass
|
return buy_win_list
|
|
def input_number(self, hwnd, num_str):
|
for i in range(10):
|
# win32gui.SendMessage(hwnd, 258, 8, 0);
|
win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, 8, 0)
|
win32gui.PostMessage(hwnd, win32con.WM_KEYUP, 8, 0)
|
# delete
|
for c in num_str:
|
code = -1
|
lp = 0
|
if c == '.':
|
code = 110
|
win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0)
|
win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)
|
continue
|
elif c == '0':
|
code = 48
|
elif c == '1':
|
code = 49
|
elif c == '2':
|
code = 50
|
elif c == '3':
|
code = 51
|
elif c == '4':
|
code = 52
|
elif c == '5':
|
code = 53
|
elif c == '6':
|
code = 54
|
elif c == '7':
|
code = 55
|
elif c == '8':
|
code = 56
|
elif c == '9':
|
code = 57
|
win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0)
|
win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)
|
|
def getLimitUpPrice(self, win):
|
hwnd = win32gui.GetDlgItem(win, 0x000006C8)
|
text_ = self.getText(hwnd)
|
return text_.replace("涨停:", "")
|
|
# 获取交易结果
|
def getTradeResultWin(self):
|
hWndList = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
clsname = win32gui.GetClassName(hwnd)
|
if clsname == '#32770' and win32gui.IsWindowVisible(hwnd):
|
pos = win32gui.GetWindowRect(hwnd)
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if 500 > width > 100 and 500 > height > 50 and width > height:
|
# 查找确定按钮
|
try:
|
sure = win32gui.GetDlgItem(hwnd, 0x00000002)
|
if sure > 0:
|
title = self.getText(sure)
|
if title == '确定':
|
return hwnd
|
except:
|
pass
|
return 0
|
|
def closeTradeResultDialog(self, win):
|
sure = win32gui.GetDlgItem(win, 0x00000002)
|
# 点击sure
|
win32gui.SendMessage(sure, win32con.WM_LBUTTONDOWN, 0, 0)
|
win32gui.SendMessage(sure, win32con.WM_LBUTTONUP, 0, 0)
|
|
def getTradeSuccessCode(self, win):
|
if win <= 0:
|
return ""
|
else:
|
code_hwnd = win32gui.GetDlgItem(win, 0x000003EC)
|
text = self.getText(code_hwnd)
|
text = text.split("合同编号:")[1]
|
code_str = ""
|
for i in text:
|
if 48 <= ord(i) < 58:
|
code_str += i
|
print(code_str)
|
return code_str
|
|
def buy(self, code, limit_up_price, win=0):
|
try:
|
print("使用窗口", win)
|
# 输入代码
|
# 代码输入框的控件ID:0x00000408
|
hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
|
# 名称 名称的控件ID:0x0000040C
|
hwnd_name = win32gui.GetDlgItem(win, 0x0000040C)
|
self.input_number(hwnd1, code)
|
# 最多等待2s钟
|
data_fill = False
|
for i in range(0, 500):
|
bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
|
print(i, bufSize)
|
if bufSize > 1:
|
data_fill = True
|
break
|
time.sleep(0.004)
|
|
if not data_fill:
|
raise Exception("代码输入填充出错")
|
time.sleep(0.001)
|
# 买入 快捷键B
|
# 获取交易win
|
win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0)
|
|
self.refresh_data()
|
return code, ""
|
finally:
|
self.using_buy_wins.discard(win)
|
|
@async_call
|
def close_delegate_success_dialog(self):
|
for i in range(0, 50):
|
hwnd = self.getTradeResultWin()
|
if hwnd > 0:
|
time.sleep(0.2)
|
code_str = self.getTradeSuccessCode(hwnd)
|
t = time.time()
|
print(t)
|
end = int(round(t * 1000))
|
# logger_trade_gui.info("获取委托单号:code-{} 单号-{} 整体耗时:{}".format(code, code_str, end - start))
|
# 关闭交易结果弹框
|
self.closeTradeResultDialog(hwnd)
|
break
|
# return code, code_str
|
time.sleep(0.02)
|
|
# 撤销确认框
|
def getCancelBuySureWin(self):
|
hWndList = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
clsname = win32gui.GetClassName(hwnd)
|
if clsname == '#32770' and win32gui.IsWindowVisible(hwnd):
|
pos = win32gui.GetWindowRect(hwnd)
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if 500 > width > 100 and 500 > height > 50 and width > height:
|
# 查找确定按钮
|
try:
|
title = win32gui.FindWindowEx(hwnd, 0, "Static", "撤单确认")
|
if title > 0:
|
return hwnd
|
except:
|
pass
|
return 0
|
|
def __get_code_input(self, code=None):
|
win = self.getCancelBuyWin(code)
|
if win <= 0:
|
raise Exception("无法找到取消委托窗口")
|
t = time.time()
|
print(t)
|
start = int(round(t * 1000))
|
print(win)
|
# 输入框控件ID 0x000003E9
|
code_input = win32gui.GetDlgItem(win, 0x00000996)
|
code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
|
# 刷新句柄
|
if code_input <= 0:
|
self.refresh_hwnds()
|
code_input = win32gui.GetDlgItem(win, 0x00000996)
|
code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
|
return code_input, win
|
|
# 刷新交易窗口数据
|
@async_call
|
def refresh_data(self):
|
# 获取到专业下单页面
|
pass
|
|
|
class THSGuiUtil:
|
@classmethod
|
def getText(cls, hwnd):
|
bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
|
try:
|
buffer = array.array('b', b'\x00\x00' * bufSize)
|
win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
|
text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
|
return text.replace("\x00", "").strip()
|
except:
|
return ""
|
|
# 添加下单窗口
|
@classmethod
|
def add_buy_win(cls):
|
buy_wins = THSGuiTrade().get_buy_wins()
|
if len(buy_wins) < 1:
|
raise Exception("没有买入窗口")
|
if len(buy_wins) >= 10:
|
raise Exception("最多只能添加10个下单框")
|
# 增加窗口按钮的ID:00005ED
|
win = buy_wins[-1]
|
add_btn = win32gui.GetDlgItem(win, 0x000005ED)
|
if add_btn <= 0:
|
raise Exception("没有找到添加按钮")
|
try:
|
win32gui.SetForegroundWindow(win)
|
except:
|
pass
|
cls.click(add_btn)
|
for i in range(0, 30):
|
new_buy_wins = THSGuiTrade().get_buy_wins()
|
if len(new_buy_wins) - len(buy_wins) >= 1:
|
# 求差集
|
list_ = list(set(new_buy_wins).difference(set(buy_wins)))
|
return list_[0]
|
else:
|
time.sleep(0.01)
|
raise Exception("未添加成功")
|
|
# 窗口是否存在
|
@classmethod
|
def is_win_exist(cls, win):
|
try:
|
result = win32gui.IsWindowVisible(win)
|
if result:
|
return True
|
else:
|
return False
|
except:
|
return False
|
|
# 窗口是否正在展示
|
@classmethod
|
def is_win_show(cls, win):
|
try:
|
result = win32gui.GetWindowRect(win)
|
if result[2] - result[0] > 0 and result[3] - result[1] > 0:
|
return True
|
else:
|
return False
|
except:
|
return False
|
|
@classmethod
|
def click(cls, control):
|
win32gui.SendMessage(control, win32con.WM_LBUTTONDOWN, 0, 0)
|
win32gui.SendMessage(control, win32con.WM_LBUTTONUP, 0, 0)
|
|
# 清除买入窗口代码
|
@classmethod
|
def clear_buy_window_code(cls, win):
|
if not cls.is_win_exist(win):
|
raise Exception("窗口不存在")
|
|
hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
|
if hwnd1 <= 0:
|
raise Exception("编辑控件没找到")
|
THSGuiTrade().input_number(hwnd1, "")
|
|
# 设置买入窗口代码
|
@classmethod
|
def set_buy_window_code(cls, win, code):
|
if not cls.is_win_exist(win):
|
raise Exception("窗口不存在")
|
try:
|
win32gui.SetForegroundWindow(win)
|
except:
|
pass
|
hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
|
if hwnd1 <= 0:
|
raise Exception("编辑控件没找到")
|
THSGuiTrade().input_number(hwnd1, code)
|
|
|
def start_buy(code):
|
buy_wins = THSGuiTrade.get_buy_wins()
|
if not buy_wins:
|
raise Exception("沒有没有闪电买入窗口")
|
THSGuiTrade().buy(code, None, buy_wins[0])
|
THSGuiTrade().close_delegate_success_dialog()
|