低级 API
package com.atguigu.streaming.kafka
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object LowKafka {
def getOffset(kafkaCluster: KafkaCluster, group: String, topic: String): Map[TopicAndPartition, Long] = {
var topicAndPartition2Long: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
val topicMetadataEither: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(Set(topic))
if (topicMetadataEither.isRight) {
val topicAndPartitions: Set[TopicAndPartition] = topicMetadataEither.right.get
val topicAndPartition2LongEither: Either[Err, Map[TopicAndPartition, Long]] =
kafkaCluster.getConsumerOffsets(group, topicAndPartitions)
if (topicAndPartition2LongEither.isLeft) {
topicAndPartitions.foreach {
topicAndPartition => topicAndPartition2Long = topicAndPartition2Long + (topicAndPartition -> 0)
}
} else {
val current: Map[TopicAndPartition, Long] = topicAndPartition2LongEither.right.get
topicAndPartition2Long ++= current
}
}
topicAndPartition2Long
}
def saveOffset(kafkaCluster: KafkaCluster, group: String, dStream: InputDStream[String]) = {
dStream.foreachRDD(rdd => {
var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()
val hasOffsetRangs: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]
val ranges: Array[OffsetRange] = hasOffsetRangs.offsetRanges
ranges.foreach(range => {
map += range.topicAndPartition() -> range.untilOffset
})
kafkaCluster.setConsumerOffsets(group,map)
})
}
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
val ssc = new StreamingContext(conf, Seconds(3))
val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"
val topic = "first"
val group = "bigdata"
val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
val kafkaParams = Map(
"zookeeper.connect" -> "hadoop201:2181,hadoop202:2181,hadoop203:2181",
ConsumerConfig.GROUP_ID_CONFIG -> group,
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
)
val kafkaCluster = new KafkaCluster(kafkaParams)
val fromOffset: Map[TopicAndPartition, Long] = getOffset(kafkaCluster, group, topic)
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
ssc,
kafkaParams,
fromOffset,
(message: MessageAndMetadata[String, String]) => message.message()
)
dStream.print()
saveOffset(kafkaCluster, group, dStream)
ssc.start()
ssc.awaitTermination()
}
}