11.2 写入数据到 ES

11.2.1 规划索引

PUT gmall_order
{
  "mappings" : {
    "_doc" : {
      "properties" : {
        "provinceId" : {
          "type" : "keyword"
        },
        "consignee" : {
          "type" : "keyword",
          "index":false
        },
        "consigneeTel" : {
          "type" : "keyword",
          "index":false
        },
        "createDate" : {
          "type" : "keyword"
        },
        "createHour" : {
          "type" : "keyword"
        },
        "createHourMinute" : {
          "type" : "keyword"
        },
        "createTime" : {
          "type" : "keyword"
        },
        "deliveryAddress" : {
          "type" : "keyword"
        },
        "expireTime" : {
          "type" : "keyword"
        },
        "id" : {
          "type" : "keyword"
        },
        "imgUrl" : {
          "type" : "keyword",
          "index":false
        },
        "operateTime" : {
          "type" : "keyword"
        },
        "orderComment" : {
          "type" : "keyword",
          "index":false
        },
        "orderStatus" : {
          "type" : "keyword"
        },
        "outTradeNo" : {
          "type" : "keyword",
          "index":false
        },
        "parentOrderId" : {
          "type" : "keyword"
        },
        "paymentWay" : {
          "type" : "keyword"
        },
        "totalAmount" : {
          "type" : "double"
        },
        "trackingNo" : {
          "type" : "keyword"
        },
        "tradeBody" : {
          "type" : "keyword",
          "index":false
        },
        "userId" : {
          "type" : "keyword"
        }
      }
    }
  }
}

11.2.2 写数据到 ES

OrderInfo.scala

package com.atguigu.dw.gmall.realtime.bean

import java.text.SimpleDateFormat

case class OrderInfo(area: String,
                     var consignee: String, // 收件人
                     orderComment: String,
                     var consigneeTel: String,
                     operateTime: String,
                     orderStatus: String,
                     paymentWay: String,
                     userId: String,
                     imgUrl: String,
                     totalAmount: Double,
                     expireTime: String,
                     deliveryAddress: String,
                     createTime: String,
                     trackingNo: String,
                     parentOrderId: String,
                     outTradeNo: String,
                     id: String,
                     tradeBody: String) {
    private val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTime)
    val createDate: String = new SimpleDateFormat("yyyy-MM-dd").format(date)
    val createHour: String = new SimpleDateFormat("HH").format(date)
    val createHourMinute: String = new SimpleDateFormat("HH:mm").format(date)
}

OrderApp

package com.atguigu.dw.gmall.realtime.app

import com.alibaba.fastjson.JSON
import com.atguigu.dw.gmall.common.constant.GmallConstant
import com.atguigu.dw.gmall.common.util.MyESUtil
import com.atguigu.dw.gmall.realtime.bean.OrderInfo
import com.atguigu.dw.gmall.realtime.util.MyKafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Author lzc
  * Date 2019/5/17 5:31 PM
  */
object OrderApp {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("OrderApp")
        val ssc = new StreamingContext(conf, Seconds(2))
        val inputDSteam: InputDStream[(String, String)] = MyKafkaUtil.getKafkaStream(ssc, GmallConstant.TOPIC_ORDER)

        // 1. 类型的调整为 OrderInfo
        val orderInfoDStream: DStream[OrderInfo] = inputDSteam.map {
            case (_, jsonString) => {
                val orderInfo: OrderInfo = JSON.parseObject(jsonString, classOf[OrderInfo])
                // 对人名和电话号码脱敏
                orderInfo.consignee =
                    orderInfo.consignee.substring(0, 1) + "**"
                orderInfo.consigneeTel =
                    orderInfo.consigneeTel.substring(0, 3) + "****" + orderInfo.consigneeTel.substring(7, 11)
                orderInfo
            }
        }
        // 2. 开始向 ES 中写入数据
        orderInfoDStream.foreachRDD(rdd => {
            rdd.foreachPartition((orderInfoIt: Iterator[OrderInfo]) => {
                MyESUtil.insertBulk(GmallConstant.ES_INDEX_ORDER, orderInfoIt.toList)
            })
        })

        ssc.start()
        ssc.awaitTermination()
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-05-17 19:16:20

results matching ""

    No results matching ""