diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index de002a0b2..c4077954e 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -178,6 +178,18 @@ func main() { queryLabelsEnd := queryLabelsCmd.Flag("end", "End time (RFC3339 or Unix timestamp).").String() queryLabelsMatch := queryLabelsCmd.Flag("match", "Series selector. Can be specified multiple times.").Strings() + pushCmd := app.Command("push", "Push to a Prometheus server.") + pushCmd.Flag("http.config.file", "HTTP client configuration file for promtool to connect to Prometheus.").PlaceHolder("").ExistingFileVar(&httpConfigFilePath) + pushMetricsCmd := pushCmd.Command("metrics", "Push metrics to a prometheus remote write.") + pushMetricsCmd.Arg("remote-write-url", "Prometheus remote write url to push metrics.").Required().URLVar(&serverURL) + metricFiles := pushMetricsCmd.Arg( + "metric-files", + "The metric files to push.", + ).Required().ExistingFiles() + metricJobLabel := pushMetricsCmd.Flag("job-label", "Job label to attach to metrics.").Default("promtool").String() + pushMetricsTimeout := pushMetricsCmd.Flag("timeout", "The time to wait for pushing metrics.").Default("30s").Duration() + pushMetricsHeaders := pushMetricsCmd.Flag("header", "Prometheus remote write header.").StringMap() + testCmd := app.Command("test", "Unit testing.") testRulesCmd := testCmd.Command("rules", "Unit tests for rules.") testRulesFiles := testRulesCmd.Arg( @@ -301,6 +313,9 @@ func main() { case checkMetricsCmd.FullCommand(): os.Exit(CheckMetrics(*checkMetricsExtended)) + case pushMetricsCmd.FullCommand(): + os.Exit(PushMetrics(serverURL, httpRoundTripper, *pushMetricsHeaders, *pushMetricsTimeout, *metricJobLabel, *metricFiles...)) + case queryInstantCmd.FullCommand(): os.Exit(QueryInstant(serverURL, httpRoundTripper, *queryInstantExpr, *queryInstantTime, p)) diff --git a/cmd/promtool/metrics.go b/cmd/promtool/metrics.go new file mode 100644 index 000000000..21fcd3e66 --- /dev/null +++ b/cmd/promtool/metrics.go @@ -0,0 +1,234 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "os" + "sort" + "time" + + "github.com/golang/snappy" + dto "github.com/prometheus/client_model/go" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" +) + +// Push metrics to a prometheus remote write. +func PushMetrics(url *url.URL, roundTripper http.RoundTripper, headers map[string]string, timeout time.Duration, jobLabel string, files ...string) int { + // remote write should respect specification: https://prometheus.io/docs/concepts/remote_write_spec/ + failed := false + + addressURL, err := url.Parse(url.String()) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return failureExitCode + } + + // build remote write client + writeClient, err := remote.NewWriteClient("remote-write", &remote.ClientConfig{ + URL: &config_util.URL{URL: addressURL}, + Timeout: model.Duration(timeout), + }) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return failureExitCode + } + + // set custom tls config from httpConfigFilePath + // set custom headers to every request + client, ok := writeClient.(*remote.Client) + if !ok { + fmt.Fprintln(os.Stderr, fmt.Errorf("unexpected type %T", writeClient)) + return failureExitCode + } + client.Client.Transport = &setHeadersTransport{ + RoundTripper: roundTripper, + headers: headers, + } + + for _, f := range files { + var data []byte + var err error + data, err = os.ReadFile(f) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + + fmt.Printf("Parsing metric file %s\n", f) + metricsData, err := parseMetricsTextAndFormat(bytes.NewReader(data), jobLabel) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + + raw, err := metricsData.Marshal() + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + + // Encode the request body into snappy encoding. + compressed := snappy.Encode(nil, raw) + err = client.Store(context.Background(), compressed) + if err != nil { + fmt.Fprintln(os.Stderr, err) + failed = true + continue + } + fmt.Printf("Successfully pushed metric file %s\n", f) + } + + if failed { + return failureExitCode + } + + return successExitCode +} + +type setHeadersTransport struct { + http.RoundTripper + headers map[string]string +} + +func (s *setHeadersTransport) RoundTrip(req *http.Request) (*http.Response, error) { + for key, value := range s.headers { + req.Header.Set(key, value) + } + return s.RoundTripper.RoundTrip(req) +} + +var MetricMetadataTypeValue = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGEHISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, +} + +// formatMetrics convert metric family to a writerequest +func formatMetrics(mf map[string]*dto.MetricFamily, jobLabel string) (*prompb.WriteRequest, error) { + wr := &prompb.WriteRequest{} + + // build metric list + sortedMetricNames := make([]string, 0, len(mf)) + for metric := range mf { + sortedMetricNames = append(sortedMetricNames, metric) + } + // sort metrics name in lexicographical order + sort.Strings(sortedMetricNames) + + for _, metricName := range sortedMetricNames { + // Set metadata writerequest + mtype := MetricMetadataTypeValue[mf[metricName].Type.String()] + metadata := prompb.MetricMetadata{ + MetricFamilyName: mf[metricName].GetName(), + Type: prompb.MetricMetadata_MetricType(mtype), + Help: mf[metricName].GetHelp(), + } + wr.Metadata = append(wr.Metadata, metadata) + + for _, metric := range mf[metricName].Metric { + var timeserie prompb.TimeSeries + + // build labels map + labels := make(map[string]string, len(metric.Label)+2) + labels[model.MetricNameLabel] = metricName + labels[model.JobLabel] = jobLabel + + for _, label := range metric.Label { + labelname := label.GetName() + if labelname == model.JobLabel { + labelname = fmt.Sprintf("%s%s", model.ExportedLabelPrefix, labelname) + } + labels[labelname] = label.GetValue() + } + + // build labels name list + sortedLabelNames := make([]string, 0, len(labels)) + for label := range labels { + sortedLabelNames = append(sortedLabelNames, label) + } + // sort labels name in lexicographical order + sort.Strings(sortedLabelNames) + + for _, label := range sortedLabelNames { + timeserie.Labels = append(timeserie.Labels, prompb.Label{ + Name: label, + Value: labels[label], + }) + } + + timeserie.Samples = []prompb.Sample{ + { + Timestamp: time.Now().UnixNano() / int64(time.Millisecond), + Value: getMetricsValue(metric), + }, + } + + wr.Timeseries = append(wr.Timeseries, timeserie) + } + } + return wr, nil +} + +// parseMetricsTextReader consumes an io.Reader and returns the MetricFamily +func parseMetricsTextReader(input io.Reader) (map[string]*dto.MetricFamily, error) { + var parser expfmt.TextParser + mf, err := parser.TextToMetricFamilies(input) + if err != nil { + return nil, err + } + return mf, nil +} + +// getMetricsValue return the value of a timeserie without the need to give value type +func getMetricsValue(m *dto.Metric) float64 { + switch { + case m.Gauge != nil: + return m.GetGauge().GetValue() + case m.Counter != nil: + return m.GetCounter().GetValue() + case m.Untyped != nil: + return m.GetUntyped().GetValue() + default: + return 0. + } +} + +// parseMetricsTextAndFormat return the data in the expected prometheus metrics write request format +func parseMetricsTextAndFormat(input io.Reader, jobLabel string) (*prompb.WriteRequest, error) { + mf, err := parseMetricsTextReader(input) + if err != nil { + return nil, err + } + + return formatMetrics(mf, jobLabel) +} diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index e149d374a..ac159a921 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -27,6 +27,7 @@ Tooling for the Prometheus monitoring system. | check | Check the resources for validity. | | query | Run query against a Prometheus server. | | debug | Fetch debug information. | +| push | Push to a Prometheus server. | | test | Unit testing. | | tsdb | Run tsdb commands. | @@ -372,6 +373,48 @@ Fetch all debug information. +### `promtool push` + +Push to a Prometheus server. + + + +#### Flags + +| Flag | Description | +| --- | --- | +| --http.config.file | HTTP client configuration file for promtool to connect to Prometheus. | + + + + +##### `promtool push metrics` + +Push metrics to a prometheus remote write. + + + +###### Flags + +| Flag | Description | Default | +| --- | --- | --- | +| --job-label | Job label to attach to metrics. | `promtool` | +| --timeout | The time to wait for pushing metrics. | `30s` | +| --header | Prometheus remote write header. | | + + + + +###### Arguments + +| Argument | Description | Required | +| --- | --- | --- | +| remote-write-url | Prometheus remote write url to push metrics. | Yes | +| metric-files | The metric files to push. | Yes | + + + + ### `promtool test` Unit testing.