Administrator
2022-10-28 fa6a3ab958ce9493833eef68ac62bd155be5d53e
驾驶舱监听,不再处理同花顺14:55以后的交易队列数据
5个文件已修改
94 ■■■■ 已修改文件
gui.py 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
juejin.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
server.py 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_data_manager.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
trade_queue_manager.py 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui.py
@@ -10,6 +10,7 @@
import data_export_util
import multiprocessing
import log
import mysql_data
import redis_manager
import server
@@ -22,6 +23,9 @@
# 读取server进程的消息
from trade_data_manager import CodeActualPriceProcessor
def __read_server_pipe(pipe):
    while True:
        value = pipe.recv()
@@ -64,7 +68,6 @@
class GUI:
    def __init__(self):
        p1, p2 = multiprocessing.Pipe()
        gs_gui_pipe, gs_server_pipe = multiprocessing.Pipe()
@@ -74,6 +77,8 @@
        self.p1 = p1
        self.p2 = p2
        self.gs_gui_pipe = gs_gui_pipe
        self.thsBuy1VolumnManager = THSBuy1VolumnManager()
        self.codeActualPriceProcessor = CodeActualPriceProcessor()
        # L2显示
        self.l2_codes = {}
        # 获取l2的客户端列表
@@ -85,15 +90,15 @@
                self.l2_codes[client_id].append(code)
    # 读取server进程的消息
    def __read_gui_server_pipe(self,pipe):
    def __read_gui_server_pipe(self, pipe):
        while True:
            value = pipe.recv()
            if value is not None:
                value = json.loads(value)
                if value.get("type") == "l2_data_notify":
                    code = value["data"]["code"]
                    count =value["data"]["count"]
                    print("l2数据通知:{}-{}", code,count)
                    count = value["data"]["count"]
                    print("l2数据通知:{}-{}", code, count)
            time.sleep(0.1)
@@ -285,13 +290,26 @@
                cl_win.configure(text="异常:{}".format(str(e)), foreground="#FF7F27")
            try:
                juejin_length= JueJinManager.get_listen_codes_lenth()
                juejin_length = JueJinManager.get_listen_codes_lenth()
                codes_length = len(gpcode_manager.get_gp_list())
                cl_codes.configure(text="{}/{}".format(juejin_length,codes_length), foreground="#008000")
                cl_codes.configure(text="{}/{}".format(juejin_length, codes_length), foreground="#008000")
            except Exception as e:
                pass
            try:
                codes = self.thsBuy1VolumnManager.get_current_codes()
                count = 0
                if codes:
                    count = len(codes)
                cl_buy_1.configure(text="{}".format(count), foreground="#008000")
            except:
                pass
            try:
                cl_price_count.configure(text="{}".format(self.codeActualPriceProcessor.get_current_price_codes_count()), foreground="#008000")
            except:
                pass
            # 状态有问题,需要报警
            if not normal:
@@ -335,6 +353,16 @@
        cl_codes = Label(frame, text="未知", bg="#DDDDDD")
        cl_codes.place(x=450, y=y_)
        cl = Label(frame, text="买1代码数量:", bg="#DDDDDD")
        cl.place(x=500, y=y_)
        cl_buy_1 = Label(frame, text="未知", bg="#DDDDDD")
        cl_buy_1.place(x=580, y=y_)
        cl = Label(frame, text="现价代码数量:", bg="#DDDDDD")
        cl.place(x=620, y=y_)
        cl_price_count = Label(frame, text="未知", bg="#DDDDDD")
        cl_price_count.place(x=700, y=y_)
        refresh_data()
        # 添加更新线程
        t1 = threading.Thread(target=lambda: update_data())
@@ -370,7 +398,7 @@
                    code = gpcode_manager.get_listen_code_by_pos(client_id, i)
                    data_count = l2_data_util.get_l2_latest_data_number(code)
                    if data_count is None:
                        data_count=0
                        data_count = 0
                    if code is not None and len(code) > 0:
                        code_sv_map[client_id][i].set(code + "({})".format(data_count))
                    else:
