diff --git a/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go b/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go index 8ce28c4a5..45373cf19 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go @@ -72,8 +72,8 @@ func pathFromMetric(m model.Metric, prefix string) string { return buffer.String() } -// Store sends a batch of samples to Graphite. -func (c *Client) Store(samples model.Samples) error { +// Write sends a batch of samples to Graphite. +func (c *Client) Write(samples model.Samples) error { conn, err := net.DialTimeout(c.transport, c.address, c.timeout) if err != nil { return err diff --git a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go index ae1fa4d63..26b5d56bd 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go @@ -14,26 +14,30 @@ package influxdb import ( + "encoding/json" + "fmt" "math" + "strings" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/remote" - influx "github.com/influxdb/influxdb/client" + influx "github.com/influxdata/influxdb/client/v2" ) // Client allows sending batches of Prometheus samples to InfluxDB. type Client struct { - client *influx.Client + client influx.Client database string retentionPolicy string ignoredSamples prometheus.Counter } // NewClient creates a new Client. -func NewClient(conf influx.Config, db string, rp string) *Client { - c, err := influx.NewClient(conf) +func NewClient(conf influx.HTTPConfig, db string, rp string) *Client { + c, err := influx.NewHTTPClient(conf) // Currently influx.NewClient() *should* never return an error. if err != nil { log.Fatal(err) @@ -63,9 +67,9 @@ func tagsFromMetric(m model.Metric) map[string]string { return tags } -// Store sends a batch of samples to InfluxDB via its HTTP API. -func (c *Client) Store(samples model.Samples) error { - points := make([]influx.Point, 0, len(samples)) +// Write sends a batch of samples to InfluxDB via its HTTP API. +func (c *Client) Write(samples model.Samples) error { + points := make([]*influx.Point, 0, len(samples)) for _, s := range samples { v := float64(s.Value) if math.IsNaN(v) || math.IsInf(v, 0) { @@ -73,24 +77,221 @@ func (c *Client) Store(samples model.Samples) error { c.ignoredSamples.Inc() continue } - points = append(points, influx.Point{ - Measurement: string(s.Metric[model.MetricNameLabel]), - Tags: tagsFromMetric(s.Metric), - Time: s.Timestamp.Time(), - Precision: "ms", - Fields: map[string]interface{}{ - "value": v, - }, - }) + p, err := influx.NewPoint( + string(s.Metric[model.MetricNameLabel]), + tagsFromMetric(s.Metric), + map[string]interface{}{"value": v}, + s.Timestamp.Time(), + ) + if err != nil { + return err + } + points = append(points, p) } - bps := influx.BatchPoints{ - Points: points, + bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{ + Precision: "ms", Database: c.database, RetentionPolicy: c.retentionPolicy, + }) + if err != nil { + return err } - _, err := c.client.Write(bps) - return err + bps.AddPoints(points) + return c.client.Write(bps) +} + +func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { + labelsToSeries := map[string]*remote.TimeSeries{} + for _, q := range req.Queries { + command, err := buildCommand(q) + if err != nil { + return nil, err + } + + query := influx.NewQuery(command, c.database, "ms") + resp, err := c.client.Query(query) + if err != nil { + return nil, err + } + if resp.Err != "" { + return nil, fmt.Errorf(resp.Err) + } + + if err = mergeResult(labelsToSeries, resp.Results); err != nil { + return nil, err + } + } + + resp := remote.ReadResponse{ + Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries)), + } + for _, ts := range labelsToSeries { + resp.Timeseries = append(resp.Timeseries, ts) + } + return &resp, nil +} + +func buildCommand(q *remote.Query) (string, error) { + matchers := make([]string, 0, len(q.Matchers)) + // If we don't find a metric name matcher, query all metrics + // (InfluxDB measurements) by default. + from := "FROM /.+/" + for _, m := range q.Matchers { + if m.Name == model.MetricNameLabel { + switch m.Type { + case remote.MatchType_EQUAL: + from = fmt.Sprintf("FROM %q", m.Value) + case remote.MatchType_REGEX_MATCH: + from = fmt.Sprintf("FROM /^%s$/", escapeSlashes(m.Value)) + default: + // TODO: Figure out how to support these efficiently. + return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet") + } + continue + } + + switch m.Type { + case remote.MatchType_EQUAL: + matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value))) + case remote.MatchType_NOT_EQUAL: + matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value))) + case remote.MatchType_REGEX_MATCH: + matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value))) + case remote.MatchType_REGEX_NO_MATCH: + matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value))) + default: + return "", fmt.Errorf("unknown match type %v", m.Type) + } + } + matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs)) + matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs)) + + return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil +} + +func escapeSingleQuotes(str string) string { + return strings.Replace(str, `'`, `\'`, -1) +} + +func escapeSlashes(str string) string { + return strings.Replace(str, `/`, `\/`, -1) +} + +func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error { + for _, r := range results { + for _, s := range r.Series { + k := concatLabels(s.Tags) + ts, ok := labelsToSeries[k] + if !ok { + ts = &remote.TimeSeries{ + Labels: tagsToLabelPairs(s.Name, s.Tags), + } + labelsToSeries[k] = ts + } + + samples, err := valuesToSamples(s.Values) + if err != nil { + return err + } + + ts.Samples = mergeSamples(ts.Samples, samples) + } + } + return nil +} + +func concatLabels(labels map[string]string) string { + // 0xff cannot cannot occur in valid UTF-8 sequences, so use it + // as a separator here. + separator := "\xff" + pairs := make([]string, 0, len(labels)) + for k, v := range labels { + pairs = append(pairs, k+separator+v) + } + return strings.Join(pairs, separator) +} + +func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair { + pairs := make([]*remote.LabelPair, 0, len(tags)) + for k, v := range tags { + if v == "" { + // If we select metrics with different sets of labels names, + // InfluxDB returns *all* possible tag names on all returned + // series, with empty tag values on series where they don't + // apply. In Prometheus, an empty label value is equivalent + // to a non-existent label, so we just skip empty ones here + // to make the result correct. + continue + } + pairs = append(pairs, &remote.LabelPair{ + Name: k, + Value: v, + }) + } + pairs = append(pairs, &remote.LabelPair{ + Name: model.MetricNameLabel, + Value: name, + }) + return pairs +} + +func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) { + samples := make([]*remote.Sample, 0, len(values)) + for _, v := range values { + if len(v) != 2 { + return nil, fmt.Errorf("bad sample tuple length, expected [, ], got %v", v) + } + + jsonTimestamp, ok := v[0].(json.Number) + if !ok { + return nil, fmt.Errorf("bad timestamp: %v", v[0]) + } + + jsonValue, ok := v[1].(json.Number) + if !ok { + return nil, fmt.Errorf("bad sample value: %v", v[1]) + } + + timestamp, err := jsonTimestamp.Int64() + if err != nil { + return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err) + } + + value, err := jsonValue.Float64() + if err != nil { + return nil, fmt.Errorf("unable to convert sample value to float64: %v", err) + } + + samples = append(samples, &remote.Sample{ + TimestampMs: timestamp, + Value: value, + }) + } + return samples, nil +} + +// mergeSamples merges two lists of sample pairs and removes duplicate +// timestamps. It assumes that both lists are sorted by timestamp. +func mergeSamples(a, b []*remote.Sample) []*remote.Sample { + result := make([]*remote.Sample, 0, len(a)+len(b)) + i, j := 0, 0 + for i < len(a) && j < len(b) { + if a[i].TimestampMs < b[j].TimestampMs { + result = append(result, a[i]) + i++ + } else if a[i].TimestampMs > b[j].TimestampMs { + result = append(result, b[j]) + j++ + } else { + result = append(result, a[i]) + i++ + j++ + } + } + result = append(result, a[i:]...) + result = append(result, b[j:]...) + return result } // Name identifies the client as an InfluxDB client. diff --git a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go index 38a5e1a68..b3567fb75 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go @@ -22,7 +22,7 @@ import ( "testing" "time" - influx "github.com/influxdb/influxdb/client" + influx "github.com/influxdata/influxdb/client/v2" "github.com/prometheus/common/model" ) @@ -68,8 +68,8 @@ func TestClient(t *testing.T) { }, } - expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000 -testmetric,test_label=test_label_value2 value=5.1234 123456789123000000 + expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123 +testmetric,test_label=test_label_value2 value=5.1234 123456789123 ` server := httptest.NewServer(http.HandlerFunc( @@ -97,15 +97,15 @@ testmetric,test_label=test_label_value2 value=5.1234 123456789123000000 t.Fatalf("Unable to parse server URL %s: %s", server.URL, err) } - conf := influx.Config{ - URL: *serverURL, + conf := influx.HTTPConfig{ + Addr: serverURL.String(), Username: "testuser", Password: "testpass", Timeout: time.Minute, } c := NewClient(conf, "test_db", "default") - if err := c.Store(samples); err != nil { + if err := c.Write(samples); err != nil { t.Fatalf("Error sending samples: %s", err) } } diff --git a/documentation/examples/remote_storage/remote_storage_bridge/main.go b/documentation/examples/remote_storage/remote_storage_bridge/main.go index d6e7fcb45..7603511bc 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/main.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/main.go @@ -16,6 +16,7 @@ package main import ( "flag" + "fmt" "io/ioutil" "net/http" _ "net/http/pprof" @@ -30,7 +31,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" - influx "github.com/influxdb/influxdb/client" + influx "github.com/influxdata/influxdb/client/v2" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb" @@ -95,8 +96,8 @@ func main() { cfg := parseFlags() http.Handle(cfg.telemetryPath, prometheus.Handler()) - clients := buildClients(cfg) - serve(cfg.listenAddr, clients) + writers, readers := buildClients(cfg) + serve(cfg.listenAddr, writers, readers) } func parseFlags() *config { @@ -119,7 +120,7 @@ func parseFlags() *config { flag.StringVar(&cfg.influxdbURL, "influxdb-url", "", "The URL of the remote InfluxDB server to send samples to. None, if empty.", ) - flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "default", + flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "autogen", "The InfluxDB retention policy to use.", ) flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "", @@ -139,38 +140,50 @@ func parseFlags() *config { return cfg } -func buildClients(cfg *config) []remote.StorageClient { - var clients []remote.StorageClient +type writer interface { + Write(samples model.Samples) error + Name() string +} + +type reader interface { + Read(req *remote.ReadRequest) (*remote.ReadResponse, error) + Name() string +} + +func buildClients(cfg *config) ([]writer, []reader) { + var writers []writer + var readers []reader if cfg.graphiteAddress != "" { c := graphite.NewClient( cfg.graphiteAddress, cfg.graphiteTransport, cfg.remoteTimeout, cfg.graphitePrefix) - clients = append(clients, c) + writers = append(writers, c) } if cfg.opentsdbURL != "" { c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout) - clients = append(clients, c) + writers = append(writers, c) } if cfg.influxdbURL != "" { url, err := url.Parse(cfg.influxdbURL) if err != nil { log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err) } - conf := influx.Config{ - URL: *url, + conf := influx.HTTPConfig{ + Addr: url.String(), Username: cfg.influxdbUsername, Password: cfg.influxdbPassword, Timeout: cfg.remoteTimeout, } c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy) prometheus.MustRegister(c) - clients = append(clients, c) + writers = append(writers, c) + readers = append(readers, c) } - return clients + return writers, readers } -func serve(addr string, clients []remote.StorageClient) error { - http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { +func serve(addr string, writers []writer, readers []reader) error { + http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) { reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -187,16 +200,57 @@ func serve(addr string, clients []remote.StorageClient) error { receivedSamples.Add(float64(len(samples))) var wg sync.WaitGroup - for _, c := range clients { + for _, w := range writers { wg.Add(1) - go func(rc remote.StorageClient) { - sendSamples(rc, samples) + go func(rw writer) { + sendSamples(rw, samples) wg.Done() - }(c) + }(w) } wg.Wait() }) + http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) { + reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req remote.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // TODO: Support reading from more than one reader and merging the results. + if len(readers) != 1 { + http.Error(w, fmt.Sprintf("expected exactly one reader, found %d readers", len(readers)), http.StatusInternalServerError) + return + } + reader := readers[0] + + var resp *remote.ReadResponse + resp, err = reader.Read(&req) + if err != nil { + log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + data, err := proto.Marshal(resp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + if _, err := snappy.NewWriter(w).Write(data); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }) + return http.ListenAndServe(addr, nil) } @@ -219,14 +273,14 @@ func protoToSamples(req *remote.WriteRequest) model.Samples { return samples } -func sendSamples(c remote.StorageClient, samples model.Samples) { +func sendSamples(w writer, samples model.Samples) { begin := time.Now() - err := c.Store(samples) + err := w.Write(samples) duration := time.Since(begin).Seconds() if err != nil { - log.Warnf("Error sending %d samples to remote storage %q: %v", len(samples), c.Name(), err) - failedSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) + log.With("num_samples", len(samples)).With("storage", w.Name()).With("err", err).Warnf("Error sending samples to remote storage") + failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples))) } - sentSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) - sentBatchDuration.WithLabelValues(c.Name()).Observe(duration) + sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples))) + sentBatchDuration.WithLabelValues(w.Name()).Observe(duration) } diff --git a/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go b/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go index fe8b79e77..4b6c0e6f4 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go @@ -69,8 +69,8 @@ func tagsFromMetric(m model.Metric) map[string]TagValue { return tags } -// Store sends a batch of samples to OpenTSDB via its HTTP API. -func (c *Client) Store(samples model.Samples) error { +// Write sends a batch of samples to OpenTSDB via its HTTP API. +func (c *Client) Write(samples model.Samples) error { reqs := make([]StoreSamplesRequest, 0, len(samples)) for _, s := range samples { v := float64(s.Value)