| | |
| | | import time |
| | | |
| | | import alert_util |
| | | import client_manager |
| | | import code_volumn_manager |
| | | import data_process |
| | | import global_data_loader |
| | | import global_util |
| | | import gpcode_manager |
| | | import authority |
| | |
| | | import l2_data_log |
| | | import l2_data_manager |
| | | import l2_data_manager_new |
| | | import log |
| | | import l2_data_util |
| | | import ths_industry_util |
| | | import ths_util |
| | | import tool |
| | | import trade_manager |
| | | import l2_code_operate |
| | | from code_data_util import ZYLTGBUtil |
| | | |
| | | from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate |
| | | from trade_data_manager import TradeCancelDataManager |
| | | from log import logger_l2_error, logger_device, logger_trade_delegate |
| | | from trade_queue_manager import THSBuy1VolumnManager |
| | | |
| | | |
| | |
| | | _start_time = round(time.time() * 1000) |
| | | |
| | | # level2盘口数据 |
| | | day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data( |
| | | day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data( |
| | | _str) |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | |
| | | "l2数据正确性判断时间") |
| | | if gpcode_manager.is_listen(code): |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) |
| | | # 保存原始数据数量 |
| | | l2_data_util.save_l2_latest_data_number(code, len(origin_datas)) |
| | | except l2_data_manager.L2DataException as l: |
| | | # 单价不符 |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | |
| | | logger_l2_error.error("出错:{}".format(str(e))) |
| | | logger_l2_error.error("内容:{}".format(_str)) |
| | | finally: |
| | | |
| | | __end_time = round(time.time() * 1000) |
| | | # 只记录大于40ms的数据 |
| | | if __end_time - __start_time > 40: |
| | |
| | | True) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | elif type == 10: |
| | | # level2交易队列 |
| | | try: |
| | | code, setData = data_process.parseL2TradeQueueData(_str) |
| | | if gpcode_manager.is_listen(code): |
| | | data_process.saveL2Data(day, code, setData) |
| | | except: |
| | | print("异常") |
| | | elif type == 1: |
| | | # 设置股票代码 |
| | | data_list = data_process.parseGPCode(_str) |
| | | data_process.saveZYLTSZ(data_list) |
| | | ZYLTGBUtil.save_list(data_list) |
| | | code_list = [] |
| | | for data in data_list: |
| | | code_list.append(data["code"]) |
| | | |
| | | gpcode_manager.set_gp_list(code_list) |
| | | # 获取基本信息 |
| | | code_datas = juejin.JueJinManager.get_gp_latest_info(code_list) |
| | | gpcode_manager.set_gp_list(code_datas) |
| | | # 重新订阅 |
| | | self.server.pipe_juejin.send(json.dumps({"type": "resub"})) |
| | | # 同步同花顺目标代码 |
| | |
| | | dataList = data_process.parseGPCode(_str) |
| | | # 设置涨停时间 |
| | | gpcode_manager.set_limit_up_list(dataList) |
| | | # 保存到内存中 |
| | | if dataList: |
| | | global_data_loader.add_limit_up_codes(dataList) |
| | | ths_industry_util.set_industry_hot_num(dataList) |
| | | elif type == 3: |
| | | # 交易成功信息 |
| | |
| | | index = data["index"] |
| | | code_name = data["codeName"] |
| | | volumn = data["volumn"] |
| | | price = data["price"] |
| | | time_ = data["time"] |
| | | code = global_util.name_codes.get(code_name) |
| | | if code is None: |
| | | global_util.load_name_codes() |
| | | global_data_loader.load_name_codes() |
| | | code = global_util.name_codes.get(code_name) |
| | | if code is not None: |
| | | # 校正时间 |
| | | seconds = tool.get_time_as_second(time_) |
| | | if seconds % 3 > 0: |
| | | seconds = seconds - seconds % 3 |
| | | time_ = tool.time_seconds_format(seconds) |
| | | # 保存数据 |
| | | self.buy1_volumn_manager.save(code, time_, volumn) |
| | | need_sync = self.buy1_volumn_manager.save(code, time_, volumn,price) |
| | | if need_sync: |
| | | # 同步数据 |
| | | l2_data_manager.L2LimitUpMoneyStatisticUtil.verify_num(code, volumn, time_) |
| | | |
| | | elif type == 30: |
| | | # 心跳信息 |
| | |
| | | client_id = data["client"] |
| | | thsDead = data.get("thsDead") |
| | | logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data))) |
| | | data_process.saveClientActive(int(client_id), host, thsDead) |
| | | client_manager.saveClientActive(int(client_id), host, thsDead) |
| | | if ths_util.is_ths_dead(client_id): |
| | | # TODO 重启同花顺 |
| | | # 报警 |
| | |
| | | |
| | | |
| | | def send_msg(client_id, data): |
| | | _ip = data_process.getActiveClientIP(client_id) |
| | | _ip = client_manager.getActiveClientIP(client_id) |
| | | print("ip", client_id, _ip) |
| | | if _ip is None or len(_ip) <= 0: |
| | | raise Exception("客户端IP为空") |