import array
|
import threading
|
import time
|
|
import win32gui
|
import win32api
|
import win32con
|
from log import *
|
from threading import Thread
|
|
|
def async_call(fn):
    """Decorator that runs ``fn`` on a new thread every time it is called.

    The decorated callable returns immediately after starting the thread.
    It returns the started ``Thread`` object so callers that care can
    ``join()`` it; callers that ignore the return value are unaffected.
    The return value of ``fn`` itself is not available to the caller.
    """
    # Local import keeps this block self-contained within the file.
    from functools import wraps

    @wraps(fn)  # keep fn's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        worker = Thread(target=fn, args=args, kwargs=kwargs)
        worker.start()
        return worker

    return wrapper
|
|
|
class THSGuiTrade(object):
    """Singleton that drives a trading client's dialogs via Win32 messages.

    The instance caches the handles of the quick-buy dialogs
    (``buy_win_list``) and of the order-cancel window (``cancel_win``),
    hands out buy dialogs one at a time (``using_buy_wins`` guarded by
    ``buy_lock``) and serialises cancel operations with
    ``buy_cancel_lock``.

    Window handles are plain integers; ``0`` is used throughout as
    "not found".
    """

    __instance = None

    # Window class name every lookup in this class filters on
    # (the standard Win32 dialog class).
    _DIALOG_CLASS = '#32770'

    # Singleton
    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating and initialising it once.

        Extra positional/keyword arguments are accepted for interface
        compatibility but ignored: ``object.__new__`` raises ``TypeError``
        if they are forwarded.
        """
        if cls.__instance is None:
            instance = super(THSGuiTrade, cls).__new__(cls)
            # Locks guarding buy-window allocation and the cancel window.
            instance.buy_lock = threading.RLock()
            instance.buy_cancel_lock = threading.RLock()
            instance.buy_win_list = cls.get_buy_wins()
            print("交易窗口", instance.buy_win_list)
            instance.using_buy_wins = set()
            instance.cancel_win = cls.getCancelBuyWin()
            # Publish only after every attribute is set, so a failure above
            # cannot leave a half-initialised singleton behind.
            cls.__instance = instance
        return cls.__instance

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @classmethod
    def _iter_visible_dialogs(cls):
        """Yield handles of visible top-level windows of the dialog class."""
        handles = []
        win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), handles)
        for hwnd in handles:
            if (win32gui.GetClassName(hwnd) == cls._DIALOG_CLASS
                    and win32gui.IsWindowVisible(hwnd)):
                yield hwnd

    @staticmethod
    def _window_size(hwnd):
        """Return ``(width, height)`` of the window rectangle of ``hwnd``."""
        left, top, right, bottom = win32gui.GetWindowRect(hwnd)
        return right - left, bottom - top

    @staticmethod
    def _is_small_dialog(width, height):
        """Size filter shared by the buy/result/confirm dialog lookups."""
        return 100 < width < 500 and 50 < height < 500

    @staticmethod
    def _key_codes(num_str):
        """Map a numeric string to the virtual-key codes to type it.

        Digits map to their ASCII codes (48-57) and ``'.'`` to 110
        (VK_DECIMAL). Any other character is skipped, so no bogus key
        code is ever sent to the target control.
        """
        codes = []
        for ch in num_str:
            if ch == '.':
                codes.append(110)
            elif '0' <= ch <= '9':
                codes.append(ord(ch))
        return codes

    @staticmethod
    def _limit_price_mismatch(expected, actual):
        """Return True when two prices differ by at least 0.01.

        The difference is rounded before comparing because binary floats
        make e.g. ``7.27 - 7.26`` come out slightly below ``0.01``, which
        would let a one-tick mismatch pass unnoticed.
        """
        return round(abs(float(actual) - float(expected)), 4) >= 0.01

    @staticmethod
    def _extract_contract_no(text):
        """Return the digits following the contract-number marker in ``text``.

        Returns ``""`` when the marker is absent (for instance when the
        dialog shows something other than a success message).
        """
        parts = text.split("合同编号:")
        if len(parts) < 2:
            return ""
        return "".join(ch for ch in parts[1] if '0' <= ch <= '9')

    @staticmethod
    def _find_cancel_code_input(win):
        """Return the Edit control nested in control 0x996 of ``win``."""
        container = win32gui.GetDlgItem(win, 0x00000996)
        return win32gui.FindWindowEx(container, 0, "Edit", None)

    # ------------------------------------------------------------------
    # Handle management
    # ------------------------------------------------------------------
    # Refresh window handles
    def refresh_hwnds(self):
        """Re-scan the desktop and update the cached window handles."""
        self.cancel_win = self.getCancelBuyWin()
        self.buy_win_list = self.get_buy_wins()

    # Open the trading environment
    def open_trade_env(self):
        """Placeholder: intended to open the trading UI (F12). Does nothing."""
        pass

    def get_available_buy_win(self):
        """Reserve and return a buy dialog that is not in use, or 0.

        If no buy dialogs are cached the handles are refreshed first. The
        returned handle is added to ``using_buy_wins``; ``buy`` removes it
        when it finishes.
        """
        with self.buy_lock:
            if not self.buy_win_list:
                self.refresh_hwnds()
            for win in self.buy_win_list:
                if win not in self.using_buy_wins:
                    self.using_buy_wins.add(win)
                    return win
        return 0

    @classmethod
    def checkEnv(cls):
        """Verify that the required windows are open.

        Raises:
            Exception: if fewer than 3 buy dialogs are found, or the
                order-cancel window is not found.
        """
        # Check the buy dialogs.
        if len(cls.get_buy_wins()) < 3:
            raise Exception("闪电买入窗口最低需要3个")

        # Check the order-cancel window.
        if cls.getCancelBuyWin() <= 0:
            raise Exception("委托撤销窗口未打开")

    @classmethod
    def getText(cls, hwnd):
        """Return the text of window/control ``hwnd`` (NULs and outer spaces stripped)."""
        bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        # Two bytes are reserved per character.
        # NOTE(review): presumably sized for UTF-16 text - confirm.
        buffer = array.array('b', b'\x00\x00' * bufSize)
        win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
        text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
        return text.replace("\x00", "").strip()

    @classmethod
    def get_buy_wins(cls):
        """Return handles of all visible quick-buy dialogs.

        A dialog qualifies when it passes the size filter and its control
        0x3EE carries the buy-button caption. Errors while probing a
        single dialog are printed and that dialog is skipped.
        """
        buy_win_list = []
        for hwnd in cls._iter_visible_dialogs():
            width, height = cls._window_size(hwnd)
            if not cls._is_small_dialog(width, height):
                continue
            # Look for the buy button.
            try:
                buy_button = win32gui.GetDlgItem(hwnd, 0x000003EE)
                if buy_button > 0 and cls.getText(buy_button) == '一键买入[B]':
                    buy_win_list.append(hwnd)
            except Exception as e:
                print(e)
        return buy_win_list

    # Find the order-cancel window
    @classmethod
    def getCancelBuyWin(cls):
        """Return the handle of the order-cancel window, or 0 if not found.

        The window is identified by its title and by being larger than
        200x200. Errors while probing a single window are ignored.
        """
        for hwnd in cls._iter_visible_dialogs():
            try:
                if cls.getText(hwnd) == '专业版下单':
                    width, height = cls._window_size(hwnd)
                    if width > 200 and height > 200:
                        return hwnd
            except Exception:
                # Best effort: the window may vanish while being probed.
                pass
        return 0

    def input_number(self, hwnd, num_str):
        """Clear the control ``hwnd`` and type ``num_str`` into it.

        Ten backspaces (virtual key 8) are sent first to clear existing
        content, then one key press per digit or ``'.'`` in ``num_str``.
        Characters other than digits and ``'.'`` are skipped. Passing
        ``""`` therefore just clears the control.
        """
        for _ in range(10):
            win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, 8, 0)
            win32gui.PostMessage(hwnd, win32con.WM_KEYUP, 8, 0)

        for code in self._key_codes(num_str):
            win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0)
            win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)

    def getLimitUpPrice(self, win):
        """Return the text of control 0x6C8 of buy dialog ``win``.

        ``buy`` compares this value against the expected limit-up price.
        """
        hwnd = win32gui.GetDlgItem(win, 0x000006C8)
        return self.getText(hwnd)

    # Find the trade-result dialog
    def getTradeResultWin(self):
        """Return the handle of the trade-result dialog, or 0 if not found.

        The dialog must pass the size filter, be wider than tall, and
        have a control with ID 2 whose caption is the confirm caption.
        """
        for hwnd in self._iter_visible_dialogs():
            width, height = self._window_size(hwnd)
            if not (self._is_small_dialog(width, height) and width > height):
                continue
            # Look for the confirm button.
            try:
                sure = win32gui.GetDlgItem(hwnd, 0x00000002)
                if sure > 0 and self.getText(sure) == '确定':
                    return hwnd
            except Exception:
                # Best effort: skip dialogs that cannot be probed.
                pass
        return 0

    def closeTradeResultDialog(self, win):
        """Click the confirm button (control ID 2) of dialog ``win``."""
        sure = win32gui.GetDlgItem(win, 0x00000002)
        win32gui.SendMessage(sure, win32con.WM_LBUTTONDOWN, 0, 0)
        win32gui.SendMessage(sure, win32con.WM_LBUTTONUP, 0, 0)

    def getTradeSuccessCode(self, win):
        """Return the contract number shown in result dialog ``win``.

        Returns ``""`` when ``win`` is not a valid handle (<= 0) or when
        the dialog text contains no contract-number marker. The extracted
        number is also printed.
        """
        if win <= 0:
            return ""
        code_hwnd = win32gui.GetDlgItem(win, 0x000003EC)
        code_str = self._extract_contract_no(self.getText(code_hwnd))
        print(code_str)
        return code_str

    def buy(self, code, limit_up_price, win=0):
        """Place a buy order for ``code`` through a quick-buy dialog.

        Args:
            code: stock code to type into the dialog.
            limit_up_price: expected limit-up price; the order is only
                sent if the dialog shows the same price.
            win: buy dialog handle to use; when < 1 a free one is
                reserved via ``get_available_buy_win``.

        Returns:
            ``(code, "")`` - the contract number is not collected here;
            the result dialog is closed by a background thread.

        Raises:
            Exception: no buy dialog is available, the stock name did not
                appear within about 2 seconds, or the limit-up price does
                not match.
        """
        try:
            logger_trade_gui.info("开始买入:code-{}".format(code))
            if win < 1:
                win = self.get_available_buy_win()
            if win < 1:
                raise Exception("无可用的交易窗口")
            print("使用窗口", win)
            t = time.time()
            print(t)
            start = int(round(t * 1000))

            # Code input control ID: 0x408; stock-name control ID: 0x40C.
            code_edit = win32gui.GetDlgItem(win, 0x00000408)
            hwnd_name = win32gui.GetDlgItem(win, 0x0000040C)
            self.input_number(code_edit, code)

            # Wait for the name control to be filled: 500 polls x 4 ms,
            # i.e. at most about 2 seconds.
            for i in range(0, 500):
                bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
                print(i, bufSize)
                if bufSize > 1:
                    break
                time.sleep(0.004)
            else:
                raise Exception("代码输入填充出错")

            time.sleep(0.001)

            # Verify the limit-up price shown by the dialog before buying.
            limit_up_price_now = self.getLimitUpPrice(win)
            if self._limit_price_mismatch(limit_up_price, limit_up_price_now):
                error = "涨停价验证出错 {}-{}".format(limit_up_price, limit_up_price_now)
                raise Exception(error)

            # Trigger the buy via the dialog's shortcut key B (virtual key 66).
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0)

            logger_trade_gui.info("执行买入结束:code-{} 耗时:{}".format(code, int(round(time.time() * 1000)) - start))
            # Close the result dialog in the background so this call
            # returns without waiting for it.
            self.close_delegate_success_dialog()
            return code, ""
        finally:
            # Release the dialog under the same lock that reserves it.
            with self.buy_lock:
                self.using_buy_wins.discard(win)

    def close_delegate_success_dialog(self):
        """Start a background thread that closes the trade-result dialog.

        Returns immediately. The started ``Thread`` is returned so callers
        may ``join()`` it; ignoring the return value is fine.
        """
        worker = threading.Thread(target=self._wait_and_close_result_dialog)
        worker.start()
        return worker

    def _wait_and_close_result_dialog(self):
        """Poll up to 50 times (20 ms apart) for the result dialog and close it."""
        for _ in range(0, 50):
            hwnd = self.getTradeResultWin()
            if hwnd > 0:
                time.sleep(0.2)
                # Prints the contract number, if the dialog shows one.
                self.getTradeSuccessCode(hwnd)
                print(time.time())
                # Close the result dialog.
                self.closeTradeResultDialog(hwnd)
                return
            time.sleep(0.02)

    # Cancel-confirmation dialog
    def getCancelBuySureWin(self):
        """Return the handle of the cancel-confirmation dialog, or 0.

        The dialog must pass the size filter, be wider than tall, and
        contain a ``Static`` child with the confirmation caption.
        """
        for hwnd in self._iter_visible_dialogs():
            width, height = self._window_size(hwnd)
            if not (self._is_small_dialog(width, height) and width > height):
                continue
            try:
                title = win32gui.FindWindowEx(hwnd, 0, "Static", "撤单确认")
                if title > 0:
                    return hwnd
            except Exception:
                # Best effort: skip dialogs that cannot be probed.
                pass
        return 0

    # Cancel a buy order
    def cancel_buy(self, code):
        """Cancel the pending buy order for ``code`` via the cancel window.

        The code is typed into the cancel window's input box (retried up
        to 5 times until the box shows the expected code), then the
        cancel shortcut X is posted. The input box is cleared afterwards.
        The whole operation runs under ``buy_cancel_lock``.

        Raises:
            Exception: the cancel window cannot be found, or the code
                could not be entered correctly.
        """
        with self.buy_cancel_lock:
            code_input = 0
            try:
                logger_trade_gui.info("开始撤单:code-{}".format(code))
                win = self.cancel_win
                if win <= 0:
                    raise Exception("无法找到取消委托窗口")
                t = time.time()
                print(t)
                start = int(round(t * 1000))
                print(win)

                # The input box is the Edit child of control 0x996.
                code_input = self._find_cancel_code_input(win)
                if code_input <= 0:
                    # Cached handle is stale: refresh and use the NEW
                    # cancel window, not the old one.
                    self.refresh_hwnds()
                    win = self.cancel_win
                    if win <= 0:
                        raise Exception("无法找到取消委托窗口")
                    code_input = self._find_cancel_code_input(win)

                code_result = "-"
                retry_count = 0
                while code != code_result and retry_count < 5:
                    if retry_count > 0:
                        self.input_number(code_input, "")
                    time.sleep(0.01)
                    # Type the code.
                    self.input_number(code_input, code)
                    time.sleep(0.005)
                    # Check what the box actually contains.
                    code_result = self.getText(code_input)
                    code_result = code_result.split(",")[0]
                    retry_count += 1
                if code != code_result:
                    raise Exception("输入代码出错")

                # Cancel shortcut X (virtual key 0x58, char 'x' 0x78).
                # NOTE(review): lParam 0x002D001 looks like a typo for
                # 0x002D0001 (scan code 0x2D, repeat count 1) - left
                # unchanged; confirm against the client before editing.
                time.sleep(0.01)
                win32gui.PostMessage(win, win32con.WM_KEYDOWN, 0x00000058, 0x002D001)
                win32gui.PostMessage(win, win32con.WM_CHAR, 0x00000078, 0x002D001)
                win32gui.PostMessage(win, win32con.WM_KEYUP, 0x00000058, 0x002D001)

                t = time.time()
                print(t)
                end = int(round(t * 1000))
                print("耗时", end - start)
                logger_trade_gui.info("撤单成功:code-{} 耗时:{}".format(code, end - start))
                time.sleep(0.05)
            finally:
                # Clear the code box while still holding the lock, so the
                # backspaces cannot interleave with another cancel's typing.
                # Skip it when no input box was ever located.
                if code_input > 0:
                    self.input_number(code_input, "")
|
|
|
if __name__ == '__main__':
    # Manual smoke test: obtain the singleton, print its identity and
    # cancel a sample order. Any failure is printed rather than raised.
    # (An environment check via THSGuiTrade.checkEnv() and a sample
    # trade.buy(...) call were previously sketched here as comments.)
    try:
        trader = THSGuiTrade()
        print(id(trader))
        trader.cancel_buy("000716")
    except Exception as err:
        print(err)
|