大数据ClickHouse(十四):Integration系列表引擎

Integration系列表引擎

ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。

一、HDFS

HDFS引擎支持ClickHouse 直接读取HDFS中特定格式的数据文件,目前文件格式支持Json,Csv文件等,ClickHouse通过HDFS引擎建立的表,不会在ClickHouse中产生数据,读取的是HDFS中的数据,将HDFS中的数据映射成ClickHouse中的一张表,这样就可以使用SQL操作HDFS中的数据。

ClickHouse并不能够删除HDFS上的数据,当我们在ClickHouse客户端中删除了对应的表,只是删除了表结构,HDFS上的文件并没有被删除,这一点跟Hive的外部表十分相似。

  • 语法:
ENGINE = HDFS(URI, format)

注意:URI是HDFS文件路径,format指定文件格式。HDFS文件路径中文件为多个时,可以指定成some_file_?,或者当数据映射的是HDFS多个文件夹下数据时,可以指定somepath/* 来指定URI

  • 其他配置:

由于HDFS配置了HA 模式,有集群名称,所以URI使用mycluster HDFS集群名称时,ClickHouse不识别,这时需要做以下配置:

  1. 将hadoop路径下$HADOOP_HOME/etc/hadoop下的hdfs-site.xml文件复制到/etc/clickhouse-server目录下。
  2. 修改/etc/init.d/clickhouse-server 文件,加入一行 “export LIBHDFS3_CONF=/etc/clickhouse-server/hdfs-site.xml”
  3. 重启ClickHouse-server 服务
serveice clickhouse-server restart

当然,这里也可以不做以上配置,在写HDFS URI时,直接写成对应的节点+端口即可。

  • 示例:
#在HDFS路径 hdfs://mycluster/ch/路径下,创建多个csv文件,写入一些数据
c1.csv文件内容:
  1,张三,19
2,李四,20
c2.csv文件内容:
  3,王五,21
4,马六,22

#创建表 t_hdfs,使用HDFS引擎
node1 :) create table t_hdfs(id UInt8,name String,age UInt8) engine = HDFS('hdfs://mycluster/ch/*.csv','CSV')

#查询表 t_hdfs中的数据
node1 :) select * from t_hdfs;
┌─id─┬─name─┬─age─┐
│  3 │ 王五 │  21 │
│  4 │ 马六 │  22 │
└────┴──────┴─────┘
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  19 │
│  2 │ 李四 │  20 │
└────┴──────┴─────┘

注意:这里表t_hdfs不会在clickhouse对应的节点路径下创建数据目录,同时这种表映射的是HDFS路径中的csv文件,不能插入数据,t_hdfs是只读表。

#创建表 t_hdfs2 文件 ,使用HDFS引擎
node1 :) create table t_hdfs2(id UInt8,name String,age UInt8) engine = HDFS('hdfs://mycluster/chdata','CSV');

#向表 t_hdfs2中写入数据
node1 :) insert into t_hdfs2 values(5,'田七',23),(6,'赵八',24);

#查询表t_hdfs2中的数据
node1 :) select * from t_hdfs2;
┌─id─┬─name─┬─age─┐
│  5 │ 田七 │  23 │
│  6 │ 赵八 │  24 │
└────┴──────┴─────┘

注意:t_hdfs2表没有直接映射已经存在的HDFS文件,这种表允许查询和插入数据。

二、MySQL

ClickHouse MySQL数据库引擎可以将MySQL某个库下的表映射到ClickHouse中,使用ClickHouse对数据进行操作。ClickHouse同样支持MySQL表引擎,即映射一张MySQL中的表到ClickHouse中,使用ClickHouse进行数据操作,与MySQL数据库引擎一样,这里映射的表只能做查询和插入操作,不支持删除和更新操作。

  • 语法:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
    ...
) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']);

以上语法的解释如下:

  • host:port - MySQL服务器名称和端口
  • database - MySQL 数据库。
  • table - 映射的MySQL中的表
  • user - 登录mysql的用户名
  • password - 登录mysql的密码
  • replace_query - 将INSERT INTO 查询是否替换为 REPLACE INTO 的标志,默认为0,不替换。当设置为1时,所有的insert into 语句更改为 replace into 语句。当插入的数据有重复主键数据时,此值为0默认报错,此值为1时,主键相同这条数据,默认替换成新插入的数据。
  • on_duplicate_clause - 默认不使用。当插入数据主键相同时,可以指定只更新某列的数据为新插入的数据,对应于on duplicate key 后面的语句,其他的值保持不变,需要replace_query 设置为0。
  • 示例:
#在mysql 中创建一张表 t_ch,指定id为主键
CREATE TABLE t_ch (
	id INT,
	NAME VARCHAR (255),
	age INT,
	PRIMARY KEY (id)
)

#向表中增加一些数据
insert into  t_ch values (1,"张三",18),(2,"李四",19),(3,"王五",20)

#在ClickHouse中创建MySQL引擎表 t_mysql_engine
node1 :) create table t_mysql_engine (
:-]  id UInt8,
:-]  name String,
:-]  age UInt8
:-] )engine = MySQL('node2:3306','test','t_ch','root','123456');

#查询ClickHouse表 t_mysql_engine 中的数据:
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中插入一条数据
node1 :) insert into t_mysql_engine values (4,'马六','21');
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 马六 │  21 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中再插入一条数据,这里主键重复,报错。
node1 :) insert into t_mysql_engine values (4,'田七','22');
Exception: mysqlxx::BadQuery: Duplicate entry '4' for key
 'PRIMARY' (node2:3306).

注意:在clickhouse 中 t_mysql_engine表不会在ClickHouse服务器节点上创建数据目录。

  • 测试 replace_query :
#在mysql 中删除表 t_ch,重新创建,指定id为主键
CREATE TABLE t_ch (
	id INT,
	NAME VARCHAR (255),
	age INT,
	PRIMARY KEY (id)
)

#向表中增加一些数据
insert into  t_ch values (1,"张三",18),(2,"李四",19),(3,"王五",20)

#在ClickHouse中删除MySQL引擎表 t_mysql_engine,重建
node1 :) create table t_mysql_engine (
:-]  id UInt8,
:-]  name String,
:-]  age UInt8
:-] )engine = MySQL('node2:3306','test','t_ch','root','123456',1);

#查询ClickHouse表 t_mysql_engine 中的数据:
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中插入一条数据,主键重复。这里由于指定了replace_query = 1 ,所以当前主键数据会被替换成新插入的数据。
node1 :) insert into t_mysql_engine values (3,'马六','21');

#查询ClichHouse t_mysql_engine表数据
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 马六 │  21 │
└────┴──────┴─────┘
  • 测试 on_duplicate_clause:
#在mysql 中删除表 t_ch,重新创建,指定id为主键
CREATE TABLE t_ch (
	id INT,
	NAME VARCHAR (255),
	age INT,
	PRIMARY KEY (id)
)

#向表中增加一些数据
insert into  t_ch values (1,"张三",18),(2,"李四",19),(3,"王五",20)

#在ClickHouse中删除MySQL引擎表 t_mysql_engine,重建
node1 :) create table t_mysql_engine (
:-]  id UInt8,
:-]  name String,
:-]  age UInt8
:-] )engine = MySQL('node2:3306','test','t_ch','root','123456',0,'update age = values(age)');

#查询ClickHouse表 t_mysql_engine 中的数据:
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘

#在ClickHouse 中向表 t_mysql_engine中插入一条数据
node1 :) insert into t_mysql_engine values (4,'马六','21');
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 马六 │  21 │
└────┴──────┴─────┘

#在ClickHouse中向表 t_mysql_engine中插入一条数据,主键重复。
node1 :) insert into t_mysql_engine values (4,'田七','100');

#查询ClichHouse t_mysql_engine表数据
node1 :) select * from t_mysql_engine;
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 马六 │ 100 │
└────┴──────┴─────┘

三、​​​​​​​​​​​​​​Kafka

ClickHouse中还可以创建表指定为Kafka为表引擎,这样创建出的表可以查询到Kafka中的流数据。对应创建的表不会将数据存入ClickHouse中,这里这张kafka引擎表相当于一个消费者,消费Kafka中的数据,数据被查询过后,就不会再次被查询到。

  • 语法:
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]

对以上参数的解释:

  • kafka_broker_list: 以逗号分隔的Kafka Broker节点列表
  • kafka_topic_list : topic列表
  • kafka_group_name : kafka消费者组名称
  • kafka_format : Kafka中消息的格式,例如:JSONEachRow、CSV等等,具体参照https://clickhouse.tech/docs/en/interfaces/formats/。这里一般使用JSONEachRow格式数据,需要注意的是,json字段名称需要与创建的Kafka引擎表中字段的名称一样,才能正确的映射数据。
  • 示例:
#创建表 t_kafka_consumer ,使用Kafka表引擎
node1 :) create table t_kafka_consumer (
:-] id UInt8,
:-] name String,
:-] age UInt8
:-] ) engine = Kafka()
:-] settings 
:-] kafka_broker_list='node1:9092,node2:9092,node3:9092',
:-] kafka_topic_list='ck-topic',
:-] kafka_group_name='group1',
:-] kafka_format='JSONEachRow';

#启动kafka,在kafka中创建ck-topic topic,并向此topic中生产以下数据:
创建topic:
kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic ck-topic --partitions 3 --replication-factor 3

生产数据:
kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic ck-topic

生产数据如下:
{"id":1,"name":"张三","age":18}
{"id":2,"name":"李四","age":19}
{"id":3,"name":"王五","age":20}
{"id":4,"name":"马六","age":21}
{"id":5,"name":"田七","age":22}

#在ClickHouse中查询表 t_kafka_consumer数据,可以看到生产的数据
node1 :) select * from t_kafka_consumer;
┌─id─┬─name─┬─age─┐
│  2 │ 李四 │  19 │
│  5 │ 田七 │  22 │
│  1 │ 张三 │  18 │
│  4 │ 马六 │  21 │
│  3 │ 王五 │  20 │
└────┴──────┴─────┘

注意:再次查看表 t_kafka_consumer数据 ,我们发现读取不到任何数据,这里对应的ClikcHouse中的Kafka引擎表,只是相当于是消费者,消费读取Kafka中的数据,数据被消费完成之后,不能再次查询到对应的数据。

以上在ClickHouse中创建的Kafka引擎表 t_kafka_consumer 只是一个数据管道,当查询这张表时就是消费Kafka中的数据,数据被消费完成之后,不能再次被读取到。如果想将Kafka中topic中的数据持久化到ClickHouse中,我们可以通过物化视图方式访问Kafka中的数据,可以通过以下三个步骤完成将Kafka中数据持久化到ClickHouse中:

  1. 创建Kafka 引擎表,消费kafka中的数据。
  2. 再创建一张ClickHouse中普通引擎表,这张表面向终端用户查询使用。这里生产环境中经常创建MergeTree家族引擎表。
  3. 创建物化视图,将Kafka引擎表数据实时同步到终端用户查询表中。

示例:

#在ClickHouse中创建 t_kafka_consumer2 表,使用Kafka引擎
node1 :) create table t_kafka_consumer2 (
:-] id UInt8,
:-] name String,
:-] age UInt8
:-] ) engine = Kafka()
:-] settings 
:-] kafka_broker_list='node1:9092,node2:9092,node3:9092',
:-] kafka_topic_list='ck-topic',
:-] kafka_group_name='group1',
:-] kafka_format='JSONEachRow';

#在ClickHouse中创建一张终端用户查询使用的表,使用MergeTree引擎
node1 :) create table t_kafka_mt(
:-] id UInt8,
:-] name String,
:-] age UInt8
:-] ) engine = MergeTree()
:-] order by id;

#创建物化视图,同步表t_kafka_consumer2数据到t_kafka_mt中
node1 :) create materialized view  view_consumer to t_kafka_mt 
:-] as select id,name,age from t_kafka_consumer2;

注意:物化视图在ClickHouse中也是存储数据的,create materialized view view_consumer to t_kafka_mt 语句是将物化视图view_consumer中的数据存储到到对应的t_kafka_mt 表中,这样同步的目的是如果不想继续同步kafka中的数据,可以直接删除物化视图即可。

#向Kafka ck-topic中生产以下数据:
生产数据:
kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic ck-topic

生产数据如下:
{"id":1,"name":"张三","age":18}
{"id":2,"name":"李四","age":19}
{"id":3,"name":"王五","age":20}
{"id":4,"name":"马六","age":21}
{"id":5,"name":"田七","age":22}


#查询表 t_kafka_mt中的数据,数据同步完成。
node1 :) select * from t_kafka_mt;
┌─id─┬─name─┬─age─┐
│  1 │ 张三 │  18 │
│  2 │ 李四 │  19 │
│  3 │ 王五 │  20 │
│  4 │ 马六 │  21 │
│  5 │ 田七 │  22 │
└────┴──────┴─────┘
本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
大数据ClickHouse(十四):Integration系列表引擎
ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在Cl...
<<上一篇
下一篇>>