基于OGG和Sqoop的TBDS接入方案系列-总体方案

导语:本文为系列文章《基于OGG和Sqoop的TBDS接入方案系列》的第一篇,后两篇文章的传送门如下:

第二篇:《基于OGG和Sqoop的TBDS接入方案系列-Sqoop与腾讯大数据套件TBDS的集成示例介绍

第三篇:《基于OGG和Sqoop的TBDS接入方案系列-数据合并方案

本文主要讨论了一个如何利用OGG和Sqoop,将源系统为Oracle的数据接入腾讯大数据套件TBDS的Hadoop/Hive中的解决方案。

1. 概述

1.1. 腾讯大数据套件(TBDS)介绍

腾讯大数据处理套件(Tencent Big Data Suite,以下简称TBDS)是一套针对私有化场景提供的全功能型大数据处理平台。为客户提供按需部署大数据处理服务实现数据处理需求,例如数据接入、数据分析、数据提取、机器学习、报表展示、客户画像等大数据应用的能力。目前TBDS已经应用到工业、政务、互联网金融、公安等领域的客户生产环境。

1.2. 背景

由于历史原因,传统的行业(如电力、能源)大多使用传统的关系型数据库作为底层数据存储的工具。但是在大数据时代的今天,随着业务的不断发展以及随之而来的数据快速增长,传统的关系型数据库面对大数据的存储、处理、分析等挑战都显得力不从心,因此大数据平台应运而生,解决了大数据存储和处理等的难题。

在大多数客户的业务场景下,普遍存在一种基本的需求,即将其多个业务系统的数据统一集中到大数据平台进行存储、管理、处理、共享等。除TBDS本身具备数据接入功能外,本文档针对RDBMS(如Oracle数据库)作为数据源的情况下,探讨另一种如何离线和实时的讲Oracle数据源的数据接入到TBDS大数据平台并主要以Hive表的形式存在的解决方案,另外针对实时接入的方式,还提供了一种数据合并的方案,保证数据完整性和一致性。

2. 离线数据接入

离线数据的接入,本文考虑使用Sqoop工具,详细的介绍如下。

2.1. Sqoop介绍

Sqoop 是Apache 旗下一款“Hadoop 和关系数据库服务器之间传送数据”的工具。正如其名字所示,Sqoop意味着SQL–to–Hadoop,是一个用来将关系型数据库和Hadoop中的数据进行相互转移的工具,可以将一个关系型数据库(例如Mysql、Oracle)中的数据导入到Hadoop(例如HDFS、Hive、Hbase)中,也可以将Hadoop(例如HDFS、Hive、Hbase)中的数据导入到关系型数据库(例如Mysql、Oracle)中。如下图所示:

2.2. Sqoop架构及原理

本文主要介绍Sqoop 1.X,其整个架构是比较简单的,主要包含:

  • User(用户)
  • Sqoop Client(Sqoop工具本身)
  • Database Table(RDBMS系统)
  • HDFS/Hive/HBase(Hadoop系统)

Sqoop执行Import命令从RDBMS中导入数据到Hadoop系统的原理简单阐述如下:

  1. 用户向 Sqoop 发起Import命令之后,这个命令会转换为一个基于 Map任务且仅有Map任务的 MapReduce 作业;
  2. 在导数开始之前,Sqoop会使用JDBC连接到相应的源数据库,Map Task 会访问数据库的元数据信息(schema、table、field、field type等);
  3. 随后Sqoop通过并行的 Map任务将数据库的数据读取出来,然后导入 Hadoop 中,完成数据的接入。

Sqoop执行Export命令从Hadoop系统中导入数据到RDBMS的原理与Import类似,大致也是以下几个步骤:

  1. 用户向 Sqoop 发起Export命令之后,这个命令会转换为一个基于 Map任务且仅有Map任务的 MapReduce 作业;
  2. 在导数开始前,Sqoop会去获取导出表(如Hive表)的schema、meta信息,和Hadoop中的字段一一对应上;
  3. 多个map任务同时并行运行,完成hdfs中数据导出到关系型数据库中。

2.3. Sqoop环境部署

虽然TBDS的组件中不包含Sqoop,但得益于TBDS的开放性,Sqoop可以非常容易和方便地通过安装rpm包以及配置的形式,集成到TBDS大数据平台上。

