RabbitMQ

发布时间 2023-12-13 11:15:06作者: 西芹-小汤圆

简介

作用

  1. 流量消峰:相当于等待队列。
  2. 应用解耦:当子系统出现故障,该系统的要处理的信息被缓存在消息队列中,待修复完成后即可恢复。
  3. 异步处理。

四大核心概念

  1. 生产者:产生数据发送消息的程序。
  2. 交换机:一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。
  3. 队列:队列是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。
  4. 消费者:消费者大多时候是一个等待接收消息的程序。

常用命令

命令 作用
/sbin/service rabbitmq-server start status stop 启动、查看状态、停止
chkconfig rabbitmq-server on 添加开机自启动
systemctl stop firewalld 暂时关闭防火墙
systemctl status firewalld 查看防火墙状态

实战

RabbitMQ在启动的时候可能会出现ativating (auto-restart)的状态,此时不要着急,等待一段时间后会成功启动。经常出现问题可以把开机自启动关掉,然后手动开启。

RabbitMQ的后台默认端口是15672,如果访问不了大概率是防火墙的问题。

IDEA中写代码连接RabbitMQ的后台的端口是5762。

在信道首次使用时,需要先启动生产者新建信道。

需要关闭防火墙,idle中的代码才能发送消息。

Hello World

生产者

public class Producer {
    //队列名称
    public static final String QUEUE_NAME = "hello";

    //发消息
    public static void main(String[] args) throws  Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //工厂IP,连接RabbitMQ队列
        factory.setHost("xxx.xxx.xxx.xxx");
        //设置RabbitMQ的用户名,密码
        factory.setUsername("xxx");
        factory.setPassword("xxx");

        //创建连接
        Connection connection = factory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息,需要将消息转换成二进制发送
        String message = "hello world";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");

    }
}

消费者

public class Consumer {
    //队列名称
    public static final String QUEUE_NAME = "hello";
    //接收消息
    public static void main(String[] args) throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        //声明如何处理消息
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println(new String(message.getBody()));
        };
        //取消消息时的回调
        CancelCallback cancelCallback = consumerTag ->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

Work Queues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

消息应答

消息应答:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它己经处理了,rabbitmq可以把该消息删除了。

自动应答在接收时则进行消息应答,仅适用于消费者可以高效并以某种速率能够处理这些消息的时候使用。

手动应答的好处是可以批量应答并且减少网络拥堵,但一般不使用批量应答。

消息手动应答

DeliverCallback deliverCallback = (consumerTag,message) -> {
    SleepUtils.sleep(1);
    System.out.println("接收到的消息" + new String(message.getBody(),"UTF-8"));
    //手动应答
    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};

持久化

队列实现持久化只需要在声明队列的时候把durable参数设置为持久化。但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个特久化的队列,不然就会出现错误。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

不公平分发采取能者多劳的方式分发,只需在消费者端设置channel.basicQos(1);

生产者

public class Task2 {
    public static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        boolean durable = true;//设置队列持久化
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String message = scanner.next();
            //设置生产者发送消息为持久化消息,要求保存在磁盘上
            channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生成者发出消息" + message);
        }
    }
}

发布确认

发布确认是队列将信息保存在磁盘后,发信息告诉生产者信息已经持久化。

发布确认:单个确认,批量确认和异步确认,常用的是异步确认。

处理异步未确认信息的最好解决方案是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

异步确认

public static void publishMessageAsync() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName,true,false,false,null);
    channel.confirmSelect();//开启发布确认
    //记录信息是否确认的线程安全有序的哈希表
    ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();

    //准备消息的监听器,监听信息成功与否
    ConfirmCallback ackCallback = (deliveryTag,multiple) ->{
        if(multiple){
            //删除已经确认的消息
            //返回的是小于等于当前序列号的未确认消息 是一个 map
            ConcurrentNavigableMap<Long, String> confirmed =
                    outstandingConfirms.headMap(deliveryTag);
            //清除该部分未确认消息
            confirmed.clear();
        }else {
            outstandingConfirms.remove(deliveryTag);
        }

    };
    ConfirmCallback nackCallback = (deliveryTag,multiple) ->{
        String message = outstandingConfirms.get(deliveryTag);
        System.out.println("未确认的消息" + deliveryTag + message);
    };
    channel.addConfirmListener(ackCallback,nackCallback);
    long begin = System.currentTimeMillis();
    for (int i = 0; i < MESSAGE_COUNT; i++) {
        String message = "消息" + i;
        channel.basicPublish("", queueName, null, message.getBytes());
        //记录所有要发送的消息
        outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
    }

    long end = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end-begin) + "ms");
}

