import base64 import ctypes import hashlib import json import logging import multiprocessing import queue import threading import time import sys from functools import partial from multiprocessing import Pipe, Process, freeze_support import torch import win32api import win32con import win32gui from PyQt5.QtGui import QFont, QPalette, QColor, QTextOption from PyQt5.QtWebChannel import QWebChannel from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineSettings, QWebEnginePage from PyQt5.QtWidgets import QMainWindow, QApplication, QAction, QMessageBox, QLabel from PyQt5.QtCore import Qt, pyqtSlot, QObject, pyqtSignal, QTimer, QUrl, QPoint import constant import gui_wx from utils import network_util, xgb_api, ths_util, ths_ocr_util import setting from utils import tool import win32_util from kpl.kpl_data_manager import KPLLimitUpDataManager from utils.network_delegate_manager import LocalKanPanNetworkDelegate URL_MSG_LIST = f"http://{constant.WEB_HOST}/kp/msg_list.html" window_msg_queue = queue.Queue() class BaseBridgeClass(QObject): signal_request = pyqtSignal(str, str, str) def __request_result_callback(self, method, key, result): base64_str = base64.b64encode(result.encode('utf-8')).decode('utf-8') self.__webview.page().runJavaScript(f"{method}('{key}','{base64_str}')") def __init__(self, webview): super().__init__() self.__webview = webview self.signal_request.connect(self.__request_result_callback) def __http_request(self, url, callback_info): try: # 代理请求结果 result, need_delegate = LocalKanPanNetworkDelegate.http_delegate_request(url) if not need_delegate: result = network_util.http_get(url) print(url, "请求结果:", result) self.signal_request.emit(callback_info[0], callback_info[1], result) return result except Exception as e: logging.exception(e) def __socket_request(self, text, callback_info, port=None): try: print("socket請求:", text) if port: result = network_util.socket_request(text, port=port) else: result = network_util.socket_request(text) print("请求结果:", result) self.signal_request.emit(callback_info[0], callback_info[1], result) return result except Exception as e: logging.exception(e) @pyqtSlot(str, str, str) def http_request(self, path, params, callback_info): url = path + "?" if params: params = json.loads(params) url += "&".join([f"{key}={params[key]}" for key in params]) print("http请求", url) callback_info = json.loads(callback_info) t1 = threading.Thread(target=lambda: self.__http_request(url, callback_info)) t1.setDaemon(True) t1.start() @pyqtSlot(str, str) def socket_request(self, text, callback_info): print("socket_request", text) try: text_json = json.loads(text) params = [] for k in text_json: if k == "sign": continue if type(text_json[k]) == dict or type(text_json[k]) == list: params.append(f"{k}={json.dumps(text_json[k], separators=(',', ':'))}") else: params.append(f"{k}={text_json[k]}") params.sort() params.append("%Yeshi2014@#.") params_str = "&".join(params) md5 = hashlib.md5() md5.update(params_str.encode("utf-8")) md5_hash = md5.hexdigest() text_json["sign"] = md5_hash text = json.dumps(text_json) except: pass callback_info = json.loads(callback_info) t1 = threading.Thread(target=lambda: self.__socket_request(text, callback_info)) t1.setDaemon(True) t1.start() @pyqtSlot(str, str) def ls_socket_request(self, text, callback_info): print("ls_socket_request", text) try: text_json = json.loads(text) params = [] for k in text_json: if k == "sign": continue if type(text_json[k]) == dict or type(text_json[k]) == list: params.append(f"{k}={json.dumps(text_json[k], separators=(',', ':'))}") else: params.append(f"{k}={text_json[k]}") params.sort() params.append("%Yeshi2014@#.") params_str = "&".join(params) md5 = hashlib.md5() md5.update(params_str.encode("utf-8")) md5_hash = md5.hexdigest() text_json["sign"] = md5_hash text = json.dumps(text_json) except: pass callback_info = json.loads(callback_info) t1 = threading.Thread(target=lambda: self.__socket_request(text, callback_info, port=14008)) t1.setDaemon(True) t1.start() # 获取客户端ID @pyqtSlot(result=str) def get_client(self): return setting.get_client() @pyqtSlot(str) def add_code_to_ths(self, code): # 添加到同花顺 threading.Thread(target=lambda: ths_util.add_code_to_zixuan(code), daemon=True).start() class SecondWindowBridgeClass(BaseBridgeClass): @pyqtSlot(str) def set_target_code(self, code): # 设置目标代码 window_msg_queue.put_nowait({"type": "set_target_code", "data": {"code": code}}) class JSBridgeClass(BaseBridgeClass): def __init__(self, window, webview): super().__init__(webview) self.window = window self.webview = webview @pyqtSlot(str) def show_info(self, msg): QMessageBox.information(self.window, "提示", msg, QMessageBox.Yes) @pyqtSlot(str, str, str, str, str, str) def set_trade_info(self, code, name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes): self.window.set_trade_data(code, name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes) class MessageWindow(QMainWindow): msgChange = pyqtSignal(str) def __setMsg(self, msg): print("收到信息:", msg) palette = QPalette() if msg.find("撤单") > -1: palette.setColor(QPalette.WindowText, QColor(0, 128, 0)) # 设置字体颜色为绿色 elif msg.find("下单") > -1: palette.setColor(QPalette.WindowText, QColor(34, 26, 178)) # 设置字体颜色为黄色 elif msg.find("成交") > -1: palette.setColor(QPalette.WindowText, QColor(255, 0, 0)) # 设置字体颜色为红色 self.label.setPalette(palette) self.label.setText(msg) self.show() self.timer.stop() self.timer.start(5000) # 设置信息 def setMsg(self, msg): # TODO 测试 self.msgChange.emit(msg) def __init__(self): super().__init__() window_height = 80 padding = 10 self.resize(300, 50) # hwnds = win32_util.search_window("悬浮盯盘", is_one_result=True) # if hwnds: # rect = win32gui.GetWindowRect(hwnds[0]) # self.move(rect[0] + padding - 3, rect[1] - window_height - 30) # if rect[2] - rect[0] > 0: # self.resize((rect[2] - rect[0]) - (padding - 2) * 2, window_height) self.setWindowTitle("消息提示") self.setWindowFlag(Qt.WindowStaysOnTopHint, True) self.setWindowOpacity(0.9) # 去掉标题栏 # self.setWindowFlags(Qt.FramelessWindowHint) # 删除最大最小化按钮 # self.setWindowFlags(self.windowFlags() & ~Qt.WindowMaximizeButtonHint & ~Qt.WindowMinimizeButtonHint) font = QFont('微软雅黑', 12) # 设置字体为微软雅黑,字号为12 palette = QPalette() # 创建一个调色板 palette.setColor(QPalette.WindowText, QColor(255, 0, 0)) # 设置字体颜色为红色 self.label = QLabel(self) self.label.setStyleSheet(f"padding: {padding}px;") self.label.setFont(font) self.label.setPalette(palette) # 将该调色板应用到QLabel上 self.label.setAlignment(Qt.AlignTop) self.label.resize(self.width(), self.height()) # 设置多行显示 self.label.setWordWrap(True) # 设置Label的自动换行 self.timer = QTimer() self.timer.timeout.connect(self.close) # 只运行1次定时器 self.timer.setSingleShot(True) self.msgChange.connect(self.__setMsg) class MsgListWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle('历史消息') # 窗口置顶 self.setWindowFlag(Qt.WindowStaysOnTopHint, True) self.resize(500, 800) self.webview = QWebEngineView() self.webview.settings().setAttribute(QWebEngineSettings.JavascriptEnabled, True) self.webview.page().setZoomFactor(1) self.setCentralWidget(self.webview) # JS桥设置 channel = QWebChannel(self.webview.page()) self.webview.page().setWebChannel(channel) self.python_bridge = BaseBridgeClass(self.webview) channel.registerObject("Bridge", self.python_bridge) def loadUrl(self, url): self.webview.load(QUrl(url)) class SecondWindow(QMainWindow): signal_update_kpl = pyqtSignal(str) def update_kpl_func(self): while True: time.sleep(3) try: self.__update_kpl() except: pass def __init__(self, parent=None): super(SecondWindow, self).__init__(parent) self.setWindowTitle('看盘副屏') window_info = setting.get_kp_second_window_info() if window_info: self.move(window_info[0], window_info[1]) self.resize(window_info[2], window_info[3]) else: self.resize(500, 800) self.webview = QWebEngineView() self.webview.settings().setAttribute(QWebEngineSettings.JavascriptEnabled, True) self.webview.page().setZoomFactor(1) self.setCentralWidget(self.webview) # JS桥设置 channel = QWebChannel(self.webview.page()) self.webview.page().setWebChannel(channel) self.python_bridge = SecondWindowBridgeClass(self.webview) channel.registerObject("Bridge", self.python_bridge) # win32gui.SetWindowLong(self.winId(), win32con.GWL_WNDPROC, self.handleCustomMessage) # self.signal_update_kpl.connect(self.set_kpl_data) # t1 = threading.Thread(target=self.update_kpl_func) # t1.setDaemon(True) # t1.start() # 自定义消息处理函数 def handleCustomMessage(self, hwnd, msg, wparam, lparam): if msg == win32con.WM_USER + 1024: # 解析数据 # try: # code = ctypes.cast(lparam, ctypes.py_object).value # print(code) # except: # pass pass # 处理数据 # self.dataReceived.emit(intValue, stringValue) return win32gui.DefWindowProc(hwnd, msg, wparam, lparam) def loadUrl(self, url): self.webview.load(QUrl(url)) # 设置交易数据 def set_trade_data(self, code, code_name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes): script_str = "try{" script_str += "var trade_data = " + trade_data + ";" script_str += "var trade_record = " + trade_record + ";" script_str += "app.set_trade_info(\"{}\",\"{}\",trade_data,trade_record,'{}','{}');".format(code, code_name, initiative_buy_codes, passive_buy_codes) script_str += "} catch(e){console.log(e)}" self.webview.page().runJavaScript(script_str) def set_kpl_data(self, data): self.webview.page().runJavaScript(f"fill_kpl_data('{data}')") # 更新开盘啦数据 def __update_kpl(self): datas = network_util.http_get("/get_kpl_data") self.signal_update_kpl.emit(datas) def set_target_code(self, code): print("set_target_code", code) # 测试 self.webview.page().runJavaScript(f"app.set_target_code('{code}')") def closeEvent(self, event): try: setting.set_kp_second_window_info( (self.pos().x(), self.pos().y(), self.size().width(), self.size().height())) except Exception as e: print("") class WebEnginePage(QWebEnginePage): def javaScriptConsoleMessage(self, level, message, lineNumber, sourceID): """处理 JavaScript 控制台消息""" print(f"JavaScript Console Error: {message} at line {lineNumber} of {sourceID}") def onLoadFailed(self, errorCode, failingUrl, errorDescription): """处理加载失败的情况""" print(f"Failed to load URL: {failingUrl}, Error: {errorDescription}, Code: {errorCode}") class MainWindow(QMainWindow): signal_update_code = pyqtSignal(str) def show_info(self, msg): QMessageBox.information(self, "提示", msg, QMessageBox.Yes) def show_warning(self, msg): QMessageBox.warning(self, "提示", msg, QMessageBox.Yes) def set_trade_data(self, code, code_name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes): self.secondWindow.set_trade_data(code, code_name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes) # 设置目标代码 def set_target_code(self, code): print("set_target_code") self.wx_pipe.send(json.dumps({"type": "set_code", "code": code})) self.webview.page().runJavaScript(f"app.set_target_code('{code}')") self.secondWindow.set_target_code(code) def read_window_msg(self): while True: try: data = window_msg_queue.get() if data["type"] == "set_target_code": code = data["data"]["code"] self.signal_update_code.emit(code) except: pass finally: pass # 菜单及菜单点击事件 def __menu(self): def __gpu_is_avaiable(): if torch.cuda.is_available(): self.show_info("GPU可用") else: self.show_warning("GPU不可用") def __set_juejin_params(): ps = (self.x(), self.y()) size = (self.width(), self.height()) self.wx_pipe.send(json.dumps({"type": "juejin_setting", "pos": (ps[0] + size[0] / 2, ps[1] + size[1] / 2)})) def __juejin_tick_download(): self.wx_pipe.send(json.dumps({"type": "juejin_tick_download"})) def __ths_ocr_code(): try: code = ths_ocr_util.ocr_ths_code(always_save=True) self.show_info(f"识别到的代码:{code}") except Exception as e: self.show_warning(f"识别出错:{str(e)}") def __download_codes(): try: result = self.__request("", 72) result = json.loads(result) if result['code'] == 0: codes_dict = result['data'] if codes_dict: for key in codes_dict: # 将代码保存到桌面 path = f"C:\\Users\\Administrator\\Desktop\\涨停代码{key}.txt" with open(path, mode='w') as f: for c in codes_dict[key]: f.write(c) f.write("\n") self.show_info("下载成功") else: self.show_warning(result['msg']) except Exception as e: self.show_warning(str(e)) def __manage_code(): ps = (self.x(), self.y()) size = (self.width(), self.height()) self.wx_pipe.send(json.dumps({"type": "codes_setting", "pos": (ps[0] + size[0] / 2, ps[1] + size[1] / 2)})) def __show_dead_hwnds(): self.wx_pipe.send(json.dumps({"type": "show_dead_hwnds"})) def __manage_ths_pos(): ps = (self.x(), self.y()) size = (self.width(), self.height()) self.wx_pipe.send(json.dumps({"type": "manage_ths_pos", "pos": (ps[0] + size[0] / 2, ps[1] + size[1] / 2)})) def __show_second_window(): self.secondWindow.show() def __show_his_msg_window(): self.msgListWindow.loadUrl(URL_MSG_LIST) self.msgListWindow.show() # 清除浏览器缓存 def __clear_webview_cache(): self.webview.page().profile().clearHttpCache() def __show_ths_flash_trade(): hwnds = ths_util.get_flash_trade_hwnds() for hwnd in hwnds: if not win32gui.IsWindowVisible(hwnd): win32gui.BringWindowToTop(hwnd) win32_util.move_window(hwnd, 0, 0) menubar = self.menuBar() setting_ = menubar.addMenu('&设置') action = QAction("&GPU检测", self) action.triggered.connect(__gpu_is_avaiable) setting_.addAction(action) action = QAction("&清除浏览器缓存", self) action.triggered.connect(__clear_webview_cache) setting_.addAction(action) action = QAction("&同花顺代码识别", self) action.triggered.connect(__ths_ocr_code) setting_.addAction(action) action = QAction("&死句柄管理", self) action.triggered.connect(__show_dead_hwnds) setting_.addAction(action) view_ = menubar.addMenu('&视图') action = QAction("&打开副屏", self) action.triggered.connect(__show_second_window) view_.addAction(action) action = QAction("&打开历史消息", self) action.triggered.connect(__show_his_msg_window) view_.addAction(action) view_ = menubar.addMenu('&显示一键卖出') view_.aboutToShow.connect(__show_ths_flash_trade) def __init__(self, wx_pipe): super().__init__() self.wx_pipe = wx_pipe self.setWindowTitle('看盘页面') window_info = setting.get_kp_window_info() if window_info: self.move(window_info[0], window_info[1]) self.resize(window_info[2], window_info[3]) else: self.resize(1100, 1000) self.center() self.setWindowFlag(Qt.WindowStaysOnTopHint, True) self.webview = QWebEngineView() self.webview.setPage(WebEnginePage()) self.webview.settings().setAttribute(QWebEngineSettings.JavascriptEnabled, True) self.__menu() # JS桥设置 channel = QWebChannel(self.webview.page()) self.webview.page().setWebChannel(channel) self.python_bridge = JSBridgeClass(self, self.webview) channel.registerObject("Bridge", self.python_bridge) # 设置副屏 self.secondWindow = SecondWindow(self) self.setCentralWidget(self.webview) self.show() if not constant.IS_TEST: self.webview.load(QUrl(f"http://{constant.WEB_HOST}/kp/index23-05-04.html")) else: self.webview.load(QUrl("http://127.0.0.1:8848/kp/index23-05-04.html")) if not setting.is_only_convertible_bonds(): self.secondWindow.show() if not constant.IS_TEST: self.secondWindow.loadUrl(f"http://{constant.WEB_HOST}/kp/codes_list.html") else: self.secondWindow.loadUrl("http://127.0.0.1:8848/kp/codes_list.html") # 绑定槽函数 self.signal_update_code.connect(self.set_target_code) # self.statusBar().showMessage("这是条测试数据额", 10000) self.messageWindow = MessageWindow() self.msgListWindow = MsgListWindow() threading.Thread(target=KPLLimitUpDataManager().run, daemon=True).start() threading.Thread(target=xgb_api.run, daemon=True).start() threading.Thread(target=self.read_window_msg, daemon=True).start() def closeEvent(self, event): event.accept() try: setting.set_kp_window_info((self.pos().x(), self.pos().y(), self.size().width(), self.size().height())) except Exception as e: print("") self.wx_pipe.send(json.dumps({"type": "exit"})) wxGuiProcess.terminate() wxGuiProcess.join() sys.exit(0) # 读取消息 def read_msg(self): client = setting.get_client() if not client: client = 'hxh' while True: if tool.is_trade_time(): try: res = network_util.http_get(f"/pull_kp_client_msg?client=" + client.strip()) if res: res = json.loads(res) if res["code"] == 0: print("拉取到消息", res) self.messageWindow.setMsg(res["data"]) except: pass time.sleep(0.5) def recieve_code(pipe, mainWindow): latest_code = '' while True: try: data = pipe.recv() if data: data = json.loads(data) if data["type"] == "code": if latest_code != data["code"]: latest_code = data["code"] mainWindow.signal_update_code.emit(latest_code) except Exception as e: logging.exception(e) # 打包命令 # cd D:\workspace\GP\trade_desk # D:\workspace\GP\trade_desk\dist\env\pk_env\Scripts\pyinstaller.exe main.spec # 为了不出现意外的bug,运行时请将目录放在英文路径 if __name__ == "__main__": freeze_support() p1, p2 = multiprocessing.Pipe() global wxGuiProcess wxGuiProcess = Process(target=gui_wx.run, args=(p1,)) wxGuiProcess.start() app = QApplication(sys.argv) browser = MainWindow(p2) t1 = threading.Thread(target=lambda: recieve_code(p2, browser)) # 后台运行 t1.setDaemon(True) t1.start() sys.exit(app.exec_())