import array
|
import ctypes
|
import json
|
import queue
|
import random
|
import socket
|
import struct
|
import sys
|
import time
|
|
import cv2
|
import win32con
|
import win32gui
|
|
import constant
|
import juejin_data_export
|
from log import debug_logger
|
from utils import ocr_util, ths_ocr_util
|
from utils import opencv_util
|
import setting
|
from utils import socket_util
|
from utils import ths_util
|
import win32_util
|
|
# freeze_support()
|
import logging
|
import multiprocessing
|
import threading
|
|
import matplotlib.pyplot as plt
|
from matplotlib.widgets import Button
|
|
import wx
|
import wx.html2 as webview
|
from matplotlib.backends.backend_wxagg import FigureCanvasWxAgg as FigureCanvas
|
from matplotlib.figure import Figure
|
|
import code_data_manager
|
import juejin_core
|
from utils import tool
|
import requests
|
|
from sell_processor import TickDataProcess
|
|
APP_TITLE = "卖票看板"
|
APP_ICON = ""
|
SOCKET_PORT = 11008
|
HTTP_PORT = 9004
|
|
|
def show_warning(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_WARNING)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
if click is not None:
|
click()
|
toastone.Destroy()
|
|
|
def show_info(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
click()
|
toastone.Destroy()
|
|
|
def show_sure(content, callback):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_NO | wx.ICON_INFORMATION)
|
result = toastone.ShowModal()
|
if result == wx.ID_YES: # 如果点击了提示框的确定按钮
|
callback(True)
|
toastone.Destroy()
|
elif result == wx.ID_NO:
|
callback(False)
|
toastone.Destroy()
|
|
|
# 掘金分时下载
|
class JueJinTickDataDownloadFrame(wx.Frame):
|
def __init__(self):
|
wx.Frame.__init__(self, None, -1, "掘金分时数据下载", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(350, 250))
|
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
boxsier = wx.BoxSizer()
|
flex = wx.FlexGridSizer(rows=4, cols=2, vgap=10, hgap=10)
|
# 策略
|
label = wx.StaticText(self, label="代码:")
|
flex.Add(label, flag=wx.ALIGN_RIGHT)
|
self.edit_code = wx.TextCtrl(self, size=(100, -1))
|
flex.Add(self.edit_code)
|
|
# 目录
|
label = wx.StaticText(self, label="目录选择:")
|
flex.Add(label)
|
|
bs1 = wx.BoxSizer(wx.VERTICAL)
|
self.btn_dir = wx.Button(self, label="选择目录")
|
bs1.Add(self.btn_dir)
|
self.label_path = wx.StaticText(self, label="")
|
bs1.Add(self.label_path)
|
flex.Add(bs1)
|
|
label = wx.StaticText(self, label="时间选择:")
|
flex.Add(label)
|
self.edit_start_time = wx.TextCtrl(self, size=(100, -1))
|
self.edit_end_time = wx.TextCtrl(self, size=(100, -1))
|
bs1 = wx.BoxSizer(wx.HORIZONTAL)
|
bs1.Add(self.edit_start_time, 1, wx.RIGHT, 10)
|
label = wx.StaticText(self, label="-")
|
bs1.Add(label)
|
bs1.Add(self.edit_end_time, 1, wx.LEFT, 10)
|
flex.Add(bs1)
|
|
# 占位
|
flex.Add(wx.StaticText(self, label=""))
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
self.btn_dir.Bind(wx.EVT_BUTTON, self.__choose_dir)
|
flex.Add(self.btn_sure, 1, wx.TOP | wx.LEFT, 20)
|
|
boxsier.Add(flex, 1, wx.TOP | wx.LEFT, 20)
|
self.SetSizer(boxsier)
|
|
# 赋初值
|
self.edit_start_time.SetValue("09:30:00")
|
self.edit_end_time.SetValue("10:00:00")
|
|
def __choose_dir(self, event):
|
dialog = wx.DirDialog(None, "选择保存目录:")
|
if dialog.ShowModal() == wx.ID_OK:
|
path = dialog.GetPath()
|
self.label_path.SetLabel(path)
|
dialog.Destroy()
|
|
def __sure_btn(self, event):
|
code = self.edit_code.GetValue()
|
start_time = self.edit_start_time.GetValue()
|
end_time = self.edit_end_time.GetValue()
|
dir_path = self.label_path.GetLabel()
|
if code and start_time and end_time and dir_path:
|
juejin_data_export.export_tick_data(code, start_time, end_time, dir_path)
|
show_info("导出成功", None)
|
else:
|
show_warning("参数不完整", None)
|
|
|
class JueJinSettingFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "掘金参数设置", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(450, 200))
|
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer()
|
flex = wx.FlexGridSizer(rows=3, cols=2, vgap=10, hgap=10)
|
# 策略
|
label = wx.StaticText(self, label="策略ID:")
|
flex.Add(label, flag=wx.ALIGN_RIGHT)
|
self.edit_celue = wx.TextCtrl(self, size=(300, -1))
|
flex.Add(self.edit_celue, flag=wx.EXPAND)
|
|
# token
|
label = wx.StaticText(self, label="Token:")
|
flex.Add(label)
|
self.edit_token = wx.TextCtrl(self, size=(300, -1), style=wx.TE_MULTILINE)
|
flex.Add(self.edit_token)
|
|
# 占位
|
flex.Add(wx.StaticText(self, label=""))
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
flex.Add(self.btn_sure, 1, wx.TOP | wx.LEFT, 20)
|
|
boxsier.Add(flex, 1, wx.TOP | wx.LEFT, 20)
|
self.SetSizer(boxsier)
|
|
# 初始化数据
|
self.__init_data()
|
|
def __init_data(self):
|
strategy_id, token = setting.get_juejin_params()
|
self.edit_celue.SetLabelText(strategy_id)
|
self.edit_token.SetLabelText(token)
|
pass
|
|
def __sure_btn(self, event):
|
strategy_id = self.edit_celue.GetValue()
|
token = self.edit_token.GetValue()
|
setting.set_juejin_params(strategy_id, token)
|
toastone = wx.MessageDialog(None, "更改成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class CodesSettingFrame(wx.Frame):
|
def __init__(self, position, callback):
|
wx.Frame.__init__(self, None, -1, "代码设置", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(170, 300))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
# 客户端标识
|
label = wx.StaticText(self, label="客户端标识:")
|
boxsier.Add(label, 0, wx.LEFT | wx.RIGHT, 5)
|
self.edit_client = wx.TextCtrl(self)
|
boxsier.Add(self.edit_client, 0, wx.LEFT | wx.RIGHT, 5)
|
|
# 代码
|
label = wx.StaticText(self, label="目标代码:")
|
boxsier.Add(label, 0, wx.LEFT | wx.RIGHT, 5)
|
|
# 自动填充按钮
|
self.btn_position = wx.Button(self, label='填充持仓代码')
|
self.btn_position.Bind(wx.EVT_BUTTON, self.__fill_position_codes)
|
boxsier.Add(self.btn_position, 0, wx.LEFT | wx.RIGHT | wx.BOTTOM, 5)
|
|
self.edit_codes = wx.TextCtrl(self, size=(150, 120), style=wx.TE_MULTILINE)
|
boxsier.Add(self.edit_codes, 0, wx.LEFT | wx.RIGHT, 5)
|
|
# 确定按钮
|
ID_SURE = random.randint(1000, 1200)
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
boxsier.Add(self.btn_sure, 0, wx.LEFT | wx.TOP, 5 | 5)
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
root_boxsier.Add(boxsier, 1, wx.TOP | wx.TOP, 10)
|
self.SetSizer(root_boxsier)
|
|
# 初始化数据
|
self.__init_data()
|
self.callback = callback
|
|
def __init_data(self):
|
codes = juejin_core.GPCodeManager().get_codes()
|
self.edit_codes.SetValue("\n".join(codes))
|
|
client = setting.get_client()
|
if client:
|
self.edit_client.SetValue(client)
|
else:
|
self.edit_client.SetValue("")
|
|
pass
|
|
def __sure_btn(self, event):
|
codes_str = self.edit_codes.GetValue()
|
client = self.edit_client.GetValue()
|
setting.set_client(client)
|
codes = codes_str.split("\n")
|
codes_result = []
|
for code in codes:
|
if code.strip():
|
codes_result.append(code.strip())
|
juejin_core.GPCodeManager().set_codes(codes_result)
|
|
# 重新订阅
|
self.callback()
|
toastone = wx.MessageDialog(None, "更改成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
@classmethod
|
def get_position_codes(cls):
|
def request_position_codes():
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
try:
|
data = socket_util.encryp_client_params_sign(
|
{"type": "common", "data": {"ctype": "get_position_codes"}})
|
client.send(socket_util.load_header(json.dumps(data).encode("utf-8")))
|
result_str, header_str = socket_util.recv_data(client)
|
return result_str
|
finally:
|
client.close()
|
|
result_str = request_position_codes()
|
result = json.loads(result_str)
|
if result["code"] == 0:
|
return result["data"]
|
else:
|
raise Exception(result["msg"])
|
|
# 填充持仓代码
|
def __fill_position_codes(self, event):
|
def request_position_codes():
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
try:
|
data = socket_util.encryp_client_params_sign(
|
{"type": "common", "data": {"ctype": "get_position_codes"}})
|
client.send(socket_util.load_header(json.dumps(data).encode("utf-8")))
|
result_str, header_str = socket_util.recv_data(client)
|
return result_str
|
finally:
|
client.close()
|
|
try:
|
codes = self.get_position_codes()
|
if codes:
|
self.edit_codes.SetValue("\n".join(codes))
|
else:
|
raise Exception("无持仓")
|
except Exception as e:
|
show_warning(str(e), None)
|
|
|
class FobiddenCodesFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "添加禁止交易代码", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(170, 200))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
# 代码
|
label = wx.StaticText(self, label="代码:")
|
boxsier.Add(label, flag=wx.ALIGN_LEFT, border=10)
|
self.edit_codes = wx.TextCtrl(self, size=(150, 100), style=wx.TE_MULTILINE)
|
boxsier.Add(self.edit_codes)
|
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
boxsier.Add(self.btn_sure)
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
root_boxsier.Add(boxsier, 1, wx.TOP | wx.TOP, 10)
|
self.SetSizer(root_boxsier)
|
|
def __request(self, codes):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 201, "data": {"codes": codes}}
|
client.send(json.dumps(data).encode("utf-8"))
|
client.close()
|
|
def __sure_btn(self, event):
|
codes_str = self.edit_codes.GetValue()
|
if codes_str:
|
codes_str = codes_str.strip()
|
|
codes = codes_str.split("\n")
|
codes_result = []
|
for code in codes:
|
if code.strip() and len(code.strip()) == 6:
|
codes_result.append(code.strip())
|
if len(codes_result) == 0:
|
show_warning("请填写正确的代码!", self.Close)
|
return
|
try:
|
self.__request(codes_result)
|
except Exception as e:
|
show_warning("添加出错:" + str(e), None)
|
return
|
|
toastone = wx.MessageDialog(None, "添加成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
|
class THSPositionSettingFrame(wx.Frame):
|
def __init__(self, position):
|
wx.Frame.__init__(self, None, -1, "同花顺坐标设置", style=wx.SYSTEM_MENU ^ wx.CLOSE_BOX ^ wx.CAPTION ^ wx.STAY_ON_TOP,
|
size=(170, 400))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetPosition(wx.Point(position[0] - self.GetSize()[0] / 2, position[1] - self.GetSize()[1] / 2))
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
|
label = wx.StaticText(self, label="交易刷新事件间隔(ms):")
|
boxsier.Add(label)
|
self.edit_refresh_time_space = wx.TextCtrl(self, size=(150, -1))
|
boxsier.Add(self.edit_refresh_time_space)
|
|
label = wx.StaticText(self, label="点击时间间隔(ms):")
|
boxsier.Add(label)
|
self.edit_time_space = wx.TextCtrl(self, size=(150, -1))
|
boxsier.Add(self.edit_time_space)
|
|
# 代码
|
label = wx.StaticText(self, label="坐标:")
|
boxsier.Add(label)
|
self.edit_codes = wx.TextCtrl(self, size=(150, 200), style=wx.TE_MULTILINE)
|
boxsier.Add(self.edit_codes)
|
|
# 确定按钮
|
ID_SURE = wx.NewId()
|
self.btn_sure = wx.Button(self, label='确定', id=ID_SURE)
|
self.btn_sure.Bind(wx.EVT_BUTTON, self.__sure_btn)
|
boxsier.Add(self.btn_sure)
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
root_boxsier.Add(boxsier, 1, wx.TOP | wx.TOP, 10)
|
self.SetSizer(root_boxsier)
|
|
# 初始化数据
|
self.__init_data()
|
|
def __init_data(self):
|
pos = setting.get_ths_auto_click_positions()
|
self.edit_codes.SetValue("\n".join(pos))
|
space = setting.get_ths_auto_click_time_space()
|
if space is None:
|
space = 500
|
self.edit_time_space.SetValue(f"{space}")
|
|
space = setting.get_ths_auto_refresh_time_space()
|
if space is None:
|
space = 500
|
self.edit_refresh_time_space.SetValue(f"{space}")
|
|
def __sure_btn(self, event):
|
codes_str = self.edit_codes.GetValue()
|
ps = codes_str.split("\n")
|
result = []
|
for p in ps:
|
if p.strip():
|
result.append(p.strip())
|
setting.set_ths_auto_click_positions(result)
|
|
time_space = self.edit_time_space.GetValue()
|
if len(time_space) == 0:
|
show_warning("点击时间间隔不正确", None)
|
return
|
|
refresh_time_space = self.edit_refresh_time_space.GetValue()
|
if len(refresh_time_space) == 0:
|
show_warning("刷新时间间隔不正确", None)
|
return
|
|
setting.set_ths_auto_click_time_space(time_space)
|
setting.set_ths_auto_refresh_time_space(refresh_time_space)
|
|
toastone = wx.MessageDialog(None, "更改成功", "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
self.Close()
|
toastone.Destroy()
|
|
class FloatFrame(wx.Frame):
|
def __init__(self):
|
wx.Frame.__init__(self, None, -1, "悬浮盯盘", style=wx.CAPTION ^ wx.MINIMIZE_BOX ^ wx.CLOSE_BOX ^ wx.STAY_ON_TOP,
|
size=(435, 220))
|
#
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.SetTransparent(230)
|
self.Bind(wx.EVT_CLOSE, self.OnExit)
|
|
# 设置字体大小
|
font = self.GetFont()
|
font.SetPixelSize(wx.Size(0, 11))
|
self.SetFont(font)
|
|
# 读取配置信息
|
window_info = setting.get_float_watch_window_info()
|
if window_info:
|
self.SetPosition(wx.Point(window_info[0], window_info[1]))
|
# self.Size = wx.Size(window_info[2], window_info[3])
|
|
boxsier = wx.BoxSizer(wx.VERTICAL)
|
|
# ------所有控件定义--------
|
btn_width = 60
|
|
self.btn_already_canceled = wx.Button(self, label="撤单", size=(-1, 40))
|
self.btn_sell = wx.Button(self, label="卖100%", size=(-1, 40))
|
font = self.btn_already_canceled.GetFont()
|
font.SetPixelSize(wx.Size(0, 16))
|
self.btn_already_canceled.SetFont(font)
|
self.btn_sell.SetFont(font)
|
|
self.btn_remove_black = wx.Button(self, label="移除黑名单", size=(btn_width, 30))
|
|
self.btn_close_buy = wx.Button(self, label="关闭交易", size=(65, 30))
|
self.btn_close_buy.SetForegroundColour("#FF3232")
|
|
self.btn_remove_white = wx.Button(self, label="移除白名单", size=(btn_width, 30))
|
|
self.btn_open_buy = wx.Button(self, label="开启交易", size=(65, 30), )
|
self.btn_open_buy.SetForegroundColour("#00e600")
|
|
# 买想买
|
self.btn_buy_mode_want = wx.Button(self, label="仅买想买", size=(65, 30))
|
self.btn_buy_mode_all = wx.Button(self, label="全部都买", size=(65, 30), )
|
# self.btn_buy_mode_all.SetForegroundColour("#00e600")
|
|
self.check_auto_refresh = wx.CheckBox(self, size=(-1, -1), label="交易刷新")
|
self.want_list = wx.Button(self, label="想买单", size=(40, 20))
|
self.btn_want_buy = wx.Button(self, label="加入想买单", size=(btn_width, 30))
|
|
self.check_auto_click = wx.CheckBox(self, size=(-1, -1), label="分组刷新")
|
|
self.check_focus = wx.CheckBox(self, size=(-1, -1), label="抢占焦点")
|
|
self.check_trade_quick_key = wx.CheckBox(self, size=(-1, -1), label="交易快捷键")
|
|
self.btn_want_buy_remove = wx.Button(self, label="移除想买单", size=(btn_width, 30))
|
|
self.edit_code = wx.TextCtrl(self, size=(80, -1))
|
self.edit_code.Show(False)
|
|
self.pause_buy_list = wx.Button(self, label="不买单", size=(40, 20))
|
self.btn_pause_buy = wx.Button(self, label="加入暂不买", size=(btn_width, 30))
|
|
# 代码
|
self.btn_black = wx.Button(self, label="加入黑名单", size=(btn_width, 30))
|
self.black_list = wx.Button(self, label="黑名单", size=(40, 20))
|
self.black_list.SetForegroundColour("#00e600")
|
|
self.btn_white = wx.Button(self, label="加入白名单", size=(btn_width, 30))
|
self.white_list = wx.Button(self, label="白名单", size=(40, 20))
|
self.white_list.SetForegroundColour("#FF3232")
|
|
self.btn_pause_buy_remove = wx.Button(self, label="移除暂不买", size=(btn_width, 30))
|
|
# ------布局--------
|
# 第1排 撤单与卖
|
bs1 = wx.BoxSizer(wx.HORIZONTAL)
|
bs1.Add(self.btn_already_canceled, 1, wx.ALL | wx.EXPAND, 2)
|
bs1.Add(self.btn_sell, 1, wx.ALL | wx.EXPAND, 2)
|
boxsier.Add(bs1, 0, wx.ALL | wx.EXPAND, 0)
|
boxsier.Add(wx.StaticText(self, size=wx.Size(-1, 5)), 0, wx.ALL | wx.EXPAND, 0)
|
|
# 第2排 白名单 黑名单 暂不买单 想买单
|
bs1 = wx.BoxSizer(wx.HORIZONTAL)
|
# 白名单
|
bs2 = wx.BoxSizer(wx.HORIZONTAL)
|
bs_v = wx.BoxSizer(wx.VERTICAL)
|
bs_v.Add(self.btn_white)
|
bs_v.Add(self.btn_remove_white)
|
bs2.Add(bs_v)
|
bs2.Add(self.white_list, 0, wx.ALL | wx.EXPAND, 0)
|
bs1.Add(bs2, 1, wx.ALL | wx.EXPAND, 1)
|
|
# 黑名单
|
bs2 = wx.BoxSizer(wx.HORIZONTAL)
|
bs_v = wx.BoxSizer(wx.VERTICAL)
|
bs_v.Add(self.btn_black)
|
bs_v.Add(self.btn_remove_black)
|
bs2.Add(bs_v)
|
bs2.Add(self.black_list, 0, wx.ALL | wx.EXPAND, 0)
|
bs1.Add(bs2, 1, wx.ALL | wx.EXPAND, 1)
|
|
# 暂不买单
|
bs2 = wx.BoxSizer(wx.HORIZONTAL)
|
bs_v = wx.BoxSizer(wx.VERTICAL)
|
bs_v.Add(self.btn_pause_buy)
|
bs_v.Add(self.btn_pause_buy_remove)
|
bs2.Add(bs_v)
|
bs2.Add(self.pause_buy_list, 0, wx.ALL | wx.EXPAND, 0)
|
bs1.Add(bs2, 1, wx.ALL | wx.EXPAND, 1)
|
|
# 想买单
|
bs2 = wx.BoxSizer(wx.HORIZONTAL)
|
bs_v = wx.BoxSizer(wx.VERTICAL)
|
bs_v.Add(self.btn_want_buy)
|
bs_v.Add(self.btn_want_buy_remove)
|
bs2.Add(bs_v)
|
bs2.Add(self.want_list, 0, wx.ALL | wx.EXPAND, 0)
|
bs1.Add(bs2, 1, wx.ALL | wx.EXPAND, 1)
|
|
boxsier.Add(bs1, 0, wx.ALL | wx.EXPAND, 0)
|
boxsier.Add(wx.StaticText(self, size=wx.Size(-1, 5)))
|
# 第3排 交易操作位
|
bs1 = wx.BoxSizer(wx.HORIZONTAL)
|
|
bs_v = wx.BoxSizer(wx.VERTICAL)
|
bs_v.Add(self.check_auto_refresh)
|
bs_v.Add(self.check_auto_click)
|
bs1.Add(bs_v)
|
|
bs_v = wx.BoxSizer(wx.VERTICAL)
|
bs_v.Add(self.check_focus)
|
bs_v.Add(self.check_trade_quick_key)
|
bs1.Add(bs_v, 0, wx.LEFT, 5)
|
|
blank_label = wx.StaticText(self)
|
bs1.Add(blank_label, 1, wx.ALL | wx.EXPAND, 1)
|
bs1.Add(self.btn_close_buy)
|
bs1.Add(self.btn_open_buy)
|
bs1.Add(self.btn_buy_mode_want)
|
bs1.Add(self.btn_buy_mode_all)
|
|
boxsier.Add(bs1, 0, wx.ALL | wx.EXPAND, 1)
|
|
# 绑定
|
self.btn_open_buy.Bind(wx.EVT_BUTTON, self.__open_buy)
|
self.btn_close_buy.Bind(wx.EVT_BUTTON, self.__close_buy)
|
|
self.btn_black.Bind(wx.EVT_BUTTON, self.add_black)
|
self.btn_white.Bind(wx.EVT_BUTTON, self.add_white)
|
self.btn_remove_black.Bind(wx.EVT_BUTTON, self.remove_from_black)
|
self.btn_remove_white.Bind(wx.EVT_BUTTON, self.remove_from_white)
|
|
self.btn_want_buy.Bind(wx.EVT_BUTTON, self.add_want)
|
self.btn_want_buy_remove.Bind(wx.EVT_BUTTON, self.remove_from_want)
|
|
self.btn_pause_buy.Bind(wx.EVT_BUTTON, self.add_pause_buy)
|
self.btn_pause_buy_remove.Bind(wx.EVT_BUTTON, self.remove_from_pause_buy)
|
|
self.btn_buy_mode_want.Bind(wx.EVT_BUTTON, lambda e: self.__set_trade_mode(e, 1))
|
self.btn_buy_mode_all.Bind(wx.EVT_BUTTON, lambda e: self.__set_trade_mode(e, 0))
|
|
self.btn_already_canceled.Bind(wx.EVT_BUTTON, self.cancel_buy)
|
|
self.check_auto_click.Bind(wx.EVT_CHECKBOX, self.__auto_click_check)
|
self.check_auto_refresh.Bind(wx.EVT_CHECKBOX, self.__auto_refresh_check)
|
self.check_focus.Bind(wx.EVT_CHECKBOX, self.__auto_focus_check)
|
self.check_trade_quick_key.Bind(wx.EVT_CHECKBOX, self.__trade_quick_key_check)
|
self.white_list.Bind(wx.EVT_BUTTON, lambda e: self.show_list(e, "白名单列表", 302))
|
self.black_list.Bind(wx.EVT_BUTTON, lambda e: self.show_list(e, "黑名单列表", 301))
|
self.want_list.Bind(wx.EVT_BUTTON, lambda e: self.show_list(e, "想要买列表", 403))
|
self.pause_buy_list.Bind(wx.EVT_BUTTON, lambda e: self.show_list(e, "暂不买列表", 413))
|
|
root_boxsier = wx.BoxSizer(wx.VERTICAL)
|
self.label_attribute = wx.StaticText(self, label="")
|
self.label_attribute.SetForegroundColour(wx.Colour(255, 0, 0))
|
root_boxsier.Add(self.label_attribute, 0, wx.LEFT | wx.TOP | wx.RIGHT, 5)
|
root_boxsier.Add(boxsier, 1, wx.ALL, 5)
|
|
self.SetSizer(root_boxsier)
|
|
self.__bind_hot_keys()
|
# 初始化数据
|
self.__init_data()
|
|
self.timer = wx.Timer(self) # 创建定时器
|
self.Bind(wx.EVT_TIMER, self.clear_msg, self.timer)
|
|
def clear_msg(self, event):
|
self.label_attribute.SetLabelText("")
|
self.label_attribute.SetForegroundColour("#000000")
|
|
def __ocr_code(self):
|
code = self.edit_code.GetValue()
|
if code is not None and len(code.strip()) == 0:
|
code = None
|
if code is not None:
|
if len(code) != 6:
|
self.show_warning("请填写正确的代码")
|
return
|
else:
|
return
|
|
code = ths_ocr_util.ocr_ths_code()
|
if code is None:
|
raise Exception("代码识别出错")
|
self.edit_code.SetValue(code)
|
|
def show_warning(self, content):
|
self.label_attribute.SetLabel(content)
|
self.label_attribute.SetForegroundColour("#FF7F27")
|
self.timer.Stop()
|
self.timer.StartOnce(20000)
|
|
def show_info(self, content):
|
self.label_attribute.SetLabel(content)
|
self.label_attribute.SetForegroundColour("#008000")
|
self.timer.Stop()
|
self.timer.StartOnce(20000)
|
|
def __get_code(self):
|
self.__ocr_code()
|
code = self.edit_code.GetValue()
|
if code is None or len(code) != 6:
|
raise Exception("请填写正确的代码")
|
return code
|
|
def add_black(self, event):
|
try:
|
code = self.__get_code()
|
print("加入黑名单", code)
|
self.__request([code], 201)
|
self.show_info(f"{code}加入黑名单成功")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def add_white(self, event):
|
try:
|
code = self.__get_code()
|
print("加入白名单", code)
|
result = self.__request([code], 202)
|
result = json.loads(result)
|
if result["code"] == 0:
|
self.show_info(f"{code}加入白名单成功")
|
else:
|
self.show_warning(f"加入失败:{result['msg']}")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def add_pause_buy(self, event):
|
try:
|
code = self.__get_code()
|
print("加入暂不买", code)
|
self.__request([code], 411)
|
self.show_info(f"{code}加入暂不买名单成功")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def add_want(self, event):
|
try:
|
code = self.__get_code()
|
print("加入想要买", code)
|
self.__request([code], 401)
|
self.show_info(f"{code}加入想要买名单成功")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def remove_from_black(self, event):
|
try:
|
code = self.__get_code()
|
print("移除黑名单", code)
|
self.__request([code], 203)
|
self.show_info(f"{code}移除黑名单成功")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def remove_from_pause_buy(self, event):
|
try:
|
code = self.__get_code()
|
print("移除暂不买名单", code)
|
self.__request([code], 412)
|
self.show_info(f"{code}移除暂不买成功")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def remove_from_white(self, event):
|
try:
|
code = self.__get_code()
|
print("移除白名单", code)
|
self.__request([code], 204)
|
self.show_info(f"{code}移除白名单成功")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def remove_from_want(self, event):
|
try:
|
code = self.__get_code()
|
print("移除想要买名单", code)
|
self.__request([code], 402)
|
self.show_info(f"{code}移除想要买名单成功")
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def cancel_buy(self, event):
|
try:
|
code = self.__get_code()
|
print("撤单", code)
|
result = self.__request_cancel_buy(code)
|
result = json.loads(result)
|
if result["code"] == 0:
|
self.show_info(f"{code}撤单上报成功")
|
else:
|
self.show_warning(result.get("msg"))
|
self.edit_code.SetValue("")
|
except Exception as e:
|
self.show_warning(str(e))
|
return
|
|
def show_list(self, event, title, type):
|
try:
|
result = self.__request_list(type)
|
result = json.loads(result)
|
self.__show_list(title, result["data"])
|
except Exception as e:
|
show_warning(str(e), None)
|
|
def __show_list(self, title, datas):
|
st = ""
|
for i in range(0, len(datas)):
|
st += datas[i]
|
if i % 2 == 1 and i != len(datas) - 1:
|
st += "\n"
|
elif i != len(datas) - 1:
|
st += " , "
|
|
toastone = wx.MessageDialog(None, st, title)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
toastone.Destroy()
|
|
def __bind_hot_keys(self):
|
# 快捷键
|
setting_ = wx.Menu()
|
m_black = wx.MenuItem(setting_, id=101, text='&E', kind=wx.ITEM_NORMAL)
|
m_white = wx.MenuItem(setting_, id=102, text='&E', kind=wx.ITEM_NORMAL)
|
self.Bind(wx.EVT_MENU, self.add_black, m_black)
|
self.Bind(wx.EVT_MENU, self.add_white, m_white)
|
entries = [wx.AcceleratorEntry() for i in range(2)]
|
entries[0].Set(wx.ACCEL_CTRL, wx.WXK_F4, 101)
|
entries[1].Set(wx.ACCEL_CTRL, wx.WXK_F5, 102)
|
accel = wx.AcceleratorTable(entries)
|
self.SetAcceleratorTable(accel)
|
|
def __init_data(self):
|
auto_click = setting.is_ths_auto_click()
|
if auto_click:
|
self.check_auto_click.SetValue(True)
|
else:
|
self.check_auto_click.SetValue(False)
|
|
auto_refresh = setting.is_ths_trade_auto_refresh()
|
if auto_refresh:
|
self.check_auto_refresh.SetValue(True)
|
else:
|
self.check_auto_refresh.SetValue(False)
|
|
auto_focus = setting.get_float_frame_auto_focus()
|
if auto_focus:
|
self.check_focus.SetValue(True)
|
else:
|
self.check_focus.SetValue(False)
|
|
trade_quick_key = setting.get_float_frame_trade_quick_key()
|
if trade_quick_key:
|
self.check_trade_quick_key.SetValue(True)
|
else:
|
self.check_trade_quick_key.SetValue(False)
|
|
self.__init_trade_state()
|
self.__init_trade_mode()
|
|
def __init_trade_state(self):
|
# 获取交易状态
|
try:
|
result = self.__request_buy_state()
|
result = json.loads(result)
|
if result["code"] == 0:
|
if result["data"]["can_buy"]:
|
self.btn_open_buy.SetLabelText("开启交易*")
|
self.btn_close_buy.SetLabelText("关闭交易")
|
else:
|
self.btn_open_buy.SetLabelText("开启交易")
|
self.btn_close_buy.SetLabelText("关闭交易*")
|
except:
|
pass
|
|
def __init_trade_mode(self):
|
# 获取交易模式
|
try:
|
result = self.__request_buy_mode()
|
result = json.loads(result)
|
if result["code"] == 0:
|
if result["data"]["mode"] == 0:
|
self.btn_buy_mode_want.SetLabelText("仅买想买")
|
self.btn_buy_mode_all.SetLabelText("全部都买*")
|
else:
|
self.btn_buy_mode_want.SetLabelText("仅买想买*")
|
self.btn_buy_mode_all.SetLabelText("全部都买")
|
except:
|
pass
|
|
def __auto_click_check(self, event):
|
if event.Selection:
|
setting.set_ths_auto_click(True)
|
else:
|
setting.set_ths_auto_click(False)
|
|
def __auto_refresh_check(self, event):
|
if event.Selection:
|
setting.set_ths_trade_auto_refresh(True)
|
else:
|
setting.set_ths_trade_auto_refresh(False)
|
|
def __auto_focus_check(self, event):
|
if event.Selection:
|
setting.set_float_frame_auto_focus(True)
|
else:
|
setting.set_float_frame_auto_focus(False)
|
|
def __trade_quick_key_check(self, event):
|
if event.Selection:
|
setting.set_float_frame_trade_quick_key(True)
|
else:
|
setting.set_float_frame_trade_quick_key(False)
|
|
def __request(self, codes, type):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": type, "data": {"codes": codes}}
|
client.send(json.dumps(data).encode("utf-8"))
|
result = client.recv(1024)
|
client.close()
|
return result.decode("gbk")
|
|
def __request_list(self, type):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": type, "data": {}}
|
client.send(json.dumps(data).encode("utf-8"))
|
# 读取内容
|
result = client.recv(10240)
|
client.close()
|
return result.decode("gbk")
|
|
def __request_buy(self, is_open):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 501, "data": {"open": is_open}}
|
client.send(json.dumps(data).encode("utf-8"))
|
# 读取内容
|
result = client.recv(10240)
|
client.close()
|
return result.decode("gbk")
|
|
# 获取买入状态
|
def __request_buy_state(self):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 502, "data": {}}
|
client.send(json.dumps(data).encode("utf-8"))
|
# 读取内容
|
result = client.recv(1024)
|
client.close()
|
return result.decode("gbk")
|
|
# 获取买入模式
|
|
def __request_buy_mode(self):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 504, "data": {}}
|
client.send(json.dumps(data).encode("utf-8"))
|
# 读取内容
|
result = client.recv(1024)
|
client.close()
|
return result.decode("gbk")
|
|
def __request_set_buy_mode(self, mode):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 503, "data": {"mode": mode}}
|
client.send(json.dumps(data).encode("utf-8"))
|
# 读取内容
|
result = client.recv(1024)
|
client.close()
|
return result.decode("gbk")
|
|
# 查询是否可以撤单
|
def __request_can_cancel_buy(self, code):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 420, "data": {"codes": [code]}}
|
client.send(json.dumps(data).encode("utf-8"))
|
# 读取内容
|
result = client.recv(1024)
|
client.close()
|
return result.decode("gbk")
|
|
# 撤单
|
def __request_cancel_buy(self, code):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 80, "data": {"code": code}}
|
client.send(json.dumps(data).encode("utf-8"))
|
# 读取内容
|
result = client.recv(1024)
|
client.close()
|
return result.decode("gbk")
|
|
def __set_trade_mode(self, event, mode):
|
try:
|
result = self.__request_set_buy_mode(mode)
|
result = json.loads(result)
|
if result["code"] != 0:
|
raise Exception(result["msg"])
|
self.__init_trade_mode()
|
except Exception as e:
|
show_warning(str(e), None)
|
|
def __open_buy(self, event):
|
def open_buy(sure):
|
if sure:
|
try:
|
result = self.__request_buy(True)
|
msg = json.loads(result)["msg"]
|
show_info(msg, None)
|
self.__init_trade_state()
|
except Exception as e:
|
show_warning(str(e), None)
|
|
show_sure("是否开启交易", open_buy)
|
|
def __close_buy(self, event):
|
def close_buy(sure):
|
if sure:
|
try:
|
result = self.__request_buy(False)
|
msg = json.loads(result)["msg"]
|
show_info(msg, None)
|
self.__init_trade_state()
|
except Exception as e:
|
show_warning(str(e), None)
|
|
show_sure("是否关闭交易", close_buy)
|
|
def __request_attribute(self, code):
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, SOCKET_PORT) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
data = {"type": 430, "data": {"code": code}}
|
client.send(json.dumps(data).encode("utf-8"))
|
result = client.recv(1024)
|
client.close()
|
return result.decode("gbk")
|
|
@classmethod
|
def focus(cls, hwnd, code):
|
try:
|
int_buffer = array.array("L", [0])
|
char_buffer = array.array('b',
|
json.dumps({"type": "set_code", "data": {"code": code}}).encode())
|
int_buffer_address = int_buffer.buffer_info()[0]
|
char_buffer_address, char_buffer_size = char_buffer.buffer_info()
|
copy_struct = struct.pack("PLP", int_buffer_address, char_buffer_size, char_buffer_address)
|
win32gui.SendMessage(hwnd, win32con.WM_COPYDATA, None, copy_struct)
|
|
# pythoncom.CoInitialize()
|
# shell = client.Dispatch("WScript.Shell")
|
# shell.SendKeys('%')
|
# # win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
|
# win32gui.SetForegroundWindow(hwnd)
|
# win32gui.PumpMessages()
|
# win32gui.SetFocus(hwnd)
|
# lib = CDLL("D:/workspace/GP/dll/Dll/x64/Debug/Dll.dll")
|
# win32gui.SendMessage(hwnd, win32con.WM_SETFOCUS, 0, 0)
|
# print("执行结果", lib.focus(hwnd))
|
|
# remote_thread = win32process.GetWindowThreadProcessId(hwnd)[0]
|
# tid = win32api.GetCurrentThreadId()
|
# win32process.AttachThreadInput(tid, remote_thread, True)
|
# prev_handle = win32gui.SetFocus(hwnd)
|
# print("之前的焦点句柄", prev_handle,hwnd)
|
except Exception as e:
|
logging.exception(e)
|
pass
|
|
def is_admin(self):
|
try:
|
return ctypes.windll.shell32.IsUserAnAdmin()
|
except:
|
return False
|
|
# 设置代码,请求代码属性
|
@classmethod
|
def setCode(cls, code):
|
# 获取代码属性描述
|
# auto_focus = setting.get_float_frame_auto_focus()
|
if True:
|
# 窗口显示在最前面
|
hwnds = win32_util.search_window("悬浮盯盘")
|
if hwnds:
|
print("句柄", hwnds[0])
|
time.sleep(0.1)
|
wx.CallAfter(lambda: cls.focus(hwnds[0], code))
|
return
|
|
# wx.CallAfter(lambda: self.SetFocus())
|
|
# try:
|
# result = self.__request_attribute(code)
|
# result = json.loads(result)
|
# if result['code'] == 0:
|
# code_info = result['data']['code_info']
|
# desc = f"{code_info[1]} {code_info[0]} {result['data']['desc']}"
|
# wx.CallAfter(lambda: self.label_attribute.SetLabelText(desc))
|
# except Exception as e:
|
# wx.CallAfter(lambda: self.label_attribute.SetLabelText(str(e)))
|
|
def OnExit(self, e):
|
try:
|
setting.set_float_watch_window_info((self.Position[0], self.Position[1], self.Size[0], self.Size[1]))
|
except Exception as e:
|
print("")
|
self.Hide()
|
|
|
class SocketApiUtil:
|
|
@classmethod
|
def __request(cls, data_json):
|
socket_util.encryp_client_params_sign(data_json)
|
client = socket.socket() # 生成socket,连接server
|
ip_port = (constant.SERVER_HOST, 11009) # server地址和端口号(最好是10000以后)
|
client.connect(ip_port)
|
client.send(socket_util.load_header(json.dumps(data_json).encode("utf-8")))
|
result_str, header = socket_util.recv_data(client)
|
return result_str
|
|
@classmethod
|
def get_cost_price(cls, code):
|
result_str = cls.__request({"type": "get_cost_price", "data": {"code": code}})
|
print(result_str)
|
result = json.loads(result_str)
|
if result['code'] == 0:
|
return result['data']['price']
|
raise Exception(result['msg'])
|
|
|
class TickFrame(wx.Frame):
|
mark_lines = {}
|
|
def __init__(self):
|
'''构造函数'''
|
wx.Frame.__init__(self, None, -1, APP_TITLE, style=wx.DEFAULT_FRAME_STYLE,
|
size=(800, 500))
|
# ^ wx.RESIZE_BORDER ^ wx.STAY_ON_TOP
|
# 默认style是下列项的组合:wx.MINIMIZE_BOX | wx.MAXIMIZE_BOX | wx.RESIZE_BORDER | wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX | wx.CLIP_CHILDREN
|
self.SetBackgroundColour(wx.Colour(0, 0, 0))
|
win_info = setting.get_tick_window_info()
|
if win_info:
|
self.SetPosition(wx.Point(win_info[0], win_info[1]))
|
self.Size = wx.Size(win_info[2], win_info[3])
|
else:
|
self.Center()
|
|
# 拉取数据线程
|
self.cost_price_threads = {}
|
|
# 以下代码处理图标
|
# if hasattr(sys, "frozen") and getattr(sys, "frozen") == "windows_exe":
|
# exeName = win32api.GetModuleFileName(win32api.GetModuleHandle(None))
|
# icon = wx.Icon(exeName, wx.BITMAP_TYPE_ICO)
|
# else:
|
# icon = wx.Icon(APP_ICON, wx.BITMAP_TYPE_ICO)
|
# self.SetIcon(icon)
|
# 定义窗口关闭
|
self.Bind(wx.EVT_CLOSE, self.OnExit)
|
self.Bind(wx.EVT_SIZE, self.OnResize)
|
self.panels = []
|
self.scroll = None
|
self.col = 1
|
self.ratio = 0.55
|
self.__re_draw()
|
|
self.timer = wx.Timer(self) # 创建定时器
|
self.Bind(wx.EVT_TIMER, self.post_redraw, self.timer) # 绑定一个定时器事件
|
self.last_size = (self.Size[0], self.Size[1])
|
|
# self.scroll.Layout()
|
# self.boxsier.Fit(self.scroll)
|
|
# boxsier.Add(mainBoxsier, 1, wx.EXPAND | wx.ALL, 5)
|
# self.SetSizer(boxsier)
|
# mainBoxsier.Fit(self)
|
if setting.is_stay_on_top():
|
self.WindowStyle = wx.MINIMIZE_BOX | wx.MAXIMIZE_BOX | wx.RESIZE_BORDER | wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX | wx.CLIP_CHILDREN | wx.STAY_ON_TOP
|
|
def scrollTo(self, pos):
|
self.scroll.Scroll(0, pos)
|
|
def __re_draw(self):
|
codes_name = juejin_core.GPCodeManager().get_codes_with_names()
|
rows = len(codes_name)
|
if rows % self.col == 0:
|
rows = rows / self.col
|
else:
|
rows = rows / self.col + 1
|
space = 0
|
if self.scroll is None:
|
self.scroll = wx.ScrolledWindow(self, -1, size=(800, 1000))
|
self.boxsier = wx.FlexGridSizer(rows, self.col, space, space)
|
self.scroll.SetSizer(self.boxsier)
|
self.scroll.EnableScrolling(False, True)
|
if self.panels:
|
for p in self.panels:
|
p.Destroy()
|
self.boxsier.Clear()
|
|
pannel_height = round((self.Size[0] - (self.col - 1) * space) / self.col * self.ratio)
|
|
self.scroll.SetScrollbars(1, 1, 0, pannel_height * rows)
|
self.scroll.SetScrollRate(0, pannel_height)
|
global drawManager
|
axes_list = []
|
self.panels = []
|
for i in range(0, len(codes_name)):
|
# pos=(0, i * pannel_height)
|
pannel = wx.Panel(self.scroll, size=(-1, pannel_height))
|
pannel.BackgroundColour = wx.Colour(0, 0, 0)
|
self.panels.append(pannel)
|
self.boxsier.Add(pannel)
|
code = codes_name[i][0]
|
axes = self.__create_canvas(pannel, code, "{}({})".format(codes_name[i][0], codes_name[i][1]),
|
codes_name[i][1],
|
codes_name[i][2])
|
|
if code not in self.cost_price_threads:
|
t1 = self.run_get_cost_price(codes_name[i][0])
|
self.cost_price_threads[code] = t1
|
axes_list.append(axes)
|
|
self.scroll.Layout()
|
# self.boxsier.Fit(self.scroll)
|
#
|
# 初始化数据
|
drawManager = DrawManager(axes_list, codes_name)
|
t1 = threading.Thread(target=lambda: drawManager.init_code_datas())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
@classmethod
|
def set_mouse_data(cls, xdata, ydata, code, axes2):
|
try:
|
if code not in cls.mark_lines:
|
cls.mark_lines[code] = {}
|
if "mouse" not in cls.mark_lines[code]:
|
line_h = axes2.axhline(ydata, linestyle='-', color='white', lw=0.5, zorder=3)
|
line_v = axes2.axvline(xdata, linestyle='-', color='white', lw=0.5, zorder=3)
|
text_h = axes2.text(axes2.get_xlim()[1], ydata, r'', fontsize=10, color='white',
|
verticalalignment="center", bbox=dict(facecolor='white', alpha=0.9), zorder=3)
|
text_v = axes2.text(xdata, axes2.get_ylim()[0], r'', fontsize=10, color='red',
|
verticalalignment='top', horizontalalignment='center',
|
bbox=dict(facecolor='white', alpha=0.9), zorder=3)
|
|
cls.mark_lines[code]["mouse"] = (line_h, line_v, text_h, text_v)
|
line = cls.mark_lines.get(code).get("mouse")
|
x = round(xdata)
|
y = round(ydata, 2)
|
|
line[0].set_ydata(ydata)
|
line[1].set_xdata(xdata)
|
line[2].set_text(f"{y}%")
|
line[3].set_text(f"{tool.trade_time_add_second('09:30:00', x)}")
|
line[2].set_x(axes2.get_xlim()[1] * (1 + 0.005))
|
line[2].set_y(y)
|
line[3].set_x(x)
|
line[3].set_y(axes2.get_ylim()[0] * (1 + 0.02))
|
if y >= 0:
|
line[2].set_color('red')
|
else:
|
line[2].set_color('green')
|
axes2.figure.canvas.draw()
|
except Exception as e:
|
pass
|
|
def __create_canvas(self, pannel, code, title, name, price, close_callback=None):
|
TickDataProcess.clear(code)
|
|
def show_mouse_line(event):
|
# 删除之前的线
|
self.set_mouse_data(event.xdata, event.ydata, code, axes)
|
|
def clear_mouse_line(event):
|
print("clear_mouse_line")
|
if code in self.mark_lines:
|
if self.mark_lines.get(code):
|
line = self.mark_lines.get(code).get("mouse")
|
if line is not None:
|
for l in line:
|
l.remove()
|
self.mark_lines.get(code).pop("mouse")
|
axes.figure.canvas.draw()
|
|
def close_canvas(event):
|
print("关闭", title)
|
close_callback(title)
|
|
dpi = 100
|
width_dpi = self.Size[0] / (dpi * self.col)
|
figure_score = Figure(figsize=(width_dpi, round(width_dpi * (self.ratio), 2)), dpi=dpi)
|
# 设置外边距
|
right_padding_px = 85
|
right = round((self.Size[0] - right_padding_px) / self.Size[0], 4)
|
figure_score.subplots_adjust(left=0.01, bottom=0.15, top=0.92,
|
right=right)
|
# 设置字体颜色
|
plt.rcParams["text.color"] = "red"
|
plt.rcParams["axes.labelcolor"] = "red"
|
# 设置坐标轴数字颜色
|
plt.rcParams["xtick.color"] = "white"
|
plt.rcParams["ytick.color"] = "white"
|
# 设置坐标轴颜色
|
plt.rcParams["axes.edgecolor"] = "firebrick"
|
|
# 解决中文乱码问题
|
plt.rcParams["font.sans-serif"] = ["SimHei"] # 设置字体
|
plt.rcParams["font.serif"] = ["SimHei"]
|
plt.rcParams["axes.unicode_minus"] = False # 该语句解决图像中的“-”负号的乱码问题
|
|
# 测试
|
buttonaxe = plt.axes([0.94, 0.5, 0.1, 0.1])
|
button1 = Button(buttonaxe, '关闭', color='white', hovercolor='yellow')
|
|
axes = figure_score.add_subplot(1, 1, 1)
|
axes.autoscale(True)
|
|
# axes_score.plot(t_score, s_score, 'ro', t_score, s_score, 'k')
|
axes.set_title(title)
|
axes.grid(False)
|
axes.set_xlabel(f'时间({name})')
|
axes.dist = 0
|
|
# axes.set_ylabel(u'价格')
|
# 获取平开价
|
extra = 0 # (tool.get_limit_up_price(price)-decimal.Decimal(price))*decimal.Decimal(0.02)
|
|
axes.patch.set_facecolor('black')
|
figure_score.patch.set_facecolor('black')
|
|
# 设置横坐标9:25-15:00
|
xs_str = ["09:25", "09:30", "10:30", "11:30", "14:00", "15:00"]
|
xs_int = [DrawManager.get_x_time_as_seconds(f"0000-00-00 {x}:00") for x in xs_str]
|
axes.set_xticks(xs_int[1:])
|
axes.set_xticklabels(xs_str[1:])
|
axes.set_xlim([xs_int[0], xs_int[-1]])
|
|
axes2 = axes.twinx()
|
# axes2.grid(color='firebrick', ls='-', lw=0.5)
|
axes2.grid(color='firebrick', ls='-', lw=0.5)
|
# 鼠标在画布移动
|
# axes2.figure.canvas.mpl_connect('motion_notify_event', show_mouse_line)
|
# # 鼠标离开画布
|
# axes2.figure.canvas.mpl_connect('axes_leave_event', clear_mouse_line)
|
|
# 设置纵坐标轴
|
limit_up_price = float(tool.get_limit_up_price(price))
|
max_rate = round((limit_up_price - price) / price, 4) * 100
|
print("涨停最大比例", max_rate)
|
# 设置纵坐标信息
|
# max_rate = 2
|
DrawManager.set_y_info(code, max_rate, axes, axes2, price)
|
|
line = axes2.plot([], [], color='white', linewidth=1)
|
average_line = axes2.plot([], [], color='yellow', linewidth=1, linestyle='-')
|
average_line_1m = axes2.plot([], [], color='yellow', linewidth=1, linestyle='-')
|
# axes2.legend(loc='upper left')
|
cannvas = FigureCanvas(pannel, -1, figure_score)
|
axes2.text(1, 11.5, r'', fontsize=15, color='red')
|
axes2.text(-1, -11.5, r'', fontsize=15, color='red')
|
|
axes2.spines['top'].set_visible(False)
|
axes.spines['top'].set_visible(False)
|
axes2.spines['bottom'].set_visible(True)
|
axes.spines['bottom'].set_visible(False)
|
|
# 中轴线加粗
|
hline = axes2.axhline(0, linestyle='-', color='firebrick', lw=2, zorder=1)
|
|
# 设置坐标轴标记点为黑色
|
axes.tick_params(axis='x', colors='firebrick')
|
axes.tick_params(axis='y', colors='black')
|
axes2.tick_params(axis='x', colors='firebrick')
|
axes2.tick_params(axis='y', colors='black')
|
|
def update_data(i):
|
print("更新数据:", i)
|
|
return axes2, line, average_line_1m, average_line, axes
|
|
def __show_top(self, event):
|
if event.Selection:
|
setting.set_stay_on_top(1)
|
self.WindowStyle = wx.MINIMIZE_BOX | wx.MAXIMIZE_BOX | wx.RESIZE_BORDER | wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX | wx.CLIP_CHILDREN | wx.STAY_ON_TOP
|
else:
|
setting.set_stay_on_top(0)
|
self.WindowStyle = wx.MINIMIZE_BOX | wx.MAXIMIZE_BOX | wx.RESIZE_BORDER | wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX | wx.CLIP_CHILDREN
|
|
def OnExit(self, e):
|
try:
|
setting.set_tick_window_info((self.Position[0], self.Position[1], self.Size[0], self.Size[1]))
|
except Exception as e:
|
print("")
|
self.Hide()
|
|
def post_redraw(self, evt):
|
|
if abs(self.last_size[0] - self.Size[0]) > 20:
|
print("--------post_redraw--------")
|
self.last_size = (self.Size[0], self.Size[1])
|
self.__re_draw()
|
|
def OnResize(self, e):
|
print("变化后的尺寸", e.Size)
|
# 留出滚动条,留出上边距
|
if self.scroll:
|
self.scroll.Size = (e.Size[0] - 15, e.Size[1] - 60)
|
for p in self.panels:
|
p_height = round(e.Size[0] * (450 / 800))
|
p.Size = (e.Size[0], p_height)
|
self.timer.Stop()
|
self.timer.StartOnce(1000)
|
|
# 降低重绘频率
|
# self.__re_draw()
|
|
def set_codes_success(self):
|
print("设置代码成功回调")
|
p2.send("resub")
|
self.__re_draw()
|
self.timer.Stop()
|
self.timer.StartOnce(1000)
|
self.Size = wx.Size(self.Size[0], self.Size[1] + 10)
|
time.sleep(0.1)
|
self.Size = wx.Size(self.Size[0], self.Size[1] - 10)
|
|
def run_get_cost_price(self, code):
|
def request(code):
|
while True:
|
try:
|
price = SocketApiUtil.get_cost_price(code)
|
pre_price = juejin_core.GPCodeManager().get_pre_prices(code)
|
rate = round((price - pre_price) * 100 / pre_price, 2)
|
DrawManager.set_cost_rate(code, rate)
|
# wx.CallAfter(lambda :)
|
except Exception as e:
|
DrawManager.set_cost_rate(code, None)
|
# wx.CallAfter(lambda: str(e))
|
except:
|
pass
|
finally:
|
time.sleep(3)
|
|
t1 = threading.Thread(target=lambda: request(code), daemon=True)
|
t1.start()
|
return t1
|
|
|
class TickCompareFrame(wx.Frame):
|
mark_lines = {}
|
|
def __init__(self):
|
'''构造函数'''
|
wx.Frame.__init__(self, None, -1, APP_TITLE, style=wx.DEFAULT_FRAME_STYLE,
|
size=(800, 1000))
|
self.SetBackgroundColour(wx.Colour(0, 0, 0))
|
|
self.Center()
|
|
# 拉取数据线程
|
self.cost_price_threads = {}
|
|
# 以下代码处理图标
|
# if hasattr(sys, "frozen") and getattr(sys, "frozen") == "windows_exe":
|
# exeName = win32api.GetModuleFileName(win32api.GetModuleHandle(None))
|
# icon = wx.Icon(exeName, wx.BITMAP_TYPE_ICO)
|
# else:
|
# icon = wx.Icon(APP_ICON, wx.BITMAP_TYPE_ICO)
|
# self.SetIcon(icon)
|
# 定义窗口关闭
|
self.Bind(wx.EVT_CLOSE, self.OnExit)
|
self.Bind(wx.EVT_SIZE, self.OnResize)
|
self.panels = []
|
self.scroll = None
|
self.col = 1
|
self.ratio = 0.55
|
self.__re_draw()
|
|
self.timer = wx.Timer(self) # 创建定时器
|
self.Bind(wx.EVT_TIMER, self.post_redraw, self.timer) # 绑定一个定时器事件
|
self.last_size = (self.Size[0], self.Size[1])
|
|
# 初始化
|
# self.scroll = wx.ScrolledWindow(self, -1, size=(800, 1000))
|
self.boxsier = wx.BoxSizer(wx.VERTICAL)
|
boxSizer1 = wx.BoxSizer(wx.HORIZONTAL)
|
self.edit_code1 = wx.TextCtrl(self, size=(100, 30))
|
self.edit_code2 = wx.TextCtrl(self, size=(100, 30))
|
self.btn_load_data = wx.Button(self, label="加载数据", size=(100, 30))
|
boxSizer1.Add(self.edit_code1)
|
boxSizer1.Add(self.edit_code2)
|
boxSizer1.Add(self.btn_load_data, 1, wx.LEFT | wx.TOP, 2)
|
|
self.boxsier.Add(boxSizer1)
|
|
# self.scroll.EnableScrolling(False, True)
|
|
# self.boxsier.Add(wx.FlexGridSizer(rows, self.col, space, space))
|
|
# self.scroll.SetSizer(self.boxsier)
|
|
# self.scroll.Layout()
|
# self.boxsier.Fit(self.scroll)
|
|
# boxsier.Add(mainBoxsier, 1, wx.EXPAND | wx.ALL, 5)
|
# self.SetSizer(boxsier)
|
# mainBoxsier.Fit(self)
|
if setting.is_stay_on_top():
|
self.WindowStyle = wx.MINIMIZE_BOX | wx.MAXIMIZE_BOX | wx.RESIZE_BORDER | wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX | wx.CLIP_CHILDREN | wx.STAY_ON_TOP
|
|
def __re_draw(self):
|
return
|
codes_name = juejin_core.GPCodeManager().get_codes_with_names()
|
rows = len(codes_name)
|
if rows % self.col == 0:
|
rows = rows / self.col
|
else:
|
rows = rows / self.col + 1
|
space = 0
|
if not self.scroll:
|
return
|
|
if self.panels:
|
for p in self.panels:
|
p.Destroy()
|
self.boxsier.Clear()
|
|
pannel_height = round((self.Size[0] - (self.col - 1) * space) / self.col * self.ratio)
|
|
self.scroll.SetScrollbars(1, 1, 0, pannel_height * rows)
|
self.scroll.SetScrollRate(0, pannel_height)
|
global drawManager
|
axes_list = []
|
self.panels = []
|
for i in range(0, len(codes_name)):
|
# pos=(0, i * pannel_height)
|
pannel = wx.Panel(self.scroll, size=(-1, pannel_height))
|
pannel.BackgroundColour = wx.Colour(0, 0, 0)
|
self.panels.append(pannel)
|
self.boxsier.Add(pannel)
|
code = codes_name[i][0]
|
axes = self.__create_canvas(pannel, code, "{}({})".format(codes_name[i][0], codes_name[i][1]),
|
codes_name[i][1],
|
codes_name[i][2])
|
|
if code not in self.cost_price_threads:
|
t1 = self.run_get_cost_price(codes_name[i][0])
|
self.cost_price_threads[code] = t1
|
axes_list.append(axes)
|
|
self.scroll.Layout()
|
# self.boxsier.Fit(self.scroll)
|
#
|
# 初始化数据
|
drawManager = DrawManager(axes_list, codes_name)
|
t1 = threading.Thread(target=lambda: drawManager.init_code_datas())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
@classmethod
|
def set_mouse_data(cls, xdata, ydata, code, axes2):
|
try:
|
if code not in cls.mark_lines:
|
cls.mark_lines[code] = {}
|
if "mouse" not in cls.mark_lines[code]:
|
line_h = axes2.axhline(ydata, linestyle='-', color='white', lw=0.5, zorder=3)
|
line_v = axes2.axvline(xdata, linestyle='-', color='white', lw=0.5, zorder=3)
|
text_h = axes2.text(axes2.get_xlim()[1], ydata, r'', fontsize=10, color='white',
|
verticalalignment="center", bbox=dict(facecolor='white', alpha=0.9), zorder=3)
|
text_v = axes2.text(xdata, axes2.get_ylim()[0], r'', fontsize=10, color='red',
|
verticalalignment='top', horizontalalignment='center',
|
bbox=dict(facecolor='white', alpha=0.9), zorder=3)
|
|
cls.mark_lines[code]["mouse"] = (line_h, line_v, text_h, text_v)
|
line = cls.mark_lines.get(code).get("mouse")
|
x = round(xdata)
|
y = round(ydata, 2)
|
|
line[0].set_ydata(ydata)
|
line[1].set_xdata(xdata)
|
line[2].set_text(f"{y}%")
|
line[3].set_text(f"{tool.trade_time_add_second('09:30:00', x)}")
|
line[2].set_x(axes2.get_xlim()[1] * (1 + 0.005))
|
line[2].set_y(y)
|
line[3].set_x(x)
|
line[3].set_y(axes2.get_ylim()[0] * (1 + 0.02))
|
if y >= 0:
|
line[2].set_color('red')
|
else:
|
line[2].set_color('green')
|
axes2.figure.canvas.draw()
|
except Exception as e:
|
pass
|
|
def __create_canvas(self, pannel, code, title, name, price, close_callback=None):
|
TickDataProcess.clear(code)
|
|
def show_mouse_line(event):
|
# 删除之前的线
|
self.set_mouse_data(event.xdata, event.ydata, code, axes)
|
|
def clear_mouse_line(event):
|
print("clear_mouse_line")
|
if code in self.mark_lines:
|
if self.mark_lines.get(code):
|
line = self.mark_lines.get(code).get("mouse")
|
if line is not None:
|
for l in line:
|
l.remove()
|
self.mark_lines.get(code).pop("mouse")
|
axes.figure.canvas.draw()
|
|
def close_canvas(event):
|
print("关闭", title)
|
close_callback(title)
|
|
dpi = 100
|
width_dpi = self.Size[0] / (dpi * self.col)
|
figure_score = Figure(figsize=(width_dpi, round(width_dpi * (self.ratio), 2)), dpi=dpi)
|
# 设置外边距
|
right_padding_px = 85
|
right = round((self.Size[0] - right_padding_px) / self.Size[0], 4)
|
figure_score.subplots_adjust(left=0.01, bottom=0.15, top=0.92,
|
right=right)
|
# 设置字体颜色
|
plt.rcParams["text.color"] = "red"
|
plt.rcParams["axes.labelcolor"] = "red"
|
# 设置坐标轴数字颜色
|
plt.rcParams["xtick.color"] = "white"
|
plt.rcParams["ytick.color"] = "white"
|
# 设置坐标轴颜色
|
plt.rcParams["axes.edgecolor"] = "firebrick"
|
|
# 解决中文乱码问题
|
plt.rcParams["font.sans-serif"] = ["SimHei"] # 设置字体
|
plt.rcParams["font.serif"] = ["SimHei"]
|
plt.rcParams["axes.unicode_minus"] = False # 该语句解决图像中的“-”负号的乱码问题
|
|
# 测试
|
buttonaxe = plt.axes([0.94, 0.5, 0.1, 0.1])
|
button1 = Button(buttonaxe, '关闭', color='white', hovercolor='yellow')
|
|
axes = figure_score.add_subplot(1, 1, 1)
|
axes.autoscale(True)
|
|
# axes_score.plot(t_score, s_score, 'ro', t_score, s_score, 'k')
|
axes.set_title(title)
|
axes.grid(False)
|
axes.set_xlabel(f'时间({name})')
|
axes.dist = 0
|
|
# axes.set_ylabel(u'价格')
|
# 获取平开价
|
extra = 0 # (tool.get_limit_up_price(price)-decimal.Decimal(price))*decimal.Decimal(0.02)
|
|
axes.patch.set_facecolor('black')
|
figure_score.patch.set_facecolor('black')
|
|
# 设置横坐标9:25-15:00
|
xs_str = ["09:25", "09:30", "10:30", "11:30", "14:00", "15:00"]
|
xs_int = [DrawManager.get_x_time_as_seconds(f"0000-00-00 {x}:00") for x in xs_str]
|
axes.set_xticks(xs_int[1:])
|
axes.set_xticklabels(xs_str[1:])
|
axes.set_xlim([xs_int[0], xs_int[-1]])
|
|
axes2 = axes.twinx()
|
# axes2.grid(color='firebrick', ls='-', lw=0.5)
|
axes2.grid(color='firebrick', ls='-', lw=0.5)
|
# 鼠标在画布移动
|
# axes2.figure.canvas.mpl_connect('motion_notify_event', show_mouse_line)
|
# # 鼠标离开画布
|
# axes2.figure.canvas.mpl_connect('axes_leave_event', clear_mouse_line)
|
|
# 设置纵坐标轴
|
limit_up_price = float(tool.get_limit_up_price(price))
|
max_rate = round((limit_up_price - price) / price, 4) * 100
|
print("涨停最大比例", max_rate)
|
# 设置纵坐标信息
|
# max_rate = 2
|
DrawManager.set_y_info(code, max_rate, axes, axes2, price)
|
|
line = axes2.plot([], [], color='white', linewidth=1)
|
average_line = axes2.plot([], [], color='yellow', linewidth=1, linestyle='-')
|
average_line_1m = axes2.plot([], [], color='yellow', linewidth=1, linestyle='-')
|
# axes2.legend(loc='upper left')
|
cannvas = FigureCanvas(pannel, -1, figure_score)
|
axes2.text(1, 11.5, r'', fontsize=15, color='red')
|
axes2.text(-1, -11.5, r'', fontsize=15, color='red')
|
|
axes2.spines['top'].set_visible(False)
|
axes.spines['top'].set_visible(False)
|
axes2.spines['bottom'].set_visible(True)
|
axes.spines['bottom'].set_visible(False)
|
|
# 中轴线加粗
|
hline = axes2.axhline(0, linestyle='-', color='firebrick', lw=2, zorder=1)
|
|
# 设置坐标轴标记点为黑色
|
axes.tick_params(axis='x', colors='firebrick')
|
axes.tick_params(axis='y', colors='black')
|
axes2.tick_params(axis='x', colors='firebrick')
|
axes2.tick_params(axis='y', colors='black')
|
|
def update_data(i):
|
print("更新数据:", i)
|
|
return axes2, line, average_line_1m, average_line, axes
|
|
def __show_top(self, event):
|
if event.Selection:
|
setting.set_stay_on_top(1)
|
self.WindowStyle = wx.MINIMIZE_BOX | wx.MAXIMIZE_BOX | wx.RESIZE_BORDER | wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX | wx.CLIP_CHILDREN | wx.STAY_ON_TOP
|
else:
|
setting.set_stay_on_top(0)
|
self.WindowStyle = wx.MINIMIZE_BOX | wx.MAXIMIZE_BOX | wx.RESIZE_BORDER | wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX | wx.CLIP_CHILDREN
|
|
def OnExit(self, e):
|
try:
|
setting.set_tick_window_info((self.Position[0], self.Position[1], self.Size[0], self.Size[1]))
|
except Exception as e:
|
print("")
|
self.Hide()
|
|
def post_redraw(self, evt):
|
|
if abs(self.last_size[0] - self.Size[0]) > 20:
|
print("--------post_redraw--------")
|
self.last_size = (self.Size[0], self.Size[1])
|
self.__re_draw()
|
|
def OnResize(self, e):
|
print("变化后的尺寸", e.Size)
|
# 留出滚动条,留出上边距
|
if self.scroll:
|
self.scroll.Size = (e.Size[0] - 15, e.Size[1] - 60)
|
for p in self.panels:
|
p_height = round(e.Size[0] * (450 / 800))
|
p.Size = (e.Size[0], p_height)
|
self.timer.Stop()
|
self.timer.StartOnce(1000)
|
|
# 降低重绘频率
|
# self.__re_draw()
|
|
def set_codes_success(self):
|
print("设置代码成功回调")
|
p2.send("resub")
|
self.__re_draw()
|
self.timer.Stop()
|
self.timer.StartOnce(1000)
|
self.Size = wx.Size(self.Size[0], self.Size[1] + 10)
|
time.sleep(0.1)
|
self.Size = wx.Size(self.Size[0], self.Size[1] - 10)
|
|
def run_get_cost_price(self, code):
|
def request(code):
|
while True:
|
try:
|
price = SocketApiUtil.get_cost_price(code)
|
pre_price = juejin_core.GPCodeManager().get_pre_prices(code)
|
rate = round((price - pre_price) * 100 / pre_price, 2)
|
DrawManager.set_cost_rate(code, rate)
|
# wx.CallAfter(lambda :)
|
except Exception as e:
|
DrawManager.set_cost_rate(code, None)
|
# wx.CallAfter(lambda: str(e))
|
except:
|
pass
|
finally:
|
time.sleep(3)
|
|
t1 = threading.Thread(target=lambda: request(code), daemon=True)
|
t1.start()
|
return t1
|
|
|
# 绘图管理器
|
class DrawManager:
|
X_RANGE_MINIUTES = 335
|
X_DATA_MINIUTES = 335
|
|
h_lines_dict = {}
|
cost_mark_dict = {}
|
last_max_rate_dict = {}
|
cost_rate_dict = {}
|
sell_points_dict = {}
|
|
# 添加卖点
|
@classmethod
|
def add_sell_point(cls, code, time_str, rate):
|
if code not in cls.sell_points_dict:
|
cls.sell_points_dict[code] = queue.Queue()
|
# 暂时注释掉
|
# cls.sell_points_dict[code].put_nowait((cls.get_x_time_as_seconds(time_str), rate))
|
|
@classmethod
|
def get_x_time_as_seconds(cls, created_at_str):
|
time_ = created_at_str[-8:]
|
if tool.get_time_as_second("13:00:00") > tool.get_time_as_second(time_) > tool.get_time_as_second(
|
"11:30:00"):
|
time_ = "11:30:00"
|
time_s = int(time_.split(":")[0]) * 3600 + int(time_.split(":")[1]) * 60 + int(
|
time_.split(":")[2]) - 9 * 3600 - 60 * 30
|
|
if int(time_.replace(":", "")) > int("11:30:00".replace(":", "")):
|
time_s -= 90 * 60
|
return time_s
|
|
@classmethod
|
def set_cost_rate(cls, code, rate):
|
cls.cost_rate_dict[code] = rate
|
|
def __load_lack_datas(self, code, time_ranges):
|
codeDataManager = code_data_manager.CodeDataManager()
|
day = tool.get_now_date_str()
|
for time_range in time_ranges:
|
results = juejin_core.GPCodeManager().get_history_tick(code, day + " " + time_range[0],
|
day + " " + time_range[1])
|
datas = []
|
for data in results:
|
datas.append(juejin_core.parse_tick(data))
|
# 保存数据
|
last_time = None
|
for data in datas:
|
# 09:25:00之前的数据不保存
|
created_at = data["created_at"].strftime("%Y-%m-%d %H:%M:%S")
|
time_ = created_at[-8:]
|
time_s = int(time_.replace(":", ""))
|
if time_s < int("092500") or time_s > int("150000"):
|
continue
|
if int("113000") < time_s < int("130000"):
|
continue
|
|
# 不是今天的数据不保存
|
if day != created_at[:10]:
|
continue
|
# 每隔15s保存一次
|
if last_time is None or tool.trade_time_sub(time_, last_time) >= 15:
|
last_time = created_at[-8:]
|
codeDataManager.save_data(data)
|
|
def init_code_datas(self):
|
|
global code_datas
|
global max_min_prices
|
codeDataManager = code_data_manager.CodeDataManager()
|
gpCodeManager = juejin_core.GPCodeManager()
|
code_datas = {}
|
max_min_prices = {}
|
codes = gpCodeManager.get_codes()
|
if codes:
|
# 获取当日的最高价最低价
|
res = juejin_core.GPCodeManager().get_min_and_max_price(codes)
|
for data in res:
|
max_min_prices[data[0]] = (data[1], data[2])
|
|
for code in codes:
|
# 清除Y轴最大振幅
|
if code in self.last_max_rate_dict:
|
self.last_max_rate_dict.pop(code)
|
# 加载历史数据
|
code_datas[code] = []
|
old_datas = codeDataManager.get_datas(code)
|
# 获取缺失的数据
|
min_time = tool.get_now_time_str()
|
if int(min_time.replace(":", "")) > int("150000"):
|
min_time = "15:00:00"
|
elif int("113000") < int(min_time.replace(":", "")) < int("130000"):
|
min_time = "11:30:00"
|
|
min_time = tool.trade_time_add_second(min_time, 0 - DrawManager.X_DATA_MINIUTES * 60)
|
# 如果最小时间小于9:25则取9:25
|
if int(min_time.replace(":", "")) < int("092500"):
|
min_time = "09:25:00"
|
|
ranges = codeDataManager.get_lack_datas_time_range(old_datas, min_time)
|
|
latest_time = min_time
|
if old_datas:
|
latest_time = old_datas[-1]['created_at'][-8:]
|
|
now_time_str = tool.get_now_time_str()
|
if tool.trade_time_sub(now_time_str, "15:00:00") > 0:
|
now_time_str = "15:00:00"
|
|
if tool.trade_time_sub(now_time_str, latest_time) > 15:
|
# 加载到当前时间
|
if ranges:
|
ranges.append((ranges[-1][1], tool.get_now_time_str()))
|
else:
|
ranges.append((latest_time, tool.get_now_time_str()))
|
|
if len(ranges) > 0:
|
self.__load_lack_datas(code, ranges)
|
old_datas = codeDataManager.get_datas(code)
|
# old_datas = []
|
if old_datas:
|
code_datas[code].extend(old_datas)
|
# self.update(code, code_datas[code])
|
wx.CallAfter(self.update, code, code_datas[code])
|
|
@classmethod
|
def __format_max_rate(cls, max_rate):
|
PERCENT_RATE = 1
|
line_count = 0
|
if max_rate % PERCENT_RATE < 0.0001:
|
line_count = int(round(max_rate // PERCENT_RATE))
|
else:
|
line_count = int(round(max_rate // PERCENT_RATE)) + 1
|
rate = line_count * PERCENT_RATE
|
# 还未接近涨停多增加PERCENT_RATE
|
if rate + PERCENT_RATE < 9.8:
|
rate += PERCENT_RATE
|
return rate, PERCENT_RATE
|
|
@classmethod
|
def set_y_info(cls, code, max_rate, axes, axes2, price):
|
# 设置y轴数据
|
max_rate, amplitude = cls.__format_max_rate(max_rate)
|
line_count = int(round(max_rate / amplitude))
|
print("line_count", line_count)
|
yticks2 = []
|
for i in range(0, line_count * 2 + 1):
|
if i >= line_count:
|
yticks2.append(0 - round(max_rate * (line_count - i) / line_count, 4))
|
else:
|
yticks2.append(round(max_rate * (i - line_count) / line_count, 4))
|
yticks2_labels = []
|
yticks = []
|
yticks_labels = []
|
if code not in cls.h_lines_dict:
|
cls.h_lines_dict[code] = []
|
|
for line in cls.h_lines_dict[code]:
|
line.remove()
|
cls.h_lines_dict[code].clear()
|
|
for i in range(0, len(yticks2)):
|
# if i % 2 == 0:
|
yticks2_labels.append("{}%".format(abs(round(yticks2[i], 2))))
|
# else:
|
# yticks2_labels.append("")
|
price_ = round((1 + yticks2[i] / 100) * price, 2)
|
yticks.append(price_)
|
if i % 2 == 0 and False:
|
yticks_labels.append("")
|
# yticks_labels.append(round(yticks[i], 2))
|
if i == line_count:
|
hline = axes2.axhline(yticks2[i], linestyle='-', color='firebrick', lw=2)
|
cls.h_lines_dict[code].append(hline)
|
else:
|
# hline = axes2.axhline(yticks2[i], linestyle='-', color='firebrick', lw=1.2)
|
# cls.h_lines_dict[code].append(hline)
|
pass
|
else:
|
# axes2.axhline(yticks2[i], linestyle='-', color='firebrick', lw=0.5)
|
yticks_labels.append("")
|
axes2.set_ylabel(u'')
|
# 设置纵轴的值的范围
|
axes2.set_ylim(0 - max_rate, max_rate)
|
axes.set_ylim(0 - max_rate, max_rate)
|
axes2.set_yticks(yticks2)
|
axes2.set_yticklabels(yticks2_labels)
|
# axes.axhline(0, color='firebrick', linewidth=2)
|
|
# 设置纵坐标数值颜色
|
for i in range(0, line_count):
|
# axes.get_yticklabels()[i].set_color("green")
|
axes2.get_yticklabels()[i].set_color("green")
|
for i in range(line_count, line_count):
|
# axes.get_yticklabels()[i].set_color("white")
|
axes2.get_yticklabels()[i].set_color("white")
|
for i in range(line_count + 1, line_count * 2 + 1):
|
# axes.get_yticklabels()[i].set_color("red")
|
axes2.get_yticklabels()[i].set_color("red")
|
|
@classmethod
|
def mark_code_info(cls, code, max_rate, axes2, cost_rate):
|
## 标记代码相关的信息
|
# 设置成本线
|
if code in cls.cost_mark_dict:
|
cls.cost_mark_dict[code].remove()
|
if cost_rate is None:
|
return
|
mark_x = axes2.get_xlim()
|
mark_y = axes2.get_ylim()
|
if cost_rate > max_rate:
|
scatter_cost = axes2.scatter((mark_x[0] + mark_x[1]) // 2, mark_y[1] - mark_y[1] / 25, c='#159EEC',
|
marker='^', s=50)
|
cls.cost_mark_dict[code] = scatter_cost
|
elif cost_rate < 0 and abs(cost_rate) > max_rate:
|
scatter_cost = axes2.scatter((mark_x[0] + mark_x[1]) // 2, 0 - mark_y[1] + mark_y[1] / 25, c='#159EEC',
|
marker='v', s=50)
|
cls.cost_mark_dict[code] = scatter_cost
|
else:
|
line1 = axes2.axhline(cost_rate, linestyle='--', color='#FF8020', lw=1)
|
cls.cost_mark_dict[code] = line1
|
|
# 更新数据
|
def __update_data(self, code, axes, datas, pre_price, y_max_rate=10.5, cost_rate=None):
|
def on_pick(event):
|
ind = event.ind[0]
|
offset = event.artist.get_offsets()[ind]
|
x = offset[0]
|
y = offset[1]
|
TickFrame.set_mouse_data(x, y, code, axes[4])
|
print(f"Clicked on point ({x}, {y})")
|
|
y_max_rate, a = self.__format_max_rate(y_max_rate)
|
if code not in self.last_max_rate_dict or self.last_max_rate_dict[code] < y_max_rate:
|
self.last_max_rate_dict[code] = y_max_rate
|
# 设置纵坐标的信息
|
self.set_y_info(code, y_max_rate, axes[4], axes[0], pre_price)
|
# 标记代码相关的信息
|
self.mark_code_info(code, self.last_max_rate_dict[code], axes[0], cost_rate)
|
|
# 删除9:30以前的数据
|
for i in range(0, len(datas)):
|
time_ = datas[i]["created_at"][-8:]
|
if int(time_.replace(":", "")) >= int("092600"):
|
datas = datas[i:]
|
break
|
# 取最近14分钟的数据
|
for i in range(len(datas) - 1, -1, -1):
|
time_ = datas[i]["created_at"][-8:]
|
if tool.trade_time_sub(datas[-1]["created_at"][-8:], time_) >= self.X_DATA_MINIUTES * 60:
|
datas = datas[i:]
|
break
|
|
xs = []
|
ys_rate = []
|
ys_average_rate_1m = []
|
ys_average_rate = []
|
for data in datas:
|
xs.append(self.get_x_time_as_seconds(data["created_at"]))
|
ys_rate.append(data["rate"] * 100)
|
|
ys_average_rate_1m.append(data["average_rate"] * 100 + 1.5)
|
ys_average_rate.append(data["average_rate"] * 100)
|
|
xticks = []
|
xticklabels = []
|
# 设置X轴范围为09:30:00 到15:00:00
|
# x轴范围为0-15分钟
|
# end_x = "0000-00-00 " + tool.trade_time_add_second(datas[0]["created_at"][-8:], self.X_RANGE_MINIUTES * 60)
|
# axes[0].set_xlim(self.get_x_time_as_seconds(datas[0]["created_at"]), self.get_x_time_as_seconds(end_x))
|
|
xms = axes[0].get_xlim()
|
yms = axes[0].get_ylim()
|
# 暂时不设置X坐标
|
# step = (int(xms[1]) - int(xms[0])) // 30
|
# for i in range(int(xms[0]), int(xms[1] + 1), step):
|
# xticks.append(i)
|
# xticklabels.append("")
|
# axes[0].set_xticks(xticks)
|
# axes[0].set_xticklabels(xticklabels)
|
|
axes[1][0].set_data(xs, ys_rate)
|
|
point_x = xs[0] + 1
|
point_y = ys_rate[0]
|
|
if code not in self.sell_points_dict:
|
self.sell_points_dict[code] = queue.Queue()
|
if code in self.sell_points_dict:
|
if not self.sell_points_dict[code].empty():
|
item = self.sell_points_dict[code].get_nowait()
|
if item:
|
points = axes[0].scatter(item[0], item[1], s=15, c='green', zorder=2, picker=5)
|
# 设置点击事件
|
axes[0].figure.canvas.mpl_connect('pick_event', on_pick)
|
|
# 不需要一分钟均线
|
# axes[2][0].set_data(xs, ys_average_rate_1m)
|
if axes[3]:
|
axes[3][0].set_data(xs, ys_average_rate)
|
texts = axes[0].texts
|
texts[0].set_text("{}% ".format(round(datas[-1]["rate"] * 100, 2)))
|
if datas[-1]["rate"] > 0:
|
texts[0].set_color('red')
|
elif datas[-1]["rate"] == 0:
|
texts[0].set_color('white')
|
else:
|
texts[0].set_color('green')
|
# texts[1].set_text("{}".format(datas[-1]["created_at"].split(" ")[1]))
|
texts[1].set_text("")
|
texts[0].set_x(xms[1] - 130)
|
texts[0].set_y(yms[1] + yms[1] / 20)
|
texts[1].set_x(xms[0])
|
texts[1].set_y(yms[0] - yms[1] / 9)
|
|
# 删除之前
|
if code in self.lines:
|
for key in self.lines[code]:
|
line = self.lines[code][key]
|
line.remove()
|
self.lines.pop(code)
|
# 暂时不需要 绘制最大最小坐标
|
# line_min = axes[0].axhline(min_rate, linestyle='--', color='yellow', lw=0.5)
|
# line_max = axes[0].axhline(max_rate, linestyle='--', color='yellow', lw=0.5)
|
# self.lines[code] = {"min": line_min, "max": line_max}
|
axes[0].figure.canvas.draw()
|
axes[0].figure.canvas.flush_events()
|
|
def __init__(self, axes_list, codes_info):
|
self.axes_list = axes_list
|
self.codes_info = codes_info
|
self.lines = {}
|
|
def update(self, code, datas):
|
__start_time = time.time()
|
pre_price = juejin_core.GPCodeManager().get_pre_prices(code)
|
# 获取当前的坐标范围
|
max_price = pre_price
|
for d in datas:
|
if abs(d['price'] - pre_price) > abs(max_price - pre_price):
|
max_price = d['price']
|
y_max_rate = round(abs(max_price - pre_price) * 100 / pre_price, 2)
|
# 展示最近的600个
|
# datas = datas[0 - self.X_DATA_MINIUTES * 20:]
|
# 修正量数据
|
last_info = None
|
for i in range(1, len(datas)):
|
# 如果在
|
if not last_info:
|
last_info = datas[i]
|
if int(datas[i]["created_at"][-2:]) - int(datas[i - 1]["created_at"][-2:]) < 0:
|
last_info = datas[i]
|
datas[i]['average_rate_1m'] = last_info['average_rate']
|
datas[0]['average_rate_1m'] = datas[0]['average_rate']
|
|
for i in range(0, len(self.codes_info)):
|
if self.codes_info[i][0] == code:
|
try:
|
self.__update_data(code, self.axes_list[i], datas, pre_price, y_max_rate,
|
self.cost_rate_dict.get(code))
|
except Exception as e:
|
logging.exception(e)
|
# print("绘图花费时间:", time.time() - __start_time)
|
|
|
class mainApp(wx.App):
|
|
def __refresh(self):
|
codes = juejin_core.GPCodeManager().get_codes()
|
last_time = round(time.time())
|
while True:
|
try:
|
code = ths_ocr_util.ocr_ths_code()
|
if not code:
|
time.sleep(0.1)
|
continue
|
global_datas["pipe"].send(json.dumps({"code": code, "type": "code"}))
|
# 1s更新一次
|
if round(time.time()) - last_time > 5:
|
codes = juejin_core.GPCodeManager().get_codes()
|
last_time = round(time.time())
|
for index in range(0, len(codes)):
|
if codes[index] == code:
|
self.frame.scrollTo(index)
|
break
|
except Exception as e:
|
# logging.exception(e)
|
# print(str(e))
|
pass
|
|
time.sleep(0.005)
|
|
def __show_float_frame(self):
|
self.floatFrame.Show()
|
|
def __show_main_frame(self):
|
self.frame.Show()
|
|
def __init_data(self):
|
try:
|
codes = CodesSettingFrame.get_position_codes()
|
if codes:
|
juejin_core.GPCodeManager().set_codes(codes)
|
except:
|
pass
|
|
def OnInit(self):
|
self.SetAppName(APP_TITLE)
|
self.__init_data()
|
self.frame = TickFrame()
|
# self.floatFrame = FloatFrame()
|
global_datas["tickFrame"] = self.frame
|
# global_datas["floatFrame"] = self.floatFrame
|
|
t1 = threading.Thread(target=lambda: self.__refresh())
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
# self.__show_main_frame()
|
|
return True
|
|
|
def recieve_tick(pipe):
|
codeDataManager = code_data_manager.CodeDataManager()
|
while True:
|
data = pipe.recv()
|
if data:
|
_type = data["type"]
|
if _type == 0:
|
# tick数据
|
data = data["data"]
|
code = data["code"]
|
if abs(float(data['rate'])) > 0.4:
|
continue
|
try:
|
if code not in code_datas:
|
code_datas[code] = []
|
except:
|
continue
|
create_time = data['created_at'].strftime("%H:%M:%S")
|
last_time = None
|
if code_datas[code]:
|
if type(code_datas[code][-1]["created_at"]) == str:
|
last_time = code_datas[code][-1]["created_at"].split(" ")[1]
|
else:
|
last_time = code_datas[code][-1]["created_at"].strftime("%H:%M:%S")
|
if tool.trade_time_sub(create_time, "09:30:00") < 0:
|
TickDataProcess.init_origin_price(code, data['price'], data['rate'] * 100)
|
else:
|
if TickDataProcess.process_tick_data(code, create_time, data['price'], data['rate'] * 100):
|
drawManager.add_sell_point(code, create_time, data['rate'] * 100)
|
# 如果相差15s就添加进去
|
if last_time is None or tool.trade_time_sub(create_time, last_time) >= 15:
|
code_datas[code].append(data)
|
codeDataManager.save_data(data)
|
else:
|
code_datas[code][-1]['rate'] = data['rate']
|
code_datas[code][-1]['price'] = data['price']
|
code_datas[code][-1]['average_price'] = data['average_price']
|
code_datas[code][-1]['average_rate'] = data['average_rate']
|
# 更新数据
|
try:
|
drawManager.update(code, code_datas[code])
|
# print("接受到的tick数据:", data)
|
except:
|
pass
|
|
|
def ths_auto_click():
|
hwnd = ths_util.get_ths_second_screen_menu_hwnd()
|
while True:
|
try:
|
if hwnd is None or not win32gui.IsWindowVisible(hwnd):
|
# print("未找到同花顺副屏句柄")
|
hwnd = ths_util.get_ths_second_screen_menu_hwnd()
|
|
if hwnd is None:
|
continue
|
if not setting.is_ths_auto_click():
|
continue
|
ps = setting.get_ths_auto_click_positions()
|
if not ps:
|
continue
|
ps_new = []
|
for p in ps:
|
p = eval(p)
|
ps_new.append(p)
|
space = setting.get_ths_auto_click_time_space()
|
if space is None:
|
space = 500
|
ths_util.betch_click(hwnd, ps_new, round(space / 1000, 4))
|
except Exception as e:
|
pass
|
finally:
|
time.sleep(0.02)
|
|
|
def ths_auto_refresh():
|
hwnd = ths_util.get_trade_refesh_hwnd()
|
count = 0
|
while True:
|
count += 1
|
if count > 10:
|
count = 0
|
hwnd = ths_util.get_trade_refesh_hwnd()
|
try:
|
if hwnd is None:
|
continue
|
# 测试
|
if not setting.is_ths_trade_auto_refresh():
|
continue
|
rect = win32gui.GetWindowRect(hwnd)
|
win32_util.visual_click(hwnd, (160, (rect[3] - rect[1]) // 2))
|
time_space = setting.get_ths_auto_refresh_time_space()
|
if time_space is None:
|
time_space = 500
|
time.sleep(round(time_space / 1000, 4))
|
except Exception as e:
|
pass
|
finally:
|
time.sleep(0.02)
|
|
|
if __name__ == "__main__1":
|
print(webview.WebView.IsBackendAvailable(webview.WebViewBackendEdge))
|
|
global_datas = {
|
|
}
|
|
|
# 接受来自其他进程的数据
|
def recieve_data(pipe):
|
while True:
|
try:
|
value = pipe.recv()
|
data = json.loads(value)
|
print("接受到数据:", data)
|
type_ = data["type"]
|
if type_ == "codes_setting":
|
wx.CallAfter(lambda: CodesSettingFrame(data["pos"], global_datas["tickFrame"].set_codes_success).Show())
|
elif type_ == "juejin_setting":
|
wx.CallAfter(lambda: JueJinSettingFrame(data["pos"]).Show())
|
elif type_ == "manage_ths_pos":
|
wx.CallAfter(lambda: THSPositionSettingFrame(data["pos"]).Show())
|
elif type_ == "show_float_callback":
|
wx.CallAfter(lambda: global_datas["floatFrame"].Show())
|
elif type_ == "show_main_callback":
|
wx.CallAfter(lambda: global_datas["tickFrame"].Show())
|
elif type_ == "set_code":
|
code = data["code"]
|
t1 = threading.Thread(target=FloatFrame.setCode(code))
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
elif type_ == "juejin_tick_download":
|
wx.CallAfter(lambda: JueJinTickDataDownloadFrame().Show())
|
elif type_ == "exit":
|
try:
|
jueJinProcess.terminate()
|
except:
|
pass
|
wx.CallAfter(lambda: sys.exit())
|
except Exception as e:
|
logging.exception(e)
|
|
|
def run(pipe):
|
global_datas["pipe"] = pipe
|
global p2
|
p1, p2 = multiprocessing.Pipe()
|
global jueJinProcess
|
jueJinProcess = multiprocessing.Process(target=juejin_core.run, args=(p1,))
|
jueJinProcess.start()
|
|
t1 = threading.Thread(target=lambda: recieve_data(pipe))
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
t1 = threading.Thread(target=lambda: recieve_tick(p2))
|
# 后台运行
|
t1.setDaemon(True)
|
t1.start()
|
|
t2 = threading.Thread(target=lambda: ths_auto_click())
|
# 后台运行
|
t2.setDaemon(True)
|
t2.start()
|
|
t3 = threading.Thread(target=lambda: ths_auto_refresh())
|
# 后台运行
|
t3.setDaemon(True)
|
t3.start()
|
app = mainApp()
|
global_datas["app"] = app
|
app.MainLoop()
|
|
|
if __name__ == "__main__":
|
app = mainApp()
|
app.MainLoop()
|