diff --git a/promql/analyzer.go b/promql/analyzer.go index 7243d31db..a3319b211 100644 --- a/promql/analyzer.go +++ b/promql/analyzer.go @@ -27,7 +27,7 @@ import ( // from the storage. It is bound to a context that allows cancellation and timing out. type Analyzer struct { // The storage from which to query data. - Storage local.Storage + Storage local.Querier // The expression being analyzed. Expr Expr // The time range for evaluation of Expr. diff --git a/promql/engine.go b/promql/engine.go index 23dfeef25..8a7ad6d66 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -218,7 +218,7 @@ func contextDone(ctx context.Context, env string) error { // It is connected to a storage. type Engine struct { // The storage on which the engine operates. - storage local.Storage + storage local.Querier // The base context for all queries and its cancellation function. baseCtx context.Context @@ -230,7 +230,7 @@ type Engine struct { } // NewEngine returns a new engine. -func NewEngine(storage local.Storage, o *EngineOptions) *Engine { +func NewEngine(storage local.Querier, o *EngineOptions) *Engine { if o == nil { o = DefaultEngineOptions } diff --git a/storage/local/interface.go b/storage/local/interface.go index 26bc5325a..be9b31bab 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -16,27 +16,45 @@ package local import ( "time" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/metric" ) // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { - prometheus.Collector - // Append stores a sample in the Storage. Multiple samples for the same - // fingerprint need to be submitted in chronological order, from oldest - // to newest. When Append has returned, the appended sample might not be - // queryable immediately. (Use WaitForIndexing to wait for complete - // processing.) The implementation might remove labels with empty value - // from the provided Sample as those labels are considered equivalent to - // a label not present at all. - Append(*model.Sample) error - // NeedsThrottling returns true if the Storage has too many chunks in memory + Querier + + // This SampleAppender needs multiple samples for the same fingerprint to be + // submitted in chronological order, from oldest to newest. When Append has + // returned, the appended sample might not be queryable immediately. (Use + // WaitForIndexing to wait for complete processing.) The implementation might + // remove labels with empty value from the provided Sample as those labels + // are considered equivalent to a label not present at all. + // + // Appending is throttled if the Storage has too many chunks in memory // already or has too many chunks waiting for persistence. - NeedsThrottling() bool + storage.SampleAppender + + // Drop all time series associated with the given fingerprints. + DropMetricsForFingerprints(...model.Fingerprint) + // Run the various maintenance loops in goroutines. Returns when the + // storage is ready to use. Keeps everything running in the background + // until Stop is called. + Start() error + // Stop shuts down the Storage gracefully, flushes all pending + // operations, stops all maintenance loops,and frees all resources. + Stop() error + // WaitForIndexing returns once all samples in the storage are + // indexed. Indexing is needed for FingerprintsForLabelMatchers and + // LabelValuesForLabelName and may lag behind. + WaitForIndexing() +} + +// Querier allows querying a time series storage. +type Querier interface { // NewPreloader returns a new Preloader which allows preloading and pinning // series data into memory for use within a query. NewPreloader() Preloader @@ -56,19 +74,6 @@ type Storage interface { LastSampleForFingerprint(model.Fingerprint) model.Sample // Get all of the label values that are associated with a given label name. LabelValuesForLabelName(model.LabelName) model.LabelValues - // Drop all time series associated with the given fingerprints. - DropMetricsForFingerprints(...model.Fingerprint) - // Run the various maintenance loops in goroutines. Returns when the - // storage is ready to use. Keeps everything running in the background - // until Stop is called. - Start() error - // Stop shuts down the Storage gracefully, flushes all pending - // operations, stops all maintenance loops,and frees all resources. - Stop() error - // WaitForIndexing returns once all samples in the storage are - // indexed. Indexing is needed for FingerprintsForLabelMatchers and - // LabelValuesForLabelName and may lag behind. - WaitForIndexing() } // SeriesIterator enables efficient access of sample values in a series. Its diff --git a/storage/local/storage.go b/storage/local/storage.go index fb97dd603..9a817e381 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -189,7 +189,7 @@ type MemorySeriesStorageOptions struct { // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. -func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) Storage { +func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage { s := &memorySeriesStorage{ fpLocker: newFingerprintLocker(o.NumMutexes), diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 476fe60c8..48eb34184 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -718,7 +718,7 @@ func TestLoop(t *testing.T) { storage.Append(s) } storage.WaitForIndexing() - series, _ := storage.(*memorySeriesStorage).fpToSeries.get(model.Metric{}.FastFingerprint()) + series, _ := storage.fpToSeries.get(model.Metric{}.FastFingerprint()) cdsBefore := len(series.chunkDescs) time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in. cdsAfter := len(series.chunkDescs) @@ -1497,12 +1497,12 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { for _, sample := range samples[start:middle] { s.Append(sample) } - verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:middle]) + verifyStorageRandom(b, s, samples[:middle]) for _, sample := range samples[middle:end] { s.Append(sample) } - verifyStorageRandom(b, s.(*memorySeriesStorage), samples[:end]) - verifyStorageSequential(b, s.(*memorySeriesStorage), samples) + verifyStorageRandom(b, s, samples[:end]) + verifyStorageSequential(b, s, samples) } } diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 1dedf518e..3695ea310 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -52,7 +52,7 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, SyncStrategy: Adaptive, } storage := NewMemorySeriesStorage(o) - storage.(*memorySeriesStorage).archiveHighWatermark = model.Latest + storage.archiveHighWatermark = model.Latest if err := storage.Start(); err != nil { directory.Close() t.Fatalf("Error creating storage: %s", err) @@ -63,5 +63,5 @@ func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, directory: directory, } - return storage.(*memorySeriesStorage), closer + return storage, closer }