7.4 Custom HBase-MapReduce, Part 2
Goal: write data from HDFS into an HBase table.
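The walkthrough assumes the input file has already been uploaded to HDFS under /input_fruit as tab-separated text, one record per line in the order rowkey, name, color, and that the target table fruit2 with the column family info already exists in HBase (it can be created in the HBase shell with create 'fruit2','info'). A minimal sketch of the assumed input data (the sample values are illustrative):

1001	Apple	Red
1002	Pear	Yellow
1003	Pineapple	Yellow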
1. Build the ReadFruitFromHDFSMapper class, used to read file data from HDFS
package com.atguigu.mr2.mr;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is tab-separated: rowkey, name, color
        String[] split = value.toString().split("\t");
        // Ignore blank or malformed lines
        if (split.length < 3) {
            return;
        }
        // Pick the fields out of the array by position
        String rowKey = split[0];
        String name = split[1];
        String color = split[2];
        // Build the rowkey
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
        // Build the Put: both columns go under the "info" family
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
        context.write(rowKeyWritable, put);
    }
}
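Note that the mapper's output types, ImmutableBytesWritable for the key and Put for the value, are exactly what the TableReducer below consumes; keying by rowkey means the shuffle delivers all Puts for the same row to a single reduce call.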
2. Build the WriteFruitMRFromTxtReducer class
package com.atguigu.mr2.mr;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import java.io.IOException;
public class WriteFruitMRFromTxtReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        // Forward every Put for this rowkey unchanged; TableReducer fixes the
        // output value type to Mutation, so only the output key (NullWritable) is declared here
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
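The reducer is deliberately a pass-through: the actual HBase writes are performed by the TableOutputFormat that TableMapReduceUtil.initTableReducerJob configures in the driver below.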
3. Call and execute the Job
package com.atguigu.mr2.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Txt2FruitRunner implements Tool {

    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.conf);
        job.setJarByClass(Txt2FruitRunner.class);

        // Set the input path
        FileInputFormat.setInputPaths(job, "hdfs://hadoop201:9000/input_fruit");

        // Set the Mapper
        job.setMapperClass(ReadFruitFromHDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Set the Reducer; initTableReducerJob wires the job to write into the "fruit2" table
        TableMapReduceUtil.initTableReducerJob(
                "fruit2",
                WriteFruitMRFromTxtReducer.class,
                job);
        job.setNumReduceTasks(1);

        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        // Use HBaseConfiguration.create() so hbase-site.xml (ZooKeeper quorum, etc.) is picked up
        int code = ToolRunner.run(HBaseConfiguration.create(), new Txt2FruitRunner(), args);
        System.out.println(code == 0 ? "Job succeeded" : "Job failed");
    }
}
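To try the job out, package the three classes into a jar and run it with the HBase libraries on the classpath, e.g. HADOOP_CLASSPATH=$(hbase classpath) hadoop jar hbase-demo.jar com.atguigu.mr2.mr.Txt2FruitRunner (the jar name here is assumed). Once the job finishes, scan 'fruit2' in the HBase shell should show the imported rows.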