初探持续监测技术

持续性检测是指可以自动检测系统的性能,定时或在系统故障时保存或上报监测数据,而不需要主动拉取系统的运行状况数据,可以帮助发现并分析系统中存在的问题。

开源的持续性监测工具有pyroscope和holmes,接下来将简单介绍一下两个工具。

Pyroscope

官网:https://github.com/pyroscope-io/pyroscope

star数:6.8k

pyroscope是传统的CS架构,pyroscope客户端上报数据到pyroscope的服务端,服务端再通过可视化技术进行展示。

pyroscope支持在数据上报时指定对应数据的标签,和上报prometheus的数据标签类似,整体的可视化界面也和Grafna类似。在线地址:pyroscope在线Demo

左侧菜单介绍:

  • Tag explorer:在Application选择应用和查看指标类型(CPU、内存等),通过选择对应的标签过滤数据,查看数据的整体分布情况,左侧展示的是函数资源占用,右侧是资源的火焰图。
  • Single View:在Application选择应用和查看指标类型(CPU、内存等),选择某单个标签进行数据过滤,左侧展示的是函数资源占用,右侧是资源的火焰图。
  • Comparison View:在Application选择应用和查看指标类型(CPU、内存等),在左侧的baseline选择某单个基准标签进行数据过滤,右侧选择某个需要对比的标签进行数据过滤,可以同时观察两个火焰的图差异。
  • Diff View:在Application选择应用和查看指标类型(CPU、内存等),在左侧的baseline选择某单个基准标签进行数据过滤,右侧选择某个需要对比的标签进行数据过滤,可以观察火焰图中哪些函数是新增的,哪些是已经去掉的,更直观查看两个火焰图的差异。
  • 鼠标放到火焰图上还可以看到采样数和这些样本中函数执行耗时的累积值。pyroscope展示的数据是以右上角选择查看时间段的所有满足条件样本数据累计值进行展示,如火焰图中函数执行时间是期间所有样本的该函数执行时间的累计值。

pyroscope支持多种客户端语言。

Golang客户端源码分析

https://pyroscope.io/docs/golang/

客户端通过设置ServerAddress来指定上报服务端地址,通过ProfileTypes来指定需要上报的数据类型。

pyroscope.Start(pyroscope.Config{
    ApplicationName: "simple.golang.app",

    // replace this with the address of pyroscope server
    ServerAddress:   "http://pyroscope-server:4040",

    // you can disable logging by setting this to nil
    Logger:          pyroscope.StandardLogger,

    // optionally, if authentication is enabled, specify the API key:
    // AuthToken:    os.Getenv("PYROSCOPE_AUTH_TOKEN"),

    // you can provide static tags via a map:
    Tags:            map[string]string{"hostname": os.Getenv("HOSTNAME")},

    ProfileTypes: []pyroscope.ProfileType{
        // these profile types are enabled by default:
        pyroscope.ProfileCPU,
        pyroscope.ProfileAllocObjects,
        pyroscope.ProfileAllocSpace,
        pyroscope.ProfileInuseObjects,
        pyroscope.ProfileInuseSpace,

        // these profile types are optional:
        pyroscope.ProfileGoroutines,
        pyroscope.ProfileMutexCount,
        pyroscope.ProfileMutexDuration,
        pyroscope.ProfileBlockCount,
        pyroscope.ProfileBlockDuration,
    },
})

pyroscope.Start方法的默认采样率是100%,然后通过start()方法中开启的另外一个协程的takeSnapshots()方法在不断循环调用reset()来上报数据。

const DefaultSampleRate                = 100
func Start(cfg Config) (*Profiler, error) {
	if len(cfg.ProfileTypes) == 0 {
		cfg.ProfileTypes = DefaultProfileTypes
	}
	if cfg.SampleRate == 0 {
		cfg.SampleRate = DefaultSampleRate
	}
	if cfg.Logger == nil {
		cfg.Logger = noopLogger
	}

    ...

	start()
}


func (ps *Session) start() error {
	t := ps.truncatedTime()
	ps.reset(t, t)

	go ps.takeSnapshots()
	return nil
}

// revive:disable-next-line:cognitive-complexity complexity is fine
func (ps *Session) takeSnapshots() {
	var automaticResetTicker <-chan time.Time
	if ps.DisableAutomaticResets {
		automaticResetTicker = make(chan time.Time)
	} else {
		t := alignedticker.NewAlignedTicker(ps.uploadRate)
		automaticResetTicker = t.C
		defer t.Stop()
	}
	for {
		select {
		case endTime := <-automaticResetTicker:
			ps.reset(ps.startTime, endTime)
		case f := <-ps.flushCh:
			ps.reset(ps.startTime, ps.truncatedTime())
			ps.upstream.Flush()
			f.wg.Done()
			break
		case <-ps.stopCh:
			return
		}
	}
}

