diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 77e891693..9ffe3a906 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -357,7 +357,6 @@ func main() { opts = promql.EngineOpts{ Logger: log.With(logger, "component", "query engine"), Reg: prometheus.DefaultRegisterer, - MaxConcurrent: cfg.queryConcurrency, MaxSamples: cfg.queryMaxSamples, Timeout: time.Duration(cfg.queryTimeout), ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), diff --git a/promql/bench_test.go b/promql/bench_test.go index 56ef789b1..58f12139e 100644 --- a/promql/bench_test.go +++ b/promql/bench_test.go @@ -29,11 +29,10 @@ func BenchmarkRangeQuery(b *testing.B) { storage := teststorage.New(b) defer storage.Close() opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 50000000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 50000000, + Timeout: 100 * time.Second, } engine := NewEngine(opts) diff --git a/promql/engine.go b/promql/engine.go index 96d66f977..0add9e147 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -217,7 +217,6 @@ func contextErr(err error, env string) error { type EngineOpts struct { Logger log.Logger Reg prometheus.Registerer - MaxConcurrent int MaxSamples int Timeout time.Duration ActiveQueryTracker *ActiveQueryTracker @@ -299,7 +298,12 @@ func NewEngine(opts EngineOpts) *Engine { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), } - metrics.maxConcurrentQueries.Set(float64(opts.MaxConcurrent)) + + if t := opts.ActiveQueryTracker; t != nil { + metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent())) + } else { + metrics.maxConcurrentQueries.Set(-1) + } if opts.Reg != nil { opts.Reg.MustRegister( diff --git a/promql/engine_test.go b/promql/engine_test.go index 677a1468b..a13cdfd77 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -41,7 +41,6 @@ func TestQueryConcurrency(t *testing.T) { opts := EngineOpts{ Logger: nil, Reg: nil, - MaxConcurrent: maxConcurrency, MaxSamples: 10, Timeout: 100 * time.Second, ActiveQueryTracker: queryTracker, @@ -60,7 +59,7 @@ func TestQueryConcurrency(t *testing.T) { return nil } - for i := 0; i < opts.MaxConcurrent; i++ { + for i := 0; i < maxConcurrency; i++ { q := engine.newTestQuery(f) go q.Exec(ctx) select { @@ -92,18 +91,17 @@ func TestQueryConcurrency(t *testing.T) { } // Terminate remaining queries. - for i := 0; i < opts.MaxConcurrent; i++ { + for i := 0; i < maxConcurrency; i++ { block <- struct{}{} } } func TestQueryTimeout(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 20, - MaxSamples: 10, - Timeout: 5 * time.Millisecond, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 5 * time.Millisecond, } engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -127,11 +125,10 @@ const errQueryCanceled = ErrQueryCanceled("test statement execution") func TestQueryCancel(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -198,11 +195,10 @@ func (e errSeriesSet) Err() error { return e.err } func TestQueryError(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) errStorage := ErrStorage{errors.New("storage error")} @@ -261,11 +257,10 @@ func (*paramCheckerQuerier) Close() error { r func TestParamsSetCorrectly(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } // Set the lookback to be smaller and reset at the end. @@ -466,11 +461,10 @@ func TestParamsSetCorrectly(t *testing.T) { func TestEngineShutdown(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) ctx, cancelCtx := context.WithCancel(context.Background()) @@ -1149,11 +1143,10 @@ func (f *FakeQueryLogger) Log(l ...interface{}) error { func TestQueryLogger_basic(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) @@ -1201,11 +1194,10 @@ func TestQueryLogger_basic(t *testing.T) { func TestQueryLogger_fields(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) @@ -1231,11 +1223,10 @@ func TestQueryLogger_fields(t *testing.T) { func TestQueryLogger_error(t *testing.T) { opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := NewEngine(opts) diff --git a/promql/functions_test.go b/promql/functions_test.go index bd0ced3f4..760d6e382 100644 --- a/promql/functions_test.go +++ b/promql/functions_test.go @@ -31,11 +31,10 @@ func TestDeriv(t *testing.T) { storage := teststorage.New(t) defer storage.Close() opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10000, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 10 * time.Second, } engine := NewEngine(opts) diff --git a/promql/query_logger.go b/promql/query_logger.go index e63e5cde5..1ae1bec5f 100644 --- a/promql/query_logger.go +++ b/promql/query_logger.go @@ -28,9 +28,10 @@ import ( ) type ActiveQueryTracker struct { - mmapedFile []byte - getNextIndex chan int - logger log.Logger + mmapedFile []byte + getNextIndex chan int + logger log.Logger + maxConcurrent int } type Entry struct { @@ -102,13 +103,13 @@ func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, er return fileAsBytes, err } -func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.Logger) *ActiveQueryTracker { +func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker { err := os.MkdirAll(localStoragePath, 0777) if err != nil { level.Error(logger).Log("msg", "Failed to create directory for logging active queries") } - filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxQueries*entrySize + filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize logUnfinishedQueries(filename, filesize, logger) fileAsBytes, err := getMMapedFile(filename, filesize, logger) @@ -118,12 +119,13 @@ func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.L copy(fileAsBytes, "[") activeQueryTracker := ActiveQueryTracker{ - mmapedFile: fileAsBytes, - getNextIndex: make(chan int, maxQueries), - logger: logger, + mmapedFile: fileAsBytes, + getNextIndex: make(chan int, maxConcurrent), + logger: logger, + maxConcurrent: maxConcurrent, } - activeQueryTracker.generateIndices(maxQueries) + activeQueryTracker.generateIndices(maxConcurrent) return &activeQueryTracker } @@ -164,12 +166,16 @@ func newJSONEntry(query string, logger log.Logger) []byte { return jsonEntry } -func (tracker ActiveQueryTracker) generateIndices(maxQueries int) { - for i := 0; i < maxQueries; i++ { +func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) { + for i := 0; i < maxConcurrent; i++ { tracker.getNextIndex <- 1 + (i * entrySize) } } +func (tracker ActiveQueryTracker) GetMaxConcurrent() int { + return tracker.maxConcurrent +} + func (tracker ActiveQueryTracker) Delete(insertIndex int) { copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize)) tracker.getNextIndex <- insertIndex diff --git a/promql/test.go b/promql/test.go index 7c8c688e5..8c2b0497f 100644 --- a/promql/test.go +++ b/promql/test.go @@ -516,11 +516,10 @@ func (t *Test) clear() { t.storage = teststorage.New(t) opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 20, - MaxSamples: 10000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, } t.queryEngine = NewEngine(opts) @@ -630,11 +629,10 @@ func (ll *LazyLoader) clear() { ll.storage = teststorage.New(ll) opts := EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 20, - MaxSamples: 10000, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10000, + Timeout: 100 * time.Second, } ll.queryEngine = NewEngine(opts) diff --git a/rules/alerting_test.go b/rules/alerting_test.go index 7bc37d5c1..59505fd6d 100644 --- a/rules/alerting_test.go +++ b/rules/alerting_test.go @@ -298,11 +298,10 @@ func TestAlertingRuleDuplicate(t *testing.T) { defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) diff --git a/rules/manager_test.go b/rules/manager_test.go index 204c4f51a..1f62009e0 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -502,11 +502,10 @@ func TestStaleness(t *testing.T) { storage := teststorage.New(t) defer storage.Close() engineOpts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) opts := &ManagerOptions{ @@ -689,11 +688,10 @@ func TestUpdate(t *testing.T) { storage := teststorage.New(t) defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ @@ -821,11 +819,10 @@ func TestNotify(t *testing.T) { storage := teststorage.New(t) defer storage.Close() engineOpts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(engineOpts) var lastNotified []*Alert @@ -889,11 +886,10 @@ func TestMetricsUpdate(t *testing.T) { registry := prometheus.NewRegistry() defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) ruleManager := NewManager(&ManagerOptions{ diff --git a/rules/recording_test.go b/rules/recording_test.go index 6a333d290..c00a244c0 100644 --- a/rules/recording_test.go +++ b/rules/recording_test.go @@ -31,11 +31,10 @@ func TestRuleEval(t *testing.T) { defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) @@ -99,11 +98,10 @@ func TestRuleEvalDuplicate(t *testing.T) { defer storage.Close() opts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 10 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 10 * time.Second, } engine := promql.NewEngine(opts) diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 063c6721b..e8e679e38 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -228,11 +228,10 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group { defer storage.Close() engineOpts := promql.EngineOpts{ - Logger: nil, - Reg: nil, - MaxConcurrent: 10, - MaxSamples: 10, - Timeout: 100 * time.Second, + Logger: nil, + Reg: nil, + MaxSamples: 10, + Timeout: 100 * time.Second, } engine := promql.NewEngine(engineOpts)