| | |
| | | import alert_util |
| | | import code_volumn_manager |
| | | import data_process |
| | | import global_util |
| | | import gpcode_manager |
| | | import authority |
| | | import juejin |
| | | import l2_data_log |
| | | import l2_data_manager |
| | | import l2_data_manager_new |
| | | import log |
| | | import ths_industry_util |
| | | import ths_util |
| | | import tool |
| | |
| | | |
| | | from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate |
| | | from trade_data_manager import TradeCancelDataManager |
| | | from trade_queue_manager import THSBuy1VolumnManager |
| | | |
| | | |
| | | class MyTCPServer(socketserver.TCPServer): |
| | | def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe=None): |
| | | self.pipe = pipe # 增加的参数 |
| | | def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None): |
| | | self.pipe_juejin = pipe_juejin # 增加的参数 |
| | | self.pipe_ui = pipe_ui |
| | | socketserver.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) |
| | | |
| | | |
| | |
| | | set_operate_code_state_dict = {} |
| | | l2_data_error_dict = {} |
| | | last_trade_delegate_data = None |
| | | buy1_volumn_manager = THSBuy1VolumnManager() |
| | | |
| | | def setup(self): |
| | | super().setup() # 可以不调用父类的setup()方法,父类的setup方法什么都没做 |
| | |
| | | if len(data) == 0: |
| | | # print("客户端断开连接") |
| | | break; |
| | | _str = str(data, encoding="gb2312") |
| | | _str = str(data, encoding="gbk") |
| | | if len(_str) > 0: |
| | | # print("结果:",_str) |
| | | type = data_process.parseType(_str) |
| | |
| | | |
| | | try: |
| | | __start_time = round(time.time() * 1000) |
| | | _start_time = round(time.time() * 1000) |
| | | |
| | | # level2盘口数据 |
| | | day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data( |
| | | _str) |
| | | # 10ms的网络传输延时 |
| | | capture_timestamp = __start_time - process_time - 10 |
| | | # 保存l2截图时间 |
| | | TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "数据解析时间") |
| | | # try: |
| | | # self.pipe_ui.send( |
| | | # json.dumps({"type": "l2_data_notify", "data": {"count": len(datas), "code": code}})) |
| | | # except: |
| | | # pass |
| | | |
| | | # 过时 保存l2截图时间 |
| | | # TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time) |
| | | cid, pid = gpcode_manager.get_listen_code_pos(code) |
| | | # 判断目标代码位置是否与上传数据位置一致 |
| | | if cid is not None and pid is not None and client == int(cid) and channel == int(pid): |
| | |
| | | self.set_operate_code_state_dict[key] = round(time.time() * 1000) |
| | | self.l2CodeOperate.set_operate_code_state(client, channel, 1) |
| | | |
| | | __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, |
| | | "l2数据正确性判断时间") |
| | | if gpcode_manager.is_listen(code): |
| | | l2_data_manager.L2TradeDataProcessor.process(code, datas, capture_timestamp) |
| | | l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp) |
| | | except l2_data_manager.L2DataException as l: |
| | | # 单价不符 |
| | | if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR: |
| | |
| | | __end_time = round(time.time() * 1000) |
| | | # 只记录大于40ms的数据 |
| | | if __end_time - __start_time > 40: |
| | | logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time)); |
| | | except: |
| | | pass |
| | | l2_data_log.l2_time(code, round(time.time() * 1000) - _start_time, "l2数据处理总耗时", |
| | | True) |
| | | except Exception as e: |
| | | logging.exception(e) |
| | | elif type == 10: |
| | | # level2交易队列 |
| | | try: |
| | |
| | | |
| | | gpcode_manager.set_gp_list(code_list) |
| | | # 重新订阅 |
| | | self.server.pipe.send(json.dumps({"type": "resub"})) |
| | | self.server.pipe_juejin.send(json.dumps({"type": "resub"})) |
| | | # 同步同花顺目标代码 |
| | | t1 = threading.Thread(target=lambda: sync_target_codes_to_ths()) |
| | | t1.setDaemon(True) |
| | |
| | | elif type == 4: |
| | | # 行业代码信息 |
| | | dataList = data_process.parseList(_str) |
| | | data_process.saveIndustryCode(dataList) |
| | | ths_industry_util.save_industry_code(dataList) |
| | | elif type == 6: |
| | | # 可用金额 |
| | | datas = data_process.parseData(_str) |
| | |
| | | volumnUnit = item["volumnUnit"] |
| | | code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit) |
| | | juejin.accpt_prices(data) |
| | | elif type == 50: |
| | | data = data_process.parse(_str)["data"] |
| | | if data is not None: |
| | | index = data["index"] |
| | | code_name = data["codeName"] |
| | | volumn = data["volumn"] |
| | | time_ = data["time"] |
| | | code = global_util.name_codes.get(code_name) |
| | | if code is None: |
| | | global_util.load_name_codes() |
| | | code = global_util.name_codes.get(code_name) |
| | | if code is not None: |
| | | # 保存数据 |
| | | self.buy1_volumn_manager.save(code, time_, volumn) |
| | | |
| | | elif type == 30: |
| | | # 心跳信息 |
| | |
| | | if ths_util.is_ths_dead(client_id): |
| | | # TODO 重启同花顺 |
| | | # 报警 |
| | | alert_util.alarm() |
| | | l2_clients = authority.get_l2_clients() |
| | | if client_id in l2_clients: |
| | | alert_util.alarm() |
| | | # print("心跳:", client_id) |
| | | sk.send(return_str.encode()) |
| | | |
| | |
| | | try: |
| | | socketClient.send(json.dumps(data).encode()) |
| | | recv = socketClient.recv(1024) |
| | | result = recv.decode().lstrip() |
| | | result = str(recv, encoding="gbk") |
| | | return result |
| | | finally: |
| | | socketClient.close() |