import json
|
import random
|
import socket
|
import sys
|
import time
|
|
import win32gui
|
|
import constant
|
from utils import ths_ocr_util
|
import setting
|
from utils import socket_util
|
from utils import ths_util
|
import win32_util
|
|
# freeze_support()
|
import logging
|
import multiprocessing
|
import threading
|
|
import wx
|
import wx.html2 as webview
|
|
APP_TITLE = "卖票看板"
|
APP_ICON = ""
|
SOCKET_PORT = 11008
|
HTTP_PORT = 9004
|
|
|
def show_warning(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_WARNING)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
if click is not None:
|
click()
|
toastone.Destroy()
|
|
|
def show_info(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
click()
|
toastone.Destroy()
|
|
|
def show_sure(content, callback):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_NO | wx.ICON_INFORMATION)
|
result = toastone.ShowModal()
|
if result == wx.ID_YES: # 如果点击了提示框的确定按钮
|
callback(True)
|
toastone.Destroy()
|
elif result == wx.ID_NO:
|
callback(False)
|
toastone.Destroy()
|
|
|
class JueJinSettingFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "掘金参数设置", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(450, 200))
|
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer()
|
flex = wx.FlexGridSizer(rows=3, cols=2, vgap=10, hgap=10)
|
# 策略
|
label = wx.StaticText(self, label="策略ID:")
|
flex.Add(label, flag=wx.ALIGN_RIGHT)
|
self.edit_celue = wx.TextCtrl(self, size=(300, -1))
|
flex.Add(self.edit_celue, flag=wx.EXPAND)
|
|
# token
|
label = wx.StaticText(self, label="Token:")
|
flex.Add(label)
|
self.edit_token = wx.TextCtrl(self, size=(300, -1), style=wx.TE_MULTILINE)
|
flex.Add(self.edit_token)
|
|
# 占位
|
flex.Add(wx.StaticText(self, label=""))
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
flex.Add(self.btn_sure, 1, wx.TOP | wx.LEFT, 20)
|
|
boxsier.Add(flex, 1, wx.TOP | wx.LEFT, 20)
|
self.SetSizer(boxsier)
|
|
# 初始化数据
|
self.__init_data()
|
|
def __init_data(self):
|
strategy_id, token = setting.get_juejin_params()
|
self.edit_celue.SetLabelText(strategy_id)
|
self.edit_token.SetLabelText(token)
|
pass
|
|
def __sure_btn(self, event):
|
strategy_id = self.edit_celue.GetValue()
|
token = self.edit_token.GetValue()
|
setting.set_juejin_params(strategy_id, token)
|
toastone = wx.MessageDialog(None, "更改成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class FobiddenCodesFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "添加禁止交易代码", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(170, 200))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
# 代码
|
label = wx.StaticText(self, label="代码:")
|
boxsier.Add(label, flag=wx.ALIGN_LEFT, border=10)
|
self.edit_codes = wx.TextCtrl(self, size=(150, 100), style=wx.TE_MULTILINE)
|
boxsier.Add(self.edit_codes)
|
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
boxsier.Add(self.btn_sure)
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
root_boxsier.Add(boxsier, 1, wx.TOP | wx.TOP, 10)
|
self.SetSizer(root_boxsier)
|
|
def __request(self, codes):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 201, "data": {"codes": codes}}
|
client.send(json.dumps(data).encode("utf-8"))
|
client.close()
|
|
def __sure_btn(self, event):
|
codes_str = self.edit_codes.GetValue()
|
if codes_str:
|
codes_str = codes_str.strip()
|
|
codes = codes_str.split("\n")
|
codes_result = []
|
for code in codes:
|
if code.strip() and len(code.strip()) == 6:
|
codes_result.append(code.strip())
|
if len(codes_result) == 0:
|
show_warning("请填写正确的代码!", self.Close)
|
return
|
try:
|
self.__request(codes_result)
|
except Exception as e:
|
show_warning("添加出错:" + str(e), None)
|
return
|
|
toastone = wx.MessageDialog(None, "添加成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class THSPositionSettingFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "同花顺坐标设置", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(170, 400))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
|
label = wx.StaticText(self, label="交易刷新事件间隔(ms):")
|
boxsier.Add(label)
|
self.edit_refresh_time_space = wx.TextCtrl(self, size=(150, -1))
|
boxsier.Add(self.edit_refresh_time_space)
|
|
label = wx.StaticText(self, label="点击时间间隔(ms):")
|
boxsier.Add(label)
|
self.edit_time_space = wx.TextCtrl(self, size=(150, -1))
|
boxsier.Add(self.edit_time_space)
|
|
# 代码
|
label = wx.StaticText(self, label="坐标:")
|
boxsier.Add(label)
|
self.edit_codes = wx.TextCtrl(self, size=(150, 200), style=wx.TE_MULTILINE)
|
boxsier.Add(self.edit_codes)
|
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
boxsier.Add(self.btn_sure)
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
root_boxsier.Add(boxsier, 1, wx.TOP | wx.TOP, 10)
|
self.SetSizer(root_boxsier)
|
|
# 初始化数据
|
self.__init_data()
|
|
def __init_data(self):
|
pos = setting.get_ths_auto_click_positions()
|
self.edit_codes.SetValue("\n".join(pos))
|
space = setting.get_ths_auto_click_time_space()
|
if space is None:
|
space = 500
|
self.edit_time_space.SetValue(f"{space}")
|
|
space = setting.get_ths_auto_refresh_time_space()
|
if space is None:
|
space = 500
|
self.edit_refresh_time_space.SetValue(f"{space}")
|
|
def __sure_btn(self, event):
|
codes_str = self.edit_codes.GetValue()
|
ps = codes_str.split("\n")
|
result = []
|
for p in ps:
|
if p.strip():
|
result.append(p.strip())
|
setting.set_ths_auto_click_positions(result)
|
|
time_space = self.edit_time_space.GetValue()
|
if len(time_space) == 0:
|
show_warning("点击时间间隔不正确", None)
|
return
|
|
refresh_time_space = self.edit_refresh_time_space.GetValue()
|
if len(refresh_time_space) == 0:
|
show_warning("刷新时间间隔不正确", None)
|
return
|
|
setting.set_ths_auto_click_time_space(time_space)
|
setting.set_ths_auto_refresh_time_space(refresh_time_space)
|
|
toastone = wx.MessageDialog(None, "更改成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class SocketApiUtil:
|
|
@classmethod
|
def __request(cls, data_json):
|
socket_util.encryp_client_params_sign(data_json)
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, 11009) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
client.send(socket_util.load_header(json.dumps(data_json).encode("utf-8")))
|
result_str, header = socket_util.recv_data(client)
|
return result_str
|
|
@classmethod
|
def get_cost_price(cls, code):
|
result_str = cls.__request({"type": "get_cost_price", "data": {"code": code}})
|
print(result_str)
|
result = json.loads(result_str)
|
if result['code'] == 0:
|
return result['data']['price']
|
raise Exception(result['msg'])
|
|
|
class mainApp(wx.App):
|
|
def __refresh(self):
|
while True:
|
if "expire_time" in ocr_settings and ocr_settings["expire_time"] > time.time():
|
print("识别")
|
try:
|
code = ths_ocr_util.ocr_ths_code()
|
if not code:
|
time.sleep(0.1)
|
continue
|
global_datas["pipe"].send(json.dumps({"code": code, "type": "code"}))
|
except Exception as e:
|
logging.exception(e)
|
# print(str(e))
|
pass
|
time.sleep(0.005)
|
|
def __init_data(self):
|
pass
|
|
def OnInit(self):
|
self.SetAppName(APP_TITLE)
|
self.__init_data()
|
t1 = threading.Thread(target=lambda: self.__refresh())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
# self.__show_main_frame()
|
|
return True
|
|
|
def ths_auto_click():
|
hwnd = ths_util.get_ths_second_screen_menu_hwnd()
|
while True:
|
try:
|
if hwnd is None or not win32gui.IsWindowVisible(hwnd):
|
# print("未找到同花顺副屏句柄")
|
hwnd = ths_util.get_ths_second_screen_menu_hwnd()
|
|
if hwnd is None:
|
continue
|
if not setting.is_ths_auto_click():
|
continue
|
ps = setting.get_ths_auto_click_positions()
|
if not ps:
|
continue
|
ps_new = []
|
for p in ps:
|
p = eval(p)
|
ps_new.append(p)
|
space = setting.get_ths_auto_click_time_space()
|
if space is None:
|
space = 500
|
ths_util.betch_click(hwnd, ps_new, round(space / 1000, 4))
|
except Exception as e:
|
pass
|
finally:
|
time.sleep(0.02)
|
|
|
def ths_auto_refresh():
|
hwnd = ths_util.get_trade_refesh_hwnd()
|
count = 0
|
while True:
|
count += 1
|
if count > 10:
|
count = 0
|
hwnd = ths_util.get_trade_refesh_hwnd()
|
try:
|
if hwnd is None:
|
continue
|
# 测试
|
if not setting.is_ths_trade_auto_refresh():
|
continue
|
rect = win32gui.GetWindowRect(hwnd)
|
win32_util.visual_click(hwnd, (160, (rect[3] - rect[1]) // 2))
|
time_space = setting.get_ths_auto_refresh_time_space()
|
if time_space is None:
|
time_space = 500
|
time.sleep(round(time_space / 1000, 4))
|
except Exception as e:
|
pass
|
finally:
|
time.sleep(0.02)
|
|
|
if __name__ == "__main__1":
|
print(webview.WebView.IsBackendAvailable(webview.WebViewBackendEdge))
|
|
global_datas = {
|
|
}
|
|
|
# 接受来自其他进程的数据
|
def recieve_data(pipe):
|
while True:
|
try:
|
value = pipe.recv()
|
data = json.loads(value)
|
print("接受到数据:", data)
|
type_ = data["type"]
|
if type_ == "juejin_setting":
|
wx.CallAfter(lambda: JueJinSettingFrame(data["pos"]).Show())
|
elif type_ == "manage_ths_pos":
|
wx.CallAfter(lambda: THSPositionSettingFrame(data["pos"]).Show())
|
elif type_ == "show_float_callback":
|
wx.CallAfter(lambda: global_datas["floatFrame"].Show())
|
elif type_ == "show_main_callback":
|
wx.CallAfter(lambda: global_datas["tickFrame"].Show())
|
elif type_ == "exit":
|
pass
|
wx.CallAfter(lambda: sys.exit())
|
except Exception as e:
|
logging.exception(e)
|
|
|
from pynput.mouse import Listener
|
|
ocr_settings = {}
|
|
|
def on_mouse_click(x, y, button, pressed):
|
"""
|
鼠标点击事件
|
:param x:
|
:param y:
|
:param button:
|
:param pressed:
|
:return:
|
"""
|
if pressed:
|
# 点击事件过后1s内可截图识别代码
|
ocr_settings["expire_time"] = time.time() + 1
|
|
print(f"鼠标在 ({x}, {y}) 位置按下,按钮: {button}")
|
else:
|
print(f"鼠标在 ({x}, {y}) 位置释放,按钮: {button}")
|
|
|
def run(pipe):
|
global_datas["pipe"] = pipe
|
global p2
|
p1, p2 = multiprocessing.Pipe()
|
|
t1 = threading.Thread(target=lambda: recieve_data(pipe))
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
t2 = threading.Thread(target=lambda: ths_auto_click())
|
# 后台运行
|
t2.setDaemon(True)
|
t2.start()
|
|
t3 = threading.Thread(target=lambda: ths_auto_refresh())
|
# 后台运行
|
t3.setDaemon(True)
|
t3.start()
|
app = mainApp()
|
global_datas["app"] = app
|
app.MainLoop()
|
|
with Listener(on_click=on_mouse_click) as listener:
|
listener.join()
|
|
while True:
|
time.sleep(2)
|
|
|
if __name__ == "__main__":
|
app = mainApp()
|
app.MainLoop()
|