| | |
| | | import logging |
| | | import socketserver |
| | | import socket |
| | | import threading |
| | | import time |
| | | |
| | | import data_process |
| | |
| | | __start_time = round(time.time() * 1000) |
| | | # level2盘口数据 |
| | | day, client, channel, code, datas = l2_data_manager.parseL2Data(_str) |
| | | |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | | try: |
| | | # print("L2数据接受",day,code,len(datas)) |
| | | # 查询 |
| | |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | | key = "{}-{}-{}".format(client, channel, code) |
| | | if key not in self.l2_data_error_dict or round( |
| | | time.time() * 1000) - self.l2_data_error_dict[key] > 2000: |
| | | self.l2CodeOperate.repaire_l2_data(code) |
| | | time.time() * 1000) - self.l2_data_error_dict[key] > 10000: |
| | | # self.l2CodeOperate.repaire_l2_data(code) |
| | | # todo 太敏感移除代码 |
| | | logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg) |
| | | # 单价不一致时需要移除代码重新添加 |
| | | l2_code_operate.L2CodeOperate().remove_l2_listen(code) |
| | | self.l2_data_error_dict[key] = round(time.time() * 1000) |
| | | |
| | | except Exception as e: |
| | |
| | | logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time)); |
| | | except: |
| | | pass |
| | | |
| | | |
| | | |
| | | |
| | | elif type == 10: |
| | | # level2交易队列 |
| | | try: |
| | |
| | | data_process.saveL2Data(day, code, setData) |
| | | except: |
| | | print("异常") |
| | | |
| | | elif type == 1: |
| | | # 设置股票代码 |
| | | data_list = data_process.parseGPCode(_str) |
| | |
| | | gpcode_manager.set_gp_list(code_list) |
| | | # 重新订阅 |
| | | self.server.pipe.send(json.dumps({"type": "resub"})) |
| | | sync_target_codes_to_ths() |
| | | # 同步同花顺目标代码 |
| | | t1 = threading.Thread(target=lambda: sync_target_codes_to_ths()) |
| | | t1.setDaemon(True) |
| | | t1.start() |
| | | elif type == 2: |
| | | # 涨停代码 |
| | | codeList = data_process.parseGPCode(_str) |
| | |
| | | {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}}) |
| | | except Exception as e: |
| | | return_str = data_process.toJson({"code": 1, "msg": str(e)}) |
| | | # 现价更新 |
| | | elif type == 40: |
| | | data = data_process.parse(_str)["data"] |
| | | if data is not None: |
| | | print("现价数量", len(data)) |
| | | for item in data: |
| | | juejin.accpt_price(item["code"], float(item["price"])) |
| | | |
| | | elif type == 30: |
| | | data = data_process.parse(_str)["data"] |
| | | client_id = data["client"] |
| | | if "memery" in data: |
| | | mem = data["memery"] |
| | | logger_device.info("({})内存使用率:{}".format(client_id, mem)) |
| | | logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data))) |
| | | data_process.saveClientActive(int(client_id), host) |
| | | # print("心跳:", client_id) |
| | | |
| | |
| | | send_msg(client, {"action": "test"}) |
| | | except: |
| | | pass |
| | | |
| | | # 矫正客户端代码 |
| | | l2_code_operate.correct_client_codes() |
| | | time.sleep(5) |
| | | |
| | | |