@@ -750,7 +778,7 @@
            showinfo("提示", "导出完成")
        def compute_m(code):
            m,msg = L2TradeFactorUtil.compute_m_value(code)
            m, msg = L2TradeFactorUtil.compute_m_value(code)
            showinfo("提示", "{}".format(m))
        def clear_l2(code):
juejin.py
@@ -207,10 +207,11 @@
        data_ = (symbol, time_, tick["quotes"][0]["bid_v"], tick["quotes"][0]["bid_p"])
        logger_juejin_tick.info("买1量 {},{},{},{}", data_[1], data_[0], data_[2],
                                data_[3])
        need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3])
        if need_sync:
            # 同步数据
            L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1])
        # 暂时不采用
        # need_sync = __jueJinBuy1VolumnManager.save(data_[0], data_[1], data_[2], data_[3])
        # if need_sync:
        #     # 同步数据
        #     L2LimitUpMoneyStatisticUtil.verify_num(data_[0], data_[2], data_[1])
        # print(tick["created_at"],tick["quotes"][0]["bid_v"])
@@ -241,6 +242,7 @@
# 获取到现价
def accpt_prices(prices):
    print("价格代码数量:", len(prices))
    __actualPriceProcessor.save_current_price_codes_count(len(prices))
    now_str = datetime.datetime.now().strftime("%H:%M:%S")
    now_strs = now_str.split(":")
    now_second = int(now_strs[0]) * 60 * 60 + int(now_strs[1]) * 60 + int(now_strs[2])
server.py
@@ -262,6 +262,7 @@
                                # 同步数据
                                l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息
                    data = data_process.parse(_str)["data"]
trade_data_manager.py
@@ -192,6 +192,15 @@
        else:
            return int(val)
    def __save_current_price_codes_count(self,count):
        key = "current_price_codes_count"
        self.__get_redis().setex(key,10,count)
    def __get_current_price_codes_count(self):
        key = "current_price_codes_count"
        count = self.__get_redis().get(key)
        return 0 if count is None else count
    def process_rate(self, code, rate, time_str):
        # 9点半之前的数据不处理
        if int(time_str.replace(":", "")) < int("093000"):
@@ -217,6 +226,14 @@
        global_util.cuurent_prices[code] = (price, is_limit_up, round(time.time()))
        pass
    # 现价代码数量
    def save_current_price_codes_count(self, count):
        self.__save_current_price_codes_count(count)
    def get_current_price_codes_count(self):
        return self.__get_current_price_codes_count()
    # 是否为水下捞
    def is_under_water(self, code):
        time_seconds = self.__get_down_price_time_as_seconds(code)
trade_queue_manager.py
@@ -46,11 +46,27 @@
        val = json.loads(val)
        return val[0], val[1]
    # 添加记录
    def __add_recod(self, code):
        key = "buy1_volumn_codes"
        self.__get_redis().sadd(key, code)
        self.__get_redis().expire(key, 10)
    # 获取当前正在监听的代码
    def get_current_codes(self):
        key = "buy1_volumn_codes"
        return self.__get_redis().smembers(key)
    # 返回是否需要更新数据
    def save(self, code, time_str, volumn,price):
    def save(self, code, time_str, volumn, price):
        # 客户端数据未加载出来过滤
        if volumn < 1:
            return False
        # 14:55:00之后不在处理
        if int(time_str.replace(':', '')) >= int("145500"):
            return False
        self.__add_recod(code)
        # 判断是否为涨停价
        limit_up_price = gpcode_manager.get_limit_up_price(code)
        if limit_up_price != tool.to_price(decimal.Decimal(price)):
@@ -133,6 +149,6 @@
        time_str, volumn = self.__get_latest_record(code)
        return time_str, volumn
if __name__ == '__main__':
    # Manual run: feed one sample record (code, time, volume, price) to
    # JueJinBuy1VolumnManager.save. A single guard is enough; a second
    # identical one would repeat the call.
    JueJinBuy1VolumnManager().save("001203", "15:00:00", 40586553, 12.12)