tsdb: error on series with duplicate labels (#6664)

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2020-01-20 12:05:27 +01:00 committed by Brian Brazil
parent 08d6ddc7f2
commit 46d18112a3
5 changed files with 116 additions and 2 deletions

View File

@ -200,6 +200,20 @@ func (ls Labels) Has(name string) bool {
return false
}
// HasDuplicateLabels returns whether ls has duplicate label names.
// It assumes that the labelset is sorted.
func (ls Labels) HasDuplicateLabelNames() (string, bool) {
for i, l := range ls {
if i == 0 {
continue
}
if l.Name == ls[i-1].Name {
return l.Name, true
}
}
return "", false
}
// WithoutEmpty returns the labelset without empty labels.
// May return the same labelset.
func (ls Labels) WithoutEmpty() Labels {

View File

@ -149,3 +149,29 @@ func TestLabels_MatchLabels(t *testing.T) {
testutil.Equals(t, test.expected, got, "unexpected labelset for test case %d", i)
}
}
func TestLabels_HasDuplicateLabelNames(t *testing.T) {
cases := []struct {
Input Labels
Duplicate bool
LabelName string
}{
{
Input: FromMap(map[string]string{"__name__": "up", "hostname": "localhost"}),
Duplicate: false,
}, {
Input: append(
FromMap(map[string]string{"__name__": "up", "hostname": "localhost"}),
FromMap(map[string]string{"hostname": "127.0.0.1"})...,
),
Duplicate: true,
LabelName: "hostname",
},
}
for i, c := range cases {
l, d := c.Input.HasDuplicateLabelNames()
testutil.Equals(t, c.Duplicate, d, "test %d: incorrect duplicate bool", i)
testutil.Equals(t, c.LabelName, l, "test %d: incorrect label name", i)
}
}

View File

@ -917,7 +917,6 @@ func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
}
func TestScrapeLoopAppend(t *testing.T) {
tests := []struct {
title string
honorLabels bool
@ -1535,3 +1534,45 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
}
testutil.Equals(t, want, capp.result, "Appended samples not as expected")
}
func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
app, err := s.Appender()
testutil.Ok(t, err)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
&testScraper{},
nil, nil,
nopMutator,
nopMutator,
func() storage.Appender { return app },
nil,
0,
true,
)
defer cancel()
// We add a good and a bad metric to check that both are discarded.
_, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err := q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))
testutil.Ok(t, err)
testutil.Equals(t, false, series.Next(), "series found in tsdb")
// We add a good metric to check that it is recorded.
_, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
testutil.Ok(t, err)
q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
series, _, err = q.Select(&storage.SelectParams{}, labels.MustNewMatcher(labels.MatchEqual, "le", "500"))
testutil.Ok(t, err)
testutil.Equals(t, true, series.Next(), "series not found in tsdb")
testutil.Equals(t, false, series.Next(), "more than one series found in tsdb")
}

View File

@ -53,6 +53,10 @@ var (
// writable time range.
ErrOutOfBounds = errors.New("out of bounds")
// ErrInvalidSample is returned if an appended sample is not valid and can't
// be ingested.
ErrInvalidSample = errors.New("invalid sample")
// emptyTombstoneReader is a no-op Tombstone Reader.
// This is used by head to satisfy the Tombstones() function call.
emptyTombstoneReader = tombstones.NewMemTombstones()
@ -921,6 +925,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if l, dup := lset.HasDuplicateLabelNames(); dup {
return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
}
s, created := a.head.getOrCreate(lset.Hash(), lset)
if created {
a.series = append(a.series, record.RefSeries{

View File

@ -1255,7 +1255,7 @@ func TestWalRepair_DecodingError(t *testing.T) {
}
func TestNewWalSegmentOnTruncate(t *testing.T) {
dir, err := ioutil.TempDir("", "test_wal_segements")
dir, err := ioutil.TempDir("", "test_wal_segments")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
@ -1290,3 +1290,28 @@ func TestNewWalSegmentOnTruncate(t *testing.T) {
testutil.Ok(t, err)
testutil.Equals(t, 2, last)
}
func TestAddDuplicateLabelName(t *testing.T) {
dir, err := ioutil.TempDir("", "test_duplicate_label_name")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()
wlog, err := wal.NewSize(nil, nil, dir, 32768, false)
testutil.Ok(t, err)
h, err := NewHead(nil, nil, wlog, 1000)
testutil.Ok(t, err)
defer h.Close()
add := func(labels labels.Labels, labelName string) {
app := h.Appender()
_, err = app.Add(labels, 0, 0)
testutil.NotOk(t, err)
testutil.Equals(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error())
}
add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "b"}}, "a")
add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "c"}}, "a")
add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", Value: "400"}, {Name: "unit", Value: "s"}}, "le")
}