11.2 写入数据到 ES
11.2.1 规划索引
PUT gmall_order
{
"mappings" : {
"_doc" : {
"properties" : {
"provinceId" : {
"type" : "keyword"
},
"consignee" : {
"type" : "keyword",
"index":false
},
"consigneeTel" : {
"type" : "keyword",
"index":false
},
"createDate" : {
"type" : "keyword"
},
"createHour" : {
"type" : "keyword"
},
"createHourMinute" : {
"type" : "keyword"
},
"createTime" : {
"type" : "keyword"
},
"deliveryAddress" : {
"type" : "keyword"
},
"expireTime" : {
"type" : "keyword"
},
"id" : {
"type" : "keyword"
},
"imgUrl" : {
"type" : "keyword",
"index":false
},
"operateTime" : {
"type" : "keyword"
},
"orderComment" : {
"type" : "keyword",
"index":false
},
"orderStatus" : {
"type" : "keyword"
},
"outTradeNo" : {
"type" : "keyword",
"index":false
},
"parentOrderId" : {
"type" : "keyword"
},
"paymentWay" : {
"type" : "keyword"
},
"totalAmount" : {
"type" : "double"
},
"trackingNo" : {
"type" : "keyword"
},
"tradeBody" : {
"type" : "keyword",
"index":false
},
"userId" : {
"type" : "keyword"
}
}
}
}
}
11.2.2 写数据到 ES
OrderInfo.scala
package com.atguigu.dw.gmall.realtime.bean
import java.text.SimpleDateFormat
case class OrderInfo(area: String,
var consignee: String,
orderComment: String,
var consigneeTel: String,
operateTime: String,
orderStatus: String,
paymentWay: String,
userId: String,
imgUrl: String,
totalAmount: Double,
expireTime: String,
deliveryAddress: String,
createTime: String,
trackingNo: String,
parentOrderId: String,
outTradeNo: String,
id: String,
tradeBody: String) {
private val date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createTime)
val createDate: String = new SimpleDateFormat("yyyy-MM-dd").format(date)
val createHour: String = new SimpleDateFormat("HH").format(date)
val createHourMinute: String = new SimpleDateFormat("HH:mm").format(date)
}
OrderApp
package com.atguigu.dw.gmall.realtime.app
import com.alibaba.fastjson.JSON
import com.atguigu.dw.gmall.common.constant.GmallConstant
import com.atguigu.dw.gmall.common.util.MyESUtil
import com.atguigu.dw.gmall.realtime.bean.OrderInfo
import com.atguigu.dw.gmall.realtime.util.MyKafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object OrderApp {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("OrderApp")
val ssc = new StreamingContext(conf, Seconds(2))
val inputDSteam: InputDStream[(String, String)] = MyKafkaUtil.getKafkaStream(ssc, GmallConstant.TOPIC_ORDER)
val orderInfoDStream: DStream[OrderInfo] = inputDSteam.map {
case (_, jsonString) => {
val orderInfo: OrderInfo = JSON.parseObject(jsonString, classOf[OrderInfo])
orderInfo.consignee =
orderInfo.consignee.substring(0, 1) + "**"
orderInfo.consigneeTel =
orderInfo.consigneeTel.substring(0, 3) + "****" + orderInfo.consigneeTel.substring(7, 11)
orderInfo
}
}
orderInfoDStream.foreachRDD(rdd => {
rdd.foreachPartition((orderInfoIt: Iterator[OrderInfo]) => {
MyESUtil.insertBulk(GmallConstant.ES_INDEX_ORDER, orderInfoIt.toList)
})
})
ssc.start()
ssc.awaitTermination()
}
}