【技术种草】CKafka调优笔记 消费堆积 服务CPU未跑满应该如何解决?

1. 背景

Proxy服务负责消费CKafka消息并解析,并分发消息至不同的CKafka topic。近期发现Proxy服务消费CKafka有消息堆积,且服务所在CVM CPU与内存资源大概只占用50%左右。

如图所示可以看到,在数据量峰值的的时候,生产流量可以达到2000MB/小时,但是消费流量达不到这么多,说明该服务有消息堆积。

其他说明:CKafka partition数量与服务实例数量正好一比一关系,CKafka 消费Client Concurrence设置为1。Proxy服务维护一个线程池,用于解析与分发消费的每一条消息。每当有消息进入服务时,每条消息会用一个线程进行解析消息并发送数据。

    @KafkaListener(topics = "topic")
    public void consumerKafkaMsg(List<ConsumerRecord<?, String>> records) throws Exception {
        for (ConsumerRecord<?, String> record : records) {
            log.debug("kafka topic = {}, value:\\n{}", record.topic(), record.value());
            service.handleMsg(record);
        }
    }
    @PostConstruct
    public void init() {
        BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<Runnable>(consumerCount);
        RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build();
        threadPool = new ThreadPoolExecutor(consumerCount, consumerCount, 0L, TimeUnit.MILLISECONDS,
                workingQueue, namedThreadFactory, rejectedExecutionHandler);
    }

    public void handleMsg(ConsumerRecord<?, String> record) {
        threadPool.execute(new ThreadPoolTask(recorde));
    }

2. 问题分析

使用Arthas工具分析一下堆栈,如下图,可以看到每个线程都在TIMED_WAITING的等待状态,CPU消耗也很低,初步判断消费堆积并不是因为线程数量不够,而是卡在IO。

3.3 CKafka Producer 参数修改

同时重新查看Arthas里面每个线程的状态,线程卡在kafkaTemplate里面的dosent方法,再往上是awiat

腾讯云监控还是起了很大作用,在调优过程有很大参考意义,Ckafka或者组件都需要进行适当的参数调整才能发挥最大作用

效果还是比较明显可以看到机器CPU负载提高显著,未消费的Kafka消息也慢慢降低,达到预期。

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
【技术种草】CKafka调优笔记 消费堆积 服务CPU未跑满应该如何解决?
Proxy服务负责消费CKafka消息并解析,并分发消息至不同的CKafka topic。近期发现Proxy服务消费CKafka有消息堆积,且服务所在CVM C...
<<上一篇
下一篇>>