3.2.2.4 类: HBaseConsumer
package com.atguigu.dataconsumer;
import com.atguigu.dataconsumer.util.HBaseUtil;
import com.atguigu.dataconsumer.util.PropertyUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
public class HBaseConsumer {
public static void main(String[] args) throws InterruptedException {
final HBaseDao hBaseDao = new HBaseDao();
HBaseConsumer hBaseConsumer = new HBaseConsumer();
hBaseConsumer.readDataFromKafka(value -> {
System.out.println(Bytes.toString(HBaseUtil.getRowKey(value)));
hBaseDao.put(value);
});
}
public void readDataFromKafka(CallBack<String> callback) throws InterruptedException {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(PropertyUtil.properties);
consumer.subscribe(Collections.singletonList(PropertyUtil.getProperty("kafka.topics")));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
if (callback != null) {
callback.call(record.value());
}
Thread.sleep(100);
}
}
}
public static interface CallBack<T> {
public abstract void call(T t);
}
}