交换机

交换机可以将消息传递给多个消费者,这种模式称为“发布/订阅”模式。

RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。生产者只能将消息发送到交换机。

交换机有:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)四种模式。

生产者关注交换机和路由键、消费者关心队列名、交换机、路由键的绑定。

发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd,nyse","nyse.vmw”。

扇出

生产者

public class EmitLog {

    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);

        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("生产消息" + message);
        }
    }
}

消费者

public class ReceiveLogs01 {

    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明一个交换机
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,使用默认的key值
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("等待接收消息");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("01控制台打印接收到的消息"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

死信队列

死信的产生:消息被拒绝、信息TTL过期、队列达到最大长度。

修改队列信息后,需要将原本的队列删除后才能正常运行。

消费者

public class Consumer01 {

    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        Map<String,Object> arguments = new HashMap<>();
        //过期时间设置,正常队列设置死信交换机
//        arguments.put("x-message-ttl",10000);
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","lisi");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收消息");

        DeliverCallback deliverCallback = (consumerTag,message) -> {
            System.out.println("consumer01接收的消息" + new String(message.getBody(),"UTF-8"));
        };

        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumerTag ->{});
    }
}

生产者

public class Producer {
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        for (int i = 0; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
            System.out.println("dayin" + i);
        }
    }
}

延迟队列

延迟队列用于存放需要在指定时间被处理的元素的队列。

延迟队列本质上就是死信队列中的消息TTL过期。

Spring报错'BootFailed to start bean ‘documentationPluginsBootstrapper'

基于死信队列存在的问题:RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。解决方法是使用RabbitMQ插件rabbitmq_delayed_message_exchange来实现延迟队列。安装成功后会新增一个x-delayed-message类型的交换机,此时在交换机进行延迟,等TTL过期后再给队列。

原始方式

配置类代码

@Configuration
public class TtlQueueConfig {

    //普通交换机的名称
    public static final String X_EXCHANGE = "X";
    //死信交换机的名称
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列的名称
    public static final String QUEUE_A = "QA";

    //声明xExchange
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    //声明队列
    @Bean("queueA")
    public Queue queueA(){
        Map<String,Object> arguments = new HashMap<>(3);
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","YD");
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    
    //绑定关系
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
}

生产者

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //开始发消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10s的队列"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40s的队列"+message);
    }
}

消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
    }
}

基于插件方式

配置类

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        //自定义交换机的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
                args);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
    log.info("当前时间:{},发送一条时长为{}毫秒的信息给延迟队列:{}",new Date().toString(),delayTime,message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
            DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
}

消费者

@Slf4j
@Component
public class DelayQueueConsumer {
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
    }
}

高级篇

发布确认

发布确认高级篇要解决的问题是当RabbitMQ出现问题时,交换机和队列都不存在的消息发送问题,我们可以使用缓存机制来解决改为问题。

生产者

回调接口中的参数CorrelationData是由生产者发送的时候写入的。

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,"key1",message,correlationData);
        log.info("发送消息内容为:{}",message);
    }
}

回调接口和消息回退接口

实现RabbitTemplate内部的ConfirmCallback接口,由于是内容接口,因此需要将该接口注入到RabbitTemplate。

开启回调接口需要在配置文件中设定spring.rabbitmq.publisher-confirm-type=correlated

回退消息发生在生产者已经成功将信息交给交换机,而交换机没有找到对应的routingkey的队列,默认情况下交换机会直接将消息丢弃。

开启回退消息,需要在配置文件中增加spring.rabbitmq.publisher-returns=true

#@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    //将实现的接口注入
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    //交换机确认回调方法
    @Override
    public void confirm(CorrelationData correlationData,boolean ack,String cause){
        String id = correlationData !=null ? correlationData.getId():"";
        if(ack){
            log.info("交换机已经收到 id 为:{}的消息",id);
        }else{
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
        }
    }


    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息已被回退");
    }
}

其他内容

MQ消费者的幂等性(重复提交问题)的解决一般使用全局ID或者唯一标识符来解决。另一种思路是使用Redis的原子性进行解决。

RabbitMQ可以开启优先级队列。

惰性队列会尽可能的将消息存储在磁盘中,一般用于消费者长时间不能消费的场景下。