Oceanus使用自定义Connector指南
实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为使用自定义Connector的实践。分享给大家~
支持自定义conncetor操作说明
- 在腾讯云Oceanus页面-->程序包管理页面--> 选择新建程序包,上传自己jar包。 上传时选择与自己环境对应的区域。 如:flink-connector-kudu.jar
- 在作业管理页面新建作业 --> 开发调试 --> 作业参数 -->引用程序包,选择刚刚上传的jar包,注意选择对应的版本,并保存。
- 在开发调试页面编写对应的Flink SQL --> 发布运行。
SQL示例
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html
CREATE TABLE random_source (
f_sequence INT,
f_random INT,
f_random_str VARCHAR
) WITH (
'connector' = 'datagen',
'rows-per-second'='1', -- 每秒产生的数据条数
'fields.f_sequence.kind'='sequence', -- 有界序列(结束后自动停止输出)
'fields.f_sequence.start'='1', -- 序列的起始值
'fields.f_sequence.end'='10000', -- 序列的终止值
'fields.f_random.kind'='random', -- 无界的随机数
'fields.f_random.min'='1', -- 随机数的最小值
'fields.f_random.max'='1000', -- 随机数的最大值
'fields.f_random_str.length'='10' -- 随机字符串的长度
);
CREATE TABLE Data_Output (
`id` BIGINT,
`name` STRING
) WITH (
'connector.type' = 'kudu'
,'kudu.masters' = 'master01:7051,master02:7051,master03:7051'
,'kudu.table' = 'Data_Output'
,'kudu.hash-columns' = 'id'
,'kudu.primary-key-columns' = 'id'
,'kudu.max-buffer-size' = '5000'
,'kudu.flush-interval' = '1000'
);
INSERT INTO `Data_Output`
SELECT f_sequence, f_random_str FROM random_source;
注:首次使用Kudu表时,kudu表用impala-shell查询时需要在Impala中创建对应的外表才能查到kudu的表数据。
自定义Conector可参考开源代码自己进行修改,打包。