记一次Context和goroutine实现超时调度引发的内存泄漏问题

背景

最近一个项目上线,需要在上线前进行单节点压测以估计每个业务的部署计划。使用腾讯云的压测大师进行压测时,发现一个非常有意思的情况。首先上一下监控图:

内存和cpu使用情况
网卡流量趋势图

先说明一下我是在10:00左右进行了2次压测, 每次压测没有超过10分钟。可以从CPU使用情况中看到,压测时机器CPU利用率在急剧上升,usage_bytesrss内存也是在那个时间点上涨的,问题是压测完成后CPU使用率降下来了,但是我们的内存在后面的几个小时里都没有得到释放。很显然程序中一定有什么地方hang住了一大块内存。

于是我使用pprof工具查看了一下测试之后机器的性能指标如下【注意:我是在压测完成10分钟后查看的】:

pprof

可以看到图中居然有15318个goroutine在使用,且heap上有2979个对象,从网卡的流量趋势图我们知道,压测后网卡流量基本属于正常情况。【这里之所以不是为0是因为我的测试环境我用脚本定时的放的测试流量,实际压测时不应该有这些干扰流量】

进入到goroutine详情里面去,看一个到底是哪里hang住了这么多goroutine。

image.png

可以看到/data/ggr/workspace/internal/xxx\\_recommend/service/xxx\\_recommend\\_algo/xxx\\_recommend\\_algo.go:153 hang住了14235个goroutine

然后在看一下153行到底干了什么坏事

image.png

为了将方便说明,我这边将代码简化了一个测试用例出来,如下:

package xxx_recommend_algo

import (
	"context"
	"errors"
	"testing"
	"time"
)

func TestxxxRecommendAlgo(t *testing.T) {

    // goroutine A
	go func() {
        // 设置Context超时时间为50ms
		backGroundCtx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
		defer cancel()
		// 5.2 通过GRPC的方式从算法侧的模型服务中获取到推荐项目的得分的结果, 设置了超时时间,如果超过30ms则认为模型超时
		xxxRecommendChannel := make(chan *AlgoServingResponse)

        // goroutine B
		go getXXXRecommend(backGroundCtx, xxxRecommendChannel, t)
		select {
		case xxxRecommendResult := <-xxxRecommendChannel:
			if xxxRecommendResult.err != nil {
				return
			}
			for _, v := range xxxRecommendResult.scores {
				t.Log(v)
			}
		case <-backGroundCtx.Done():
			return
		}
		t.Log(backGroundCtx.Deadline())
	}()

	time.Sleep(time.Second * 10)
	t.Log("ok")
}

func getXXXRecommend(ctx context.Context, xxxRecommendResult chan *AlgoServingResponse, t *testing.T) {
	time.Sleep(time.Second) // 模拟远程rpc请求
	t.Log("ok1")
	xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	t.Log("ok2")
}
// 算法推荐服务返回结果
type AlgoServingResponse struct {
	err    error
	scores map[string]int
}

分析

这段代码主要是使用Context实现一个超时调用,如果算法在50ms内不返回的话,goroutine A就会自动超时,而不会一直等待算法超时,goroutine B主要负责rpc调用算法服务。在算法不超时的情况下,是不会hang住goroutine B, 但是一旦算法服务超时,那个goroutine B已经return了,此时goroutine B还往通道xxxRecommendResult写数据,那么就会导致goroutine B一直堵塞在通道上。随着超时的次数越来越多,堵塞的goroutine也会越来越多,最总导致内存炸了。

我们可以运行当前代码,你会发现ok2是永远不会被打印出来的。

=== RUN   TestxxxRecommendAlgo
    xxx_recommend_algo_test.go:38: ok1
    xxx_recommend_algo_test.go:39: context deadline exceeded
    xxx_recommend_algo_test.go:33: ok
--- PASS: TestxxxRecommendAlgo (10.00s)
PASS

如果main不退出,那么goroutine B会一直堵塞!!!

解决方案1

在向通道写数据前检查Context是否已经超时了,如果过期了,就直接return,其他地方无需修改。

func getXXXRecommend(ctx context.Context, xxxRecommendResult chan *AlgoServingResponse, t *testing.T) {
	time.Sleep(time.Second) // 模拟远程rpc请求
	t.Log("ok1")
  	if ctx.Err() == context.Canceled {
		xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	}
	t.Log("ok2")
}

解决方案2

一种更好的解决方案是将超时控制的范围控制在远程调度方法里面,将异步改为同步,因为我这边只有一个调度方法,没有必要开新goroutine去跑。

package xxx_recommend_algo

import (
	"context"
	"errors"
	"testing"
	"time"
)

func TestxxxRecommendAlgo(t *testing.T) {

    // goroutine A
	go func() {
        // 设置Context超时时间为50ms
		backGroundCtx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
		defer cancel()
		// 5.2 通过GRPC的方式从算法侧的模型服务中获取到推荐项目的得分的结果, 设置了超时时间,如果超过30ms则认为模型超时

		xxxRecommendResult := getXXXRecommend(backGroundCtx, xxxRecommendChannel, t)
		if xxxRecommendResult.err != nil{
			return nil, xxxRecommendResult.err
		}
		return xxxRecommendResult.scores, nil
	}
	time.Sleep(time.Second * 10)
	t.Log("ok")
}

func getXXXRecommend(ctx context.Context, xxxRecommendResult chan *AlgoServingResponse, t *testing.T) {
	backGroundCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	// 将超时控制在最小的真正调用的时候
	clientConn, err := grpc.DialContext(backGroundCtx, "")
	if err == nil {
		xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	}
	defer clientConn.close()
	...
	t.Log("ok1")
	xxxRecommendResult <- &AlgoServingResponse{err: errors.New("error")}
	t.Log("ok2")
}
// 算法推荐服务返回结果
type AlgoServingResponse struct {
	err    error
	scores map[string]int
}

总结

【1】 内存泄漏不一定会导致程序马上崩溃,但是任何泄漏的地方都应该处理掉。

【2】Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。

如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。

【3】Go语言的缓存通道虽然正常情况下不会堵塞接受和发送方,但是在缓存池满了的时候,会堵塞发送,缓存池空的情况下堵塞接收方。这一点一定要注意。

本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
记一次Context和goroutine实现超时调度引发的内存泄漏问题
最近一个项目上线,需要在上线前进行单节点压测以估计每个业务的部署计划。使用腾讯云的压测大师进行压测时,发现一个非常有意思的情况。首先上一下监控图:
<<上一篇
下一篇>>