diff --git a/storage/remote/write.go b/storage/remote/write.go index dba8b1fd5..c322a4347 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -50,11 +50,13 @@ type WriteStorage struct { logger log.Logger mtx sync.Mutex - configHash [16]byte - walDir string - queues []*QueueManager - samplesIn *ewmaRate - flushDeadline time.Duration + configHash [16]byte + externalLabelHash [16]byte + walDir string + queues []*QueueManager + hashes [][16]byte + samplesIn *ewmaRate + flushDeadline time.Duration } // NewWriteStorage creates and runs a WriteStorage. @@ -81,6 +83,7 @@ func (rws *WriteStorage) run() { } // ApplyConfig updates the state as the new config requires. +// Only stop & create queues which have changes. func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.mtx.Lock() defer rws.mtx.Unlock() @@ -97,19 +100,38 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { return err } - hash := md5.Sum(append(cfgBytes, externalLabelBytes...)) - if hash == rws.configHash { + configHash := md5.Sum(cfgBytes) + externalLabelHash := md5.Sum(externalLabelBytes) + externalLabelUnchanged := externalLabelHash == rws.externalLabelHash + if configHash == rws.configHash && externalLabelUnchanged { level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") return nil } - rws.configHash = hash + rws.configHash = configHash + rws.externalLabelHash = externalLabelHash // Update write queues newQueues := []*QueueManager{} - // TODO: we should only stop & recreate queues which have changes, - // as this can be quite disruptive. + newHashes := [][16]byte{} + newClientIndexes := []int{} for i, rwConf := range conf.RemoteWriteConfigs { + b, err := json.Marshal(rwConf) + if err != nil { + return err + } + + // Use RemoteWriteConfigs and its index to get hash. So if its index changed, + // the correspoinding queue should also be restarted. + hash := md5.Sum(b) + if i < len(rws.queues) && rws.hashes[i] == hash && externalLabelUnchanged { + // The RemoteWriteConfig and index both not changed, keep the queue. + newQueues = append(newQueues, rws.queues[i]) + newHashes = append(newHashes, hash) + rws.queues[i] = nil + continue + } + // Otherwise create a new queue. c, err := NewClient(i, &ClientConfig{ URL: rwConf.URL, Timeout: rwConf.RemoteTimeout, @@ -128,16 +150,24 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { c, rws.flushDeadline, )) + newHashes = append(newHashes, hash) + newClientIndexes = append(newClientIndexes, i) } for _, q := range rws.queues { - q.Stop() + // A nil queue means that queue has been reused. + if q != nil { + q.Stop() + } + } + + for _, index := range newClientIndexes { + newQueues[index].Start() } rws.queues = newQueues - for _, q := range rws.queues { - q.Start() - } + rws.hashes = newHashes + return nil } diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index c87cf3e62..0d602a70f 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -17,7 +17,9 @@ import ( "io/ioutil" "os" "testing" + "time" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/util/testutil" @@ -93,3 +95,57 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { err = s.Close() testutil.Ok(t, err) } + +func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { + dir, err := ioutil.TempDir("", "TestWriteStorageApplyConfigsPartialUpdate") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, dir, defaultFlushDeadline) + + c0 := &config.RemoteWriteConfig{ + RemoteTimeout: model.Duration(10 * time.Second), + QueueConfig: config.DefaultQueueConfig, + } + c1 := &config.RemoteWriteConfig{ + RemoteTimeout: model.Duration(20 * time.Second), + QueueConfig: config.DefaultQueueConfig, + } + c2 := &config.RemoteWriteConfig{ + RemoteTimeout: model.Duration(30 * time.Second), + QueueConfig: config.DefaultQueueConfig, + } + + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2}, + } + s.ApplyConfig(conf) + testutil.Equals(t, 3, len(s.queues)) + q := s.queues[1] + + // Update c0 and c2. + c0.RemoteTimeout = model.Duration(40 * time.Second) + c2.RemoteTimeout = model.Duration(50 * time.Second) + conf = &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{c0, c1, c2}, + } + s.ApplyConfig(conf) + testutil.Equals(t, 3, len(s.queues)) + + testutil.Assert(t, q == s.queues[1], "Pointer of unchanged queue should have remained the same") + + // Delete c0. + conf = &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{c1, c2}, + } + s.ApplyConfig(conf) + testutil.Equals(t, 2, len(s.queues)) + + testutil.Assert(t, q != s.queues[1], "If the index changed, the queue should be stopped and recreated.") + + err = s.Close() + testutil.Ok(t, err) +}