Merge pull request #192 from prometheus/feature/negotiate-telemetry-schema-through-mime-type

Use Content-Type data for telemetry versioning
This commit is contained in:
Bernerd Schaefer 2013-04-29 01:30:37 -07:00
commit b04cd28862
6 changed files with 440 additions and 3 deletions

View File

@ -34,6 +34,22 @@ const (
// match. // match.
type LabelSet map[LabelName]LabelValue type LabelSet map[LabelName]LabelValue
// Helper function to non-destructively merge two label sets.
func (l LabelSet) Merge(other LabelSet) LabelSet {
result := make(LabelSet, len(l))
for k, v := range l {
result[k] = v
}
for k, v := range other {
result[k] = v
}
return result
}
func (l LabelSet) String() string { func (l LabelSet) String() string {
var ( var (
buffer bytes.Buffer buffer bytes.Buffer

View File

@ -15,6 +15,7 @@ package format
import ( import (
"fmt" "fmt"
"mime"
"net/http" "net/http"
) )
@ -39,9 +40,30 @@ func (r *registry) ProcessorForRequestHeader(header http.Header) (processor Proc
return return
} }
prometheusApiVersion := header.Get("X-Prometheus-API-Version") mediatype, params, err := mime.ParseMediaType(header.Get("Content-Type"))
if err != nil {
err = fmt.Errorf("Invalid Content-Type header %q: %s", header.Get("Content-Type"), err)
return
}
if mediatype != "application/json" {
err = fmt.Errorf("Unsupported media type %q, expected %q", mediatype, "application/json")
return
}
var prometheusApiVersion string
if params["schema"] == "prometheus/telemetry" && params["version"] != "" {
prometheusApiVersion = params["version"]
} else {
prometheusApiVersion = header.Get("X-Prometheus-API-Version")
}
switch prometheusApiVersion { switch prometheusApiVersion {
case "0.0.2":
processor = Processor002
return
case "0.0.1": case "0.0.1":
processor = Processor001 processor = Processor001
return return

View File

@ -31,12 +31,22 @@ func testDiscriminatorHttpHeader(t test.Tester) {
err: fmt.Errorf("Received illegal and nil header."), err: fmt.Errorf("Received illegal and nil header."),
}, },
{ {
input: map[string]string{"X-Prometheus-API-Version": "0.0.0"}, input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.0"},
output: nil, output: nil,
err: fmt.Errorf("Unrecognized API version 0.0.0"), err: fmt.Errorf("Unrecognized API version 0.0.0"),
}, },
{ {
input: map[string]string{"X-Prometheus-API-Version": "0.0.1"}, input: map[string]string{"Content-Type": "application/json", "X-Prometheus-API-Version": "0.0.1"},
output: Processor001,
err: nil,
},
{
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.0`},
output: nil,
err: fmt.Errorf("Unrecognized API version 0.0.0"),
},
{
input: map[string]string{"Content-Type": `application/json; schema="prometheus/telemetry"; version=0.0.1`},
output: Processor001, output: Processor001,
err: nil, err: nil,
}, },

View File

@ -26,3 +26,24 @@ type Processor interface {
// Process performs the work on the input and closes the incoming stream. // Process performs the work on the input and closes the incoming stream.
Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) (err error)
} }
// The ProcessorFunc type allows the use of ordinary functions for processors.
type ProcessorFunc func(io.ReadCloser, time.Time, model.LabelSet, chan Result) error
func (f ProcessorFunc) Process(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
return f(stream, timestamp, baseLabels, results)
}
// Helper function to convert map[string]string into model.LabelSet.
//
// NOTE: This should be deleted when support for go 1.0.3 is removed; 1.1 is
// smart enough to unmarshal JSON objects into model.LabelSet directly.
func LabelSet(labels map[string]string) model.LabelSet {
labelset := make(model.LabelSet, len(labels))
for k, v := range labels {
labelset[model.LabelName(k)] = model.LabelValue(v)
}
return labelset
}

View File

@ -0,0 +1,113 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package format
import (
"encoding/json"
"fmt"
"github.com/prometheus/prometheus/model"
"io"
"time"
)
// Processor for telemetry schema version 0.0.2.
var Processor002 ProcessorFunc = func(stream io.ReadCloser, timestamp time.Time, baseLabels model.LabelSet, results chan Result) error {
// container for telemetry data
var entities []struct {
BaseLabels map[string]string `json:"baseLabels"`
Docstring string `json:"docstring"`
Metric struct {
Type string `json:"type"`
Values json.RawMessage `json:"value"`
} `json:"metric"`
}
// concrete type for histogram values
type histogram struct {
Labels map[string]string `json:"labels"`
Values map[string]model.SampleValue `json:"value"`
}
// concrete type for counter and gauge values
type counter struct {
Labels map[string]string `json:"labels"`
Value model.SampleValue `json:"value"`
}
defer stream.Close()
if err := json.NewDecoder(stream).Decode(&entities); err != nil {
return err
}
for _, entity := range entities {
entityLabels := baseLabels.Merge(LabelSet(entity.BaseLabels))
switch entity.Metric.Type {
case "counter", "gauge":
var values []counter
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
results <- Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
}
continue
}
for _, counter := range values {
labels := entityLabels.Merge(LabelSet(counter.Labels))
results <- Result{
Sample: model.Sample{
Metric: model.Metric(labels),
Timestamp: timestamp,
Value: counter.Value,
},
}
}
case "histogram":
var values []histogram
if err := json.Unmarshal(entity.Metric.Values, &values); err != nil {
results <- Result{
Err: fmt.Errorf("Could not extract %s value: %s", entity.Metric.Type, err),
}
continue
}
for _, histogram := range values {
for percentile, value := range histogram.Values {
labels := entityLabels.Merge(LabelSet(histogram.Labels))
labels[model.LabelName("percentile")] = model.LabelValue(percentile)
results <- Result{
Sample: model.Sample{
Metric: model.Metric(labels),
Timestamp: timestamp,
Value: value,
},
}
}
}
default:
results <- Result{
Err: fmt.Errorf("Unknown metric type %q", entity.Metric.Type),
}
}
}
return nil
}

View File

@ -0,0 +1,255 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package format
import (
"container/list"
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test"
"io/ioutil"
"strings"
"testing"
"time"
)
func testProcessor002Process(t test.Tester) {
var scenarios = []struct {
in string
out []Result
err error
}{
{
err: fmt.Errorf("EOF"),
},
{
in: `[{"baseLabels":{"name":"rpc_calls_total"},"docstring":"RPC calls.","metric":{"type":"counter","value":[{"labels":{"service":"zed"},"value":25},{"labels":{"service":"bar"},"value":25},{"labels":{"service":"foo"},"value":25}]}},{"baseLabels":{"name":"rpc_latency_microseconds"},"docstring":"RPC latency.","metric":{"type":"histogram","value":[{"labels":{"service":"foo"},"value":{"0.010000":15.890724674774395,"0.050000":15.890724674774395,"0.500000":84.63044031436561,"0.900000":160.21100853053224,"0.990000":172.49828748957728}},{"labels":{"service":"zed"},"value":{"0.010000":0.0459814091918713,"0.050000":0.0459814091918713,"0.500000":0.6120456642749681,"0.900000":1.355915069887731,"0.990000":1.772733213161236}},{"labels":{"service":"bar"},"value":{"0.010000":78.48563317257356,"0.050000":78.48563317257356,"0.500000":97.31798360385088,"0.900000":109.89202084295582,"0.990000":109.99626121011262}}]}}]`,
out: []Result{
{
Sample: model.Sample{
Metric: model.Metric{"service": "zed", model.MetricNameLabel: "rpc_calls_total"},
Value: 25,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"service": "bar", model.MetricNameLabel: "rpc_calls_total"},
Value: 25,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"service": "foo", model.MetricNameLabel: "rpc_calls_total"},
Value: 25,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.0459814091918713,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 78.48563317257356,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.010000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 15.890724674774395,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.0459814091918713,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 78.48563317257356,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.050000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 15.890724674774395,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 0.6120456642749681,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 97.31798360385088,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.500000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 84.63044031436561,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 1.355915069887731,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 109.89202084295582,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.900000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 160.21100853053224,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "zed"},
Value: 1.772733213161236,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "bar"},
Value: 109.99626121011262,
},
},
{
Sample: model.Sample{
Metric: model.Metric{"percentile": "0.990000", model.MetricNameLabel: "rpc_latency_microseconds", "service": "foo"},
Value: 172.49828748957728,
},
},
},
},
}
for i, scenario := range scenarios {
inputChannel := make(chan Result, 1024)
defer func(c chan Result) {
close(c)
}(inputChannel)
reader := strings.NewReader(scenario.in)
err := Processor002.Process(ioutil.NopCloser(reader), time.Now(), model.LabelSet{}, inputChannel)
if !test.ErrorEqual(scenario.err, err) {
t.Errorf("%d. expected err of %s, got %s", i, scenario.err, err)
continue
}
delivered := make([]Result, 0)
for len(inputChannel) != 0 {
delivered = append(delivered, <-inputChannel)
}
if len(delivered) != len(scenario.out) {
t.Errorf("%d. expected output length of %d, got %d", i, len(scenario.out), len(delivered))
continue
}
expectedElements := list.New()
for _, j := range scenario.out {
expectedElements.PushBack(j)
}
for j := 0; j < len(delivered); j++ {
actual := delivered[j]
found := false
for element := expectedElements.Front(); element != nil && found == false; element = element.Next() {
candidate := element.Value.(Result)
if !test.ErrorEqual(candidate.Err, actual.Err) {
continue
}
if candidate.Sample.Value != actual.Sample.Value {
continue
}
if len(candidate.Sample.Metric) != len(actual.Sample.Metric) {
continue
}
labelsMatch := false
for key, value := range candidate.Sample.Metric {
actualValue, ok := actual.Sample.Metric[key]
if !ok {
break
}
if actualValue == value {
labelsMatch = true
break
}
}
if !labelsMatch {
continue
}
// XXX: Test time.
found = true
expectedElements.Remove(element)
}
if !found {
t.Errorf("%d.%d. expected to find %s among candidate, absent", i, j, actual.Sample)
}
}
}
}
func TestProcessor002Process(t *testing.T) {
testProcessor002Process(t)
}
func BenchmarkProcessor002Process(b *testing.B) {
for i := 0; i < b.N; i++ {
testProcessor002Process(b)
}
}