5.2 拦截器案例
需求
实现一个由 2 个 interceptor 组成的简单拦截链。
第 1 个 interceptor 会在消息发送前将时间戳信息加到消息 value 的最前部;
第 2 个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
实例操作
时间拦截器
package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
 * Producer interceptor that prefixes every outgoing message value with the
 * current time in milliseconds.
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    /**
     * Called before the key and value are serialized and before a partition
     * is assigned. Builds a new record whose value is
     * {@code "<currentTimeMillis> : <original value>"}.
     *
     * <p>Topic, partition, timestamp, key and headers are copied from the
     * incoming record so that nothing the caller set is lost.
     *
     * @param record the record passed to the producer
     * @return a new record carrying the time-prefixed value
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String stampedValue = System.currentTimeMillis() + " : " + record.value();
        return new ProducerRecord<>(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                stampedValue,
                record.headers());
    }

    /**
     * No-op: this interceptor does not react to acknowledgements.
     *
     * @param metadata  metadata of the acknowledged record
     * @param exception the send failure, or {@code null} on success
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Intentionally empty.
    }

    /** No-op: this interceptor holds no resources. */
    @Override
    public void close() {
        // Intentionally empty.
    }

    /**
     * No-op: this interceptor reads no configuration.
     *
     * @param configs the producer configuration
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }
}
统计消息的拦截器
package com.atguigu.kafka.interceptor;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * Producer interceptor that counts how many sends succeeded and how many
 * failed, and prints both totals when the interceptor is closed.
 *
 * <p>The counters are atomic because acknowledgements and {@link #close()}
 * may be invoked from different threads.
 */
public class CountInterceptor implements ProducerInterceptor<String, String> {

    private final AtomicInteger successCount = new AtomicInteger();
    private final AtomicInteger failCount = new AtomicInteger();

    /**
     * Passes the record through unchanged. The original record must be
     * returned; returning {@code null} would hand {@code null} to the rest
     * of the send pipeline.
     *
     * @param record the record passed to the producer
     * @return the same {@code record} instance
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    /**
     * Increments the success counter when {@code exception} is {@code null},
     * otherwise increments the failure counter.
     *
     * @param metadata  metadata of the acknowledged record
     * @param exception the send failure, or {@code null} on success
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCount.incrementAndGet();
        } else {
            failCount.incrementAndGet();
        }
    }

    /** Prints the accumulated success and failure counts to standard output. */
    @Override
    public void close() {
        System.out.println("successCount = " + successCount.get());
        System.out.println("failCount = " + failCount.get());
    }

    /**
     * No-op: this interceptor reads no configuration.
     *
     * @param configs the producer configuration
     */
    @Override
    public void configure(Map<String, ?> configs) {
        // Intentionally empty.
    }
}
Producer 主程序
package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * Demo producer that sends five messages to the {@code first} topic through
 * an interceptor chain made of {@code TimeInterceptor} and
 * {@code CountInterceptor}.
 */
public class InterceptorProducer {

    /**
     * Configures the producer, registers the interceptors, sends the
     * messages and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Producer configuration.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop201:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Interceptor chain: TimeInterceptor is listed first, CountInterceptor second.
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.atguigu.kafka.interceptor.CountInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // 3. Create the producer. try-with-resources guarantees it is closed
        //    even if a send throws; the interceptors' close() methods are
        //    only invoked when the producer is closed.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // 4. Send the messages.
            for (int i = 0; i < 5; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("first", "hadoop" + i);
                producer.send(record);
            }
        }
    }
}
测试
在 Kafka 上启动消费者(消费 first 主题),然后运行客户端 Java 程序。
观察 Java 程序控制台的输出:应打印 successCount 和 failCount 的值;同时消费者收到的每条消息的 value 前面都应带有时间戳。