func (ps *Session) reset(startTime, endTime time.Time) {

	ps.logger.Debugf("profiling session reset %s", startTime.String())

	// first reset should not result in an upload
	if !ps.startTime.IsZero() {
		ps.uploadData(startTime, endTime)
	} else {
		if ps.isCPUEnabled() {
			pprof.StartCPUProfile(ps.cpuBuf)
		}
	}

	ps.startTime = endTime
}

在上述reset()方法中,若是第一次调用,则不会上报,而是若开启了CPU采集则启动CPU采集,调用的pprof的方法,设置的CPU集采频率为100次/秒。否则会调用uploadData()方法进行上报,该方法中分布对CPU、内存、Goroutine、Block和Mutex等采集信息进行上报。

func StartCPUProfile(w io.Writer) error {
	// The runtime routines allow a variable profiling rate,
	// but in practice operating systems cannot trigger signals
	// at more than about 500 Hz, and our processing of the
	// signal is not cheap (mostly getting the stack trace).
	// 100 Hz is a reasonable choice: it is frequent enough to
	// produce useful data, rare enough not to bog down the
	// system, and a nice round number to make it easy to
	// convert sample counts to seconds. Instead of requiring
	// each client to specify the frequency, we hard code it.
	const hz = 100

	cpu.Lock()
	defer cpu.Unlock()
	if cpu.done == nil {
		cpu.done = make(chan bool)
	}
	// Double-check.
	if cpu.profiling {
		return fmt.Errorf("cpu profiling already in use")
	}
	cpu.profiling = true
	runtime.SetCPUProfileRate(hz)
	go profileWriter(w)
	return nil
}

func (ps *Session) uploadData(startTime, endTime time.Time) {
	if ps.isCPUEnabled() {
		pprof.StopCPUProfile()
		defer func() {
			pprof.StartCPUProfile(ps.cpuBuf)
		}()
		ps.upstream.Upload(&upstream.UploadJob{
			Name:            ps.appName,
			StartTime:       startTime,
			EndTime:         endTime,
			SpyName:         "gospy",
			SampleRate:      100,
			Units:           "samples",
			AggregationType: "sum",
			Format:          upstream.FormatPprof,
			Profile:         copyBuf(ps.cpuBuf.Bytes()),
		})
		ps.cpuBuf.Reset()
	}

	if ps.isGoroutinesEnabled() {
		p := pprof.Lookup("goroutine")
		if p != nil {
			p.WriteTo(ps.goroutinesBuf, 0)
			ps.upstream.Upload(&upstream.UploadJob{
				Name:            ps.appName,
				StartTime:       startTime,
				EndTime:         endTime,
				SpyName:         "gospy",
				Units:           "goroutines",
				AggregationType: "average",
				Format:          upstream.FormatPprof,
				Profile:         copyBuf(ps.goroutinesBuf.Bytes()),
				SampleTypeConfig: map[string]*upstream.SampleType{
					"goroutine": {
						DisplayName: "goroutines",
						Units:       "goroutines",
						Aggregation: "average",
					},
				},
			})
			ps.goroutinesBuf.Reset()
		}
	}

	if ps.isBlockEnabled() {
		p := pprof.Lookup("block")
		if p != nil {
			p.WriteTo(ps.blockBuf, 0)
			curBlockBuf := copyBuf(ps.blockBuf.Bytes())
			ps.blockBuf.Reset()
			if ps.blockPrevBytes != nil {
				ps.upstream.Upload(&upstream.UploadJob{
					Name:        ps.appName,
					StartTime:   startTime,
					EndTime:     endTime,
					SpyName:     "gospy",
					Format:      upstream.FormatPprof,
					Profile:     curBlockBuf,
					PrevProfile: ps.blockPrevBytes,
					SampleTypeConfig: map[string]*upstream.SampleType{
						"contentions": {
							DisplayName: "block_count",
							Units:       "lock_samples",
							Cumulative:  true,
						},
						"delay": {
							DisplayName: "block_duration",
							Units:       "lock_nanoseconds",
							Cumulative:  true,
						},
					},
				})
			}
			ps.blockPrevBytes = curBlockBuf
		}
	}
	if ps.isMutexEnabled() {
		p := pprof.Lookup("mutex")
		if p != nil {
			p.WriteTo(ps.mutexBuf, 0)
			curMutexBuf := copyBuf(ps.mutexBuf.Bytes())
			ps.mutexBuf.Reset()
			if ps.mutexPrevBytes != nil {
				ps.upstream.Upload(&upstream.UploadJob{
					Name:        ps.appName,
					StartTime:   startTime,
					EndTime:     endTime,
					SpyName:     "gospy",
					Format:      upstream.FormatPprof,
					Profile:     curMutexBuf,
					PrevProfile: ps.mutexPrevBytes,
					SampleTypeConfig: map[string]*upstream.SampleType{
						"contentions": {
							DisplayName: "mutex_count",
							Units:       "lock_samples",
							Cumulative:  true,
						},
						"delay": {
							DisplayName: "mutex_duration",
							Units:       "lock_nanoseconds",
							Cumulative:  true,
						},
					},
				})
			}
			ps.mutexPrevBytes = curMutexBuf
		}
	}

	if ps.isMemEnabled() {
		currentGCGeneration := numGC()
		// sometimes GC doesn't run within 10 seconds
		//   in such cases we force a GC run
		//   users can disable it with disableGCRuns option
		if currentGCGeneration == ps.lastGCGeneration && !ps.disableGCRuns {
			runtime.GC()
			currentGCGeneration = numGC()
		}
		if currentGCGeneration != ps.lastGCGeneration {
			pprof.WriteHeapProfile(ps.memBuf)
			curMemBytes := copyBuf(ps.memBuf.Bytes())
			ps.memBuf.Reset()
			if ps.memPrevBytes != nil {
				ps.upstream.Upload(&upstream.UploadJob{
					Name:        ps.appName,
					StartTime:   startTime,
					EndTime:     endTime,
					SpyName:     "gospy",
					SampleRate:  100,
					Format:      upstream.FormatPprof,
					Profile:     curMemBytes,
					PrevProfile: ps.memPrevBytes,
				})
			}
			ps.memPrevBytes = curMemBytes
			ps.lastGCGeneration = currentGCGeneration
		}
	}
}