具体的环境部署操作指南,请参考《基于OGG和Sqoop的TBDS接入方案系列-Sqoop与腾讯大数据套件TBDS的集成示例介绍》。

2.4. 数据导入方式

目前Sqoop支持两种数据导入模式:

  • 全量数据导入
  • 增量数据导入

2.5. 全量数据导入

全量数据导入指的是将关系型数据库中的某张表,通过查询过滤得到需要导入的数据后,一次性将其导入到大数据平台中。

例如上图所示,源数据库中有一张表,在当前时刻有5条记录,需要将该表的5条记录即所有的数据都一次性导入到Hadoop系统中。

全量数据导入的方式主要的适用场景为一次性离线分析场景,其主要使用sqoop import命令,示例如下:

# 全量数据导入
sqoop import \\
 --connect jdbc:mysql://10.0.0.1:3306/mydb \\
 --username root \\
 --password 123456 \\
 --query “select * from tab_account where \\$CONDITIONS” \\
 --target-dir /user/aron/tmpdata/tab_account_initial \\ 
 --fields-terminated-by “,” \\
 --hive-drop-import-delims \\
 --null-string “\\\\N” \\
 --null-non-string “\\\\N” \\
 --split-by id \\
 -m 6 \\

其中比较重要参数说明如下:

参数

说明

--query

SQL查询语句

--target-dir

HDFS目标目录(确保目录不存在,否则会报错,因为Sqoop在导入数据至HDFS时会自己在HDFS上创建目录)

--hive-drop-import- delims

删除数据中包含的Hive默认分隔符(^A, ^B, \\n)

--null-string

string类型空值的替换符(Hive中Null用\\n表示)

--null-non-string

非string类型空值的替换符

--split-by

数据切片字段(int类型,m>1时必须指定)

-m

Mapper任务数,默认为4

更多的参数,可参阅官方文档:

http://sqoop.apache.org/docs/1.4.5/SqoopUserGuide.html#_literal_sqoop_import_literal

2.6. 增量数据导入

在客户实际的生产环境中,相对全量导入后的数据来说,源系统必然会产生增量的数据,而每次都使用全量数据导入的方式显然是不现实的,在资源的开销和数据同步的效率等多方面都存在不少问题。业界比较通用的做法是,在导入一次全量数据后,通过定期从业务系统中导入增量数据,达到数仓离线分析的使用场景需求。

例如上图所示,前文提及的源数据库中的同一张表,在全量导入过后的某一时刻从原来的5条记录,增长到了7条记录,那么此时如果希望Hadoop系统中的数据与源系统保持一致,则只需要将新增长的2条记录增量同步到Hadoop系统中,而不必再次全量导入7条数据。

目前Sqoop支持的增量数据导入主要有两种,一是基于递增列的增量数据导入(Append方式),二是基于时间列的增量数据导入(LastModified方式)。

几个核心的参数如下:

  • --check-column

用来指定一些列,这些列在增量导入时用来检查这些数据是否作为增量数据进行导入,和关系型数据库中的自增字段及时间戳类似.

注意:这些被指定的列的类型不能使任意字符类型,如char、varchar等类型都是不可以的,同时–check-column可以去指定多个列

  • --incremental

用来指定增量导入的模式,两种模式分别为Append和Lastmodified

  • --last-value

指定上一次导入中检查列指定字段最大值

2.6.1. Append方式

Append方式适用于源表含有一个数据类型为整型的自增列的场景。

例如,有一个订单交易表,对于每一条订单,均有唯一的标识自增列T_ID,在关系型数据库中以主键形式存在。在之前,已经通过全量导入的方式,将T_ID在1-100范围之间的订单数据全部入库到Hive表中,后续的增量数据,比如T_ID从101-200范围之间的订单数据,需要导入到Hadoop系统中,则此时只需指定--incremental 参数为append,--last-value参数为100即可,后者表示只需要从T_ID大于100后的数据开始导入。

Sqoop import导入示例如下,

# Append方式的全量数据导入
 sqoop import \\
 --connect jdbc:mysql://10.0.0.1:3306/mydb \\
 --username root \\
 --password 123456 \\
 --query “select T_ID, T_ACCOUNT from tab_account where \\$CONDITIONS” \\
 --target-dir /user/aron/tmpdata/tab_account_delta \\ 
 --split-by order_id \\
 -m 6 \\
 --incremental append \\
 --check-column order_id \\
 --last-value 100

