Apache RocketMQ 组件(泛型应用)

发布时间: 2023-08-03 19:51:50  作者: zno2

 一、实现

 

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.alibaba.fastjson.JSONObject;

import lombok.CustomLog;

/**
 * @author witas
 *
 */
@CustomLog
public class ApacheRocketMqManager {
    
    //思路:class定义业务对象,实现特定接口
    // 组名利用对象class名
    // 消费时仅需实现listener
    // 生产直接发送类对象 
    // 消费对象中通用的消费次数等信息 

    private static final Map<String, DefaultMQPushConsumer> CONSUMER_MAP = new ConcurrentHashMap<>();
    private static final Map<String, DefaultMQProducer> PRODUCER_MAP = new ConcurrentHashMap<>();

    private static void bindingConsumerBusi(String busi, String namesrvAddr,
            MessageListenerConcurrently listener) {
        if(busi != null) {
            busi = busi.toUpperCase();
        }
        if(CONSUMER_MAP.containsKey(busi)) {
            return;
        }
        if (busi == null || busi.trim().length() < busi.length()) {
            throw new IllegalArgumentException("业务标识[" + busi + "]格式错误");
        }
        if (StringUtils.isBlank(namesrvAddr) || namesrvAddr.split(":").length != 2 || namesrvAddr.contains("/")) {
            throw new IllegalArgumentException("namesrvAddr[" + busi + "]格式错误,必须是<域名/IP>:<端口>,例如 localhost:9876");
        }
        String env = System.getProperty("env");

        if (StringUtils.isBlank(env)) {
            env = "DEFAULT";
        } else {
            env = env.toUpperCase();
        }
        String consumerGroup = env + "_CONSUMER_GROUP_" + busi;
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setConsumeMessageBatchMaxSize(1);
        consumer.setMaxReconsumeTimes(3);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.registerMessageListener(listener);
        String topic = env + "_TOPIC_" + busi;
        try {
            consumer.subscribe(topic, env + "_TAG_" + busi);
            consumer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("",e);
        }
        log.info("主题[{}]消费者[{}]已注册", topic, consumerGroup);
        CONSUMER_MAP.put(busi, consumer);
    }

    private static void bindingProducerBusi(String busi, String namesrvAddr) {
        if(busi != null) {
            busi = busi.toUpperCase();
        }
        if(PRODUCER_MAP.containsKey(busi)) {
            return;
        }
        if (busi == null || busi.trim().length() < busi.length()) {
            throw new IllegalArgumentException("业务标识[" + busi + "]格式错误");
        }
        if (StringUtils.isBlank(namesrvAddr) || namesrvAddr.split(":").length != 2 || namesrvAddr.contains("/")) {
            throw new IllegalArgumentException("namesrvAddr[" + busi + "]格式错误,必须是<域名/IP>:<端口>,例如 localhost:9876");
        }
        String env = System.getProperty("env");

        if (StringUtils.isBlank(env)) {
            env = "DEFAULT";
        } else {
            env = env.toUpperCase();
        }
        String producerGroup = env + "_PRODUCER_GROUP_" + busi;
        String topic = env + "_TOPIC_" + busi;
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        producer.setNamesrvAddr(namesrvAddr);
        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException("",e);
        }
        log.info("主题[{}]生产者[{}]已启动", topic, producerGroup);
        PRODUCER_MAP.put(busi, producer);
    }

    private static SendResult send(String busi, Object obj) {
        DefaultMQProducer defaultMQProducer = PRODUCER_MAP.get(busi);
        if(defaultMQProducer == null) {
            log.info("发送失败,无生产者[{}]", busi);
            return null;
        }
        String env = System.getProperty("env");

        if (StringUtils.isBlank(env)) {
            env = "DEFAULT";
        } else {
            env = env.toUpperCase();
        }

        SendResult sendResult = null;
        try {
            sendResult = defaultMQProducer.send(new Message(env + "_TOPIC_" + busi, env + "_TAG_" + busi, JSONObject.toJSONBytes(obj)));
        } catch (Exception e) {
            e.printStackTrace();
        }

        return sendResult;
    }

    
    public static void clean() {
        for (DefaultMQPushConsumer defaultMQPushConsumer : CONSUMER_MAP.values()) {
            defaultMQPushConsumer.shutdown();
        }
        for (DefaultMQProducer defaultMQProducer : PRODUCER_MAP.values()) {
            defaultMQProducer.shutdown();
        }
    }
    
    
    public static <P extends BusiProtocol> void bindingConsumerBusi(Class<P> protocolClazz, String namesrvAddr, BusiHandler<P> handler) {
        
        if(protocolClazz == null || handler == null || namesrvAddr == null) {
            log.info("Binding consumer failed.Illegal arguments[{}],[{}],[{}]",protocolClazz, handler, namesrvAddr);
            return;
        }
        bindingConsumerBusi(protocolClazz.getSimpleName(), namesrvAddr, new BusiListener<P>(protocolClazz, handler) {
            @Override
            public boolean doBusi(P p, BusiHandler<P> handler, MessageExt msg) {
                log.info("<<<< 来消息了 {}", msg.getMsgId());
                boolean result = handler.handle(p, msg);
                log.info("消息处理" + (result?"成功":"失败"));
                return result;
            }
        });
    }
    
    public static void bindingProducerBusi(Class<? extends BusiProtocol> clazz, String namesrvAddr){
        bindingProducerBusi(clazz.getSimpleName(), namesrvAddr);
    }
    
    public static <T> SendResult send(T t) {
        return send(t.getClass().getSimpleName().toUpperCase(),t);
    }
}

 

 