holmes

官网:https://github.com/mosn/holmes

star数:831

holmes 每隔一段时间收集一次以下应用指标:

  • 协程数,通过runtime.NumGoroutine。
  • 当前应用所占用的RSS,通过gopsutil第三方库。
  • CPU使用率,比如8C的机器,如果使用了4C,则使用率为50%,通过gopsutil第三方库。

holmes 支持对以下几种应用指标进行监控:

  • mem: 内存分配
  • cpu: cpu使用率
  • thread: 线程数
  • goroutine: 协程数
  • gcHeap: 基于GC周期的内存分配

holmes 客户端可以支持设置采集多种对象,CPU采集的控制参数示例如下:

  • WithCollectInterval("5s") 每5s采集一次当前应用的各项指标,该值建议设置为大于1s。
  • WithDumpPath("/tmp") profile文件保存路径。
  • WithCPUDump(10, 25, 80, time.Minute) 会在满足以下条件时dump profile cpu usage > 10% && cpu usage > 125% * previous cpu usage recorded or cpu usage > 80%. time.Minute 是两次dump操作之间最小时间间隔,避免频繁profiling对性能产生的影响。
  • WithCPUMax 当cpu使用率大于Max, holmes会跳过dump操作,以防拖垮系统。
func initHolmes() *Holmes{
    h, _ := holmes.New(
    holmes.WithCollectInterval("5s"),
    holmes.WithDumpPath("/tmp"),
    holmes.WithCPUDump(20, 25, 80, time.Minute),
    holmes.WithCPUMax(90),

    // holmes.WithMemDump(30, 25, 80, time.Minute),
    // holmes.WithGCHeapDump(10, 20, 40, time.Minute),
    // holmes.WithGoroutineDump(500, 25, 20000, 0, time.Minute),
    )
    h.EnableCPUDump()
    return h
}

可以通过Set在系统运行时更新holmes的配置

h.Set(
        WithCollectInterval("2s"),
        WithGoroutineDump(10, 10, 50, 90, time.Minute))

可以通过实现Reporter来实现以下功能:

发送包含现场的告警信息,当holmes触发Dump操作时。

将Profiles上传到其他地方,以防实例被销毁,从而导致profile丢失,或进行分析。

type ReporterImpl struct{}
func (r *ReporterImpl) 	Report(pType string, filename string, reason ReasonType, eventID string, sampleTime time.Time, pprofBytes []byte, scene Scene) error{ 
    // do something	
}
......
r := &ReporterImpl{} // a implement of holmes.ProfileReporter Interface.
h, _ := holmes.New(
    holmes.WithProfileReporter(reporter),
    holmes.WithDumpPath("/tmp"),
    holmes.WithLogger(holmes.NewFileLog("/tmp/holmes.log", mlog.INFO)),
    holmes.WithBinaryDump(),
    holmes.WithMemoryLimit(100*1024*1024), // 100MB
    holmes.WithGCHeapDump(10, 20, 40, time.Minute),
)
本站文章资源均来源自网络,除非特别声明,否则均不代表站方观点,并仅供查阅,不作为任何参考依据!
如有侵权请及时跟我们联系,本站将及时删除!
如遇版权问题,请查看 本站版权声明
THE END
分享
二维码
海报
初探持续监测技术
持续性检测是指可以自动检测系统的性能,定时或在系统故障时保存或上报监测数据,而不需要主动拉取系统的运行状况数据,可以帮助发现并分析系统中存在的问题。
<<上一篇
下一篇>>