# Floating window: GUI client that captures KaiPanLa (开盘啦) data and uploads it.
import json
import logging
import os
import sys
import threading
import time

import requests
import schedule
import wx

from utils import tool
import kpl_api

# Endpoint that receives every captured payload.
UPLOAD_URL = "http://192.168.3.252:9004/upload_kpl_data"
# Upper bound for one upload so a stalled server cannot hang a worker forever.
UPLOAD_TIMEOUT_SECONDS = 10
# File read by the "best feng kou" capture task.
FENGKOU_BEST_LOG_PATH = "D:/kpl/GetFengKListBest_0_0.log"
# File that wx redirects stdout/stderr to.
APP_LOG_PATH = "logs/output.log"
# The on-screen log is cleared once it holds more than this many characters.
MAX_LOG_CHARS = 1024
# Pause between two runs of the same capture task.
POLL_INTERVAL_SECONDS = 3
# Offset between the first runs of consecutive capture threads.
THREAD_STAGGER_SECONDS = 0.5
# Pause between two checks of the scheduler queue.
SCHEDULE_POLL_SECONDS = 1


def _show_dialog(content, icon_style, click):
    """Show a modal message dialog and run *click* when it is confirmed.

    The dialog is destroyed even when the callback raises.
    """
    dialog = wx.MessageDialog(None, content, "提示", wx.YES_DEFAULT | icon_style)
    try:
        # Run the callback only if the confirm button was clicked.
        # NOTE(review): the style carries no wx.YES_NO flag, so confirm that
        # ShowModal() can actually return wx.ID_YES here (it may be wx.ID_OK).
        if dialog.ShowModal() == wx.ID_YES and click is not None:
            click()
    finally:
        dialog.Destroy()


def show_warning(content, click):
    """Show a warning dialog; *click* is an optional zero-argument callback."""
    _show_dialog(content, wx.ICON_WARNING, click)


def show_info(content, click):
    """Show an information dialog; *click* is an optional zero-argument callback."""
    _show_dialog(content, wx.ICON_INFORMATION, click)


class MyLogHandler(logging.Handler):
    """Logging handler that appends formatted records to a wx text control."""

    def __init__(self, obj):
        """Store *obj*, the text control that receives the log lines."""
        logging.Handler.__init__(self)
        self.setLevel(logging.DEBUG)
        self.Object = obj

    def emit(self, record):
        """Format *record* and queue it for display on the GUI thread."""
        if not record.levelno:
            return
        try:
            tstr = time.strftime('%Y-%m-%d_%H:%M:%S')
            line = "[%s][%s] %s\n" % (tstr, record.levelname, record.getMessage())
            # Records are emitted from worker threads; widgets may only be
            # touched from the GUI thread, so hand the text over to it.
            wx.CallAfter(self.__append, line)
        except Exception:
            self.handleError(record)

    def __append(self, line):
        """Append *line* to the control, clearing it first when it is full."""
        # A destroyed wx widget evaluates to False; skip late callbacks.
        if not self.Object:
            return
        if len(self.Object.GetValue()) > MAX_LOG_CHARS:
            self.Object.SetValue('')
        self.Object.AppendText(line)


