diff --git a/storage/remote/client.go b/storage/remote/client.go index bba984bc8..8da4d8452 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -37,18 +37,16 @@ const maxErrMsgLen = 256 // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - index int // Used to differentiate metrics. - url *config.URL - client *http.Client - timeout time.Duration - readRecent bool + index int // Used to differentiate clients in metrics. + url *config.URL + client *http.Client + timeout time.Duration } // ClientConfig configures a Client. type ClientConfig struct { URL *config.URL Timeout model.Duration - ReadRecent bool HTTPClientConfig config.HTTPClientConfig } @@ -60,11 +58,10 @@ func NewClient(index int, conf *ClientConfig) (*Client, error) { } return &Client{ - index: index, - url: conf.URL, - client: httpClient, - timeout: time.Duration(conf.Timeout), - readRecent: conf.ReadRecent, + index: index, + url: conf.URL, + client: httpClient, + timeout: time.Duration(conf.Timeout), }, nil } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 5b966acba..0fa25f36c 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -158,6 +158,12 @@ func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet { } } +type byLabel []storage.Series + +func (a byLabel) Len() int { return len(a) } +func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } + // errSeriesSet implements storage.SeriesSet, just returning an error. type errSeriesSet struct { err error diff --git a/storage/remote/read.go b/storage/remote/read.go index c466e7ddb..c3753c378 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -21,55 +21,30 @@ import ( "github.com/prometheus/prometheus/storage" ) -// Querier returns a new Querier on the storage. 
-func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - r.mtx.Lock() - defer r.mtx.Unlock() - - queriers := make([]storage.Querier, 0, len(r.clients)) - localStartTime, err := r.localStartTimeCallback() - if err != nil { - return nil, err - } - for _, c := range r.clients { - cmaxt := maxt - if !c.readRecent { - // Avoid queries whose timerange is later than the first timestamp in local DB. - if mint > localStartTime { - continue - } - // Query only samples older than the first timestamp in local DB. - if maxt > localStartTime { - cmaxt = localStartTime - } - } - queriers = append(queriers, &querier{ - ctx: ctx, - mint: mint, - maxt: cmaxt, - client: c, - externalLabels: r.externalLabels, - }) - } - return newMergeQueriers(queriers), nil +// QueryableClient returns a storage.Queryable which queries the given +// Client to select series sets. +func QueryableClient(c *Client) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &querier{ + ctx: ctx, + mint: mint, + maxt: maxt, + client: c, + }, nil + }) } -// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed. -var newMergeQueriers = storage.NewMergeQuerier - -// Querier is an adapter to make a Client usable as a storage.Querier. +// querier is an adapter to make a Client usable as a storage.Querier. type querier struct { - ctx context.Context - mint, maxt int64 - client *Client - externalLabels model.LabelSet + ctx context.Context + mint, maxt int64 + client *Client } -// Select returns a set of series that matches the given label matchers. +// Select implements storage.Querier and uses the given matchers to read series +// sets from the Client. 
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { - m, added := q.addExternalLabels(matchers) - - query, err := ToQuery(q.mint, q.maxt, m) + query, err := ToQuery(q.mint, q.maxt, matchers) if err != nil { return errSeriesSet{err: err} } @@ -79,40 +54,83 @@ func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { return errSeriesSet{err: err} } - seriesSet := FromQueryResult(res) - - return newSeriesSetFilter(seriesSet, added) + return FromQueryResult(res) } -type byLabel []storage.Series - -func (a byLabel) Len() int { return len(a) } -func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } - -// LabelValues returns all potential values for a label name. +// LabelValues implements storage.Querier and is a noop. func (q *querier) LabelValues(name string) ([]string, error) { // TODO implement? return nil, nil } -// Close releases the resources of the Querier. +// Close implements storage.Querier and is a noop. func (q *querier) Close() error { return nil } +// ExternablLabelsHandler returns a storage.Queryable which creates an +// externalLabelsQuerier. +func ExternablLabelsHandler(next storage.Queryable, externalLabels model.LabelSet) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := next.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + return &externalLabelsQuerier{Querier: q, externalLabels: externalLabels}, nil + }) +} + +// externalLabelsQuerier is a querier which ensures that Select() results match +// the configured external labels. +type externalLabelsQuerier struct { + storage.Querier + + externalLabels model.LabelSet +} + +// Select adds equality matchers for all external labels to the list of matchers +// before calling the wrapped storage.Querier. 
The added external labels are +// removed from the returned series sets. +func (q externalLabelsQuerier) Select(matchers ...*labels.Matcher) storage.SeriesSet { + m, added := q.addExternalLabels(matchers) + s := q.Querier.Select(m...) + return newSeriesSetFilter(s, added) +} + +// PreferLocalStorageFilter returns a QueryableFunc which creates a NoopQuerier +// if requested timeframe can be answered completely by the local TSDB, and +// reduces maxt if the timeframe can be partially answered by TSDB. +func PreferLocalStorageFilter(next storage.Queryable, cb startTimeCallback) storage.Queryable { + return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + localStartTime, err := cb() + if err != nil { + return nil, err + } + cmaxt := maxt + // Avoid queries whose timerange is later than the first timestamp in local DB. + if mint > localStartTime { + return storage.NoopQuerier(), nil + } + // Query only samples older than the first timestamp in local DB. + if maxt > localStartTime { + cmaxt = localStartTime + } + return next.Querier(ctx, mint, cmaxt) + }) +} + // addExternalLabels adds matchers for each external label. External labels // that already have a corresponding user-supplied matcher are skipped, as we // assume that the user explicitly wants to select a different value for them. // We return the new set of matchers, along with a map of labels for which // matchers were added, so that these can later be removed from the result // time series again. 
-func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { +func (q externalLabelsQuerier) addExternalLabels(ms []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { el := make(model.LabelSet, len(q.externalLabels)) for k, v := range q.externalLabels { el[k] = v } - for _, m := range matchers { + for _, m := range ms { if _, ok := el[model.LabelName(m.Name)]; ok { delete(el, model.LabelName(m.Name)) } @@ -122,9 +140,9 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match if err != nil { panic(err) } - matchers = append(matchers, m) + ms = append(ms, m) } - return matchers, el + return ms, el } func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet { diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 2fce33189..e8ba06f23 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -18,10 +18,8 @@ import ( "reflect" "sort" "testing" - "time" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -35,7 +33,22 @@ func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher return m } -func TestAddExternalLabels(t *testing.T) { +func TestExternalLabelsQuerierSelect(t *testing.T) { + matchers := []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + } + q := &externalLabelsQuerier{ + Querier: mockQuerier{}, + externalLabels: model.LabelSet{"region": "europe"}, + } + + want := newSeriesSetFilter(mockSeriesSet{}, q.externalLabels) + if have := q.Select(matchers...); !reflect.DeepEqual(want, have) { + t.Errorf("expected series set %+v, got %+v", want, have) + } +} + +func TestExternalLabelsQuerierAddExternalLabels(t *testing.T) { tests := []struct { el model.LabelSet inMatchers []*labels.Matcher @@ -80,10 +93,7 @@ func 
TestAddExternalLabels(t *testing.T) { } for i, test := range tests { - q := querier{ - externalLabels: test.el, - } - + q := &externalLabelsQuerier{Querier: mockQuerier{}, externalLabels: test.el} matchers, added := q.addExternalLabels(test.inMatchers) sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) @@ -133,58 +143,57 @@ func TestSeriesSetFilter(t *testing.T) { } } -type mockMergeQuerier struct{ queriersCount int } +type testQuerier struct { + ctx context.Context + mint, maxt int64 -func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil } -func (*mockMergeQuerier) LabelValues(name string) ([]string, error) { return nil, nil } -func (*mockMergeQuerier) Close() error { return nil } + storage.Querier +} + +func TestPreferLocalStorageFilter(t *testing.T) { + ctx := context.Background() -func TestRemoteStorageQuerier(t *testing.T) { tests := []struct { - localStartTime int64 - readRecentClients []bool - mint int64 - maxt int64 - expectedQueriersCount int + localStartTime int64 + mint int64 + maxt int64 + querier storage.Querier }{ { - localStartTime: int64(20), - readRecentClients: []bool{true, true, false}, - mint: int64(0), - maxt: int64(50), - expectedQueriersCount: 3, + localStartTime: int64(100), + mint: int64(0), + maxt: int64(50), + querier: testQuerier{ctx: ctx, mint: 0, maxt: 50}, }, { - localStartTime: int64(20), - readRecentClients: []bool{true, true, false}, - mint: int64(30), - maxt: int64(50), - expectedQueriersCount: 2, + localStartTime: int64(20), + mint: int64(0), + maxt: int64(50), + querier: testQuerier{ctx: ctx, mint: 0, maxt: 20}, + }, + { + localStartTime: int64(20), + mint: int64(30), + maxt: int64(50), + querier: storage.NoopQuerier(), }, } for i, test := range tests { - s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil }) - s.clients = []*Client{} - for _, readRecent := range test.readRecentClients { - c, _ := NewClient(0, 
&ClientConfig{ - URL: nil, - Timeout: model.Duration(30 * time.Second), - HTTPClientConfig: config.HTTPClientConfig{}, - ReadRecent: readRecent, - }) - s.clients = append(s.clients, c) - } - // overrides mergeQuerier to mockMergeQuerier so we can reflect its type - newMergeQueriers = func(queriers []storage.Querier) storage.Querier { - return &mockMergeQuerier{queriersCount: len(queriers)} + f := PreferLocalStorageFilter( + storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return testQuerier{ctx: ctx, mint: mint, maxt: maxt}, nil + }), + func() (int64, error) { return test.localStartTime, nil }, + ) + + q, err := f.Querier(ctx, test.mint, test.maxt) + if err != nil { + t.Fatal(err) } - querier, _ := s.Querier(context.Background(), test.mint, test.maxt) - actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount - - if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) { - t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount) + if test.querier != q { + t.Errorf("%d. expected quierer %+v, got %+v", i, test.querier, q) } } } diff --git a/storage/remote/storage.go b/storage/remote/storage.go index bb8c17372..b07c34638 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -14,11 +14,13 @@ package remote import ( + "context" "sync" "github.com/go-kit/kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" ) // Callback func that return the oldest timestamp stored in a storage. @@ -34,9 +36,8 @@ type Storage struct { queues []*QueueManager // For reads - clients []*Client + queryables []storage.Queryable localStartTimeCallback startTimeCallback - externalLabels model.LabelSet } // NewStorage returns a remote.Storage. 
@@ -86,22 +87,25 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { // Update read clients - clients := []*Client{} + s.queryables = make([]storage.Queryable, 0, len(conf.RemoteReadConfigs)) for i, rrConf := range conf.RemoteReadConfigs { c, err := NewClient(i, &ClientConfig{ URL: rrConf.URL, Timeout: rrConf.RemoteTimeout, HTTPClientConfig: rrConf.HTTPClientConfig, - ReadRecent: rrConf.ReadRecent, }) if err != nil { return err } - clients = append(clients, c) - } - s.clients = clients - s.externalLabels = conf.GlobalConfig.ExternalLabels + var q storage.Queryable + q = QueryableClient(c) + q = ExternablLabelsHandler(q, conf.GlobalConfig.ExternalLabels) + if !rrConf.ReadRecent { + q = PreferLocalStorageFilter(q, s.localStartTimeCallback) + } + s.queryables = append(s.queryables, q) + } return nil } @@ -111,6 +115,24 @@ func (s *Storage) StartTime() (int64, error) { return int64(model.Latest), nil } +// Querier returns a storage.MergeQuerier combining the remote client queriers +// of each configured remote read endpoint. +func (s *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + s.mtx.Lock() + queryables := s.queryables + s.mtx.Unlock() + + queriers := make([]storage.Querier, 0, len(queryables)) + for _, queryable := range queryables { + q, err := queryable.Querier(ctx, mint, maxt) + if err != nil { + return nil, err + } + queriers = append(queriers, q) + } + return storage.NewMergeQuerier(queriers), nil +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock()