Spring Boot 整合 RocketMQ——消费者

发布时间: 2023-09-14 22:36:56  作者: Mr-v

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Maven build descriptor for the c-rocketmq-consumer Spring Boot application. -->
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits from the Spring Boot starter parent (2.3.4.RELEASE). Dependencies below that
         declare no <version> are expected to get their version from this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Coordinates of this module. -->
    <groupId>com.xt</groupId>
    <artifactId>c-rocketmq-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>c-rocketmq-consumer</name>
    <description>c-rocketmq-consumer</description>
    <properties>
        <!-- Java language level used for the build. -->
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- Spring Boot web starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- RocketMQ Spring Boot starter; its version is pinned explicitly here. -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>

        <!-- Lombok, declared optional so it is not propagated to modules depending on this one. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- Spring Boot testing starter, available on the test classpath only. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven plugin; Lombok is excluded from the packaged application. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

 

application.yml

# HTTP port the application listens on.
server:
  port: 8081
# RocketMQ connection settings read by the rocketmq-spring-boot-starter.
rocketmq:
  # host:port of the RocketMQ NameServer (environment-specific; adjust per deployment).
  name-server: 192.168.126.128:9876

 

注册监听器

TestConsumerListener

package com.xt.listener;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * Listener registered for topic {@code boot-test-topic} under consumer group
 * {@code boot-test-consumer-group}. Each received message is printed to standard output.
 */
@Component
@RocketMQMessageListener(topic = "boot-test-topic", consumerGroup = "boot-test-consumer-group")
public class TestConsumerListener implements RocketMQListener<MessageExt> {

    /**
     * Prints the received message and its body.
     *
     * <p>When the listener's generic parameter is a concrete payload type, the message body is
     * delivered as that type. With {@link MessageExt} the whole message is available, i.e. both
     * the body and the message headers/properties.
     *
     * <p>If this method returns normally the message is acknowledged automatically; if it fails
     * (throws), the message is rejected and will be redelivered.
     *
     * @param message the full message as delivered by the broker
     */
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(message);
        // Decode with an explicit charset: new String(byte[]) falls back to the platform default,
        // which garbles non-ASCII payloads on hosts whose default is not UTF-8.
        // NOTE(review): assumes producers encode the body as UTF-8 - confirm against the producer.
        System.out.println("消息体:" + new String(message.getBody(), StandardCharsets.UTF_8));
    }
}