向mq写消息

发布时间:2023-12-15 19:19:41 作者:借你耳朵说爱你

1.基础版本

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import com.alibaba.fastjson.JSON;

/**
 * Minimal example: start a RocketMQ producer, send one message, shut it down.
 *
 * <p>The producer group, name-server address and topic are hard-coded sample
 * values; replace them for a real deployment.
 */
public class MQProducer {
    /**
     * Sends a single JSON-encoded message to topic {@code T-Topic}.
     *
     * @param args unused
     * @throws Exception if the producer cannot start or the send fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("G-Group_REQ");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        try {
            String jsonStr = JSON.toJSONString("Your JSON content");
            // Encode explicitly as UTF-8 so the payload does not depend on the
            // platform default charset of the sending JVM.
            Message msg = new Message("T-Topic",
                    jsonStr.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            producer.send(msg);
        } finally {
            // Always release the producer, even when send() throws.
            producer.shutdown();
        }
    }
}

2.添加写消息失败处理:登记日志、定时处理、参数配置化

import cn.com.*.support.MQLogSupport;
import cn.com.*.entity.MQLog;
import cn.com.*.repository.MQLogRepository;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * Sends messages to RocketMQ and records the outcome of every attempt in the
 * {@link MQLog} table, so that failed sends can be retried by a scheduled job.
 *
 * <p>A deal code of {@code "0000"} marks a successfully sent message; any other
 * value is picked up again by {@link #readAndResendMessage()}.
 */
@Component
public class MQLogSupportImpl implements MQLogSupport{
    private final Logger logger = LoggerFactory.getLogger(MQLogSupport.class);

    /** Send timeout applied to every producer, in milliseconds (30 seconds). */
    private static final int SEND_TIMEOUT_MILLIS = 30000;

    /** Timestamp format used for the registration time; the formatter is thread-safe. */
    private static final DateTimeFormatter REG_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    @Autowired
    private MQLogRepository mqLogRepository;

    @Value("${rocketmq.name-server}")
    String namesrvAddr;

    /**
     * Sends the message described by {@code mqLog} (no tag) and records the result.
     *
     * <p>Any failure is caught, logged and stored with deal code {@code "9999"};
     * nothing is thrown to the caller for a failed send.
     *
     * @param mqLog the registered message to send
     */
    public void sendMessageNoTag(MQLog mqLog) {
        try {
            sendOnce(mqLog);
            setMQlogSuccess(mqLog);
        } catch (Exception e) {
            logger.warn("Initial MQ send failed, topic={}", mqLog.getMqTopic(), e);
            setMQlogError(mqLog, "9999", "初次:" + e.getMessage());
        }
    }

    /**
     * Scheduled job that re-sends every message whose deal code is not {@code "0000"}.
     *
     * <p>The cron expression fires every minute.
     * NOTE(review): {@code lockAtLeastForString = "PT5M"} keeps the ShedLock lock
     * for at least five minutes, so runs inside that window are presumably
     * skipped — confirm this is the intended cadence.
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    @SchedulerLock(name = "RocketMQ", lockAtLeastForString = "PT5M", lockAtMostForString = "PT30M")
    public void readAndResendMessage() {
        // Load the messages that have not been sent successfully yet.
        List<MQLog> messageList = mqLogRepository.findByDealCodeNot("0000");
        if(CollectionUtils.isEmpty(messageList)){
            return;
        }
        // Re-send each one; a failure of one message must not stop the others.
        for (MQLog message : messageList) {
            try {
                sendOnce(message);
                setMQlogSuccess(message);
            } catch (Exception e) {
                logger.warn("Scheduled MQ resend failed, topic={}", message.getMqTopic(), e);
                setMQlogError(message, "定时任务:" + e.getMessage());
            }
        }
    }

    /**
     * Creates a producer for the message's group, sends the message once and
     * always shuts the producer down again.
     *
     * @param mqLog the message to send
     * @throws Exception if the producer cannot start or the send fails
     */
    private void sendOnce(MQLog mqLog) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(mqLog.getMqGroup());
        producer.setNamesrvAddr(namesrvAddr);
        producer.setSendMsgTimeout(SEND_TIMEOUT_MILLIS);
        try {
            producer.start();
            // Explicit UTF-8 keeps the payload independent of the JVM default charset.
            Message msg = new Message(mqLog.getMqTopic(),
                    mqLog.getMqMessage().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            producer.send(msg);
        } finally {
            // Release the producer's resources on both success and failure.
            producer.shutdown();
        }
    }

    /**
     * Registers a new message record built from the given JSON object.
     *
     * <p>The whole JSON object is serialized and stored as the message body; the
     * keys {@code topic}, {@code tags}, {@code key}, {@code approved},
     * {@code approver}, {@code comment} and {@code finshTime} are copied into
     * dedicated columns.
     * NOTE(review): the MQ group is not populated here although the send path
     * reads {@code getMqGroup()} — confirm where it is set.
     *
     * @param mqLogJson source data for the record
     * @return the entity returned by the repository's {@code save}
     */
    public MQLog regMQlog(JSONObject mqLogJson){
        logger.debug("登记 mq 消息......");
        String topic = mqLogJson.getString("topic");
        String tags = mqLogJson.getString("tags");
        String msg = JSON.toJSONString(mqLogJson);
        String key = mqLogJson.getString("key");
        Boolean approveResult = mqLogJson.getBoolean("approved");
        String approver = mqLogJson.getString("approver");
        String comment = mqLogJson.getString("comment");
        String finshTime = mqLogJson.getString("finshTime");

        MQLog.MQLogBuilder mqLogBuilder = MQLog.builder();
        mqLogBuilder.mqTopic(topic)
                .mqTags(tags)
                .key(key)
                .mqMessage(msg)
                .approveResult(approveResult)
                .approver(approver)
                .approveComment(comment)
                .approveFinishTime(finshTime)
                .regDateTime(LocalDateTime.now().format(REG_TIME_FORMAT));

        MQLog savedMqLog = mqLogRepository.save(mqLogBuilder.build());
        logger.debug("登记 mq 消息完成。");
        return savedMqLog;
    }

    /**
     * Stores a copy of {@code mqLog} with the given error code and message.
     *
     * @param mqLog     the record to update
     * @param errorCode deal code to store
     * @param errorMsg  deal message to store
     */
    public void setMQlogError(MQLog mqLog, String errorCode, String errorMsg){
        mqLogRepository.save(mqLog.toBuilder().dealCode(errorCode).dealMsg(errorMsg).build());
    }

    /**
     * Stores a copy of {@code mqLog} with deal code {@code "Error"} and the given message.
     *
     * @param mqLog    the record to update
     * @param errorMsg deal message to store
     */
    public void setMQlogError(MQLog mqLog, String errorMsg){
        setMQlogError(mqLog,"Error",errorMsg);
    }

    /**
     * Stores a copy of {@code mqLog} marked as sent (deal code {@code "0000"}).
     *
     * @param mqLog the record to update
     */
    public void setMQlogSuccess(MQLog mqLog){
        mqLogRepository.save(mqLog.toBuilder().dealCode("0000").dealMsg("success").build());
    }
}

3.简化代码:改用 RocketMQTemplate 异步发送,在回调中登记结果,失败时最多重试3次

import cn.com.*.support.MQLogSupport;
import cn.com.*.entity.MQLog;
import cn.com.*.repository.MQLogRepository;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import net.javacrumbs.shedlock.core.SchedulerLock;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
 * Sends messages through {@link RocketMQTemplate} asynchronously and records
 * the outcome of every attempt on the {@link MQLog} record.
 *
 * <p>A deal code of {@code "0000"} marks success and {@code "9999"} marks
 * failure. Failed sends are retried immediately from the send callback up to
 * {@link #MAX_RETRY_COUNT} times, and are also picked up again by the
 * scheduled job {@link #readAndResendMessage()}.
 */
@Component
public class MQLogSupportImpl implements MQLogSupport {
    private final Logger logger = LoggerFactory.getLogger(MQLogSupport.class);

    /** Maximum number of immediate retries triggered from the send callback. */
    private static final int MAX_RETRY_COUNT = 3;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Autowired
    private MQLogRepository mqLogRepository;

    /**
     * Sends the message described by {@code mqLog} (no tag) asynchronously.
     *
     * <p>The result is recorded on {@code mqLog} from the send callback. A
     * failure raised synchronously by the template is recorded as well instead
     * of being thrown to the caller.
     *
     * @param mqLog the registered message to send
     */
    public void sendMessageNoTag(MQLog mqLog) {
        sendAsync(mqLog, "初次:");
    }

    /**
     * Retries a failed message while its retry counter is below
     * {@link #MAX_RETRY_COUNT}; otherwise records the final failure.
     *
     * @param mqLog the message whose send just failed
     */
    private void retrySendMessage(MQLog mqLog) {
        // Works whether getRetryCount() returns int or Integer; a missing
        // counter is treated as zero attempts so far.
        Integer storedCount = mqLog.getRetryCount();
        int retryCount = storedCount == null ? 0 : storedCount;
        if (retryCount < MAX_RETRY_COUNT) {
            mqLog.setRetryCount(retryCount + 1);
            mqLogRepository.save(mqLog);
            sendMessageNoTag(mqLog);
        } else {
            setMQlogError(mqLog, "重试3次仍然失败");
        }
    }

    /**
     * Scheduled job that re-sends every message whose deal code is not {@code "0000"}.
     *
     * <p>The cron expression fires every minute.
     * NOTE(review): {@code lockAtLeastForString = "PT5M"} keeps the ShedLock lock
     * for at least five minutes, so runs inside that window are presumably
     * skipped — confirm this is the intended cadence.
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    @SchedulerLock(name = "RocketMQ", lockAtLeastForString = "PT5M", lockAtMostForString = "PT30M")
    public void readAndResendMessage() {
        // Load the messages that have not been sent successfully yet.
        List<MQLog> messageList = mqLogRepository.findByDealCodeNot("0000");
        if (CollectionUtils.isEmpty(messageList)) {
            return;
        }
        // Re-send each one; sendAsync never throws for a failed send, so one
        // bad record cannot abort the rest of the batch.
        for (MQLog message : messageList) {
            sendAsync(message, "定时任务:");
        }
    }

    /**
     * Sends one message asynchronously and wires the callback that records
     * the result.
     *
     * @param mqLog       the message to send
     * @param errorPrefix text put in front of the exception message when a
     *                    failure is recorded, identifying the calling path
     */
    private void sendAsync(MQLog mqLog, String errorPrefix) {
        try {
            rocketMQTemplate.asyncSend(mqLog.getMqTopic(), mqLog.getMqMessage(), new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    setMQlogSuccess(mqLog);
                }

                @Override
                public void onException(Throwable e) {
                    logger.warn("Async MQ send failed, topic={}", mqLog.getMqTopic(), e);
                    setMQlogError(mqLog, errorPrefix + e.getMessage());
                    retrySendMessage(mqLog);
                }
            });
        } catch (RuntimeException e) {
            // The template can fail before the callback is ever registered
            // (for example when the request cannot be handed to the client).
            // Record it and leave the retry to the scheduled job.
            logger.warn("MQ send could not be submitted, topic={}", mqLog.getMqTopic(), e);
            setMQlogError(mqLog, errorPrefix + e.getMessage());
        }
    }

    /**
     * Marks the record as sent (deal code {@code "0000"}) and saves it.
     *
     * @param mqLog the record to update
     */
    private void setMQlogSuccess(MQLog mqLog) {
        mqLog.setDealCode("0000");
        mqLog.setDealDesc("发送成功");
        mqLogRepository.save(mqLog);
    }

    /**
     * Marks the record as failed (deal code {@code "9999"}) with the given
     * description and saves it.
     *
     * @param mqLog    the record to update
     * @param errorMsg description of the failure
     */
    private void setMQlogError(MQLog mqLog, String errorMsg) {
        mqLog.setDealCode("9999");
        mqLog.setDealDesc(errorMsg);
        mqLogRepository.save(mqLog);
    }
}

注意:

1) 配置文件

rocketmq:
  name-server: IP:端口
  producer:
    group: "G-Group_REQ"
    send-msg-timeout: 30000

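2) 根据需要实现:

MQLogSupport、MQLog、MQLogRepository

其中 MQLogRepository 需提供 findByDealCodeNot 方法;第 2 节的 MQLog 需支持 builder()/toBuilder() 以及 dealCode、dealMsg 字段;第 3 节的 MQLog 需包含 retryCount、dealCode、dealDesc 字段及对应的 getter/setter。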