diff --git a/storage/remote/opentsdb/client.go b/storage/remote/opentsdb/client.go index d22aa5099..ecfa7cfcb 100644 --- a/storage/remote/opentsdb/client.go +++ b/storage/remote/opentsdb/client.go @@ -5,11 +5,13 @@ import ( "encoding/json" "fmt" "io/ioutil" + "math" "net/http" "net/url" "regexp" "time" + "github.com/golang/glog" clientmodel "github.com/prometheus/client_golang/model" "github.com/prometheus/prometheus/utility" @@ -41,26 +43,20 @@ func NewClient(url string, timeout time.Duration) *Client { // StoreSamplesRequest is used for building a JSON request for storing samples // via the OpenTSDB. type StoreSamplesRequest struct { - Metric string `json:"metric"` - Timestamp int64 `json:"timestamp"` - Value clientmodel.SampleValue `json:"value"` - Tags map[string]string `json:"tags"` -} - -// escapeTagValue escapes Prometheus label values to valid tag values for -// OpenTSDB. -func escapeTagValue(l clientmodel.LabelValue) string { - return illegalCharsRE.ReplaceAllString(string(l), "_") + Metric TagValue `json:"metric"` + Timestamp int64 `json:"timestamp"` + Value float64 `json:"value"` + Tags map[string]TagValue `json:"tags"` } // tagsFromMetric translates Prometheus metric into OpenTSDB tags. -func tagsFromMetric(m clientmodel.Metric) map[string]string { - tags := make(map[string]string, len(m)-1) +func tagsFromMetric(m clientmodel.Metric) map[string]TagValue { + tags := make(map[string]TagValue, len(m)-1) for l, v := range m { if l == clientmodel.MetricNameLabel { continue } - tags[string(l)] = escapeTagValue(v) + tags[string(l)] = TagValue(v) } return tags } @@ -69,11 +65,16 @@ func tagsFromMetric(m clientmodel.Metric) map[string]string { func (c *Client) Store(samples clientmodel.Samples) error { reqs := make([]StoreSamplesRequest, 0, len(samples)) for _, s := range samples { - metric := escapeTagValue(s.Metric[clientmodel.MetricNameLabel]) + v := float64(s.Value) + if math.IsNaN(v) || math.IsInf(v, 0) { + glog.Warningf("cannot send value %d to OpenTSDB, skipping sample %#v", v, s) + continue + } + metric := TagValue(s.Metric[clientmodel.MetricNameLabel]) reqs = append(reqs, StoreSamplesRequest{ Metric: metric, Timestamp: s.Timestamp.Unix(), - Value: s.Value, + Value: v, Tags: tagsFromMetric(s.Metric), }) } diff --git a/storage/remote/opentsdb/client_test.go b/storage/remote/opentsdb/client_test.go index aa7f56f19..1f1372894 100644 --- a/storage/remote/opentsdb/client_test.go +++ b/storage/remote/opentsdb/client_test.go @@ -14,28 +14,62 @@ package opentsdb import ( + "bytes" + "encoding/json" + "reflect" "testing" clientmodel "github.com/prometheus/client_golang/model" ) -func TestTagsFromMetric(t *testing.T) { - input := clientmodel.Metric{ - clientmodel.MetricNameLabel: "testmetric", - "test:label": "test:value", +var ( + metric = clientmodel.Metric{ + clientmodel.MetricNameLabel: "test:metric", + "testlabel": "test:value", "many_chars": "abc!ABC:012-3!45ö67~89./", } - expected := map[string]string{ - "test:label": "test_value", - "many_chars": "abc_ABC_012-3_45_67_89./", +) + +func TestTagsFromMetric(t *testing.T) { + expected := map[string]TagValue{ + "testlabel": TagValue("test:value"), + "many_chars": TagValue("abc!ABC:012-3!45ö67~89./"), } - actual := tagsFromMetric(input) - if len(actual) != len(expected) { - t.Fatalf("Expected %v, got %v", expected, actual) - } - for k, v := range expected { - if v != actual[k] { - t.Fatalf("Expected %s => %s, got %s => %s", k, v, k, actual[k]) - } + actual := tagsFromMetric(metric) + if !reflect.DeepEqual(actual, expected) { + t.Errorf("Expected %#v, got %#v", expected, actual) + } +} + +func TestMarshalStoreSamplesRequest(t *testing.T) { + request := StoreSamplesRequest{ + Metric: TagValue("test:metric"), + Timestamp: 4711, + Value: 3.1415, + Tags: tagsFromMetric(metric), + } + expectedJSON := []byte(`{"metric":"test_.metric","timestamp":4711,"value":3.1415,"tags":{"many_chars":"abc_21ABC_.012-3_2145_C3_B667_7E89./","testlabel":"test_.value"}}`) + + resultingJSON, err := json.Marshal(request) + if err != nil { + t.Fatalf("Marshal(request) resulted in err: %s", err) + } + if !bytes.Equal(resultingJSON, expectedJSON) { + t.Errorf( + "Marshal(request) => %q, want %q", + resultingJSON, expectedJSON, + ) + } + + var unmarshaledRequest StoreSamplesRequest + err = json.Unmarshal(expectedJSON, &unmarshaledRequest) + if err != nil { + t.Fatalf("Unarshal(expectedJSON, &unmarshaledRequest) resulted in err: %s", err) + } + if !reflect.DeepEqual(unmarshaledRequest, request) { + t.Errorf( + "Unarshal(expectedJSON, &unmarshaledRequest) => %#v, want %#v", + unmarshaledRequest, request, + ) } } diff --git a/storage/remote/opentsdb/tagvalue.go b/storage/remote/opentsdb/tagvalue.go new file mode 100644 index 000000000..9f0d20ed2 --- /dev/null +++ b/storage/remote/opentsdb/tagvalue.go @@ -0,0 +1,144 @@ +package opentsdb + +import ( + "bytes" + "fmt" + + clientmodel "github.com/prometheus/client_golang/model" +) + +// TagValue is a clientmodel.LabelValue that implements json.Marshaler and +// json.Unmarshaler. These implementations avoid characters illegal in +// OpenTSDB. See the MarshalJSON for details. TagValue is used for the values of +// OpenTSDB tags as well as for OpenTSDB metric names. +type TagValue clientmodel.LabelValue + +// MarshalJSON marshals this TagValue into JSON that only contains runes allowed +// in OpenTSDB. It implements json.Marshaler. The runes allowed in OpenTSDB are +// all single-byte. This function encodes the arbitrary byte sequence found in +// this TagValue in the following way: +// +// - The string that underlies TagValue is scanned byte by byte. +// +// - If a byte represents a legal OpenTSDB rune with the exception of '_', that +// byte is directly copied to the resulting JSON byte slice. +// +// - If '_' is encountered, it is replaced by '__'. +// +// - If ':' is encountered, it is replaced by '_.'. +// +// - All other bytes are replaced by '_' followed by two bytes containing the +// uppercase ASCII representation of their hexadecimal value. +// +// This encoding allows to save arbitrary Go strings in OpenTSDB. That's +// required because Prometheus label values can contain anything, and even +// Prometheus metric names may (and often do) contain ':' (which is disallowed +// in OpenTSDB strings). The encoding uses '_' as an escape character and +// renders a ':' more or less recognizable as '_.' +// +// Examples: +// +// "foo-bar-42" -> "foo-bar-42" +// +// "foo_bar_42" -> "foo__bar__42" +// +// "http://example.org:8080" -> "http_.//example.org_.8080" +// +// "Björn's email: bjoern@soundcloud.com" -> +// "Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com" +// +// "日" -> "_E6_97_A5" +func (tv TagValue) MarshalJSON() ([]byte, error) { + length := len(tv) + // Need at least two more bytes than in tv. + result := bytes.NewBuffer(make([]byte, 1, length+2)) + result.WriteByte('"') + for i := 0; i < length; i++ { + b := tv[i] + switch { + case (b >= '-' && b <= '9') || // '-', '.', '/', 0-9 + (b >= 'A' && b <= 'Z') || + (b >= 'a' && b <= 'z'): + result.WriteByte(b) + case b == '_': + result.WriteString("__") + case b == ':': + result.WriteString("_.") + default: + result.WriteString(fmt.Sprintf("_%X", b)) + } + } + result.WriteByte('"') + return result.Bytes(), nil +} + +// UnmarshalJSON unmarshals JSON strings coming from OpenTSDB into Go strings +// by applying the inverse of what is described for the MarshalJSON method. +func (tv *TagValue) UnmarshalJSON(json []byte) error { + escapeLevel := 0 // How many bytes after '_'. + var parsedByte byte + + // Might need fewer bytes, but let's avoid realloc. + result := bytes.NewBuffer(make([]byte, 0, len(json)-2)) + + for i, b := range json { + if i == 0 { + if b != '"' { + return fmt.Errorf("expected '\"', got %q", b) + } + continue + } + if i == len(json)-1 { + if b != '"' { + return fmt.Errorf("expected '\"', got %q", b) + } + break + } + switch escapeLevel { + case 0: + if b == '_' { + escapeLevel = 1 + continue + } + result.WriteByte(b) + case 1: + switch { + case b == '_': + result.WriteByte('_') + escapeLevel = 0 + case b == '.': + result.WriteByte(':') + escapeLevel = 0 + case b >= '0' && b <= '9': + parsedByte = (b - 48) << 4 + escapeLevel = 2 + case b >= 'A' && b <= 'F': // A-F + parsedByte = (b - 55) << 4 + escapeLevel = 2 + default: + return fmt.Errorf( + "illegal escape sequence at byte %d (%c)", + i, b, + ) + } + case 2: + switch { + case b >= '0' && b <= '9': + parsedByte += b - 48 + case b >= 'A' && b <= 'F': // A-F + parsedByte += b - 55 + default: + return fmt.Errorf( + "illegal escape sequence at byte %d (%c)", + i, b, + ) + } + result.WriteByte(parsedByte) + escapeLevel = 0 + default: + panic("unexpected escape level") + } + } + *tv = TagValue(result.String()) + return nil +} diff --git a/storage/remote/opentsdb/tagvalue_test.go b/storage/remote/opentsdb/tagvalue_test.go new file mode 100644 index 000000000..821c3565c --- /dev/null +++ b/storage/remote/opentsdb/tagvalue_test.go @@ -0,0 +1,51 @@ +package opentsdb + +import ( + "bytes" + "encoding/json" + "testing" +) + +var stringtests = []struct { + tv TagValue + json []byte +}{ + {TagValue("foo-bar-42"), []byte(`"foo-bar-42"`)}, + {TagValue("foo_bar_42"), []byte(`"foo__bar__42"`)}, + {TagValue("http://example.org:8080"), []byte(`"http_.//example.org_.8080"`)}, + {TagValue("Björn's email: bjoern@soundcloud.com"), []byte(`"Bj_C3_B6rn_27s_20email_._20bjoern_40soundcloud.com"`)}, + {TagValue("日"), []byte(`"_E6_97_A5"`)}, +} + +func TestTagValueMarshaling(t *testing.T) { + for i, tt := range stringtests { + json, err := json.Marshal(tt.tv) + if err != nil { + t.Errorf("%d. Marshal(%q) returned err: %s", i, tt.tv, err) + } else { + if !bytes.Equal(json, tt.json) { + t.Errorf( + "%d. Marshal(%q) => %q, want %q", + i, tt.tv, json, tt.json, + ) + } + } + } +} + +func TestTagValueUnMarshaling(t *testing.T) { + for i, tt := range stringtests { + var tv TagValue + err := json.Unmarshal(tt.json, &tv) + if err != nil { + t.Errorf("%d. Unmarshal(%q, &str) returned err: %s", i, tt.json, err) + } else { + if tv != tt.tv { + t.Errorf( + "%d. Unmarshal(%q, &str) => str==%q, want %q", + i, tt.json, tv, tt.tv, + ) + } + } + } +}