10.2 读取的数据发送到 Kafka
10.2.1 实现 Kafka 生产者
package com.atguigu.dw.gamallcanal.util
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
/**
* Author lzc
* Date 2019/5/17 4:45 PM
*/
object MyKafkaSender {
val props = new Properties()
// Kafka服务端的主机名和端口号
props.put("bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9093")
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
def sendToKafka(topic: String, content: String) = {
producer.send(new ProducerRecord[String, String](topic, content))
}
}
10.2.2 使用生产者向 Kafka 生成数据
考虑到将来存储到 ES 中的数据是 Json 格式, 所以, 我们在 Kafka 存储的的时候也存储为 Json 格式的.
改写工具类: CanalHandler
package com.atguigu.dw.gamallcanal.util
import java.util
import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowData}
import com.atguigu.dw.gmall.common.constant.GmallConstant
/**
* Author lzc
* Date 2019/5/17 4:09 PM
*/
object CanalHandler {
/**
* 处理从 canal 取来的数据
*
* @param tableName 表名
* @param eventType 事件类型
* @param rowDataList 数据类别
*/
def handle(tableName: String, eventType: EventType, rowDataList: util.List[RowData]) = {
import scala.collection.JavaConversions._
if ("order_info" == tableName && eventType == EventType.INSERT && rowDataList.size() > 0) {
val obj: JSONObject = new JSONObject()
// 1. rowData 表示一行数据, 通过他得到每一列. 首先遍历每一行数据
for (rowData <- rowDataList) {
// 2. 得到每行中, 所有列组成的列表
val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList
for (column <- columnList) {
// 3. 得到列名和列值
// key下划线转成驼峰
val newColumn: String = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName)
obj.put(newColumn, column.getValue)
}
}
// 4. 发送到 Kafka
MyKafkaSender.sendToKafka(GmallConstant.TOPIC_ORDER, obj.toJSONString)
}
}
}