4.2 Kafka 生产者 API
4.2.1 创建生产者(旧 API)
package com.atguigu.kafka.producer;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/**
 * Minimal example of the legacy (scala-based) Kafka producer API.
 *
 * <p>Sends a single message to topic "first" on broker hadoop201:9092.
 * Fix over the original: the producer is now closed after sending, so the
 * network connection and internal buffers are released (the original leaked
 * the producer and could drop the async-buffered message on JVM exit).
 */
public class OldProducer {
    public static void main(String[] args) {
        // Producer configuration: broker list, ack level, and value encoder.
        Properties props = new Properties();
        props.put("metadata.broker.list", "hadoop201:9092");
        // "1" = wait for the leader replica to acknowledge the write.
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ProducerConfig conf = new ProducerConfig(props);

        // 1. Create the producer client; type params are the message key/value types.
        Producer<Integer, String> producer = new Producer<Integer, String>(conf);
        try {
            // Message to send: arg1 = topic, arg2 = value (no key).
            KeyedMessage<Integer, String> msg =
                    new KeyedMessage<Integer, String>("first", "hello atguigu");
            // 2. Send the message.
            producer.send(msg);
        } finally {
            // Release the broker connection and flush any buffered sends.
            producer.close();
        }
    }
}
4.2.2 创建生产者(新 API)
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Basic example of the new (java client) Kafka producer API.
 *
 * <p>Publishes five string records to topic "first" and then shuts the
 * producer down, which flushes any batched records to the broker.
 */
public class NewProducer {

    private static final String TOPIC = "first";

    public static void main(String[] args) {
        Producer<String, String> producer = new KafkaProducer<>(buildConfig());
        for (int i = 0; i < 5; i++) {
            String key = String.valueOf(i);
            producer.send(new ProducerRecord<>(TOPIC, key, "atguigu " + i));
        }
        // Flushes pending batches and closes the connection to the brokers.
        producer.close();
    }

    /** Assembles the producer configuration used by this example. */
    private static Properties buildConfig() {
        Properties props = new Properties();
        // Broker host:port to bootstrap from.
        props.put("bootstrap.servers", "hadoop201:9092");
        // Wait for acknowledgement from all in-sync replicas.
        props.put("acks", "all");
        // Do not retry failed sends.
        props.put("retries", 0);
        // Batch size in bytes.
        props.put("batch.size", 16384);
        // How long to linger before sending a partially full batch (ms).
        props.put("linger.ms", 1);
        // Total memory available for record buffering (bytes).
        props.put("buffer.memory", 33554432);
        // Serializers for record keys and values.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
4.2.3 创建带回调函数的生产者(新 API)
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
 * Example of the new Kafka producer API with a per-record send callback.
 *
 * <p>Sends five records to topic "first"; for each record, the callback
 * prints the partition and offset on success, or a failure message when
 * the broker reports an exception.
 */
public class ProducerWithCallBack {

    public static void main(String[] args) {
        Properties config = new Properties();
        // Broker host:port to bootstrap from.
        config.put("bootstrap.servers", "hadoop201:9092");
        // Wait for acknowledgement from all in-sync replicas.
        config.put("acks", "all");
        // Do not retry failed sends.
        config.put("retries", 0);
        // Batch size in bytes.
        config.put("batch.size", 16384);
        // How long to linger before sending a partially full batch (ms).
        config.put("linger.ms", 1);
        // Total memory available for record buffering (bytes).
        config.put("buffer.memory", 33554432);
        // Serializers for record keys and values.
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(config);
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("first", i + "", "hello " + i);
            // The lambda is invoked once the broker acknowledges (or rejects) the record.
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println(metadata.partition() + " ---- " + metadata.offset());
                } else {
                    System.out.println("发送失败");
                }
            });
        }
        // Flushes pending batches (and their callbacks) and closes the connection.
        producer.close();
    }
}
4.2.4 自定义分区的生产者
生产的时候指定把数据发送到指定的分区.
自定义的分区类: CustomPartitioner
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
 * Custom partitioner that routes every record to partition 0,
 * regardless of topic, key, or value.
 */
public class CustomPartitioner implements Partitioner {

    // Configuration map handed over by the producer; stored on configure().
    private Map<String, ?> configs;

    /** Always selects partition 0 for every record. */
    @Override
    public int partition(
            String topic,
            Object key,
            byte[] keyBytes,
            Object value,
            byte[] valueBytes,
            Cluster cluster) {
        return 0;
    }

    /** Called when the partitioner is closed; nothing to release here. */
    @Override
    public void close() {
    }

    /** Receives the producer configuration when the partitioner is created. */
    @Override
    public void configure(Map<String, ?> configs) {
        this.configs = configs;
    }
}
调用类:PartitionProducer
package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Producer example that plugs in the custom partitioner, so every record
 * is written to partition 0 of topic "first".
 */
public class PartitionProducer {

    public static void main(String[] args) {
        Properties config = new Properties();
        // Broker host:port to bootstrap from.
        config.put("bootstrap.servers", "hadoop201:9092");
        // Wait for acknowledgement from all in-sync replicas.
        config.put("acks", "all");
        // Maximum number of send retries.
        config.put("retries", 0);
        // Batch size in bytes.
        config.put("batch.size", 16384);
        // How long to linger before sending a partially full batch (ms).
        config.put("linger.ms", 1);
        // Total memory available for record buffering (bytes).
        config.put("buffer.memory", 33554432);
        // Serializers for record keys and values.
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Route records through the custom partitioner defined in this package.
        config.put("partitioner.class", "com.atguigu.kafka.producer.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(config);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>("first", "key: " + i, "world: " + i));
        }
        // Flushes pending batches and closes the connection to the brokers.
        producer.close();
    }
}
测试消息是否都发送到了 0 号分区：查看 00000000000000000000.log 文件，发现数据确实都存储到了 0 号分区。