ClickHouse Cluster Migration: Are You Sure You Know How?

There are many ways to migrate a ClickHouse cluster, but documentation on migration is relatively scarce, so many people shy away from it. This post introduces three approaches.

1. INSERT ... SELECT FROM remote()

Details

Related documentation: https://clickhouse.com/docs/en/sql-reference/table-functions/remote/

INSERT INTO table SELECT * FROM remote('ip', 'db.table', 'user', 'password') WHERE ...

Personally tested: strongly recommended for migrating data below the hundred-million-row scale; there is nothing else to worry about.

However, this approach is bound by ClickHouse's write performance. With deduplicating engines it performs considerably worse, taking several times as long as a plain MergeTree.

Workarounds:

  1. Deduplication: on the target cluster, you can first land the data in a MergeTree table, then convert it to a deduplicating engine by other means
  2. For large tables that are partitioned, you can also split the migration into per-partition tasks
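The per-partition splitting above can be sketched by generating one INSERT ... SELECT per partition. Everything in this sketch is a hypothetical placeholder: the table name, host, credentials, the `toYYYYMM(date)` partition key, and the partition list (which you would collect from `system.parts` on the source cluster).

```python
# Sketch: split an INSERT ... SELECT FROM remote() migration into one
# statement per partition, so each piece can be run (and retried)
# independently. All names here are hypothetical placeholders.
def build_statements(partitions):
    stmts = []
    for p in partitions:
        stmts.append(
            "INSERT INTO db.table SELECT * FROM "
            "remote('ip', 'db.table', 'user', 'password') "
            f"WHERE toYYYYMM(date) = {p}"
        )
    return stmts

# e.g. partitions listed from system.parts on the source cluster
for stmt in build_statements([202101, 202102, 202103]):
    print(stmt)
```

Each generated statement can then be fed to clickhouse-client one at a time, keeping each insert small enough to restart cheaply on failure.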

Pros:

A simple approach

Cons:

Not suitable for very large data volumes

2. FETCH PART

Details

On the target cluster, configure auxiliary_zookeepers to point at the source cluster's ZooKeeper. The target can then read the source cluster's metadata, and parts can be pulled from the source cluster to the target via FETCH PART.

Tencent Cloud ClickHouse 21.8 also supports fetching parts via ip:port, which removes the need for the auxiliary_zookeepers configuration.

I previously wrote a FETCH PART walkthrough: https://cloud.tencent.com/developer/article/1867322

Official documentation: https://clickhouse.com/docs/en/sql-reference/statements/alter/partition/

<auxiliary_zookeepers>
    <source_zookeeper>
        <node>
            <host>example_2_1</host>
            <port>2181</port>
        </node>
        <node>
            <host>example_2_2</host>
            <port>2181</port>
        </node>
        <node>
            <host>example_2_3</host>
            <port>2181</port>
        </node>
    </source_zookeeper>
</auxiliary_zookeepers>
ALTER TABLE users FETCH PARTITION 201902 FROM 'source_zookeeper:/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PARTITION 201902;

ALTER TABLE users FETCH PART 201901_2_2_0 FROM 'source_zookeeper:/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PART 201901_2_2_0;

However: FETCH PART requires ClickHouse 21.3+. In a cluster migration it is quite possible that the source cluster's version does not support it, in which case the source cluster may need to be upgraded before migrating.

Pros:

Best performance. Parts are pulled remotely over HTTP, and throughput is essentially on par with disk performance.

Cons:

You need to enumerate all the parts on the source cluster (see https://cloud.tencent.com/developer/article/1867322 for reference), then assemble the SQL from that.

It also does not support redistribution by sharding key, unless the source and target clusters have the same number of shards.
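Following that idea, a small script can take the part names collected from system.parts on the source and assemble the FETCH PART / ATTACH PART statements. A minimal sketch; the table name, ZooKeeper path, and part names below are placeholders:

```python
# Sketch: assemble ALTER TABLE ... FETCH PART / ATTACH PART statements
# from part names gathered on the source cluster (e.g. via
# SELECT name FROM system.parts WHERE active AND table = '...').
# The table name, ZooKeeper path, and part names are placeholders.
def build_fetch_statements(table, zk_path, parts):
    stmts = []
    for part in parts:
        stmts.append(f"ALTER TABLE {table} FETCH PART {part} FROM '{zk_path}';")
        stmts.append(f"ALTER TABLE {table} ATTACH PART {part};")
    return stmts

for s in build_fetch_statements(
        'users', 'source_zookeeper:/clickhouse/tables/01-01/visits',
        ['201901_2_2_0', '201902_3_3_0']):
    print(s)
```

The generated statements are then executed on the matching shard of the target cluster; ATTACH only succeeds after the corresponding FETCH has placed the part in the table's detached directory.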

3. clickhouse-copier

Details

clickhouse-copier is an official open-source ClickHouse tool. At its core it is also an INSERT ... SELECT, but it splits the work into tasks.

<yandex>
    <remote_servers>
        <source_cluster>
        </source_cluster>
        <destination_cluster>
        </destination_cluster>
    </remote_servers>

    <max_workers>20</max_workers>
    <number_of_splits>1</number_of_splits> <!-- strongly recommended to set this to 1; the default is 10 -->

    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <settings>
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <tables>
        <!-- A table task, copies one table. -->
        <table_hits>
            <!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>test</database_pull>
            <table_pull>hits</table_pull>

            <!-- Destination cluster name and tables in which the data should be inserted -->
            <cluster_push>destination_cluster</cluster_push>
            <database_push>test</database_push>
            <table_push>hits2</table_push>

            <!-- Engine of destination tables.
                 If the destination tables have not been created, workers create them using the column
                 definitions from the source tables and the engine definition from here.

                 NOTE: If the first worker starts inserting data and detects that the destination partition
                 is not empty, the partition will be dropped and refilled; take that into account if you
                 already have some data in the destination tables. You can directly specify the partitions
                 that should be copied in <enabled_partitions/>; they should be in quoted format, like the
                 partition column of the system.parts table.
            -->
            <engine>
            ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}')
            PARTITION BY toMonday(date)
            ORDER BY (CounterID, EventDate)
            </engine>

            <!-- Sharding key used to insert data into the destination cluster -->
            <sharding_key>jumpConsistentHash(intHash64(UserID), 2)</sharding_key>

            <!-- Optional expression that filters data while pulling it from the source servers -->
            <where_condition>CounterID != 0</where_condition>

            <!-- This section specifies the partitions that should be copied; other partitions will be ignored.
                 Partition names should have the same format as the partition column of the
                 system.parts table (i.e. quoted text).
                 Since the partition keys of the source and destination clusters could differ,
                 these partition names specify destination partitions.

                 NOTE: Although this section is optional (if it is not specified, all partitions will be copied),
                 it is strongly recommended to specify the partitions explicitly.
                 If you already have some ready partitions on the destination cluster, they
                 will be removed at the start of the copy, since they will be interpreted
                 as unfinished data from a previous copy!
            -->
            <enabled_partitions>
                <partition>'2018-02-26'</partition>
                <partition>'2018-03-05'</partition>
                ...
            </enabled_partitions>
        </table_hits>

        <!-- Next table to copy. It is not copied until the previous table finishes. -->
        <table_visits>
        ...
        </table_visits>
        ...
    </tables>
</yandex>

Steps:

  1. Prepare the migration task file task.xml as above
  2. Prepare zookeeper.xml
  3. Upload task.xml to ZooKeeper. This is the ZooKeeper that coordinates the migration tasks; it can be either the target cluster's or the source cluster's ZooKeeper
bin/zkCli.sh -server ip:2181   deleteall /clickhouse
bin/zkCli.sh -server ip:2181   create /clickhouse
bin/zkCli.sh -server ip:2181   deleteall /clickhouse/copier
bin/zkCli.sh -server ip:2181   create /clickhouse/copier
bin/zkCli.sh -server ip:2181   create /clickhouse/copier/table1
bin/zkCli.sh -server ip:2181   create /clickhouse/copier/table1/description "`cat task.xml`"
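The zookeeper.xml from step 2 only needs to tell clickhouse-copier which ZooKeeper to use for task coordination, plus where to log. A minimal sketch; the host is a placeholder, and the exact logger settings are a matter of taste:

```xml
<yandex>
    <logger>
        <level>information</level>
        <log>clickhouse-copier.log</log>
        <errorlog>clickhouse-copier.err.log</errorlog>
    </logger>
    <zookeeper>
        <node index="1">
            <host>ip</host>
            <port>2181</port>
        </node>
    </zookeeper>
</yandex>
```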

4. Launch the copier task

nohup clickhouse-copier --config zookeeper.xml --task-path /clickhouse/copier/table1 --log-level=warning --base-dir ./logs/table1 &

Recommendations:

  1. A single task.xml can contain migration tasks for multiple tables, but large tables should still be split into separate tasks: copier migrates tables one at a time, even if you start multiple copier processes. So you should end up with /clickhouse/copier/table1, /clickhouse/copier/table2, /clickhouse/copier/table...
  2. For the same table, you can start multiple copier processes to speed up the migration; it is strongly recommended to launch the same task on every ClickHouse server node of the target cluster.
    (1) copier throughput depends on the machine it runs on, so more nodes naturally means better performance; (2) in a sense this is also distributed: when multiple copier processes migrate one table, the table is first split into many sub-tasks stored in ZooKeeper, and each copier then picks up sub-tasks to execute.
  3. Although copier works at partition granularity, you must not start different task definitions for the same table: during migration, copier creates temporary tables on the target cluster
    (for example, migrating table1 creates temporary tables table1_piece_0, table1_piece_1, ...). Different tasks would use the same temporary tables, and when one task finishes, it drops them; you can guess the result: the remaining tasks fail.
    However, there is a way around this, provided the table has multiple replicas: let shard1_replica1 migrate partition1 and shard1_replica2 migrate partition2.

The combined task:

<yandex>
    <remote_servers>
        <source_cluster>
        </source_cluster>
        <destination_cluster>
           <shard>
             <replica>1
             </replica>
             <replica>2
             </replica>
          </shard>
        </destination_cluster>
    </remote_servers>

    <tables>
        <table_hits>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>test</database_pull>
            <table_pull>hits</table_pull>
            <cluster_push>destination_cluster</cluster_push>
            <database_push>test</database_push>
            <table_push>hits</table_push>

            <enabled_partitions>
                <partition>'partition1'</partition>
                <partition>'partition2'</partition>
            </enabled_partitions>
        </table_hits>

    </tables>
</yandex>

We can split the task into task1 and task2:

task1.xml

<yandex>
    <remote_servers>
        <source_cluster>
        </source_cluster>
        <destination_cluster>
           <shard>
             <replica>1
             </replica>
             <!--<replica>2
             </replica>-->
          </shard>
        </destination_cluster>
    </remote_servers>

    <tables>
        <table_hits>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>test</database_pull>
            <table_pull>hits</table_pull>
            <cluster_push>destination_cluster</cluster_push>
            <database_push>test</database_push>
            <table_push>hits</table_push>

            <enabled_partitions>
                <partition>'partition1'</partition>
                <!--<partition>'partition2'</partition>-->
            </enabled_partitions>
        </table_hits>

    </tables>
</yandex>

task2.xml

<yandex>
    <remote_servers>
        <source_cluster>
        </source_cluster>
        <destination_cluster>
           <shard>
             <!--<replica>1
             </replica>-->
             <replica>2
             </replica>
          </shard>
        </destination_cluster>
    </remote_servers>

    <tables>
        <table_hits>
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>test</database_pull>
            <table_pull>hits</table_pull>
            <cluster_push>destination_cluster</cluster_push>
            <database_push>test</database_push>
            <table_push>hits</table_push>

            <enabled_partitions>
                <!--<partition>'partition1'</partition>-->
                <partition>'partition2'</partition>
            </enabled_partitions>
        </table_hits>

    </tables>
</yandex>

4. If the source and target clusters have exactly the same node topology, point-to-point (shard-to-shard) migration is strongly recommended: the performance improvement is significant. Run each copier task on its corresponding node.
The original task.xml:

<yandex>
    <remote_servers>
        <source_cluster>
           <shard>1
          </shard>
           <shard>2
          </shard>
        </source_cluster>
        <destination_cluster>
           <shard>1
          </shard>
           <shard>2
          </shard>
        </destination_cluster>
    </remote_servers>
</yandex>

task1.xml

<yandex>
    <remote_servers>
        <source_cluster>
           <shard>1
          </shard>
          <!-- <shard>2
          </shard>-->
        </source_cluster>
        <destination_cluster>
           <shard>1
          </shard>
           <!--<shard>2
          </shard>-->
        </destination_cluster>
    </remote_servers>
</yandex>

task2.xml

<yandex>
    <remote_servers>
        <source_cluster>
          <!-- <shard>1
          </shard>-->
           <shard>2
          </shard>
        </source_cluster>
        <destination_cluster>
          <!-- <shard>1
          </shard>-->
           <shard>2
          </shard>
        </destination_cluster>
    </remote_servers>
</yandex>

5. The best practice is the combination of 3 + 4.

Pros: stable

Cons: complex to use

I hope this helps. You are welcome to try Tencent Cloud ClickHouse: https://cloud.tencent.com/product/cdwch

An elastic ClickHouse offering is also launching soon: https://mp.weixin.qq.com/s/dxoU7S7hOK_PIBZp5OZTKA