6.2.5 foreach sink
The foreach sink iterates over every row of the result table, letting you write the output of a streaming query with whatever logic you define.
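Concretely, you pass the foreach sink an implementation of ForeachWriter: Spark calls open once per partition of each trigger, process for every row of that partition, and close when the partition finishes (or fails). The skeleton below is only a minimal sketch of that contract and is not part of the MySQL example that follows; the PrintlnWriter name is purely illustrative, and the writer does nothing but print each row.

import org.apache.spark.sql.{ForeachWriter, Row}

// Minimal ForeachWriter sketch: one instance handles one partition of one trigger
class PrintlnWriter extends ForeachWriter[Row] {
    // Return false to skip this partition's data
    override def open(partitionId: Long, epochId: Long): Boolean = true
    // Called once for every row of the partition
    override def process(value: Row): Unit = println(value)
    // errorOrNull is non-null if the partition failed
    override def close(errorOrNull: Throwable): Unit = ()
}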
The following example writes wordcount results to MySQL.
Step 1: Add the MySQL JDBC driver dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
Step 2: Create the database and table in MySQL
create database ss;
use ss;
create table word_count(word varchar(255) primary key not null, count bigint not null);
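Because word is the primary key, the "insert ... on duplicate key update" statement used in step 3 acts as an upsert: the first occurrence of a word inserts a row, and later occurrences overwrite its count. Before running the streaming job, a quick standalone JDBC check can confirm that the driver, URL, and credentials work; the sketch below is only an illustration (the JdbcSmokeTest object is not part of the example) and assumes the same hadoop201 host and root/aaa account used in step 3.

import java.sql.DriverManager

object JdbcSmokeTest {
    def main(args: Array[String]): Unit = {
        Class.forName("com.mysql.jdbc.Driver")
        // Same URL and credentials as the streaming example below (adjust to your environment)
        val conn = DriverManager.getConnection("jdbc:mysql://hadoop201:3306/ss", "root", "aaa")
        val rs = conn.createStatement().executeQuery("select count(*) from word_count")
        while (rs.next()) println("rows in word_count: " + rs.getLong(1))
        conn.close()
    }
}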
Step 3: Implement the code
package com.atguigu.ss

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}

/**
 * Author lzc
 * Date 2019/8/14 7:39 PM
 */
object ForeachSink {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("ForeachSink")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket") // set the data source
            .option("host", "hadoop201")
            .option("port", 10000)
            .load

        val wordCount: DataFrame = lines.as[String]
            .flatMap(_.split("\\W+"))
            .groupBy("value")
            .count()

        val query: StreamingQuery = wordCount.writeStream
            .outputMode("update")
            // foreach takes a ForeachWriter instance whose three abstract methods must be implemented.
            // A ForeachWriter is created for every partition of every batch.
            .foreach(new ForeachWriter[Row] {
                var conn: Connection = _
                var ps: PreparedStatement = _
                var batchCount = 0

                // Typically used to open the connection. Returning false skips this partition's data.
                override def open(partitionId: Long, epochId: Long): Boolean = {
                    println("open ..." + partitionId + " " + epochId)
                    Class.forName("com.mysql.jdbc.Driver")
                    conn = DriverManager.getConnection("jdbc:mysql://hadoop201:3306/ss", "root", "aaa")
                    // Insert the row, or update it when the key already exists (upsert)
                    val sql = "insert into word_count values(?, ?) on duplicate key update word=?, count=?"
                    ps = conn.prepareStatement(sql)
                    conn != null && !conn.isClosed && ps != null
                }

                // Write each row through the open connection
                override def process(value: Row): Unit = {
                    println("process ...." + value)
                    val word: String = value.getString(0)
                    val count: Long = value.getLong(1)
                    ps.setString(1, word)
                    ps.setLong(2, count)
                    ps.setString(3, word)
                    ps.setLong(4, count)
                    ps.execute()
                }

                // Used to close the connection (guard against a failed open)
                override def close(errorOrNull: Throwable): Unit = {
                    println("close...")
                    if (ps != null) ps.close()
                    if (conn != null) conn.close()
                }
            })
            .start
        query.awaitTermination()
    }
}
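To test: start the application, feed a few lines of words to the socket source (for example with nc -lk 10000 on hadoop201), and query the word_count table (select * from word_count;) after each micro-batch to see the upserted counts. Keep in mind that a ForeachWriter is opened per partition per trigger, so this example establishes a new MySQL connection each time; for heavier workloads a connection pool (or the foreachBatch sink available since Spark 2.4) is usually a better fit.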