Administrator
2022-09-08 e7f8c6013d777dd5ba10b8d548d2d3db6158d37a
server.py
@@ -2,6 +2,7 @@
import logging
import socketserver
import socket
import threading
import time
import data_process
@@ -66,58 +67,60 @@
                        __start_time = round(time.time() * 1000)
                        # level2盘口数据
                        day, client, channel, code, datas = l2_data_manager.parseL2Data(_str)
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                            try:
                                # print("L2数据接受",day,code,len(datas))
                                # 查询
                                code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
                                if code_ != code:
                                    key = "{}-{}-{}".format(client, channel, code)
                        try:
                            # print("L2数据接受",day,code,len(datas))
                            # 查询
                            code_ = gpcode_manager.get_listen_code_by_pos(client, channel)
                            if code_ != code:
                                key = "{}-{}-{}".format(client, channel, code)
                                    # 间隔2s
                                    if key not in self.reset_code_dict or round(
                                            time.time() * 1000) - self.reset_code_dict[key] > 2000:
                                # 间隔2s
                                if key not in self.reset_code_dict or round(
                                        time.time() * 1000) - self.reset_code_dict[key] > 2000:
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 0)
                                        self.reset_code_dict[key] = round(time.time() * 1000)
                                        if code_ is None:
                                            code_ = ""
                                        if tool.is_trade_time():
                                            self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
                                else:
                                    key = "{}-{}".format(client, channel)
                                    if key not in self.set_operate_code_state_dict or round(
                                            time.time() * 1000) - self.set_operate_code_state_dict[key] > 1000:
                                        self.set_operate_code_state_dict[key] = round(time.time() * 1000)
                                        self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                    self.l2CodeOperate.set_operate_code_state(client, channel, 0)
                                    self.reset_code_dict[key] = round(time.time() * 1000)
                                    if code_ is None:
                                        code_ = ""
                                    if tool.is_trade_time():
                                        self.l2CodeOperate.repaire_operate(int(client), int(channel), code_)
                            else:
                                key = "{}-{}".format(client, channel)
                                if key not in self.set_operate_code_state_dict or round(
                                        time.time() * 1000) - self.set_operate_code_state_dict[key] > 1000:
                                    self.set_operate_code_state_dict[key] = round(time.time() * 1000)
                                    self.l2CodeOperate.set_operate_code_state(client, channel, 1)
                                if gpcode_manager.is_listen(code):
                                    l2_data_manager.process_data(code, datas)
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                    key = "{}-{}-{}".format(client, channel, code)
                                    if key not in self.l2_data_error_dict or round(
                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                        # self.l2CodeOperate.repaire_l2_data(code)
                                        # todo 太敏感移除代码
                                        logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                        # 单价不一致时需要移除代码重新添加
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code)
                                        self.l2_data_error_dict[key] = round(time.time() * 1000)
                            if gpcode_manager.is_listen(code):
                                l2_data_manager.process_data(code, datas)
                        except l2_data_manager.L2DataException as l:
                            # 单价不符
                            if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                key = "{}-{}-{}".format(client, channel, code)
                                if key not in self.l2_data_error_dict or round(
                                        time.time() * 1000) - self.l2_data_error_dict[key] > 2000:
                                    self.l2CodeOperate.repaire_l2_data(code)
                                    self.l2_data_error_dict[key] = round(time.time() * 1000)
                        except Exception as e:
                            print("异常", str(e))
                            logging.exception(e)
                            logger_l2_error.error("出错:{}".format(str(e)))
                            logger_l2_error.error("内容:{}".format(_str))
                        finally:
                            __end_time = round(time.time() * 1000)
                            # 只记录大于40ms的数据
                            if __end_time - __start_time > 40:
                                logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time));
                            except Exception as e:
                                print("异常", str(e))
                                logging.exception(e)
                                logger_l2_error.error("出错:{}".format(str(e)))
                                logger_l2_error.error("内容:{}".format(_str))
                            finally:
                                __end_time = round(time.time() * 1000)
                                # 只记录大于40ms的数据
                                if __end_time - __start_time > 40:
                                    logger_l2_process.info("l2处理时间:{}-{}".format(code, __end_time - __start_time));
                    except:
                        pass
                elif type == 10:
                    # level2交易队列
                    try:
@@ -126,7 +129,6 @@
                            data_process.saveL2Data(day, code, setData)
                    except:
                        print("异常")
                elif type == 1:
                    # 设置股票代码
                    data_list = data_process.parseGPCode(_str)
@@ -138,7 +140,10 @@
                    gpcode_manager.set_gp_list(code_list)
                    # 重新订阅
                    self.server.pipe.send(json.dumps({"type": "resub"}))
                    sync_target_codes_to_ths()
                    # 同步同花顺目标代码
                    t1 = threading.Thread(target=lambda: sync_target_codes_to_ths())
                    t1.setDaemon(True)
                    t1.start()
                elif type == 2:
                    # 涨停代码
                    codeList = data_process.parseGPCode(_str)
@@ -185,17 +190,18 @@
                            {"code": 0, "data": {"client": int(client_id), "authoritys": json.loads(_authoritys)}})
                    except Exception as e:
                        return_str = data_process.toJson({"code": 1, "msg": str(e)})
                # 现价更新
                elif type == 40:
                    data = data_process.parse(_str)["data"]
                    for item in data:
                        juejin.accpt_price(item["code"], float(item["price"]))
                    if data is not None:
                        print("现价数量", len(data))
                        for item in data:
                            juejin.accpt_price(item["code"], float(item["price"]))
                elif type == 30:
                    data = data_process.parse(_str)["data"]
                    client_id = data["client"]
                    if "memery" in data:
                        mem = data["memery"]
                        logger_device.info("({})内存使用率:{}".format(client_id, mem))
                    logger_device.info("({})客户端信息:{}".format(client_id, json.dumps(data)))
                    data_process.saveClientActive(int(client_id), host)
                    # print("心跳:", client_id)
@@ -235,7 +241,8 @@
                send_msg(client, {"action": "test"})
            except:
                pass
        # 矫正客户端代码
        l2_code_operate.correct_client_codes()
        time.sleep(5)