diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index 3b462e68d..b919408ed 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -200,6 +200,20 @@ func (ls Labels) Has(name string) bool { return false } +// HasDuplicateLabelNames returns the first duplicated label name in ls and true, +// or "" and false if there is none. Only adjacent names are compared, so ls must be sorted. +func (ls Labels) HasDuplicateLabelNames() (string, bool) { + for i, l := range ls { + if i == 0 { + continue + } + if l.Name == ls[i-1].Name { + return l.Name, true + } + } + return "", false +} + // WithoutEmpty returns the labelset without empty labels. // May return the same labelset. func (ls Labels) WithoutEmpty() Labels { diff --git a/pkg/labels/labels_test.go b/pkg/labels/labels_test.go index 173288aff..a8d319c52 100644 --- a/pkg/labels/labels_test.go +++ b/pkg/labels/labels_test.go @@ -149,3 +149,29 @@ func TestLabels_MatchLabels(t *testing.T) { testutil.Equals(t, test.expected, got, "unexpected labelset for test case %d", i) } } + +func TestLabels_HasDuplicateLabelNames(t *testing.T) { + cases := []struct { + Input Labels + Duplicate bool + LabelName string + }{ + { + Input: FromMap(map[string]string{"__name__": "up", "hostname": "localhost"}), + Duplicate: false, + }, { + Input: append( + FromMap(map[string]string{"__name__": "up", "hostname": "localhost"}), + FromMap(map[string]string{"hostname": "127.0.0.1"})..., + ), + Duplicate: true, + LabelName: "hostname", + }, + } + + for i, c := range cases { + l, d := c.Input.HasDuplicateLabelNames() + testutil.Equals(t, c.Duplicate, d, "test %d: incorrect duplicate bool", i) + testutil.Equals(t, c.LabelName, l, "test %d: incorrect label name", i) + } +} diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 01788bef8..dfde91ddf 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -917,7 +917,6 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) { } func TestScrapeLoopAppend(t *testing.T) { - tests := []struct { title string honorLabels bool @@ 
-1535,3 +1534,45 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) { } testutil.Equals(t, want, capp.result, "Appended samples not as expected") } + +func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { + s := teststorage.New(t) + defer s.Close() + + app, err := s.Appender() + testutil.Ok(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + sl := newScrapeLoop(ctx, + &testScraper{}, + nil, nil, + nopMutator, + nopMutator, + func() storage.Appender { return app }, + nil, + 0, + true, + ) + defer cancel() + + // We add a good metric and a bad one (its "le" label name appears twice) in a single append to check that both are discarded. + _, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{}) + testutil.NotOk(t, err) + + q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0) + testutil.Ok(t, err) + series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) + testutil.Ok(t, err) + testutil.Equals(t, false, series.Next(), "series found in tsdb") + + // We add only the good metric to check that it is recorded and yields exactly one series. + _, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\n"), "", time.Time{}) + testutil.Ok(t, err) + + q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0) + testutil.Ok(t, err) + series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500")) + testutil.Ok(t, err) + testutil.Equals(t, true, series.Next(), "series not found in tsdb") + testutil.Equals(t, false, series.Next(), "more than one series found in tsdb") +} diff --git a/tsdb/head.go b/tsdb/head.go index 314048248..caebeb4cb 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -53,6 +53,10 @@ var ( // writable time range. ErrOutOfBounds = errors.New("out of bounds") + // ErrInvalidSample is returned if an appended sample is not valid and can't + // be ingested. + ErrInvalidSample = errors.New("invalid sample") + // emptyTombstoneReader is a no-op Tombstone Reader. 
// This is used by head to satisfy the Tombstones() function call. emptyTombstoneReader = tombstones.NewMemTombstones() @@ -921,6 +925,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro // Ensure no empty labels have gotten through. lset = lset.WithoutEmpty() + if l, dup := lset.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l)) + } + s, created := a.head.getOrCreate(lset.Hash(), lset) if created { a.series = append(a.series, record.RefSeries{ diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 4b826e50b..b6a7c1cba 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1255,7 +1255,7 @@ func TestWalRepair_DecodingError(t *testing.T) { } func TestNewWalSegmentOnTruncate(t *testing.T) { - dir, err := ioutil.TempDir("", "test_wal_segements") + dir, err := ioutil.TempDir("", "test_wal_segments") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) @@ -1290,3 +1290,28 @@ func TestNewWalSegmentOnTruncate(t *testing.T) { testutil.Ok(t, err) testutil.Equals(t, 2, last) } + +func TestAddDuplicateLabelName(t *testing.T) { + dir, err := ioutil.TempDir("", "test_duplicate_label_name") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() + wlog, err := wal.NewSize(nil, nil, dir, 32768, false) + testutil.Ok(t, err) + + h, err := NewHead(nil, nil, wlog, 1000) + testutil.Ok(t, err) + defer h.Close() + + add := func(labels labels.Labels, labelName string) { // add asserts that Add rejects the label set and that the error names labelName as the non-unique label. + app := h.Appender() + _, err = app.Add(labels, 0, 0) + testutil.NotOk(t, err) + testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error()) + } + + add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "b"}}, "a") + add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "c"}}, "a") + add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", 
Value: "400"}, {Name: "unit", Value: "s"}}, "le") +}