11.3 Preparation
11.3.1 ETL Data Cleaning
ETL is short for Extract-Transform-Load; it describes the process of extracting data from a source system, transforming it, and loading it into a target system.
Characteristics of the raw data:
By inspecting the raw data we can observe the following (see the sketch after this list for a quick illustration):
- Fields are separated by \t (tab).
- A video can belong to multiple categories; the categories are separated by " & ", with a space on each side of the &.
- A video's related videos are also separated by \t, and a record may have no related videos at all.
- Since a complete record has 10 columns and the related-video column may be missing, any record with fewer than 9 columns is treated as dirty data.
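To make the 9-column rule concrete, here is a minimal, standalone sketch (the field values are made up purely for illustration) that splits one raw line on tabs and applies the check:

public class FieldCountDemo {
    public static void main(String[] args) {
        // Hypothetical record: 9 fixed fields followed by two related-video ids, all tab-separated.
        String line = "video001\tuploader1\t630\tPeople & Blogs\t186\t10181\t3.49\t494\t257\tvideo002\tvideo003";
        String[] fields = line.split("\t");
        // Fewer than 9 fields would mean even the fixed columns are incomplete, i.e. dirty data.
        System.out.println(fields.length + " fields -> " + (fields.length >= 9 ? "keep" : "dirty"));
    }
}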
Cleaning operations:
- Remove dirty data, i.e. records with fewer than 9 columns.
- To make later analysis easier, remove the spaces inside the video category field so that multiple categories are separated by a bare &.
- Multiple related videos should likewise be joined with &.
The MapReduce program used for data cleaning
Step 1: Add the dependencies
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.2</version>
</dependency>
Step 2: The Mapper class
package com.atguigu;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class VideoETLMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private Text resultKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Clean the record
        String etlString = ETLUtil.etl(value.toString());
        if (StringUtils.isBlank(etlString)) return;
        // 2. Write out the cleaned record
        resultKey.set(etlString);
        context.write(resultKey, NullWritable.get());
    }
}
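If you also want to know how many records were discarded, a small variation of the map() method above (a sketch, not part of the original program) can increment a user-defined Hadoop counter; the group and counter names "ETL" / "DirtyRecords" here are arbitrary choices:

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String etlString = ETLUtil.etl(value.toString());
        if (StringUtils.isBlank(etlString)) {
            // Count the dropped record; the value shows up in the job counters when the job finishes.
            context.getCounter("ETL", "DirtyRecords").increment(1);
            return;
        }
        resultKey.set(etlString);
        context.write(resultKey, NullWritable.get());
    }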
Step 3: The ETLUtil class
package com.atguigu;

public class ETLUtil {
    public static String etl(String s) {
        // 1. Split the line on \t to get all the fields.
        String[] splits = s.split("\t");
        // Fewer than 9 fields means the record is dirty; return null so the Mapper drops it.
        if (splits.length < 9) return null;
        // 2. Remove the spaces around & in the video category field.
        splits[3] = splits[3].replaceAll(" ", "");
        // 3. Join the fields back together: fields up to index 8 are joined with \t,
        //    fields from index 9 onwards (the related videos) are joined with &.
        StringBuilder sb = new StringBuilder();
        String joinStr = ""; // separator used while joining
        for (int i = 0; i < splits.length; i++) {
            joinStr = i < 9 ? "\t" : "&";
            joinStr = i == splits.length - 1 ? "" : joinStr; // no separator after the last field
            sb.append(splits[i]).append(joinStr);
        }
        // 4. Return the joined string.
        return sb.toString();
    }
}
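As a quick local sanity check of ETLUtil before submitting the job, you can run the method on a single line and inspect the result; this is only a sketch, and the field values below are made up:

package com.atguigu;

public class ETLUtilDemo {
    public static void main(String[] args) {
        // 9 fixed fields plus two related-video ids, tab-separated; the category contains " & ".
        String raw = "video001\tuploader1\t630\tPeople & Blogs\t186\t10181\t3.49\t494\t257\tvideo002\tvideo003";
        // Expected output: the category becomes "People&Blogs" and the related ids are joined as "video002&video003".
        System.out.println(ETLUtil.etl(raw));
    }
}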
Step 4: The Runner class
package com.atguigu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class VideoETLRunner implements Tool {
    private Configuration conf;

    public int run(String[] args) throws Exception {
        System.out.println(getConf()); // print the configuration for debugging
        // 1. Get the Job object
        Job job = Job.getInstance(getConf());
        // 2. Set the main class
        job.setJarByClass(VideoETLRunner.class);
        // 3. Set the Mapper class
        job.setMapperClass(VideoETLMapper.class);
        // 4. No reduce phase is needed (map-only job)
        job.setNumReduceTasks(0);
        // 5. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 6. Set the input path
        FileInputFormat.setInputPaths(job, new Path("/gulivideo/video/2008/0222"));
        // 7. Set the output path (must not already exist)
        FileOutputFormat.setOutputPath(job, new Path("/output"));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        int resultCode = ToolRunner.run(new VideoETLRunner(), args);
        if (resultCode == 0) {
            System.out.println("success!");
        } else {
            System.out.println("failed!");
        }
    }
}
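The input and output paths above are hard-coded. If you prefer to pass them on the command line instead, a variation of run() could read them from the argument array; this is an untested sketch that assumes the paths are supplied as the first two program arguments (ToolRunner strips generic options such as -D before the remaining arguments reach run()):

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(VideoETLRunner.class);
        job.setMapperClass(VideoETLMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Paths taken from the command line, e.g. "yarn jar gulivideo-1.0-SNAPSHOT.jar com.atguigu.VideoETLRunner <input> <output>"
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }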
Step 5: Build the jar and run it on the cluster
# Upload the raw data to HDFS
hdfs dfs -put 'guiliVideo' /
yarn jar gulivideo-1.0-SNAPSHOT.jar com.atguigu.VideoETLRunner
11.3.2 Creating the Tables
We create 4 tables in total:
- 2 tables that store the data in its original text format (the data after ETL);
- 2 tables stored in ORC format, whose data is loaded from the original tables.
Create gulivideo_ori:
create table gulivideo_ori(
videoId string,
uploader string,
age int,
category array<string>,
length int,
views int,
rate float,
ratings int,
comments int,
relatedId array<string>)
row format delimited
fields terminated by "\t"
collection items terminated by "&"
stored as textfile;
Load the data (note that LOAD DATA INPATH moves the files from /output into the table's warehouse directory):
load data inpath '/output' into table gulivideo_ori;
Create gulivideo_user_ori:
create table gulivideo_user_ori(
uploader string,
videos int,
friends int
)
row format delimited
fields terminated by "\t"
stored as textfile;
Load the data:
load data inpath '/gulivideo/user/2008/0903' into table gulivideo_user_ori;
Create gulivideo_orc:
create table gulivideo_orc
stored as orc
as select * from gulivideo_ori;
Create gulivideo_user_orc:
create table gulivideo_user_orc
stored as orc
as select * from gulivideo_user_ori;