MQ消息队列

发布时间 2023-03-28 17:24:11作者: 中亿丰数字科技

MQ,消息队列,存储消息的中间件。
分布式系统通信两种方式:直接远程调用和借助第三方完成间接通信
发送方称为生产者,接收方称为消费者

MQ的优势

1.应用解耦

提高系统容错性和可维护性

使用MQ后,消息通过中间件转发,消费者从MQ中取消息,如果库存系统出现异常,等库存系统自我修复后再去MQ中取消息,不会影响其他系统。添加新系统也是去MQ中取,不用修改订单系统的代码。

2.异步提速

提升了用户体验和系统吞吐量

使用MQ,一个订单只需要25ms,提升用户体验和系统的吞吐量(吞吐量:单位时间内处理请求的数目)

3.削峰填谷

提高系统稳定性

使用了MQ之后,限制消费信息速度为1000,这样一来,高峰期产生的数据势必会被积压再MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

MQ的劣势

1.系统可用性降低

系统引入的外部依赖越多,系统的稳定性越差,一旦MQ宕机,就会对业务造成影响,如何保证MQ的高可用?
如何保证MQ的高可用?
镜像集群模式(高可用性)
跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。
如何开启:
RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

2.系统复杂度提高

MQ的加入大大的增加了系统的复杂度,以前系统间是同步的远程调用,限制是通过MQ进行异步调用,如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?

1)如何保证消息没有被重复消费?

要保证消息不被重复消费,其实就是要保证消息消费时的幂等性。幂等性:无论你重复请求多少次,得到的结果都是一样的。例如:一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。 那么如何保证幂等性呢?
1.写数据时,先根据主键查一下这条数据是否存在,如果已经存在则 update;
2.数据库的唯一键约束也可以保证不会重复插入多条,因为重复插入多条只会报错,不会导致数据库中出现脏数据;
3.如果是写 redis,就没有问题,因为 set 操作是天然幂等性的。

(2)怎么处理消息丢失情况?

1、生产者可靠性投递:开启confirm模式,一旦生产者传递到Rabbitmq时,rabbitmq就发送一个确认消息给生产者,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。
通过 channel.confirmSelect() 开启发送方确认模式
2、消息持久化:message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。所有需要给exchange、queue和message都进行持久化
exchange持久化:

# 第三个参数true表示这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

queue持久化:

# 第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

message持久化:

# 第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessagePropertis.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

3、消费端消息不丢失:默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以将自动ack机制改为手动ack机制。
4、极端情况下:消息补偿机制----> 消息入库
首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认;在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status=0的消息,所以给个时间)还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。

(3)怎么保证消息传递的顺序性?

拆分多个 Queue,每个 Queue一个 Consumer,就是多一些 Queue 而已,确实是麻烦点;或者就一个 Queue 但是对应一个 Consumer,然后这个 Consumer 内部用内存队列做排队,然后分发给底层不同的 Worker 来处理。

作者:钱煜