Administrator
2022-10-21 892b50e242e3c59a738b92dfdfee1bf1ff8932f2
server.py
@@ -12,10 +12,14 @@
import alert_util
import code_volumn_manager
import data_process
import global_util
import gpcode_manager
import authority
import juejin
import l2_data_log
import l2_data_manager
import l2_data_manager_new
import log
import ths_industry_util
import ths_util
import tool
@@ -24,11 +28,13 @@
from log import logger_l2_error, logger_l2_process, logger_device, logger_trade_delegate
from trade_data_manager import TradeCancelDataManager
from trade_queue_manager import THSBuy1VolumnManager
class MyTCPServer(socketserver.TCPServer):
    """TCP server that carries two extra communication handles for its handlers.

    The handles are stored on the server instance so that request handlers can
    reach them through ``self.server`` (for example
    ``self.server.pipe_juejin.send(...)``).
    """

    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, pipe_juejin=None, pipe_ui=None):
        """Create the server and remember the extra pipe handles.

        Args:
            server_address: Address passed straight to ``TCPServer``.
            RequestHandlerClass: Handler class passed straight to ``TCPServer``.
            bind_and_activate: Forwarded to ``TCPServer``; when true the socket
                is bound and put into listening state during construction.
            pipe_juejin: Extra handle kept on the server; request handlers call
                ``send`` on it (presumably a multiprocessing pipe end to the
                juejin process - confirm against the caller).
            pipe_ui: Extra handle kept on the server (presumably a pipe end to
                the UI process - confirm against the caller).
        """
        # Extra parameters: set before the base initializer runs so they exist
        # even if binding fails part-way through construction.
        self.pipe_juejin = pipe_juejin
        self.pipe_ui = pipe_ui
        super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
@@ -41,6 +47,7 @@
    set_operate_code_state_dict = {}
    l2_data_error_dict = {}
    last_trade_delegate_data = None
    buy1_volumn_manager = THSBuy1VolumnManager()
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -65,7 +72,7 @@
            if len(data) == 0:
                # print("客户端断开连接")
                break;
            _str = str(data, encoding="gb2312")
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # print("结果:",_str)
                type = data_process.parseType(_str)
@@ -74,14 +81,23 @@
                    try:
                        __start_time = round(time.time() * 1000)
                        _start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas = l2_data_manager.parseL2Data(
                            _str)
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # 保存l2截图时间
                        TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time)
                        __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time, "数据解析时间")
                        # try:
                        #     self.pipe_ui.send(
                        #         json.dumps({"type": "l2_data_notify", "data": {"count": len(datas), "code": code}}))
                        # except:
                        #     pass
                        # 过时 保存l2截图时间
                        # TradeCancelDataManager.save_l2_capture_time(client, channel, code, capture_time)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
@@ -109,8 +125,10 @@
                                        self.set_operate_code_state_dict[key] = round(time.time() * 1000)
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                __start_time = l2_data_log.l2_time(code, round(time.time() * 1000) - __start_time,
                                                                   "l2数据正确性判断时间")
                                if gpcode_manager.is_listen(code):
                                    l2_data_manager.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
@@ -133,9 +151,10 @@
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - __start_time > 40:
                                    logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time));
                    except:
                        pass
                                    l2_data_log.l2_time(code, round(time.time() * 1000) - _start_time, "l2数据处理总耗时",
                                                        True)
                    except Exception as e:
                        logging.exception(e)
                elif type == 10:
                    # level2交易队列
                    try:
@@ -154,7 +173,7 @@
                    gpcode_manager.set_gp_list(code_list)
                    # 重新订阅
                    self.server.pipe.send(json.dumps({"type": "resub"}))
                    self.server.pipe_juejin.send(json.dumps({"type": "resub"}))
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
@@ -190,7 +209,7 @@
                elif type == 4:
                    # 行业代码信息
                    dataList = data_process.parseList(_str)
                    data_process.saveIndustryCode(dataList)
                    ths_industry_util.save_industry_code(dataList)
                elif type == 6:
                    # 可用金额
                    datas = data_process.parseData(_str)
@@ -217,6 +236,20 @@
                            volumnUnit = item["volumnUnit"]
                            code_volumn_manager.save_today_volumn(item["code"], volumn, volumnUnit)
                        juejin.accpt_prices(data)
                elif type == 50:
                    data = data_process.parse(_str)["data"]
                    if data is not None:
                        index = data["index"]
                        code_name = data["codeName"]
                        volumn = data["volumn"]
                        time_ = data["time"]
                        code = global_util.name_codes.get(code_name)
                        if code is None:
                            global_util.load_name_codes()
                        code = global_util.name_codes.get(code_name)
                        if code is not None:
                            # 保存数据
                            self.buy1_volumn_manager.save(code, time_, volumn)
                elif type == 30:
                    # 心跳信息
@@ -228,7 +261,9 @@
                    if ths_util.is_ths_dead(client_id):
                        # TODO 重启同花顺
                        # 报警
                        alert_util.alarm()
                        l2_clients = authority.get_l2_clients()
                        if client_id in l2_clients:
                            alert_util.alarm()
                    # print("心跳:", client_id)
                sk.send(return_str.encode())
@@ -250,7 +285,7 @@
    try:
        socketClient.send(json.dumps(data).encode())
        recv = socketClient.recv(1024)
        result = recv.decode().lstrip()
        result = str(recv, encoding="gbk")
        return result
    finally:
        socketClient.close()