Validate the metric names and labels in the remote write handler

Signed-off-by: Xiaochao Dong (@damnever) <the.xcdong@gmail.com>
This commit is contained in:
Xiaochao Dong (@damnever) 2022-12-09 15:27:56 +08:00
parent 8dba9163f1
commit 2b7202c4cc
3 changed files with 63 additions and 9 deletions

View File

@ -22,6 +22,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
@ -30,15 +32,28 @@ import (
type writeHandler struct {
logger log.Logger
appendable storage.Appendable
samplesWithInvalidLabelsTotal prometheus.Counter
}
// NewWriteHandler creates a http.Handler that accepts remote write requests and
// writes them to the provided appendable.
func NewWriteHandler(logger log.Logger, appendable storage.Appendable) http.Handler {
return &writeHandler{
func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable) http.Handler {
h := &writeHandler{
logger: logger,
appendable: appendable,
samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "prometheus",
Subsystem: "api",
Name: "remote_write_invalid_labels_samples_total",
Help: "The total number of remote write samples which contains invalid labels.",
}),
}
if reg != nil {
reg.MustRegister(h.samplesWithInvalidLabelsTotal)
}
return h
}
func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -85,6 +100,7 @@ func (h *writeHandler) checkAppendExemplarError(err error, e exemplar.Exemplar,
func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err error) {
outOfOrderExemplarErrs := 0
samplesWithInvalidLabels := 0
app := h.appendable.Appender(ctx)
defer func() {
@ -98,6 +114,11 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
var exemplarErr error
for _, ts := range req.Timeseries {
labels := labelProtosToLabels(ts.Labels)
if !labels.IsValid() {
level.Warn(h.logger).Log("msg", "Invalid metric names or labels", "got", labels.String())
samplesWithInvalidLabels++
continue
}
for _, s := range ts.Samples {
_, err = app.Append(0, labels, s.Timestamp, s.Value)
if err != nil {
@ -150,6 +171,9 @@ func (h *writeHandler) write(ctx context.Context, req *prompb.WriteRequest) (err
if outOfOrderExemplarErrs > 0 {
_ = level.Warn(h.logger).Log("msg", "Error on ingesting out-of-order exemplars", "num_dropped", outOfOrderExemplarErrs)
}
if samplesWithInvalidLabels > 0 {
h.samplesWithInvalidLabelsTotal.Add(float64(samplesWithInvalidLabels))
}
return nil
}

View File

@ -20,6 +20,8 @@ import (
"io"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
@ -43,7 +45,7 @@ func TestRemoteWriteHandler(t *testing.T) {
require.NoError(t, err)
appendable := &mockAppendable{}
handler := NewWriteHandler(nil, appendable)
handler := NewWriteHandler(nil, nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -94,7 +96,7 @@ func TestOutOfOrderSample(t *testing.T) {
appendable := &mockAppendable{
latestSample: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), appendable)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -119,7 +121,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
appendable := &mockAppendable{
latestExemplar: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), appendable)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -142,7 +144,7 @@ func TestOutOfOrderHistogram(t *testing.T) {
appendable := &mockAppendable{
latestHistogram: 100,
}
handler := NewWriteHandler(log.NewNopLogger(), appendable)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -151,6 +153,34 @@ func TestOutOfOrderHistogram(t *testing.T) {
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}
func BenchmarkRemoteWritehandler(b *testing.B) {
const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
reqs := []*http.Request{}
for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, err := buildWriteRequest([]prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)},
}}, nil, nil, nil)
require.NoError(b, err)
req, err := http.NewRequest("", "", bytes.NewReader(buf))
require.NoError(b, err)
reqs = append(reqs, req)
}
appendable := &mockAppendable{}
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
b.ResetTimer()
for _, req := range reqs {
handler.ServeHTTP(recorder, req)
}
}
func TestCommitErr(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
require.NoError(t, err)
@ -161,7 +191,7 @@ func TestCommitErr(t *testing.T) {
appendable := &mockAppendable{
commitErr: fmt.Errorf("commit error"),
}
handler := NewWriteHandler(log.NewNopLogger(), appendable)
handler := NewWriteHandler(log.NewNopLogger(), nil, appendable)
recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
@ -187,7 +217,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {
require.NoError(b, db.Close())
})
handler := NewWriteHandler(log.NewNopLogger(), db.Head())
handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())
buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil)
require.NoError(b, err)

View File

@ -284,7 +284,7 @@ func NewAPI(
}
if ap != nil {
a.remoteWriteHandler = remote.NewWriteHandler(logger, ap)
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap)
}
return a