驾驶舱监听,不再处理同花顺14:55以后的交易队列数据
| | |
| | | import data_export_util |
| | | import multiprocessing |
| | | |
| | | import log |
| | | import mysql_data |
| | | import redis_manager |
| | | import server |
| | |
| | | |
| | | |
| | | # 读取server进程的消息 |
| | | from trade_data_manager import CodeActualPriceProcessor |
| | | |
| | | |
| | | def __read_server_pipe(pipe): |
| | | while True: |
| | | value = pipe.recv() |
| | |
| | | |
| | | class GUI: |
| | | |
| | | |
| | | def __init__(self): |
| | | p1, p2 = multiprocessing.Pipe() |
| | | gs_gui_pipe, gs_server_pipe = multiprocessing.Pipe() |
| | |
| | | self.p1 = p1 |
| | | self.p2 = p2 |
| | | self.gs_gui_pipe = gs_gui_pipe |
| | | self.thsBuy1VolumnManager = THSBuy1VolumnManager() |
| | | self.codeActualPriceProcessor = CodeActualPriceProcessor() |
| | | # L2显示 |
| | | self.l2_codes = {} |
| | | # 获取l2的客户端列表 |
| | |
| | | self.l2_codes[client_id].append(code) |
| | | |
| | | # 读取server进程的消息 |
def __read_gui_server_pipe(self, pipe):
    """Consume messages from ``pipe`` in an endless loop.

    Every non-``None`` message is decoded as JSON. When its ``"type"`` is
    ``"l2_data_notify"`` the ``code`` and ``count`` found under ``"data"``
    are printed. The loop has no exit condition, so this call only ends
    when ``pipe.recv()`` raises.

    Args:
        pipe: Object whose ``recv()`` yields a JSON string or ``None``
            (presumably one end of a ``multiprocessing.Pipe`` -- confirm
            against the caller).
    """
    while True:
        value = pipe.recv()
        if value is not None:
            value = json.loads(value)
            if value.get("type") == "l2_data_notify":
                code = value["data"]["code"]
                count = value["data"]["count"]
                # print() does not fill "{}" placeholders by itself; format
                # explicitly so the code and count land inside the message.
                print("l2数据通知:{}-{}".format(code, count))
        # NOTE(review): the nesting level of this pause was lost in the
        # source rendering; it is placed at loop level here -- confirm.
        time.sleep(0.1)
| | | |
| | |
| | | cl_win.configure(text="异常:{}".format(str(e)), foreground="#FF7F27") |
| | | |
| | | try: |
| | | juejin_length= JueJinManager.get_listen_codes_lenth() |
| | | juejin_length = JueJinManager.get_listen_codes_lenth() |
| | | codes_length = len(gpcode_manager.get_gp_list()) |
| | | cl_codes.configure(text="{}/{}".format(juejin_length,codes_length), foreground="#008000") |
| | | cl_codes.configure(text="{}/{}".format(juejin_length, codes_length), foreground="#008000") |
| | | except Exception as e: |
| | | pass |
| | | |
| | | try: |
| | | codes = self.thsBuy1VolumnManager.get_current_codes() |
| | | count = 0 |
| | | if codes: |
| | | count = len(codes) |
| | | cl_buy_1.configure(text="{}".format(count), foreground="#008000") |
| | | except: |
| | | pass |
| | | |
| | | |
| | | try: |
| | | cl_price_count.configure(text="{}".format(self.codeActualPriceProcessor.get_current_price_codes_count()), foreground="#008000") |
| | | except: |
| | | pass |
| | | |
| | | # 状态有问题,需要报警 |
| | | if not normal: |
| | |
| | | cl_codes = Label(frame, text="未知", bg="#DDDDDD") |
| | | cl_codes.place(x=450, y=y_) |
| | | |
| | | cl = Label(frame, text="买1代码数量:", bg="#DDDDDD") |
| | | cl.place(x=500, y=y_) |
| | | cl_buy_1 = Label(frame, text="未知", bg="#DDDDDD") |
| | | cl_buy_1.place(x=580, y=y_) |
| | | |
| | | cl = Label(frame, text="现价代码数量:", bg="#DDDDDD") |
| | | cl.place(x=620, y=y_) |
| | | cl_price_count = Label(frame, text="未知", bg="#DDDDDD") |
| | | cl_price_count.place(x=700, y=y_) |
| | | |
| | | refresh_data() |
| | | # 添加更新线程 |
| | | t1 = threading.Thread(target=lambda: update_data()) |
| | |
| | | code = gpcode_manager.get_listen_code_by_pos(client_id, i) |
| | | data_count = l2_data_util.get_l2_latest_data_number(code) |
| | | if data_count is None: |
| | | data_count=0 |
| | | data_count = 0 |
| | | if code is not None and len(code) > 0: |
| | | code_sv_map[client_id][i].set(code + "({})".format(data_count)) |
| | | else: |
| | |
| | | showinfo("提示", "导出完成") |
| | | |
def compute_m(code):
    """Compute the m value for ``code`` and show it in an info dialog.

    Args:
        code: Passed unchanged to ``L2TradeFactorUtil.compute_m_value``.
    """
    # compute_m_value returns an (m, msg) pair; only m is displayed, and the
    # helper is called exactly once.
    m, _msg = L2TradeFactorUtil.compute_m_value(code)
    showinfo("提示", "{}".format(m))
