Administrator
2022-10-21 892b50e242e3c59a738b92dfdfee1bf1ff8932f2
trade_gui.py
@@ -5,12 +5,16 @@
import array
import threading
import time
import random
import win32gui
import win32api
import win32con
import global_util
import gpcode_manager
import redis_manager
import tool
from log import *
from threading import Thread
@@ -66,9 +70,9 @@
    @classmethod
    def checkEnv(cls):
        # 检测交易窗口
        buy_wins = cls.get_buy_wins()
        if len(buy_wins) < 3:
            raise Exception("闪电买入窗口最低需要3个")
        buy_wins = THSBuyWinManagerNew.get_buy_wins()
        if len(buy_wins) < 10:
            raise Exception("下单窗口最低需要10个")
        # 检测撤单窗口
        cancel_trade_win = cls.getCancelBuyWin()
@@ -169,7 +173,8 @@
    def getLimitUpPrice(self, win):
        hwnd = win32gui.GetDlgItem(win, 0x000006C8)
        return self.getText(hwnd)
        text_ = self.getText(hwnd)
        return text_.replace("涨停:", "")
    # 获取交易结果
    def getTradeResultWin(self):
@@ -217,36 +222,40 @@
        try:
            logger_trade_gui.info("开始买入:code-{}".format(code))
            if win < 1:
                win = self.get_available_buy_win()
                if win < 1:
                win = THSBuyWinManagerNew.get_distributed_code_win(code)  # self.get_available_buy_win()
                if win is None or win < 1:
                    raise Exception("无可用的交易窗口")
            print("使用窗口", win)
            t = time.time()
            print(t)
            start = int(round(t * 1000))
            # 输入代码
            # 代码输入框的控件ID:0x00000408
            hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
            # 名称 名称的控件ID:0x0000040C
            hwnd_name = win32gui.GetDlgItem(win, 0x0000040C)
            self.input_number(hwnd1, code)
            # 最多等待2s钟
            data_fill = False
            for i in range(0, 500):
                bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
                print(i, bufSize)
                if bufSize > 1:
                    data_fill = True
                    break;
                time.sleep(0.004)
            if not data_fill:
                raise Exception("代码输入填充出错")
            time.sleep(0.001)
            # # 输入代码
            # # 代码输入框的控件ID:0x00000408
            # hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
            # # 名称 名称的控件ID:0x0000040C
            # hwnd_name = win32gui.GetDlgItem(win, 0x0000040C)
            # self.input_number(hwnd1, code)
            # # 最多等待2s钟
            # data_fill = False
            # for i in range(0, 500):
            #     bufSize = win32gui.SendMessage(hwnd_name, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
            #     print(i, bufSize)
            #     if bufSize > 1:
            #         data_fill = True
            #         break;
            #     time.sleep(0.004)
            #
            # if not data_fill:
            #     raise Exception("代码输入填充出错")
            # time.sleep(0.001)
            # 验证涨停价
            limit_up_price_now = self.getLimitUpPrice(win)
            trade_win = THSBuyWinManagerNew.get_trade_win(win)
            # if not trade_win:
            #     error = "交易子窗口查找失败 {}".format(code)
            #     raise Exception(error)
            # 测试,暂时不验证涨停价
            # TODO 暂时不验证涨停价
            if not global_util.TEST:
                if abs(float(limit_up_price_now) - float(limit_up_price)) >= 0.01:
                    error = "涨停价验证出错 {}-{}".format(limit_up_price, limit_up_price_now)
@@ -258,6 +267,7 @@
            # win32gui.SendMessage(buy_hwnd, win32con.WM_LBUTTONUP, 0, 0);
            # 买入 快捷键B
            # 获取交易win
            win32gui.PostMessage(win, win32con.WM_KEYDOWN, 66, 0);
            logger_trade_gui.info("执行买入结束:code-{} 耗时:{}".format(code, int(round(time.time() * 1000)) - start))
@@ -380,18 +390,432 @@
            self.input_number(code_input, "")
