RockerMq发送消息之顺序消息

发布时间 2023-09-23 17:40:35作者: 自学Java笔记本

顺序消息

        消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

        顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

        下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

image

消费者监听消息:会独立的开出一个线程去消费,例如独立开出一个线程去消费张三的队列消息。

创建一个订单类:用于模拟业务

package com.zgf.mq.rocketmq.order;

import java.util.ArrayList;
import java.util.List;

/**
 * 订单的步骤
 */
public class OrderStep {
    private long orderId;
    private String desc;

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }


    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();
        /**
         *  15103111039L: 创建、付款、推送、完成
         *  15103111065L:创建、付款、完成
         *  15103117235L:创建、付款、完成
         */
        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

生产者代码:


/**
 * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        // 1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2.指定NameServer地址
        producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.启动producer
        producer.start();
        // 构建消息集合
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        // 发送消息
        for (int i = 0;i<orderSteps.size();i++) {
            // 需要发送的消息实体
            String body = dateStr+" Hello RocketMQ " + orderSteps.get(i);
            Message message = new Message("OrderTopic","Order",""+i,body.getBytes());
            /**
             * 参数1:消息对象
             * 参数2:MessageQueueSelector 消息队列的选择器
             * 参数3:选择队列的业务标识(订单id)
             */
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 * 来实现消息的自定义路由逻辑,以确保消息被按照特定的规则发送到合适的队列。这对于需要发送顺序消息或者自定义消息路由的场景非常有用。
                 * @param mqs 一个包含了所有可用消息队列的列表。你可以在这个列表中选择一个队列来发送消息。
                 * @param msg 要发送的消息对象。
                 * @param arg 自定义参数,可以用于根据业务逻辑来选择消息队列。通常情况下,你可以使用这个参数来实现选择队列的逻辑。
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 这里的arg,就是我们的 orderSteps.get(i).getOrderId()
                    Long orderId = (Long) arg;
                    // 根据订单id选择要发送的queue
                    long index = orderId % mqs.size(); // 例如 1 % 10  =1(可用的队列,即1号队列)
                    return mqs.get((int) index);
                }
            }, orderSteps.get(i).getOrderId());// 订单id,没有特殊的情况,都是唯一的
            System.out.println("发送结果:"+sendResult);
        }
        producer.shutdown(); // 关闭
    }
}

这里引入了新的内容:MessageQueueSelector接口,该接口注释也写的很清楚,就不介绍了。

消费者代码:

/**
 * 顺序消息的消费
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3.订阅主题Topic和Tag
        consumer.subscribe("OrderTopic","*");//消费Tag1中的消息,通过 || xxx 可以消费多个消息,通过*可以消费OrderTopic下的所有消息
		 /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 4.设置回调函数,处理消息,用于监听消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            /**
             * 如果消耗失败,不建议抛出异常,而不是返回ConsumerOrderlyStatus.SSUSPEND_CURRENT_QUEUE_A_MOMEN
             * @param msgs    msgs.size() >= 1 <br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
             * @param context
             * @return 消息状态
             */
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("线程:【"+Thread.currentThread().getName()+"】: 消费消息:"+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

结果:
image
从上面的结果,我们可以看出来实现了分区有序,即一个线程只完成唯一标识的订单消息。

顺序消息缺陷

  • 消费顺序消息的并行度依赖于队列的数量 ;
  • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题;
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停;