"""
|
同花顺交易操作工具
|
"""
|
|
import array
|
import logging
|
import threading
|
import time
|
import random
|
|
import win32gui
|
import win32con
|
|
from code_attribute import gpcode_manager
|
from db.redis_manager_delegate import RedisUtils
|
from ocr import ocr_util
|
from trade import l2_trade_util
|
from db import redis_manager_delegate as redis_manager
|
from log_module.log import *
|
from utils.tool import async_call
|
from utils import win32_util, capture_util, tool
|
|
|
class THSGuiTrade(object):
|
__instance = None
|
|
# 单例模式
|
def __new__(cls, *args, **kwargs):
|
if not cls.__instance:
|
cls.__instance = super(THSGuiTrade, cls).__new__(cls, *args, **kwargs)
|
# 初始化设置
|
# 获取交易窗口的锁
|
cls.__instance.buy_lock = threading.RLock()
|
cls.__instance.buy_cancel_locks = {}
|
cls.__instance.buy_win_list = cls.get_buy_wins()
|
# print("交易窗口", cls.__instance.buy_win_list)
|
cls.__instance.using_buy_wins = set()
|
cls.__instance.cancel_wins = cls.__instance.getCancelBuyWins()
|
return cls.__instance
|
|
# 刷新窗口句柄
|
def refresh_hwnds(self):
|
self.cancel_wins = self.__instance.getCancelBuyWins()
|
self.buy_win_list = self.get_buy_wins()
|
|
# 打开交易环境
|
def open_trade_env(self):
|
# 打开交易界面(F12)
|
|
pass
|
|
def get_available_buy_win(self):
|
self.buy_lock.acquire()
|
try:
|
if len(self.buy_win_list) == 0:
|
self.refresh_hwnds()
|
for win in self.buy_win_list:
|
if win not in self.using_buy_wins:
|
self.using_buy_wins.add(win)
|
return win
|
finally:
|
self.buy_lock.release()
|
return 0
|
|
@classmethod
|
def checkEnv(cls):
|
# 检测交易窗口
|
buy_wins = THSBuyWinManagerNew.get_buy_wins()
|
if len(buy_wins) < 10:
|
raise Exception("下单窗口最低需要10个")
|
|
# 检测撤单窗口
|
cancel_trade_wins = cls.getCancelBuyWins()
|
if len(cancel_trade_wins) <= 0:
|
raise Exception("委托撤销窗口未打开")
|
else:
|
pos = win32gui.GetWindowRect(cancel_trade_wins[0])
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if width <= 0 or height <= 0:
|
raise Exception("委托撤销窗口被最小化")
|
|
@classmethod
|
def getText(cls, hwnd):
|
bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
|
buffer = array.array('b', b'\x00\x00' * bufSize)
|
win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
|
text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
|
return text.replace("\x00", "").strip()
|
|
@classmethod
|
def get_buy_wins(cls):
|
buy_win_list = []
|
hWndList = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
clsname = win32gui.GetClassName(hwnd)
|
if clsname == '#32770' and win32gui.IsWindowVisible(hwnd):
|
pos = win32gui.GetWindowRect(hwnd)
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if 500 > width > 100 and 500 > height > 50:
|
# 查找确定按钮
|
try:
|
buy_win = win32gui.GetDlgItem(hwnd, 0x000003EE)
|
if buy_win > 0 and cls.getText(buy_win) == '一键买入[B]':
|
buy_win_list.append(hwnd)
|
except Exception as e:
|
print(e)
|
pass
|
return buy_win_list
|
|
# 获取撤单窗口
|
@classmethod
|
def getCancelBuyWin(cls, code=None):
|
# 获取代码位分配的下单窗口
|
if code:
|
trade_win = THSBuyWinManagerNew.get_distributed_code_win(code)
|
if trade_win and win32gui.IsWindowVisible(trade_win):
|
top_win = win32_util.get_top_window(trade_win)
|
if cls.getText(top_win) == '专业版下单' and win32gui.IsWindowVisible(top_win):
|
return top_win
|
|
wins = cls.getCancelBuyWins()
|
if wins:
|
return wins[0]
|
else:
|
return 0
|
|
@classmethod
|
def getCancelBuyWins(cls):
|
cancel_buy_wins = set()
|
hWndList = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
clsname = win32gui.GetClassName(hwnd)
|
if clsname == '#32770' and win32gui.IsWindowVisible(hwnd):
|
try:
|
if cls.getText(hwnd) == '专业版下单':
|
pos = win32gui.GetWindowRect(hwnd)
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if width > 200 and height > 200:
|
cancel_buy_wins.add(hwnd)
|
except:
|
pass
|
return list(cancel_buy_wins)
|
|
def input_number(self, hwnd, num_str):
|
for i in range(10):
|
# win32gui.SendMessage(hwnd, 258, 8, 0);
|
win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, 8, 0)
|
win32gui.PostMessage(hwnd, win32con.WM_KEYUP, 8, 0)
|
# delete
|
for c in num_str:
|
code = -1
|
lp = 0
|
if c == '.':
|
code = 110
|
win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0)
|
win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)
|
continue
|
elif c == '0':
|
code = 48
|
elif c == '1':
|
code = 49
|
elif c == '2':
|
code = 50
|
elif c == '3':
|
code = 51
|
elif c == '4':
|
code = 52
|
elif c == '5':
|
code = 53
|
elif c == '6':
|
code = 54
|
elif c == '7':
|
code = 55
|
elif c == '8':
|
code = 56
|
elif c == '9':
|
code = 57
|
win32gui.SendMessage(hwnd, win32con.WM_KEYDOWN, code, 0)
|
win32gui.PostMessage(hwnd, win32con.WM_KEYUP, code, 0)
|
|
def getLimitUpPrice(self, win):
|
hwnd = win32gui.GetDlgItem(win, 0x000006C8)
|
text_ = self.getText(hwnd)
|
return text_.replace("涨停:", "")
|
|
# 获取交易结果
|
def getTradeResultWin(self):
|
hWndList = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
clsname = win32gui.GetClassName(hwnd)
|
if clsname == '#32770' and win32gui.IsWindowVisible(hwnd):
|
pos = win32gui.GetWindowRect(hwnd)
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if 500 > width > 100 and 500 > height > 50 and width > height:
|
# 查找确定按钮
|
try:
|
sure = win32gui.GetDlgItem(hwnd, 0x00000002)
|
if sure > 0:
|
title = self.getText(sure)
|
if title == '确定':
|
return hwnd
|
except:
|
pass
|
return 0
|
|
def closeTradeResultDialog(self, win):
|
sure = win32gui.GetDlgItem(win, 0x00000002)
|
# 点击sure
|
win32gui.SendMessage(sure, win32con.WM_LBUTTONDOWN, 0, 0)
|
win32gui.SendMessage(sure, win32con.WM_LBUTTONUP, 0, 0)
|
|
def getTradeSuccessCode(self, win):
|
if win <= 0:
|
return ""
|
else:
|
code_hwnd = win32gui.GetDlgItem(win, 0x000003EC)
|
text = self.getText(code_hwnd)
|
text = text.split("合同编号:")[1]
|
code_str = ""
|
for i in text:
|
if 48 <= ord(i) < 58:
|
code_str += i
|
print(code_str)
|
return code_str
|
|
def buy(self, code, limit_up_price, win=0):
|
if not constant.TRADE_ENABLE:
|
return
|
try:
|
logger_trade_gui.info("开始买入:code-{}".format(code))
|
if win < 1:
|
win = THSBuyWinManagerNew.get_distributed_code_win(code) # self.get_available_buy_win()
|
if win is None or win < 1:
|
raise Exception("无可用的交易窗口")
|
print("使用窗口", win)
|
t = time.time()
|
print(t)
|
start = int(round(t * 1000))
|
# # 输入代码
|
# # 代码输入框的控件ID:0x00000408
|
# hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
|
# # 名称 名称的控件ID:0x0000040C
|
# hwnd_name = win32gui.GetDlgItem(win, 0x0000040C)
|
# self.input_number(hwnd1, code)
|
# # 最多等待2s钟
|
# data_fill = False
|
# for i in range(0, 500):
|
# bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
|
# print(i, bufSize)
|
# if bufSize > 1:
|
# data_fill = True
|
# break;
|
# time.sleep(0.004)
|
#
|
# if not data_fill:
|
# raise Exception("代码输入填充出错")
|
# time.sleep(0.001)
|
# 验证涨停价
|
limit_up_price_now = self.getLimitUpPrice(win)
|
trade_win = THSBuyWinManagerNew.get_trade_win(win)
|
# if not trade_win:
|
# error = "交易子窗口查找失败 {}".format(code)
|
# raise Exception(error)
|
|
# TODO 暂时不验证涨停价
|
# if not constant.TEST:
|
# if abs(float(limit_up_price_now) - float(limit_up_price)) >= 0.001:
|
# error = "涨停价验证出错 {}-{}".format(limit_up_price, limit_up_price_now)
|
# raise Exception(error)
|
|
# 开始交易,买入按钮ID:0x000003EE
|
# buy_hwnd = win32gui.GetDlgItem(win, 0x000003EE)
|
# win32gui.SendMessage(buy_hwnd, win32con.WM_LBUTTONDOWN, 0, 0);
|
# win32gui.SendMessage(buy_hwnd, win32con.WM_LBUTTONUP, 0, 0);
|
|
# 买入 快捷键B
|
# 获取交易win
|
win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0)
|
|
logger_trade_gui.info("执行买入结束:code-{} 耗时:{}".format(code, int(round(time.time() * 1000)) - start))
|
# 过时
|
# self.close_delegate_success_dialog()
|
|
self.refresh_data()
|
|
# 循环读取下单结果,最多等待10s
|
# for i in range(0, 50):
|
# hwnd = self.getTradeResultWin()
|
# if hwnd > 0:
|
# time.sleep(0.2)
|
# code_str = self.getTradeSuccessCode(hwnd)
|
# t = time.time()
|
# print(t)
|
# end = int(round(t * 1000))
|
# print("买入耗时:", end - start)
|
# logger_trade_gui.info("获取委托单号:code-{} 单号-{} 整体耗时:{}".format(code, code_str, end - start))
|
# # 关闭交易结果弹框
|
# self.closeTradeResultDialog(hwnd)
|
# return code, code_str
|
# time.sleep(0.02)
|
return code, ""
|
# raise Exception("获取交易结果出错")
|
finally:
|
self.using_buy_wins.discard(win)
|
|
@async_call
|
def close_delegate_success_dialog(self):
|
for i in range(0, 50):
|
hwnd = self.getTradeResultWin()
|
if hwnd > 0:
|
time.sleep(0.2)
|
code_str = self.getTradeSuccessCode(hwnd)
|
t = time.time()
|
print(t)
|
end = int(round(t * 1000))
|
# logger_trade_gui.info("获取委托单号:code-{} 单号-{} 整体耗时:{}".format(code, code_str, end - start))
|
# 关闭交易结果弹框
|
self.closeTradeResultDialog(hwnd)
|
break
|
# return code, code_str
|
time.sleep(0.02)
|
|
# 撤销确认框
|
def getCancelBuySureWin(self):
|
hWndList = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
clsname = win32gui.GetClassName(hwnd)
|
if clsname == '#32770' and win32gui.IsWindowVisible(hwnd):
|
pos = win32gui.GetWindowRect(hwnd)
|
width = pos[2] - pos[0]
|
height = pos[3] - pos[1]
|
if 500 > width > 100 and 500 > height > 50 and width > height:
|
# 查找确定按钮
|
try:
|
title = win32gui.FindWindowEx(hwnd, 0, "Static", "撤单确认")
|
if title > 0:
|
return hwnd
|
except:
|
pass
|
return 0
|
|
def __get_code_input(self, code=None):
|
win = self.getCancelBuyWin(code)
|
if win <= 0:
|
raise Exception("无法找到取消委托窗口")
|
t = time.time()
|
print(t)
|
start = int(round(t * 1000))
|
print(win)
|
# 输入框控件ID 0x000003E9
|
code_input = win32gui.GetDlgItem(win, 0x00000996)
|
code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
|
# 刷新句柄
|
if code_input <= 0:
|
self.refresh_hwnds()
|
code_input = win32gui.GetDlgItem(win, 0x00000996)
|
code_input = win32gui.FindWindowEx(code_input, 0, "Edit", None)
|
return code_input, win
|
|
# 撤买
|
def cancel_buy(self, code):
|
if not constant.TRADE_ENABLE:
|
return
|
main_win = self.getCancelBuyWin(code)
|
if main_win <= 0:
|
raise Exception("尚未找到交易窗口")
|
if main_win not in self.buy_cancel_locks:
|
self.buy_cancel_locks[main_win] = threading.RLock()
|
|
self.buy_cancel_locks[main_win].acquire()
|
logger_trade_gui.info("开始获取撤单控件:code-{}".format(code))
|
code_input, win = self.__get_code_input(code)
|
try:
|
logger_trade_gui.info("开始撤单:code-{}".format(code))
|
start = int(round(time.time() * 1000))
|
code_result = "-"
|
retry_count = 0
|
while code != code_result and retry_count < 5:
|
code_result = self.getText(code_input)
|
if retry_count > 0 or len(code_result) > 0:
|
self.input_number(code_input, "")
|
|
time.sleep(0.01)
|
# 输入数字
|
self.input_number(code_input, code)
|
time.sleep(0.005)
|
# 检测输入是否正确
|
code_result = self.getText(code_input)
|
code_result = code_result.split(",")[0]
|
retry_count += 1
|
if code != code_result:
|
self.input_number(code_input, "")
|
raise Exception("输入代码出错")
|
|
# 撤单快捷键X
|
time.sleep(0.01)
|
win32gui.PostMessage(win, win32con.WM_KEYDOWN, 0x00000058, 0x002D001)
|
win32gui.PostMessage(win, win32con.WM_CHAR, 0x00000078, 0x002D001)
|
win32gui.PostMessage(win, win32con.WM_KEYUP, 0x00000058, 0x002D001)
|
# win32gui.PostMessage(win, win32con.WM_KEYUP, 0x00000058, 0);
|
t = time.time()
|
print(t)
|
end = int(round(t * 1000))
|
print("耗时", end - start)
|
logger_trade_gui.info("撤单成功:code-{} 耗时:{}".format(code, end - start))
|
time.sleep(0.03)
|
finally:
|
self.buy_cancel_locks[main_win].release()
|
# 清空代码框
|
self.input_number(code_input, "")
|
# 再次清除代码框
|
self.input_number(code_input, "")
|
|
# 交易盘口中的撤买
|
def cancel_buy_again(self, code):
|
if not constant.TRADE_ENABLE:
|
return
|
win = THSBuyWinManagerNew.get_distributed_code_win(code)
|
if win is None or win <= 0:
|
raise Exception("没找到分配的交易窗口")
|
cancel_btn_hwnd = win32gui.FindWindowEx(win, 0, "Button", "撤单")
|
if cancel_btn_hwnd <= 0:
|
raise Exception("没有找到撤单按钮")
|
|
if not win32gui.IsWindowVisible(cancel_btn_hwnd):
|
raise Exception("撤单按钮不可见")
|
THSGuiUtil.click(cancel_btn_hwnd)
|
|
# 刷新交易窗口数据
|
@async_call
|
def refresh_data(self):
|
# 获取到专业下单页面
|
wins = self.getCancelBuyWins()
|
if wins:
|
for win in wins:
|
refresh_btn = None
|
child_win = None
|
for i in range(0, 20):
|
child_win = win32gui.FindWindowEx(win, child_win, "#32770", None)
|
if not child_win:
|
break
|
if not win32gui.IsWindowVisible(child_win):
|
continue
|
temp = win32gui.FindWindowEx(child_win, None, "Button", "还原")
|
if temp:
|
refresh_btn = win32gui.GetDlgItem(child_win, 0x00000457)
|
break
|
if refresh_btn:
|
# 点击刷新
|
THSGuiUtil.click(refresh_btn)
|
|
for w in wins:
|
if w not in self.buy_cancel_locks:
|
self.buy_cancel_locks[w] = threading.RLock()
|
|
|
class THSGuiUtil:
|
@classmethod
|
def getText(cls, hwnd):
|
bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
|
try:
|
buffer = array.array('b', b'\x00\x00' * bufSize)
|
win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
|
text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
|
return text.replace("\x00", "").strip()
|
except:
|
return ""
|
|
# 添加下单窗口
|
@classmethod
|
def add_buy_win(cls):
|
buy_wins = THSGuiTrade().get_buy_wins()
|
if len(buy_wins) < 1:
|
raise Exception("没有买入窗口")
|
if len(buy_wins) >= 10:
|
raise Exception("最多只能添加10个下单框")
|
# 增加窗口按钮的ID:00005ED
|
win = buy_wins[-1]
|
add_btn = win32gui.GetDlgItem(win, 0x000005ED)
|
if add_btn <= 0:
|
raise Exception("没有找到添加按钮")
|
try:
|
win32gui.SetForegroundWindow(win)
|
except:
|
pass
|
cls.click(add_btn)
|
for i in range(0, 30):
|
new_buy_wins = THSGuiTrade().get_buy_wins()
|
if len(new_buy_wins) - len(buy_wins) >= 1:
|
# 求差集
|
list_ = list(set(new_buy_wins).difference(set(buy_wins)))
|
return list_[0]
|
else:
|
time.sleep(0.01)
|
raise Exception("未添加成功")
|
|
# 窗口是否存在
|
@classmethod
|
def is_win_exist(cls, win):
|
try:
|
result = win32gui.IsWindowVisible(win)
|
if result:
|
return True
|
else:
|
return False
|
except:
|
return False
|
|
# 窗口是否正在展示
|
@classmethod
|
def is_win_show(cls, win):
|
try:
|
result = win32gui.GetWindowRect(win)
|
if result[2] - result[0] > 0 and result[3] - result[1] > 0:
|
return True
|
else:
|
return False
|
except:
|
return False
|
|
@classmethod
|
def click(cls, control):
|
win32gui.SendMessage(control, win32con.WM_LBUTTONDOWN, 0, 0)
|
win32gui.SendMessage(control, win32con.WM_LBUTTONUP, 0, 0)
|
|
# 清除买入窗口代码
|
@classmethod
|
def clear_buy_window_code(cls, win):
|
if not cls.is_win_exist(win):
|
raise Exception("窗口不存在")
|
|
hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
|
if hwnd1 <= 0:
|
raise Exception("编辑控件没找到")
|
THSGuiTrade().input_number(hwnd1, "")
|
|
# 设置买入窗口代码
|
@classmethod
|
def set_buy_window_code(cls, win, code):
|
if not cls.is_win_exist(win):
|
raise Exception("窗口不存在")
|
try:
|
win32gui.SetForegroundWindow(win)
|
except:
|
pass
|
hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
|
if hwnd1 <= 0:
|
raise Exception("编辑控件没找到")
|
THSGuiTrade().input_number(hwnd1, code)
|
|
|
# 同花顺买入窗口管理器
|
class THSBuyWinManagerNew:
|
redisManager = redis_manager.RedisManager(2)
|
|
@classmethod
|
def get_buy_wins(cls):
|
buy_win_list = []
|
hWndList = []
|
main_hwnds = []
|
win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
|
for hwnd in hWndList:
|
if win32gui.IsWindowVisible(hwnd) and THSGuiUtil.getText(hwnd) == "专业版下单":
|
main_hwnds.append(hwnd)
|
if not main_hwnds:
|
raise Exception("专业版下单未打开")
|
child_win = None
|
for main_hwnd in main_hwnds:
|
for i in range(0, 20):
|
child_win = win32gui.FindWindowEx(main_hwnd, child_win, "#32770", None)
|
if not child_win:
|
break
|
if not win32gui.IsWindowVisible(child_win):
|
continue
|
temp = win32gui.FindWindowEx(child_win, None, "Button", "撤单")
|
if temp:
|
buy_win_list.append(child_win)
|
return buy_win_list
|
|
@classmethod
|
def get_trade_win(cls, win):
|
# 获取交易窗口
|
child_child_win = None
|
for j in range(0, 10):
|
child_child_win = win32gui.FindWindowEx(win, child_child_win, "#32770", None)
|
if not child_child_win:
|
break
|
temp = win32gui.FindWindowEx(child_child_win, None, "Edit", None)
|
if temp:
|
return child_child_win
|
return None
|
|
@classmethod
|
def __get_redis(cls):
|
return cls.redisManager.getRedis()
|
|
# 保存窗口代码分配
|
@classmethod
|
def __save_code_win(cls, code, win):
|
key = "buywin_distribute-{}".format(code)
|
RedisUtils.setex(cls.__get_redis(), key, tool.get_expire(), win)
|
|
# 获取窗口分配的代码
|
@classmethod
|
def __get_code_win(cls, code):
|
key = "buywin_distribute-{}".format(code)
|
win = RedisUtils.get(cls.__get_redis(), key)
|
if win is not None:
|
return int(win)
|
return None
|
|
# 删除代码窗口分配
|
@classmethod
|
def __del_code_win(cls, code):
|
key = "buywin_distribute-{}".format(code)
|
RedisUtils.delete(cls.__get_redis(), key)
|
|
# 获取所有已经分配窗口的代码
|
@classmethod
|
def __get_distributed_win_codes(cls):
|
key = "buywin_distribute-*"
|
keys = RedisUtils.keys(cls.__get_redis(), key)
|
codes = []
|
for k in keys:
|
codes.append(k.replace("buywin_distribute-", ""))
|
return codes
|
|
# 获取可用的窗口
|
@classmethod
|
def __get_available_win(cls):
|
# 是否有可用的还未分配的窗口
|
key = "buywin_distribute-*"
|
keys = RedisUtils.keys(cls.__get_redis(), key)
|
win_list = cls.get_buy_wins()
|
if len(win_list) < 1:
|
raise Exception("必须要有一个买入窗口")
|
win_set = set(win_list)
|
for k in keys:
|
win = int(RedisUtils.get(cls.__get_redis(),k))
|
if win in win_set:
|
win_set.remove(win)
|
if len(win_set) > 0:
|
win_list = list(win_set)
|
random.shuffle(win_list)
|
return win_list[0]
|
|
# 没有剩余的窗口,新增加窗口
|
raise Exception("没有剩余窗口")
|
|
# 为代码分配窗口
|
@classmethod
|
def distribute_win_for_code(cls, code, code_name):
|
# 获取是否已经分配
|
win = cls.__get_code_win(code)
|
if win is not None:
|
# 已经分配的窗口是否有效
|
if THSGuiUtil.is_win_exist(win):
|
# 填充代码
|
THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
|
return win
|
|
# 获取可用的窗口
|
win = cls.__get_available_win()
|
if win is None:
|
logger_buy_win_distibute.error(f"无可用窗口:{code}")
|
raise Exception("窗口已经分配完毕,无可用窗口")
|
# 保存窗口分配信息
|
cls.__save_code_win(code, win)
|
# 设置代码多试几次
|
is_success = False
|
for i in range(0, 3):
|
THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
|
time.sleep(0.5)
|
code_name_win = cls.__get_code_name(win)
|
if code_name == code_name_win or code_name_win.find(code_name) > -1:
|
if cls.__is_buy_limit_up_price(win):
|
is_success = True
|
break
|
else:
|
cls.__del_code_win(code)
|
THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), "")
|
raise Exception("不是买涨停价")
|
if is_success:
|
logger_buy_win_distibute.info(f"新分配窗口成功:{code}-{win}")
|
else:
|
logger_buy_win_distibute.info(f"新分配窗口失败:{code}-{win}")
|
return win
|
|
# 删除代码窗口分配
|
@classmethod
|
def cancel_distribute_win_for_code(cls, code):
|
win = cls.__get_code_win(code)
|
if win is not None:
|
# 清除代码
|
try:
|
THSGuiUtil.clear_buy_window_code(win)
|
except:
|
pass
|
cls.__del_code_win(code)
|
|
# 获取代码已经分配的窗口
|
@classmethod
|
def get_distributed_code_win(cls, code):
|
win = cls.__get_code_win(code)
|
if not THSGuiUtil.is_win_exist(win):
|
# 删除不存在的窗口
|
cls.__del_code_win(code)
|
return None
|
return win
|
|
# 获取已分配的交易框信息
|
@classmethod
|
def get_distributed_code_wins(cls):
|
key = "buywin_distribute-*"
|
keys = RedisUtils.keys(cls.__get_redis(), key)
|
results = []
|
for k in keys:
|
code = k.split("-")[-1]
|
win = RedisUtils.get(cls.__get_redis(), k)
|
results.append((code, win))
|
return results
|
|
# 获取代码名称
|
@classmethod
|
def __get_code_name(cls, win):
|
trade_win = cls.get_trade_win(win)
|
if trade_win is None:
|
return None
|
code_name_win = win32gui.GetDlgItem(trade_win, 0x000005C2)
|
name = THSGuiUtil.getText(code_name_win)
|
if name is not None:
|
name = name.replace(" ", "")
|
return tool.strQ2B(name)
|
|
# 是否是涨停价
|
@classmethod
|
def __is_buy_limit_up_price(cls, win):
|
trade_win = cls.get_trade_win(win)
|
if trade_win is None:
|
return None
|
price_win = win32gui.GetDlgItem(trade_win, 0x00000409)
|
ocr_result = ocr_util.OcrUtil.ocr_with_key(capture_util.window_capture(price_win), "涨停价|张停价")
|
if ocr_result:
|
return True
|
return False
|
|
@classmethod
|
def fill_codes(cls, codes):
|
name_codes = gpcode_manager.get_name_codes()
|
codes_ = gpcode_manager.get_gp_list()
|
# 先删除没有的代码
|
old_codes = cls.__get_distributed_win_codes()
|
for code in old_codes:
|
if code not in codes_:
|
cls.cancel_distribute_win_for_code(code)
|
|
# 删除禁止的代码
|
new_codes = []
|
new_delete_codes = []
|
for code in codes:
|
if not l2_trade_util.is_in_forbidden_trade_codes(code):
|
# 取消分配
|
new_codes.append(code)
|
else:
|
new_delete_codes.append(code)
|
|
cancel_wins = THSGuiTrade.getCancelBuyWins()
|
add_codes_num = len(cancel_wins) * 10
|
add_codes = new_codes[0:add_codes_num]
|
del_codes = new_codes[add_codes_num:]
|
del_codes.extend(new_delete_codes)
|
|
for code in del_codes:
|
cls.cancel_distribute_win_for_code(code)
|
|
for code in add_codes:
|
# 已经加入进去的不做操作
|
if code in old_codes:
|
# 校验代码是否填充对
|
win = cls.__get_code_win(code)
|
if not THSGuiUtil.is_win_exist(win):
|
cls.cancel_distribute_win_for_code(code)
|
else:
|
code_name = cls.__get_code_name(win)
|
# '深振业A'
|
if name_codes.get(code_name) != code:
|
cls.cancel_distribute_win_for_code(code)
|
continue
|
try:
|
win = cls.distribute_win_for_code(code, gpcode_manager.get_code_name(code))
|
print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
|
except Exception as e:
|
logging.exception(e)
|
|
|
# 根据涨幅高低分配交易窗口
|
def re_distribute_buy_win(codes):
|
THSBuyWinManagerNew.fill_codes(codes)
|
|
|
class GUITest:
|
def test_distribute(self):
|
codes = ["300396", "688656", "688029", "688787", "688016", "002659", "002777", "603318", "000333", "003004",
|
"002882", "300014", "688981", "002531"]
|
|
for i in range(10, len(codes)):
|
THSBuyWinManagerNew.cancel_distribute_win_for_code(codes[i])
|
|
for i in range(0, 10):
|
win = THSBuyWinManagerNew.distribute_win_for_code(codes[i])
|
time.sleep(1)
|
print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
|
|
random.shuffle(codes)
|
print(codes[0:10])
|
for i in range(10, len(codes)):
|
THSBuyWinManagerNew.cancel_distribute_win_for_code(codes[i])
|
|
for i in range(0, 10):
|
win = THSBuyWinManagerNew.distribute_win_for_code(codes[i])
|
time.sleep(1)
|
print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
|
|
# THSBuyWinManager.cancel_distribute_win_for_code("600125")
|
|
|
if __name__ == '__main__':
|
# try:
|
# THSGuiTrade().cancel_buy_again("000637")
|
# except Exception as e:
|
# print(e)
|
print(ocr_util.OcrUtil.ocr_with_key(capture_util.window_capture(0x000314EA), "涨停价|张停价"))
|
|
# THSGuiTrade().buy("600613", 10.29)
|