import java.nio.charset.Charset;
import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import com.alibaba.fastjson.JSONObject;

/**
 * Concurrent message listener that turns each message body (UTF-8 JSON) into a protocol
 * object of type {@code P}, stamps it with delivery metadata, and delegates to
 * {@link #doBusi(BusiProtocol, BusiHandler, MessageExt)}.
 *
 * @param <P> protocol type the message body is parsed into
 */
public abstract class BusiListener<P extends BusiProtocol> implements MessageListenerConcurrently {

    /** Charset used to decode message bodies; resolved once instead of per message. */
    private static final Charset BODY_CHARSET = Charset.forName("utf-8");

    private final Class<P> protocolClazz;
    private final BusiHandler<P> handler;

    /**
     * @param protocolClazz protocol class the JSON body is parsed into
     * @param handler       business handler handed to {@code doBusi} for every message
     */
    public BusiListener(Class<P> protocolClazz, BusiHandler<P> handler) {
        this.protocolClazz = protocolClazz;
        this.handler = handler;
    }

    /**
     * Processes every message of the batch in order.
     *
     * @return {@code CONSUME_SUCCESS} when all messages were handled (or the batch is empty),
     *         {@code RECONSUME_LATER} as soon as one message cannot be parsed or is rejected
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (msgs == null) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // Handle the whole batch: looking only at the first element would silently
        // acknowledge the rest whenever the consumer's batch size is larger than one.
        for (MessageExt msg : msgs) {
            if (!consumeOne(msg)) {
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Parses one message, fills in retry count, message id and "queueOffset/storeSize",
     * then runs the business callback.
     *
     * @return {@code false} when the body is missing/unparsable or the callback rejects the message
     */
    private boolean consumeOne(MessageExt msg) {
        byte[] body = msg.getBody();
        if (body == null) {
            return false;
        }
        P p = JSONObject.parseObject(new String(body, BODY_CHARSET), protocolClazz);
        if (p == null) {
            // A body that does not parse into an object would otherwise surface as a bare NullPointerException.
            return false;
        }
        p.setRetry(msg.getReconsumeTimes());
        p.setMsgId(msg.getMsgId());
        p.setPercent(msg.getQueueOffset() + "/" + msg.getStoreSize());
        return doBusi(p, handler, msg);
    }

    /**
     * Business callback invoked once per parsed message.
     *
     * @param p       parsed protocol object, already carrying retry count, message id and percent
     * @param handler the handler supplied at construction time
     * @param msg     the raw message
     * @return {@code true} if the message was handled, {@code false} to request redelivery
     */
    public abstract boolean doBusi(P p, BusiHandler<P> handler, MessageExt msg);

    /** @return the protocol class passed to the constructor */
    public Class<? extends BusiProtocol> getProtocolClazz() {
        return protocolClazz;
    }

    /** @return the handler passed to the constructor */
    public BusiHandler<P> getHandler() {
        return handler;
    }

}

 

import org.apache.rocketmq.common.message.MessageExt;

/**
 * Business callback for one protocol type; intended to be supplied as a lambda,
 * a method reference, or a dedicated implementation class.
 *
 * @param <P> protocol type the handler consumes
 */
@FunctionalInterface
public interface BusiHandler<P extends BusiProtocol> {

