Parallelize stat calls in Linux filesystem collector.
This change adds the ability to process multiple stat calls in parallel. Processing is rate-limited based on the new flag `collector.filesystem.stat-workers` (default 4). Caveat: filesystem stats information is no longer in the same order as returned by `/proc/1/mounts`. This should not be an issue. Caveat: This change currently uses unbuffered channels to prove correctness without reliance on buffers. Buffered channels will yield superior performance. Signed-off-by: Erica Mays <erica@emays.dev>
This commit is contained in:
parent
75d951d47a
commit
bdc430af2b
|
@ -40,6 +40,9 @@ const (
|
|||
var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
|
||||
"how long to wait for a mount to respond before marking it as stale").
|
||||
Hidden().Default("5s").Duration()
|
||||
var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
|
||||
"how many stat calls to process simultaneously").
|
||||
Hidden().Default("4").Int()
|
||||
var stuckMounts = make(map[string]struct{})
|
||||
var stuckMountsMtx = &sync.Mutex{}
|
||||
|
||||
|
@ -50,6 +53,26 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
|
|||
return nil, err
|
||||
}
|
||||
stats := []filesystemStats{}
|
||||
labelChan := make(chan filesystemLabels)
|
||||
statChan := make(chan filesystemStats)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
workerCount := *statWorkerCount
|
||||
if workerCount < 1 {
|
||||
workerCount = 1
|
||||
}
|
||||
|
||||
for i := 0; i < workerCount; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for labels := range labelChan {
|
||||
statChan <- c.processStat(labels)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
for _, labels := range mps {
|
||||
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
|
||||
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
|
||||
|
@ -59,6 +82,7 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
|
|||
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
|
||||
continue
|
||||
}
|
||||
|
||||
stuckMountsMtx.Lock()
|
||||
if _, ok := stuckMounts[labels.mountPoint]; ok {
|
||||
stats = append(stats, filesystemStats{
|
||||
|
@ -69,17 +93,30 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
|
|||
stuckMountsMtx.Unlock()
|
||||
continue
|
||||
}
|
||||
stuckMountsMtx.Unlock()
|
||||
|
||||
// The success channel is used do tell the "watcher" that the stat
|
||||
// finished successfully. The channel is closed on success.
|
||||
stuckMountsMtx.Unlock()
|
||||
labelChan <- labels
|
||||
}
|
||||
close(labelChan)
|
||||
wg.Wait()
|
||||
close(statChan)
|
||||
}()
|
||||
|
||||
for stat := range statChan {
|
||||
stats = append(stats, stat)
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
|
||||
success := make(chan struct{})
|
||||
go stuckMountWatcher(labels.mountPoint, success, c.logger)
|
||||
|
||||
buf := new(unix.Statfs_t)
|
||||
err = unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
|
||||
err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
|
||||
stuckMountsMtx.Lock()
|
||||
close(success)
|
||||
|
||||
// If the mount has been marked as stuck, unmark it and log it's recovery.
|
||||
if _, ok := stuckMounts[labels.mountPoint]; ok {
|
||||
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
|
||||
|
@ -88,13 +125,11 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
|
|||
stuckMountsMtx.Unlock()
|
||||
|
||||
if err != nil {
|
||||
stats = append(stats, filesystemStats{
|
||||
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
|
||||
return filesystemStats{
|
||||
labels: labels,
|
||||
deviceError: 1,
|
||||
})
|
||||
|
||||
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
var ro float64
|
||||
|
@ -104,8 +139,7 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
|
|||
break
|
||||
}
|
||||
}
|
||||
|
||||
stats = append(stats, filesystemStats{
|
||||
return filesystemStats{
|
||||
labels: labels,
|
||||
size: float64(buf.Blocks) * float64(buf.Bsize),
|
||||
free: float64(buf.Bfree) * float64(buf.Bsize),
|
||||
|
@ -113,9 +147,7 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
|
|||
files: float64(buf.Files),
|
||||
filesFree: float64(buf.Ffree),
|
||||
ro: ro,
|
||||
})
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// stuckMountWatcher listens on the given success channel and if the channel closes
|
||||
|
|
Loading…
Reference in New Issue