查看 Kafka 指定 offset 位置的消息

发布时间: 2023-05-26 11:38:15 作者: levi125
package com.infinitus.cdc.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Manual debugging tool that prints the records of a single Kafka topic
 * partition, starting from a fixed offset.
 *
 * <p>The consumer is manually assigned to the partition with {@code assign}
 * and positioned with {@code seek}. Auto-commit is switched off explicitly so
 * that inspecting records does not move the offsets committed for the
 * consumer group.
 *
 * @author levi
 * @version 1.0
 * @date 2023/5/26 10:50
 **/
public class KafkaOffsetTest {
    /** Topic to inspect. */
    private static final String TOPIC_NAME = "topic_name_1";
    /** Partition of the topic to read from. */
    private static final int PARTITION = 1;
    /** Offset of the first record to print. */
    private static final long START_OFFSET = 10L;
    /** Value passed to each {@code poll} call, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /**
     * Assigns the consumer to the configured partition, seeks to the
     * configured offset and prints every fetched record until the process is
     * stopped or a call on the consumer throws.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        TopicPartition target = new TopicPartition(TOPIC_NAME, PARTITION);

        // The poll loop has no exit condition, so it can only end by exception;
        // try-with-resources makes sure the consumer is still closed then.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties())) {
            consumer.assign(Arrays.asList(target));
            // seek() only sets the position for the next poll on this consumer.
            // To start from the earliest available record instead, use
            // consumer.seekToBeginning(Arrays.asList(target)).
            consumer.seek(target, START_OFFSET);

            while (true) {
                // NOTE(review): poll(long) is deprecated in kafka-clients 2.0+ in favour of
                // poll(java.time.Duration) - switch once the project's client version is confirmed.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.toString());
                    // Set a breakpoint on the next line to inspect each record.
                    System.out.println(record.toString());
                }
            }
        }
    }

    /**
     * Builds the consumer configuration.
     *
     * @return properties for a String/String consumer with auto-commit disabled
     */
    private static Properties consumerProperties() {
        Properties props = new Properties();
        // Host name and port of the Kafka broker.
        props.put("bootstrap.servers", "127.0.0.1:9093");
        // Consumer group name.
        props.put("group.id", KafkaOffsetTest.class.getSimpleName());
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Maximum number of records returned by one poll() call.
        props.put("max.poll.records", 100);
        // Auto-commit is enabled by default; disable it so that this read-only
        // tool does not commit offsets for the group.
        props.put("enable.auto.commit", "false");
        // "latest" starts from the newest message, "earliest" from the oldest.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

}