# 悬浮框
|
import json
|
import logging
|
import os
|
import sys
|
import threading
|
import time
|
|
import requests
|
import schedule
|
import wx
|
|
from utils import tool
|
import kpl_api
|
|
|
def show_warning(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_WARNING)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
if click is not None:
|
click()
|
toastone.Destroy()
|
|
|
def show_info(content, click):
|
toastone = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | wx.ICON_INFORMATION)
|
if toastone.ShowModal() == wx.ID_YES: # 如果点击了提示框的确定按钮
|
click()
|
toastone.Destroy()
|
|
|
class MyLogHandler(logging.Handler):
|
|
def __init__(self, obj):
|
logging.Handler.__init__(self)
|
self.setLevel(logging.DEBUG)
|
self.Object = obj
|
|
def emit(self, record):
|
if record.levelno:
|
if len(self.Object.GetValue()) > 1024:
|
self.Object.SetValue('')
|
|
tstr = time.strftime('%Y-%m-%d_%H:%M:%S')
|
self.Object.AppendText("[%s][%s] %s\n" % (tstr, record.levelname, record.getMessage()))
|
|
|
class KPLDataCaptureFrame(wx.Frame):
|
|
def __create_item(self, name, checked=True, trade_time=True, circulate=True):
|
btn = wx.Button(self, label=f"{'开启' if circulate else ''}{name}", size=(120, 28))
|
check = wx.CheckBox(self, size=(-1, -1))
|
if checked:
|
check.SetValue(True)
|
else:
|
check.SetValue(False)
|
label = wx.StaticText(self, label=f"{'是否交易时间采集' if trade_time else ''}")
|
bs1 = wx.BoxSizer(wx.HORIZONTAL)
|
bs1.Add(label)
|
bs1.Add(check, 0, wx.LEFT, 10)
|
bs2 = wx.BoxSizer(wx.VERTICAL)
|
bs2.Add(bs1)
|
bs2.Add(btn)
|
if not trade_time:
|
check.Hide()
|
return bs2, btn, check
|
|
# 初始化采集线程
|
def __init_capture_threads(self):
|
def create_thread(target):
|
t1 = threading.Thread(target=target)
|
t1.setDaemon(True)
|
t1.start()
|
|
def create_circulate_task(target):
|
while True:
|
try:
|
target()
|
except Exception as e:
|
print(str(e))
|
time.sleep(3)
|
|
create_thread(lambda: create_circulate_task(self.__exec_limit_up))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_open_limit_up))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_limit_down))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_ever_limit_down))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_fengkou))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_fengkou_best))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_fengxiang))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_industry_rank))
|
time.sleep(0.5)
|
create_thread(lambda: create_circulate_task(self.__exec_jingxuan_rank))
|
|
def __init__(self):
|
self.__init_data()
|
wx.Frame.__init__(self, None, -1, "开盘啦数据采集客户端",
|
style=wx.CAPTION ^ wx.MINIMIZE_BOX ^ wx.CLOSE_BOX ^ wx.STAY_ON_TOP,
|
size=(800, 400))
|
self.SetBackgroundColour(wx.Colour(224, 224, 224))
|
self.Bind(wx.EVT_CLOSE, self.OnExit)
|
boxsier = wx.FlexGridSizer(5, 2, 10, 20)
|
self.item_bidding = self.__create_item("竞价采集", trade_time=False, circulate=False)
|
boxsier.Add(self.item_bidding[0], 0, wx.TOP, 5)
|
|
self.items = []
|
names = ["涨停采集", "炸板采集", "跌停采集", "曾跌停采集", "市场风口采集", "最强风口采集", "风向标采集", "行业涨幅", "精选流入"]
|
for name in names:
|
self.items.append(self.__create_item(name))
|
boxsier.Add(self.items[-1][0], 0, wx.TOP, 5)
|
|
# 绑定事件
|
self.item_bidding[1].Bind(wx.EVT_BUTTON, self.__capture_bidding)
|
|
for i in range(0, len(names)):
|
self.items[i][1].Bind(wx.EVT_BUTTON, lambda e, x=i: self.__capture_btn_click(x, e))
|
self.items[i][2].Bind(wx.EVT_CHECKBOX, lambda e, x=i: self.__capture_btn_check(x, e))
|
|
root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
left_boxsier = wx.BoxSizer(wx.VERTICAL)
|
left_boxsier.Add(boxsier)
|
whole_boxsier = wx.BoxSizer(wx.HORIZONTAL)
|
|
self.start_all_btn = wx.Button(self, label="开启全部任务", size=(120, 28))
|
self.start_all_btn.SetForegroundColour("#008040")
|
self.start_all_btn.Bind(wx.EVT_BUTTON, self.__start_all_task)
|
|
self.stop_all_btn = wx.Button(self, label="关闭全部任务", size=(120, 28))
|
self.stop_all_btn.SetForegroundColour("#FF0000")
|
self.stop_all_btn.Bind(wx.EVT_BUTTON, self.__stop_all_task)
|
|
whole_boxsier.Add(self.stop_all_btn)
|
whole_boxsier.Add(self.start_all_btn, 1, wx.LEFT, 25)
|
left_boxsier.Add(whole_boxsier, 1, wx.TOP, 30)
|
root_boxsier.Add(left_boxsier, 1, wx.LEFT, 10)
|
self.text_log = wx.TextCtrl(parent=self, size=(500, self.Size[1]), style=wx.TE_MULTILINE)
|
self.text_log.SetEditable(False)
|
self.text_log.SetBackgroundColour("#EEE")
|
self.text_log.SetForegroundColour("#333")
|
|
handler = MyLogHandler(self.text_log)
|
self.logger = logging.getLogger(__name__)
|
self.logger.addHandler(handler)
|
self.logger.setLevel(logging.DEBUG)
|
|
root_boxsier.Add(self.text_log, 1, wx.LEFT, 10)
|
self.SetSizer(root_boxsier)
|
|
self.timer = wx.Timer(self) # 创建定时器
|
# self.Bind(wx.EVT_TIMER, self.clear_msg, self.timer)
|
|
self.logger.info("初始化成功")
|
|
self.__init_capture_threads()
|
|
# 开启定时器,在9点26时读取竞价数据
|
|
def schedule_fun():
|
self.__capture_bidding(None)
|
|
schedule.every().day.at("09:26:00").do(schedule_fun) #lambda: self.__capture_bidding(None)
|
t1 = threading.Thread(target=self.run_schedule)
|
t1.setDaemon(True)
|
t1.start()
|
|
# 自动开启采集
|
self.__start_all_task(None)
|
|
def run_schedule(self):
|
while True:
|
schedule.run_pending()
|
|
def __init_data(self):
|
self.capture_status = {}
|
self.keys = ["limit_up", "open_limit_up", "limit_down", "ever_limit_down", "feng_kou", "best_feng_kou",
|
"feng_xiang", "industry_rank", "jingxuan_rank"]
|
for key in self.keys:
|
self.capture_status[key] = [False, True]
|
|
def __set_btn_text(self, controls):
|
text = controls[1].GetLabelText()
|
if text.find("开启") == 0:
|
text = text.replace("开启", "关闭")
|
else:
|
text = text.replace("关闭", "开启")
|
controls[1].SetLabelText(text)
|
|
def __capture_bidding(self, event):
|
results = kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING)
|
result = json.loads(results)
|
self.logger.info(f"获取到竞价数据:{len(result['list'])}")
|
self.__upload_data("biddings", result)
|
# show_info("采集成功", None)
|
|
def __capture_btn_click(self, key_index, event):
|
self.__set_btn_text(self.items[key_index])
|
self.capture_status[self.keys[key_index]][0] = not self.capture_status[self.keys[key_index]][0]
|
|
def __capture_btn_check(self, key_index, event):
|
self.capture_status[self.keys[key_index]][1] = not self.capture_status[self.keys[key_index]][1]
|
|
def __can_capture(self, key_index):
|
if not self.capture_status[self.keys[key_index]][0]:
|
return False
|
if self.capture_status[self.keys[key_index]][1] and not tool.is_trade_time():
|
return False
|
return True
|
|
def __exec_limit_up(self):
|
if not self.__can_capture(0):
|
return
|
try:
|
results = kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_UP)
|
result = json.loads(results)
|
self.logger.info(f"涨停代码数量:{len(result['list'])}")
|
self.__upload_data(self.keys[0], result)
|
except:
|
pass
|
|
def __exec_open_limit_up(self):
|
if not self.__can_capture(1):
|
return
|
try:
|
results = kpl_api.daBanList(kpl_api.DABAN_TYPE_OPEN_LIMIT_UP)
|
result = json.loads(results)
|
self.logger.info(f"炸板代码数量:{len(result['list'])}")
|
self.__upload_data(self.keys[1], result)
|
except:
|
pass
|
|
def __exec_limit_down(self):
|
if not self.__can_capture(2):
|
return
|
try:
|
results = kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_DOWN)
|
result = json.loads(results)
|
self.logger.info(f"跌停代码数量:{len(result['list'])}")
|
self.__upload_data(self.keys[2], result)
|
except:
|
pass
|
|
def __exec_ever_limit_down(self):
|
if not self.__can_capture(3):
|
return
|
try:
|
results = kpl_api.daBanList(kpl_api.DABAN_TYPE_EVER_LIMIT_DOWN)
|
result = json.loads(results)
|
self.logger.info(f"曾经跌停代码数量:{len(result['list'])}")
|
self.__upload_data(self.keys[3], result)
|
except:
|
pass
|
|
def __exec_fengkou(self):
|
if not self.__can_capture(4):
|
return
|
try:
|
results = kpl_api.getFengKouList(tool.get_now_date_str())
|
result = json.loads(results)
|
self.logger.info(f"市场风口代码数量:{len(result['List'])}")
|
self.__upload_data(self.keys[4], result)
|
except:
|
pass
|
|
def __exec_fengkou_best(self):
|
try:
|
if not self.__can_capture(5):
|
return
|
if not os.path.exists("D:/kpl/GetFengKListBest_0_0.log"):
|
return
|
with open("D:/kpl/GetFengKListBest_0_0.log", mode='r', encoding="utf-16") as f:
|
lines = f.readlines()
|
if lines:
|
data = json.loads(lines[0])
|
self.logger.info(f"最强风口代码数量:{data['Count']}")
|
self.__upload_data(self.keys[5], data)
|
except:
|
pass
|
|
def __exec_fengxiang(self):
|
if not self.__can_capture(6):
|
return
|
self.logger.info(f"开始风向标采集")
|
try:
|
results = kpl_api.getFengXiangBiao()
|
result = json.loads(results)
|
self.logger.info(f"风向标代码数量:{len(result['list'])}")
|
self.__upload_data(self.keys[6], result)
|
except Exception as e:
|
self.logger.error(str(e))
|
|
def __exec_industry_rank(self):
|
if not self.__can_capture(7):
|
return
|
try:
|
results = kpl_api.getMarketIndustryRealRankingInfo()
|
result = json.loads(results)
|
self.logger.info(f"行业涨幅排行代码数量:{len(result['list'])}")
|
self.__upload_data(self.keys[7], result)
|
except:
|
pass
|
|
def __exec_jingxuan_rank(self):
|
if not self.__can_capture(8):
|
return
|
try:
|
results = kpl_api.getMarketJingXuanRealRankingInfo()
|
result = json.loads(results)
|
self.logger.info(f"精选流入排行代码数量:{len(result['list'])}")
|
self.__upload_data(self.keys[8], result)
|
except:
|
pass
|
|
|
# 开始所有的任务
|
def __start_all_task(self, event):
|
for i in range(0, len(self.items)):
|
# 是否已经开启
|
if not self.capture_status[self.keys[i]][0]:
|
self.__capture_btn_click(i, None)
|
|
# 开始所有的任务
|
|
def __stop_all_task(self, event):
|
for i in range(0, len(self.items)):
|
# 是否已经开启
|
if self.capture_status[self.keys[i]][0]:
|
self.__capture_btn_click(i, None)
|
|
def __upload_data(self, type, datas):
|
root_data = {
|
"type": type,
|
"data": datas
|
}
|
requests.post("http://192.168.3.252:9004/upload_kpl_data", json.dumps(root_data))
|
|
def OnExit(self, e):
|
sys.exit(0)
|
|
|
class mainApp(wx.App):
|
|
def OnInit(self):
|
self.SetAppName("开盘啦")
|
# 传递代码设置成功的回调
|
self.kpl = KPLDataCaptureFrame()
|
self.kpl.Show()
|
return True
|
|
|
# cd D:\workspace\GP\trade_desk
|
# D:\workspace\GP\trade_desk\dist\env\pk_env\Scripts\pyinstaller.exe kpl/gui.spec
|
#
|
|
if __name__ == "__main__":
|
app = mainApp(redirect=True, filename="logs/output.log")
|
app.MainLoop()
|