Remove MaxConcurrent from the PromQL engine opts (#6712)

Since we use ActiveQueryTracker to check for concurrency in
d992c36b3a it does not make sense to keep
the MaxConcurrent value as an option of the PromQL engine.

This pull request removes it from the PromQL engine options, sets the
max concurrent metric to -1 if there is no active query tracker, and use
the value of the active query tracker otherwise.

It removes dead code and also will inform people who import the promql
package that we made that change, as it breaks the EngineOpts struct.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2020-01-28 21:38:49 +01:00 committed by Brian Brazil
parent 540dc7dfb0
commit 9adad8ad30
11 changed files with 105 additions and 117 deletions

View File

@ -357,7 +357,6 @@ func main() {
opts = promql.EngineOpts{ opts = promql.EngineOpts{
Logger: log.With(logger, "component", "query engine"), Logger: log.With(logger, "component", "query engine"),
Reg: prometheus.DefaultRegisterer, Reg: prometheus.DefaultRegisterer,
MaxConcurrent: cfg.queryConcurrency,
MaxSamples: cfg.queryMaxSamples, MaxSamples: cfg.queryMaxSamples,
Timeout: time.Duration(cfg.queryTimeout), Timeout: time.Duration(cfg.queryTimeout),
ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),

View File

@ -29,11 +29,10 @@ func BenchmarkRangeQuery(b *testing.B) {
storage := teststorage.New(b) storage := teststorage.New(b)
defer storage.Close() defer storage.Close()
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 50000000,
MaxSamples: 50000000, Timeout: 100 * time.Second,
Timeout: 100 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)

View File

@ -217,7 +217,6 @@ func contextErr(err error, env string) error {
type EngineOpts struct { type EngineOpts struct {
Logger log.Logger Logger log.Logger
Reg prometheus.Registerer Reg prometheus.Registerer
MaxConcurrent int
MaxSamples int MaxSamples int
Timeout time.Duration Timeout time.Duration
ActiveQueryTracker *ActiveQueryTracker ActiveQueryTracker *ActiveQueryTracker
@ -299,7 +298,12 @@ func NewEngine(opts EngineOpts) *Engine {
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}), }),
} }
metrics.maxConcurrentQueries.Set(float64(opts.MaxConcurrent))
if t := opts.ActiveQueryTracker; t != nil {
metrics.maxConcurrentQueries.Set(float64(t.GetMaxConcurrent()))
} else {
metrics.maxConcurrentQueries.Set(-1)
}
if opts.Reg != nil { if opts.Reg != nil {
opts.Reg.MustRegister( opts.Reg.MustRegister(

View File

@ -41,7 +41,6 @@ func TestQueryConcurrency(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: maxConcurrency,
MaxSamples: 10, MaxSamples: 10,
Timeout: 100 * time.Second, Timeout: 100 * time.Second,
ActiveQueryTracker: queryTracker, ActiveQueryTracker: queryTracker,
@ -60,7 +59,7 @@ func TestQueryConcurrency(t *testing.T) {
return nil return nil
} }
for i := 0; i < opts.MaxConcurrent; i++ { for i := 0; i < maxConcurrency; i++ {
q := engine.newTestQuery(f) q := engine.newTestQuery(f)
go q.Exec(ctx) go q.Exec(ctx)
select { select {
@ -92,18 +91,17 @@ func TestQueryConcurrency(t *testing.T) {
} }
// Terminate remaining queries. // Terminate remaining queries.
for i := 0; i < opts.MaxConcurrent; i++ { for i := 0; i < maxConcurrency; i++ {
block <- struct{}{} block <- struct{}{}
} }
} }
func TestQueryTimeout(t *testing.T) { func TestQueryTimeout(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 20, MaxSamples: 10,
MaxSamples: 10, Timeout: 5 * time.Millisecond,
Timeout: 5 * time.Millisecond,
} }
engine := NewEngine(opts) engine := NewEngine(opts)
ctx, cancelCtx := context.WithCancel(context.Background()) ctx, cancelCtx := context.WithCancel(context.Background())
@ -127,11 +125,10 @@ const errQueryCanceled = ErrQueryCanceled("test statement execution")
func TestQueryCancel(t *testing.T) { func TestQueryCancel(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)
ctx, cancelCtx := context.WithCancel(context.Background()) ctx, cancelCtx := context.WithCancel(context.Background())
@ -198,11 +195,10 @@ func (e errSeriesSet) Err() error { return e.err }
func TestQueryError(t *testing.T) { func TestQueryError(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)
errStorage := ErrStorage{errors.New("storage error")} errStorage := ErrStorage{errors.New("storage error")}
@ -261,11 +257,10 @@ func (*paramCheckerQuerier) Close() error { r
func TestParamsSetCorrectly(t *testing.T) { func TestParamsSetCorrectly(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
// Set the lookback to be smaller and reset at the end. // Set the lookback to be smaller and reset at the end.
@ -466,11 +461,10 @@ func TestParamsSetCorrectly(t *testing.T) {
func TestEngineShutdown(t *testing.T) { func TestEngineShutdown(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)
ctx, cancelCtx := context.WithCancel(context.Background()) ctx, cancelCtx := context.WithCancel(context.Background())
@ -1149,11 +1143,10 @@ func (f *FakeQueryLogger) Log(l ...interface{}) error {
func TestQueryLogger_basic(t *testing.T) { func TestQueryLogger_basic(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)
@ -1201,11 +1194,10 @@ func TestQueryLogger_basic(t *testing.T) {
func TestQueryLogger_fields(t *testing.T) { func TestQueryLogger_fields(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)
@ -1231,11 +1223,10 @@ func TestQueryLogger_fields(t *testing.T) {
func TestQueryLogger_error(t *testing.T) { func TestQueryLogger_error(t *testing.T) {
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)

View File

@ -31,11 +31,10 @@ func TestDeriv(t *testing.T) {
storage := teststorage.New(t) storage := teststorage.New(t)
defer storage.Close() defer storage.Close()
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10000,
MaxSamples: 10000, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := NewEngine(opts) engine := NewEngine(opts)

View File

@ -28,9 +28,10 @@ import (
) )
type ActiveQueryTracker struct { type ActiveQueryTracker struct {
mmapedFile []byte mmapedFile []byte
getNextIndex chan int getNextIndex chan int
logger log.Logger logger log.Logger
maxConcurrent int
} }
type Entry struct { type Entry struct {
@ -102,13 +103,13 @@ func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, er
return fileAsBytes, err return fileAsBytes, err
} }
func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.Logger) *ActiveQueryTracker { func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker {
err := os.MkdirAll(localStoragePath, 0777) err := os.MkdirAll(localStoragePath, 0777)
if err != nil { if err != nil {
level.Error(logger).Log("msg", "Failed to create directory for logging active queries") level.Error(logger).Log("msg", "Failed to create directory for logging active queries")
} }
filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxQueries*entrySize filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize
logUnfinishedQueries(filename, filesize, logger) logUnfinishedQueries(filename, filesize, logger)
fileAsBytes, err := getMMapedFile(filename, filesize, logger) fileAsBytes, err := getMMapedFile(filename, filesize, logger)
@ -118,12 +119,13 @@ func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.L
copy(fileAsBytes, "[") copy(fileAsBytes, "[")
activeQueryTracker := ActiveQueryTracker{ activeQueryTracker := ActiveQueryTracker{
mmapedFile: fileAsBytes, mmapedFile: fileAsBytes,
getNextIndex: make(chan int, maxQueries), getNextIndex: make(chan int, maxConcurrent),
logger: logger, logger: logger,
maxConcurrent: maxConcurrent,
} }
activeQueryTracker.generateIndices(maxQueries) activeQueryTracker.generateIndices(maxConcurrent)
return &activeQueryTracker return &activeQueryTracker
} }
@ -164,12 +166,16 @@ func newJSONEntry(query string, logger log.Logger) []byte {
return jsonEntry return jsonEntry
} }
func (tracker ActiveQueryTracker) generateIndices(maxQueries int) { func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) {
for i := 0; i < maxQueries; i++ { for i := 0; i < maxConcurrent; i++ {
tracker.getNextIndex <- 1 + (i * entrySize) tracker.getNextIndex <- 1 + (i * entrySize)
} }
} }
func (tracker ActiveQueryTracker) GetMaxConcurrent() int {
return tracker.maxConcurrent
}
func (tracker ActiveQueryTracker) Delete(insertIndex int) { func (tracker ActiveQueryTracker) Delete(insertIndex int) {
copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize)) copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize))
tracker.getNextIndex <- insertIndex tracker.getNextIndex <- insertIndex

View File

@ -516,11 +516,10 @@ func (t *Test) clear() {
t.storage = teststorage.New(t) t.storage = teststorage.New(t)
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 20, MaxSamples: 10000,
MaxSamples: 10000, Timeout: 100 * time.Second,
Timeout: 100 * time.Second,
} }
t.queryEngine = NewEngine(opts) t.queryEngine = NewEngine(opts)
@ -630,11 +629,10 @@ func (ll *LazyLoader) clear() {
ll.storage = teststorage.New(ll) ll.storage = teststorage.New(ll)
opts := EngineOpts{ opts := EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 20, MaxSamples: 10000,
MaxSamples: 10000, Timeout: 100 * time.Second,
Timeout: 100 * time.Second,
} }
ll.queryEngine = NewEngine(opts) ll.queryEngine = NewEngine(opts)

View File

@ -298,11 +298,10 @@ func TestAlertingRuleDuplicate(t *testing.T) {
defer storage.Close() defer storage.Close()
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := promql.NewEngine(opts) engine := promql.NewEngine(opts)

View File

@ -502,11 +502,10 @@ func TestStaleness(t *testing.T) {
storage := teststorage.New(t) storage := teststorage.New(t)
defer storage.Close() defer storage.Close()
engineOpts := promql.EngineOpts{ engineOpts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := promql.NewEngine(engineOpts) engine := promql.NewEngine(engineOpts)
opts := &ManagerOptions{ opts := &ManagerOptions{
@ -689,11 +688,10 @@ func TestUpdate(t *testing.T) {
storage := teststorage.New(t) storage := teststorage.New(t)
defer storage.Close() defer storage.Close()
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := promql.NewEngine(opts) engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{ ruleManager := NewManager(&ManagerOptions{
@ -821,11 +819,10 @@ func TestNotify(t *testing.T) {
storage := teststorage.New(t) storage := teststorage.New(t)
defer storage.Close() defer storage.Close()
engineOpts := promql.EngineOpts{ engineOpts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := promql.NewEngine(engineOpts) engine := promql.NewEngine(engineOpts)
var lastNotified []*Alert var lastNotified []*Alert
@ -889,11 +886,10 @@ func TestMetricsUpdate(t *testing.T) {
registry := prometheus.NewRegistry() registry := prometheus.NewRegistry()
defer storage.Close() defer storage.Close()
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := promql.NewEngine(opts) engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{ ruleManager := NewManager(&ManagerOptions{

View File

@ -31,11 +31,10 @@ func TestRuleEval(t *testing.T) {
defer storage.Close() defer storage.Close()
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := promql.NewEngine(opts) engine := promql.NewEngine(opts)
@ -99,11 +98,10 @@ func TestRuleEvalDuplicate(t *testing.T) {
defer storage.Close() defer storage.Close()
opts := promql.EngineOpts{ opts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 10 * time.Second,
Timeout: 10 * time.Second,
} }
engine := promql.NewEngine(opts) engine := promql.NewEngine(opts)

View File

@ -228,11 +228,10 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group {
defer storage.Close() defer storage.Close()
engineOpts := promql.EngineOpts{ engineOpts := promql.EngineOpts{
Logger: nil, Logger: nil,
Reg: nil, Reg: nil,
MaxConcurrent: 10, MaxSamples: 10,
MaxSamples: 10, Timeout: 100 * time.Second,
Timeout: 100 * time.Second,
} }
engine := promql.NewEngine(engineOpts) engine := promql.NewEngine(engineOpts)