    /**
     * Handles one consumed message.
     *
     * @param p          protocol object parsed from the message body
     * @param messageExt the raw RocketMQ message
     * @return {@code true} if the message was processed, {@code false} if it should be retried
     */
    boolean handle(P p, MessageExt messageExt);
}

 

/**
 * Base type for every business protocol object. Besides the business fields declared by
 * subclasses it carries three pieces of delivery metadata that the listener fills in on
 * the consuming side.
 */
public class BusiProtocol {

    /** Number of times the message has been re-consumed. */
    private int retry;
    /** Identifier of the message this object was parsed from. */
    private String msgId;
    /** Progress text in the form {@code queueOffset/storeSize}. */
    private String percent;

    // ---- setters ----

    public void setRetry(int retry) {
        this.retry = retry;
    }

    public void setMsgId(String msgId) {
        this.msgId = msgId;
    }

    public void setPercent(String percent) {
        this.percent = percent;
    }

    // ---- getters ----

    /** @return the re-consume count, {@code 0} unless set */
    public int getRetry() {
        return this.retry;
    }

    /** @return the message id, or {@code null} if not set */
    public String getMsgId() {
        return this.msgId;
    }

    /** @return the progress text, or {@code null} if not set */
    public String getPercent() {
        return this.percent;
    }

}

 

使用:

第一步:定义协议对象

/**
 * Example protocol object: a greeting whose only business field is a name.
 */
public class SAYHELLO extends BusiProtocol {

    /** Name carried by the greeting. */
    private String name;

    public void setName(final String name) {
        this.name = name;
    }

