4.3.2 Kafka 消费者之低级 API
实现使用低级 API 读取指定 topic,指定 partition,指定 offset 的数据。
消费者使用低级API 的主要步骤
步骤 | 主要工作 |
---|---|
1 | 根据指定的分区从主题元数据中找到分区领导 ✅ |
2 | 获取分区最新的消费进度 |
3 | 从主副本拉取分区的消息 ✅ |
4 | 识别主副本的变化,重试 |
代码
官方实例代码: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
我们精简代码:
package com.atguigu.kafka.consume;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class LowerConsumer {
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
// 定义 3 个 brokers
ArrayList<String> brokers = new ArrayList<>(Arrays.asList("hadoop201", "hadoop202", "hadoop303"));
// 定义 brokers 的端口号
int port = 9092;
// 定义 topic
String topic = "first";
// 定义 partition
int partition = 0;
int offset = 0;
LowerConsumer lowerConsumer = new LowerConsumer();
lowerConsumer.getData(brokers, port, topic, partition, offset);
}
/**
* 查找指定 brokers 指定 topic 指定 partition 的 leader
*
* @param brokers brokers
* @param port
* @param topic
* @param partition
* @return
*/
public PartitionMetadata findLeader(List<String> brokers, int port, String topic, int partition) {
for (String broker : brokers) {
SimpleConsumer consumer
= new SimpleConsumer(broker, port, 100000, 64 * 1024, "lookup_leader");
// 创建 TopicMetadataRequest 对象, 使用 consumer 发送请求的时候使用
TopicMetadataRequest request = new TopicMetadataRequest(Arrays.asList(topic));
// 发送请求, 返回值为 Topic 元数据信息
TopicMetadataResponse topicMetadataResponse = consumer.send(request);
// 拿到每个 topic 的元数据信息
List<TopicMetadata> topicMetadataList = topicMetadataResponse.topicsMetadata();
for (TopicMetadata topicMetadata : topicMetadataList) {
// 拿到每个 topic 的所有分区元数据信息
List<PartitionMetadata> partitionMetadataList = topicMetadata.partitionsMetadata();
for (PartitionMetadata partitionMetadata : partitionMetadataList) {
if (partition == partitionMetadata.partitionId()) {
return partitionMetadata;
}
}
}
}
return null;
}
/**
* 获取指定brokers 中指定topic的指定分区的指定offset开始的数据
*
* @param brokers
* @param port
* @param topic
* @param partition
* @param offset
*/
public void getData(List<String> brokers, int port, String topic, int partition, long offset) throws UnsupportedEncodingException, InterruptedException {
PartitionMetadata leader = findLeader(brokers, port, topic, partition);
String host = leader.leader().host();
SimpleConsumer c1 = new SimpleConsumer(host, port, 10000, 64 * 1024, "c1");
while (true) {
FetchRequest request = new FetchRequestBuilder()
.clientId("c1")
// fetchSize 表示一次读取到的最多的字节数. 如果这个值设置过小, 则至少保证一次读取一条记录.
// 为了读取效率的提高, 可以把这个值设置的大一些.
.addFetch(topic, partition, offset, 100000)
.build();
FetchResponse response = c1.fetch(request);
ByteBufferMessageSet messageAndOffsets = response.messageSet(topic, partition);
for (MessageAndOffset messageAndOffset : messageAndOffsets) {
ByteBuffer buffer = messageAndOffset.message().payload();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
System.out.println(messageAndOffset.offset() + " : " + new String(data));
// 每读完一条数据, 把offset 的值置为最新的offset
offset = messageAndOffset.offset();
Thread.sleep(1000);
}
if (messageAndOffsets.iterator().hasNext()) {
// 如果这次确实读到数据了, 所以把offset的值 ++, 下一次读的时候就从新的位置开始读了
// 如果这次没有读到数据, 表示数据已经读完了, 所以就不需要再 ++ 了
// 如果这个offset 只是针对当前分区的.
// 如果想下次启动的时候从上次位置开始读取, 则应该把 offset 的值持久化,, 比如存到Zookeeper 或者 mysql 数据库中.
offset++;
System.out.println(offset);
}
}
}
}