if __name__ == '__main__':
class THSGuiUtil:
    """Win32 helper routines for reading and driving the trading client's windows.

    All methods are classmethods and operate on raw window handles.
    """

    @classmethod
    def getText(cls, hwnd):
        """Return the text of ``hwnd`` with NUL characters and surrounding whitespace removed."""
        # WM_GETTEXTLENGTH does not count the terminator, so reserve one extra slot.
        bufSize = win32gui.SendMessage(hwnd, win32con.WM_GETTEXTLENGTH, 0, 0) + 1
        # The receive buffer is allocated with two bytes per slot.
        buffer = array.array('b', b'\x00\x00' * bufSize)
        win32gui.SendMessage(hwnd, win32con.WM_GETTEXT, bufSize, buffer)
        text = win32gui.PyGetString(buffer.buffer_info()[0], bufSize - 1)
        return text.replace("\x00", "").strip()

    # Add an order window
    @classmethod
    def add_buy_win(cls):
        """Click the "add window" button of the last buy window and return the new window.

        Returns:
            The handle of a window that is present after the click but was not
            present before it.

        Raises:
            Exception: if no buy window exists, if 10 or more already exist, if
                the add button (control 0x000005ED) cannot be found, or if no
                new window shows up within 30 polls spaced 10 ms apart.
        """
        buy_wins = THSGuiTrade().get_buy_wins()
        if len(buy_wins) < 1:
            raise Exception("没有买入窗口")
        if len(buy_wins) >= 10:
            raise Exception("最多只能添加10个下单框")
        # Control ID of the "add window" button: 0x000005ED
        win = buy_wins[-1]
        add_btn = win32gui.GetDlgItem(win, 0x000005ED)
        if add_btn <= 0:
            raise Exception("没有找到添加按钮")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            # Best effort only: the click below is still attempted when the
            # window cannot be brought to the foreground.
            pass
        cls.click(add_btn)
        for _ in range(30):
            new_buy_wins = THSGuiTrade().get_buy_wins()
            if len(new_buy_wins) - len(buy_wins) >= 1:
                # Set difference: handles that appeared after the click.
                list_ = list(set(new_buy_wins).difference(set(buy_wins)))
                return list_[0]
            time.sleep(0.01)
        raise Exception("未添加成功")

    # Whether the window exists
    @classmethod
    def is_win_exist(cls, win):
        """Return True when ``win`` is reported visible, False otherwise or on any error."""
        try:
            return bool(win32gui.IsWindowVisible(win))
        except Exception:
            return False

    # Whether the window is currently shown
    @classmethod
    def is_win_show(cls, win):
        """Return True when the rectangle of ``win`` has positive width and height.

        Any error while querying the rectangle yields False.
        """
        try:
            result = win32gui.GetWindowRect(win)
            return result[2] - result[0] > 0 and result[3] - result[1] > 0
        except Exception:
            return False

    @classmethod
    def click(cls, control):
        """Send a left-button down/up message pair to ``control``."""
        win32gui.SendMessage(control, win32con.WM_LBUTTONDOWN, 0, 0)
        win32gui.SendMessage(control, win32con.WM_LBUTTONUP, 0, 0)

    # Clear the code of a buy window
    @classmethod
    def clear_buy_window_code(cls, win):
        """Empty the code edit control (0x00000408) of ``win``.

        Raises:
            Exception: if ``win`` does not exist or the edit control is not found.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")
        hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
        if hwnd1 <= 0:
            raise Exception("编辑控件没找到")
        THSGuiTrade().input_number(hwnd1, "")

    # Set the code of a buy window
    @classmethod
    def set_buy_window_code(cls, win, code):
        """Type ``code`` into the code edit control (0x00000408) of ``win``.

        Raises:
            Exception: if ``win`` does not exist or the edit control is not found.
        """
        if not cls.is_win_exist(win):
            raise Exception("窗口不存在")
        try:
            win32gui.SetForegroundWindow(win)
        except Exception:
            # Best effort only: input is still attempted without foreground focus.
            pass
        hwnd1 = win32gui.GetDlgItem(win, 0x00000408)
        if hwnd1 <= 0:
            raise Exception("编辑控件没找到")
        THSGuiTrade().input_number(hwnd1, code)
# Obsolete: THS buy-window manager
class __THSBuyWinManager:
    """Obsolete manager that binds stock codes to buy windows.

    The code -> window-handle assignment is stored in Redis under keys of the
    form ``buywin_distribute-<code>``.
    """
    # presumably selects Redis database 2 — confirm against redis_manager
    redisManager = redis_manager.RedisManager(2)

    @classmethod
    def __get_redis(cls):
        """Return the Redis client supplied by ``redisManager``."""
        return cls.redisManager.getRedis()

    # Save the window assigned to a code
    @classmethod
    def __save_code_win(cls, code, win):
        """Store ``win`` for ``code`` with the expiry given by ``tool.get_expire()``."""
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), win)

    # Get the window assigned to a code
    @classmethod
    def __get_code_win(cls, code):
        """Return the window handle stored for ``code`` as an int, or None if absent."""
        key = "buywin_distribute-{}".format(code)
        win = cls.__get_redis().get(key)
        if win is not None:
            return int(win)
        return None

    # Delete the window assignment of a code
    @classmethod
    def __del_code_win(cls, code):
        """Delete the stored assignment for ``code``."""
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().delete(key)

    # Get every code that currently has a window assigned
    @classmethod
    def __get_distributed_win_codes(cls):
        """Return the codes of all ``buywin_distribute-*`` keys."""
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        codes = []
        for k in keys:
            codes.append(k.replace("buywin_distribute-", ""))
        return codes

    # Get a usable window
    @classmethod
    def __get_available_win(cls):
        """Return a buy window without an assignment, adding a new one if none is free.

        Raises:
            Exception: if there is no buy window at all, or adding one fails.
        """
        # Is there a window that has not been assigned yet?
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        win_list = THSGuiTrade().get_buy_wins()
        if len(win_list) < 1:
            raise Exception("必须要有一个买入窗口")
        win_set = set(win_list)
        for k in keys:
            assigned = cls.__get_redis().get(k)
            if assigned is None:
                # The key expired between KEYS and GET; its window is free again.
                continue
            win_set.discard(int(assigned))
        if len(win_set) > 0:
            return win_set.pop()
        # No window left over: add a new one
        win = THSGuiUtil.add_buy_win()
        if win:
            return win
        else:
            raise Exception("新增窗口失败")

    # Assign a window to a code
    @classmethod
    def distribute_win_for_code(cls, code):
        """Return the window assigned to ``code``, assigning one when needed.

        An existing assignment is reused while its window still exists;
        otherwise a free window is taken, the assignment is saved, and the
        code is typed into the window.

        Raises:
            Exception: if no window can be obtained or the code cannot be set.
        """
        # Check whether a window is already assigned
        win = cls.__get_code_win(code)
        if win is not None:
            # Is the assigned window still valid?
            if THSGuiUtil.is_win_exist(win):
                # Fill in the code
                THSGuiUtil.set_buy_window_code(win, code)
                return win

        # Get a usable window
        win = cls.__get_available_win()
        if win is None:
            raise Exception("窗口已经分配完毕,无可用窗口")
        # Save the assignment
        cls.__save_code_win(code, win)
        THSGuiUtil.set_buy_window_code(win, code)
        return win

    # Remove the window assignment of a code
    @classmethod
    def cancel_distribute_win_for_code(cls, code):
        """Clear the code from the assigned window (if any) and delete the assignment."""
        win = cls.__get_code_win(code)
        if win is not None:
            # Clear the code
            THSGuiUtil.clear_buy_window_code(win)
        cls.__del_code_win(code)

    # Get the window already assigned to a code
    @classmethod
    def get_distributed_code_win(cls, code):
        """Return the window assigned to ``code``, or None when it no longer exists.

        The stored assignment is deleted when the window does not exist.
        """
        win = cls.__get_code_win(code)
        if not THSGuiUtil.is_win_exist(win):
            # Drop the assignment of a window that no longer exists
            cls.__del_code_win(code)
            return None
        return win

    @classmethod
    def fill_codes(cls, codes):
        """Synchronise window assignments with ``codes``.

        Assignments for codes missing from ``gpcode_manager.get_gp_list()`` and
        for every entry of ``codes`` past the tenth are cancelled; the first ten
        entries get a window unless they were assigned before this call.
        """
        codes_ = gpcode_manager.get_gp_list()
        # First remove codes that are no longer listed
        old_codes = cls.__get_distributed_win_codes()
        for code in old_codes:
            if code not in codes_:
                cls.cancel_distribute_win_for_code(code)

        add_codes = codes[0:10]
        del_codes = codes[10:]

        for code in del_codes:
            cls.cancel_distribute_win_for_code(code)

        for code in add_codes:
            # Codes that were already assigned are left alone
            if code in old_codes:
                continue
            win = cls.distribute_win_for_code(code)
            print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
# THS buy-window manager
class THSBuyWinManagerNew:
    """Binds stock codes to the order windows of the "专业版下单" client window.

    The code -> window-handle assignment is stored in Redis under keys of the
    form ``buywin_distribute-<code>``. The controls of an order window are
    reached through its nested trade dialog (see ``get_trade_win``).
    """
    # presumably selects Redis database 2 — confirm against redis_manager
    redisManager = redis_manager.RedisManager(2)

    @classmethod
    def get_buy_wins(cls):
        """Return the handles of the visible order windows.

        Locates the top-level window titled "专业版下单", then walks up to 20 of
        its "#32770" child dialogs and keeps the visible ones that contain a
        Button labelled "撤单".

        Raises:
            Exception: if the "专业版下单" window is not open.
        """
        buy_win_list = []
        hWndList = []
        main_hwnd = None
        win32gui.EnumWindows(lambda hWnd, param: param.append(hWnd), hWndList)
        for hwnd in hWndList:
            if THSGuiUtil.getText(hwnd) == "专业版下单":
                main_hwnd = hwnd
                break
        if not main_hwnd:
            raise Exception("专业版下单未打开")
        child_win = None
        for _ in range(20):
            # Passing the previous child continues the enumeration after it.
            child_win = win32gui.FindWindowEx(main_hwnd, child_win, "#32770", None)
            if not child_win:
                break
            if not win32gui.IsWindowVisible(child_win):
                continue
            temp = win32gui.FindWindowEx(child_win, None, "Button", "撤单")
            if temp:
                buy_win_list.append(child_win)
        return buy_win_list

    @classmethod
    def get_trade_win(cls, win):
        """Return the first "#32770" child of ``win`` that contains an Edit control.

        At most 10 children are examined; None is returned when none qualifies.
        """
        # Look for the trade dialog
        child_child_win = None
        for _ in range(10):
            child_child_win = win32gui.FindWindowEx(win, child_child_win, "#32770", None)
            if not child_child_win:
                break
            temp = win32gui.FindWindowEx(child_child_win, None, "Edit", None)
            if temp:
                return child_child_win
        return None

    @classmethod
    def __get_redis(cls):
        """Return the Redis client supplied by ``redisManager``."""
        return cls.redisManager.getRedis()

    # Save the window assigned to a code
    @classmethod
    def __save_code_win(cls, code, win):
        """Store ``win`` for ``code`` with the expiry given by ``tool.get_expire()``."""
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().setex(key, tool.get_expire(), win)

    # Get the window assigned to a code
    @classmethod
    def __get_code_win(cls, code):
        """Return the window handle stored for ``code`` as an int, or None if absent."""
        key = "buywin_distribute-{}".format(code)
        win = cls.__get_redis().get(key)
        if win is not None:
            return int(win)
        return None

    # Delete the window assignment of a code
    @classmethod
    def __del_code_win(cls, code):
        """Delete the stored assignment for ``code``."""
        key = "buywin_distribute-{}".format(code)
        cls.__get_redis().delete(key)

    # Get every code that currently has a window assigned
    @classmethod
    def __get_distributed_win_codes(cls):
        """Return the codes of all ``buywin_distribute-*`` keys."""
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        codes = []
        for k in keys:
            codes.append(k.replace("buywin_distribute-", ""))
        return codes

    # Get a usable window
    @classmethod
    def __get_available_win(cls):
        """Return an order window that has no code assigned.

        Raises:
            Exception: if there is no order window at all, or all are assigned.
        """
        # Is there a window that has not been assigned yet?
        key = "buywin_distribute-*"
        keys = cls.__get_redis().keys(key)
        win_list = cls.get_buy_wins()
        if len(win_list) < 1:
            raise Exception("必须要有一个买入窗口")
        win_set = set(win_list)
        for k in keys:
            assigned = cls.__get_redis().get(k)
            if assigned is None:
                # The key expired between KEYS and GET; its window is free again.
                continue
            win_set.discard(int(assigned))
        if len(win_set) > 0:
            return win_set.pop()
        # No window left over (this manager does not create new ones)
        raise Exception("没有剩余窗口")

    # Assign a window to a code
    @classmethod
    def distribute_win_for_code(cls, code):
        """Return the window assigned to ``code``, assigning one when needed.

        An existing assignment is reused while its window still exists;
        otherwise a free window is taken and the assignment saved. In both
        cases the code is typed into the window's trade dialog.

        Raises:
            Exception: if no window is free or the code cannot be set.
        """
        # Check whether a window is already assigned
        win = cls.__get_code_win(code)
        if win is not None:
            # Is the assigned window still valid?
            if THSGuiUtil.is_win_exist(win):
                # Fill in the code
                THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
                return win

        # Get a usable window
        win = cls.__get_available_win()
        if win is None:
            raise Exception("窗口已经分配完毕,无可用窗口")
        # Save the assignment
        cls.__save_code_win(code, win)
        THSGuiUtil.set_buy_window_code(cls.get_trade_win(win), code)
        return win

    # Remove the window assignment of a code
    @classmethod
    def cancel_distribute_win_for_code(cls, code):
        """Clear the code from the assigned window (best effort) and delete the assignment.

        Failures while clearing the window are ignored; the stored assignment
        is deleted regardless.
        """
        win = cls.__get_code_win(code)
        if win is not None:
            # Clear the code
            try:
                # The code is written into the trade dialog by
                # distribute_win_for_code, so clear it there as well; fall back
                # to the window itself when no trade dialog is found.
                trade_win = cls.get_trade_win(win)
                THSGuiUtil.clear_buy_window_code(trade_win if trade_win else win)
            except Exception:
                # Deliberately best effort: a stale or closed window must not
                # prevent the assignment from being removed below.
                pass
        cls.__del_code_win(code)

    # Get the window already assigned to a code
    @classmethod
    def get_distributed_code_win(cls, code):
        """Return the window assigned to ``code``, or None when it no longer exists.

        The stored assignment is deleted when the window does not exist.
        """
        win = cls.__get_code_win(code)
        if not THSGuiUtil.is_win_exist(win):
            # Drop the assignment of a window that no longer exists
            cls.__del_code_win(code)
            return None
        return win

    # Get the stock name shown in a window
    @classmethod
    def __get_code_name(cls, win):
        """Return the text of control 0x000005C2 in the trade dialog of ``win``.

        Returns None when ``win`` has no trade dialog.
        """
        trade_win = cls.get_trade_win(win)
        if trade_win is None:
            return None
        code_name_win = win32gui.GetDlgItem(trade_win, 0x000005C2)
        return THSGuiUtil.getText(code_name_win)

    @classmethod
    def fill_codes(cls, codes):
        """Synchronise window assignments with ``codes``.

        Assignments for codes missing from ``gpcode_manager.get_gp_list()`` and
        for every entry of ``codes`` past the tenth are cancelled. Of the first
        ten entries, those assigned before this call are verified (window
        exists and the displayed name maps back to the code) and cancelled on
        mismatch; the others get a window assigned.
        """
        name_codes = gpcode_manager.get_name_codes()
        codes_ = gpcode_manager.get_gp_list()
        # First remove codes that are no longer listed
        old_codes = cls.__get_distributed_win_codes()
        for code in old_codes:
            if code not in codes_:
                cls.cancel_distribute_win_for_code(code)

        add_codes = codes[0:10]
        del_codes = codes[10:]

        for code in del_codes:
            cls.cancel_distribute_win_for_code(code)

        for code in add_codes:
            # Codes that were already assigned are only verified
            if code in old_codes:
                # Check that the window still shows the right code
                win = cls.__get_code_win(code)
                if not THSGuiUtil.is_win_exist(win):
                    cls.cancel_distribute_win_for_code(code)
                else:
                    code_name = cls.__get_code_name(win)
                    if name_codes.get(code_name) != code:
                        cls.cancel_distribute_win_for_code(code)
                continue
            win = cls.distribute_win_for_code(code)
            print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))
class GUITest:
    """Manual exercise of window distribution against the running trading client."""

    def test_distribute(self):
        """Distribute windows for ten codes, then shuffle the codes and repeat.

        Each round first cancels the assignment of every code from index 10
        onwards, then assigns a window to each of the first ten codes, pausing
        one second after every assignment and printing the result.
        """
        codes = ["300396", "688656", "688029", "688787", "688016", "002659", "002777", "603318", "000333", "003004",
                 "002882", "300014", "688981", "002531"]

        for round_no in (1, 2):
            if round_no == 2:
                # Second round runs on a reshuffled list.
                random.shuffle(codes)
                print(codes[0:10])

            for surplus_code in codes[10:]:
                THSBuyWinManagerNew.cancel_distribute_win_for_code(surplus_code)

            for picked_code in codes[:10]:
                win = THSBuyWinManagerNew.distribute_win_for_code(picked_code)
                time.sleep(1)
                print("分配的窗口:", win, THSGuiUtil.is_win_exist(win))

        # THSBuyWinManager.cancel_distribute_win_for_code("600125")
if __name__ == '__main__':
    # Manual run: submit a buy for code 002853 at 18.98 through the sixth
    # order window currently reported by the window manager.
    trader = THSGuiTrade()
    target_win = THSBuyWinManagerNew.get_buy_wins()[5]
    trader.buy("002853", "18.98", target_win)
    # Other manual checks kept for reference:
    # GUITest().test_distribute()
    # try:
    #     THSGuiUtil.set_buy_window_code(0x000112D0, "000333")
    # except Exception as e:
    #     print(e)