Administrator
2023-02-08 3ec79004bd769828c8dc18ed35280f81cfb473ff
server.py
@@ -19,10 +19,10 @@
import gpcode_manager
import authority
import juejin
import l2_data_log
from l2 import l2_data_manager_new, l2_data_manager
from l2 import l2_data_manager_new, l2_data_manager, l2_data_log
import l2_data_util
from l2.cancel_buy_strategy import HourCancelBigNumComputer, L2LimitUpMoneyStatisticUtil
import l2.l2_data_util
import ths_industry_util
import ths_util
@@ -60,6 +60,7 @@
    l2_save_time_dict = {}
    l2_trade_buy_queue_dict = {}
    tradeBuyQueue = l2.transaction_progress.TradeBuyQueue()
    last_time = {}
    def setup(self):
        super().setup()  # 可以不调用父类的setup()方法,父类的setup方法什么都没做
@@ -80,90 +81,101 @@
        # print("- " * 30)
        sk: socket.socket = self.request
        while True:
            data = sk.recv(1024 * 1024 * 20)
            data = sk.recv(1024 * 100)
            if len(data) == 0:
                # print("客户端断开连接")
                break
            _str = str(data, encoding="gbk")
            if len(_str) > 0:
                # print("结果:",_str)
                type = data_process.parseType(_str)
                type = -1
                try:
                    type = data_process.parseType(_str)
                except:
                    print(_str)
                return_str = "OK"
                if type == 0:
                    try:
                        origin_start_time = round(time.time() * 1000)
                        __start_time = round(time.time() * 1000)
                        do_id = random.randint(0, 100000)
                        # level2盘口数据
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2_data_manager.parseL2Data(
                        day, client, channel, code, capture_time, process_time, datas, origin_datas = l2.l2_data_util.parseL2Data(
                            _str)
                        # 间隔1s保存一条l2的最后一条数据
                        if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                            code] >= 1000 and len(datas) > 0:
                            self.l2_save_time_dict[code] = origin_start_time
                            logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                        if channel == 0:
                            now_time = round(time.time() * 1000)
                            if self.last_time.get(channel) is not None:
                                #print("接受到L2的数据", channel, now_time - self.last_time.get(channel), "解析耗时",now_time - origin_start_time)
                                pass
                        # 10ms的网络传输延时
                        capture_timestamp = __start_time - process_time - 10
                        # print("截图时间:", process_time)
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "截图时间:{} 数据解析时间".format(process_time))
                            self.last_time[channel] = now_time
                        cid, pid = gpcode_manager.get_listen_code_pos(code)
                        if True:
                            # 间隔1s保存一条l2的最后一条数据
                            if code not in self.l2_save_time_dict or origin_start_time - self.l2_save_time_dict[
                                code] >= 1000 and len(datas) > 0:
                                self.l2_save_time_dict[code] = origin_start_time
                                logger_l2_latest_data.info("{}#{}#{}", code, capture_time, datas[-1])
                        __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                           "l2获取代码位置耗时")
                        # 判断目标代码位置是否与上传数据位置一致
                        if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                            try:
                                # 校验客户端代码
                                l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                __start_time = round(time.time() * 1000)
                                if gpcode_manager.is_listen(code):
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2外部数据预处理耗时")
                                    l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                     do_id)
                                    __start_time = l2_data_log.l2_time(code, do_id,
                                                                       round(time.time() * 1000) - __start_time,
                                                                       "l2数据有效处理外部耗时",
                                                                       False)
                                    # 保存原始数据数量
                                    l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                    if round(time.time() * 1000) - __start_time > 20:
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                            "异步保存原始数据条数耗时",
                                                            False)
                            # 10ms的网络传输延时
                            capture_timestamp = __start_time - process_time - 10
                            # print("截图时间:", process_time)
                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                               "截图时间:{} 数据解析时间".format(process_time))
                            except l2_data_manager.L2DataException as l:
                                # 单价不符
                                if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                    key = "{}-{}-{}".format(client, channel, code)
                                    if key not in self.l2_data_error_dict or round(
                                            time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                        # self.l2CodeOperate.repaire_l2_data(code)
                                        # todo 太敏感移除代码
                                        logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                        # 单价不一致时需要移除代码重新添加
                                        l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                        self.l2_data_error_dict[key] = round(time.time() * 1000)
                            cid, pid = gpcode_manager.get_listen_code_pos(code)
                            except Exception as e:
                                print("异常", str(e), code)
                                logging.exception(e)
                                logger_l2_error.error("出错:{}".format(str(e)))
                                logger_l2_error.error("内容:{}".format(_str))
                            finally:
                            __start_time = l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                               "l2获取代码位置耗时")
                            # 判断目标代码位置是否与上传数据位置一致
                            if cid is not None and pid is not None and client == int(cid) and channel == int(pid):
                                try:
                                    # 校验客户端代码
                                    l2_code_operate.verify_with_l2_data_pos_info(code, client, channel)
                                    __start_time = round(time.time() * 1000)
                                    if gpcode_manager.is_listen(code):
                                        __start_time = l2_data_log.l2_time(code, do_id,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2外部数据预处理耗时")
                                        l2_data_manager_new.L2TradeDataProcessor.process(code, datas, capture_timestamp,
                                                                                         do_id)
                                        __start_time = l2_data_log.l2_time(code, do_id,
                                                                           round(time.time() * 1000) - __start_time,
                                                                           "l2数据有效处理外部耗时",
                                                                           False)
                                        # 保存原始数据数量
                                        l2_data_util.save_l2_latest_data_number(code, len(origin_datas))
                                        if round(time.time() * 1000) - __start_time > 20:
                                            l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - __start_time,
                                                                "异步保存原始数据条数耗时",
                                                                False)
                                __end_time = round(time.time() * 1000)
                                # Only record the total processing time when it exceeds 100 ms
                                if __end_time - origin_start_time > 100:
                                    l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                                        "l2数据处理总耗时",
                                                        True)
                                except l2_data_manager.L2DataException as l:
                                    # 单价不符
                                    if l.get_code() == l2_data_manager.L2DataException.CODE_PRICE_ERROR:
                                        key = "{}-{}-{}".format(client, channel, code)
                                        if key not in self.l2_data_error_dict or round(
                                                time.time() * 1000) - self.l2_data_error_dict[key] > 10000:
                                            # self.l2CodeOperate.repaire_l2_data(code)
                                            # todo 太敏感移除代码
                                            logger_l2_error.warning("code-{} l2单价错误:{}", code, l.msg)
                                            # 单价不一致时需要移除代码重新添加
                                            l2_code_operate.L2CodeOperate().remove_l2_listen(code, "l2监听单价错误")
                                            self.l2_data_error_dict[key] = round(time.time() * 1000)
                                except Exception as e:
                                    print("异常", str(e), code)
                                    logging.exception(e)
                                    logger_l2_error.error("出错:{}".format(str(e)))
                                    logger_l2_error.error("内容:{}".format(_str))
                                finally:
                                    __end_time = round(time.time() * 1000)
                                    # Only record the total processing time when it exceeds 100 ms
                                    if __end_time - origin_start_time > 100:
                                        l2_data_log.l2_time(code, do_id, round(time.time() * 1000) - origin_start_time,
                                                            "l2数据处理总耗时",
                                                            True)
                    except Exception as e:
                        logger_l2_error.exception(e)
                elif type == 1:
