| | |
| | | |
| | | |
| | | class MyBaseRequestHandle(socketserver.BaseRequestHandler): |
| | | reset_code_dict = {} |
| | | set_operate_code_state_dict = {} |
| | | l2_data_error_dict = {} |
| | | last_trade_delegate_data = None |
| | | buy1_volumn_manager = THSBuy1VolumnManager() |
| | |
| | | data = sk.recv(102400) |
| | | if len(data) == 0: |
| | | # print("客户端断开连接") |
| | | break; |
| | | break |
| | | _str = str(data, encoding="gbk") |
| | | if len(_str) > 0: |
| | | # print("结果:",_str) |
| | |
| | | if type == 0: |
| | | |
| | | try: |
| | | origin_start_time = round(time.time() * 1000) |
| | | __start_time = round(time.time() * 1000) |
| | | _start_time = round(time.time() * 1000) |
| | | |
| | | # level2盘口数据 |
| | | day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data( |
| | | _str) |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "截图时间:{} 数据解析时间".format(process_time)) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "数据解析时间") |
| | | # try: |
| | | # self.pipe_ui.send( |
| | | # json.dumps({"type": "l2_data_notify", "data": {"count": len(datas), "code": code}})) |
| | | # except: |
| | | # pass |
| | | |
| | | # 过时 保存l2截图时间 |
| | | # TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time) |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "l2获取代码位置耗时") |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | | try: |
| | | # print("L2数据接受",day,code,len(datas)) |
| | | # 查询 |
| | | code_ = gpcode_manager.get_listen_code_by_pos(client, channel) |
| | | if code_ != code: |
| | | key = "{}-{}-{}".format(client, channel, code) |
| | | |
| | | # 间隔2s |
| | | if key not in self.reset_code_dict or round( |
| | | time.time() * 1000) - self.reset_code_dict[key] > 2000: |
| | | |
| | | self.l2CodeOperate.set_operate_code_state(client, channel, 0) |
| | | self.reset_code_dict[key] = round(time.time() * 1000) |
| | | if code_ is None: |
| | | code_ = "" |
| | | if tool.is_trade_time(): |
| | | self.l2CodeOperate.repaire_operate(int(client), int(channel), code_) |
| | | else: |
| | | key = "{}-{}".format(client, channel) |
| | | if key not in self.set_operate_code_state_dict or round( |
| | | time.time() * 1000) - self.set_operate_code_state_dict[key] > 1000: |
| | | self.set_operate_code_state_dict[key] = round(time.time() * 1000) |
| | | self.l2CodeOperate.set_operate_code_state(client, channel, 1) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "l2数据正确性判断时间") |
| | | # 校验客户端代码 |
| | | l2_code_operate.verify_with_l2_data_pos_info(code, client, channel) |
| | | __start_time = round(time.time() * 1000) |
| | | if gpcode_manager.is_listen(code): |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "l2外部数据预处理耗时") |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "l2数据有效处理外部耗时", |
| | | False) |
| | | # 保存原始数据数量 |
| | | l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) |
| | | if round(time.time() * 1000) - __start_time > 20: |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "异步保存原始数据条数耗时", |
| | | False) |
| | | |
| | | except l2_data_manager.L2DataException as l: |
| | | # 单价不符 |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | |
| | | |
| | | __end_time = round(time.time() * 1000) |
| | | # 只记录大于40ms的数据 |
| | | if __end_time - __start_time > 40: |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - _start_time, "l2数据处理总耗时", |
| | | if __end_time - origin_start_time > 100: |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - origin_start_time, |
| | | "l2数据处理总耗时", |
| | | True) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | |
| | | # 获取基本信息 |
| | | code_datas = juejin.JueJinManager.get_gp_latest_info(code_list) |
| | | gpcode_manager.set_gp_list(code_datas) |
| | | # 重新订阅 |
| | | self.server.pipe_juejin.send(json.dumps({"type": "resub"})) |
| | | |
| | | # 同步同花顺目标代码 |
| | | t1 = threading.Thread(target=lambda: sync_target_codes_to_ths()) |
| | | t1.setDaemon(True) |
| | |
| | | seconds = seconds - seconds % 3 |
| | | time_ = tool.time_seconds_format(seconds) |
| | | # 保存数据 |
| | | need_sync = self.buy1_volumn_manager.save(code, time_, volumn,price) |
| | | need_sync, need_cancel, cancel_msg = self.buy1_volumn_manager.save(code, time_, volumn, |
| | | price) |
| | | if need_cancel: |
| | | l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg) |
| | | if need_sync: |
| | | # 同步数据 |
| | | l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) |
| | | |
| | | |
| | | elif type == 30: |
| | | # 心跳信息 |