3.2.2.3 类: HBaseUtil

package com.atguigu.dataconsumer.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.Map;

/**
 * 该类主要用于封装一些HBase的常用操作,比如创建命名空间,创建表等等
 */
public class HBaseUtil {

    private static Configuration conf;

    private static Connection conn;
    /**
     * 预分区的数量
     */
    private static int regions = Integer.parseInt(PropertyUtil.getProperty("hbase.regions"));
    private static DecimalFormat formatter = new DecimalFormat("0000");

    static {

        try {
            // 创建一个 HBaseConfiguration, 会自动的添加资源文件中的 hbase-site.xml中的配置
            conf = HBaseConfiguration.create();
            // 创建 Connection 对象
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 在 HBase 中建表
     *
     * @param tableName 表名
     * @param cf        列族
     */
    public static void createTable(String tableName, String cf) {
        // 如果表已经存在则直接返回
        if (isTableExists(tableName)) return;

        Admin admin = null;
        try {
            admin = getAdmin();
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
            System.out.println(cf);
            desc.addFamily(new HColumnDescriptor(cf));
            admin.createTable(desc, getSplitKeys());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }

    /**
     * 获取预分区键
     * <p>
     * 只考虑 5 个字符的预先分区: 4个数字, 加一个竖线 |
     * 0000| , 0001|,....
     * <p>
     * 将来的rowKey 使用 0000_ 来开头, 则 0000_一定会落在  无穷->0000| 之间, 因为 "_"比"|"小
     *
     * @return 存储预分区键的二维字节数组
     */
    public static byte[][] getSplitKeys() {

        // 创建存储预分区键的二维字节数组
        byte[][] splitKeys = new byte[regions][];
        for (int i = 0; i < splitKeys.length; i++) {
            String key = formatter.format(i) + "|";
            splitKeys[i] = Bytes.toBytes(key);
        }
        return splitKeys;
    }

    /**
     * 创建命名空间
     *
     * @param NSName 命名空间的名字
     */
    public static void createNS(String NSName) {
        if (isNSExists(NSName)) return;

        NamespaceDescriptor build = NamespaceDescriptor.create(NSName)
                .addConfiguration("creator", "atguigu")
                .addConfiguration("create_time", System.currentTimeMillis() + "")
                .build();
        Admin admin = null;
        try {
            admin = getAdmin();
            admin.createNamespace(build);

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * 获取到 HBase 的 Admin 对象
     */
    public static Admin getAdmin() throws IOException {
        Admin admin = conn.getAdmin();
        return admin;
    }

    /**
     * 判断指定的命名空间是否存在
     *
     * @param nameSpace
     * @return
     */
    public static boolean isNSExists(String nameSpace) {
        Admin admin = null;
        try {
            admin = getAdmin();
            NamespaceDescriptor descriptor = admin.getNamespaceDescriptor(nameSpace);
            // 如果在获取配置的过程中没有报错, 则证明这个命名空间是存在的
            Map<String, String> conf = descriptor.getConfiguration();
            return true;
        } catch (IOException e) {
            return false;
        } finally {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 判断指定的表是否存在
     *
     * @param tableName
     * @return
     */
    public static boolean isTableExists(String tableName) {
        Admin admin = null;
        try {
            admin = getAdmin();
            return admin.tableExists(TableName.valueOf(tableName));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {

            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return false;
    }

    /**
     * 获取指定的表
     *
     * @param tableName 表名
     * @return 表对象
     */
    public static Table getTable(String tableName) {
        try {
            Table table = conn.getTable(TableName.valueOf(tableName));
            return table;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 根据要存入的值, 返回计算出来的 rowKey
     * <p>
     *                   this.call1 + ","
     *                 + dateFormatter.format(new Date(this.startTime)) + ","
     *                 + this.call2 + ","
     *                 + this.duration + ","
     *                 + this.flag;
     *
     * 来的信息格式: call1,startTime,call2,duration,flag
     * <p>
     * rowKey : 0001_call1_startTime_call2_duration_flag
     * @param value
     * @return
     */
    public static byte[] getRowKey(String value) {

        String[] split = value.split(",");

        String rowKeyStr = new StringBuffer()
                // 只要call1相同和通话时间的年和月一致就进入同一个分区
                .append(getRowKeyPrefix(split[0], split[1].substring(0, 7)))  // 分区前缀
                .append("_")
                .append(split[0]) // 电话号码
                .append("_")
                .append(split[1])  // 建立通话的时间
                .append("_")
                .append(split[2])  // 被叫号码
                .append("_")
                .append(split[3])  // 通话时长
                .append("_")
                .append(split[4])  // 通话类型
                .toString();
        return Bytes.toBytes(rowKeyStr);
    }

    /**
     * 获取 rowKey 的分区前缀
     * <p>
     * 返回: 0001   0002  ....
     */
    private static String getRowKeyPrefix(String... arms) {
        int i = Math.abs(Arrays.toString(arms).hashCode()) % regions;
        return formatter.format(i);
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2018-11-25 17:42:12

results matching ""

    No results matching ""