其中比较重要参数说明如下:

参数

说明

--incremental append

基于递增列的增量导入(将递增列值大于阈值的所有数据增量导入Hadoop)

--check-column

递增列(int)

--last-value

阈值(int)

2.6.2. LastModified方式

LastModified方式适用于源表中含有一个数据类型为时间戳的列的场景,例如,有一个订单交易表,对于每一条订单,均有一列T_TIME标识其插入到该表的时间,同时因为后续订单可能状态会变化,比如订单号T_ID为1的,新的时间戳T_TIME相比之前已经更新了列T_ACCOUNT,此时Sqoop依然会将相同状态更改后的订单导入HDFS,可以指定merge-key参数为T_ID,表示将后续新的记录与原有记录合并。

Sqoop import导入示例如下:

# 将时间列大于等于阈值的数据增量导入HDFS
 sqoop import \\
 --connect jdbc:mysql://10.0.0.1:3306/mydb \\
 --username root \\
 --password 123456 \\
 --query “select T_ID, T_ACCOUNT from tab_account where \\$CONDITIONS” \\
 --target-dir /user/aron/tmpdata/tab_account_delta2 \\ 
 --split-by id \\
 -m 4 \\
 --incremental lastmodified \\
 --merge-key T_ID \\
 --check-column T_TIME \\
 --last-value “2018-12-11 20:00:00” 

其中比较重要参数说明如下:

参数

说明

--incremental lastmodified

基于时间列的增量导入(将时间列大于等于阈值的所有数据增量导入Hadoop)

--check-column

时间列(int)

--last-value

阈值(int)

--merge-key

合并列(主键,合并键值相同的记录)

2.7. Demo示例

一个应用场景是将Oracle数据库中的数据,以离线的形式,全量和/或增量的导入到TBDS大数据平台的HDFS上。具体的操作示例,请参考《基于OGG和Sqoop的TBDS接入方案系列-Sqoop与腾讯大数据套件TBDS的集成示例介绍》。

3. 实时数据导入

实时数据的接入,对于源数据库为Oracle的情况下,本文考虑使用OGG工具,详细的介绍如下。

3.1. OGG介绍

Oracle Golden Gate是Oracle旗下一款支持异构平台之间高级复制技术,提供异构环境下交易数据的实时捕捉、变换、投递。

3.2. OGG架构及其原理

和传统的逻辑复制一样,OGG的实现原理是抽取源端的redo log和archive log,然后通过TCP/IP协议投递到目标端,最后解析还原同步到目标端,使目标端实现源端的数据同步。

利用OGG将数据从Oracle实时传输到Hadoop集群(HDFS,Hive,Kafka等)的基本原理如图:

其中组件说明:

  • Manager进程:ogg的控制进程;
  • Extract进程:运行在数据库的源端,负责从源端数据表或者日志中捕获数据 ;
  • Pump进程:运行在数据库源端,如果源端使用了本地的trail文件,那么Pump进程就会把trail以数据块的形式通过TCP/IP协议发送到目标端;
  • trail文件:Extract抽完数据以后OG会将抽取的事务信息转化为一种GoldenGate专有格式的文件,然后Pump负责把远端的trail文件投递到目标端,所以源、目标两端都会存在着文件,远端存放的tail文件叫远程trail文件;
  • Replicat进程:运行在目标端,是数据传递的最后一站,负责读取目标端的trail文件中得内容,并将其解析为DMl/DDL语句,然后应用到目标数据库中。

根据如上原理,配置大概分为如下步骤:

  1. 源端目标端配置ogg管理器(mgr);
  2. 源端配置extract进程进行Oracle日志抓取;
  3. 源端配置pump进程传输抓取内容到目标端;
  4. 目标端配置replicate进程复制日志到Hadoop集群或者复制到用户自定义的解析器将最终结果落入到Hadoop集群。

3.3. OGG环境部署

OGG的环境部署包括源端和目标端的安装和配置,具体的操作指南请参见《基于 OGG 的 Oracle 与 Hadoop 集群准实时同步介绍》。

3.4. 数据导入方式

