import logging
|
import queue
|
import time
|
|
import wx
|
import matplotlib.pyplot as plt
|
from matplotlib.backends.backend_wxagg import FigureCanvasWxAgg as FigureCanvas
|
from matplotlib.figure import Figure
|
|
from utils import tool
|
|
# Title text ("convertible bond analysis") used for the main frame and as the wx application name.
APP_TITLE = "可转债分析"
|
|
|
class JueJinUtil:
    """Convenience queries built on ``JueJinApi``."""

    @classmethod
    def get_all_convertible_bonds_codes(cls):
        """Return the convertible bonds whose delisting date is still in the future.

        Every record returned by ``JueJinApi.get_codes(8)`` is kept when
        today's date (``YYYYMMDD`` as an integer) is strictly smaller than the
        record's ``delisted_date``.

        :return: list of ``((bond symbol, bond name), underlying symbol)``
        """
        # NOTE(review): JueJinApi is not imported in the visible part of this
        # file - confirm it is in scope at runtime.
        results = JueJinApi.get_codes(8)
        # Today's date does not change between records, so read it once.
        today = int(tool.get_now_date_str("%Y%m%d"))
        return [
            ((result['symbol'], result['sec_name']), result['underlying_symbol'])
            for result in results
            if today < int(result['delisted_date'].strftime("%Y%m%d"))
        ]
