Administrator
2022-11-03 f03eb72394a3fac097bb3ab1f956a83f99f7bd0e
server.py
@@ -45,8 +45,6 @@
class MyBaseRequestHandle(socketserver.BaseRequestHandler):
    reset_code_dict = {}
    set_operate_code_state_dict = {}
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
@@ -73,7 +71,7 @@
            data = sk.recv(102400)
            if len(data) == 0:
                # print("客户端断开连接")
                break;
                break
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # print("结果:",_str)
@@ -82,57 +80,41 @@
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        _start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                            _str)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "数据解析时间")
                        # try:
                        #     self.pipe_ui.send(
                        #         json.dumps({"type": "l2_data_notify", "data": {"count": len(datas), "code": code}}))
                        # except:
                        #     pass
                        # 过时 保存l2截图时间
                        # TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                           "l2获取代码位置耗时")
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                            try:
                                # print("L2数据接受",day,code,len(datas))
                                # 查询
                                code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
                                if code_ != code:
                                    key = "{}-{}-{}".format(client, channel, code)
                                    # 间隔2s
                                    if key not in self.reset_code_dict or round(
                                            time.time() * 1000) - self.reset_code_dict[key] > 2000:
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 0)
                                        self.reset_code_dict[key] = round(time.time() * 1000)
                                        if code_ is None:
                                            code_ = ""
                                        if tool.is_trade_time():
                                            self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
                                else:
                                    key = "{}-{}".format(client, channel)
                                    if key not in self.set_operate_code_state_dict or round(
                                            time.time() * 1000) - self.set_operate_code_state_dict[key] > 1000:
                                        self.set_operate_code_state_dict[key] = round(time.time() * 1000)
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                   "l2数据正确性判断时间")
                                # 校验客户端代码
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                __start_time = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                       "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                       "l2数据有效处理外部耗时",
                                                                       False)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - __start_time > 20:
                                        l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                            "异步保存原始数据条数耗时",
                                                            False)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -155,8 +137,9 @@
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - __start_time > 40:
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - _start_time, "l2数据处理总耗时",
                                if __end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
                        logging.exception(e)
@@ -171,8 +154,7 @@
                    # 获取基本信息
                    code_datas = juejin.JueJinManager.get_gp_latest_info(code_list)
                    gpcode_manager.set_gp_list(code_datas)
                    # 重新订阅
                    self.server.pipe_juejin.send(json.dumps({"type": "resub"}))
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
@@ -257,11 +239,13 @@
                                seconds = seconds - seconds % 3
                            time_ = tool.time_seconds_format(seconds)
                            # 保存数据
                            need_sync = self.buy1_volumn_manager.save(code, time_, volumn,price)
                            need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn,
                                                                                               price)
                            if need_cancel:
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg)
                            if need_sync:
                                # 同步数据
                                l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_)
                elif type == 30:
                    # 心跳信息