[WIP] Remote Read

This commit is contained in:
Julius Volz 2017-03-10 12:53:27 +01:00
parent 40e41a4776
commit 02395a224d
9 changed files with 714 additions and 28 deletions

View File

@ -34,6 +34,7 @@ import (
"github.com/prometheus/prometheus/retrieval"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/fanin"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/web"
@ -94,12 +95,18 @@ func Main() int {
remoteStorage := &remote.Storage{}
sampleAppender = append(sampleAppender, remoteStorage)
reloadables = append(reloadables, remoteStorage)
remoteReader := &remote.Reader{}
reloadables = append(reloadables, remoteStorage, remoteReader)
queryable := fanin.Queryable{
Local: localStorage,
Remote: remoteReader,
}
var (
notifier = notifier.New(&cfg.notifier)
targetManager = retrieval.NewTargetManager(sampleAppender)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
queryEngine = promql.NewEngine(queryable, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
)

View File

@ -167,6 +167,11 @@ var (
DefaultRemoteWriteConfig = RemoteWriteConfig{
RemoteTimeout: model.Duration(30 * time.Second),
}
// DefaultRemoteReadConfig is the default remote read configuration.
DefaultRemoteReadConfig = RemoteReadConfig{
RemoteTimeout: model.Duration(1 * time.Minute),
}
)
// URL is a custom URL type that allows validation at configuration load time.
@ -205,6 +210,7 @@ type Config struct {
ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
RemoteWriteConfigs []*RemoteWriteConfig `yaml:"remote_write,omitempty"`
RemoteReadConfigs []*RemoteReadConfig `yaml:"remote_read,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
@ -1296,7 +1302,7 @@ func (re Regexp) MarshalYAML() (interface{}, error) {
return nil, nil
}
// RemoteWriteConfig is the configuration for remote storage.
// RemoteWriteConfig is the configuration for writing to remote storage.
type RemoteWriteConfig struct {
URL *URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
@ -1321,3 +1327,28 @@ func (c *RemoteWriteConfig) UnmarshalYAML(unmarshal func(interface{}) error) err
}
return nil
}
// RemoteReadConfig is the configuration for reading from remote storage.
type RemoteReadConfig struct {
URL *URL `yaml:"url,omitempty"`
RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"`
TLSConfig TLSConfig `yaml:"tls_config,omitempty"`
ProxyURL URL `yaml:"proxy_url,omitempty"`
// Catches all undefined fields and must be empty after parsing.
XXX map[string]interface{} `yaml:",inline"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (c *RemoteReadConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
*c = DefaultRemoteReadConfig
type plain RemoteReadConfig
if err := unmarshal((*plain)(c)); err != nil {
return err
}
if err := checkOverflow(c.XXX, "remote_read"); err != nil {
return err
}
return nil
}

218
storage/fanin/fanin.go Normal file
View File

@ -0,0 +1,218 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fanin
import (
"sort"
"time"
"golang.org/x/net/context"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/remote"
)
// Queryable is a local.Queryable that reads from local and remote storage.
type Queryable struct {
Local promql.Queryable
Remote *remote.Reader
}
// Querier implements local.Queryable.
func (q Queryable) Querier() (local.Querier, error) {
localQuerier, err := q.Local.Querier()
if err != nil {
return nil, err
}
fq := querier{
local: localQuerier,
remotes: q.Remote.Queriers(),
}
return fq, nil
}
type querier struct {
local local.Querier
remotes []local.Querier
}
func (q querier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return q.query(func(q local.Querier) ([]local.SeriesIterator, error) {
return q.QueryRange(ctx, from, through, matchers...)
})
}
func (q querier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return q.query(func(q local.Querier) ([]local.SeriesIterator, error) {
return q.QueryInstant(ctx, ts, stalenessDelta, matchers...)
})
}
func (q querier) query(qFn func(q local.Querier) ([]local.SeriesIterator, error)) ([]local.SeriesIterator, error) {
localIts, err := qFn(q.local)
if err != nil {
return nil, err
}
if len(q.remotes) == 0 {
// Skip merge logic if there are no remote queriers.
return localIts, nil
}
fpToIt := map[model.Fingerprint]*mergeIterator{}
for _, it := range localIts {
fp := it.Metric().Metric.Fingerprint()
fpToIt[fp] = &mergeIterator{local: it}
}
for _, q := range q.remotes {
its, err := qFn(q)
if err != nil {
return nil, err
}
mergeIterators(fpToIt, its)
}
its := make([]local.SeriesIterator, 0, len(fpToIt))
for _, it := range fpToIt {
its = append(its, it)
}
return its, nil
}
func (q querier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
// TODO: implement querying metrics from remote storage.
return q.local.MetricsForLabelMatchers(ctx, from, through, matcherSets...)
}
func (q querier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
// TODO: implement querying last samples from remote storage.
return q.local.LastSampleForLabelMatchers(ctx, cutoff, matcherSets...)
}
func (q querier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
// TODO: implement querying label values from remote storage.
return q.local.LabelValuesForLabelName(ctx, ln)
}
func (q querier) Close() error {
for _, q := range append([]local.Querier{q.local}, q.remotes...) {
if err := q.Close(); err != nil {
return err
}
}
return nil
}
// mergeIterator is a SeriesIterator which merges query results for local and remote
// SeriesIterators. If a series has samples in a local iterator, remote samples are
// only considered before the first local sample of a series. This is to avoid things
// like downsampling on the side of the remote storage to interfere with rate(),
// irate(), etc.
type mergeIterator struct {
local local.SeriesIterator
remote []local.SeriesIterator
}
func (mit mergeIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair {
latest := model.ZeroSamplePair
for _, it := range append(mit.remote, mit.local) {
if it == nil {
// There might not be a local iterator for every remote series.
continue
}
v := it.ValueAtOrBeforeTime(t)
if v.Timestamp.After(latest.Timestamp) {
latest = v
}
}
return latest
}
func (mit mergeIterator) RangeValues(interval metric.Interval) []model.SamplePair {
remoteCutoff := model.Earliest
var values []model.SamplePair
if mit.local != nil {
values = mit.local.RangeValues(interval)
if len(values) > 0 {
remoteCutoff = values[0].Timestamp
}
}
for _, it := range mit.remote {
vs := it.RangeValues(interval)
n := sort.Search(len(vs), func(i int) bool {
return !vs[i].Timestamp.Before(remoteCutoff)
})
values = mergeSamples(values, vs[:n])
}
return values
}
func (mit mergeIterator) Metric() metric.Metric {
if mit.local != nil {
return mit.local.Metric()
}
// If there is no local iterator, there has to be at least one remote one in
// order for this iterator to have been created.
return mit.remote[0].Metric()
}
func (mit mergeIterator) Close() {
mit.local.Close()
for _, it := range mit.remote {
it.Close()
}
}
func mergeIterators(fpToIt map[model.Fingerprint]*mergeIterator, its []local.SeriesIterator) {
for _, it := range its {
fp := it.Metric().Metric.Fingerprint()
if fpIts, ok := fpToIt[fp]; !ok {
fpToIt[fp] = &mergeIterator{remote: []local.SeriesIterator{it}}
} else {
fpToIt[fp].remote = append(fpIts.remote, it)
}
}
}
// mergeSamples merges two lists of sample pairs and removes duplicate
// timestamps. It assumes that both lists are sorted by timestamp.
func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
result := make([]model.SamplePair, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].Timestamp < b[j].Timestamp {
result = append(result, a[i])
i++
} else if a[i].Timestamp > b[j].Timestamp {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[i:]...)
return result
}

View File

@ -16,6 +16,7 @@ package remote
import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"time"
@ -26,20 +27,29 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/util/httputil"
)
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
// Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct {
index int // Used to differentiate metrics.
url config.URL
url *config.URL
client *http.Client
timeout time.Duration
}
type clientConfig struct {
url *config.URL
tlsConfig config.TLSConfig
proxyURL *config.URL
basicAuth *config.BasicAuth
timeout model.Duration
}
// NewClient creates a new Client.
func NewClient(index int, conf *config.RemoteWriteConfig) (*Client, error) {
tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
func NewClient(index int, conf *clientConfig) (*Client, error) {
tlsConfig, err := httputil.NewTLSConfig(conf.tlsConfig)
if err != nil {
return nil, err
}
@ -47,19 +57,19 @@ func NewClient(index int, conf *config.RemoteWriteConfig) (*Client, error) {
// The only timeout we care about is the configured push timeout.
// It is applied on request. So we leave out any timings here.
var rt http.RoundTripper = &http.Transport{
Proxy: http.ProxyURL(conf.ProxyURL.URL),
Proxy: http.ProxyURL(conf.proxyURL.URL),
TLSClientConfig: tlsConfig,
}
if conf.BasicAuth != nil {
rt = httputil.NewBasicAuthRoundTripper(conf.BasicAuth.Username, conf.BasicAuth.Password, rt)
if conf.basicAuth != nil {
rt = httputil.NewBasicAuthRoundTripper(conf.basicAuth.Username, conf.basicAuth.Password, rt)
}
return &Client{
index: index,
url: *conf.URL,
url: conf.url,
client: httputil.NewClient(rt),
timeout: time.Duration(conf.RemoteTimeout),
timeout: time.Duration(conf.timeout),
}, nil
}
@ -120,3 +130,102 @@ func (c *Client) Store(samples model.Samples) error {
func (c Client) Name() string {
return fmt.Sprintf("%d:%s", c.index, c.url)
}
// Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, from, through model.Time, matchers metric.LabelMatchers) (model.Matrix, error) {
req := &ReadRequest{
// TODO: Support batching multiple queries into one read request,
// as the protobuf interface allows for it.
Queries: []*Query{{
StartTimestampMs: int64(from),
EndTimestampMs: int64(through),
Matchers: labelMatchersToProto(matchers),
}},
}
data, err := proto.Marshal(req)
if err != nil {
return nil, fmt.Errorf("unable to marshal read request: %v", err)
}
buf := bytes.Buffer{}
if _, err := snappy.NewWriter(&buf).Write(data); err != nil {
return nil, err
}
httpReq, err := http.NewRequest("POST", c.url.String(), &buf)
if err != nil {
return nil, fmt.Errorf("unable to create request: %v", err)
}
httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
if err != nil {
return nil, fmt.Errorf("error sending request: %v", err)
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status)
}
if data, err = ioutil.ReadAll(snappy.NewReader(httpResp.Body)); err != nil {
return nil, fmt.Errorf("error reading response: %v", err)
}
var resp ReadResponse
err = proto.Unmarshal(data, &resp)
if err != nil {
return nil, fmt.Errorf("unable to unmarshal response body: %v", err)
}
return matrixFromProto(resp.Timeseries), nil
}
func labelMatchersToProto(matchers metric.LabelMatchers) []*LabelMatcher {
pbMatchers := make([]*LabelMatcher, 0, len(matchers))
for _, m := range matchers {
var mType MatchType
switch m.Type {
case metric.Equal:
mType = MatchType_EQUAL
case metric.NotEqual:
mType = MatchType_NOT_EQUAL
case metric.RegexMatch:
mType = MatchType_REGEX_MATCH
case metric.RegexNoMatch:
mType = MatchType_REGEX_NO_MATCH
default:
panic("invalid matcher type")
}
pbMatchers = append(pbMatchers, &LabelMatcher{
Type: mType,
Name: string(m.Name),
Value: string(m.Value),
})
}
return pbMatchers
}
func matrixFromProto(seriesSet []*TimeSeries) model.Matrix {
m := make(model.Matrix, 0, len(seriesSet))
for _, ts := range seriesSet {
var ss model.SampleStream
ss.Metric = labelPairsToMetric(ts.Labels)
ss.Values = make([]model.SamplePair, 0, len(ts.Samples))
for _, s := range ts.Samples {
ss.Values = append(ss.Values, model.SamplePair{
Value: model.SampleValue(s.Value),
Timestamp: model.Time(s.TimestampMs),
})
}
m = append(m, &ss)
}
return m
}
func labelPairsToMetric(labelPairs []*LabelPair) model.Metric {
metric := make(model.Metric, len(labelPairs))
for _, l := range labelPairs {
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
return metric
}

View File

@ -0,0 +1,64 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
)
// This is a struct and not just a renamed type because otherwise the Metric
// field and Metric() methods would clash.
type sampleStreamIterator struct {
ss *model.SampleStream
}
func (it sampleStreamIterator) Metric() metric.Metric {
return metric.Metric{Metric: it.ss.Metric}
}
func (it sampleStreamIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
// TODO: This is a naive inefficient approach - in reality, queries go mostly
// linearly through iterators, and we will want to make successive calls to
// this method more efficient by taking into account the last result index
// somehow (similarly to how it's done in Prometheus's
// memorySeriesIterators).
i := sort.Search(len(it.ss.Values), func(n int) bool {
return it.ss.Values[n].Timestamp.After(ts)
})
if i == 0 {
return model.SamplePair{Timestamp: model.Earliest}
}
return it.ss.Values[i-1]
}
func (it sampleStreamIterator) RangeValues(in metric.Interval) []model.SamplePair {
n := len(it.ss.Values)
start := sort.Search(n, func(i int) bool {
return !it.ss.Values[i].Timestamp.Before(in.OldestInclusive)
})
end := sort.Search(n, func(i int) bool {
return it.ss.Values[i].Timestamp.After(in.NewestInclusive)
})
if start == n {
return nil
}
return it.ss.Values[start:end]
}
func (it sampleStreamIterator) Close() {}

116
storage/remote/read.go Normal file
View File

@ -0,0 +1,116 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"sync"
"time"
"github.com/prometheus/common/model"
"golang.org/x/net/context"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
)
// Reader allows reading from multiple remote sources.
type Reader struct {
mtx sync.Mutex
clients []*Client
}
// ApplyConfig updates the state as the new config requires.
func (r *Reader) ApplyConfig(conf *config.Config) error {
clients := []*Client{}
for i, rrConf := range conf.RemoteReadConfigs {
c, err := NewClient(i, &clientConfig{
url: rrConf.URL,
tlsConfig: rrConf.TLSConfig,
proxyURL: &rrConf.ProxyURL,
basicAuth: rrConf.BasicAuth,
timeout: rrConf.RemoteTimeout,
})
if err != nil {
return err
}
clients = append(clients, c)
}
r.mtx.Lock()
defer r.mtx.Unlock()
r.clients = clients
return nil
}
// Queriers returns a list of Queriers for the currently configured
// remote read endpoints.
func (r *Reader) Queriers() []local.Querier {
r.mtx.Lock()
defer r.mtx.Unlock()
queriers := make([]local.Querier, 0, len(r.clients))
for _, c := range r.clients {
queriers = append(queriers, &querier{client: c})
}
return queriers
}
// querier is an adapter to make a Client usable as a promql.Querier.
type querier struct {
client *Client
}
func (q *querier) QueryRange(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return matrixToIterators(q.client.Read(ctx, from, through, matchers))
}
func (q *querier) QueryInstant(ctx context.Context, ts model.Time, stalenessDelta time.Duration, matchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
return matrixToIterators(q.client.Read(ctx, ts.Add(-stalenessDelta), ts, matchers))
}
func matrixToIterators(m model.Matrix, err error) ([]local.SeriesIterator, error) {
if err != nil {
return nil, err
}
its := make([]local.SeriesIterator, 0, len(m))
for _, ss := range m {
its = append(its, sampleStreamIterator{
ss: ss,
})
}
return its, nil
}
func (q *querier) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matcherSets ...metric.LabelMatchers) ([]metric.Metric, error) {
// TODO: Implement remote metadata querying.
return nil, nil
}
func (q *querier) LastSampleForLabelMatchers(ctx context.Context, cutoff model.Time, matcherSets ...metric.LabelMatchers) (model.Vector, error) {
// TODO: Implement remote last sample querying.
return nil, nil
}
func (q *querier) LabelValuesForLabelName(ctx context.Context, ln model.LabelName) (model.LabelValues, error) {
// TODO: Implement remote metadata querying.
return nil, nil
}
func (q *querier) Close() error {
return nil
}

View File

@ -36,7 +36,13 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
// TODO: we should only stop & recreate queues which have changes,
// as this can be quite disruptive.
for i, rwConf := range conf.RemoteWriteConfigs {
c, err := NewClient(i, rwConf)
c, err := NewClient(i, &clientConfig{
url: rwConf.URL,
tlsConfig: rwConf.TLSConfig,
proxyURL: &rwConf.ProxyURL,
basicAuth: rwConf.BasicAuth,
timeout: rwConf.RemoteTimeout,
})
if err != nil {
return err
}

View File

@ -13,6 +13,10 @@ It has these top-level messages:
LabelPair
TimeSeries
WriteRequest
ReadRequest
ReadResponse
Query
LabelMatcher
*/
package remote
@ -31,6 +35,33 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type MatchType int32
const (
MatchType_EQUAL MatchType = 0
MatchType_NOT_EQUAL MatchType = 1
MatchType_REGEX_MATCH MatchType = 2
MatchType_REGEX_NO_MATCH MatchType = 3
)
var MatchType_name = map[int32]string{
0: "EQUAL",
1: "NOT_EQUAL",
2: "REGEX_MATCH",
3: "REGEX_NO_MATCH",
}
var MatchType_value = map[string]int32{
"EQUAL": 0,
"NOT_EQUAL": 1,
"REGEX_MATCH": 2,
"REGEX_NO_MATCH": 3,
}
func (x MatchType) String() string {
return proto.EnumName(MatchType_name, int32(x))
}
func (MatchType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"`
TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"`
@ -92,29 +123,106 @@ func (m *WriteRequest) GetTimeseries() []*TimeSeries {
return nil
}
type ReadRequest struct {
Queries []*Query `protobuf:"bytes,1,rep,name=queries" json:"queries,omitempty"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func (m *ReadRequest) GetQueries() []*Query {
if m != nil {
return m.Queries
}
return nil
}
type ReadResponse struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }
func (m *ReadResponse) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
type Query struct {
StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs" json:"start_timestamp_ms,omitempty"`
EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs" json:"end_timestamp_ms,omitempty"`
Matchers []*LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers,omitempty"`
}
func (m *Query) Reset() { *m = Query{} }
func (m *Query) String() string { return proto.CompactTextString(m) }
func (*Query) ProtoMessage() {}
func (*Query) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }
func (m *Query) GetMatchers() []*LabelMatcher {
if m != nil {
return m.Matchers
}
return nil
}
type LabelMatcher struct {
Type MatchType `protobuf:"varint,1,opt,name=type,enum=remote.MatchType" json:"type,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
Value string `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
}
func (m *LabelMatcher) Reset() { *m = LabelMatcher{} }
func (m *LabelMatcher) String() string { return proto.CompactTextString(m) }
func (*LabelMatcher) ProtoMessage() {}
func (*LabelMatcher) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }
func init() {
proto.RegisterType((*Sample)(nil), "remote.Sample")
proto.RegisterType((*LabelPair)(nil), "remote.LabelPair")
proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries")
proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest")
proto.RegisterType((*ReadRequest)(nil), "remote.ReadRequest")
proto.RegisterType((*ReadResponse)(nil), "remote.ReadResponse")
proto.RegisterType((*Query)(nil), "remote.Query")
proto.RegisterType((*LabelMatcher)(nil), "remote.LabelMatcher")
proto.RegisterEnum("remote.MatchType", MatchType_name, MatchType_value)
}
func init() { proto.RegisterFile("remote.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 216 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x4c, 0x90, 0x3f, 0x4f, 0x80, 0x30,
0x10, 0xc5, 0x03, 0x68, 0x0d, 0x07, 0x31, 0xf1, 0xe2, 0xc0, 0xa8, 0x9d, 0x70, 0x61, 0xc0, 0xf8,
0x01, 0x74, 0xd6, 0xc4, 0x14, 0x13, 0x47, 0x53, 0x92, 0x1b, 0x9a, 0xb4, 0x82, 0x6d, 0xf1, 0xf3,
0x5b, 0x5a, 0xfe, 0xb8, 0xf5, 0xdd, 0xbd, 0x7b, 0xf7, 0xeb, 0x41, 0x6d, 0xc9, 0x4c, 0x9e, 0xba,
0xd9, 0x4e, 0x7e, 0x42, 0x96, 0x14, 0x7f, 0x06, 0x36, 0x48, 0x33, 0x6b, 0xc2, 0x5b, 0xb8, 0xfc,
0x95, 0x7a, 0xa1, 0x26, 0xbb, 0xcb, 0xda, 0x4c, 0x24, 0x81, 0xf7, 0x50, 0x7b, 0x65, 0xc8, 0xf9,
0x60, 0xfa, 0x32, 0xae, 0xc9, 0x43, 0xb3, 0x10, 0xd5, 0x51, 0x7b, 0x73, 0xfc, 0x09, 0xca, 0x57,
0x39, 0x92, 0x7e, 0x97, 0xca, 0x22, 0xc2, 0xc5, 0xb7, 0x34, 0x29, 0xa4, 0x14, 0xf1, 0x7d, 0x26,
0xe7, 0xb1, 0x98, 0x04, 0x97, 0x00, 0x1f, 0x21, 0x65, 0x20, 0xab, 0xc8, 0xe1, 0x03, 0x30, 0xbd,
0x86, 0xb8, 0x30, 0x59, 0xb4, 0x55, 0x7f, 0xd3, 0x6d, 0xb8, 0x47, 0xb4, 0xd8, 0x0c, 0xd8, 0xc2,
0x95, 0x8b, 0xc8, 0x2b, 0xcd, 0xea, 0xbd, 0xde, 0xbd, 0xe9, 0x27, 0x62, 0x6f, 0xf3, 0x17, 0xa8,
0x3f, 0xad, 0xf2, 0x24, 0xe8, 0x67, 0x09, 0xb8, 0xd8, 0x03, 0x44, 0xf0, 0xb8, 0x72, 0x5b, 0x84,
0xfb, 0xf0, 0x09, 0x23, 0xfe, 0xb9, 0x46, 0x16, 0xef, 0xf5, 0xf8, 0x17, 0x00, 0x00, 0xff, 0xff,
0x73, 0xb4, 0xd1, 0xb6, 0x3f, 0x01, 0x00, 0x00,
// 397 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x52, 0x5d, 0x6b, 0xe2, 0x40,
0x14, 0xdd, 0x24, 0x1a, 0x37, 0xd7, 0x98, 0xcd, 0x0e, 0x3e, 0xf8, 0xb8, 0x1b, 0x58, 0xd6, 0x5d,
0x8a, 0x14, 0x4b, 0xfb, 0x6e, 0x8b, 0xb4, 0x14, 0x3f, 0xea, 0x98, 0xd2, 0xbe, 0x85, 0xb1, 0x0e,
0x34, 0x90, 0x98, 0x34, 0x33, 0x16, 0xfc, 0x19, 0xfd, 0xc7, 0x9d, 0xcc, 0xe4, 0x4b, 0xf0, 0xa9,
0x6f, 0xb9, 0xf7, 0x9c, 0x7b, 0xee, 0xc9, 0x9c, 0x0b, 0x76, 0x46, 0xe3, 0x84, 0xd3, 0x51, 0x9a,
0x25, 0x3c, 0x41, 0xa6, 0xaa, 0xbc, 0x09, 0x98, 0x6b, 0x12, 0xa7, 0x11, 0x45, 0x7d, 0x68, 0xbf,
0x93, 0x68, 0x4f, 0x07, 0xda, 0x2f, 0x6d, 0xa8, 0x61, 0x55, 0xa0, 0xdf, 0x60, 0xf3, 0x30, 0xa6,
0x8c, 0x0b, 0x52, 0x10, 0xb3, 0x81, 0x2e, 0x40, 0x03, 0x77, 0xab, 0xde, 0x9c, 0x79, 0x97, 0x60,
0xcd, 0xc8, 0x86, 0x46, 0x0f, 0x24, 0xcc, 0x10, 0x82, 0xd6, 0x8e, 0xc4, 0x4a, 0xc4, 0xc2, 0xf2,
0xbb, 0x56, 0xd6, 0x65, 0x53, 0x15, 0x1e, 0x01, 0xf0, 0x85, 0xca, 0x9a, 0x66, 0x21, 0x65, 0xe8,
0x1f, 0x98, 0x51, 0x2e, 0xc2, 0xc4, 0xa4, 0x31, 0xec, 0x8e, 0x7f, 0x8e, 0x0a, 0xbb, 0x95, 0x34,
0x2e, 0x08, 0x68, 0x08, 0x1d, 0x26, 0x2d, 0xe7, 0x6e, 0x72, 0xae, 0x53, 0x72, 0xd5, 0x9f, 0xe0,
0x12, 0xf6, 0xae, 0xc1, 0x7e, 0xca, 0x42, 0x4e, 0x31, 0x7d, 0xdb, 0x0b, 0xbb, 0x68, 0x0c, 0x20,
0x8d, 0xcb, 0x95, 0xc5, 0x22, 0x54, 0x0e, 0xd7, 0x66, 0x70, 0x83, 0xe5, 0x5d, 0x41, 0x17, 0x53,
0xb2, 0x2d, 0x25, 0xfe, 0x42, 0x47, 0x7c, 0x34, 0xe6, 0x7b, 0xe5, 0xfc, 0x4a, 0xb4, 0x0f, 0xb8,
0x44, 0xf3, 0xdd, 0x6a, 0x8e, 0xa5, 0xc9, 0x8e, 0xd1, 0x2f, 0xed, 0xfe, 0xd0, 0xa0, 0x2d, 0x65,
0xd1, 0x19, 0x20, 0xf1, 0xdc, 0x19, 0x0f, 0x8e, 0xc2, 0xd0, 0x64, 0x18, 0xae, 0x44, 0xfc, 0x3a,
0x11, 0xf1, 0x42, 0x2e, 0xdd, 0x6d, 0x83, 0x13, 0xc1, 0x39, 0xa2, 0xdf, 0x64, 0x9e, 0xc3, 0xf7,
0x98, 0xf0, 0x97, 0x57, 0x9a, 0xb1, 0x81, 0x21, 0x3d, 0xf5, 0x8f, 0x1e, 0x7e, 0xae, 0x40, 0x5c,
0xb1, 0xbc, 0x00, 0xec, 0x26, 0x82, 0xfe, 0x40, 0x8b, 0x1f, 0x52, 0x15, 0xb8, 0x53, 0xc7, 0x26,
0x61, 0x5f, 0x00, 0x58, 0xc2, 0xd5, 0x5d, 0xe8, 0xa7, 0xee, 0xc2, 0x68, 0xdc, 0xc5, 0xff, 0x7b,
0xb0, 0xaa, 0x61, 0x64, 0x41, 0x7b, 0xba, 0x7a, 0x9c, 0xcc, 0xdc, 0x6f, 0xa8, 0x07, 0xd6, 0x62,
0xe9, 0x07, 0xaa, 0xd4, 0xd0, 0x0f, 0x91, 0xcb, 0xf4, 0x76, 0xfa, 0x1c, 0xcc, 0x27, 0xfe, 0xcd,
0x9d, 0xab, 0x8b, 0x0d, 0x8e, 0x6a, 0x2c, 0x96, 0x45, 0xcf, 0xd8, 0x98, 0xf2, 0xd8, 0x2f, 0x3e,
0x03, 0x00, 0x00, 0xff, 0xff, 0x24, 0x79, 0x44, 0x11, 0xfc, 0x02, 0x00, 0x00,
}

View File

@ -34,3 +34,30 @@ message TimeSeries {
message WriteRequest {
repeated TimeSeries timeseries = 1;
}
message ReadRequest {
repeated Query queries = 1;
}
message ReadResponse {
repeated TimeSeries timeseries = 1;
}
message Query {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated LabelMatcher matchers = 3;
}
enum MatchType {
EQUAL = 0;
NOT_EQUAL = 1;
REGEX_MATCH = 2;
REGEX_NO_MATCH = 3;
}
message LabelMatcher {
MatchType type = 1;
string name = 2;
string value = 3;
}