class KPLDataCaptureFrame(wx.Frame):
    """Main window: one toggle per capture task plus an on-screen log."""

    def __create_item(self, name, checked=True, trade_time=True, circulate=True):
        """Build the controls for one task.

        Returns a ``(sizer, button, checkbox)`` tuple. The checkbox is hidden
        when *trade_time* is false; the button label is prefixed with "开启"
        when *circulate* is true.
        """
        btn = wx.Button(self, label=f"{'开启' if circulate else ''}{name}", size=(120, 28))
        check = wx.CheckBox(self, size=(-1, -1))
        check.SetValue(bool(checked))
        label = wx.StaticText(self, label=f"{'是否交易时间采集' if trade_time else ''}")

        header = wx.BoxSizer(wx.HORIZONTAL)
        header.Add(label)
        header.Add(check, 0, wx.LEFT, 10)

        column = wx.BoxSizer(wx.VERTICAL)
        column.Add(header)
        column.Add(btn)
        if not trade_time:
            check.Hide()
        return column, btn, check

    # Initialise the capture threads.
    def __init_capture_threads(self):
        """Start one daemon thread per capture task.

        Every thread runs its task forever with a pause between runs. First
        runs are staggered inside the threads, so the caller is not blocked.
        """
        def run_forever(task, initial_delay):
            time.sleep(initial_delay)
            while True:
                try:
                    task()
                except Exception as e:
                    print(str(e))
                time.sleep(POLL_INTERVAL_SECONDS)

        # Same order as self.keys.
        tasks = [
            self.__exec_limit_up,
            self.__exec_open_limit_up,
            self.__exec_limit_down,
            self.__exec_ever_limit_down,
            self.__exec_fengkou,
            self.__exec_fengkou_best,
            self.__exec_fengxiang,
            self.__exec_industry_rank,
            self.__exec_jingxuan_rank,
        ]
        for index, task in enumerate(tasks):
            threading.Thread(
                target=run_forever,
                args=(task, index * THREAD_STAGGER_SECONDS),
                daemon=True,
            ).start()

    def __init__(self):
        """Build the window, start the worker threads and enable every task."""
        self.__init_data()
        wx.Frame.__init__(self, None, -1, "开盘啦数据采集客户端",
                          style=wx.CAPTION | wx.MINIMIZE_BOX | wx.CLOSE_BOX | wx.STAY_ON_TOP,
                          size=(800, 400))
        self.SetBackgroundColour(wx.Colour(224, 224, 224))
        self.Bind(wx.EVT_CLOSE, self.OnExit)

        boxsier = wx.FlexGridSizer(5, 2, 10, 20)
        self.item_bidding = self.__create_item("竞价采集", trade_time=False, circulate=False)
        boxsier.Add(self.item_bidding[0], 0, wx.TOP, 5)
        self.items = []
        names = ["涨停采集", "炸板采集", "跌停采集", "曾跌停采集", "市场风口采集", "最强风口采集", "风向标采集", "行业涨幅", "精选流入"]
        for name in names:
            self.items.append(self.__create_item(name))
            boxsier.Add(self.items[-1][0], 0, wx.TOP, 5)

        # Bind the events; the index is bound as a default argument so each
        # lambda keeps its own value.
        self.item_bidding[1].Bind(wx.EVT_BUTTON, self.__capture_bidding)
        for index, (_, btn, check) in enumerate(self.items):
            btn.Bind(wx.EVT_BUTTON, lambda e, x=index: self.__capture_btn_click(x, e))
            check.Bind(wx.EVT_CHECKBOX, lambda e, x=index: self.__capture_btn_check(x, e))

        root_boxsier = wx.BoxSizer(wx.HORIZONTAL)
        left_boxsier = wx.BoxSizer(wx.VERTICAL)
        left_boxsier.Add(boxsier)

        whole_boxsier = wx.BoxSizer(wx.HORIZONTAL)
        self.start_all_btn = wx.Button(self, label="开启全部任务", size=(120, 28))
        self.start_all_btn.SetForegroundColour("#008040")
        self.start_all_btn.Bind(wx.EVT_BUTTON, self.__start_all_task)
        self.stop_all_btn = wx.Button(self, label="关闭全部任务", size=(120, 28))
        self.stop_all_btn.SetForegroundColour("#FF0000")
        self.stop_all_btn.Bind(wx.EVT_BUTTON, self.__stop_all_task)
        whole_boxsier.Add(self.stop_all_btn)
        whole_boxsier.Add(self.start_all_btn, 1, wx.LEFT, 25)
        left_boxsier.Add(whole_boxsier, 1, wx.TOP, 30)
        root_boxsier.Add(left_boxsier, 1, wx.LEFT, 10)

        self.text_log = wx.TextCtrl(parent=self, size=(500, self.Size[1]), style=wx.TE_MULTILINE)
        self.text_log.SetEditable(False)
        self.text_log.SetBackgroundColour("#EEE")
        self.text_log.SetForegroundColour("#333")
        handler = MyLogHandler(self.text_log)
        self.logger = logging.getLogger(__name__)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.DEBUG)
        root_boxsier.Add(self.text_log, 1, wx.LEFT, 10)
        self.SetSizer(root_boxsier)

        # Timer object; no handler is bound to it at the moment.
        self.timer = wx.Timer(self)
        self.logger.info("初始化成功")
        self.__init_capture_threads()

        # Schedule the bidding capture for 09:26 every day.
        def schedule_fun():
            self.__capture_bidding(None)

        schedule.every().day.at("09:26:00").do(schedule_fun)
        threading.Thread(target=self.run_schedule, daemon=True).start()

        # Enable every capture task automatically.
        self.__start_all_task(None)

    def run_schedule(self):
        """Run pending scheduled jobs forever; meant for a daemon thread."""
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                # A failing job must not end the scheduler thread.
                self.logger.error(str(e))
            # Sleep between checks instead of spinning on the CPU.
            time.sleep(SCHEDULE_POLL_SECONDS)

    def __init_data(self):
        """Create the per-task state: ``key -> [enabled, trade_time_only]``."""
        self.keys = ["limit_up", "open_limit_up", "limit_down", "ever_limit_down", "feng_kou", "best_feng_kou",
                     "feng_xiang", "industry_rank", "jingxuan_rank"]
        self.capture_status = {key: [False, True] for key in self.keys}

    def __set_btn_text(self, controls):
        """Flip the button label of *controls* between "开启" and "关闭"."""
        button = controls[1]
        text = button.GetLabelText()
        if text.startswith("开启"):
            text = text.replace("开启", "关闭")
        else:
            text = text.replace("关闭", "开启")
        button.SetLabelText(text)

    def __capture_bidding(self, event):
        """Fetch the bidding list once and upload it; failures are logged."""
        try:
            result = json.loads(kpl_api.daBanList(kpl_api.DABAN_TYPE_BIDDING))
            self.logger.info(f"获取到竞价数据:{len(result['list'])}")
            self.__upload_data("biddings", result)
        except Exception as e:
            # Called from the GUI and from the scheduler thread; neither
            # caller should be taken down by a failed capture.
            self.logger.error(f"biddings: {e}")

    def __capture_btn_click(self, key_index, event):
        """Toggle the enabled flag and the button label of one task."""
        self.__set_btn_text(self.items[key_index])
        status = self.capture_status[self.keys[key_index]]
        status[0] = not status[0]

    def __capture_btn_check(self, key_index, event):
        """Toggle the trade-time-only flag of one task."""
        status = self.capture_status[self.keys[key_index]]
        status[1] = not status[1]

    def __can_capture(self, key_index):
        """Return True when the task is enabled and its time rule allows a run."""
        enabled, trade_time_only = self.capture_status[self.keys[key_index]]
        if not enabled:
            return False
        return not (trade_time_only and not tool.is_trade_time())

    def __capture_list(self, key_index, fetch, title, count_key="list", start_message=None):
        """Run one fetch/log/upload cycle for the task at *key_index*.

        *fetch* is a zero-argument callable returning a JSON string, *title*
        is the log prefix, *count_key* names the entry whose length is logged
        and *start_message* is logged before fetching when given. Failures
        are logged instead of raised.
        """
        if not self.__can_capture(key_index):
            return
        if start_message is not None:
            self.logger.info(start_message)
        key = self.keys[key_index]
        try:
            result = json.loads(fetch())
            self.logger.info(f"{title}:{len(result[count_key])}")
            self.__upload_data(key, result)
        except Exception as e:
            self.logger.error(f"{key}: {e}")

    def __exec_limit_up(self):
        """Capture the limit-up list."""
        self.__capture_list(0, lambda: kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_UP), "涨停代码数量")

    def __exec_open_limit_up(self):
        """Capture the opened-limit-up list."""
        self.__capture_list(1, lambda: kpl_api.daBanList(kpl_api.DABAN_TYPE_OPEN_LIMIT_UP), "炸板代码数量")

    def __exec_limit_down(self):
        """Capture the limit-down list."""
        self.__capture_list(2, lambda: kpl_api.daBanList(kpl_api.DABAN_TYPE_LIMIT_DOWN), "跌停代码数量")

    def __exec_ever_limit_down(self):
        """Capture the ever-limit-down list."""
        self.__capture_list(3, lambda: kpl_api.daBanList(kpl_api.DABAN_TYPE_EVER_LIMIT_DOWN), "曾经跌停代码数量")

    def __exec_fengkou(self):
        """Capture the feng kou list for today's date string."""
        self.__capture_list(4, lambda: kpl_api.getFengKouList(tool.get_now_date_str()), "市场风口代码数量",
                            count_key="List")

    def __exec_fengkou_best(self):
        """Upload the JSON document on the first line of the best-feng-kou file."""
        key = self.keys[5]
        try:
            if not self.__can_capture(5):
                return
            try:
                with open(FENGKOU_BEST_LOG_PATH, mode='r', encoding="utf-16") as f:
                    first_line = f.readline()
            except FileNotFoundError:
                # The file has not been produced yet; nothing to upload.
                return
            if not first_line:
                return
            data = json.loads(first_line)
            self.logger.info(f"最强风口代码数量:{data['Count']}")
            self.__upload_data(key, data)
        except Exception as e:
            self.logger.error(f"{key}: {e}")

    def __exec_fengxiang(self):
        """Capture the feng xiang biao list."""
        self.__capture_list(6, kpl_api.getFengXiangBiao, "风向标代码数量", start_message="开始风向标采集")

    def __exec_industry_rank(self):
        """Capture the industry ranking."""
        self.__capture_list(7, kpl_api.getMarketIndustryRealRankingInfo, "行业涨幅排行代码数量")

    def __exec_jingxuan_rank(self):
        """Capture the jing xuan ranking."""
        self.__capture_list(8, kpl_api.getMarketJingXuanRealRankingInfo, "精选流入排行代码数量")

    def __set_all_tasks(self, enabled):
        """Toggle every task whose enabled flag differs from *enabled*."""
        for index in range(len(self.items)):
            if self.capture_status[self.keys[index]][0] != enabled:
                self.__capture_btn_click(index, None)

    # Start all tasks.
    def __start_all_task(self, event):
        """Enable every task that is currently disabled."""
        self.__set_all_tasks(True)

    # Stop all tasks.
    def __stop_all_task(self, event):
        """Disable every task that is currently enabled."""
        self.__set_all_tasks(False)

    def __upload_data(self, type, datas):
        """POST ``{"type": type, "data": datas}`` as JSON text to the server.

        Raises whatever ``requests`` raises, including a timeout.
        """
        root_data = {
            "type": type,
            "data": datas
        }
        requests.post(UPLOAD_URL, json.dumps(root_data), timeout=UPLOAD_TIMEOUT_SECONDS)

    def OnExit(self, e):
        """Terminate the process when the window is closed."""
        sys.exit(0)


class mainApp(wx.App):
    """Application object that owns the capture window."""

    def OnInit(self):
        """Create and show the main window."""
        self.SetAppName("开盘啦")
        self.kpl = KPLDataCaptureFrame()
        self.kpl.Show()
        return True


# Packaging commands:
# cd D:\workspace\GP\trade_desk
# D:\workspace\GP\trade_desk\dist\env\pk_env\Scripts\pyinstaller.exe kpl/gui.spec
if __name__ == "__main__":
    # stdout/stderr are redirected to a file, so its directory must exist.
    os.makedirs(os.path.dirname(APP_LOG_PATH), exist_ok=True)
    app = mainApp(redirect=True, filename=APP_LOG_PATH)
    app.MainLoop()