From b7b6717438d7664d59f060ee8da698a8d1557484 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Thu, 23 Jun 2016 13:03:41 +0200 Subject: [PATCH] Separate query interface out of local.Storage. PromQL only requires a much narrower interface than local.Storage in order to run queries. Narrower interfaces are easier to replace and test, too. We could also change the web interface to use local.Querier, except that we'll probably use appending functions from there in the future. --- promql/analyzer.go | 2 +- promql/engine.go | 4 +-- storage/local/interface.go | 55 +++++++++++++++++++---------------- storage/local/storage.go | 2 +- storage/local/storage_test.go | 8 ++--- storage/local/test_helpers.go | 4 +-- 6 files changed, 40 insertions(+), 35 deletions(-) diff --git a/promql/analyzer.go b/promql/analyzer.go index 7243d31db..a3319b211 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -27,7 +27,7 @@ import ( // from the storage. It is bound to a context that allows cancellation and timing out. type Analyzer struct { // The storage from which to query data. - Storage local.Storage + Storage local.Querier // The expression being analyzed. Expr Expr // The time range for evaluation of Expr. diff --git a/promql/engine.go b/promql/engine.go index 23dfeef25..8a7ad6d66 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -218,7 +218,7 @@ func contextDone(ctx context.Context, env string) error { // It is connected to a storage. type Engine struct { // The storage on which the engine operates. - storage local.Storage + storage local.Querier // The base context for all queries and its cancellation function. baseCtx context.Context @@ -230,7 +230,7 @@ type Engine struct { } // NewEngine returns a new engine. -func NewEngine(storage local.Storage, o *EngineOptions) *Engine { +func NewEngine(storage local.Querier, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } diff --git a/storage/local/interface.go b/storage/local/interface.go index 26bc5325a..be9b31bab 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -16,27 +16,45 @@ package local import ( "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/metric" ) // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { - prometheus.Collector - // Append stores a sample in the Storage. Multiple samples for the same - // fingerprint need to be submitted in chronological order, from oldest - // to newest. When Append has returned, the appended sample might not be - // queryable immediately. (Use WaitForIndexing to wait for complete - // processing.) The implementation might remove labels with empty value - // from the provided Sample as those labels are considered equivalent to - // a label not present at all. - Append(*model.Sample) error - // NeedsThrottling returns true if the Storage has too many chunks in memory + Querier + + // This SampleAppender needs multiple samples for the same fingerprint to be + // submitted in chronological order, from oldest to newest. When Append has + // returned, the appended sample might not be queryable immediately. (Use + // WaitForIndexing to wait for complete processing.) The implementation might + // remove labels with empty value from the provided Sample as those labels + // are considered equivalent to a label not present at all. + // + // Appending is throttled if the Storage has too many chunks in memory // already or has too many chunks waiting for persistence. - NeedsThrottling() bool + storage.SampleAppender + + // Drop all time series associated with the given fingerprints. + DropMetricsForFingerprints(...model.Fingerprint) + // Run the various maintenance loops in goroutines. Returns when the + // storage is ready to use. Keeps everything running in the background + // until Stop is called. + Start() error + // Stop shuts down the Storage gracefully, flushes all pending + // operations, stops all maintenance loops,and frees all resources. + Stop() error + // WaitForIndexing returns once all samples in the storage are + // indexed. Indexing is needed for FingerprintsForLabelMatchers and + // LabelValuesForLabelName and may lag behind. + WaitForIndexing() +} + +// Querier allows querying a time series storage. +type Querier interface { // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader @@ -56,19 +74,6 @@ type Storage interface { LastSampleForFingerprint(model.Fingerprint) model.Sample // Get all of the label values that are associated with a given label name. LabelValuesForLabelName(model.LabelName) model.LabelValues - // Drop all time series associated with the given fingerprints. - DropMetricsForFingerprints(...model.Fingerprint) - // Run the various maintenance loops in goroutines. Returns when the - // storage is ready to use. Keeps everything running in the background - // until Stop is called. - Start() error - // Stop shuts down the Storage gracefully, flushes all pending - // operations, stops all maintenance loops,and frees all resources. - Stop() error - // WaitForIndexing returns once all samples in the storage are - // indexed. Indexing is needed for FingerprintsForLabelMatchers and - // LabelValuesForLabelName and may lag behind. - WaitForIndexing() } // SeriesIterator enables efficient access of sample values in a series. Its diff --git a/storage/local/storage.go b/storage/local/storage.go index fb97dd603..9a817e381 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -189,7 +189,7 @@ type MemorySeriesStorageOptions struct { // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. -func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { +func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage { s := &memorySeriesStorage{ fpLocker: newFingerprintLocker(o.NumMutexes), diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 476fe60c8..48eb34184 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -718,7 +718,7 @@ func TestLoop(t *testing.T) { storage.Append(s) } storage.WaitForIndexing() - series, _ := storage.(*memorySeriesStorage).fpToSeries.get(model.Metric{}.FastFingerprint()) + series, _ := storage.fpToSeries.get(model.Metric{}.FastFingerprint()) cdsBefore := len(series.chunkDescs) time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in. cdsAfter := len(series.chunkDescs) @@ -1497,12 +1497,12 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { for _, sample := range samples[start:middle] { s.Append(sample) } - verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:middle]) + verifyStorageRandom(b, s, samples[:middle]) for _, sample := range samples[middle:end] { s.Append(sample) } - verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:end]) - verifyStorageSequential(b, s.(*memorySeriesStorage), samples) + verifyStorageRandom(b, s, samples[:end]) + verifyStorageSequential(b, s, samples) } } diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 1dedf518e..3695ea310 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -52,7 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, SyncStrategy: Adaptive, } storage := NewMemorySeriesStorage(o) - storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest + storage.archiveHighWatermark = model.Latest if err := storage.Start(); err != nil { directory.Close() t.Fatalf("Error creating storage: %s", err) @@ -63,5 +63,5 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, directory: directory, } - return storage.(*memorySeriesStorage), closer + return storage, closer }