MQ【消息延迟解决方案】

发布时间 2023-11-02 15:11:18作者: 木乃伊人

一、消息延迟如何监控

       1、消息队列提供的工具,通过监控消息的堆积来完成。

       2、通过生产监控消息对消息延时的监控。

二、详情

     /2.1、消息队列工具

           以kafka为例。不用版本消费者的消费进度不一样。

           在 Kafka0.9 之前的版本中,消费进度是存储在 ZooKeeper 中的,消费者在消费消息的时候先要从 ZooKeeper 中获取最新的消费进度,再从这个进度的基础上消费后面的消息。          

           在 Kafka0.9 版本之后,消费进度被迁入到 Kakfa 的一个专门的 topic 叫“__consumer_offsets”里面。

当然,作为一个成熟的组件,Kafka 也提供了一些工具来获取这个消费进度的信息帮助我们实现自己的监控,这个工具主要有两个:

           (1)、Kafka 提供了工具叫做“kafka-consumer-groups.sh”(它在 Kafka 安装包的 bin 目录下)。

  1. 前两列是队列的基本信息,包括topic名和分区名。
  2. 第三列是当前消费者的消费进度。
  3. 第四列是当前生产消息的总数。
  4. 第五列就是消费消息的堆积数(也就是第四列与第三列的差值)。

           (2)、JMX

                Kafka 通过 JMX 暴露了消息堆积的数据,然后我们就可以通过写代码将这个堆积数据发送到我们的监控系统中去。

       2.2、自己生成消息监控

            可以自定义一种特殊的消息,然后启动一个监控程序将消息定时循环的写入到消息队列中,这个消息可以是生成一个时间戳。同时这个消息是可以被消费者消费的,当业务消费到的时候就将其丢弃,而监控程序消费这个消息是,就将其生成时间和消费时间进行对比,如果超过了我们预设的一定阈值就像我们报警。

   建议:上面两种方式都是可以监控消息延迟的,但是在实际生产中,这里推荐将他们两者进行结合来使用,比如,我们先可以在监控程序中通过JMX获取消息堆积数据,然后发送到我们的dashboard 中;同时起一个探测进程确认消息的延迟情况是怎样的。

  三、减少消息延迟

         3.1、消费端

  1. 通过优化消费代码来提升性能。
  2. 增加消费者的数量 。

                 不过第二种方式并不是对于所有的消费队列有效的,它是受消费队列限制的,比如Kafka 是不能通过增加消费者数量来提升消费性能的。

               因为, 在 Kafka 中,一个 Topic 可以配置多个 Partition,数据会被平均或者按照生产者指定的方式写入到多个分区中,那么在消费的时候,Kafka 约定一个分区只能被一个消费者消费,为什么要这么设计呢?在我看来,如果有多个 consumer(消费者)可以消费一个分区的数据,那么在操作这个消费进度的时候就需要加锁,可能会对性能有一定的影响。

              所以,分区数量决定了消费的并行度,增加多余的消费者也是没有用处的,你可以通过增加分区来提高消费者的处理能力。

            既然如此,那我们在不增加分区的情况下该怎么去提升消费性能呢?

             我们虽然不能增加消费者,但是我们可以在消费者使用并行处理。所以我们就可以考虑使用多线程的方式来增加处理能力:

  1.  预先创建一个或者多个线程池;
  2.  拉取到消息丢到线程池中进行异步处理,将串行的消息消费变成了并行的;
  3.  不仅提高了吞吐量,还可以一次消费多拉取一些消息,分配给多个线程来处理 ;

       

     3.2、消息队列本身

             两个关键点:消息存储、零拷贝技术。

           【消息存储】:应当使用本地磁盘作为存储介质。Page Cache 的存在就可以提升消息的读取速度,即使要读取磁盘中的数据,由于消息的读取是顺序的并且不需要跨网络读取数据,所以读取消息的 QPS 肯定是比普通数据库高很多很多。

          【零拷贝技术】:说是零拷贝,其实我们不可能消灭数据的拷贝,只是尽量减少拷贝的次数。在读取消息队列的数据的时候,其实就是把磁盘中的数据通过网络发送给消费客户端,在实现上会有四次数据拷贝的步骤:

                         1、数据从磁盘拷贝到内核缓冲区;

                         2、系统调用将内核缓存区的数据拷贝到用户缓冲区;

                         3、用户缓冲区的数据被写入到 Socket 缓冲区中;

                         4、操作系统再将 Socket 缓冲区的数据拷贝到网卡的缓冲区中。

                  

 操作系统提供了 Sendfile 函数可以减少数据被拷贝的次数。使用了 Sendfile 之后,在内核缓冲区的数据不会被拷贝到用户缓冲区而是直接被拷贝到 Socket 缓冲区,节省了一次拷贝的过程提升了消息发送的性能。高级语言中对于 Sendfile 函数有封装,比如说在 Java 里面的 java.nio.channels.FileChannel 类就提供了 transferTo 方法提供了 Sendfile 的功能。

      队列经常会用在我们项目当中,做好数据堆积监控是关键。