refactor to make timestamp collector work for multiple file_sd's

This commit is contained in:
Callum Styan 2017-11-30 23:54:40 -08:00 committed by Brian Brazil
parent a00fc883c3
commit d76d5de66f
1 changed files with 79 additions and 34 deletions

View File

@ -22,6 +22,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
@ -38,10 +39,9 @@ const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath"
// TimestampCollector is a Custom Collector for Timestamps of the files. // TimestampCollector is a Custom Collector for Timestamps of the files.
type TimestampCollector struct { type TimestampCollector struct {
filenames []string
Description *prometheus.Desc Description *prometheus.Desc
discoverers map[*Discovery]struct{}
logger log.Logger lock sync.RWMutex
} }
// Describe method sends the description to the channel. // Describe method sends the description to the channel.
@ -49,38 +49,49 @@ func (t *TimestampCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- t.Description ch <- t.Description
} }
// SetFiles changes the filenames of the struct to the paths returned by listfiles().
func (t *TimestampCollector) SetFiles(files []string) {
t.filenames = files
}
// Collect creates constant metrics for each file with last modified time of the file. // Collect creates constant metrics for each file with last modified time of the file.
func (t *TimestampCollector) Collect(ch chan<- prometheus.Metric) { func (t *TimestampCollector) Collect(ch chan<- prometheus.Metric) {
files := t.filenames // New map to dedup filenames.
for i := 0; i < len(files); i++ { uniqueFiles := make(map[string]float64)
info, err := os.Stat(files[i]) t.lock.RLock()
if err != nil { for fileSD := range t.discoverers {
t.logger.Errorf("Error getting the fileinfo of the file %q: %s", files[i], err) for filename, timestamp := range fileSD.timestamps {
continue uniqueFiles[filename] = timestamp
} }
}
t.lock.RUnlock()
for filename, timestamp := range uniqueFiles {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
t.Description, t.Description,
prometheus.GaugeValue, prometheus.GaugeValue,
float64(info.ModTime().Unix()), timestamp,
files[i], filename,
) )
} }
} }
func (t *TimestampCollector) addDiscoverer(disc *Discovery) {
t.lock.Lock()
t.discoverers[disc] = struct{}{}
t.lock.Unlock()
}
func (t *TimestampCollector) removeDiscoverer(disc *Discovery) {
t.lock.Lock()
delete(t.discoverers, disc)
t.lock.Unlock()
}
// NewTimestampCollector creates a TimestampCollector. // NewTimestampCollector creates a TimestampCollector.
func NewTimestampCollector() *TimestampCollector { func NewTimestampCollector() *TimestampCollector {
return &TimestampCollector{ return &TimestampCollector{
Description: prometheus.NewDesc( Description: prometheus.NewDesc(
"prometheus_sd_file_timestamp", "prometheus_sd_file_timestamp",
"Timestamp of files read by FileSD", "Timestamp (mtime) of files read by FileSD. Timestamp is set at read time.",
[]string{"filename"}, []string{"filename"},
nil, nil,
), ),
discoverers: make(map[*Discovery]struct{}),
} }
} }
@ -111,6 +122,8 @@ type Discovery struct {
paths []string paths []string
watcher *fsnotify.Watcher watcher *fsnotify.Watcher
interval time.Duration interval time.Duration
timestamps map[string]float64
lock sync.RWMutex
// lastRefresh stores which files were found during the last refresh // lastRefresh stores which files were found during the last refresh
// and how many target groups they contained. // and how many target groups they contained.
@ -124,11 +137,15 @@ func NewDiscovery(conf *config.FileSDConfig, logger log.Logger) *Discovery {
if logger == nil { if logger == nil {
logger = log.NewNopLogger() logger = log.NewNopLogger()
} }
return &Discovery{
disc := &Discovery{
paths: conf.Files, paths: conf.Files,
interval: time.Duration(conf.RefreshInterval), interval: time.Duration(conf.RefreshInterval),
timestamps: make(map[string]float64),
logger: logger, logger: logger,
} }
fileSDTimeStamp.addDiscoverer(disc)
return disc
} }
// listFiles returns a list of all files that match the configured patterns. // listFiles returns a list of all files that match the configured patterns.
@ -212,6 +229,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
} }
} }
func (d *Discovery) writeTimestamp(filename string, timestamp float64) {
d.lock.Lock()
d.timestamps[filename] = timestamp
d.lock.Unlock()
}
func (d *Discovery) deleteTimestamp(filename string) {
d.lock.Lock()
delete(d.timestamps, filename)
d.lock.Unlock()
}
// stop shuts down the file watcher. // stop shuts down the file watcher.
func (d *Discovery) stop() { func (d *Discovery) stop() {
level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", d.paths) level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", d.paths)
@ -219,6 +248,8 @@ func (d *Discovery) stop() {
done := make(chan struct{}) done := make(chan struct{})
defer close(done) defer close(done)
fileSDTimeStamp.removeDiscoverer(d)
// Closing the watcher will deadlock unless all events and errors are drained. // Closing the watcher will deadlock unless all events and errors are drained.
go func() { go func() {
for { for {
@ -245,11 +276,9 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup
defer func() { defer func() {
fileSDScanDuration.Observe(time.Since(t0).Seconds()) fileSDScanDuration.Observe(time.Since(t0).Seconds())
}() }()
fileSDTimeStamp.SetFiles(d.listFiles())
ref := map[string]int{} ref := map[string]int{}
for _, p := range d.listFiles() { for _, p := range d.listFiles() {
tgroups, err := readFile(p) tgroups, err := d.readFile(p)
if err != nil { if err != nil {
fileSDReadErrorsCount.Inc() fileSDReadErrorsCount.Inc()
@ -270,6 +299,8 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup
for f, n := range d.lastRefresh { for f, n := range d.lastRefresh {
m, ok := ref[f] m, ok := ref[f]
if !ok || n > m { if !ok || n > m {
level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f)
d.deleteTimestamp(f)
for i := m; i < n; i++ { for i := m; i < n; i++ {
select { select {
case ch <- []*config.TargetGroup{{Source: fileSource(f, i)}}: case ch <- []*config.TargetGroup{{Source: fileSource(f, i)}}:
@ -284,15 +315,21 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup
d.watchFiles() d.watchFiles()
} }
// fileSource returns a source ID for the i-th target group in the file.
func fileSource(filename string, i int) string {
return fmt.Sprintf("%s:%d", filename, i)
}
// readFile reads a JSON or YAML list of targets groups from the file, depending on its // readFile reads a JSON or YAML list of targets groups from the file, depending on its
// file extension. It returns full configuration target groups. // file extension. It returns full configuration target groups.
func readFile(filename string) ([]*config.TargetGroup, error) { func (d *Discovery) readFile(filename string) ([]*config.TargetGroup, error) {
content, err := ioutil.ReadFile(filename) fd, err := os.Open(filename)
if err != nil {
return nil, err
}
defer fd.Close()
content, err := ioutil.ReadAll(fd)
if err != nil {
return nil, err
}
info, err := fd.Stat()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -324,5 +361,13 @@ func readFile(filename string) ([]*config.TargetGroup, error) {
} }
tg.Labels[fileSDFilepathLabel] = model.LabelValue(filename) tg.Labels[fileSDFilepathLabel] = model.LabelValue(filename)
} }
d.writeTimestamp(filename, float64(info.ModTime().Unix()))
return targetGroups, nil return targetGroups, nil
} }
// fileSource returns a source ID for the i-th target group in the file.
func fileSource(filename string, i int) string {
return fmt.Sprintf("%s:%d", filename, i)
}