3.3.4.3 包:com.atguigu.dataanalasis.mapred

1. 类: CallDriver

驱动类

package com.atguigu.dataanalasis.mapred;

import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.outputformat.MysqlOutput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CallDriver implements Tool {
    private Configuration conf;

    /**
     * Builds and submits the MapReduce job that scans the HBase table
     * {@code ns_telecom:calllog} and writes aggregated call statistics to
     * MySQL through the custom {@link MysqlOutput} format.
     *
     * @param args command-line arguments (generic options have already been
     *             parsed into {@link #conf} by {@link ToolRunner})
     * @return 0 on success, 1 if the job failed
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // BUG FIX: use the Configuration injected by ToolRunner.setConf().
        // The original Job.getInstance() created a fresh Configuration and
        // silently discarded -D options and any other generic options.
        Job job = Job.getInstance(this.conf);

        // Jar containing the job classes.
        job.setJarByClass(CallDriver.class);

        // Mapper: full scan of the HBase call-log table.
        TableMapReduceUtil.initTableMapperJob(
                "ns_telecom:calllog",
                new Scan(),
                CallMapper.class,
                CommonDimension.class,
                Text.class,
                job);

        // Reducer: sums call count and total duration per dimension key.
        job.setReducerClass(CallReducer.class);

        // Output: custom OutputFormat that persists results into MySQL.
        job.setOutputFormatClass(MysqlOutput.class);
        // Ship the JDBC driver to the task classpath; reduce tasks need it
        // to open MySQL connections on the cluster nodes.
        job.addFileToClassPath(new Path("hdfs://hadoop201:9000/libs/mysql-connector-java-5.1.27-bin.jar"));

        // Submit and block until the job finishes.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new CallDriver(), args);
        String s = run == 0 ? "正常结束" : "发生异常";
        System.out.println(s);
        System.exit(run);
    }
}

2. 类: CallMapper

package com.atguigu.dataanalasis.mapred;

import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.bean.ContactDimension;
import com.atguigu.dataanalasis.bean.DateDimension;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CallMapper extends TableMapper<CommonDimension, Text> {
    // Phone-number -> contact-name lookup, populated once in setup().
    private Map<String, String> contacts = new HashMap<>();
    // Reused output objects; Hadoop serializes on context.write(), so
    // mutating them between writes is safe and avoids per-record allocation.
    private Text v = new Text();
    private CommonDimension commonDimension = new CommonDimension();
    private ContactDimension contactDimension = new ContactDimension();
    private DateDimension dateDimension = new DateDimension();

    /**
     * Called once per task before any map() call.
     * Loads the hard-coded phone-number-to-name mapping.
     *
     * @param context task context (unused)
     * @throws IOException never thrown here
     * @throws InterruptedException never thrown here
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {

        contacts.put("15369468720", "李雁");
        contacts.put("19920860202", "卫艺");
        contacts.put("18411925860", "仰莉");
        contacts.put("14473548449", "陶欣悦");
        contacts.put("18749966182", "施梅梅");
        contacts.put("19379884788", "金虹霖");
        contacts.put("19335715448", "魏明艳");
        contacts.put("18503558939", "华贞");
        contacts.put("13407209608", "华啟倩");
        contacts.put("15596505995", "仲采绿");
        contacts.put("17519874292", "卫丹");
        contacts.put("15178485516", "戚丽红");
        contacts.put("19877232369", "何翠柔");
        contacts.put("18706287692", "钱溶艳");
        contacts.put("18944239644", "钱琳");
        contacts.put("17325302007", "缪静欣");
        contacts.put("18839074540", "焦秋菊");
        contacts.put("19879419704", "吕访琴");
        contacts.put("16480981069", "沈丹");
        contacts.put("18674257265", "褚美丽");
        contacts.put("18302820904", "孙怡");
        contacts.put("15133295266", "许婵");
        contacts.put("17868457605", "曹红恋");
        contacts.put("15490732767", "吕柔");
        contacts.put("15064972307", "冯怜云");
    }

    /**
     * Parses one HBase row key of the form
     * {@code partition_caller_startTime_callee_duration_flag}
     * (e.g. {@code 0005_19877232369_2019-09-12 04:19:10_13407209608_2988_1})
     * and emits the call duration under three dimension keys:
     * day, month (day=-1), and year (month=-1, day=-1).
     *
     * @param key     HBase row key
     * @param value   HBase row contents (unused; everything needed is in the key)
     * @param context used to emit (CommonDimension, duration) pairs
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String rowKey = Bytes.toString(key.get());
        String[] split = rowKey.split("_");
        // ROBUSTNESS FIX: a malformed row key used to throw
        // ArrayIndexOutOfBounds/StringIndexOutOfBounds and kill the whole
        // job; skip such rows instead.
        if (split.length < 5 || split[2].length() < 10) {
            return;
        }
        // 0005_19877232369_2019-09-12 04:19:10_13407209608_2988_1
        String call1 = split[1];
        String startTime = split[2];
        String duration = split[4];

        // startTime layout: 2019-09-12 04:19:10
        String year = startTime.substring(0, 4);
        String month = startTime.substring(5, 7);
        String day = startTime.substring(8, 10);

        // Output value: call duration (seconds) as text.
        v.set(duration);

        // Output key: composite of contact and date dimensions.
        commonDimension.setContactDimension(contactDimension);
        commonDimension.setDateDimension(dateDimension);

        contactDimension.setTelephone(call1);
        // May be null if the number is not in the lookup map.
        contactDimension.setName(contacts.get(call1));
        /*
         Each row read from HBase produces 3 output records (day, month,
         year granularity) destined for the MySQL aggregation tables.
          */
        // Day-level dimension.
        dateDimension.setYear(year);
        dateDimension.setMonth(month);
        dateDimension.setDay(day);
        context.write(commonDimension, v);

        // Month-level dimension: day set to -1.
        dateDimension.setDay("-1");
        context.write(commonDimension, v);

        // Year-level dimension: month and day both set to -1.
        dateDimension.setMonth("-1");
        context.write(commonDimension, v);
    }
}

3. 类:CallReducer

package com.atguigu.dataanalasis.mapred;

import com.atguigu.dataanalasis.bean.CommonDimension;
import com.atguigu.dataanalasis.bean.CountDurationValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CallReducer extends Reducer<CommonDimension, Text, CommonDimension, CountDurationValue> {
    // Reused across reduce() calls; Hadoop serializes on write, so mutating
    // a single instance between calls is safe.
    private CountDurationValue countDurationValue = new CountDurationValue();

    /**
     * Aggregates all call records that share one dimension key: counts the
     * calls and sums their durations, then emits the pair for the
     * OutputFormat to persist into MySQL.
     *
     * @param key     composite contact/date dimension
     * @param values  call durations (seconds) as text
     * @param context used to emit the aggregated result
     */
    @Override
    protected void reduce(CommonDimension key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int calls = 0;
        int totalDuration = 0;

        for (Text duration : values) {
            calls += 1;
            totalDuration += Integer.parseInt(duration.toString());
        }

        // Package both aggregates into the reusable value object.
        countDurationValue.setCountSum(calls);
        countDurationValue.setDurationSum(totalDuration);

        // Hand off to the OutputFormat, which decides how to write to MySQL.
        context.write(key, countDurationValue);
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2018-11-27 16:04:01

results matching ""

    No results matching ""