diff --git a/retrieval/target.go b/retrieval/target.go index 133009f4c..f45287dd2 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/local" "github.com/prometheus/prometheus/util/httputil" ) @@ -449,15 +450,29 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) { }, } - var samples model.Vector + var ( + samples model.Vector + numOutOfOrder int + ) for { if err = sdec.Decode(&samples); err != nil { break } for _, s := range samples { - appender.Append(s) + err := appender.Append(s) + if err != nil { + if err == local.ErrOutOfOrderSample { + numOutOfOrder++ + } else { + log.Warnf("Error inserting sample %v: %s", s, err) + } + } + } } + if numOutOfOrder > 0 { + log.Warnf("Error on ingesting %d out-of-order samples") + } if err == io.EOF { return nil @@ -472,7 +487,7 @@ type ruleLabelsAppender struct { labels model.LabelSet } -func (app ruleLabelsAppender) Append(s *model.Sample) { +func (app ruleLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if v, ok := s.Metric[ln]; ok && v != "" { s.Metric[model.ExportedLabelPrefix+ln] = v @@ -480,7 +495,7 @@ func (app ruleLabelsAppender) Append(s *model.Sample) { s.Metric[ln] = lv } - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } type honorLabelsAppender struct { @@ -491,14 +506,14 @@ type honorLabelsAppender struct { // Merges the sample's metric with the given labels if the label is not // already present in the metric. // This also considers labels explicitly set to the empty string. -func (app honorLabelsAppender) Append(s *model.Sample) { +func (app honorLabelsAppender) Append(s *model.Sample) error { for ln, lv := range app.labels { if _, ok := s.Metric[ln]; !ok { s.Metric[ln] = lv } } - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // Applies a set of relabel configurations to the sample's metric @@ -508,19 +523,18 @@ type relabelAppender struct { relabelings []*config.RelabelConfig } -func (app relabelAppender) Append(s *model.Sample) { +func (app relabelAppender) Append(s *model.Sample) error { labels, err := Relabel(model.LabelSet(s.Metric), app.relabelings...) if err != nil { - log.Errorf("Error while relabeling metric %s: %s", s.Metric, err) - return + return fmt.Errorf("metric relabeling error %s: %s", s.Metric, err) } // Check if the timeseries was dropped. if labels == nil { - return + return nil } s.Metric = model.Metric(labels) - app.SampleAppender.Append(s) + return app.SampleAppender.Append(s) } // URL returns a copy of the target's URL. diff --git a/storage/local/interface.go b/storage/local/interface.go index e260cc762..454c2d9d5 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -33,7 +33,7 @@ type Storage interface { // processing.) The implementation might remove labels with empty value // from the provided Sample as those labels are considered equivalent to // a label not present at all. - Append(*model.Sample) + Append(*model.Sample) error // NeedsThrottling returns true if the Storage has too many chunks in memory // already or has too many chunks waiting for persistence. NeedsThrottling() bool diff --git a/storage/local/storage.go b/storage/local/storage.go index 29700163a..e7a4c292a 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -567,8 +567,10 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin } } +var ErrOutOfOrderSample = fmt.Errorf("sample timestamp out of order") + // Append implements Storage. -func (s *memorySeriesStorage) Append(sample *model.Sample) { +func (s *memorySeriesStorage) Append(sample *model.Sample) error { for ln, lv := range sample.Metric { if len(lv) == 0 { delete(sample.Metric, ln) @@ -594,11 +596,11 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { // It would be even better to also compare the sample values here, but // we don't have efficient access to a series's last value. if sample.Timestamp != series.lastTime { - log.Warnf("Ignoring sample with out-of-order timestamp for fingerprint %v (%v): %v is not after %v", fp, series.metric, sample.Timestamp, series.lastTime) s.outOfOrderSamplesCount.Inc() + return ErrOutOfOrderSample } s.fpLocker.Unlock(fp) - return + return nil } completedChunksCount := series.add(&model.SamplePair{ Value: sample.Value, @@ -607,6 +609,8 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) { s.fpLocker.Unlock(fp) s.ingestedSamplesCount.Inc() s.incNumChunksToPersist(completedChunksCount) + + return nil } // NeedsThrottling implements Storage. diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index ae3528c7d..4ed739707 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -133,13 +133,15 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue // Append queues a sample to be sent to the remote storage. It drops the // sample on the floor if the queue is full. -func (t *StorageQueueManager) Append(s *model.Sample) { +// Always returns nil. +func (t *StorageQueueManager) Append(s *model.Sample) error { select { case t.queue <- s: default: t.samplesCount.WithLabelValues(dropped).Inc() log.Warn("Remote storage queue full, discarding sample.") } + return nil } // Stop stops sending samples to the remote storage and waits for pending diff --git a/storage/remote/remote.go b/storage/remote/remote.go index d295f8f37..6c0ddba9d 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -104,8 +104,8 @@ func (s *Storage) Stop() { } } -// Append implements storage.SampleAppender. -func (s *Storage) Append(smpl *model.Sample) { +// Append implements storage.SampleAppender. Always returns nil. +func (s *Storage) Append(smpl *model.Sample) error { s.mtx.RLock() var snew model.Sample @@ -122,6 +122,7 @@ func (s *Storage) Append(smpl *model.Sample) { for _, q := range s.queues { q.Append(&snew) } + return nil } // NeedsThrottling implements storage.SampleAppender. It will always return diff --git a/storage/storage.go b/storage/storage.go index 71b6bcfa8..86730d643 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -27,7 +27,7 @@ type SampleAppender interface { // sending samples. Local storage implementations will only drop metrics // upon unrecoverable errors. Reporting any errors is done via metrics // and logs and not the concern of the caller. - Append(*model.Sample) + Append(*model.Sample) error // NeedsThrottling returns true if the underlying storage wishes to not // receive any more samples. Append will still work but might lead to // undue resource usage. It is recommended to call NeedsThrottling once @@ -53,10 +53,16 @@ type Fanout []SampleAppender // Append implements SampleAppender. It appends the provided sample to all // SampleAppenders in the Fanout slice and waits for each append to complete // before proceeding with the next. -func (f Fanout) Append(s *model.Sample) { +// If any of the SampleAppenders returns an error, the first one is returned +// at the end. +func (f Fanout) Append(s *model.Sample) error { + var err error for _, a := range f { - a.Append(s) + if e := a.Append(s); e != nil && err == nil { + err = e + } } + return err } // NeedsThrottling returns true if at least one of the SampleAppenders in the