@@ -221,40 +233,45 @@
                elif type == 5:
                    logger_trade_delegate.debug("接收到委托信息")
                    # 交易委托信息
                    dataList = data_process.parseList(_str)
                    if self.last_trade_delegate_data != _str:
                        self.last_trade_delegate_data = _str
                        # 保存委托信息
                        logger_trade_delegate.info(dataList)
                    __start_time = round(time.time() * 1000)
                    try:
                        # 设置申报时间
                        for item in dataList:
                            apply_time = item["apply_time"]
                            if apply_time and len(apply_time) >= 8:
                                code = item["code"]
                                trade_state = trade_manager.get_trade_state(code)
                                # 设置下单状态的代码为已委托
                                if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                    origin_apply_time = apply_time
                                    apply_time = apply_time[0:6]
                                    apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4], apply_time[4:6])
                                    ms = origin_apply_time[6:9]
                                    if int(ms) > 500:
                                        # 时间+1s
                                        apply_time = tool.trade_time_add_second(apply_time, 1)
                        # 交易委托信息
                        dataList = data_process.parseList(_str)
                        if self.last_trade_delegate_data != _str:
                            self.last_trade_delegate_data = _str
                            # 保存委托信息
                            logger_trade_delegate.info(dataList)
                        try:
                            # 设置申报时间
                            for item in dataList:
                                apply_time = item["apply_time"]
                                if apply_time and len(apply_time) >= 8:
                                    code = item["code"]
                                    trade_state = trade_manager.get_trade_state(code)
                                    # 设置下单状态的代码为已委托
                                    if trade_state == trade_manager.TRADE_STATE_BUY_PLACE_ORDER:
                                        origin_apply_time = apply_time
                                        apply_time = apply_time[0:6]
                                        apply_time = "{}:{}:{}".format(apply_time[0:2], apply_time[2:4],
                                                                       apply_time[4:6])
                                        ms = origin_apply_time[6:9]
                                        if int(ms) > 500:
                                            # 时间+1s
                                            apply_time = tool.trade_time_add_second(apply_time, 1)
                                    print(apply_time)
                    except Exception as e:
                        logging.exception(e)
                                        print(apply_time)
                        except Exception as e:
                            logging.exception(e)
                    try:
                        trade_manager.process_trade_delegate_data(dataList)
                    except Exception as e:
                        logging.exception(e)
                    trade_manager.save_trade_delegate_data(dataList)
                    # 刷新交易界面
                    trade_gui.THSGuiTrade().refresh_data()
                        try:
                            trade_manager.process_trade_delegate_data(dataList)
                        except Exception as e:
                            logging.exception(e)
                        trade_manager.save_trade_delegate_data(dataList)
                        # 刷新交易界面
                        trade_gui.THSGuiTrade().refresh_data()
                    finally:
                        pass
                elif type == 4:
                    # 行业代码信息
