10.1 从 Canal 读取数据

创建子项目: gmall-canal

10.1.1 添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>gmall</artifactId>
        <groupId>com.atguigu.dw</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>gmall-canal</artifactId>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
        <!--canal 客户端, 从 canal 服务器读取数据-->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <!-- kafka 客户端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.atguigu.dw</groupId>
            <artifactId>gmall-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

10.1.2 从 Canal 服务读取数据的客户端

package com.atguigu.dw.gamallcanal

import java.net.InetSocketAddress

import com.alibaba.otter.canal.client.{CanalConnector, CanalConnectors}
import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, RowChange}
import com.alibaba.otter.canal.protocol.{CanalEntry, Message}
import com.atguigu.dw.gamallcanal.util.CanalHandler
import com.google.protobuf.ByteString

/**
  * Author lzc
  * Date 2019/5/17 3:22 PM
  */
object CanalClient {
    def main(args: Array[String]): Unit = {
        // 1. 创建能连接到 Canal 的连接器对象
        val connector: CanalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop201", 11111), "example", "", "")
        // 2. 连接到 Canal
        connector.connect()
        // 3. 监控指定的表的数据的变化
        connector.subscribe("gmall.order_info")
        while (true) {
            // 4. 获取消息  (一个消息对应 sql 语句的执行)
            val msg: Message = connector.get(100) // 一次最多获取 100 条 sql
            // 5. 个消息对应多行数据发生了变化, 一个 entry 表示一行数据
            val entries: java.util.List[CanalEntry.Entry] = msg.getEntries
            import scala.collection.JavaConversions._
            if (entries.size() > 0) {
                // 6. 遍历每行数据
                for (entry <- entries) {
                    // 7. EntryType.ROWDATA 只对这样的 EntryType 左处理
                    if (entry.getEntryType == EntryType.ROWDATA) {
                        // 8. 获取到这行数据, 但是这种数据不是字符串, 所以要解析
                        val value: ByteString = entry.getStoreValue
                        val rowChange: RowChange = RowChange.parseFrom(value)
                        // 9.定义专门处理的工具类: 参数 1 表名, 参数 2 事件类型(插入, 删除等), 参数 3: 具体的数据
                        CanalHandler.handle(entry.getHeader.getTableName, rowChange.getEventType, rowChange.getRowDatasList)
                    }
                }

            } else {
                println("没有抓取到数据...., 2s 之后重新抓取")
                Thread.sleep(2000)
            }
        }

    }
}

10.1.3 专门处理数据的工具类CanalHandler

package com.atguigu.dw.gamallcanal.util

import java.util

import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowData}

/**
  * Author lzc
  * Date 2019/5/17 4:09 PM
  */
object CanalHandler {
    /**
      * 处理从 canal 取来的数据
      *
      * @param tableName   表名
      * @param eventType   事件类型
      * @param rowDataList 数据类别
      */
    def handle(tableName: String, eventType: EventType, rowDataList: util.List[RowData]) = {
        import scala.collection.JavaConversions._
        if ("order_info" == tableName && eventType == EventType.INSERT && rowDataList.size() > 0) {
            // 1. rowData 表示一行数据, 通过他得到每一列. 首先遍历每一行数据
            for (rowData <- rowDataList) {
                // 2. 得到每行中, 所有列组成的列表
                val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList
                for (column <- columnList) {
                    // 3. 得到列名和列值
                    println(column.getName + ":" + column.getValue)
                }
            }
        }
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-05-17 17:17:31

results matching ""

    No results matching ""