14.2 完成从 dw 到 es 的业务代码

利用 SparkSql 从 Hive 中查询到数据, 然后写入到 ES 中.

14.2.2 在 ES 中为宽表创建索引

PUT gmall_sale_detail
{
  "mappings": {
    "_doc":{
      "properties":{
         "user_id":{
           "type":"keyword"
         },
         "sku_id":{
           "type":"keyword"
         },
         "user_gender":{
           "type":"keyword"
         },
         "user_age":{
           "type":"short"
         },
         "user_level":{
           "type":"keyword"
         },
         "sku_price":{
           "type":"double" 
         },
         "sku_name":{
           "type":"text",
           "analyzer": "ik_max_word"
         },
         "sku_tm_id ":{
           "type":"keyword"
         },
         "sku_category3_id":{
           "type":"keyword"
         },
         "sku_category2_id":{
           "type":"keyword"
         },
         "sku_category1_id":{
           "type":"keyword"
         },
         "sku_category3_name":{
           "type":"text",
           "analyzer": "ik_max_word"
         },
         "sku_category2_name":{
           "type":"text",
           "analyzer": "ik_max_word"
         },
         "sku_category1_name":{
           "type":"text",
           "analyzer": "ik_max_word"
         },
         "spu_id":{
           "type":"keyword"
         },
         "sku_num":{
           "type":"long"
         },
         "order_count":{
           "type":"long"
         },
         "order_amount":{
           "type":"long"
         },
         "dt":{
           "type":"keyword"
         } 
      }
    }
  }
}

14.2.2 创建SaleDetailDayCount

创建 bean 类com.atguigu.dw.gmalldw2es.bean.SaleDetailDayCount用来封装宽表的数据

package com.atguigu.dw.gmalldw2es.bean


case class SaleDetailDayCount(user_id: String,
                              sku_id: String,
                              user_gender: String,
                              user_age: Int,
                              user_level: String,
                              order_price: Double,
                              sku_name: String,
                              sku_tm_id: String,
                              sku_category1_id: String,
                              sku_category2_id: String,
                              sku_category3_id: String,
                              sku_category1_name: String,
                              sku_category2_name: String,
                              sku_category3_name: String,
                              spu_id: String,
                              sku_num: Long,
                              order_count: Long,
                              order_amount: Double,
                              var dt: String)

14.2.3 创建SaleDetailApp

package com.atguigu.dw.gmalldw2es.app

import com.atguigu.dw.gmall.common.util.MyESUtil
import com.atguigu.dw.gmalldw2es.bean.SaleDetailDayCount
import org.apache.spark.sql.{Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/5/21 6:10 PM
  */
object SaleDetailApp {
    def main(args: Array[String]): Unit = {
        // 获取要查询的日期
        val date = if (args.length > 0) args(0) else "2019-05-20"
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("SaleDetailApp")
            .enableHiveSupport()
            .getOrCreate()
        import spark.implicits._
        val sql =
            s"""
               |select
               |    user_id,
               |    sku_id,
               |    user_gender,
               |    cast(user_age as int) user_age,
               |    user_level,
               |    cast(order_price as double) order_price,
               |    sku_name,
               |    sku_tm_id,
               |    sku_category3_id,
               |    sku_category2_id,
               |    sku_category1_id,
               |    sku_category3_name,
               |    sku_category2_name,
               |    sku_category1_name,
               |    spu_id,
               |    sku_num,
               |    cast(order_count as bigint) order_count,
               |    cast(order_amount as double) order_amount,
               |    dt
               |from dws_sale_detail_daycount
               |where dt='$date'
             """.stripMargin
        spark.sql("use gmall")
        val ds: Dataset[SaleDetailDayCount] = spark.sql(sql).as[SaleDetailDayCount]
        ds.foreachPartition(it => {
            MyESUtil.insertBulk("gmall_sale_detail", it.toList)
        })

        spark.stop()
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-05-21 22:15:35

results matching ""

    No results matching ""