@@ -278,28 +295,33 @@
                    buy_one_price = data["buyOnePrice"]
                    buy_one_volumn = data["buyOneVolumn"]
                    buy_queue = data["buyQueue"]
                    buy_queue_result_list = self.tradeBuyQueue.save(code, gpcode_manager.get_limit_up_price(code),
                                                                    buy_one_price, buy_time,
                                                                    buy_queue)
                    if buy_queue_result_list:
                        # 有数据
                        try:
                            buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                decimal.Decimal("0.00"))
                            buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
                                                                                      buy_queue_result_list)
                            if buy_progress_index is not None:
                                HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                            l2_data_manager.local_today_datas.get(code),
                                                                            l2_data_manager.local_today_num_operate_map.get(
                                                                                code))
                            logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                           buy_progress_index,
                                                           json.dumps(buy_queue_result_list))
                        except Exception as e:
                            print("买入队列", code, buy_queue_result_list)
                            logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                              json.dumps(buy_queue_result_list))
                    if buy_one_price is None:
                        print('买1价没有,', code)
                    limit_up_price = gpcode_manager.get_limit_up_price(code)
                    if limit_up_price is not None:
                        buy_queue_result_list = self.tradeBuyQueue.save(code, limit_up_price, buy_one_price, buy_time,
                                                                        buy_queue)
                        if buy_queue_result_list:
                            # 有数据
                            try:
                                buy_one_price_ = decimal.Decimal(round(float(buy_one_price), 2)).quantize(
                                    decimal.Decimal("0.00"))
                                buy_progress_index = self.tradeBuyQueue.save_traded_index(code, buy_one_price_,
                                                                                          buy_queue_result_list)
                                if buy_progress_index is not None:
                                    HourCancelBigNumComputer.set_trade_progress(code, buy_progress_index,
                                                                                l2.l2_data_util.local_today_datas.get(
                                                                                    code),
                                                                                l2.l2_data_util.local_today_num_operate_map.get(
                                                                                    code))
                                logger_l2_trade_buy_queue.info("获取成交位置成功: code-{} index-{}  数据-{}", code,
                                                               buy_progress_index,
                                                               json.dumps(buy_queue_result_list))
                            except Exception as e:
                                logging.exception(e)
                                print("买入队列", code, buy_queue_result_list)
                                logger_l2_trade_buy_queue.warning("获取成交位置失败: code-{} 原因-{}  数据-{}", code, str(e),
                                                                  json.dumps(buy_queue_result_list))
                    # buy_queue是否有变化
                    if self.l2_trade_buy_queue_dict.get(code) is None or buy_queue != self.l2_trade_buy_queue_dict.get(
@@ -318,7 +340,7 @@
                                l2_data_manager_new.L2TradeDataProcessor.cancel_buy(code, cancel_msg, "trade_queue")
                            if need_sync:
                                # 同步数据
                                L2LimitUpMoneyStatisticUtil.verify_num(0, code, int(buy_one_volumn), buy_time)
                                L2LimitUpMoneyStatisticUtil.verify_num(code, int(buy_one_volumn), buy_time)
                    # print(buy_time, buy_one_price, buy_one_volumn)
                    # print("L2买卖队列",datas)