由于OGG是基于日志将变化的数据实时的同步到目标系统中,因此可看做是增量同步的方式,而OGG不能满足全量数据的导入的要求。因此对于数据的初始化导入,可以考虑使用Sqoop全量数据导入的方式先完成;对于增量部分数据的实时导入,再利用OGG来完成。

3.5. Demo示例

一个应用场景是将Oracle数据库中的数据,以实时的形式,将增量部分的数据导入到TBDS大数据平台的HDFS上,具体的操作示例请参见《基于 OGG 的 Oracle 与 Hadoop 集群准实时同步介绍》。

4. 数据合并

该部分有一个Demo示例操作步骤指引,请参考《基于OGG和Sqoop的TBDS接入方案系列-数据合并方案》。

4.1. 数据重复问题

综合上文,我们可以采用以下方案将数据导入TBDS中:

  • 采用OGG工具完成数据的实时导入增量数据
  • 采用Sqoop完成数据的离线导入全量数据

但由于数据的全量数据与增量数据使用了不同的工具,且两者之间的在时间上几乎能肯定是不会无缝衔接的,例如考虑以下情况:

在T=2019-06-26 09:00:00时刻,存在一张表结构和数据如下,其中第一列仅为了说明数据插入表的时刻,实际不存在:

数据插入表的时间戳

ID

2019-06-12 01:00:00

1

Aron

2019-06-12 01:05:00

2

Ben

2019-06-12 01:10:00

3

Cindy

2019-06-12 01:15:00

4

David

2019-06-12 01:20:00

5

Emma

在T=2019-06-26 09:05:00时刻,我们开启了OGG从Oracle表到TBDS中Hive表的数据同步,由于前5条数据插入时间久远,因此暂时不会有数据同步到Hive表。

接下来,在T=2019-06-26 09:00:00时刻,我们插入一条数据:

数据插入表的时间戳

ID

2019-06-12 01:00:00

1

Aron

2019-06-12 01:05:00

2

Ben

2019-06-12 01:10:00

3

Cindy

2019-06-12 01:15:00

4

David

2019-06-12 01:20:00

5

Emma

2019-06-26 09:00:00

6

Frank

这时候由于OGG是实时同步,第6条数据会同步到Hive表中:

数据插入表的时间戳

ID

2019-06-26 09:00:00

6

Frank

接下来几乎在T=2019-06-26 09:00:00比OGG启动稍后一点点的时刻,我们立马开启Sqoop导入全量数据到Hive表,此时Hive表数据如下:

数据插入表的时间戳

ID

2019-06-26 09:00:00

6

Frank

2019-06-12 01:00:00

1

Aron

2019-06-12 01:05:00

2

Ben

2019-06-12 01:10:00

3

Cindy

2019-06-12 01:15:00

4

David

2019-06-12 01:20:00

5

Emma

2019-06-26 09:00:00

6

Frank

可以发现,由于增量同步与全量同步的时间点衔接几乎不可能重合,因此会出现数据重复的现象,因此我们需要一个数据合并的方案。

4.2. 数据合并原理

数据合并的原理概述如下:

1) OGG同步数据到HDFS后,数据格式为:

操作符[分隔符]数据库.表名[分隔符]操作时间戳(GMT+0)[分隔符]当前时间戳(GMT+8) [分隔符]偏移量[分隔符]字段1名称[分隔符]字段1内容[分隔符]字段2名称[分隔符]字段2内容[…])

其中操作时间戳为每一行数据在源表中插入的时间戳,因此可以利用该列,按主键来提取增量数据中每一个不同主键的最新时间的记录,而要求是Oracle源表中的数据具备物理主键或业务主键其一。

2) 提取出增量数据中每一个不同主键的最新时间的记录后(暂称为增量临时数据),与全量数据进行合并,合并的原则是

  • 全量数据中,按主键提取不存在其对应增量数据中的所有记录;
  • 增量临时数据中,操作为D(删除)的增量数据不提取,因为该部分数据已标识为删除;
  • 增量临时数据中,操作为I(插入)、U(更新)的增量数据全部提取,因为该部分数据已为最新。
本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
基于OGG和Sqoop的TBDS接入方案系列-总体方案
腾讯大数据处理套件(Tencent Big Data Suite,以下简称TBDS)是一套针对私有化场景提供的全功能型大数据处理平台。为客户提供按需部署大数据处...
<<上一篇
下一篇>>