7.3 Custom HBase-MapReduce (Part 1)
Goal: migrate part of the data in the fruit table into the fruit_mr table with a MapReduce job.
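The fruit_mr target table must already exist before the job is submitted, because TableOutputFormat writes into an existing table and does not create it. A minimal sketch in the HBase shell, assuming fruit_mr uses the same column family as the source fruit table (here assumed to be info; adjust to your schema):

hbase(main):001:0> create 'fruit_mr','info'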
Step 1: Add the dependencies
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.2.1</version>
</dependency>
Step 2: Create the MapReduce classes
Mapper
Build the ReadFruitMapper class, which reads the data from the fruit table.
package com.atguigu.mr;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // Build a Put for the current row key
        Put put = new Put(key.get());
        // Iterate over the cells of this row and keep only the "name" column
        for (Cell cell : value.rawCells()) {
            if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                put.add(cell);
            }
        }
        // Skip rows without a "name" cell: writing an empty Put would fail
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}
Reducer
Build the WriteFruitMRReducer class, which writes the rows read from the fruit table into the fruit_mr table.
package com.atguigu.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Simply forward every Put; TableOutputFormat writes it into fruit_mr
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
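Because this reducer only forwards each Put unchanged, the same migration can also run as a map-only job. The lines below are an alternative sketch, not part of the example above: passing null as the reducer class still wires up TableOutputFormat, and setting the reduce task count to 0 sends the mapper output straight to the fruit_mr table.

// Alternative map-only wiring inside the Runner's run() method (sketch):
TableMapReduceUtil.initTableReducerJob("fruit_mr", null, job); // null reducer: only sets up TableOutputFormat
job.setNumReduceTasks(0);                                      // map output is written directly to fruit_mr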
Tool
Build the Fruit2FruitMRRunner class (implements Tool), which assembles and runs the Job.
package com.atguigu.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2FruitMRRunner implements Tool {

    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {
        // Create the Job object
        Job job = Job.getInstance(conf);
        // Specify the driver class
        job.setJarByClass(Fruit2FruitMRRunner.class);
        // Specify the mapper and its input table
        TableMapReduceUtil.initTableMapperJob(
                "fruit",
                new Scan(),
                ReadFruitMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);
        // Specify the reducer and its output table
        TableMapReduceUtil.initTableReducerJob(
                "fruit_mr",
                WriteFruitMRReducer.class,
                job);
        // Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        // Load hbase-site.xml so the job can locate ZooKeeper and the cluster
        Configuration conf = HBaseConfiguration.create();
        int code = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
        if (code == 0) {
            System.out.println("Job completed successfully");
        } else {
            System.out.println("Job failed");
        }
    }
}
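The Runner above passes a default new Scan() to initTableMapperJob. For larger tables it is common to tune the scan before handing it over; the snippet below is only a sketch (the 500 caching value is illustrative, and info is an assumed column-family name; adjust to your schema). Bytes here is org.apache.hadoop.hbase.util.Bytes, already imported in the Mapper.

// Inside run(), in place of "new Scan()":
Scan scan = new Scan();
scan.setCaching(500);        // fetch more rows per RPC round trip
scan.setCacheBlocks(false);  // don't pollute the block cache during a full-table MR scan
// Optionally read only the column the mapper keeps:
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));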
Step 3: Package the project into a jar
Step 4: Run the job
yarn jar hbase-1.0-SNAPSHOT.jar com.atguigu.mr.Fruit2FruitMRRunner
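If submission fails because HBase classes are not on the Hadoop classpath, they can be added before running the job; $HBASE_HOME below stands for your HBase installation directory. After the job finishes, the result can be checked in the HBase shell:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:`${HBASE_HOME}/bin/hbase mapredcp`
yarn jar hbase-1.0-SNAPSHOT.jar com.atguigu.mr.Fruit2FruitMRRunner

hbase(main):001:0> scan 'fruit_mr'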