Oceanus实践-消费 CMQ 主题模型数据源

实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为flink消费腾讯云CMQ数据实践。原文自Raigor,已获得授权,分享给大家~

Oceanus Flink CMQ connector 支持队列模型的数据源表和目的表,暂时不支持主题模型数据源表和目的表。CMQ 主题订阅可以实时同步主题模型数据到队列模型,借助这种机制,我们可以在 Oceanus 实现 CMQ 主题模型数据源表的读取。

1. 环境搭建

1.1 创建 Oceanus 集群

在 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,选择地域、可用区、VPC、日志、存储,设置初始密码等。

若之前未使用过VPC,日志,存储这些组件,需要先进行创建。

创建完后的集群如下:

CMQ 主题

1.3 新建 CMQ 队列

在 CMQ 控制台的【队列】-> 【新建】主题,输入队列名称、消息生命周期、堆积消息数量上限,其他保持默认值即可。我们这里新建两个队列,其中一个用来订阅 CMQ 主题模型数据,另一个用作 Oceanus 作业的目的表。新建的主题如下:

CMQ 读取 & 写入

CREATE TABLE `CMQSourceTable` (
    `id` bigint,
    `request_method` varchar(80),
    `response` varchar(80),
    PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
	'connector' = 'cmq', 											--必须为 'cmq'
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',	--cmq所在地域的nameServer
    'queue' = 'cs2',											--cmq的队列名
    'secret-id' = 'Your SecretId',			                                --账号secretId
    'secret-key' = 'Your SecretKey',                                          --账号secretKey
    'sign-method' = 'HmacSHA1',                                     --签名的方式
    'format' = 'csv',                                               --定义数据格式(JSON 格式)
    'batch-size' = '16',                                            --批量消费消息的个数/批量发送消息的个数
    'request-timeout' = '5000ms',                                   --请求的超时时间
    'polling-wait-timeout'= '10s',                                  --source参数; 获取不到数据情况下的等待时间
    'key-alive-timeout'= '5min',                                    --source参数;含primary key的消息,CMQ去重的有效时间
    'retry-times' = '3',                                            --sink参数;发送消息的重试次数
    'max-block-timeout' = '0s'                                      --sink参数;批量发送数据的最大等待时间
);

CREATE TABLE `CMQSinkTable` (
    `id` bigint,
    `request_method` varchar(80),
    `response` varchar(80),
    PRIMARY KEY (`id`) NOT ENFORCED --如果想做到数据去重的操作,则需要指定PK,按照这个主键来区分不同的数据
) WITH (
	'connector' = 'cmq', 											--必须为 'cmq'
    'hosts' = 'http://cmq-nameserver-vpc-gz.api.tencentyun.com',	--cmq所在地域的nameServer
    'queue' = 'sink_queue',											--cmq的队列名
    'secret-id' = 'Your SecretId',			                                --账号secretId
    'secret-key' = 'Your SecretKey',                                          --账号secretKey
    'sign-method' = 'HmacSHA1',                                     --签名的方式
    'format' = 'csv',                                               --定义数据格式(JSON 格式)
    'batch-size' = '16',                                            --批量消费消息的个数/批量发送消息的个数
    'request-timeout' = '5000ms',                                   --请求的超时时间
    'polling-wait-timeout'= '10s',                                  --source参数; 获取不到数据情况下的等待时间
    'key-alive-timeout'= '5min',                                    --source参数;含primary key的消息,CMQ去重的有效时间
    'retry-times' = '3',                                            --sink参数;发送消息的重试次数
    'max-block-timeout' = '0s'                                      --sink参数;批量发送数据的最大等待时间
);

insert into CMQSinkTable select * from CMQSourceTable;

2.3 算子操作

这里只做最简单的数据插入。

insert into CMQSinkTable select *from CMQSourceTable;

3. 验证总结

在 CMQ 控制台往名为test的主题中发送消息,可在sink_queue的队列中接收到消息。

发送主题消息
接收队列消息

原文链接:https://cloud.tencent.com/developer/article/1857665

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
Oceanus实践-消费 CMQ 主题模型数据源
实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为flink消费腾讯云CMQ数据实践。原文自Raigor,已获得授权,分享给大家~
<<上一篇
下一篇>>