    /** @return the name carried by the greeting, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

}

 

第二步:

    /**
     * End-to-end smoke test: binds a consumer and a producer for {@code SAYHELLO},
     * sends one message, then waits so the consumer thread has time to print it.
     */
    @Test
    public void test() {

        ApacheRocketMqManager.bindingConsumerBusi(SAYHELLO.class, "192.168.200.214:9876", (p, msg) -> {
            System.out.println(JSONObject.toJSONString(p));
            return true;
        });
        ApacheRocketMqManager.bindingProducerBusi(SAYHELLO.class, "192.168.200.214:9876");

        SAYHELLO t = new SAYHELLO();
        t.setName("张三1");
        ApacheRocketMqManager.send(t);

        try {
            // Consumption happens on another thread; keep the test alive long enough to see it.
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

 

备注:需要自建 RocketMQ;topic 可以自动创建,一个协议对象代表一个业务。

具体使用时只需在项目中配置好生产和消费,然后直接发送对象即可

区分环境需要在项目启动时传入 env 系统属性(例如 -Denv=test);未传入时默认使用 DEFAULT。

 

 

 

二、该组件初衷

  • 项目中重复而繁琐的初始化消费者和生产者
  • Apollo重复而繁杂的topic维护,命名无规则且容易出错
  • Apollo 各个环境topic需要单独维护
 
以上现状令人头疼,为了扭转当前局面,封装了该组件
 

三、设计思路

  • 保持业务独立。一个业务就是一组特定的业务数据触发特定的动作,业务绑定生产者组和消费者组:业务数据由业务发起者(生产者)发起,由业务处理者(消费者)处理。不同的业务需要相互隔离。
  • 生产者到消费者的消息通过协议约束。协议是为了约束生产者和消费者,保证各个业务的处理数据不会错乱,并绑定到特定的topic
  • 自动维护 Topic。受限于 RocketMQ 生产端与消费端的 topic 必须保持一致,引入协议后 topic 由环境和协议类名自动生成,不再需要手动维护(包括各个环境)。
 

四、使用说明

第一步:定义协议对象

一个普通的pojo,需要继承 cn.xs.ambi.mgt.apachermq.BusiProtocol ,因为协议对象可能跨项目使用,建议统一放在 qishi-rocketmq 项目
举例:
/**
 * Sample protocol: extends {@code BusiProtocol} and adds a single business field, {@code name}.
 */
public class SAYHELLO extends BusiProtocol {

    /** The one piece of business data in this protocol. */
    private String name;

    public void setName(final String name) {
        this.name = name;
    }

    /** @return the current name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

}

 

 
这个协议中只有一个业务数据name
 

第二步:配置生产者和消费者

/**
 * Spring configuration that registers the RocketMQ producers and consumers of this
 * application once its properties have been injected.
 */
@Configuration
public class MqConfig implements InitializingBean {

    /** Name server address injected from {@code rocketmq.namesrv-addr}. */
    @Value("${rocketmq.namesrv-addr}")
    private String MQ_ADDRESS;

    /** Used to look up handler beans by type. */
    @Autowired
    ApplicationContext ac;

    /**
     * Binds producer and consumer for each protocol: first the work-order-finished SMS
     * protocol, then the SAYHELLO sample.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        final String namesrvAddr = this.MQ_ADDRESS;

        // Work order finished -> send SMS; the handler is a Spring-managed bean.
        ApacheRocketMqManager.bindingProducerBusi(WoFinishSmsDto.class, namesrvAddr);
        FinishWoSmsHandler finishWoSmsHandler = ac.getBean(FinishWoSmsHandler.class);
        ApacheRocketMqManager.bindingConsumerBusi(WoFinishSmsDto.class, namesrvAddr, finishWoSmsHandler);

        // say hello: handled inline by printing the received object as JSON.
        ApacheRocketMqManager.bindingProducerBusi(SAYHELLO.class, namesrvAddr);
        ApacheRocketMqManager.bindingConsumerBusi(SAYHELLO.class, namesrvAddr, (hello, rawMsg) -> {
            String json = JSONObject.toJSONString(hello);
            System.out.println(json);
            return true;
        });
    }
}

 

上面示例配置了两个协议,最后一个是SAYHELLO
 

第三步:实现消费者handler

需要实现 cn.xs.ambi.mgt.apachermq.BusiHandler<P> 接口,实现handle 方法,返回true代表消费成功,返回false代表需要重试
 

第四步:发消息

// Fill the protocol object with the business data to publish.
SAYHELLO greeting = new SAYHELLO();
greeting.setName("张三1");
// The producer is selected from the object's class; the returned result may be null.
SendResult sendResult = ApacheRocketMqManager.send(greeting);
System.out.println(sendResult);

 

发消息非常简单,只需要封装业务数据,然后调用 ApacheRocketMqManager.send(T)
可以通过返回的 org.apache.rocketmq.client.producer.SendResult 判断是否发送成功;未绑定生产者或发送出现异常时,返回值为 null。
 

五、测试

@Before
    public void init() {
        // Both bindings target the same name server; consumer first, then producer.
        final String namesrvAddr = "192.168.200.214:9876";
        ApacheRocketMqManager.bindingConsumerBusi(SAYHELLO.class, namesrvAddr, (hello, rawMsg) -> {
            // Print the received protocol object as JSON and acknowledge it.
            String json = JSONObject.toJSONString(hello);
            System.out.println(json);
            return true;
        });
        ApacheRocketMqManager.bindingProducerBusi(SAYHELLO.class, namesrvAddr);
    }

    /**
     * Sends one {@code SAYHELLO} message, prints the send result, then waits so the
     * consumer thread has time to receive and print the message.
     */
    @Test
    public void test() {
        SAYHELLO t = new SAYHELLO();
        t.setName("张三1");
        SendResult result = ApacheRocketMqManager.send(t);
        System.out.println(result);
        try {
            // Consumption happens on another thread; keep the test alive long enough to see it.
            Thread.sleep(20000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

 

结果:

15:28:35.708 [main] INFO ApacheRocketMqManager - 主题[DEFAULT_TOPIC_SAYHELLO]消费者[DEFAULT_CONSUMER_GROUP_SAYHELLO]已注册
15:28:36.542 [main] INFO ApacheRocketMqManager - 主题[DEFAULT_TOPIC_SAYHELLO]生产者[DEFAULT_PRODUCER_GROUP_SAYHELLO]已启动

SendResult [sendStatus=SEND_OK, msgId=7F00000146E836BAF30C7EEACC4A0000, offsetMsgId=C0A8C8D600002A9F00000000EADAEE73, messageQueue=MessageQueue [topic=DEFAULT_TOPIC_SAYHELLO, brokerName=xx-pubsrv-rocketmq, queueId=0], queueOffset=7]

15:28:37.093 [ConsumeMessageThread_DEFAULT_CONSUMER_GROUP_SAYHELLO_1] INFO ApacheRocketMqManager - <<<< 来消息了 7F00000146E836BAF30C7EEACC4A0000
{"msgId":"7F00000146E836BAF30C7EEACC4A0000","name":"张三1","percent":"7/231","retry":0}
15:28:37.093 [ConsumeMessageThread_DEFAULT_CONSUMER_GROUP_SAYHELLO_1] INFO ApacheRocketMqManager - 消息处理成功