agent: make the global hash lookup table smaller

This is the same change made in #13040, plus subsequent improvements,
applied to agent-mode code.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
This commit is contained in:
Bryan Boreham 2023-12-07 15:27:59 +00:00
parent 1b74378a4c
commit e64d7d8928
2 changed files with 146 additions and 44 deletions

View File

@ -45,14 +45,23 @@ func (m *memSeries) updateTimestamp(newTs int64) bool {
return false
}
// seriesHashmap is a simple hashmap for memSeries by their label set.
// It is built on top of a regular hashmap and holds a slice of series to
// resolve hash collisions. Its methods require the hash to be submitted
// seriesHashmap lets agent find a memSeries by its label set, via a 64-bit hash.
// There is one map for the common case where the hash value is unique, and a
// second map for the case that two series have the same hash value.
// Each series is in only one of the maps. Its methods require the hash to be submitted
// with the label set to avoid re-computing hash throughout the code.
type seriesHashmap map[uint64][]*memSeries
type seriesHashmap struct {
unique map[uint64]*memSeries
conflicts map[uint64][]*memSeries
}
func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
for _, s := range m[hash] {
func (m *seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
if s, found := m.unique[hash]; found {
if labels.Equal(s.lset, lset) {
return s
}
}
for _, s := range m.conflicts[hash] {
if labels.Equal(s.lset, lset) {
return s
}
@ -60,28 +69,49 @@ func (m seriesHashmap) Get(hash uint64, lset labels.Labels) *memSeries {
return nil
}
func (m seriesHashmap) Set(hash uint64, s *memSeries) {
seriesSet := m[hash]
func (m *seriesHashmap) Set(hash uint64, s *memSeries) {
if existing, found := m.unique[hash]; !found || labels.Equal(existing.lset, s.lset) {
m.unique[hash] = s
return
}
if m.conflicts == nil {
m.conflicts = make(map[uint64][]*memSeries)
}
seriesSet := m.conflicts[hash]
for i, prev := range seriesSet {
if labels.Equal(prev.lset, s.lset) {
seriesSet[i] = s
return
}
}
m[hash] = append(seriesSet, s)
m.conflicts[hash] = append(seriesSet, s)
}
func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
func (m *seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
var rem []*memSeries
for _, s := range m[hash] {
if s.ref != ref {
rem = append(rem, s)
unique, found := m.unique[hash]
switch {
case !found: // Supplied hash is not stored.
return
case unique.ref == ref:
conflicts := m.conflicts[hash]
if len(conflicts) == 0 { // Exactly one series with this hash was stored
delete(m.unique, hash)
return
}
m.unique[hash] = conflicts[0] // First remaining series goes in 'unique'.
rem = conflicts[1:] // Keep the rest.
default: // The series to delete is somewhere in 'conflicts'. Keep all the ones that don't match.
for _, s := range m.conflicts[hash] {
if s.ref != ref {
rem = append(rem, s)
}
}
}
if len(rem) == 0 {
delete(m, hash)
delete(m.conflicts, hash)
} else {
m[hash] = rem
m.conflicts[hash] = rem
}
}
@ -117,7 +147,10 @@ func newStripeSeries(stripeSize int) *stripeSeries {
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
}
for i := range s.hashes {
s.hashes[i] = seriesHashmap{}
s.hashes[i] = seriesHashmap{
unique: map[uint64]*memSeries{},
conflicts: nil, // Initialized on demand in set().
}
}
for i := range s.exemplars {
s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{}
@ -136,40 +169,49 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
defer s.gcMut.Unlock()
deleted := map[chunks.HeadSeriesRef]struct{}{}
// For one series, truncate old chunks and check if any chunks left. If not, mark as deleted and collect the ID.
check := func(hashLock int, hash uint64, series *memSeries) {
series.Lock()
// Any series that has received a write since mint is still alive.
if series.lastTs >= mint {
series.Unlock()
return
}
// The series is stale. We need to obtain a second lock for the
// ref if it's different than the hash lock.
refLock := int(series.ref) & (s.size - 1)
if hashLock != refLock {
s.locks[refLock].Lock()
}
deleted[series.ref] = struct{}{}
delete(s.series[refLock], series.ref)
s.hashes[hashLock].Delete(hash, series.ref)
// Since the series is gone, we'll also delete
// the latest stored exemplar.
delete(s.exemplars[refLock], series.ref)
if hashLock != refLock {
s.locks[refLock].Unlock()
}
series.Unlock()
}
for hashLock := 0; hashLock < s.size; hashLock++ {
s.locks[hashLock].Lock()
for hash, all := range s.hashes[hashLock] {
for hash, all := range s.hashes[hashLock].conflicts {
for _, series := range all {
series.Lock()
// Any series that has received a write since mint is still alive.
if series.lastTs >= mint {
series.Unlock()
continue
}
// The series is stale. We need to obtain a second lock for the
// ref if it's different than the hash lock.
refLock := int(series.ref) & (s.size - 1)
if hashLock != refLock {
s.locks[refLock].Lock()
}
deleted[series.ref] = struct{}{}
delete(s.series[refLock], series.ref)
s.hashes[hashLock].Delete(hash, series.ref)
// Since the series is gone, we'll also delete
// the latest stored exemplar.
delete(s.exemplars[refLock], series.ref)
if hashLock != refLock {
s.locks[refLock].Unlock()
}
series.Unlock()
check(hashLock, hash, series)
}
}
for hash, series := range s.hashes[hashLock].unique {
check(hashLock, hash, series)
}
s.locks[hashLock].Unlock()
}

View File

@ -74,3 +74,63 @@ func TestNoDeadlock(t *testing.T) {
require.FailNow(t, "deadlock detected")
}
}
func labelsWithHashCollision() (labels.Labels, labels.Labels) {
// These two series have the same XXHash; thanks to https://github.com/pstibrany/labels_hash_collisions
ls1 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "l6CQ5y")
ls2 := labels.FromStrings("__name__", "metric", "lbl1", "value", "lbl2", "v7uDlF")
if ls1.Hash() != ls2.Hash() {
// These ones are the same when using -tags stringlabels
ls1 = labels.FromStrings("__name__", "metric", "lbl", "HFnEaGl")
ls2 = labels.FromStrings("__name__", "metric", "lbl", "RqcXatm")
}
if ls1.Hash() != ls2.Hash() {
panic("This code needs to be updated: find new labels with colliding hash values.")
}
return ls1, ls2
}
// stripeSeriesWithCollidingSeries returns a stripeSeries with two memSeries having the same, colliding, hash.
func stripeSeriesWithCollidingSeries(_ *testing.T) (*stripeSeries, *memSeries, *memSeries) {
lbls1, lbls2 := labelsWithHashCollision()
ms1 := memSeries{
lset: lbls1,
}
ms2 := memSeries{
lset: lbls2,
}
hash := lbls1.Hash()
s := newStripeSeries(1)
s.Set(hash, &ms1)
s.Set(hash, &ms2)
return s, &ms1, &ms2
}
func TestStripeSeries_Get(t *testing.T) {
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
hash := ms1.lset.Hash()
// Verify that we can get both of the series despite the hash collision
got := s.GetByHash(hash, ms1.lset)
require.Same(t, ms1, got)
got = s.GetByHash(hash, ms2.lset)
require.Same(t, ms2, got)
}
func TestStripeSeries_gc(t *testing.T) {
s, ms1, ms2 := stripeSeriesWithCollidingSeries(t)
hash := ms1.lset.Hash()
s.GC(1)
// Verify that we can get neither ms1 nor ms2 after gc-ing corresponding series
got := s.GetByHash(hash, ms1.lset)
require.Nil(t, got)
got = s.GetByHash(hash, ms2.lset)
require.Nil(t, got)
}