| | | |
| | | def clear_l2(code): |
| | |
| | | data_ = (symbol, time_, tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"]) |
| | | logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2], |
| | | data_[3]) |
| | | need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3]) |
| | | if need_sync: |
| | | # 同步数据 |
| | | L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1]) |
| | | # 暂时不采用 |
| | | # need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3]) |
| | | # if need_sync: |
| | | # # 同步数据 |
| | | # L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1]) |
| | | |
| | | # print(tick["created_at"],tick["quotes"][0]["bid_v"]) |
| | | |
| | |
| | | # 获取到现价 |
| | | def accpt_prices(prices): |
| | | print("价格代码数量:", len(prices)) |
| | | __actualPriceProcessor.save_current_price_codes_count(len(prices)) |
| | | now_str = datetime.datetime.now().strftime("%H:%M:%S") |
| | | now_strs = now_str.split(":") |
| | | now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2]) |
| | |
| | | # 同步数据 |
| | | l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) |
| | | |
| | | |
| | | elif type == 30: |
| | | # 心跳信息 |
| | | data = data_process.parse(_str)["data"] |
| | |
| | | else: |
| | | return int(val) |
| | | |
def __save_current_price_codes_count(self, count):
    """Write ``count`` to redis under ``current_price_codes_count``.

    The value is stored with ``setex`` and an expiry of 10 seconds.
    """
    client = self.__get_redis()
    client.setex("current_price_codes_count", 10, count)
| | | |
def __get_current_price_codes_count(self):
    """Return the count stored under ``current_price_codes_count`` as an int.

    Returns:
        ``0`` when redis holds no value for the key, otherwise the stored
        value converted with ``int``.
    """
    key = "current_price_codes_count"
    count = self.__get_redis().get(key)
    # The client returns the stored value as str/bytes rather than a number
    # (depends on client configuration); convert so both branches yield int.
    return 0 if count is None else int(count)
| | | |
| | | def process_rate(self, code, rate, time_str): |
| | | # 9点半之前的数据不处理 |
| | | if int(time_str.replace(":", "")) < int("093000"): |
| | |
| | | global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time())) |
| | | pass |
| | | |
| | | # 现价代码数量 |
def save_current_price_codes_count(self, count):
    """Record the number of codes that currently have a price.

    Public wrapper; hands ``count`` to the private
    ``__save_current_price_codes_count`` method.
    """
    self.__save_current_price_codes_count(count)
| | | |
def get_current_price_codes_count(self):
    """Return the recorded number of codes that currently have a price.

    Public wrapper; returns whatever the private
    ``__get_current_price_codes_count`` method returns.
    """
    return self.__get_current_price_codes_count()
| | | |
| | | |
| | | # 是否为水下捞 |
| | | def is_under_water(self, code): |
| | | time_seconds = self.__get_down_price_time_as_seconds(code) |
| | |
| | | val = json.loads(val) |
| | | return val[0], val[1] |
| | | |
| | | # 添加记录 |
def __add_recod(self, code):
    """Add ``code`` to the ``buy1_volumn_codes`` redis set.

    Each call also sets the set's expiry to 10 seconds.
    """
    redis_key = "buy1_volumn_codes"
    self.__get_redis().sadd(redis_key, code)
    self.__get_redis().expire(redis_key, 10)
| | | |
| | | # 获取当前正在监听的代码 |
def get_current_codes(self):
    """Return the members of the ``buy1_volumn_codes`` redis set."""
    return self.__get_redis().smembers("buy1_volumn_codes")
| | | |
| | | # 返回是否需要更新数据 |
| | | def save(self, code, time_str, volumn,price): |
| | | def save(self, code, time_str, volumn, price): |
| | | # 客户端数据未加载出来过滤 |
| | | if volumn < 1: |
| | | return False |
| | | # 14:55:00之后不在处理 |
| | | if int(time_str.replace(':', '')) >= int("145500"): |
| | | return False |
| | | |
| | | self.__add_recod(code) |
| | | # 判断是否为涨停价 |
| | | limit_up_price = gpcode_manager.get_limit_up_price(code) |
| | | if limit_up_price != tool.to_price(decimal.Decimal(price)): |
| | |
| | | time_str, volumn = self.__get_latest_record(code) |
| | | return time_str, volumn |
| | | |
if __name__ == '__main__':
    # Ad-hoc manual invocation of save() with sample arguments; kept as a
    # single guarded call so the sample runs once.
    JueJinBuy1VolumnManager().save("001203", "15:00:00", 40586553, 12.12)