安装
1.下载erlang并安装,地址:http://erlang.org
2.下载mq并安装,地址:http://www.rabbitmq.com/download.html
备注:以下内容涉及的队列和交换机,以及绑定关系都是在管理后台进行的操作
一、直接发送消息到队列,不经过交换机
1、简单队列,只有一个消费者
/**
* 简单队列
*/
@Test
public void testSimpleQueue(){
String queueName ="simple.queue";
String message ="testSimpleQueue";
rabbitTemplate.convertAndSend(queueName,message);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到消息====================="+msg);
}
2、工作队列,多个消费者
/**
* 工作队列
*/
@Test
public void testWorkQueue() throws InterruptedException {
String queueName ="work.queue";
String message ="testWorkQueue";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String msg) {
System.out.println("消费者work.queue 1收到消息====================="+msg);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.out.println("消费者work.queue 2收到消息====================="+msg);
Thread.sleep(100);
}
每个消息只会被消费一次,默认情况下消费者轮流消费,可以通过设置spring.rabbitmq.listener.simple.prefetch=1来确保同一时刻最多投递给消费者1条消息,从而避免消息堆积
二、消息经过交换机
交换机的作用是对消息进行路由,但不具备保存消息的能力。
1.Fanout交换机
会将接收到的消息广播到每一个跟其绑定的队列,所以也叫广播模式
举例,交换机和队列进行绑定如下,
/**
* fanout交换机
*/
@Test
public void testSimpleQueue(){
String exchangeName ="test.fanout";
String message ="hello,everyone!";
rabbitTemplate.convertAndSend(exchangeName,null,message);
}
@RabbitListener(queues = "fanout.queue")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者fanout.queue 收到消息====================="+msg);
}
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者fanout.queue 1收到消息====================="+msg);
}
结果:
消费者fanout.queue 收到消息=====================hello,everyone!
消费者fanout.queue 1收到消息=====================hello,everyone!
2.Direct交换机
Direct交换机会将接收到的消息根据规则路由到指定的队列,因此称为定向路由。
每一个队列都与交换机设置一个BindingKey;
发布者发送消息时,指定消息的RoutingKey;
交换机将消息路由到BingingKey与消息RoutingKey一致的队列;
举例,交换机与队列的BindingKey如下,
@Test
public void testDirectQueue(){
String exchangeName ="test.direct";
String message ="hello!";
rabbitTemplate.convertAndSend(exchangeName,"red",message);
rabbitTemplate.convertAndSend(exchangeName,"blue",message);
}
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {
System.out.println("消费者direct.queue1 收到消息====================="+msg);
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {
System.out.println("消费者direct.queue2 收到消息====================="+msg);
}
结果:
消费者direct.queue1 收到消息=====================hello!
消费者direct.queue2 收到消息=====================hello!
消费者direct.queue1 收到消息=====================hello!
3.Topic交换机
与Direct交换机类似,区别在于RoutingKey可以是多个单词的列表,以.分隔。BindingKey可以使用通配符。#代指0个或多个单词,*代指一个单词(比如routingKey是多个单词,就不匹配)
举例,当交换机与队列的BindingKey如下时,
@Test
public void testTopicQueue(){
String exchangeName ="test.topic";
rabbitTemplate.convertAndSend(exchangeName,"japan.news","日本红色预警!");
rabbitTemplate.convertAndSend(exchangeName,"china.news","中国蓝色预警!");
rabbitTemplate.convertAndSend(exchangeName,"china.weather","阳光明媚!");
}
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) {
System.out.println("消费者topic.queue1 收到消息====================="+msg);
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) {
System.out.println("消费者topic.queue2 收到消息====================="+msg);
}
结果:
消费者topic.queue2 收到消息=====================日本红色预警!
消费者topic.queue1 收到消息=====================中国蓝色预警!
消费者topic.queue2 收到消息=====================中国蓝色预警!
消费者topic.queue1 收到消息=====================阳光明媚!