Flink 实践教程:入门3-读取 MySQL 数据

Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何取 MySQL 数据,经过流计算 Oceanus 实时计算引擎分析,输出数据到日志(Logger Sink)当中。

Flink 读取 MySQL 数据

前置准备

创建 Oceanus 集群

活动购买链接 1 元购买 Oceanus 集群

进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群

创建 Mysql 实例

进入 MySQL 控制台,点击【新建】。具体可参考官方文档 创建 MySQL 实例。然后在【数据库管理】> 【参数设置】中设置参数 binlog_row_image=FULL,便于使用 CDC(Capture Data Change)特性,实现数据的变更实时捕获。

!创建 Oceanus 集群和 MySQL 实例时所选 VPC 必须是同一 VPC。

Oceanus 作业

1. 创建 Source

CREATE TABLE `MySQLSourceTable` (
    `id` INT,
    `name` VARCHAR,
    PRIMARY KEY (`id`) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义
) WITH (
    'connector' = 'mysql-cdc',       -- 必须为 'mysql-cdc'
    'hostname' = '10.0.0.158',       -- 数据库的 IP
    'port' = '3306',                 -- 数据库的访问端口
    'username' = 'root',             -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限)
    'password' = 'xxxxxxxxxx',       -- 数据库访问的密码
    'database-name' = 'testdb',      -- 需要同步的数据库
    'table-name' = 'student'         -- 需要同步的数据表名
);

2. 创建 Sink

CREATE TABLE CustomSink ( 
  id INT,
  name VARCHAR
) WITH ( 
    'connector' = 'logger',
    'print-identifier' = 'DebugData'
);

3. 编写业务 SQL

INSERT INTO CustomSink
SELECT * FROM MySQLSourceTable;

4. 运行作业

点击【保存】>【发布草稿】运行作业。查看Flink UI Taskmanger 日志,观察全量数据是否正常打印到日志。

5. 验证 MySQL-CDC 特性

在 MySQL 中新增一条数据,然后在 Flink UI Taskmanger 日志中观察结果,观察新增的数据是否正常打印到日志。

在 MySQL 中修改和删除记录同样会更新到 Logger Sink中,并打印输出。

总结

1、Mysql CDC 支持对 MySQL 数据库的全量和增量读取,并保证 Exactly Once 语义。MySQL CDC 底层使用了 Debezium 来做 CDC(Change Data Capture),其工作特性可参考 数据库 MySQL CDC

2、输入到 Logger Sink 的数据, 会通过日志打印出来,便于调试。Logger Jar 包下载地址:https://cloud.tencent.com/document/product/849/58713

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
Flink 实践教程:入门3-读取 MySQL 数据
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点...
<<上一篇
下一篇>>