import array
|
import json
|
import os
|
import socket
|
import struct
|
import time
|
|
import win32con
|
import win32gui
|
|
import constant
|
from utils import ths_ocr_util, invalid_hwnds_manager
|
import setting
|
from utils import socket_util
|
from utils import ths_util
|
import win32_util
|
|
# freeze_support()
|
import logging
|
import multiprocessing
|
import threading
|
|
import wx
|
import wx.html2 as webview
|
|
import concurrent.futures
|
|
APP_TITLE = "卖票看板"
|
APP_ICON = ""
|
SOCKET_PORT = 11008
|
HTTP_PORT = 9004
|
|
|
def show_warning(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_WARNING)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
if click is not None:
|
click()
|
toastone.Destroy()
|
|
|
def show_info(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
click()
|
toastone.Destroy()
|
|
|
def show_sure(content, callback):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_NO | wx.ICON_INFORMATION)
|
result = toastone.ShowModal()
|
if result == wx.ID_YES: # 如果点击了提示框的确定按钮
|
callback(True)
|
toastone.Destroy()
|
elif result == wx.ID_NO:
|
callback(False)
|
toastone.Destroy()
|
|
|
class JueJinSettingFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "掘金参数设置", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(450, 200))
|
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer()
|
flex = wx.FlexGridSizer(rows=3, cols=2, vgap=10, hgap=10)
|
# 策略
|
label = wx.StaticText(self, label="策略ID:")
|
flex.Add(label, flag=wx.ALIGN_RIGHT)
|
self.edit_celue = wx.TextCtrl(self, size=(300, -1))
|
flex.Add(self.edit_celue, flag=wx.EXPAND)
|
|
# token
|
label = wx.StaticText(self, label="Token:")
|
flex.Add(label)
|
self.edit_token = wx.TextCtrl(self, size=(300, -1), style=wx.TE_MULTILINE)
|
flex.Add(self.edit_token)
|
|
# 占位
|
flex.Add(wx.StaticText(self, label=""))
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
flex.Add(self.btn_sure, 1, wx.TOP | wx.LEFT, 20)
|
|
boxsier.Add(flex, 1, wx.TOP | wx.LEFT, 20)
|
self.SetSizer(boxsier)
|
|
# 初始化数据
|
self.__init_data()
|
|
def __init_data(self):
|
strategy_id, token = setting.get_juejin_params()
|
self.edit_celue.SetLabelText(strategy_id)
|
self.edit_token.SetLabelText(token)
|
pass
|
|
def __sure_btn(self, event):
|
strategy_id = self.edit_celue.GetValue()
|
token = self.edit_token.GetValue()
|
setting.set_juejin_params(strategy_id, token)
|
toastone = wx.MessageDialog(None, "更改成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class FobiddenCodesFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "添加禁止交易代码", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(170, 200))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
# 代码
|
label = wx.StaticText(self, label="代码:")
|
boxsier.Add(label, flag=wx.ALIGN_LEFT, border=10)
|
self.edit_codes = wx.TextCtrl(self, size=(150, 100), style=wx.TE_MULTILINE)
|
boxsier.Add(self.edit_codes)
|
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
boxsier.Add(self.btn_sure)
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
root_boxsier.Add(boxsier, 1, wx.TOP | wx.TOP, 10)
|
self.SetSizer(root_boxsier)
|
|
def __request(self, codes):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 201, "data": {"codes": codes}}
|
client.send(json.dumps(data).encode("utf-8"))
|
client.close()
|
|
def __sure_btn(self, event):
|
codes_str = self.edit_codes.GetValue()
|
if codes_str:
|
codes_str = codes_str.strip()
|
|
codes = codes_str.split("\n")
|
codes_result = []
|
for code in codes:
|
if code.strip() and len(code.strip()) == 6:
|
codes_result.append(code.strip())
|
if len(codes_result) == 0:
|
show_warning("请填写正确的代码!", self.Close)
|
return
|
try:
|
self.__request(codes_result)
|
except Exception as e:
|
show_warning("添加出错:" + str(e), None)
|
return
|
|
toastone = wx.MessageDialog(None, "添加成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class THSPositionSettingFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "同花顺坐标设置", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(170, 400))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
|
label = wx.StaticText(self, label="交易刷新事件间隔(ms):")
|
boxsier.Add(label)
|
self.edit_refresh_time_space = wx.TextCtrl(self, size=(150, -1))
|
boxsier.Add(self.edit_refresh_time_space)
|
|
label = wx.StaticText(self, label="点击时间间隔(ms):")
|
boxsier.Add(label)
|
self.edit_time_space = wx.TextCtrl(self, size=(150, -1))
|
boxsier.Add(self.edit_time_space)
|
|
# 代码
|
label = wx.StaticText(self, label="坐标:")
|
boxsier.Add(label)
|
self.edit_codes = wx.TextCtrl(self, size=(150, 200), style=wx.TE_MULTILINE)
|
boxsier.Add(self.edit_codes)
|
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
boxsier.Add(self.btn_sure)
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
root_boxsier.Add(boxsier, 1, wx.TOP | wx.TOP, 10)
|
self.SetSizer(root_boxsier)
|
|
# 初始化数据
|
self.__init_data()
|
|
def __init_data(self):
|
pos = setting.get_ths_auto_click_positions()
|
self.edit_codes.SetValue("\n".join(pos))
|
space = setting.get_ths_auto_click_time_space()
|
if space is None:
|
space = 500
|
self.edit_time_space.SetValue(f"{space}")
|
|
space = setting.get_ths_auto_refresh_time_space()
|
if space is None:
|
space = 500
|
self.edit_refresh_time_space.SetValue(f"{space}")
|
|
def __sure_btn(self, event):
|
codes_str = self.edit_codes.GetValue()
|
ps = codes_str.split("\n")
|
result = []
|
for p in ps:
|
if p.strip():
|
result.append(p.strip())
|
setting.set_ths_auto_click_positions(result)
|
|
time_space = self.edit_time_space.GetValue()
|
if len(time_space) == 0:
|
show_warning("点击时间间隔不正确", None)
|
return
|
|
refresh_time_space = self.edit_refresh_time_space.GetValue()
|
if len(refresh_time_space) == 0:
|
show_warning("刷新时间间隔不正确", None)
|
return
|
|
setting.set_ths_auto_click_time_space(time_space)
|
setting.set_ths_auto_refresh_time_space(refresh_time_space)
|
|
toastone = wx.MessageDialog(None, "更改成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class SocketApiUtil:
|
|
@classmethod
|
def __request(cls, data_json):
|
socket_util.encryp_client_params_sign(data_json)
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, 11009) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
client.send(socket_util.load_header(json.dumps(data_json).encode("utf-8")))
|
result_str, header = socket_util.recv_data(client)
|
return result_str
|
|
@classmethod
|
def get_cost_price(cls, code):
|
result_str = cls.__request({"type": "get_cost_price", "data": {"code": code}})
|
print(result_str)
|
result = json.loads(result_str)
|
if result['code'] == 0:
|
return result['data']['price']
|
raise Exception(result['msg'])
|
|
|
class mainApp(wx.App):
|
|
def __init_data(self):
|
pass
|
|
def OnInit(self):
|
self.SetAppName(APP_TITLE)
|
self.__init_data()
|
|
# self.__show_main_frame()
|
|
return True
|
|
|
def ths_auto_click():
|
hwnd = ths_util.get_ths_second_screen_menu_hwnd()
|
while True:
|
try:
|
if hwnd is None or not win32gui.IsWindowVisible(hwnd):
|
# print("未找到同花顺副屏句柄")
|
hwnd = ths_util.get_ths_second_screen_menu_hwnd()
|
|
if hwnd is None:
|
continue
|
if not setting.is_ths_auto_click():
|
continue
|
ps = setting.get_ths_auto_click_positions()
|
if not ps:
|
continue
|
ps_new = []
|
for p in ps:
|
p = eval(p)
|
ps_new.append(p)
|
space = setting.get_ths_auto_click_time_space()
|
if space is None:
|
space = 500
|
ths_util.betch_click(hwnd, ps_new, round(space / 1000, 4))
|
except Exception as e:
|
pass
|
finally:
|
time.sleep(0.02)
|
|
|
def ths_auto_refresh():
|
hwnd = ths_util.get_trade_refesh_hwnd()
|
count = 0
|
while True:
|
count += 1
|
if count > 10:
|
count = 0
|
hwnd = ths_util.get_trade_refesh_hwnd()
|
try:
|
if hwnd is None:
|
continue
|
# 测试
|
if not setting.is_ths_trade_auto_refresh():
|
continue
|
rect = win32gui.GetWindowRect(hwnd)
|
win32_util.visual_click(hwnd, (160, (rect[3] - rect[1]) // 2))
|
time_space = setting.get_ths_auto_refresh_time_space()
|
if time_space is None:
|
time_space = 500
|
time.sleep(round(time_space / 1000, 4))
|
except Exception as e:
|
pass
|
finally:
|
time.sleep(0.02)
|
|
|
if __name__ == "__main__1":
|
print(webview.WebView.IsBackendAvailable(webview.WebViewBackendEdge))
|
|
set_codes_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=10)
|
|
|
# 接受来自其他进程的数据
|
def recieve_data(pipe):
|
while True:
|
try:
|
value = pipe.recv()
|
data = json.loads(value)
|
print("接受到数据:", data)
|
type_ = data["type"]
|
if type_ == "juejin_setting":
|
wx.CallAfter(lambda: JueJinSettingFrame(data["pos"]).Show())
|
elif type_ == "manage_ths_pos":
|
wx.CallAfter(lambda: THSPositionSettingFrame(data["pos"]).Show())
|
elif type_ == "set_code":
|
code = data["code"]
|
set_codes_thread_pool.submit(set_xfdp_codes, code)
|
elif type_ == "show_dead_hwnds":
|
hwnds = win32_util.text_hwnds_doing
|
print("死亡窗口", hwnds)
|
if hwnds:
|
for hwnd in hwnds:
|
invalid_hwnds_manager.add_hwnd(hwnd)
|
elif type_ == "exit":
|
os._exit(0)
|
except Exception as e:
|
logging.exception(e)
|
|
|
from pynput.mouse import Listener
|
|
ocr_settings = {}
|
|
|
def set_xfdp_codes(code):
|
"""
|
设置悬浮盯盘中的代码
|
:param code:
|
:return:
|
"""
|
|
def focus(hwnd, code_):
|
try:
|
int_buffer = array.array("L", [0])
|
char_buffer = array.array('b',
|
json.dumps({"type": "set_code",
|
"data": {"code": code_, "time": int(time.time() * 1000)}}).encode())
|
int_buffer_address = int_buffer.buffer_info()[0]
|
char_buffer_address, char_buffer_size = char_buffer.buffer_info()
|
copy_struct = struct.pack("PLP", int_buffer_address, char_buffer_size, char_buffer_address)
|
win32gui.SendMessage(hwnd, win32con.WM_COPYDATA, None, copy_struct)
|
|
# pythoncom.CoInitialize()
|
# shell = client.Dispatch("WScript.Shell")
|
# shell.SendKeys('%')
|
# # win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
|
# win32gui.SetForegroundWindow(hwnd)
|
# win32gui.PumpMessages()
|
# win32gui.SetFocus(hwnd)
|
# lib = CDLL("D:/workspace/GP/dll/Dll/x64/Debug/Dll.dll")
|
# win32gui.SendMessage(hwnd, win32con.WM_SETFOCUS, 0, 0)
|
# print("执行结果", lib.focus(hwnd))
|
|
# remote_thread = win32process.GetWindowThreadProcessId(hwnd)[0]
|
# tid = win32api.GetCurrentThreadId()
|
# win32process.AttachThreadInput(tid, remote_thread, True)
|
# prev_handle = win32gui.SetFocus(hwnd)
|
# print("之前的焦点句柄", prev_handle,hwnd)
|
except Exception as e:
|
logging.exception(e)
|
pass
|
|
# 获取代码属性描述
|
# auto_focus = setting.get_float_frame_auto_focus()
|
if True:
|
# 窗口显示在最前面
|
hwnds = win32_util.search_window("悬浮盯盘")
|
if hwnds:
|
print("句柄", hwnds[0])
|
time.sleep(0.1)
|
focus(hwnds[0], code)
|
|
|
def on_mouse_click(x, y, button, pressed):
|
"""
|
鼠标点击事件
|
:param x:
|
:param y:
|
:param button:
|
:param pressed:
|
:return:
|
"""
|
if pressed:
|
# 点击事件过后1s内可截图识别代码
|
ocr_settings["expire_time"] = time.time() + 1
|
# print(f"鼠标在 ({x}, {y}) 位置按下,按钮: {button}")
|
else:
|
# print(f"鼠标在 ({x}, {y}) 位置释放,按钮: {button}")
|
pass
|
|
|
def __rec_code(pipe):
|
time.sleep(5)
|
while True:
|
if "expire_time" in ocr_settings and ocr_settings["expire_time"] > time.time():
|
try:
|
code = ths_ocr_util.ocr_ths_code()
|
if not code:
|
time.sleep(0.1)
|
continue
|
pipe.send(json.dumps({"code": code, "type": "code"}))
|
except Exception as e:
|
# logging.exception(e)
|
# print(str(e))
|
pass
|
time.sleep(0.005)
|
|
|
def run(pipe):
|
global p2
|
p1, p2 = multiprocessing.Pipe()
|
threading.Thread(target=lambda: __rec_code(pipe), daemon=True).start()
|
threading.Thread(target=lambda: recieve_data(pipe), daemon=True).start()
|
threading.Thread(target=lambda: ths_auto_click(), daemon=True).start()
|
threading.Thread(target=lambda: ths_auto_refresh(), daemon=True).start()
|
app = mainApp()
|
app.MainLoop()
|
|
with Listener(on_click=on_mouse_click) as listener:
|
listener.join()
|
|
while True:
|
time.sleep(2)
|
|
|
if __name__ == "__main__":
|
app = mainApp()
|
app.MainLoop()
|