import base64
|
import hashlib
|
import json
|
import logging
|
import multiprocessing
|
import queue
|
import threading
|
import time
|
import sys
|
from multiprocessing import Pipe, Process, freeze_support
|
|
import torch
|
import win32gui
|
from PyQt5.QtGui import QFont, QPalette, QColor, QTextOption
|
|
from PyQt5.QtWebChannel import QWebChannel
|
from PyQt5.QtWebEngineWidgets import QWebEngineView, QWebEngineSettings
|
from PyQt5.QtWidgets import QMainWindow, QApplication, QAction, QMessageBox, QLabel
|
|
from PyQt5.QtCore import Qt, pyqtSlot, QObject, pyqtSignal, QTimer, QUrl, QPoint
|
|
import constant
|
import gui_wx
|
from network_delegate_manager import LocalKanPanNetworkDelegate
|
from utils import network_util, xgb_api, ths_util
|
import setting
|
from utils import tool
|
import win32_util
|
from kpl.kpl_data_manager import KPLLimitUpDataManager
|
|
URL_MSG_LIST = f"http://{constant.WEB_HOST}/kp/msg_list.html"
|
|
window_msg_queue = queue.Queue()
|
|
|
class BaseBridgeClass(QObject):
|
signal_request = pyqtSignal(str, str, str)
|
|
def __request_result_callback(self, method, key, result):
|
base64_str = base64.b64encode(result.encode('utf-8')).decode('utf-8')
|
self.__webview.page().runJavaScript(f"{method}('{key}','{base64_str}')")
|
|
def __init__(self, webview):
|
super().__init__()
|
self.__webview = webview
|
self.signal_request.connect(self.__request_result_callback)
|
|
def __http_request(self, url, callback_info):
|
try:
|
# 代理请求结果
|
result, need_delegate = LocalKanPanNetworkDelegate.http_delegate_request(url)
|
if not need_delegate:
|
result = network_util.http_get(url)
|
print(url, "请求结果:", result)
|
self.signal_request.emit(callback_info[0], callback_info[1], result)
|
return result
|
except Exception as e:
|
logging.exception(e)
|
|
def __socket_request(self, text, callback_info):
|
try:
|
print("socket請求:", text)
|
result = network_util.socket_request(text)
|
print("请求结果:", result)
|
self.signal_request.emit(callback_info[0], callback_info[1], result)
|
return result
|
except Exception as e:
|
logging.exception(e)
|
|
@pyqtSlot(str, str, str)
|
def http_request(self, path, params, callback_info):
|
url = path + "?"
|
if params:
|
params = json.loads(params)
|
url += "&".join([f"{key}={params[key]}" for key in params])
|
print("http请求", url)
|
callback_info = json.loads(callback_info)
|
t1 = threading.Thread(target=lambda: self.__http_request(url, callback_info))
|
t1.setDaemon(True)
|
t1.start()
|
|
@pyqtSlot(str, str)
|
def socket_request(self, text, callback_info):
|
print("socket_request", text)
|
try:
|
text_json = json.loads(text)
|
params = []
|
for k in text_json:
|
if k == "sign":
|
continue
|
if type(text_json[k]) == dict or type(text_json[k]) == list:
|
params.append(f"{k}={json.dumps(text_json[k], separators=(',', ':'))}")
|
else:
|
params.append(f"{k}={text_json[k]}")
|
params.sort()
|
params.append("%Yeshi2014@#.")
|
params_str = "&".join(params)
|
md5 = hashlib.md5()
|
md5.update(params_str.encode("utf-8"))
|
md5_hash = md5.hexdigest()
|
text_json["sign"] = md5_hash
|
text = json.dumps(text_json)
|
except:
|
pass
|
callback_info = json.loads(callback_info)
|
t1 = threading.Thread(target=lambda: self.__socket_request(text, callback_info))
|
t1.setDaemon(True)
|
t1.start()
|
|
# 获取客户端ID
|
@pyqtSlot(result=str)
|
def get_client(self):
|
return setting.get_client()
|
|
@pyqtSlot(str)
|
def add_code_to_ths(self, code):
|
# 添加到同花顺
|
threading.Thread(target=lambda: ths_util.add_code_to_zixuan(code), daemon=True).start()
|
|
|
class SecondWindowBridgeClass(BaseBridgeClass):
|
@pyqtSlot(str)
|
def set_target_code(self, code):
|
# 设置目标代码
|
window_msg_queue.put_nowait({"type": "set_target_code", "data": {"code": code}})
|
|
|
class JSBridgeClass(BaseBridgeClass):
|
|
def __init__(self, window, webview):
|
super().__init__(webview)
|
self.window = window
|
self.webview = webview
|
|
@pyqtSlot(str)
|
def show_info(self, msg):
|
QMessageBox.information(self.window, "提示", msg, QMessageBox.Yes)
|
|
@pyqtSlot(str, str, str, str, str, str)
|
def set_trade_info(self, code, name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes):
|
self.window.set_trade_data(code, name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes)
|
|
|
class MessageWindow(QMainWindow):
|
msgChange = pyqtSignal(str)
|
|
def __setMsg(self, msg):
|
print("收到信息:", msg)
|
palette = QPalette()
|
if msg.find("撤单") > -1:
|
palette.setColor(QPalette.WindowText, QColor(0, 128, 0)) # 设置字体颜色为绿色
|
elif msg.find("下单") > -1:
|
palette.setColor(QPalette.WindowText, QColor(34, 26, 178)) # 设置字体颜色为黄色
|
elif msg.find("成交") > -1:
|
palette.setColor(QPalette.WindowText, QColor(255, 0, 0)) # 设置字体颜色为红色
|
self.label.setPalette(palette)
|
self.label.setText(msg)
|
self.show()
|
self.timer.stop()
|
self.timer.start(5000)
|
|
# 设置信息
|
def setMsg(self, msg):
|
# TODO 测试
|
self.msgChange.emit(msg)
|
|
def __init__(self):
|
super().__init__()
|
window_height = 80
|
padding = 10
|
self.resize(300, 50)
|
hwnds = win32_util.search_window("悬浮盯盘")
|
if hwnds:
|
rect = win32gui.GetWindowRect(hwnds[0])
|
self.move(rect[0] + padding - 3, rect[1] - window_height - 30)
|
if rect[2] - rect[0] > 0:
|
self.resize((rect[2] - rect[0]) - (padding - 2) * 2, window_height)
|
self.setWindowTitle("消息提示")
|
self.setWindowFlag(Qt.WindowStaysOnTopHint, True)
|
self.setWindowOpacity(0.9)
|
# 去掉标题栏
|
# self.setWindowFlags(Qt.FramelessWindowHint)
|
# 删除最大最小化按钮
|
# self.setWindowFlags(self.windowFlags() & ~Qt.WindowMaximizeButtonHint & ~Qt.WindowMinimizeButtonHint)
|
|
font = QFont('微软雅黑', 12) # 设置字体为微软雅黑,字号为12
|
palette = QPalette() # 创建一个调色板
|
palette.setColor(QPalette.WindowText, QColor(255, 0, 0)) # 设置字体颜色为红色
|
|
self.label = QLabel(self)
|
self.label.setStyleSheet(f"padding: {padding}px;")
|
self.label.setFont(font)
|
self.label.setPalette(palette) # 将该调色板应用到QLabel上
|
self.label.setAlignment(Qt.AlignTop)
|
self.label.resize(self.width(), self.height())
|
# 设置多行显示
|
self.label.setWordWrap(True) # 设置Label的自动换行
|
|
self.timer = QTimer()
|
self.timer.timeout.connect(self.close)
|
# 只运行1次定时器
|
self.timer.setSingleShot(True)
|
self.msgChange.connect(self.__setMsg)
|
|
|
class MsgListWindow(QMainWindow):
|
|
def __init__(self):
|
super().__init__()
|
self.setWindowTitle('历史消息')
|
# 窗口置顶
|
self.setWindowFlag(Qt.WindowStaysOnTopHint, True)
|
self.resize(500, 800)
|
self.webview = QWebEngineView()
|
self.webview.settings().setAttribute(QWebEngineSettings.JavascriptEnabled, True)
|
self.webview.page().setZoomFactor(1)
|
self.setCentralWidget(self.webview)
|
|
# JS桥设置
|
channel = QWebChannel(self.webview.page())
|
self.webview.page().setWebChannel(channel)
|
self.python_bridge = BaseBridgeClass(self.webview)
|
channel.registerObject("Bridge", self.python_bridge)
|
|
def loadUrl(self, url):
|
self.webview.load(QUrl(url))
|
|
|
class SecondWindow(QMainWindow):
|
signal_update_kpl = pyqtSignal(str)
|
|
def update_kpl_func(self):
|
while True:
|
time.sleep(3)
|
try:
|
self.__update_kpl()
|
except:
|
pass
|
|
def __init__(self, parent=None):
|
super(SecondWindow, self).__init__(parent)
|
self.setWindowTitle('看盘副屏')
|
window_info = setting.get_kp_second_window_info()
|
if window_info:
|
self.move(window_info[0], window_info[1])
|
self.resize(window_info[2], window_info[3])
|
else:
|
self.resize(500, 800)
|
|
self.webview = QWebEngineView()
|
self.webview.settings().setAttribute(QWebEngineSettings.JavascriptEnabled, True)
|
self.webview.page().setZoomFactor(1)
|
self.setCentralWidget(self.webview)
|
|
# JS桥设置
|
channel = QWebChannel(self.webview.page())
|
self.webview.page().setWebChannel(channel)
|
self.python_bridge = SecondWindowBridgeClass(self.webview)
|
channel.registerObject("Bridge", self.python_bridge)
|
|
# self.signal_update_kpl.connect(self.set_kpl_data)
|
|
# t1 = threading.Thread(target=self.update_kpl_func)
|
# t1.setDaemon(True)
|
# t1.start()
|
|
def loadUrl(self, url):
|
self.webview.load(QUrl(url))
|
|
# 设置交易数据
|
def set_trade_data(self, code, code_name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes):
|
script_str = "try{"
|
script_str += "var trade_data = " + trade_data + ";"
|
script_str += "var trade_record = " + trade_record + ";"
|
|
script_str += "app.set_trade_info(\"{}\",\"{}\",trade_data,trade_record,'{}','{}');".format(code, code_name,
|
initiative_buy_codes,
|
passive_buy_codes)
|
script_str += "} catch(e){console.log(e)}"
|
self.webview.page().runJavaScript(script_str)
|
|
def set_kpl_data(self, data):
|
self.webview.page().runJavaScript(f"fill_kpl_data('{data}')")
|
|
# 更新开盘啦数据
|
def __update_kpl(self):
|
datas = network_util.http_get("/get_kpl_data")
|
self.signal_update_kpl.emit(datas)
|
|
def set_target_code(self, code):
|
print("set_target_code", code)
|
# 测试
|
self.webview.page().runJavaScript(f"app.set_target_code('{code}')")
|
|
def closeEvent(self, event):
|
try:
|
setting.set_kp_second_window_info(
|
(self.pos().x(), self.pos().y(), self.size().width(), self.size().height()))
|
except Exception as e:
|
print("")
|
|
|
class MainWindow(QMainWindow):
|
signal_update_code = pyqtSignal(str)
|
|
def show_info(self, msg):
|
QMessageBox.information(self, "提示", msg, QMessageBox.Yes)
|
|
def show_warning(self, msg):
|
QMessageBox.warning(self, "提示", msg, QMessageBox.Yes)
|
|
def set_trade_data(self, code, code_name, trade_data, trade_record, initiative_buy_codes, passive_buy_codes):
|
self.secondWindow.set_trade_data(code, code_name, trade_data, trade_record, initiative_buy_codes,
|
passive_buy_codes)
|
|
# 设置目标代码
|
def set_target_code(self, code):
|
print("set_target_code")
|
self.wx_pipe.send(json.dumps({"type": "set_code", "code": code}))
|
self.webview.page().runJavaScript(f"app.set_target_code('{code}')")
|
self.secondWindow.set_target_code(code)
|
|
def read_window_msg(self):
|
while True:
|
try:
|
data = window_msg_queue.get()
|
if data["type"] == "set_target_code":
|
code = data["data"]["code"]
|
self.signal_update_code.emit(code)
|
except:
|
pass
|
finally:
|
pass
|
|
# 菜单及菜单点击事件
|
def __menu(self):
|
def __gpu_is_avaiable():
|
if torch.cuda.is_available():
|
self.show_info("GPU可用")
|
else:
|
self.show_warning("GPU不可用")
|
|
def __set_juejin_params():
|
ps = (self.x(), self.y())
|
size = (self.width(), self.height())
|
self.wx_pipe.send(json.dumps({"type": "juejin_setting", "pos": (ps[0] + size[0] / 2, ps[1] + size[1] / 2)}))
|
|
def __juejin_tick_download():
|
self.wx_pipe.send(json.dumps({"type": "juejin_tick_download"}))
|
|
def __download_codes():
|
try:
|
result = self.__request("", 72)
|
result = json.loads(result)
|
if result['code'] == 0:
|
codes_dict = result['data']
|
if codes_dict:
|
for key in codes_dict:
|
# 将代码保存到桌面
|
path = f"C:\\Users\\Administrator\\Desktop\\涨停代码{key}.txt"
|
with open(path, mode='w') as f:
|
for c in codes_dict[key]:
|
f.write(c)
|
f.write("\n")
|
self.show_info("下载成功")
|
else:
|
self.show_warning(result['msg'])
|
except Exception as e:
|
self.show_warning(str(e))
|
|
def __manage_code():
|
ps = (self.x(), self.y())
|
size = (self.width(), self.height())
|
self.wx_pipe.send(json.dumps({"type": "codes_setting", "pos": (ps[0] + size[0] / 2, ps[1] + size[1] / 2)}))
|
|
def __manage_ths_pos():
|
ps = (self.x(), self.y())
|
size = (self.width(), self.height())
|
self.wx_pipe.send(json.dumps({"type": "manage_ths_pos", "pos": (ps[0] + size[0] / 2, ps[1] + size[1] / 2)}))
|
|
def __show_float_callback():
|
self.wx_pipe.send(json.dumps({"type": "show_float_callback"}))
|
|
def __show_main_callback():
|
self.wx_pipe.send(json.dumps({"type": "show_main_callback"}))
|
|
def __show_second_window():
|
self.secondWindow.show()
|
|
def __show_his_msg_window():
|
self.msgListWindow.loadUrl(URL_MSG_LIST)
|
self.msgListWindow.show()
|
|
# 清除浏览器缓存
|
def __clear_webview_cache():
|
self.webview.page().profile().clearHttpCache()
|
|
menubar = self.menuBar()
|
|
setting_ = menubar.addMenu('&设置')
|
|
juejin_action = QAction("&掘金参数配置", self)
|
juejin_action.triggered.connect(__set_juejin_params)
|
setting_.addAction(juejin_action)
|
|
action = QAction("&代码管理", self)
|
action.triggered.connect(__manage_code)
|
setting_.addAction(action)
|
|
action = QAction("&下载涨停代码", self)
|
action.triggered.connect(__download_codes)
|
setting_.addAction(action)
|
|
action = QAction("&GPU检测", self)
|
action.triggered.connect(__gpu_is_avaiable)
|
setting_.addAction(action)
|
|
action = QAction("&清除浏览器缓存", self)
|
action.triggered.connect(__clear_webview_cache)
|
setting_.addAction(action)
|
|
action = QAction("&掘金分时数据下载", self)
|
action.triggered.connect(__juejin_tick_download)
|
setting_.addAction(action)
|
|
auto_ = menubar.addMenu('&自动化')
|
action = QAction("&同花顺设置", self)
|
action.triggered.connect(__manage_ths_pos)
|
auto_.addAction(action)
|
|
view_ = menubar.addMenu('&视图')
|
action = QAction("&打开副屏", self)
|
action.triggered.connect(__show_second_window)
|
view_.addAction(action)
|
|
action = QAction("&打开悬浮盯盘", self)
|
action.triggered.connect(__show_float_callback)
|
view_.addAction(action)
|
|
action = QAction("&打开历史消息", self)
|
action.triggered.connect(__show_his_msg_window)
|
view_.addAction(action)
|
|
view_ = menubar.addMenu('&代码管理')
|
view_.aboutToShow.connect(__manage_code)
|
|
view_ = menubar.addMenu('&打开分时')
|
view_.aboutToShow.connect(__show_main_callback)
|
|
def __init__(self, wx_pipe):
|
super().__init__()
|
self.wx_pipe = wx_pipe
|
self.setWindowTitle('看盘页面')
|
window_info = setting.get_kp_window_info()
|
if window_info:
|
self.move(window_info[0], window_info[1])
|
self.resize(window_info[2], window_info[3])
|
else:
|
self.resize(1100, 1000)
|
self.center()
|
self.setWindowFlag(Qt.WindowStaysOnTopHint, True)
|
self.webview = QWebEngineView()
|
self.webview.settings().setAttribute(QWebEngineSettings.JavascriptEnabled, True)
|
self.__menu()
|
# JS桥设置
|
channel = QWebChannel(self.webview.page())
|
self.webview.page().setWebChannel(channel)
|
self.python_bridge = JSBridgeClass(self, self.webview)
|
channel.registerObject("Bridge", self.python_bridge)
|
# 设置副屏
|
self.secondWindow = SecondWindow(self)
|
|
self.setCentralWidget(self.webview)
|
self.show()
|
if not constant.IS_TEST:
|
self.webview.load(QUrl(f"http://{constant.WEB_HOST}/kp/index23-05-04.html"))
|
else:
|
self.webview.load(QUrl("http://127.0.0.1:8848/kp/index23-05-04.html"))
|
|
self.secondWindow.show()
|
if not constant.IS_TEST:
|
self.secondWindow.loadUrl(f"http://{constant.WEB_HOST}/kp/codes_list.html")
|
else:
|
self.secondWindow.loadUrl("http://127.0.0.1:8848/kp/codes_list.html")
|
|
# 绑定槽函数
|
self.signal_update_code.connect(self.set_target_code)
|
# self.statusBar().showMessage("这是条测试数据额", 10000)
|
|
self.messageWindow = MessageWindow()
|
self.msgListWindow = MsgListWindow()
|
|
threading.Thread(target=KPLLimitUpDataManager().run, daemon=True).start()
|
threading.Thread(target=xgb_api.run, daemon=True).start()
|
threading.Thread(target=self.read_window_msg, daemon=True).start()
|
|
def closeEvent(self, event):
|
event.accept()
|
try:
|
setting.set_kp_window_info((self.pos().x(), self.pos().y(), self.size().width(), self.size().height()))
|
except Exception as e:
|
print("")
|
self.wx_pipe.send(json.dumps({"type": "exit"}))
|
sys.exit(0)
|
|
# 读取消息
|
def read_msg(self):
|
client = setting.get_client()
|
if not client:
|
client = 'hxh'
|
while True:
|
if tool.is_trade_time():
|
try:
|
res = network_util.http_get(f"/pull_kp_client_msg?client=" + client.strip())
|
if res:
|
res = json.loads(res)
|
if res["code"] == 0:
|
print("拉取到消息", res)
|
self.messageWindow.setMsg(res["data"])
|
except:
|
pass
|
time.sleep(0.5)
|
|
|
def recieve_code(pipe, mainWindow):
|
latest_code = ''
|
while True:
|
try:
|
data = pipe.recv()
|
if data:
|
data = json.loads(data)
|
if data["type"] == "code":
|
if latest_code != data["code"]:
|
latest_code = data["code"]
|
mainWindow.signal_update_code.emit(latest_code)
|
except Exception as e:
|
logging.exception(e)
|
|
|
# 打包命令
|
# cd D:\workspace\GP\trade_desk
|
# D:\workspace\GP\trade_desk\dist\env\pk_env\Scripts\pyinstaller.exe main.spec
|
# 为了不出现意外的bug,运行时请将目录放在英文路径
|
|
if __name__ == "__main__":
|
freeze_support()
|
p1, p2 = multiprocessing.Pipe()
|
wxGuiProcess = Process(target=gui_wx.run, args=(p1,))
|
wxGuiProcess.start()
|
|
app = QApplication(sys.argv)
|
browser = MainWindow(p2)
|
t1 = threading.Thread(target=lambda: recieve_code(p2, browser))
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
sys.exit(app.exec_())
|