diff --git a/discovery/file/file.go b/discovery/file/file.go index 911d87039..adb71127c 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -22,6 +22,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/go-kit/kit/log" @@ -38,10 +39,9 @@ const fileSDFilepathLabel = model.MetaLabelPrefix + "filepath" // TimestampCollector is a Custom Collector for Timestamps of the files. type TimestampCollector struct { - filenames []string Description *prometheus.Desc - - logger log.Logger + discoverers map[*Discovery]struct{} + lock sync.RWMutex } // Describe method sends the description to the channel. @@ -49,38 +49,49 @@ func (t *TimestampCollector) Describe(ch chan<- *prometheus.Desc) { ch <- t.Description } -// SetFiles changes the filenames of the struct to the paths returned by listfiles(). -func (t *TimestampCollector) SetFiles(files []string) { - t.filenames = files -} - // Collect creates constant metrics for each file with last modified time of the file. func (t *TimestampCollector) Collect(ch chan<- prometheus.Metric) { - files := t.filenames - for i := 0; i < len(files); i++ { - info, err := os.Stat(files[i]) - if err != nil { - t.logger.Errorf("Error getting the fileinfo of the file %q: %s", files[i], err) - continue + // New map to dedup filenames. + uniqueFiles := make(map[string]float64) + t.lock.RLock() + for fileSD := range t.discoverers { + for filename, timestamp := range fileSD.timestamps { + uniqueFiles[filename] = timestamp } + } + t.lock.RUnlock() + for filename, timestamp := range uniqueFiles { ch <- prometheus.MustNewConstMetric( t.Description, prometheus.GaugeValue, - float64(info.ModTime().Unix()), - files[i], + timestamp, + filename, ) } } +func (t *TimestampCollector) addDiscoverer(disc *Discovery) { + t.lock.Lock() + t.discoverers[disc] = struct{}{} + t.lock.Unlock() +} + +func (t *TimestampCollector) removeDiscoverer(disc *Discovery) { + t.lock.Lock() + delete(t.discoverers, disc) + t.lock.Unlock() +} + // NewTimestampCollector creates a TimestampCollector. func NewTimestampCollector() *TimestampCollector { return &TimestampCollector{ Description: prometheus.NewDesc( "prometheus_sd_file_timestamp", - "Timestamp of files read by FileSD", + "Timestamp (mtime) of files read by FileSD. Timestamp is set at read time.", []string{"filename"}, nil, ), + discoverers: make(map[*Discovery]struct{}), } } @@ -108,9 +119,11 @@ func init() { // on files that contain target groups in JSON or YAML format. Refreshing // happens using file watches and periodic refreshes. type Discovery struct { - paths []string - watcher *fsnotify.Watcher - interval time.Duration + paths []string + watcher *fsnotify.Watcher + interval time.Duration + timestamps map[string]float64 + lock sync.RWMutex // lastRefresh stores which files were found during the last refresh // and how many target groups they contained. @@ -124,11 +137,15 @@ func NewDiscovery(conf *config.FileSDConfig, logger log.Logger) *Discovery { if logger == nil { logger = log.NewNopLogger() } - return &Discovery{ - paths: conf.Files, - interval: time.Duration(conf.RefreshInterval), - logger: logger, + + disc := &Discovery{ + paths: conf.Files, + interval: time.Duration(conf.RefreshInterval), + timestamps: make(map[string]float64), + logger: logger, } + fileSDTimeStamp.addDiscoverer(disc) + return disc } // listFiles returns a list of all files that match the configured patterns. @@ -212,6 +229,18 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { } } +func (d *Discovery) writeTimestamp(filename string, timestamp float64) { + d.lock.Lock() + d.timestamps[filename] = timestamp + d.lock.Unlock() +} + +func (d *Discovery) deleteTimestamp(filename string) { + d.lock.Lock() + delete(d.timestamps, filename) + d.lock.Unlock() +} + // stop shuts down the file watcher. func (d *Discovery) stop() { level.Debug(d.logger).Log("msg", "Stopping file discovery...", "paths", d.paths) @@ -219,6 +248,8 @@ func (d *Discovery) stop() { done := make(chan struct{}) defer close(done) + fileSDTimeStamp.removeDiscoverer(d) + // Closing the watcher will deadlock unless all events and errors are drained. go func() { for { @@ -245,11 +276,9 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup defer func() { fileSDScanDuration.Observe(time.Since(t0).Seconds()) }() - - fileSDTimeStamp.SetFiles(d.listFiles()) ref := map[string]int{} for _, p := range d.listFiles() { - tgroups, err := readFile(p) + tgroups, err := d.readFile(p) if err != nil { fileSDReadErrorsCount.Inc() @@ -270,6 +299,8 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup for f, n := range d.lastRefresh { m, ok := ref[f] if !ok || n > m { + level.Debug(d.logger).Log("msg", "file_sd refresh found file that should be removed", "file", f) + d.deleteTimestamp(f) for i := m; i < n; i++ { select { case ch <- []*config.TargetGroup{{Source: fileSource(f, i)}}: @@ -284,15 +315,21 @@ func (d *Discovery) refresh(ctx context.Context, ch chan<- []*config.TargetGroup d.watchFiles() } -// fileSource returns a source ID for the i-th target group in the file. -func fileSource(filename string, i int) string { - return fmt.Sprintf("%s:%d", filename, i) -} - // readFile reads a JSON or YAML list of targets groups from the file, depending on its // file extension. It returns full configuration target groups. -func readFile(filename string) ([]*config.TargetGroup, error) { - content, err := ioutil.ReadFile(filename) +func (d *Discovery) readFile(filename string) ([]*config.TargetGroup, error) { + fd, err := os.Open(filename) + if err != nil { + return nil, err + } + defer fd.Close() + + content, err := ioutil.ReadAll(fd) + if err != nil { + return nil, err + } + + info, err := fd.Stat() if err != nil { return nil, err } @@ -324,5 +361,13 @@ func readFile(filename string) ([]*config.TargetGroup, error) { } tg.Labels[fileSDFilepathLabel] = model.LabelValue(filename) } + + d.writeTimestamp(filename, float64(info.ModTime().Unix())) + return targetGroups, nil } + +// fileSource returns a source ID for the i-th target group in the file. +func fileSource(filename string, i int) string { + return fmt.Sprintf("%s:%d", filename, i) +}