diff --git a/storage/remote/storage.go b/storage/remote/storage.go index d1e83fdf6..444c04954 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -15,13 +15,10 @@ package remote import ( "context" - "crypto/md5" - "encoding/json" "sync" "time" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -40,13 +37,7 @@ type Storage struct { logger log.Logger mtx sync.Mutex - configHash [16]byte - - // For writes - walDir string - queues []*QueueManager - samplesIn *ewmaRate - flushDeadline time.Duration + rws *WriteStorage // For reads queryables []storage.Queryable @@ -61,28 +52,17 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal s := &Storage{ logger: logging.Dedupe(l, 1*time.Minute), localStartTimeCallback: stCallback, - flushDeadline: flushDeadline, - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), - walDir: walDir, + rws: NewWriteStorage(l, walDir, flushDeadline), } - go s.run() return s } -func (s *Storage) run() { - ticker := time.NewTicker(shardUpdateDuration) - defer ticker.Stop() - for range ticker.C { - s.samplesIn.tick() - } -} - // ApplyConfig updates the state as the new config requires. func (s *Storage) ApplyConfig(conf *config.Config) error { s.mtx.Lock() defer s.mtx.Unlock() - if err := s.applyRemoteWriteConfig(conf); err != nil { + if err := s.rws.ApplyConfig(conf); err != nil { return err } @@ -113,66 +93,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { return nil } -// applyRemoteWriteConfig applies the remote write config only if the config has changed. -// The caller must hold the lock on s.mtx. -func (s *Storage) applyRemoteWriteConfig(conf *config.Config) error { - // Remote write queues only need to change if the remote write config or - // external labels change. Hash these together and only reload if the hash - // changes. - cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) - if err != nil { - return err - } - externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels) - if err != nil { - return err - } - - hash := md5.Sum(append(cfgBytes, externalLabelBytes...)) - if hash == s.configHash { - level.Debug(s.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") - return nil - } - - s.configHash = hash - - // Update write queues - newQueues := []*QueueManager{} - // TODO: we should only stop & recreate queues which have changes, - // as this can be quite disruptive. - for i, rwConf := range conf.RemoteWriteConfigs { - c, err := NewClient(i, &ClientConfig{ - URL: rwConf.URL, - Timeout: rwConf.RemoteTimeout, - HTTPClientConfig: rwConf.HTTPClientConfig, - }) - if err != nil { - return err - } - newQueues = append(newQueues, NewQueueManager( - s.logger, - s.walDir, - s.samplesIn, - rwConf.QueueConfig, - conf.GlobalConfig.ExternalLabels, - rwConf.WriteRelabelConfigs, - c, - s.flushDeadline, - )) - } - - for _, q := range s.queues { - q.Stop() - } - - s.queues = newQueues - for _, q := range s.queues { - q.Start() - } - - return nil -} - // StartTime implements the Storage interface. func (s *Storage) StartTime() (int64, error) { return int64(model.Latest), nil @@ -196,16 +116,16 @@ func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querie return storage.NewMergeQuerier(nil, queriers), nil } +// Appender implements storage.Storage. +func (s *Storage) Appender() (storage.Appender, error) { + return s.rws.Appender() +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock() defer s.mtx.Unlock() - - for _, q := range s.queues { - q.Stop() - } - - return nil + return s.rws.Close() } func labelsToEqualityMatchers(ls model.LabelSet) []*labels.Matcher { diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index 285b4e3bd..b03639253 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -20,7 +20,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/util/testutil" ) @@ -42,7 +41,7 @@ func TestStorageLifecycle(t *testing.T) { s.ApplyConfig(conf) // make sure remote write has a queue. - testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, 1, len(s.rws.queues)) // make sure remote write has a queue. testutil.Equals(t, 1, len(s.queryables)) @@ -51,33 +50,6 @@ func TestStorageLifecycle(t *testing.T) { testutil.Ok(t, err) } -func TestUpdateExternalLabels(t *testing.T) { - dir, err := ioutil.TempDir("", "TestUpdateExternalLabels") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) - - externalLabels := labels.FromStrings("external", "true") - conf := &config.Config{ - GlobalConfig: config.GlobalConfig{}, - RemoteWriteConfigs: []*config.RemoteWriteConfig{ - &config.DefaultRemoteWriteConfig, - }, - } - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels) - - conf.GlobalConfig.ExternalLabels = externalLabels - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - testutil.Equals(t, externalLabels, s.queues[0].externalLabels) - - err = s.Close() - testutil.Ok(t, err) -} - func TestUpdateRemoteReadConfigs(t *testing.T) { dir, err := ioutil.TempDir("", "TestUpdateRemoteReadConfigs") testutil.Ok(t, err) @@ -100,31 +72,3 @@ func TestUpdateRemoteReadConfigs(t *testing.T) { err = s.Close() testutil.Ok(t, err) } - -func TestUpdateRemoteWriteConfigsNoop(t *testing.T) { - dir, err := ioutil.TempDir("", "TestUpdateRemoteWriteConfigsNoop") - testutil.Ok(t, err) - defer os.RemoveAll(dir) - - s := NewStorage(nil, prometheus.DefaultRegisterer, nil, dir, defaultFlushDeadline) - - conf := &config.Config{ - GlobalConfig: config.GlobalConfig{}, - RemoteWriteConfigs: []*config.RemoteWriteConfig{ - &config.DefaultRemoteWriteConfig, - }, - } - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - queue := s.queues[0] - - conf.RemoteReadConfigs = []*config.RemoteReadConfig{ - &config.DefaultRemoteReadConfig, - } - s.ApplyConfig(conf) - testutil.Equals(t, 1, len(s.queues)) - testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same") - - err = s.Close() - testutil.Ok(t, err) -} diff --git a/storage/remote/write.go b/storage/remote/write.go index b1e3ee9fd..dba8b1fd5 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -14,8 +14,16 @@ package remote import ( + "crypto/md5" + "encoding/json" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -37,15 +45,121 @@ var ( } ) -// Appender implements scrape.Appendable. -func (s *Storage) Appender() (storage.Appender, error) { +// WriteStorage represents all the remote write storage. +type WriteStorage struct { + logger log.Logger + mtx sync.Mutex + + configHash [16]byte + walDir string + queues []*QueueManager + samplesIn *ewmaRate + flushDeadline time.Duration +} + +// NewWriteStorage creates and runs a WriteStorage. +func NewWriteStorage(logger log.Logger, walDir string, flushDeadline time.Duration) *WriteStorage { + if logger == nil { + logger = log.NewNopLogger() + } + rws := &WriteStorage{ + logger: logger, + flushDeadline: flushDeadline, + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + walDir: walDir, + } + go rws.run() + return rws +} + +func (rws *WriteStorage) run() { + ticker := time.NewTicker(shardUpdateDuration) + defer ticker.Stop() + for range ticker.C { + rws.samplesIn.tick() + } +} + +// ApplyConfig updates the state as the new config requires. +func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { + rws.mtx.Lock() + defer rws.mtx.Unlock() + + // Remote write queues only need to change if the remote write config or + // external labels change. Hash these together and only reload if the hash + // changes. + cfgBytes, err := json.Marshal(conf.RemoteWriteConfigs) + if err != nil { + return err + } + externalLabelBytes, err := json.Marshal(conf.GlobalConfig.ExternalLabels) + if err != nil { + return err + } + + hash := md5.Sum(append(cfgBytes, externalLabelBytes...)) + if hash == rws.configHash { + level.Debug(rws.logger).Log("msg", "remote write config has not changed, no need to restart QueueManagers") + return nil + } + + rws.configHash = hash + + // Update write queues + newQueues := []*QueueManager{} + // TODO: we should only stop & recreate queues which have changes, + // as this can be quite disruptive. + for i, rwConf := range conf.RemoteWriteConfigs { + c, err := NewClient(i, &ClientConfig{ + URL: rwConf.URL, + Timeout: rwConf.RemoteTimeout, + HTTPClientConfig: rwConf.HTTPClientConfig, + }) + if err != nil { + return err + } + newQueues = append(newQueues, NewQueueManager( + rws.logger, + rws.walDir, + rws.samplesIn, + rwConf.QueueConfig, + conf.GlobalConfig.ExternalLabels, + rwConf.WriteRelabelConfigs, + c, + rws.flushDeadline, + )) + } + + for _, q := range rws.queues { + q.Stop() + } + + rws.queues = newQueues + for _, q := range rws.queues { + q.Start() + } + return nil +} + +// Appender implements storage.Storage. +func (rws *WriteStorage) Appender() (storage.Appender, error) { return ×tampTracker{ - storage: s, + writeStorage: rws, }, nil } +// Close closes the WriteStorage. +func (rws *WriteStorage) Close() error { + rws.mtx.Lock() + defer rws.mtx.Unlock() + for _, q := range rws.queues { + q.Stop() + } + return nil +} + type timestampTracker struct { - storage *Storage + writeStorage *WriteStorage samples int64 highestTimestamp int64 } @@ -67,7 +181,7 @@ func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float6 // Commit implements storage.Appender. func (t *timestampTracker) Commit() error { - t.storage.samplesIn.incr(t.samples) + t.writeStorage.samplesIn.incr(t.samples) samplesIn.Add(float64(t.samples)) highestTimestamp.Set(float64(t.highestTimestamp / 1000)) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go new file mode 100644 index 000000000..c87cf3e62 --- /dev/null +++ b/storage/remote/write_test.go @@ -0,0 +1,95 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/util/testutil" +) + +func TestWriteStorageLifecycle(t *testing.T) { + dir, err := ioutil.TempDir("", "TestWriteStorageLifecycle") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, dir, defaultFlushDeadline) + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + + err = s.Close() + testutil.Ok(t, err) +} + +func TestUpdateExternalLabels(t *testing.T) { + dir, err := ioutil.TempDir("", "TestUpdateExternalLabels") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, dir, defaultFlushDeadline) + + externalLabels := labels.FromStrings("external", "true") + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, labels.Labels(nil), s.queues[0].externalLabels) + + conf.GlobalConfig.ExternalLabels = externalLabels + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Equals(t, externalLabels, s.queues[0].externalLabels) + + err = s.Close() + testutil.Ok(t, err) +} + +func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { + dir, err := ioutil.TempDir("", "TestWriteStorageApplyConfigsIdempotent") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + s := NewWriteStorage(nil, dir, defaultFlushDeadline) + + conf := &config.Config{ + GlobalConfig: config.GlobalConfig{}, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + &config.DefaultRemoteWriteConfig, + }, + } + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + queue := s.queues[0] + + s.ApplyConfig(conf) + testutil.Equals(t, 1, len(s.queues)) + testutil.Assert(t, queue == s.queues[0], "Queue pointer should have remained the same") + + err = s.Close() + testutil.Ok(t, err) +}