|
|
|
# Drawing manager
class DrawManager:
    """Redraws the intraday rate chart of each tracked code.

    An instance pairs ``codes_info`` entries with the axes bundles stored at
    the same index of ``axes_list``.  A bundle is indexed the way this class
    uses it: ``[0]`` is the axes carrying the rate curve, texts and markers,
    ``[1]`` the rate line list, ``[3]`` the average-rate line list (may be
    falsy) and ``[4]`` the second axes handed to ``set_y_info``.

    The ``*_dict`` attributes are class attributes, so they are shared by all
    instances; every one of them is keyed by code.
    """

    X_RANGE_MINIUTES = 335
    # Length, in minutes of trading time, of the window of ticks that is plotted.
    X_DATA_MINIUTES = 335

    # code -> list of horizontal-line artists removed again by set_y_info
    h_lines_dict = {}
    # code -> artist currently marking the cost rate
    cost_mark_dict = {}
    # code -> largest formatted y range applied so far
    last_max_rate_dict = {}
    # code -> cost rate registered through set_cost_rate
    cost_rate_dict = {}
    # code -> queue.Queue of (x, y) points waiting to be plotted
    sell_points_dict = {}

    def __init__(self, axes_list, codes_info):
        """Store the axes bundles and the matching code descriptions.

        :param axes_list: one axes bundle per chart (see the class docstring)
        :param codes_info: sequence whose items expose the code at index 0;
            item ``i`` belongs to ``axes_list[i]``
        """
        self.axes_list = axes_list
        self.codes_info = codes_info
        self.lines = {}
        # ids of the canvases that already have the pick handler connected
        self._pick_canvas_ids = set()

    @classmethod
    def get_x_time_as_seconds(cls, created_at_str):
        """Convert a timestamp into the chart's x value (trading seconds).

        The last 8 characters of ``created_at_str`` are read as ``HH:MM:SS``.
        The value is the number of seconds since 09:30:00 with the midday
        break collapsed: a time strictly between 11:30:00 and 13:00:00 (as
        compared through ``tool.get_time_as_second``) is clamped to 11:30:00,
        and 90 minutes are subtracted from any time after 11:30:00.  Times
        earlier than 09:30:00 give a negative value.

        :param created_at_str: string ending in ``HH:MM:SS``
        :return: x coordinate in seconds
        """
        time_ = created_at_str[-8:]
        break_end = tool.get_time_as_second("13:00:00")
        break_start = tool.get_time_as_second("11:30:00")
        if break_end > tool.get_time_as_second(time_) > break_start:
            time_ = "11:30:00"
        hours, minutes, seconds = (int(part) for part in time_.split(":"))
        time_s = hours * 3600 + minutes * 60 + seconds - 9 * 3600 - 60 * 30
        if int(time_.replace(":", "")) > 113000:
            time_s -= 90 * 60
        return time_s

    @classmethod
    def set_cost_rate(cls, code, rate):
        """Remember ``rate`` as the cost rate of ``code`` for later updates."""
        cls.cost_rate_dict[code] = rate

    @classmethod
    def __format_max_rate(cls, max_rate):
        """Round ``max_rate`` up to a whole number of grid steps.

        The step is 1 (percent).  One extra step of head room is added as
        long as the padded value stays below 9.8.

        :return: ``(rounded rate, step)``
        """
        PERCENT_RATE = 1
        line_count = int(round(max_rate // PERCENT_RATE))
        # A remainder below the tolerance counts as "already on a grid line".
        if max_rate % PERCENT_RATE >= 0.0001:
            line_count += 1
        rate = line_count * PERCENT_RATE
        # Not close to the limit yet: add one more step of head room.
        if rate + PERCENT_RATE < 9.8:
            rate += PERCENT_RATE
        return rate, PERCENT_RATE

    @classmethod
    def set_y_info(cls, code, max_rate, axes, axes2, price):
        """Apply a symmetric y range, tick positions and tick labels.

        ``max_rate`` is formatted with ``__format_max_rate`` first.  Both axes
        get the limits ``(-max_rate, max_rate)``; ``axes2`` additionally gets
        one tick per step, labelled with the absolute percentage.  Labels
        below zero are green, the zero label is white and labels above zero
        are red.

        :param code: key used for the cached horizontal lines
        :param max_rate: largest rate (percent) that must fit on the axis
        :param axes: axes that only receives the y limits
        :param axes2: axes that receives limits, ticks and labels
        :param price: not used by the current implementation; kept so that
            existing callers keep working
        """
        max_rate, amplitude = cls.__format_max_rate(max_rate)
        line_count = int(round(max_rate / amplitude))
        print("line_count", line_count)

        yticks2 = []
        for i in range(line_count * 2 + 1):
            if i >= line_count:
                yticks2.append(0 - round(max_rate * (line_count - i) / line_count, 4))
            else:
                yticks2.append(round(max_rate * (i - line_count) / line_count, 4))
        yticks2_labels = ["{}%".format(abs(round(tick, 2))) for tick in yticks2]

        # Drop whatever horizontal lines were cached for this code.
        cached_lines = cls.h_lines_dict.setdefault(code, [])
        for cached in cached_lines:
            cached.remove()
        cached_lines.clear()

        axes2.set_ylabel(u'')
        # Value range of the vertical axis.
        axes2.set_ylim(0 - max_rate, max_rate)
        axes.set_ylim(0 - max_rate, max_rate)
        axes2.set_yticks(yticks2)
        axes2.set_yticklabels(yticks2_labels)

        # Colour the tick labels by sign.
        tick_labels = axes2.get_yticklabels()
        for label in tick_labels[:line_count]:
            label.set_color("green")
        # The original loop over range(line_count, line_count) never ran, so
        # the zero label was left uncoloured.
        tick_labels[line_count].set_color("white")
        for label in tick_labels[line_count + 1:line_count * 2 + 1]:
            label.set_color("red")

    @classmethod
    def mark_code_info(cls, code, max_rate, axes2, cost_rate):
        """Draw (or clear) the marker that shows the cost rate of ``code``.

        A previous marker is always removed.  With ``cost_rate`` set, a dashed
        horizontal line is drawn when the rate fits into ``+-max_rate``;
        otherwise an up or down triangle is placed near the top or bottom
        edge, horizontally centred.

        :param code: key of the marker in ``cost_mark_dict``
        :param max_rate: rate range used to decide whether the line fits
        :param axes2: axes to draw on
        :param cost_rate: cost rate in percent, or ``None`` for "no marker"
        """
        # Pop instead of look up: a removed artist must not stay cached,
        # otherwise the next call would try to remove it a second time.
        previous = cls.cost_mark_dict.pop(code, None)
        if previous is not None:
            previous.remove()
        if cost_rate is None:
            return

        mark_x = axes2.get_xlim()
        mark_y = axes2.get_ylim()
        center_x = (mark_x[0] + mark_x[1]) // 2
        if cost_rate > max_rate:
            marker = axes2.scatter(center_x, mark_y[1] - mark_y[1] / 25, c='#159EEC',
                                   marker='^', s=50)
        elif cost_rate < 0 and abs(cost_rate) > max_rate:
            marker = axes2.scatter(center_x, 0 - mark_y[1] + mark_y[1] / 25, c='#159EEC',
                                   marker='v', s=50)
        else:
            marker = axes2.axhline(cost_rate, linestyle='--', color='#FF8020', lw=1)
        cls.cost_mark_dict[code] = marker

    # Update the data
    def __update_data(self, code, axes, datas, pre_price, y_max_rate=10.5, cost_rate=None):
        """Push ``datas`` into one axes bundle and redraw its canvas.

        :param code: code the bundle belongs to
        :param axes: axes bundle (see the class docstring for the indices)
        :param datas: tick dicts with ``created_at``, ``rate`` and
            ``average_rate`` keys
        :param pre_price: reference price forwarded to ``set_y_info``
        :param y_max_rate: largest rate (percent) that has to be visible
        :param cost_rate: cost rate (percent) to mark, or ``None``
        """

        def on_pick(event):
            # Forward the coordinates of the picked scatter point.
            ind = event.ind[0]
            offset = event.artist.get_offsets()[ind]
            x = offset[0]
            y = offset[1]
            TickCompareFrame.set_mouse_data(x, y, code, axes[4])
            print(f"Clicked on point ({x}, {y})")

        y_max_rate, _ = self.__format_max_rate(y_max_rate)
        # NOTE(review): SOURCE lost its indentation. The nesting below (y axis
        # rebuilt only when the range grows, cost marker refreshed on every
        # call) is a reconstruction - confirm against the original file.
        if code not in self.last_max_rate_dict or self.last_max_rate_dict[code] < y_max_rate:
            self.last_max_rate_dict[code] = y_max_rate
            # Vertical axis information
            self.set_y_info(code, y_max_rate, axes[4], axes[0], pre_price)
        # Markers that belong to the code
        self.mark_code_info(code, self.last_max_rate_dict[code], axes[0], cost_rate)

        # Drop the ticks in front of the first one stamped 09:26:00 or later.
        for i, tick in enumerate(datas):
            if int(tick["created_at"][-8:].replace(":", "")) >= 92600:
                datas = datas[i:]
                break
        if not datas:
            # Nothing to plot; the code below indexes the first and last tick.
            return

        # Keep the most recent X_DATA_MINIUTES minutes of trading time.
        latest_time = datas[-1]["created_at"][-8:]
        for i in range(len(datas) - 1, -1, -1):
            if tool.trade_time_sub(latest_time, datas[i]["created_at"][-8:]) >= self.X_DATA_MINIUTES * 60:
                datas = datas[i:]
                break

        xs = [self.get_x_time_as_seconds(tick["created_at"]) for tick in datas]
        ys_rate = [tick["rate"] * 100 for tick in datas]

        xms = axes[0].get_xlim()
        yms = axes[0].get_ylim()

        axes[1][0].set_data(xs, ys_rate)

        if code not in self.sell_points_dict:
            self.sell_points_dict[code] = queue.Queue()
        try:
            item = self.sell_points_dict[code].get_nowait()
        except queue.Empty:
            item = None
        if item:
            axes[0].scatter(item[0], item[1], s=15, c='green', zorder=2, picker=5)
            # One handler per canvas is enough; connecting again for every
            # point would run on_pick once per plotted point on each click.
            canvas = axes[0].figure.canvas
            if id(canvas) not in self._pick_canvas_ids:
                canvas.mpl_connect('pick_event', on_pick)
                self._pick_canvas_ids.add(id(canvas))

        # The one-minute average line is not drawn.
        if axes[3]:
            axes[3][0].set_data(xs, [tick["average_rate"] * 100 for tick in datas])

        latest_rate = datas[-1]["rate"]
        texts = axes[0].texts
        texts[0].set_text("{}% ".format(round(latest_rate * 100, 2)))
        if latest_rate > 0:
            texts[0].set_color('red')
        elif latest_rate == 0:
            texts[0].set_color('white')
        else:
            texts[0].set_color('green')
        texts[1].set_text("")
        texts[0].set_x(xms[1] - 130)
        texts[0].set_y(yms[1] + yms[1] / 20)
        texts[1].set_x(xms[0])
        texts[1].set_y(yms[0] - yms[1] / 9)

        # Remove the helper lines of the previous update.
        if code in self.lines:
            for key in self.lines[code]:
                self.lines[code][key].remove()
            self.lines.pop(code)

        axes[0].figure.canvas.draw()
        axes[0].figure.canvas.flush_events()

    def update(self, code, pre_price, datas):
        """Refresh every chart registered for ``code`` with ``datas``.

        The y range is derived from the price that deviates most from
        ``pre_price``.  Each tick also receives an ``average_rate_1m`` entry:
        the ``average_rate`` of the latest tick at which the seconds field
        wrapped around (the first tick keeps its own value).  Errors raised
        while drawing are logged, not propagated.

        :param code: code to refresh
        :param pre_price: reference price the rates are relative to
        :param datas: tick dicts with ``price``, ``created_at`` and
            ``average_rate`` keys (plus what ``__update_data`` reads);
            modified in place
        """
        if not datas:
            # The original indexed datas[0] unconditionally.
            return
        if not pre_price:
            logging.warning("skip drawing %s: invalid pre_price %r", code, pre_price)
            return

        # Price with the largest distance from the reference price.
        max_price = pre_price
        for d in datas:
            if abs(d['price'] - pre_price) > abs(max_price - pre_price):
                max_price = d['price']
        y_max_rate = round(abs(max_price - pre_price) * 100 / pre_price, 2)

        last_info = None
        for i in range(1, len(datas)):
            if not last_info:
                last_info = datas[i]
            # Seconds going backwards means a new minute has started.
            if int(datas[i]["created_at"][-2:]) - int(datas[i - 1]["created_at"][-2:]) < 0:
                last_info = datas[i]
            datas[i]['average_rate_1m'] = last_info['average_rate']
        datas[0]['average_rate_1m'] = datas[0]['average_rate']

        for i, info in enumerate(self.codes_info):
            if info[0] != code:
                continue
            try:
                self.__update_data(code, self.axes_list[i], datas, pre_price, y_max_rate,
                                   self.cost_rate_dict.get(code))
            except Exception as e:
                logging.exception(e)
|
|
|
class TickCompareFrame(wx.Frame):
    """Main window: a code input row plus two chart panels.

    Entering a stock code and pressing the load button looks up the matching
    convertible bond, fetches today's ticks and draws one chart per
    instrument returned by the API into ``panel1`` / ``panel2``.
    """

    # Convertible bond list, fetched on first use.
    __cb_datas = []
    # code -> {"mouse": (h line, v line, h text, v text)}; shared by all frames
    mark_lines = {}

    @classmethod
    def __show_message(cls, content, icon_style, click):
        """Show a modal OK dialog and run ``click`` once it is confirmed."""
        # Without YES/NO buttons the dialog only has an OK button, so
        # ShowModal() reports wx.ID_OK; the former wx.ID_YES check never matched.
        dialog = wx.MessageDialog(None, content, "提示", wx.OK | icon_style)
        try:
            if dialog.ShowModal() == wx.ID_OK and click is not None:
                click()
        finally:
            # Destroy the dialog even when the callback raises.
            dialog.Destroy()

    @classmethod
    def show_warning(cls, content, click=None):
        """Show a warning dialog.

        :param content: message text
        :param click: optional callable run after the OK button is pressed
        """
        cls.__show_message(content, wx.ICON_WARNING, click)

    @classmethod
    def show_info(cls, content, click):
        """Show an information dialog.

        :param content: message text
        :param click: callable run after the OK button is pressed; ``None``
            is tolerated and means "no callback"
        """
        cls.__show_message(content, wx.ICON_INFORMATION, click)

    def __init__(self):
        """Build the widgets and wire the load button."""
        super().__init__(None, -1, APP_TITLE, style=wx.DEFAULT_FRAME_STYLE,
                         size=(800, 1000))
        self.SetBackgroundColour(wx.Colour(0, 0, 0))
        self.Center()

        self.cost_price_threads = {}
        self.panels = []
        # Number of chart columns and height/width ratio used to size figures.
        self.col = 1
        self.ratio = 0.55

        # Input row: label, code field, load button.
        self.panel = wx.Panel(self, size=(-1, 50))
        self.panel.SetBackgroundColour(wx.Colour(255, 255, 255))
        bs1 = wx.BoxSizer(wx.HORIZONTAL)
        self.edit_code1 = wx.TextCtrl(self.panel, size=(100, 25))
        self.btn_load_data = wx.Button(self.panel, label="加载数据", size=(80, 25))
        bs1.Add(wx.StaticText(self.panel, label="股票代码:"), proportion=0, flag=wx.ALL, border=12)
        bs1.Add(self.edit_code1, proportion=0, flag=wx.ALL, border=10)
        bs1.Add(self.btn_load_data, proportion=0, flag=wx.ALL, border=10)
        self.panel.SetSizer(bs1)

        # Vertical stack: input row, then the two chart panels.
        root_boxsier = wx.BoxSizer(wx.VERTICAL)
        root_boxsier.Add(self.panel, proportion=0, flag=wx.EXPAND)
        self.panel1 = wx.Panel(self, size=(-1, 400))
        self.panel2 = wx.Panel(self, size=(-1, 400))
        self.panel1.SetBackgroundColour(wx.Colour(255, 0, 0))
        self.panel2.SetBackgroundColour(wx.Colour(0, 255, 0))
        root_boxsier.Add(self.panel1, proportion=0, flag=wx.EXPAND)
        root_boxsier.Add(self.panel2, proportion=0, flag=wx.EXPAND)
        self.SetSizer(root_boxsier)
        self.Layout()

        self.btn_load_data.Bind(wx.EVT_BUTTON, lambda e: self.__load_data(e))

    def __load_data(self, event):
        """Load today's ticks for the entered code and draw the charts.

        :param event: the button event (not inspected)
        """
        code = self.edit_code1.GetValue().strip()
        if not code:
            # An empty string is a substring of everything and used to select
            # the first bond in the list.
            self.show_warning("请输入股票代码")
            return

        # Convertible bonds, fetched once per frame.
        if not self.__cb_datas:
            self.__cb_datas = JueJinUtil.get_all_convertible_bonds_codes()
        # Each entry is ((bond symbol, bond name), underlying symbol); the
        # input is matched against the underlying symbol.
        cb_codes = next((b for b in self.__cb_datas if code in b[1]), None)
        if not cb_codes:
            self.show_warning("无可转债")
            return

        # Basic information.
        # NOTE(review): JueJinApi and juejin_core are not imported in the
        # visible part of this file - confirm they are in scope.
        # NOTE(review): cb_codes[0] is the (symbol, name) tuple, not a symbol
        # string; cb_codes[0][0] looks intended - verify against
        # get_gp_latest_info before changing.
        base_infos = JueJinApi.get_gp_latest_info([cb_codes[0], cb_codes[1]],
                                                  fields="symbol,sec_id,sec_name,pre_close")

        # Same time window for every instrument: 09:30:00 until now.
        today = tool.get_now_date_str()
        start_time, end_time = f"{today} 09:30:00", f"{today} {tool.get_now_time_str()}"

        fdatas = []
        for b in base_infos:
            results = JueJinApi.get_history_tick(b['symbol'], start_time, end_time, frequency="tick")
            datas = []
            for data in results:
                data['created_at'] = data["created_at"].strftime("%Y-%m-%d %H:%M:%S")
                datas.append(juejin_core.parse_tick(data))
            # (code, name, previous close, ticks)
            fdatas.append((b['sec_id'], b['sec_name'], b['pre_close'], datas))

        panels = [self.panel1, self.panel2]
        axes_list = []
        codes_info = []
        for panel, (sec_id, sec_name, pre_close, datas) in zip(panels, fdatas):
            axes_list.append(self.__create_canvas(panel, sec_id, sec_name, pre_close, datas))
            # DrawManager.update compares codes_info[i][0] with the code, so
            # the code has to be the first element (a bare name string would
            # be compared by its first character and never match).
            codes_info.append((sec_id, sec_name))

        draw_manager = DrawManager(axes_list, codes_info)
        for sec_id, _, pre_close, datas in fdatas:
            draw_manager.update(sec_id, pre_close, datas)

    @classmethod
    def set_mouse_data(cls, xdata, ydata, code, axes2):
        """Move (creating on first use) the cross-hair of ``code``.

        A horizontal and a vertical line plus two text boxes are kept per
        code in ``mark_lines``; they are positioned at ``(xdata, ydata)`` and
        the canvas is redrawn.  Failures are logged and not propagated.

        :param xdata: x position (trading seconds, see
            ``DrawManager.get_x_time_as_seconds``)
        :param ydata: y position (percent)
        :param code: key of the cross-hair artists
        :param axes2: axes to draw on
        """
        if xdata is None or ydata is None:
            # No data coordinates to work with.
            return
        try:
            marks = cls.mark_lines.setdefault(code, {})
            if "mouse" not in marks:
                line_h = axes2.axhline(ydata, linestyle='-', color='white', lw=0.5, zorder=3)
                line_v = axes2.axvline(xdata, linestyle='-', color='white', lw=0.5, zorder=3)
                text_h = axes2.text(axes2.get_xlim()[1], ydata, r'', fontsize=10, color='white',
                                    verticalalignment="center", bbox=dict(facecolor='white', alpha=0.9),
                                    zorder=3)
                text_v = axes2.text(xdata, axes2.get_ylim()[0], r'', fontsize=10, color='red',
                                    verticalalignment='top', horizontalalignment='center',
                                    bbox=dict(facecolor='white', alpha=0.9), zorder=3)
                marks["mouse"] = (line_h, line_v, text_h, text_v)

            line_h, line_v, text_h, text_v = marks["mouse"]
            x = round(xdata)
            y = round(ydata, 2)

            # Line2D setters expect sequences; both end points share the value.
            line_h.set_ydata([ydata, ydata])
            line_v.set_xdata([xdata, xdata])
            text_h.set_text(f"{y}%")
            text_v.set_text(f"{tool.trade_time_add_second('09:30:00', x)}")
            text_h.set_x(axes2.get_xlim()[1] * (1 + 0.005))
            text_h.set_y(y)
            text_v.set_x(x)
            text_v.set_y(axes2.get_ylim()[0] * (1 + 0.02))
            text_h.set_color('red' if y >= 0 else 'green')
            axes2.figure.canvas.draw()
        except Exception:
            # Best effort: a broken cross-hair must not break the caller, but
            # the failure should be visible in the log.
            logging.exception("failed to update the cross-hair of %s", code)

    def __create_canvas(self, pannel, code, title, price, datas, close_callback=None):
        """Create the figure, axes and empty line artists for one instrument.

        :param pannel: wx parent of the matplotlib canvas
        :param code: instrument code, used as key for y-axis and cross-hair state
        :param title: chart title, also used in the x label
        :param price: reference price used to derive the limit-up rate
        :param datas: not used here
        :param close_callback: optional callable receiving ``title``; only
            used by the (currently unconnected) close handler
        :return: ``(axes2, line, average_line_1m, average_line, axes)`` where
            ``axes2`` is the twin axes that carries the curves
        """

        def show_mouse_line(event):
            # Handler for mouse movement (not connected at the moment).
            self.set_mouse_data(event.xdata, event.ydata, code, axes)

        def clear_mouse_line(event):
            # Handler for the mouse leaving the axes (not connected at the moment).
            print("clear_mouse_line")
            marks = self.mark_lines.get(code)
            if marks:
                line = marks.get("mouse")
                if line is not None:
                    for l in line:
                        l.remove()
                    marks.pop("mouse")
                    axes.figure.canvas.draw()

        def close_canvas(event):
            # Close handler (not connected at the moment).
            print("关闭", title)
            if close_callback is not None:
                close_callback(title)

        dpi = 100
        width_dpi = self.Size[0] / (dpi * self.col)
        figure_score = Figure(figsize=(width_dpi, round(width_dpi * (self.ratio), 2)), dpi=dpi)
        # Outer margins; the right margin leaves room for the tick labels.
        right_padding_px = 85
        right = round((self.Size[0] - right_padding_px) / self.Size[0], 4)
        figure_score.subplots_adjust(left=0.01, bottom=0.15, top=0.92,
                                     right=right)

        # Global matplotlib settings: text, label, tick and edge colours.
        plt.rcParams["text.color"] = "red"
        plt.rcParams["axes.labelcolor"] = "red"
        plt.rcParams["xtick.color"] = "white"
        plt.rcParams["ytick.color"] = "white"
        plt.rcParams["axes.edgecolor"] = "firebrick"
        # Font able to render Chinese text, and an ASCII minus sign.
        plt.rcParams["font.sans-serif"] = ["SimHei"]
        plt.rcParams["font.serif"] = ["SimHei"]
        plt.rcParams["axes.unicode_minus"] = False

        axes = figure_score.add_subplot(1, 1, 1)
        axes.autoscale(True)
        axes.set_title(title)
        axes.grid(False)
        axes.set_xlabel(f'时间({title})')
        axes.dist = 0

        axes.patch.set_facecolor('black')
        figure_score.patch.set_facecolor('black')

        # x axis from 09:25 to 15:00; the first entry is only used as the
        # lower limit and gets no tick.
        xs_str = ["09:25", "09:30", "10:30", "11:30", "14:00", "15:00"]
        xs_int = [DrawManager.get_x_time_as_seconds(f"0000-00-00 {x}:00") for x in xs_str]
        axes.set_xticks(xs_int[1:])
        axes.set_xticklabels(xs_str[1:])
        axes.set_xlim([xs_int[0], xs_int[-1]])

        axes2 = axes.twinx()
        axes2.grid(color='firebrick', ls='-', lw=0.5)
        # Mouse handlers are currently disabled:
        # axes2.figure.canvas.mpl_connect('motion_notify_event', show_mouse_line)
        # axes2.figure.canvas.mpl_connect('axes_leave_event', clear_mouse_line)

        # y axis: start with the range up to the limit-up price.
        limit_up_price = float(tool.get_limit_up_price(price))
        max_rate = round((limit_up_price - price) / price, 4) * 100
        print("涨停最大比例", max_rate)
        DrawManager.set_y_info(code, max_rate, axes, axes2, price)

        line = axes2.plot([], [], color='white', linewidth=1)
        average_line = axes2.plot([], [], color='yellow', linewidth=1, linestyle='-')
        average_line_1m = axes2.plot([], [], color='yellow', linewidth=1, linestyle='-')
        # Creating the canvas attaches the figure to the wx parent.
        FigureCanvas(pannel, -1, figure_score)
        # Two text artists; DrawManager addresses them as texts[0] / texts[1].
        axes2.text(1, 11.5, r'', fontsize=15, color='red')
        axes2.text(-1, -11.5, r'', fontsize=15, color='red')

        axes2.spines['top'].set_visible(False)
        axes.spines['top'].set_visible(False)
        axes2.spines['bottom'].set_visible(True)
        axes.spines['bottom'].set_visible(False)

        # Bold centre line at 0%.
        axes2.axhline(0, linestyle='-', color='firebrick', lw=2, zorder=1)

        # Tick colours.
        axes.tick_params(axis='x', colors='firebrick')
        axes.tick_params(axis='y', colors='black')
        axes2.tick_params(axis='x', colors='firebrick')
        axes2.tick_params(axis='y', colors='black')

        return axes2, line, average_line_1m, average_line, axes

    def OnResize(self, e):
        """Print the new size (the EVT_SIZE binding is currently disabled)."""
        print("变化后的尺寸", e.Size)
|
|
|
class mainApp(wx.App):
    """wx application object that creates and shows the main frame."""

    def OnInit(self):
        """Name the application, build the ``TickCompareFrame`` and show it.

        :return: ``True`` so that wx continues with start-up
        """
        self.SetAppName(APP_TITLE)
        self.__init_data()
        self.frame = TickCompareFrame()
        self.__show_main_frame()
        return True

    def __init_data(self):
        """Placeholder for start-up data loading; does nothing at present."""
        return None

    def __show_main_frame(self):
        """Make the main frame visible."""
        self.frame.Show()
|
|
|
def run():
    """Query the convertible bond list once, then start the wx main loop.

    The query result is discarded here.
    """
    JueJinUtil.get_all_convertible_bonds_codes()
    mainApp().MainLoop()
|