3.2.4.1 手动写入被叫信息

类: HBaseConsumer2

package com.atguigu.dataconsumer;

import com.atguigu.dataconsumer.util.HBaseUtil;
import com.atguigu.dataconsumer.util.PropertyUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;

/**
 * 该类主要用于读取 Kafka中缓存的数据,然后调用 HBaseAPI,持久化数据。
 * <p>
 * 是 data-consumer 模块的入口
 */
public class HBaseConsumer2 {
    public static void main(String[] args) throws InterruptedException {
        final HBaseDao hBaseDao = new HBaseDao();
        HBaseConsumer2 hBaseConsumer = new HBaseConsumer2();
        hBaseConsumer.readDataFromKafka(value -> {
            System.out.println(Bytes.toString(HBaseUtil.getRowKey(value)));
            // 把拿到的数据 put 到 HBase 表中
            hBaseDao.put(value);

            // 新增的手动插入被叫信息的代码
            // 交换 call1位置和call2位置的值, 并把 flag 设置为1, 再向 HBase 插入一条记录
            String[] split = value.split(",");
            String temp = split[0];  // call1 主叫
            split[0] = split[2];
            split[2] = temp;
            split[4] = "1"; // 表示这条记录是被叫记录
            hBaseDao.put(split[0] + "," + split[1] + "," + split[2] + "," + split[3] + "," + split[4]);
        });
    }

    /**
     * 从 Kafka 读取数据
     * 并把读取到的数据的值通过回调接口传递到这个方法的调用者
     *
     * @param callback 回调接口
     * @throws InterruptedException
     */
    public void readDataFromKafka(CallBack<String> callback) throws InterruptedException {
        // 创建 Kafka 消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(PropertyUtil.properties);
        // 订阅主体
        consumer.subscribe(Collections.singletonList(PropertyUtil.getProperty("kafka.topics")));
        // 循环读取数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                if (callback != null) {
                    callback.call(record.value());
                }
                // 测试的时候为了防止读取速度过快, 线程休眠100ms
                Thread.sleep(100);
            }
        }
    }

    /**
     * 定义一个回调接口
     *
     * @param <T>
     */
    public static interface CallBack<T> {
        public abstract void call(T t);
    }
}


类: QueryWithStartRowStopRow

package com.atguigu.dataconsumer.query;

import com.atguigu.dataconsumer.util.HBaseUtil;
import com.atguigu.dataconsumer.util.PropertyUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.DecimalFormat;

public class QueryWithStartRowStopRow {
    public static void main(String[] args) throws IOException {
        DecimalFormat formatter = new DecimalFormat("0000");

        // 定义开始时间和结束时间
        String startTime = "2018-01";
        String endTime = "2018-12";

        // 定义要查询电话
        String phone = "16480981069";

        // 分区的数量
        int regions = Integer.parseInt(PropertyUtil.getProperty("hbase.regions"));

        // 获取表
        Table table = HBaseUtil.getTable(PropertyUtil.getProperty("hbase.table.name"));
        // 我们需要分区进行查询
        for (int i = 0; i < regions; i++) {
            String start = formatter.format(i) + "_" + phone + "_" +  startTime;
            String end = formatter.format(i) + "_" + phone + "_" + endTime;
            Scan scan = new Scan(Bytes.toBytes(start), Bytes.toBytes(end));
            ResultScanner results = table.getScanner(scan);
            for (Result result : results) {
                System.out.println(Bytes.toString(result.getRow()));
            }
        }
    }
}

注意:

  • [startRowKew,stopRowKey) 是前闭后开的区间
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2018-11-25 20:52:44

results matching ""

    No results matching ""