3.2.1 Data Collection

The data collection workflow:

  • Configure Kafka; start the Zookeeper and Kafka clusters;

  • Create the Kafka topic;

  • Start a Kafka console consumer (used for testing only);

  • Configure Flume to monitor the log file;

  • Start the Flume monitoring job;

  • Run the log-producing script;

  • Observe and verify.


Step 1: Start the Zookeeper and Kafka clusters

# Custom script that starts the 3 Zookeeper nodes
zkstart
# Start the Kafka cluster: run this on each of the 3 nodes
kafka-server-start.sh config/server.properties

Step 2: Create the Kafka topic

kafka-topics.sh --create --zookeeper hadoop201:2181 --topic calllog --replication-factor 1 --partitions 3

Check that the topic was created successfully:

kafka-topics.sh --zookeeper hadoop201:2181 --list
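The topic above is created with 3 partitions. Kafka's default partitioner maps each keyed record to a fixed partition (the real implementation uses a murmur2 hash; the hash-mod scheme below is a simplified illustration, not Kafka's actual code), so all records with the same key stay ordered within one partition:

```python
# Simplified illustration of keyed partitioning: hash the key, take it
# modulo the partition count. Kafka's real default partitioner uses
# murmur2, but the "same key -> same partition" property is the same.
import hashlib

NUM_PARTITIONS = 3  # matches --partitions 3 above

def pick_partition(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # md5 gives a hash that is stable across runs
    # (unlike Python's built-in hash(), which is salted per process)
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# The same caller number always lands in the same partition:
assert pick_partition("13888888888") == pick_partition("13888888888")
```

Keying call records by caller number would therefore keep each caller's records in order, which matters later when the data is consumed.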

Step 3: Start a consumer to begin consuming data

kafka-console-consumer.sh --zookeeper hadoop201:2181 --topic calllog --from-beginning

Step 4: Configure Flume

Create the agent configuration file flume2kafka.conf in the /opt/module/telecom directory:

# define the agent's sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source: exec source tails the call log file
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/telecom/calls.csv
a1.sources.r1.shell = /bin/bash -c

# sink: publish events to the calllog Kafka topic
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop201:9092,hadoop202:9092,hadoop203:9092
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel: memory channel buffers events between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
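Flume fails at startup if the bindings at the bottom of the file reference a channel that was never declared. A small sanity-check sketch (a hypothetical helper, not part of Flume) that parses the properties-style file and verifies every source and sink is wired to a declared channel:

```python
# Hypothetical sanity check for a Flume agent file: parse the
# "key = value" lines and confirm that the source/sink -> channel
# bindings refer to channels that were actually declared.
def check_flume_conf(text: str, agent: str = "a1") -> None:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue                      # skip blanks and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()

    channels = set(props.get(f"{agent}.channels", "").split())
    sources = props.get(f"{agent}.sources", "").split()
    sinks = props.get(f"{agent}.sinks", "").split()

    for src in sources:
        bound = set(props.get(f"{agent}.sources.{src}.channels", "").split())
        assert bound and bound <= channels, f"source {src} not wired to a declared channel"
    for sink in sinks:
        # a sink binds to exactly one channel ("channel", singular)
        bound = props.get(f"{agent}.sinks.{sink}.channel", "")
        assert bound in channels, f"sink {sink} not wired to a declared channel"

conf = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""
check_flume_conf(conf)  # raises AssertionError if the wiring is broken
```

Note the asymmetry the check encodes: a source can fan out to several channels (`channels`, plural), while a sink reads from exactly one (`channel`, singular).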

Step 5: Start Flume

flume-ng agent -c /opt/module/flume/conf -f flume2kafka.conf -n a1

Step 6: Run the log-producing script and check that the Kafka console consumer displays the generated data

./producer.sh
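producer.sh wraps this project's log generator, and its exact record format is project-specific. As a rough sketch, assuming each line of calls.csv is `caller,callee,start_time,duration` (an assumption — match the schema your generator actually uses), the core loop might look like:

```python
# Hypothetical call-log generator sketch. The caller,callee,start_time,
# duration field layout is an assumption; adjust it to the schema your
# calls.csv actually uses.
import random
import time

PHONE_NUMBERS = ["13888888888", "13666666666", "15999999999"]

def make_record(now: float = None) -> str:
    now = time.time() if now is None else now
    caller, callee = random.sample(PHONE_NUMBERS, 2)  # two distinct numbers
    start = time.strftime("%Y%m%d%H%M%S", time.localtime(now))
    duration = random.randint(1, 600)                 # call length in seconds
    return f"{caller},{callee},{start},{duration:04d}"

# Appending to the file that tail -F is watching feeds the Flume exec
# source, which forwards each line to the calllog topic:
# with open("/opt/module/telecom/calls.csv", "a") as f:
#     f.write(make_record() + "\n")
print(make_record())
```

If the pipeline is wired correctly, each appended line should appear in the Kafka console consumer started in step 3 within a second or two.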
Copyright © 尚硅谷大数据 2019, all rights reserved. Powered by GitBook
Last revised: 2018-12-27 20:36:01
