4.2 Kafka 生产者 API

4.2.1 创建生产者(旧 API)

package com.atguigu.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class OldProducer {
    public static void main(String[] args) {

        /*创建 Properties 对象*/
        Properties props = new Properties();
        props.put("metadata.broker.list", "hadoop201:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /*生产者配置文件对象*/
        ProducerConfig conf = new ProducerConfig(props);
        /*1. 创建 生产者客户端 泛型是要发送的消息的 key, value 类型*/
        Producer<Integer, String> producer = new Producer<Integer, String>(conf);

        /*创建要发送的消息对象: 参数1: topic 参数2: vlaue */
        KeyedMessage<Integer, String> msg = new KeyedMessage<Integer, String>("first", "hello atguigu");
        /*2. 发送消息*/
        producer.send(msg);

    }

}

4.2.2 创建生产者(新 API)

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class NewProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "hadoop201:9092");
        // 等待所有副本节点的应答
        props.put("acks", "all");
        // 重试最大次数
        props.put("retries", 0);
        // 批消息处理大小
        props.put("batch.size", 16384);
        // 请求延时
        props.put("linger.ms", 1);
        //// 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<String, String>("first", i + "", "atguigu " + i));
        }
        // 数据发送完毕后关闭生产者与kafka服务器的连接
        producer.close();
    }
}

4.2.3 创建带回调函数的生产者(新 API)

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class ProducerWithCallBack {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "hadoop201:9092");
        // 等待所有副本节点的应答
        props.put("acks", "all");
        // 重试最大次数
        props.put("retries", 0);
        // 批消息处理大小
        props.put("batch.size", 16384);
        // 请求延时
        props.put("linger.ms", 1);
        //// 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("first", i + "", "hello  " + i);
            // 当发送成功之后会回调这个拉姆达表示
            producer.send(record, (metadata, exception) -> {
                if(exception != null){
                    System.out.println("发送失败");
                }else{
                    System.out.println(metadata.partition() + " ---- " + metadata.offset());
                }
            });

        }
        // 数据发送完毕后关闭生产者与kafka服务器的连接
        producer.close();
    }
}

4.2.4 自定义分区的生产者

生产的时候指定把数据发送到指定的分区.

自定义的分区类: CustomPartitioner

package com.atguigu.kafka.producer;


import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {
    private Map<String, ?> configs;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    /*当分区关闭的时候调用这个方法*/
    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
        this.configs = configs;

    }
}

调用类:PartitionProducer

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "hadoop201:9092");
        // 等待所有副本节点的应答
        props.put("acks", "all");
        // 消息发送最大尝试次数
        props.put("retries", 0);
        // 一批消息处理大小
        props.put("batch.size", 16384);
        // 增加服务端请求延时
        props.put("linger.ms", 1);
        // 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 指定自定义分区类
        props.put("partitioner.class", "com.atguigu.kafka.producer.CustomPartitioner");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {

            producer.send(new ProducerRecord<String, String>("first", "key: " + i, "world: " + i));
        }

        producer.close();

    }
}

测试是否都发送到了0分区

查看 00000000000000000000.log 文件发现确实都存储到了 0分区

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2018-11-20 18:14:18

results matching ""

    No results matching ""