mirror of
https://github.com/prometheus/prometheus
synced 2025-01-12 01:29:43 +00:00
Port remote read server to 2.0.
This commit is contained in:
parent
0997191b18
commit
ee011d906d
@ -29,7 +29,6 @@ import (
|
||||
"golang.org/x/net/context/ctxhttp"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/util/httputil"
|
||||
)
|
||||
@ -74,8 +73,7 @@ type recoverableError struct {
|
||||
}
|
||||
|
||||
// Store sends a batch of samples to the HTTP endpoint.
|
||||
func (c *Client) Store(samples model.Samples) error {
|
||||
req := ToWriteRequest(samples)
|
||||
func (c *Client) Store(req *prompb.WriteRequest) error {
|
||||
data, err := proto.Marshal(req)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -123,12 +121,7 @@ func (c Client) Name() string {
|
||||
}
|
||||
|
||||
// Read reads from a remote endpoint.
|
||||
func (c *Client) Read(ctx context.Context, from, through int64, matchers []*labels.Matcher) ([]*prompb.TimeSeries, error) {
|
||||
query, err := ToQuery(from, through, matchers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryResult, error) {
|
||||
req := &prompb.ReadRequest{
|
||||
// TODO: Support batching multiple queries into one read request,
|
||||
// as the protobuf interface allows for it.
|
||||
@ -182,5 +175,5 @@ func (c *Client) Read(ctx context.Context, from, through int64, matchers []*labe
|
||||
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
|
||||
}
|
||||
|
||||
return resp.Results[0].Timeseries, nil
|
||||
return resp.Results[0], nil
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
var longErrMessage = strings.Repeat("error message", maxErrMsgLen)
|
||||
@ -65,14 +66,14 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
|
||||
}
|
||||
|
||||
c, err := NewClient(0, &ClientConfig{
|
||||
URL: &config.URL{serverURL},
|
||||
URL: &config.URL{URL: serverURL},
|
||||
Timeout: model.Duration(time.Second),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = c.Store(nil)
|
||||
err = c.Store(&prompb.WriteRequest{})
|
||||
if !reflect.DeepEqual(err, test.err) {
|
||||
t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
// DecodeReadRequest reads a remote.Request from a http.Request.
|
||||
@ -99,52 +100,166 @@ func ToQuery(from, to int64, matchers []*labels.Matcher) (*prompb.Query, error)
|
||||
}
|
||||
|
||||
// FromQuery unpacks a Query proto.
|
||||
func FromQuery(req *prompb.Query) (model.Time, model.Time, []*labels.Matcher, error) {
|
||||
func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, error) {
|
||||
matchers, err := fromLabelMatchers(req.Matchers)
|
||||
if err != nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
from := model.Time(req.StartTimestampMs)
|
||||
to := model.Time(req.EndTimestampMs)
|
||||
return from, to, matchers, nil
|
||||
return req.StartTimestampMs, req.EndTimestampMs, matchers, nil
|
||||
}
|
||||
|
||||
// ToQueryResult builds a QueryResult proto.
|
||||
func ToQueryResult(matrix model.Matrix) *prompb.QueryResult {
|
||||
func ToQueryResult(ss storage.SeriesSet) (*prompb.QueryResult, error) {
|
||||
resp := &prompb.QueryResult{}
|
||||
for _, ss := range matrix {
|
||||
ts := prompb.TimeSeries{
|
||||
Labels: MetricToLabelProtos(ss.Metric),
|
||||
Samples: make([]*prompb.Sample, 0, len(ss.Values)),
|
||||
}
|
||||
for _, s := range ss.Values {
|
||||
ts.Samples = append(ts.Samples, &prompb.Sample{
|
||||
Value: float64(s.Value),
|
||||
Timestamp: int64(s.Timestamp),
|
||||
for ss.Next() {
|
||||
series := ss.At()
|
||||
iter := series.Iterator()
|
||||
samples := []*prompb.Sample{}
|
||||
|
||||
for iter.Next() {
|
||||
ts, val := iter.At()
|
||||
samples = append(samples, &prompb.Sample{
|
||||
Timestamp: ts,
|
||||
Value: val,
|
||||
})
|
||||
}
|
||||
resp.Timeseries = append(resp.Timeseries, &ts)
|
||||
if err := iter.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.Timeseries = append(resp.Timeseries, &prompb.TimeSeries{
|
||||
Labels: labelsToLabelsProto(series.Labels()),
|
||||
Samples: samples,
|
||||
})
|
||||
}
|
||||
return resp
|
||||
if err := ss.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// FromQueryResult unpacks a QueryResult proto.
|
||||
func FromQueryResult(resp *prompb.QueryResult) model.Matrix {
|
||||
m := make(model.Matrix, 0, len(resp.Timeseries))
|
||||
for _, ts := range resp.Timeseries {
|
||||
var ss model.SampleStream
|
||||
ss.Metric = LabelProtosToMetric(ts.Labels)
|
||||
ss.Values = make([]model.SamplePair, 0, len(ts.Samples))
|
||||
for _, s := range ts.Samples {
|
||||
ss.Values = append(ss.Values, model.SamplePair{
|
||||
Value: model.SampleValue(s.Value),
|
||||
Timestamp: model.Time(s.Timestamp),
|
||||
})
|
||||
func FromQueryResult(res *prompb.QueryResult) storage.SeriesSet {
|
||||
series := make([]storage.Series, 0, len(res.Timeseries))
|
||||
for _, ts := range res.Timeseries {
|
||||
labels := labelProtosToLabels(ts.Labels)
|
||||
if err := validateLabelsAndMetricName(labels); err != nil {
|
||||
return errSeriesSet{err: err}
|
||||
}
|
||||
m = append(m, &ss)
|
||||
}
|
||||
|
||||
return m
|
||||
series = append(series, &concreteSeries{
|
||||
labels: labels,
|
||||
samples: ts.Samples,
|
||||
})
|
||||
}
|
||||
sort.Sort(byLabel(series))
|
||||
return &concreteSeriesSet{
|
||||
series: series,
|
||||
}
|
||||
}
|
||||
|
||||
// errSeriesSet implements storage.SeriesSet, just returning an error.
|
||||
type errSeriesSet struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (errSeriesSet) Next() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (errSeriesSet) At() storage.Series {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e errSeriesSet) Err() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// concreteSeriesSet implements storage.SeriesSet.
|
||||
type concreteSeriesSet struct {
|
||||
cur int
|
||||
series []storage.Series
|
||||
}
|
||||
|
||||
func (c *concreteSeriesSet) Next() bool {
|
||||
c.cur++
|
||||
return c.cur-1 < len(c.series)
|
||||
}
|
||||
|
||||
func (c *concreteSeriesSet) At() storage.Series {
|
||||
return c.series[c.cur-1]
|
||||
}
|
||||
|
||||
func (c *concreteSeriesSet) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// concreteSeries implementes storage.Series.
|
||||
type concreteSeries struct {
|
||||
labels labels.Labels
|
||||
samples []*prompb.Sample
|
||||
}
|
||||
|
||||
func (c *concreteSeries) Labels() labels.Labels {
|
||||
return c.labels
|
||||
}
|
||||
|
||||
func (c *concreteSeries) Iterator() storage.SeriesIterator {
|
||||
return newConcreteSeriersIterator(c)
|
||||
}
|
||||
|
||||
// concreteSeriesIterator implements storage.SeriesIterator.
|
||||
type concreteSeriesIterator struct {
|
||||
cur int
|
||||
series *concreteSeries
|
||||
}
|
||||
|
||||
func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator {
|
||||
return &concreteSeriesIterator{
|
||||
cur: -1,
|
||||
series: series,
|
||||
}
|
||||
}
|
||||
|
||||
// Seek implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) Seek(t int64) bool {
|
||||
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
|
||||
return c.series.samples[n].Timestamp >= t
|
||||
})
|
||||
return c.cur < len(c.series.samples)
|
||||
}
|
||||
|
||||
// At implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) At() (t int64, v float64) {
|
||||
s := c.series.samples[c.cur]
|
||||
return s.Timestamp, s.Value
|
||||
}
|
||||
|
||||
// Next implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) Next() bool {
|
||||
c.cur++
|
||||
return c.cur < len(c.series.samples)
|
||||
}
|
||||
|
||||
// Err implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read.
|
||||
func validateLabelsAndMetricName(ls labels.Labels) error {
|
||||
for _, l := range ls {
|
||||
if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) {
|
||||
return fmt.Errorf("Invalid metric name: %v", l.Value)
|
||||
}
|
||||
if !model.LabelName(l.Name).IsValid() {
|
||||
return fmt.Errorf("Invalid label name: %v", l.Name)
|
||||
}
|
||||
if !model.LabelValue(l.Value).IsValid() {
|
||||
return fmt.Errorf("Invalid label value: %v", l.Value)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
|
||||
@ -199,14 +314,14 @@ func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro
|
||||
|
||||
// MetricToLabelProtos builds a []*prompb.Label from a model.Metric
|
||||
func MetricToLabelProtos(metric model.Metric) []*prompb.Label {
|
||||
labelPairs := make([]*prompb.Label, 0, len(metric))
|
||||
labels := make([]*prompb.Label, 0, len(metric))
|
||||
for k, v := range metric {
|
||||
labelPairs = append(labelPairs, &prompb.Label{
|
||||
labels = append(labels, &prompb.Label{
|
||||
Name: string(k),
|
||||
Value: string(v),
|
||||
})
|
||||
}
|
||||
return labelPairs
|
||||
return labels
|
||||
}
|
||||
|
||||
// LabelProtosToMetric unpack a []*prompb.Label to a model.Metric
|
||||
@ -230,6 +345,17 @@ func labelProtosToLabels(labelPairs []*prompb.Label) labels.Labels {
|
||||
return result
|
||||
}
|
||||
|
||||
func labelsToLabelsProto(labels labels.Labels) []*prompb.Label {
|
||||
result := make([]*prompb.Label, 0, len(labels))
|
||||
for _, l := range labels {
|
||||
result = append(result, &prompb.Label{
|
||||
Name: l.Name,
|
||||
Value: l.Value,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func labelsToMetric(ls labels.Labels) model.Metric {
|
||||
metric := make(model.Metric, len(ls))
|
||||
for _, l := range ls {
|
||||
|
113
storage/remote/codec_test.go
Normal file
113
storage/remote/codec_test.go
Normal file
@ -0,0 +1,113 @@
|
||||
package remote
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
func TestValidateLabelsAndMetricName(t *testing.T) {
|
||||
tests := []struct {
|
||||
input labels.Labels
|
||||
expectedErr string
|
||||
shouldPass bool
|
||||
}{
|
||||
{
|
||||
input: labels.FromStrings(
|
||||
"__name__", "name",
|
||||
"labelName", "labelValue",
|
||||
),
|
||||
expectedErr: "",
|
||||
shouldPass: true,
|
||||
},
|
||||
{
|
||||
input: labels.FromStrings(
|
||||
"__name__", "name",
|
||||
"_labelName", "labelValue",
|
||||
),
|
||||
expectedErr: "",
|
||||
shouldPass: true,
|
||||
},
|
||||
{
|
||||
input: labels.FromStrings(
|
||||
"__name__", "name",
|
||||
"@labelName", "labelValue",
|
||||
),
|
||||
expectedErr: "Invalid label name: @labelName",
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
input: labels.FromStrings(
|
||||
"__name__", "name",
|
||||
"123labelName", "labelValue",
|
||||
),
|
||||
expectedErr: "Invalid label name: 123labelName",
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
input: labels.FromStrings(
|
||||
"__name__", "name",
|
||||
"", "labelValue",
|
||||
),
|
||||
expectedErr: "Invalid label name: ",
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
input: labels.FromStrings(
|
||||
"__name__", "name",
|
||||
"labelName", string([]byte{0xff}),
|
||||
),
|
||||
expectedErr: "Invalid label value: " + string([]byte{0xff}),
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
input: labels.FromStrings(
|
||||
"__name__", "@invalid_name",
|
||||
),
|
||||
expectedErr: "Invalid metric name: @invalid_name",
|
||||
shouldPass: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
err := validateLabelsAndMetricName(test.input)
|
||||
if test.shouldPass != (err == nil) {
|
||||
if test.shouldPass {
|
||||
t.Fatalf("Test should pass, got unexpected error: %v", err)
|
||||
} else {
|
||||
t.Fatalf("Test should fail, unexpected error, got: %v, expected: %v", err, test.expectedErr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcreteSeriesSet(t *testing.T) {
|
||||
series1 := &concreteSeries{
|
||||
labels: labels.FromStrings("foo", "bar"),
|
||||
samples: []*prompb.Sample{&prompb.Sample{Value: 1, Timestamp: 2}},
|
||||
}
|
||||
series2 := &concreteSeries{
|
||||
labels: labels.FromStrings("foo", "baz"),
|
||||
samples: []*prompb.Sample{&prompb.Sample{Value: 3, Timestamp: 4}},
|
||||
}
|
||||
c := &concreteSeriesSet{
|
||||
series: []storage.Series{series1, series2},
|
||||
}
|
||||
if !c.Next() {
|
||||
t.Fatalf("Expected Next() to be true.")
|
||||
}
|
||||
if c.At() != series1 {
|
||||
t.Fatalf("Unexpected series returned.")
|
||||
}
|
||||
if !c.Next() {
|
||||
t.Fatalf("Expected Next() to be true.")
|
||||
}
|
||||
if c.At() != series2 {
|
||||
t.Fatalf("Unexpected series returned.")
|
||||
}
|
||||
if c.Next() {
|
||||
t.Fatalf("Expected Next() to be false.")
|
||||
}
|
||||
}
|
@ -25,6 +25,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/relabel"
|
||||
)
|
||||
|
||||
@ -128,7 +129,7 @@ func init() {
|
||||
// external timeseries database.
|
||||
type StorageClient interface {
|
||||
// Store stores the given samples in the remote storage.
|
||||
Store(model.Samples) error
|
||||
Store(*prompb.WriteRequest) error
|
||||
// Name identifies the remote storage implementation.
|
||||
Name() string
|
||||
}
|
||||
@ -466,7 +467,8 @@ func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
|
||||
backoff := s.qm.cfg.MinBackoff
|
||||
for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
|
||||
begin := time.Now()
|
||||
err := s.qm.client.Store(samples)
|
||||
req := ToWriteRequest(samples)
|
||||
err := s.qm.client.Store(req)
|
||||
|
||||
sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())
|
||||
if err == nil {
|
||||
|
@ -15,6 +15,7 @@ package remote
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -22,19 +23,20 @@ import (
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
)
|
||||
|
||||
type TestStorageClient struct {
|
||||
receivedSamples map[string]model.Samples
|
||||
expectedSamples map[string]model.Samples
|
||||
receivedSamples map[string][]*prompb.Sample
|
||||
expectedSamples map[string][]*prompb.Sample
|
||||
wg sync.WaitGroup
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
func NewTestStorageClient() *TestStorageClient {
|
||||
return &TestStorageClient{
|
||||
receivedSamples: map[string]model.Samples{},
|
||||
expectedSamples: map[string]model.Samples{},
|
||||
receivedSamples: map[string][]*prompb.Sample{},
|
||||
expectedSamples: map[string][]*prompb.Sample{},
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,8 +45,11 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) {
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
for _, s := range ss {
|
||||
ts := s.Metric.String()
|
||||
c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
|
||||
ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String()
|
||||
c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{
|
||||
Timestamp: int64(s.Timestamp),
|
||||
Value: float64(s.Value),
|
||||
})
|
||||
}
|
||||
c.wg.Add(len(ss))
|
||||
}
|
||||
@ -55,23 +60,24 @@ func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
for ts, expectedSamples := range c.expectedSamples {
|
||||
for i, expected := range expectedSamples {
|
||||
if !expected.Equal(c.receivedSamples[ts][i]) {
|
||||
t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
|
||||
}
|
||||
if !reflect.DeepEqual(expectedSamples, c.receivedSamples[ts]) {
|
||||
t.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TestStorageClient) Store(ss model.Samples) error {
|
||||
func (c *TestStorageClient) Store(req *prompb.WriteRequest) error {
|
||||
c.mtx.Lock()
|
||||
defer c.mtx.Unlock()
|
||||
|
||||
for _, s := range ss {
|
||||
ts := s.Metric.String()
|
||||
c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
|
||||
count := 0
|
||||
for _, ts := range req.Timeseries {
|
||||
labels := labelProtosToLabels(ts.Labels).String()
|
||||
for _, sample := range ts.Samples {
|
||||
count++
|
||||
c.receivedSamples[labels] = append(c.receivedSamples[labels], sample)
|
||||
}
|
||||
}
|
||||
c.wg.Add(-len(ss))
|
||||
c.wg.Add(-count)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -162,7 +168,7 @@ func NewTestBlockedStorageClient() *TestBlockingStorageClient {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TestBlockingStorageClient) Store(s model.Samples) error {
|
||||
func (c *TestBlockingStorageClient) Store(_ *prompb.WriteRequest) error {
|
||||
atomic.AddUint64(&c.numCalls, 1)
|
||||
<-c.block
|
||||
return nil
|
||||
|
@ -15,17 +15,14 @@ package remote
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
// Querier returns a new Querier on the storage.
|
||||
func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
func (r *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
|
||||
r.mtx.Lock()
|
||||
defer r.mtx.Unlock()
|
||||
|
||||
@ -47,6 +44,7 @@ func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier,
|
||||
}
|
||||
}
|
||||
queriers = append(queriers, &querier{
|
||||
ctx: ctx,
|
||||
mint: mint,
|
||||
maxt: cmaxt,
|
||||
client: c,
|
||||
@ -61,6 +59,7 @@ var newMergeQueriers = storage.NewMergeQuerier
|
||||
|
||||
// Querier is an adapter to make a Client usable as a storage.Querier.
|
||||
type querier struct {
|
||||
ctx context.Context
|
||||
mint, maxt int64
|
||||
client *Client
|
||||
externalLabels model.LabelSet
|
||||
@ -69,28 +68,20 @@ type querier struct {
|
||||
// Select returns a set of series that matches the given label matchers.
|
||||
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
|
||||
m, added := q.addExternalLabels(matchers)
|
||||
res, err := q.client.Read(context.TODO(), q.mint, q.maxt, m)
|
||||
|
||||
query, err := ToQuery(q.mint, q.maxt, m)
|
||||
if err != nil {
|
||||
return errSeriesSet{err: err}
|
||||
}
|
||||
|
||||
series := make([]storage.Series, 0, len(res))
|
||||
for _, ts := range res {
|
||||
labels := labelProtosToLabels(ts.Labels)
|
||||
removeLabels(&labels, added)
|
||||
if err := validateLabelsAndMetricName(labels); err != nil {
|
||||
return errSeriesSet{err: err}
|
||||
}
|
||||
res, err := q.client.Read(q.ctx, query)
|
||||
if err != nil {
|
||||
return errSeriesSet{err: err}
|
||||
}
|
||||
|
||||
series = append(series, &concreteSeries{
|
||||
labels: labels,
|
||||
samples: ts.Samples,
|
||||
})
|
||||
}
|
||||
sort.Sort(byLabel(series))
|
||||
return &concreteSeriesSet{
|
||||
series: series,
|
||||
}
|
||||
seriesSet := FromQueryResult(res)
|
||||
|
||||
return newSeriesSetFilter(seriesSet, added)
|
||||
}
|
||||
|
||||
type byLabel []storage.Series
|
||||
@ -110,110 +101,6 @@ func (q *querier) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// errSeriesSet implements storage.SeriesSet, just returning an error.
|
||||
type errSeriesSet struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (errSeriesSet) Next() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (errSeriesSet) At() storage.Series {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e errSeriesSet) Err() error {
|
||||
return e.err
|
||||
}
|
||||
|
||||
// concreteSeriesSet implements storage.SeriesSet.
|
||||
type concreteSeriesSet struct {
|
||||
cur int
|
||||
series []storage.Series
|
||||
}
|
||||
|
||||
func (c *concreteSeriesSet) Next() bool {
|
||||
c.cur++
|
||||
return c.cur-1 < len(c.series)
|
||||
}
|
||||
|
||||
func (c *concreteSeriesSet) At() storage.Series {
|
||||
return c.series[c.cur-1]
|
||||
}
|
||||
|
||||
func (c *concreteSeriesSet) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// concreteSeries implementes storage.Series.
|
||||
type concreteSeries struct {
|
||||
labels labels.Labels
|
||||
samples []*prompb.Sample
|
||||
}
|
||||
|
||||
func (c *concreteSeries) Labels() labels.Labels {
|
||||
return c.labels
|
||||
}
|
||||
|
||||
func (c *concreteSeries) Iterator() storage.SeriesIterator {
|
||||
return newConcreteSeriersIterator(c)
|
||||
}
|
||||
|
||||
// concreteSeriesIterator implements storage.SeriesIterator.
|
||||
type concreteSeriesIterator struct {
|
||||
cur int
|
||||
series *concreteSeries
|
||||
}
|
||||
|
||||
func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator {
|
||||
return &concreteSeriesIterator{
|
||||
cur: -1,
|
||||
series: series,
|
||||
}
|
||||
}
|
||||
|
||||
// Seek implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) Seek(t int64) bool {
|
||||
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
|
||||
return c.series.samples[n].Timestamp >= t
|
||||
})
|
||||
return c.cur < len(c.series.samples)
|
||||
}
|
||||
|
||||
// At implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) At() (t int64, v float64) {
|
||||
s := c.series.samples[c.cur]
|
||||
return s.Timestamp, s.Value
|
||||
}
|
||||
|
||||
// Next implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) Next() bool {
|
||||
c.cur++
|
||||
return c.cur < len(c.series.samples)
|
||||
}
|
||||
|
||||
// Err implements storage.SeriesIterator.
|
||||
func (c *concreteSeriesIterator) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read.
|
||||
func validateLabelsAndMetricName(ls labels.Labels) error {
|
||||
for _, l := range ls {
|
||||
if l.Name == labels.MetricName && !model.IsValidMetricName(model.LabelValue(l.Value)) {
|
||||
return fmt.Errorf("Invalid metric name: %v", l.Value)
|
||||
}
|
||||
if !model.LabelName(l.Name).IsValid() {
|
||||
return fmt.Errorf("Invalid label name: %v", l.Name)
|
||||
}
|
||||
if !model.LabelValue(l.Value).IsValid() {
|
||||
return fmt.Errorf("Invalid label value: %v", l.Value)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// addExternalLabels adds matchers for each external label. External labels
|
||||
// that already have a corresponding user-supplied matcher are skipped, as we
|
||||
// assume that the user explicitly wants to select a different value for them.
|
||||
@ -240,12 +127,38 @@ func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Match
|
||||
return matchers, el
|
||||
}
|
||||
|
||||
func removeLabels(l *labels.Labels, toDelete model.LabelSet) {
|
||||
for i := 0; i < len(*l); {
|
||||
if _, ok := toDelete[model.LabelName((*l)[i].Name)]; ok {
|
||||
*l = (*l)[:i+copy((*l)[i:], (*l)[i+1:])]
|
||||
} else {
|
||||
i++
|
||||
}
|
||||
func newSeriesSetFilter(ss storage.SeriesSet, toFilter model.LabelSet) storage.SeriesSet {
|
||||
return &seriesSetFilter{
|
||||
SeriesSet: ss,
|
||||
toFilter: toFilter,
|
||||
}
|
||||
}
|
||||
|
||||
type seriesSetFilter struct {
|
||||
storage.SeriesSet
|
||||
toFilter model.LabelSet
|
||||
}
|
||||
|
||||
func (ssf seriesSetFilter) At() storage.Series {
|
||||
return seriesFilter{
|
||||
Series: ssf.SeriesSet.At(),
|
||||
toFilter: ssf.toFilter,
|
||||
}
|
||||
}
|
||||
|
||||
type seriesFilter struct {
|
||||
storage.Series
|
||||
toFilter model.LabelSet
|
||||
}
|
||||
|
||||
func (sf seriesFilter) Labels() labels.Labels {
|
||||
labels := sf.Series.Labels()
|
||||
for i := 0; i < len(labels); {
|
||||
if _, ok := sf.toFilter[model.LabelName(labels[i].Name)]; ok {
|
||||
labels = labels[:i+copy(labels[i:], labels[i+1:])]
|
||||
continue
|
||||
}
|
||||
i++
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
@ -27,128 +27,6 @@ import (
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
)
|
||||
|
||||
func TestValidateLabelsAndMetricName(t *testing.T) {
|
||||
tests := []struct {
|
||||
result model.Matrix
|
||||
expectedErr string
|
||||
shouldPass bool
|
||||
}{
|
||||
{
|
||||
result: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"__name__": "name",
|
||||
"labelName": "labelValue",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "",
|
||||
shouldPass: true,
|
||||
},
|
||||
{
|
||||
result: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"__name__": "name",
|
||||
"_labelName": "labelValue",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "",
|
||||
shouldPass: true,
|
||||
},
|
||||
{
|
||||
result: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"__name__": "name",
|
||||
"@labelName": "labelValue",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "Invalid label name: @labelName",
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
result: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"__name__": "name",
|
||||
"123labelName": "labelValue",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "Invalid label name: 123labelName",
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
result: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"__name__": "name",
|
||||
"": "labelValue",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "Invalid label name: ",
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
result: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"__name__": "name",
|
||||
"labelName": model.LabelValue([]byte{0xff}),
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "Invalid label value: " + string([]byte{0xff}),
|
||||
shouldPass: false,
|
||||
},
|
||||
{
|
||||
result: model.Matrix{
|
||||
&model.SampleStream{
|
||||
Metric: model.Metric{
|
||||
"__name__": "@invalid_name",
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedErr: "Invalid metric name: @invalid_name",
|
||||
shouldPass: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
var err error
|
||||
for _, ss := range test.result {
|
||||
ls := make(labels.Labels, 0, len(ss.Metric))
|
||||
for k, v := range ss.Metric {
|
||||
ls = append(ls, labels.Label{
|
||||
Name: string(k),
|
||||
Value: string(v),
|
||||
})
|
||||
}
|
||||
err = validateLabelsAndMetricName(ls)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if test.shouldPass {
|
||||
if err != nil {
|
||||
t.Fatalf("Test should pass, got unexpected error: %v", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
if err.Error() != test.expectedErr {
|
||||
t.Fatalf("Unexpected error, got: %v, expected: %v", err, test.expectedErr)
|
||||
}
|
||||
} else {
|
||||
t.Fatalf("Expected error, got none")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher {
|
||||
m, err := labels.NewMatcher(mt, name, val)
|
||||
if err != nil {
|
||||
@ -222,53 +100,36 @@ func TestAddExternalLabels(t *testing.T) {
|
||||
|
||||
func TestRemoveLabels(t *testing.T) {
|
||||
tests := []struct {
|
||||
in labels.Labels
|
||||
out labels.Labels
|
||||
in *prompb.QueryResult
|
||||
toRemove model.LabelSet
|
||||
|
||||
expected *prompb.QueryResult
|
||||
}{
|
||||
{
|
||||
toRemove: model.LabelSet{"foo": "bar"},
|
||||
in: labels.FromStrings("foo", "bar", "a", "b"),
|
||||
out: labels.FromStrings("a", "b"),
|
||||
in: &prompb.QueryResult{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{Labels: labelsToLabelsProto(labels.FromStrings("foo", "bar", "a", "b")), Samples: []*prompb.Sample{}},
|
||||
},
|
||||
},
|
||||
expected: &prompb.QueryResult{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{Labels: labelsToLabelsProto(labels.FromStrings("a", "b")), Samples: []*prompb.Sample{}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for i, test := range tests {
|
||||
in := test.in.Copy()
|
||||
removeLabels(&in, test.toRemove)
|
||||
|
||||
if !reflect.DeepEqual(in, test.out) {
|
||||
t.Fatalf("%d. unexpected labels; want %v, got %v", i, test.out, in)
|
||||
for i, tc := range tests {
|
||||
filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove)
|
||||
have, err := ToQueryResult(filtered)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcreteSeriesSet(t *testing.T) {
|
||||
series1 := &concreteSeries{
|
||||
labels: labels.FromStrings("foo", "bar"),
|
||||
samples: []*prompb.Sample{&prompb.Sample{Value: 1, Timestamp: 2}},
|
||||
}
|
||||
series2 := &concreteSeries{
|
||||
labels: labels.FromStrings("foo", "baz"),
|
||||
samples: []*prompb.Sample{&prompb.Sample{Value: 3, Timestamp: 4}},
|
||||
}
|
||||
c := &concreteSeriesSet{
|
||||
series: []storage.Series{series1, series2},
|
||||
}
|
||||
if !c.Next() {
|
||||
t.Fatalf("Expected Next() to be true.")
|
||||
}
|
||||
if c.At() != series1 {
|
||||
t.Fatalf("Unexpected series returned.")
|
||||
}
|
||||
if !c.Next() {
|
||||
t.Fatalf("Expected Next() to be true.")
|
||||
}
|
||||
if c.At() != series2 {
|
||||
t.Fatalf("Unexpected series returned.")
|
||||
}
|
||||
if c.Next() {
|
||||
t.Fatalf("Expected Next() to be false.")
|
||||
if !reflect.DeepEqual(have, tc.expected) {
|
||||
t.Fatalf("%d. unexpected labels; want %v, got %v", i, tc.expected, have)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -31,9 +31,11 @@ import (
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
"github.com/prometheus/prometheus/util/httputil"
|
||||
)
|
||||
|
||||
@ -161,6 +163,7 @@ func (api *API) Register(r *route.Router) {
|
||||
r.Get("/alertmanagers", instr("alertmanagers", api.alertmanagers))
|
||||
|
||||
r.Get("/status/config", instr("config", api.serveConfig))
|
||||
r.Post("/read", api.ready(prometheus.InstrumentHandler("read", http.HandlerFunc(api.remoteRead))))
|
||||
}
|
||||
|
||||
type queryData struct {
|
||||
@ -451,6 +454,80 @@ func (api *API) serveConfig(r *http.Request) (interface{}, *apiError) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
|
||||
req, err := remote.DecodeReadRequest(r)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
resp := prompb.ReadResponse{
|
||||
Results: make([]*prompb.QueryResult, len(req.Queries)),
|
||||
}
|
||||
for i, query := range req.Queries {
|
||||
from, through, matchers, err := remote.FromQuery(query)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
querier, err := api.Queryable.Querier(r.Context(), from, through)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer querier.Close()
|
||||
|
||||
// Change equality matchers which match external labels
|
||||
// to a matcher that looks for an empty label,
|
||||
// as that label should not be present in the storage.
|
||||
externalLabels := api.config().GlobalConfig.ExternalLabels.Clone()
|
||||
filteredMatchers := make([]*labels.Matcher, 0, len(matchers))
|
||||
for _, m := range matchers {
|
||||
value := externalLabels[model.LabelName(m.Name)]
|
||||
if m.Type == labels.MatchEqual && value == model.LabelValue(m.Value) {
|
||||
matcher, err := labels.NewMatcher(labels.MatchEqual, m.Name, "")
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
filteredMatchers = append(filteredMatchers, matcher)
|
||||
} else {
|
||||
filteredMatchers = append(filteredMatchers, m)
|
||||
}
|
||||
}
|
||||
|
||||
resp.Results[i], err = remote.ToQueryResult(querier.Select(filteredMatchers...))
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Add external labels back in.
|
||||
for _, ts := range resp.Results[i].Timeseries {
|
||||
globalUsed := map[string]struct{}{}
|
||||
for _, l := range ts.Labels {
|
||||
if _, ok := externalLabels[model.LabelName(l.Name)]; ok {
|
||||
globalUsed[l.Name] = struct{}{}
|
||||
}
|
||||
}
|
||||
for ln, lv := range externalLabels {
|
||||
if _, ok := globalUsed[string(ln)]; !ok {
|
||||
ts.Labels = append(ts.Labels, &prompb.Label{
|
||||
Name: string(ln),
|
||||
Value: string(lv),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := remote.EncodeReadResponse(&resp, w); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func respond(w http.ResponseWriter, data interface{}) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
@ -14,6 +14,7 @@
|
||||
package v1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
@ -26,14 +27,19 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/prometheus/common/model"
|
||||
"github.com/prometheus/common/route"
|
||||
"github.com/weaveworks/common/test"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/pkg/labels"
|
||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||
"github.com/prometheus/prometheus/prompb"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
"github.com/prometheus/prometheus/storage/remote"
|
||||
)
|
||||
|
||||
type targetRetrieverFunc func() []*retrieval.Target
|
||||
@ -476,6 +482,103 @@ func TestEndpoints(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadEndpoint(t *testing.T) {
|
||||
suite, err := promql.NewTest(t, `
|
||||
load 1m
|
||||
test_metric1{foo="bar",baz="qux"} 1
|
||||
`)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer suite.Close()
|
||||
|
||||
if err := suite.Run(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
api := &API{
|
||||
Queryable: suite.Storage(),
|
||||
QueryEngine: suite.QueryEngine(),
|
||||
config: func() config.Config {
|
||||
return config.Config{
|
||||
GlobalConfig: config.GlobalConfig{
|
||||
ExternalLabels: model.LabelSet{
|
||||
"baz": "a",
|
||||
"b": "c",
|
||||
"d": "e",
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
// Encode the request.
|
||||
matcher1, err := labels.NewMatcher(labels.MatchEqual, "__name__", "test_metric1")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
matcher2, err := labels.NewMatcher(labels.MatchEqual, "d", "e")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
query, err := remote.ToQuery(0, 1, []*labels.Matcher{matcher1, matcher2})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req := &prompb.ReadRequest{Queries: []*prompb.Query{query}}
|
||||
data, err := proto.Marshal(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
compressed := snappy.Encode(nil, data)
|
||||
request, err := http.NewRequest("POST", "", bytes.NewBuffer(compressed))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
recorder := httptest.NewRecorder()
|
||||
api.remoteRead(recorder, request)
|
||||
|
||||
// Decode the response.
|
||||
compressed, err = ioutil.ReadAll(recorder.Result().Body)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
uncompressed, err := snappy.Decode(nil, compressed)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var resp prompb.ReadResponse
|
||||
err = proto.Unmarshal(uncompressed, &resp)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(resp.Results) != 1 {
|
||||
t.Fatalf("Expected 1 result, got %d", len(resp.Results))
|
||||
}
|
||||
|
||||
result := resp.Results[0]
|
||||
expected := &prompb.QueryResult{
|
||||
Timeseries: []*prompb.TimeSeries{
|
||||
{
|
||||
Labels: []*prompb.Label{
|
||||
{Name: "__name__", Value: "test_metric1"},
|
||||
{Name: "baz", Value: "qux"},
|
||||
{Name: "foo", Value: "bar"},
|
||||
{Name: "b", Value: "c"},
|
||||
{Name: "d", Value: "e"},
|
||||
},
|
||||
Samples: []*prompb.Sample{{Value: 1, Timestamp: 0}},
|
||||
},
|
||||
},
|
||||
}
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Fatalf(test.Diff(expected, result))
|
||||
t.Fatalf("Expected response \n%v\n but got \n%v\n", result, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRespondSuccess(t *testing.T) {
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
respond(w, "test")
|
||||
|
Loading…
Reference in New Issue
Block a user