diff --git a/documentation/examples/remote_storage/remote_storage_bridge/README.md b/documentation/examples/remote_storage/remote_storage_bridge/README.md index ad194c716..4c48a7ddf 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/README.md +++ b/documentation/examples/remote_storage/remote_storage_bridge/README.md @@ -1,10 +1,13 @@ # Remote storage bridge -This is a bridge that receives samples in Prometheus's remote storage -format and forwards them to Graphite, InfluxDB, or OpenTSDB. It is meant +This is a bridge that receives samples via Prometheus's remote write +protocol and stores them in Graphite, InfluxDB, or OpenTSDB. It is meant as a replacement for the built-in specific remote storage implementations that have been removed from Prometheus. +For InfluxDB, this bridge also supports reading back data through +Prometheus via Prometheus's remote read protocol. + ## Building ``` @@ -13,10 +16,22 @@ go build ## Running -Example: +Graphite example: ``` -./remote_storage_bridge -graphite-address=localhost:8080 -opentsdb-url=http://localhost:8081/ +./remote_storage_bridge -graphite-address=localhost:8080 +``` + +OpenTSDB example: + +``` +./remote_storage_bridge -opentsdb-url=http://localhost:8081/ +``` + +InfluxDB example: + +``` +./remote_storage_bridge -influxdb-url=http://localhost:8086/ -influxdb.database=prometheus -influxdb.retention-policy=autogen ``` To show all flags: @@ -30,6 +45,11 @@ To show all flags: To configure Prometheus to send samples to this bridge, add the following to your `prometheus.yml`: ```yaml +# Remote write configuration (for Graphite, OpenTSDB, or InfluxDB). remote_write: - url: "http://localhost:9201/receive" + - url: "http://localhost:9201/write" + +# Remote read configuration (for InfluxDB only at the moment). +remote_read: + - url: "http://localhost:9201/read" ``` \ No newline at end of file diff --git a/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go b/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go index 8ce28c4a5..45373cf19 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/graphite/client.go @@ -72,8 +72,8 @@ func pathFromMetric(m model.Metric, prefix string) string { return buffer.String() } -// Store sends a batch of samples to Graphite. -func (c *Client) Store(samples model.Samples) error { +// Write sends a batch of samples to Graphite. +func (c *Client) Write(samples model.Samples) error { conn, err := net.DialTimeout(c.transport, c.address, c.timeout) if err != nil { return err diff --git a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go index ae1fa4d63..26b5d56bd 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client.go @@ -14,26 +14,30 @@ package influxdb import ( + "encoding/json" + "fmt" "math" + "strings" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/remote" - influx "github.com/influxdb/influxdb/client" + influx "github.com/influxdata/influxdb/client/v2" ) // Client allows sending batches of Prometheus samples to InfluxDB. type Client struct { - client *influx.Client + client influx.Client database string retentionPolicy string ignoredSamples prometheus.Counter } // NewClient creates a new Client. -func NewClient(conf influx.Config, db string, rp string) *Client { - c, err := influx.NewClient(conf) +func NewClient(conf influx.HTTPConfig, db string, rp string) *Client { + c, err := influx.NewHTTPClient(conf) // Currently influx.NewClient() *should* never return an error. if err != nil { log.Fatal(err) @@ -63,9 +67,9 @@ func tagsFromMetric(m model.Metric) map[string]string { return tags } -// Store sends a batch of samples to InfluxDB via its HTTP API. -func (c *Client) Store(samples model.Samples) error { - points := make([]influx.Point, 0, len(samples)) +// Write sends a batch of samples to InfluxDB via its HTTP API. +func (c *Client) Write(samples model.Samples) error { + points := make([]*influx.Point, 0, len(samples)) for _, s := range samples { v := float64(s.Value) if math.IsNaN(v) || math.IsInf(v, 0) { @@ -73,24 +77,221 @@ func (c *Client) Store(samples model.Samples) error { c.ignoredSamples.Inc() continue } - points = append(points, influx.Point{ - Measurement: string(s.Metric[model.MetricNameLabel]), - Tags: tagsFromMetric(s.Metric), - Time: s.Timestamp.Time(), - Precision: "ms", - Fields: map[string]interface{}{ - "value": v, - }, - }) + p, err := influx.NewPoint( + string(s.Metric[model.MetricNameLabel]), + tagsFromMetric(s.Metric), + map[string]interface{}{"value": v}, + s.Timestamp.Time(), + ) + if err != nil { + return err + } + points = append(points, p) } - bps := influx.BatchPoints{ - Points: points, + bps, err := influx.NewBatchPoints(influx.BatchPointsConfig{ + Precision: "ms", Database: c.database, RetentionPolicy: c.retentionPolicy, + }) + if err != nil { + return err } - _, err := c.client.Write(bps) - return err + bps.AddPoints(points) + return c.client.Write(bps) +} + +func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { + labelsToSeries := map[string]*remote.TimeSeries{} + for _, q := range req.Queries { + command, err := buildCommand(q) + if err != nil { + return nil, err + } + + query := influx.NewQuery(command, c.database, "ms") + resp, err := c.client.Query(query) + if err != nil { + return nil, err + } + if resp.Err != "" { + return nil, fmt.Errorf(resp.Err) + } + + if err = mergeResult(labelsToSeries, resp.Results); err != nil { + return nil, err + } + } + + resp := remote.ReadResponse{ + Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries)), + } + for _, ts := range labelsToSeries { + resp.Timeseries = append(resp.Timeseries, ts) + } + return &resp, nil +} + +func buildCommand(q *remote.Query) (string, error) { + matchers := make([]string, 0, len(q.Matchers)) + // If we don't find a metric name matcher, query all metrics + // (InfluxDB measurements) by default. + from := "FROM /.+/" + for _, m := range q.Matchers { + if m.Name == model.MetricNameLabel { + switch m.Type { + case remote.MatchType_EQUAL: + from = fmt.Sprintf("FROM %q", m.Value) + case remote.MatchType_REGEX_MATCH: + from = fmt.Sprintf("FROM /^%s$/", escapeSlashes(m.Value)) + default: + // TODO: Figure out how to support these efficiently. + return "", fmt.Errorf("non-equal or regex-non-equal matchers are not supported on the metric name yet") + } + continue + } + + switch m.Type { + case remote.MatchType_EQUAL: + matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value))) + case remote.MatchType_NOT_EQUAL: + matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value))) + case remote.MatchType_REGEX_MATCH: + matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value))) + case remote.MatchType_REGEX_NO_MATCH: + matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value))) + default: + return "", fmt.Errorf("unknown match type %v", m.Type) + } + } + matchers = append(matchers, fmt.Sprintf("time >= %vms", q.StartTimestampMs)) + matchers = append(matchers, fmt.Sprintf("time <= %vms", q.EndTimestampMs)) + + return fmt.Sprintf("SELECT value %s WHERE %v GROUP BY *", from, strings.Join(matchers, " AND ")), nil +} + +func escapeSingleQuotes(str string) string { + return strings.Replace(str, `'`, `\'`, -1) +} + +func escapeSlashes(str string) string { + return strings.Replace(str, `/`, `\/`, -1) +} + +func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error { + for _, r := range results { + for _, s := range r.Series { + k := concatLabels(s.Tags) + ts, ok := labelsToSeries[k] + if !ok { + ts = &remote.TimeSeries{ + Labels: tagsToLabelPairs(s.Name, s.Tags), + } + labelsToSeries[k] = ts + } + + samples, err := valuesToSamples(s.Values) + if err != nil { + return err + } + + ts.Samples = mergeSamples(ts.Samples, samples) + } + } + return nil +} + +func concatLabels(labels map[string]string) string { + // 0xff cannot cannot occur in valid UTF-8 sequences, so use it + // as a separator here. + separator := "\xff" + pairs := make([]string, 0, len(labels)) + for k, v := range labels { + pairs = append(pairs, k+separator+v) + } + return strings.Join(pairs, separator) +} + +func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair { + pairs := make([]*remote.LabelPair, 0, len(tags)) + for k, v := range tags { + if v == "" { + // If we select metrics with different sets of labels names, + // InfluxDB returns *all* possible tag names on all returned + // series, with empty tag values on series where they don't + // apply. In Prometheus, an empty label value is equivalent + // to a non-existent label, so we just skip empty ones here + // to make the result correct. + continue + } + pairs = append(pairs, &remote.LabelPair{ + Name: k, + Value: v, + }) + } + pairs = append(pairs, &remote.LabelPair{ + Name: model.MetricNameLabel, + Value: name, + }) + return pairs +} + +func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) { + samples := make([]*remote.Sample, 0, len(values)) + for _, v := range values { + if len(v) != 2 { + return nil, fmt.Errorf("bad sample tuple length, expected [, ], got %v", v) + } + + jsonTimestamp, ok := v[0].(json.Number) + if !ok { + return nil, fmt.Errorf("bad timestamp: %v", v[0]) + } + + jsonValue, ok := v[1].(json.Number) + if !ok { + return nil, fmt.Errorf("bad sample value: %v", v[1]) + } + + timestamp, err := jsonTimestamp.Int64() + if err != nil { + return nil, fmt.Errorf("unable to convert sample timestamp to int64: %v", err) + } + + value, err := jsonValue.Float64() + if err != nil { + return nil, fmt.Errorf("unable to convert sample value to float64: %v", err) + } + + samples = append(samples, &remote.Sample{ + TimestampMs: timestamp, + Value: value, + }) + } + return samples, nil +} + +// mergeSamples merges two lists of sample pairs and removes duplicate +// timestamps. It assumes that both lists are sorted by timestamp. +func mergeSamples(a, b []*remote.Sample) []*remote.Sample { + result := make([]*remote.Sample, 0, len(a)+len(b)) + i, j := 0, 0 + for i < len(a) && j < len(b) { + if a[i].TimestampMs < b[j].TimestampMs { + result = append(result, a[i]) + i++ + } else if a[i].TimestampMs > b[j].TimestampMs { + result = append(result, b[j]) + j++ + } else { + result = append(result, a[i]) + i++ + j++ + } + } + result = append(result, a[i:]...) + result = append(result, b[j:]...) + return result } // Name identifies the client as an InfluxDB client. diff --git a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go index 38a5e1a68..b3567fb75 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/influxdb/client_test.go @@ -22,7 +22,7 @@ import ( "testing" "time" - influx "github.com/influxdb/influxdb/client" + influx "github.com/influxdata/influxdb/client/v2" "github.com/prometheus/common/model" ) @@ -68,8 +68,8 @@ func TestClient(t *testing.T) { }, } - expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123000000 -testmetric,test_label=test_label_value2 value=5.1234 123456789123000000 + expectedBody := `testmetric,test_label=test_label_value1 value=1.23 123456789123 +testmetric,test_label=test_label_value2 value=5.1234 123456789123 ` server := httptest.NewServer(http.HandlerFunc( @@ -97,15 +97,15 @@ testmetric,test_label=test_label_value2 value=5.1234 123456789123000000 t.Fatalf("Unable to parse server URL %s: %s", server.URL, err) } - conf := influx.Config{ - URL: *serverURL, + conf := influx.HTTPConfig{ + Addr: serverURL.String(), Username: "testuser", Password: "testpass", Timeout: time.Minute, } c := NewClient(conf, "test_db", "default") - if err := c.Store(samples); err != nil { + if err := c.Write(samples); err != nil { t.Fatalf("Error sending samples: %s", err) } } diff --git a/documentation/examples/remote_storage/remote_storage_bridge/main.go b/documentation/examples/remote_storage/remote_storage_bridge/main.go index d6e7fcb45..7603511bc 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/main.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/main.go @@ -16,6 +16,7 @@ package main import ( "flag" + "fmt" "io/ioutil" "net/http" _ "net/http/pprof" @@ -30,7 +31,7 @@ import ( "github.com/prometheus/common/log" "github.com/prometheus/common/model" - influx "github.com/influxdb/influxdb/client" + influx "github.com/influxdata/influxdb/client/v2" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/graphite" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_bridge/influxdb" @@ -95,8 +96,8 @@ func main() { cfg := parseFlags() http.Handle(cfg.telemetryPath, prometheus.Handler()) - clients := buildClients(cfg) - serve(cfg.listenAddr, clients) + writers, readers := buildClients(cfg) + serve(cfg.listenAddr, writers, readers) } func parseFlags() *config { @@ -119,7 +120,7 @@ func parseFlags() *config { flag.StringVar(&cfg.influxdbURL, "influxdb-url", "", "The URL of the remote InfluxDB server to send samples to. None, if empty.", ) - flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "default", + flag.StringVar(&cfg.influxdbRetentionPolicy, "influxdb.retention-policy", "autogen", "The InfluxDB retention policy to use.", ) flag.StringVar(&cfg.influxdbUsername, "influxdb.username", "", @@ -139,38 +140,50 @@ func parseFlags() *config { return cfg } -func buildClients(cfg *config) []remote.StorageClient { - var clients []remote.StorageClient +type writer interface { + Write(samples model.Samples) error + Name() string +} + +type reader interface { + Read(req *remote.ReadRequest) (*remote.ReadResponse, error) + Name() string +} + +func buildClients(cfg *config) ([]writer, []reader) { + var writers []writer + var readers []reader if cfg.graphiteAddress != "" { c := graphite.NewClient( cfg.graphiteAddress, cfg.graphiteTransport, cfg.remoteTimeout, cfg.graphitePrefix) - clients = append(clients, c) + writers = append(writers, c) } if cfg.opentsdbURL != "" { c := opentsdb.NewClient(cfg.opentsdbURL, cfg.remoteTimeout) - clients = append(clients, c) + writers = append(writers, c) } if cfg.influxdbURL != "" { url, err := url.Parse(cfg.influxdbURL) if err != nil { log.Fatalf("Failed to parse InfluxDB URL %q: %v", cfg.influxdbURL, err) } - conf := influx.Config{ - URL: *url, + conf := influx.HTTPConfig{ + Addr: url.String(), Username: cfg.influxdbUsername, Password: cfg.influxdbPassword, Timeout: cfg.remoteTimeout, } c := influxdb.NewClient(conf, cfg.influxdbDatabase, cfg.influxdbRetentionPolicy) prometheus.MustRegister(c) - clients = append(clients, c) + writers = append(writers, c) + readers = append(readers, c) } - return clients + return writers, readers } -func serve(addr string, clients []remote.StorageClient) error { - http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { +func serve(addr string, writers []writer, readers []reader) error { + http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) { reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -187,16 +200,57 @@ func serve(addr string, clients []remote.StorageClient) error { receivedSamples.Add(float64(len(samples))) var wg sync.WaitGroup - for _, c := range clients { + for _, w := range writers { wg.Add(1) - go func(rc remote.StorageClient) { - sendSamples(rc, samples) + go func(rw writer) { + sendSamples(rw, samples) wg.Done() - }(c) + }(w) } wg.Wait() }) + http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) { + reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + var req remote.ReadRequest + if err := proto.Unmarshal(reqBuf, &req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // TODO: Support reading from more than one reader and merging the results. + if len(readers) != 1 { + http.Error(w, fmt.Sprintf("expected exactly one reader, found %d readers", len(readers)), http.StatusInternalServerError) + return + } + reader := readers[0] + + var resp *remote.ReadResponse + resp, err = reader.Read(&req) + if err != nil { + log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query") + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + data, err := proto.Marshal(resp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/x-protobuf") + if _, err := snappy.NewWriter(w).Write(data); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + }) + return http.ListenAndServe(addr, nil) } @@ -219,14 +273,14 @@ func protoToSamples(req *remote.WriteRequest) model.Samples { return samples } -func sendSamples(c remote.StorageClient, samples model.Samples) { +func sendSamples(w writer, samples model.Samples) { begin := time.Now() - err := c.Store(samples) + err := w.Write(samples) duration := time.Since(begin).Seconds() if err != nil { - log.Warnf("Error sending %d samples to remote storage %q: %v", len(samples), c.Name(), err) - failedSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) + log.With("num_samples", len(samples)).With("storage", w.Name()).With("err", err).Warnf("Error sending samples to remote storage") + failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples))) } - sentSamples.WithLabelValues(c.Name()).Add(float64(len(samples))) - sentBatchDuration.WithLabelValues(c.Name()).Observe(duration) + sentSamples.WithLabelValues(w.Name()).Add(float64(len(samples))) + sentBatchDuration.WithLabelValues(w.Name()).Observe(duration) } diff --git a/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go b/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go index fe8b79e77..4b6c0e6f4 100644 --- a/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go +++ b/documentation/examples/remote_storage/remote_storage_bridge/opentsdb/client.go @@ -69,8 +69,8 @@ func tagsFromMetric(m model.Metric) map[string]TagValue { return tags } -// Store sends a batch of samples to OpenTSDB via its HTTP API. -func (c *Client) Store(samples model.Samples) error { +// Write sends a batch of samples to OpenTSDB via its HTTP API. +func (c *Client) Write(samples model.Samples) error { reqs := make([]StoreSamplesRequest, 0, len(samples)) for _, s := range samples { v := float64(s.Value) diff --git a/vendor/github.com/influxdata/influxdb/client/v2/client.go b/vendor/github.com/influxdata/influxdb/client/v2/client.go new file mode 100644 index 000000000..6a72e8d15 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/client/v2/client.go @@ -0,0 +1,609 @@ +// Package client (v2) is the current official Go client for InfluxDB. +package client // import "github.com/influxdata/influxdb/client/v2" + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/influxdata/influxdb/models" +) + +// HTTPConfig is the config data needed to create an HTTP Client. +type HTTPConfig struct { + // Addr should be of the form "http://host:port" + // or "http://[ipv6-host%zone]:port". + Addr string + + // Username is the influxdb username, optional. + Username string + + // Password is the influxdb password, optional. + Password string + + // UserAgent is the http User Agent, defaults to "InfluxDBClient". + UserAgent string + + // Timeout for influxdb writes, defaults to no timeout. + Timeout time.Duration + + // InsecureSkipVerify gets passed to the http client, if true, it will + // skip https certificate verification. Defaults to false. + InsecureSkipVerify bool + + // TLSConfig allows the user to set their own TLS config for the HTTP + // Client. If set, this option overrides InsecureSkipVerify. + TLSConfig *tls.Config +} + +// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct. +type BatchPointsConfig struct { + // Precision is the write precision of the points, defaults to "ns". + Precision string + + // Database is the database to write points to. + Database string + + // RetentionPolicy is the retention policy of the points. + RetentionPolicy string + + // Write consistency is the number of servers required to confirm write. + WriteConsistency string +} + +// Client is a client interface for writing & querying the database. +type Client interface { + // Ping checks that status of cluster, and will always return 0 time and no + // error for UDP clients. + Ping(timeout time.Duration) (time.Duration, string, error) + + // Write takes a BatchPoints object and writes all Points to InfluxDB. + Write(bp BatchPoints) error + + // Query makes an InfluxDB Query on the database. This will fail if using + // the UDP client. + Query(q Query) (*Response, error) + + // Close releases any resources a Client may be using. + Close() error +} + +// NewHTTPClient returns a new Client from the provided config. +// Client is safe for concurrent use by multiple goroutines. +func NewHTTPClient(conf HTTPConfig) (Client, error) { + if conf.UserAgent == "" { + conf.UserAgent = "InfluxDBClient" + } + + u, err := url.Parse(conf.Addr) + if err != nil { + return nil, err + } else if u.Scheme != "http" && u.Scheme != "https" { + m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+ + " must start with http:// or https://", u.Scheme) + return nil, errors.New(m) + } + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: conf.InsecureSkipVerify, + }, + } + if conf.TLSConfig != nil { + tr.TLSClientConfig = conf.TLSConfig + } + return &client{ + url: *u, + username: conf.Username, + password: conf.Password, + useragent: conf.UserAgent, + httpClient: &http.Client{ + Timeout: conf.Timeout, + Transport: tr, + }, + transport: tr, + }, nil +} + +// Ping will check to see if the server is up with an optional timeout on waiting for leader. +// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred. +func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) { + now := time.Now() + u := c.url + u.Path = "ping" + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return 0, "", err + } + + req.Header.Set("User-Agent", c.useragent) + + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + if timeout > 0 { + params := req.URL.Query() + params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds())) + req.URL.RawQuery = params.Encode() + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return 0, "", err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, "", err + } + + if resp.StatusCode != http.StatusNoContent { + var err = fmt.Errorf(string(body)) + return 0, "", err + } + + version := resp.Header.Get("X-Influxdb-Version") + return time.Since(now), version, nil +} + +// Close releases the client's resources. +func (c *client) Close() error { + c.transport.CloseIdleConnections() + return nil +} + +// client is safe for concurrent use as the fields are all read-only +// once the client is instantiated. +type client struct { + // N.B - if url.UserInfo is accessed in future modifications to the + // methods on client, you will need to syncronise access to url. + url url.URL + username string + password string + useragent string + httpClient *http.Client + transport *http.Transport +} + +// BatchPoints is an interface into a batched grouping of points to write into +// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate +// batch for each goroutine. +type BatchPoints interface { + // AddPoint adds the given point to the Batch of points. + AddPoint(p *Point) + // AddPoints adds the given points to the Batch of points. + AddPoints(ps []*Point) + // Points lists the points in the Batch. + Points() []*Point + + // Precision returns the currently set precision of this Batch. + Precision() string + // SetPrecision sets the precision of this batch. + SetPrecision(s string) error + + // Database returns the currently set database of this Batch. + Database() string + // SetDatabase sets the database of this Batch. + SetDatabase(s string) + + // WriteConsistency returns the currently set write consistency of this Batch. + WriteConsistency() string + // SetWriteConsistency sets the write consistency of this Batch. + SetWriteConsistency(s string) + + // RetentionPolicy returns the currently set retention policy of this Batch. + RetentionPolicy() string + // SetRetentionPolicy sets the retention policy of this Batch. + SetRetentionPolicy(s string) +} + +// NewBatchPoints returns a BatchPoints interface based on the given config. +func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) { + if conf.Precision == "" { + conf.Precision = "ns" + } + if _, err := time.ParseDuration("1" + conf.Precision); err != nil { + return nil, err + } + bp := &batchpoints{ + database: conf.Database, + precision: conf.Precision, + retentionPolicy: conf.RetentionPolicy, + writeConsistency: conf.WriteConsistency, + } + return bp, nil +} + +type batchpoints struct { + points []*Point + database string + precision string + retentionPolicy string + writeConsistency string +} + +func (bp *batchpoints) AddPoint(p *Point) { + bp.points = append(bp.points, p) +} + +func (bp *batchpoints) AddPoints(ps []*Point) { + bp.points = append(bp.points, ps...) +} + +func (bp *batchpoints) Points() []*Point { + return bp.points +} + +func (bp *batchpoints) Precision() string { + return bp.precision +} + +func (bp *batchpoints) Database() string { + return bp.database +} + +func (bp *batchpoints) WriteConsistency() string { + return bp.writeConsistency +} + +func (bp *batchpoints) RetentionPolicy() string { + return bp.retentionPolicy +} + +func (bp *batchpoints) SetPrecision(p string) error { + if _, err := time.ParseDuration("1" + p); err != nil { + return err + } + bp.precision = p + return nil +} + +func (bp *batchpoints) SetDatabase(db string) { + bp.database = db +} + +func (bp *batchpoints) SetWriteConsistency(wc string) { + bp.writeConsistency = wc +} + +func (bp *batchpoints) SetRetentionPolicy(rp string) { + bp.retentionPolicy = rp +} + +// Point represents a single data point. +type Point struct { + pt models.Point +} + +// NewPoint returns a point with the given timestamp. If a timestamp is not +// given, then data is sent to the database without a timestamp, in which case +// the server will assign local time upon reception. NOTE: it is recommended to +// send data with a timestamp. +func NewPoint( + name string, + tags map[string]string, + fields map[string]interface{}, + t ...time.Time, +) (*Point, error) { + var T time.Time + if len(t) > 0 { + T = t[0] + } + + pt, err := models.NewPoint(name, models.NewTags(tags), fields, T) + if err != nil { + return nil, err + } + return &Point{ + pt: pt, + }, nil +} + +// String returns a line-protocol string of the Point. +func (p *Point) String() string { + return p.pt.String() +} + +// PrecisionString returns a line-protocol string of the Point, +// with the timestamp formatted for the given precision. +func (p *Point) PrecisionString(precison string) string { + return p.pt.PrecisionString(precison) +} + +// Name returns the measurement name of the point. +func (p *Point) Name() string { + return p.pt.Name() +} + +// Tags returns the tags associated with the point. +func (p *Point) Tags() map[string]string { + return p.pt.Tags().Map() +} + +// Time return the timestamp for the point. +func (p *Point) Time() time.Time { + return p.pt.Time() +} + +// UnixNano returns timestamp of the point in nanoseconds since Unix epoch. +func (p *Point) UnixNano() int64 { + return p.pt.UnixNano() +} + +// Fields returns the fields for the point. +func (p *Point) Fields() (map[string]interface{}, error) { + return p.pt.Fields() +} + +// NewPointFrom returns a point from the provided models.Point. +func NewPointFrom(pt models.Point) *Point { + return &Point{pt: pt} +} + +func (c *client) Write(bp BatchPoints) error { + var b bytes.Buffer + + for _, p := range bp.Points() { + if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil { + return err + } + + if err := b.WriteByte('\n'); err != nil { + return err + } + } + + u := c.url + u.Path = "write" + req, err := http.NewRequest("POST", u.String(), &b) + if err != nil { + return err + } + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("db", bp.Database()) + params.Set("rp", bp.RetentionPolicy()) + params.Set("precision", bp.Precision()) + params.Set("consistency", bp.WriteConsistency()) + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { + var err = fmt.Errorf(string(body)) + return err + } + + return nil +} + +// Query defines a query to send to the server. +type Query struct { + Command string + Database string + Precision string + Chunked bool + ChunkSize int + Parameters map[string]interface{} +} + +// NewQuery returns a query object. +// The database and precision arguments can be empty strings if they are not needed for the query. +func NewQuery(command, database, precision string) Query { + return Query{ + Command: command, + Database: database, + Precision: precision, + Parameters: make(map[string]interface{}), + } +} + +// NewQueryWithParameters returns a query object. +// The database and precision arguments can be empty strings if they are not needed for the query. +// parameters is a map of the parameter names used in the command to their values. +func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query { + return Query{ + Command: command, + Database: database, + Precision: precision, + Parameters: parameters, + } +} + +// Response represents a list of statement results. +type Response struct { + Results []Result + Err string `json:"error,omitempty"` +} + +// Error returns the first error from any statement. +// It returns nil if no errors occurred on any statements. +func (r *Response) Error() error { + if r.Err != "" { + return fmt.Errorf(r.Err) + } + for _, result := range r.Results { + if result.Err != "" { + return fmt.Errorf(result.Err) + } + } + return nil +} + +// Message represents a user message. +type Message struct { + Level string + Text string +} + +// Result represents a resultset returned from a single statement. +type Result struct { + Series []models.Row + Messages []*Message + Err string `json:"error,omitempty"` +} + +// Query sends a command to the server and returns the Response. +func (c *client) Query(q Query) (*Response, error) { + u := c.url + u.Path = "query" + + jsonParameters, err := json.Marshal(q.Parameters) + + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", u.String(), nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "") + req.Header.Set("User-Agent", c.useragent) + + if c.username != "" { + req.SetBasicAuth(c.username, c.password) + } + + params := req.URL.Query() + params.Set("q", q.Command) + params.Set("db", q.Database) + params.Set("params", string(jsonParameters)) + if q.Chunked { + params.Set("chunked", "true") + if q.ChunkSize > 0 { + params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) + } + } + + if q.Precision != "" { + params.Set("epoch", q.Precision) + } + req.URL.RawQuery = params.Encode() + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + var response Response + if q.Chunked { + cr := NewChunkedResponse(resp.Body) + for { + r, err := cr.NextResponse() + if err != nil { + // If we got an error while decoding the response, send that back. + return nil, err + } + + if r == nil { + break + } + + response.Results = append(response.Results, r.Results...) + if r.Err != "" { + response.Err = r.Err + break + } + } + } else { + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + decErr := dec.Decode(&response) + + // ignore this error if we got an invalid status code + if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { + decErr = nil + } + // If we got a valid decode error, send that back + if decErr != nil { + return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr) + } + } + // If we don't have an error in our json response, and didn't get statusOK + // then send back an error + if resp.StatusCode != http.StatusOK && response.Error() == nil { + return &response, fmt.Errorf("received status code %d from server", + resp.StatusCode) + } + return &response, nil +} + +// duplexReader reads responses and writes it to another writer while +// satisfying the reader interface. +type duplexReader struct { + r io.Reader + w io.Writer +} + +func (r *duplexReader) Read(p []byte) (n int, err error) { + n, err = r.r.Read(p) + if err == nil { + r.w.Write(p[:n]) + } + return n, err +} + +// ChunkedResponse represents a response from the server that +// uses chunking to stream the output. +type ChunkedResponse struct { + dec *json.Decoder + duplex *duplexReader + buf bytes.Buffer +} + +// NewChunkedResponse reads a stream and produces responses from the stream. +func NewChunkedResponse(r io.Reader) *ChunkedResponse { + resp := &ChunkedResponse{} + resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.dec = json.NewDecoder(resp.duplex) + resp.dec.UseNumber() + return resp +} + +// NextResponse reads the next line of the stream and returns a response. +func (r *ChunkedResponse) NextResponse() (*Response, error) { + var response Response + + if err := r.dec.Decode(&response); err != nil { + if err == io.EOF { + return nil, nil + } + // A decoding error happened. This probably means the server crashed + // and sent a last-ditch error message to us. Ensure we have read the + // entirety of the connection to get any remaining error text. + io.Copy(ioutil.Discard, r.duplex) + return nil, errors.New(strings.TrimSpace(r.buf.String())) + } + + r.buf.Reset() + return &response, nil +} diff --git a/vendor/github.com/influxdata/influxdb/client/v2/udp.go b/vendor/github.com/influxdata/influxdb/client/v2/udp.go new file mode 100644 index 000000000..779a28b33 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/client/v2/udp.go @@ -0,0 +1,112 @@ +package client + +import ( + "fmt" + "io" + "net" + "time" +) + +const ( + // UDPPayloadSize is a reasonable default payload size for UDP packets that + // could be travelling over the internet. + UDPPayloadSize = 512 +) + +// UDPConfig is the config data needed to create a UDP Client. +type UDPConfig struct { + // Addr should be of the form "host:port" + // or "[ipv6-host%zone]:port". + Addr string + + // PayloadSize is the maximum size of a UDP client message, optional + // Tune this based on your network. Defaults to UDPPayloadSize. + PayloadSize int +} + +// NewUDPClient returns a client interface for writing to an InfluxDB UDP +// service from the given config. +func NewUDPClient(conf UDPConfig) (Client, error) { + var udpAddr *net.UDPAddr + udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr) + if err != nil { + return nil, err + } + + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + + payloadSize := conf.PayloadSize + if payloadSize == 0 { + payloadSize = UDPPayloadSize + } + + return &udpclient{ + conn: conn, + payloadSize: payloadSize, + }, nil +} + +// Close releases the udpclient's resources. +func (uc *udpclient) Close() error { + return uc.conn.Close() +} + +type udpclient struct { + conn io.WriteCloser + payloadSize int +} + +func (uc *udpclient) Write(bp BatchPoints) error { + var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed + var d, _ = time.ParseDuration("1" + bp.Precision()) + + var delayedError error + + var checkBuffer = func(n int) { + if len(b) > 0 && len(b)+n > uc.payloadSize { + if _, err := uc.conn.Write(b); err != nil { + delayedError = err + } + b = b[:0] + } + } + + for _, p := range bp.Points() { + p.pt.Round(d) + pointSize := p.pt.StringSize() + 1 // include newline in size + //point := p.pt.RoundedString(d) + "\n" + + checkBuffer(pointSize) + + if p.Time().IsZero() || pointSize <= uc.payloadSize { + b = p.pt.AppendString(b) + b = append(b, '\n') + continue + } + + points := p.pt.Split(uc.payloadSize - 1) // account for newline character + for _, sp := range points { + checkBuffer(sp.StringSize() + 1) + b = sp.AppendString(b) + b = append(b, '\n') + } + } + + if len(b) > 0 { + if _, err := uc.conn.Write(b); err != nil { + return err + } + } + return delayedError +} + +func (uc *udpclient) Query(q Query) (*Response, error) { + return nil, fmt.Errorf("Querying via UDP is not supported") +} + +func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) { + return 0, "", nil +} diff --git a/vendor/github.com/influxdata/influxdb/models/consistency.go b/vendor/github.com/influxdata/influxdb/models/consistency.go new file mode 100644 index 000000000..2a3269bca --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/consistency.go @@ -0,0 +1,48 @@ +package models + +import ( + "errors" + "strings" +) + +// ConsistencyLevel represent a required replication criteria before a write can +// be returned as successful. +// +// The consistency level is handled in open-source InfluxDB but only applicable to clusters. +type ConsistencyLevel int + +const ( + // ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet. + ConsistencyLevelAny ConsistencyLevel = iota + + // ConsistencyLevelOne requires at least one data node acknowledged a write. + ConsistencyLevelOne + + // ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write. + ConsistencyLevelQuorum + + // ConsistencyLevelAll requires all data nodes to acknowledge a write. + ConsistencyLevelAll +) + +var ( + // ErrInvalidConsistencyLevel is returned when parsing the string version + // of a consistency level. + ErrInvalidConsistencyLevel = errors.New("invalid consistency level") +) + +// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const. +func ParseConsistencyLevel(level string) (ConsistencyLevel, error) { + switch strings.ToLower(level) { + case "any": + return ConsistencyLevelAny, nil + case "one": + return ConsistencyLevelOne, nil + case "quorum": + return ConsistencyLevelQuorum, nil + case "all": + return ConsistencyLevelAll, nil + default: + return 0, ErrInvalidConsistencyLevel + } +} diff --git a/vendor/github.com/influxdata/influxdb/models/inline_fnv.go b/vendor/github.com/influxdata/influxdb/models/inline_fnv.go new file mode 100644 index 000000000..eec1ae8b0 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/inline_fnv.go @@ -0,0 +1,32 @@ +package models // import "github.com/influxdata/influxdb/models" + +// from stdlib hash/fnv/fnv.go +const ( + prime64 = 1099511628211 + offset64 = 14695981039346656037 +) + +// InlineFNV64a is an alloc-free port of the standard library's fnv64a. +// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function. +type InlineFNV64a uint64 + +// NewInlineFNV64a returns a new instance of InlineFNV64a. +func NewInlineFNV64a() InlineFNV64a { + return offset64 +} + +// Write adds data to the running hash. +func (s *InlineFNV64a) Write(data []byte) (int, error) { + hash := uint64(*s) + for _, c := range data { + hash ^= uint64(c) + hash *= prime64 + } + *s = InlineFNV64a(hash) + return len(data), nil +} + +// Sum64 returns the uint64 of the current resulting hash. +func (s *InlineFNV64a) Sum64() uint64 { + return uint64(*s) +} diff --git a/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go b/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go new file mode 100644 index 000000000..dcc8ae402 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/inline_strconv_parse.go @@ -0,0 +1,38 @@ +package models // import "github.com/influxdata/influxdb/models" + +import ( + "reflect" + "strconv" + "unsafe" +) + +// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt. +func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) { + s := unsafeBytesToString(b) + return strconv.ParseInt(s, base, bitSize) +} + +// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat. +func parseFloatBytes(b []byte, bitSize int) (float64, error) { + s := unsafeBytesToString(b) + return strconv.ParseFloat(s, bitSize) +} + +// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool. +func parseBoolBytes(b []byte) (bool, error) { + return strconv.ParseBool(unsafeBytesToString(b)) +} + +// unsafeBytesToString converts a []byte to a string without a heap allocation. +// +// It is unsafe, and is intended to prepare input to short-lived functions +// that require strings. +func unsafeBytesToString(in []byte) string { + src := *(*reflect.SliceHeader)(unsafe.Pointer(&in)) + dst := reflect.StringHeader{ + Data: src.Data, + Len: src.Len, + } + s := *(*string)(unsafe.Pointer(&dst)) + return s +} diff --git a/vendor/github.com/influxdata/influxdb/models/points.go b/vendor/github.com/influxdata/influxdb/models/points.go new file mode 100644 index 000000000..c8a39632b --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/points.go @@ -0,0 +1,2035 @@ +// Package models implements basic objects used throughout the TICK stack. +package models // import "github.com/influxdata/influxdb/models" + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "io" + "math" + "sort" + "strconv" + "strings" + "time" + + "github.com/influxdata/influxdb/pkg/escape" +) + +var ( + measurementEscapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + ' ': []byte(`\ `), + } + + tagEscapeCodes = map[byte][]byte{ + ',': []byte(`\,`), + ' ': []byte(`\ `), + '=': []byte(`\=`), + } + + // ErrPointMustHaveAField is returned when operating on a point that does not have any fields. + ErrPointMustHaveAField = errors.New("point without fields is unsupported") + + // ErrInvalidNumber is returned when a number is expected but not provided. + ErrInvalidNumber = errors.New("invalid number") + + // ErrInvalidPoint is returned when a point cannot be parsed correctly. + ErrInvalidPoint = errors.New("point is invalid") +) + +const ( + // MaxKeyLength is the largest allowed size of the combined measurement and tag keys. + MaxKeyLength = 65535 +) + +// Point defines the values that will be written to the database. +type Point interface { + // Name return the measurement name for the point. + Name() string + + // SetName updates the measurement name for the point. + SetName(string) + + // Tags returns the tag set for the point. + Tags() Tags + + // AddTag adds or replaces a tag value for a point. + AddTag(key, value string) + + // SetTags replaces the tags for the point. + SetTags(tags Tags) + + // Fields returns the fields for the point. + Fields() (Fields, error) + + // Time return the timestamp for the point. + Time() time.Time + + // SetTime updates the timestamp for the point. + SetTime(t time.Time) + + // UnixNano returns the timestamp of the point as nanoseconds since Unix epoch. + UnixNano() int64 + + // HashID returns a non-cryptographic checksum of the point's key. + HashID() uint64 + + // Key returns the key (measurement joined with tags) of the point. + Key() []byte + + // String returns a string representation of the point. If there is a + // timestamp associated with the point then it will be specified with the default + // precision of nanoseconds. + String() string + + // MarshalBinary returns a binary representation of the point. + MarshalBinary() ([]byte, error) + + // PrecisionString returns a string representation of the point. If there + // is a timestamp associated with the point then it will be specified in the + // given unit. + PrecisionString(precision string) string + + // RoundedString returns a string representation of the point. If there + // is a timestamp associated with the point, then it will be rounded to the + // given duration. + RoundedString(d time.Duration) string + + // Split will attempt to return multiple points with the same timestamp whose + // string representations are no longer than size. Points with a single field or + // a point without a timestamp may exceed the requested size. + Split(size int) []Point + + // Round will round the timestamp of the point to the given duration. + Round(d time.Duration) + + // StringSize returns the length of the string that would be returned by String(). + StringSize() int + + // AppendString appends the result of String() to the provided buffer and returns + // the result, potentially reducing string allocations. + AppendString(buf []byte) []byte + + // FieldIterator retuns a FieldIterator that can be used to traverse the + // fields of a point without constructing the in-memory map. + FieldIterator() FieldIterator +} + +// FieldType represents the type of a field. +type FieldType int + +const ( + // Integer indicates the field's type is integer. + Integer FieldType = iota + + // Float indicates the field's type is float. + Float + + // Boolean indicates the field's type is boolean. + Boolean + + // String indicates the field's type is string. + String + + // Empty is used to indicate that there is no field. + Empty +) + +// FieldIterator provides a low-allocation interface to iterate through a point's fields. +type FieldIterator interface { + // Next indicates whether there any fields remaining. + Next() bool + + // FieldKey returns the key of the current field. + FieldKey() []byte + + // Type returns the FieldType of the current field. + Type() FieldType + + // StringValue returns the string value of the current field. + StringValue() string + + // IntegerValue returns the integer value of the current field. + IntegerValue() (int64, error) + + // BooleanValue returns the boolean value of the current field. + BooleanValue() (bool, error) + + // FloatValue returns the float value of the current field. + FloatValue() (float64, error) + + // Delete deletes the current field. + Delete() + + // Reset resets the iterator to its initial state. + Reset() +} + +// Points represents a sortable list of points by timestamp. +type Points []Point + +// Len implements sort.Interface. +func (a Points) Len() int { return len(a) } + +// Less implements sort.Interface. +func (a Points) Less(i, j int) bool { return a[i].Time().Before(a[j].Time()) } + +// Swap implements sort.Interface. +func (a Points) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// point is the default implementation of Point. +type point struct { + time time.Time + + // text encoding of measurement and tags + // key must always be stored sorted by tags, if the original line was not sorted, + // we need to resort it + key []byte + + // text encoding of field data + fields []byte + + // text encoding of timestamp + ts []byte + + // cached version of parsed fields from data + cachedFields map[string]interface{} + + // cached version of parsed name from key + cachedName string + + // cached version of parsed tags + cachedTags Tags + + it fieldIterator +} + +const ( + // the number of characters for the largest possible int64 (9223372036854775807) + maxInt64Digits = 19 + + // the number of characters for the smallest possible int64 (-9223372036854775808) + minInt64Digits = 20 + + // the number of characters required for the largest float64 before a range check + // would occur during parsing + maxFloat64Digits = 25 + + // the number of characters required for smallest float64 before a range check occur + // would occur during parsing + minFloat64Digits = 27 +) + +// ParsePoints returns a slice of Points from a text representation of a point +// with each point separated by newlines. If any points fail to parse, a non-nil error +// will be returned in addition to the points that parsed successfully. +func ParsePoints(buf []byte) ([]Point, error) { + return ParsePointsWithPrecision(buf, time.Now().UTC(), "n") +} + +// ParsePointsString is identical to ParsePoints but accepts a string. +func ParsePointsString(buf string) ([]Point, error) { + return ParsePoints([]byte(buf)) +} + +// ParseKey returns the measurement name and tags from a point. +// +// NOTE: to minimize heap allocations, the returned Tags will refer to subslices of buf. +// This can have the unintended effect preventing buf from being garbage collected. +func ParseKey(buf []byte) (string, Tags, error) { + // Ignore the error because scanMeasurement returns "missing fields" which we ignore + // when just parsing a key + state, i, _ := scanMeasurement(buf, 0) + + var tags Tags + if state == tagKeyState { + tags = parseTags(buf) + // scanMeasurement returns the location of the comma if there are tags, strip that off + return string(buf[:i-1]), tags, nil + } + return string(buf[:i]), tags, nil +} + +// ParsePointsWithPrecision is similar to ParsePoints, but allows the +// caller to provide a precision for time. +// +// NOTE: to minimize heap allocations, the returned Points will refer to subslices of buf. +// This can have the unintended effect preventing buf from being garbage collected. +func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { + points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1) + var ( + pos int + block []byte + failed []string + ) + for pos < len(buf) { + pos, block = scanLine(buf, pos) + pos++ + + if len(block) == 0 { + continue + } + + // lines which start with '#' are comments + start := skipWhitespace(block, 0) + + // If line is all whitespace, just skip it + if start >= len(block) { + continue + } + + if block[start] == '#' { + continue + } + + // strip the newline if one is present + if block[len(block)-1] == '\n' { + block = block[:len(block)-1] + } + + pt, err := parsePoint(block[start:], defaultTime, precision) + if err != nil { + failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err)) + } else { + points = append(points, pt) + } + + } + if len(failed) > 0 { + return points, fmt.Errorf("%s", strings.Join(failed, "\n")) + } + return points, nil + +} + +func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { + // scan the first block which is measurement[,tag1=value1,tag2=value=2...] + pos, key, err := scanKey(buf, 0) + if err != nil { + return nil, err + } + + // measurement name is required + if len(key) == 0 { + return nil, fmt.Errorf("missing measurement") + } + + if len(key) > MaxKeyLength { + return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength) + } + + // scan the second block is which is field1=value1[,field2=value2,...] + pos, fields, err := scanFields(buf, pos) + if err != nil { + return nil, err + } + + // at least one field is required + if len(fields) == 0 { + return nil, fmt.Errorf("missing fields") + } + + // scan the last block which is an optional integer timestamp + pos, ts, err := scanTime(buf, pos) + if err != nil { + return nil, err + } + + pt := &point{ + key: key, + fields: fields, + ts: ts, + } + + if len(ts) == 0 { + pt.time = defaultTime + pt.SetPrecision(precision) + } else { + ts, err := parseIntBytes(ts, 10, 64) + if err != nil { + return nil, err + } + pt.time, err = SafeCalcTime(ts, precision) + if err != nil { + return nil, err + } + + // Determine if there are illegal non-whitespace characters after the + // timestamp block. + for pos < len(buf) { + if buf[pos] != ' ' { + return nil, ErrInvalidPoint + } + pos++ + } + } + return pt, nil +} + +// GetPrecisionMultiplier will return a multiplier for the precision specified. +func GetPrecisionMultiplier(precision string) int64 { + d := time.Nanosecond + switch precision { + case "u": + d = time.Microsecond + case "ms": + d = time.Millisecond + case "s": + d = time.Second + case "m": + d = time.Minute + case "h": + d = time.Hour + } + return int64(d) +} + +// scanKey scans buf starting at i for the measurement and tag portion of the point. +// It returns the ending position and the byte slice of key within buf. If there +// are tags, they will be sorted if they are not already. +func scanKey(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + + i = start + + // Determines whether the tags are sort, assume they are + sorted := true + + // indices holds the indexes within buf of the start of each tag. For example, + // a buf of 'cpu,host=a,region=b,zone=c' would have indices slice of [4,11,20] + // which indicates that the first tag starts at buf[4], seconds at buf[11], and + // last at buf[20] + indices := make([]int, 100) + + // tracks how many commas we've seen so we know how many values are indices. + // Since indices is an arbitrarily large slice, + // we need to know how many values in the buffer are in use. + commas := 0 + + // First scan the Point's measurement. + state, i, err := scanMeasurement(buf, i) + if err != nil { + return i, buf[start:i], err + } + + // Optionally scan tags if needed. + if state == tagKeyState { + i, commas, indices, err = scanTags(buf, i, indices) + if err != nil { + return i, buf[start:i], err + } + } + + // Now we know where the key region is within buf, and the location of tags, we + // need to determine if duplicate tags exist and if the tags are sorted. This iterates + // over the list comparing each tag in the sequence with each other. + for j := 0; j < commas-1; j++ { + // get the left and right tags + _, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=') + _, right := scanTo(buf[indices[j+1]:indices[j+2]-1], 0, '=') + + // If left is greater than right, the tags are not sorted. We do not have to + // continue because the short path no longer works. + // If the tags are equal, then there are duplicate tags, and we should abort. + // If the tags are not sorted, this pass may not find duplicate tags and we + // need to do a more exhaustive search later. + if cmp := bytes.Compare(left, right); cmp > 0 { + sorted = false + break + } else if cmp == 0 { + return i, buf[start:i], fmt.Errorf("duplicate tags") + } + } + + // If the tags are not sorted, then sort them. This sort is inline and + // uses the tag indices we created earlier. The actual buffer is not sorted, the + // indices are using the buffer for value comparison. After the indices are sorted, + // the buffer is reconstructed from the sorted indices. + if !sorted && commas > 0 { + // Get the measurement name for later + measurement := buf[start : indices[0]-1] + + // Sort the indices + indices := indices[:commas] + insertionSort(0, commas, buf, indices) + + // Create a new key using the measurement and sorted indices + b := make([]byte, len(buf[start:i])) + pos := copy(b, measurement) + for _, i := range indices { + b[pos] = ',' + pos++ + _, v := scanToSpaceOr(buf, i, ',') + pos += copy(b[pos:], v) + } + + // Check again for duplicate tags now that the tags are sorted. + for j := 0; j < commas-1; j++ { + // get the left and right tags + _, left := scanTo(buf[indices[j]:], 0, '=') + _, right := scanTo(buf[indices[j+1]:], 0, '=') + + // If the tags are equal, then there are duplicate tags, and we should abort. + // If the tags are not sorted, this pass may not find duplicate tags and we + // need to do a more exhaustive search later. + if bytes.Equal(left, right) { + return i, b, fmt.Errorf("duplicate tags") + } + } + + return i, b, nil + } + + return i, buf[start:i], nil +} + +// The following constants allow us to specify which state to move to +// next, when scanning sections of a Point. +const ( + tagKeyState = iota + tagValueState + fieldsState +) + +// scanMeasurement examines the measurement part of a Point, returning +// the next state to move to, and the current location in the buffer. +func scanMeasurement(buf []byte, i int) (int, int, error) { + // Check first byte of measurement, anything except a comma is fine. + // It can't be a space, since whitespace is stripped prior to this + // function call. + if i >= len(buf) || buf[i] == ',' { + return -1, i, fmt.Errorf("missing measurement") + } + + for { + i++ + if i >= len(buf) { + // cpu + return -1, i, fmt.Errorf("missing fields") + } + + if buf[i-1] == '\\' { + // Skip character (it's escaped). + continue + } + + // Unescaped comma; move onto scanning the tags. + if buf[i] == ',' { + return tagKeyState, i + 1, nil + } + + // Unescaped space; move onto scanning the fields. + if buf[i] == ' ' { + // cpu value=1.0 + return fieldsState, i, nil + } + } +} + +// scanTags examines all the tags in a Point, keeping track of and +// returning the updated indices slice, number of commas and location +// in buf where to start examining the Point fields. +func scanTags(buf []byte, i int, indices []int) (int, int, []int, error) { + var ( + err error + commas int + state = tagKeyState + ) + + for { + switch state { + case tagKeyState: + // Grow our indices slice if we have too many tags. + if commas >= len(indices) { + newIndics := make([]int, cap(indices)*2) + copy(newIndics, indices) + indices = newIndics + } + indices[commas] = i + commas++ + + i, err = scanTagsKey(buf, i) + state = tagValueState // tag value always follows a tag key + case tagValueState: + state, i, err = scanTagsValue(buf, i) + case fieldsState: + indices[commas] = i + 1 + return i, commas, indices, nil + } + + if err != nil { + return i, commas, indices, err + } + } +} + +// scanTagsKey scans each character in a tag key. +func scanTagsKey(buf []byte, i int) (int, error) { + // First character of the key. + if i >= len(buf) || buf[i] == ' ' || buf[i] == ',' || buf[i] == '=' { + // cpu,{'', ' ', ',', '='} + return i, fmt.Errorf("missing tag key") + } + + // Examine each character in the tag key until we hit an unescaped + // equals (the tag value), or we hit an error (i.e., unescaped + // space or comma). + for { + i++ + + // Either we reached the end of the buffer or we hit an + // unescaped comma or space. + if i >= len(buf) || + ((buf[i] == ' ' || buf[i] == ',') && buf[i-1] != '\\') { + // cpu,tag{'', ' ', ','} + return i, fmt.Errorf("missing tag value") + } + + if buf[i] == '=' && buf[i-1] != '\\' { + // cpu,tag= + return i + 1, nil + } + } +} + +// scanTagsValue scans each character in a tag value. +func scanTagsValue(buf []byte, i int) (int, int, error) { + // Tag value cannot be empty. + if i >= len(buf) || buf[i] == ',' || buf[i] == ' ' { + // cpu,tag={',', ' '} + return -1, i, fmt.Errorf("missing tag value") + } + + // Examine each character in the tag value until we hit an unescaped + // comma (move onto next tag key), an unescaped space (move onto + // fields), or we error out. + for { + i++ + if i >= len(buf) { + // cpu,tag=value + return -1, i, fmt.Errorf("missing fields") + } + + // An unescaped equals sign is an invalid tag value. + if buf[i] == '=' && buf[i-1] != '\\' { + // cpu,tag={'=', 'fo=o'} + return -1, i, fmt.Errorf("invalid tag format") + } + + if buf[i] == ',' && buf[i-1] != '\\' { + // cpu,tag=foo, + return tagKeyState, i + 1, nil + } + + // cpu,tag=foo value=1.0 + // cpu, tag=foo\= value=1.0 + if buf[i] == ' ' && buf[i-1] != '\\' { + return fieldsState, i, nil + } + } +} + +func insertionSort(l, r int, buf []byte, indices []int) { + for i := l + 1; i < r; i++ { + for j := i; j > l && less(buf, indices, j, j-1); j-- { + indices[j], indices[j-1] = indices[j-1], indices[j] + } + } +} + +func less(buf []byte, indices []int, i, j int) bool { + // This grabs the tag names for i & j, it ignores the values + _, a := scanTo(buf, indices[i], '=') + _, b := scanTo(buf, indices[j], '=') + return bytes.Compare(a, b) < 0 +} + +// scanFields scans buf, starting at i for the fields section of a point. It returns +// the ending position and the byte slice of the fields within buf. +func scanFields(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + quoted := false + + // tracks how many '=' we've seen + equals := 0 + + // tracks how many commas we've seen + commas := 0 + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // escaped characters? + if buf[i] == '\\' && i+1 < len(buf) { + i += 2 + continue + } + + // If the value is quoted, scan until we get to the end quote + // Only quote values in the field value since quotes are not significant + // in the field key + if buf[i] == '"' && equals > commas { + quoted = !quoted + i++ + continue + } + + // If we see an =, ensure that there is at least on char before and after it + if buf[i] == '=' && !quoted { + equals++ + + // check for "... =123" but allow "a\ =123" + if buf[i-1] == ' ' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field key") + } + + // check for "...a=123,=456" but allow "a=123,a\,=456" + if buf[i-1] == ',' && buf[i-2] != '\\' { + return i, buf[start:i], fmt.Errorf("missing field key") + } + + // check for "... value=" + if i+1 >= len(buf) { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + // check for "... value=,value2=..." + if buf[i+1] == ',' || buf[i+1] == ' ' { + return i, buf[start:i], fmt.Errorf("missing field value") + } + + if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' { + var err error + i, err = scanNumber(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + // If next byte is not a double-quote, the value must be a boolean + if buf[i+1] != '"' { + var err error + i, _, err = scanBoolean(buf, i+1) + if err != nil { + return i, buf[start:i], err + } + continue + } + } + + if buf[i] == ',' && !quoted { + commas++ + } + + // reached end of block? + if buf[i] == ' ' && !quoted { + break + } + i++ + } + + if quoted { + return i, buf[start:i], fmt.Errorf("unbalanced quotes") + } + + // check that all field sections had key and values (e.g. prevent "a=1,b" + if equals == 0 || commas != equals-1 { + return i, buf[start:i], fmt.Errorf("invalid field format") + } + + return i, buf[start:i], nil +} + +// scanTime scans buf, starting at i for the time section of a point. It +// returns the ending position and the byte slice of the timestamp within buf +// and and error if the timestamp is not in the correct numeric format. +func scanTime(buf []byte, i int) (int, []byte, error) { + start := skipWhitespace(buf, i) + i = start + + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Reached end of block or trailing whitespace? + if buf[i] == '\n' || buf[i] == ' ' { + break + } + + // Handle negative timestamps + if i == start && buf[i] == '-' { + i++ + continue + } + + // Timestamps should be integers, make sure they are so we don't need + // to actually parse the timestamp until needed. + if buf[i] < '0' || buf[i] > '9' { + return i, buf[start:i], fmt.Errorf("bad timestamp") + } + i++ + } + return i, buf[start:i], nil +} + +func isNumeric(b byte) bool { + return (b >= '0' && b <= '9') || b == '.' +} + +// scanNumber returns the end position within buf, start at i after +// scanning over buf for an integer, or float. It returns an +// error if a invalid number is scanned. +func scanNumber(buf []byte, i int) (int, error) { + start := i + var isInt bool + + // Is negative number? + if i < len(buf) && buf[i] == '-' { + i++ + // There must be more characters now, as just '-' is illegal. + if i == len(buf) { + return i, ErrInvalidNumber + } + } + + // how many decimal points we've see + decimal := false + + // indicates the number is float in scientific notation + scientific := false + + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + + if buf[i] == 'i' && i > start && !isInt { + isInt = true + i++ + continue + } + + if buf[i] == '.' { + // Can't have more than 1 decimal (e.g. 1.1.1 should fail) + if decimal { + return i, ErrInvalidNumber + } + decimal = true + } + + // `e` is valid for floats but not as the first char + if i > start && (buf[i] == 'e' || buf[i] == 'E') { + scientific = true + i++ + continue + } + + // + and - are only valid at this point if they follow an e (scientific notation) + if (buf[i] == '+' || buf[i] == '-') && (buf[i-1] == 'e' || buf[i-1] == 'E') { + i++ + continue + } + + // NaN is an unsupported value + if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') { + return i, ErrInvalidNumber + } + + if !isNumeric(buf[i]) { + return i, ErrInvalidNumber + } + i++ + } + + if isInt && (decimal || scientific) { + return i, ErrInvalidNumber + } + + numericDigits := i - start + if isInt { + numericDigits-- + } + if decimal { + numericDigits-- + } + if buf[start] == '-' { + numericDigits-- + } + + if numericDigits == 0 { + return i, ErrInvalidNumber + } + + // It's more common that numbers will be within min/max range for their type but we need to prevent + // out or range numbers from being parsed successfully. This uses some simple heuristics to decide + // if we should parse the number to the actual type. It does not do it all the time because it incurs + // extra allocations and we end up converting the type again when writing points to disk. + if isInt { + // Make sure the last char is an 'i' for integers (e.g. 9i10 is not valid) + if buf[i-1] != 'i' { + return i, ErrInvalidNumber + } + // Parse the int to check bounds the number of digits could be larger than the max range + // We subtract 1 from the index to remove the `i` from our tests + if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits { + if _, err := parseIntBytes(buf[start:i-1], 10, 64); err != nil { + return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err) + } + } + } else { + // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range + if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits { + if _, err := parseFloatBytes(buf[start:i], 10); err != nil { + return i, fmt.Errorf("invalid float") + } + } + } + + return i, nil +} + +// scanBoolean returns the end position within buf, start at i after +// scanning over buf for boolean. Valid values for a boolean are +// t, T, true, TRUE, f, F, false, FALSE. It returns an error if a invalid boolean +// is scanned. +func scanBoolean(buf []byte, i int) (int, []byte, error) { + start := i + + if i < len(buf) && (buf[i] != 't' && buf[i] != 'f' && buf[i] != 'T' && buf[i] != 'F') { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + i++ + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' || buf[i] == ' ' { + break + } + i++ + } + + // Single char bool (t, T, f, F) is ok + if i-start == 1 { + return i, buf[start:i], nil + } + + // length must be 4 for true or TRUE + if (buf[start] == 't' || buf[start] == 'T') && i-start != 4 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // length must be 5 for false or FALSE + if (buf[start] == 'f' || buf[start] == 'F') && i-start != 5 { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + // Otherwise + valid := false + switch buf[start] { + case 't': + valid = bytes.Equal(buf[start:i], []byte("true")) + case 'f': + valid = bytes.Equal(buf[start:i], []byte("false")) + case 'T': + valid = bytes.Equal(buf[start:i], []byte("TRUE")) || bytes.Equal(buf[start:i], []byte("True")) + case 'F': + valid = bytes.Equal(buf[start:i], []byte("FALSE")) || bytes.Equal(buf[start:i], []byte("False")) + } + + if !valid { + return i, buf[start:i], fmt.Errorf("invalid boolean") + } + + return i, buf[start:i], nil + +} + +// skipWhitespace returns the end position within buf, starting at i after +// scanning over spaces in tags. +func skipWhitespace(buf []byte, i int) int { + for i < len(buf) { + if buf[i] != ' ' && buf[i] != '\t' && buf[i] != 0 { + break + } + i++ + } + return i +} + +// scanLine returns the end position in buf and the next line found within +// buf. +func scanLine(buf []byte, i int) (int, []byte) { + start := i + quoted := false + fields := false + + // tracks how many '=' and commas we've seen + // this duplicates some of the functionality in scanFields + equals := 0 + commas := 0 + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // skip past escaped characters + if buf[i] == '\\' { + i += 2 + continue + } + + if buf[i] == ' ' { + fields = true + } + + // If we see a double quote, makes sure it is not escaped + if fields { + if !quoted && buf[i] == '=' { + i++ + equals++ + continue + } else if !quoted && buf[i] == ',' { + i++ + commas++ + continue + } else if buf[i] == '"' && equals > commas { + i++ + quoted = !quoted + continue + } + } + + if buf[i] == '\n' && !quoted { + break + } + + i++ + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte, where stop byte +// has not been escaped. +// +// If there are leading spaces, they are skipped. +func scanTo(buf []byte, i int, stop byte) (int, []byte) { + start := i + for { + // reached the end of buf? + if i >= len(buf) { + break + } + + // Reached unescaped stop value? + if buf[i] == stop && (i == 0 || buf[i-1] != '\\') { + break + } + i++ + } + + return i, buf[start:i] +} + +// scanTo returns the end position in buf and the next consecutive block +// of bytes, starting from i and ending with stop byte. If there are leading +// spaces, they are skipped. +func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) { + start := i + if buf[i] == stop || buf[i] == ' ' { + return i, buf[start:i] + } + + for { + i++ + if buf[i-1] == '\\' { + continue + } + + // reached the end of buf? + if i >= len(buf) { + return i, buf[start:i] + } + + // reached end of block? + if buf[i] == stop || buf[i] == ' ' { + return i, buf[start:i] + } + } +} + +func scanTagValue(buf []byte, i int) (int, []byte) { + start := i + for { + if i >= len(buf) { + break + } + + if buf[i] == ',' && buf[i-1] != '\\' { + break + } + i++ + } + if i > len(buf) { + return i, nil + } + return i, buf[start:i] +} + +func scanFieldValue(buf []byte, i int) (int, []byte) { + start := i + quoted := false + for i < len(buf) { + // Only escape char for a field value is a double-quote and backslash + if buf[i] == '\\' && i+1 < len(buf) && (buf[i+1] == '"' || buf[i+1] == '\\') { + i += 2 + continue + } + + // Quoted value? (e.g. string) + if buf[i] == '"' { + i++ + quoted = !quoted + continue + } + + if buf[i] == ',' && !quoted { + break + } + i++ + } + return i, buf[start:i] +} + +func escapeMeasurement(in []byte) []byte { + for b, esc := range measurementEscapeCodes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +func unescapeMeasurement(in []byte) []byte { + for b, esc := range measurementEscapeCodes { + in = bytes.Replace(in, esc, []byte{b}, -1) + } + return in +} + +func escapeTag(in []byte) []byte { + for b, esc := range tagEscapeCodes { + if bytes.IndexByte(in, b) != -1 { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + } + return in +} + +func unescapeTag(in []byte) []byte { + if bytes.IndexByte(in, '\\') == -1 { + return in + } + + for b, esc := range tagEscapeCodes { + if bytes.IndexByte(in, b) != -1 { + in = bytes.Replace(in, esc, []byte{b}, -1) + } + } + return in +} + +// escapeStringFieldReplacer replaces double quotes and backslashes +// with the same character preceded by a backslash. +// As of Go 1.7 this benchmarked better in allocations and CPU time +// compared to iterating through a string byte-by-byte and appending to a new byte slice, +// calling strings.Replace twice, and better than (*Regex).ReplaceAllString. +var escapeStringFieldReplacer = strings.NewReplacer(`"`, `\"`, `\`, `\\`) + +// EscapeStringField returns a copy of in with any double quotes or +// backslashes with escaped values. +func EscapeStringField(in string) string { + return escapeStringFieldReplacer.Replace(in) +} + +// unescapeStringField returns a copy of in with any escaped double-quotes +// or backslashes unescaped. +func unescapeStringField(in string) string { + if strings.IndexByte(in, '\\') == -1 { + return in + } + + var out []byte + i := 0 + for { + if i >= len(in) { + break + } + // unescape backslashes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '\\' { + out = append(out, '\\') + i += 2 + continue + } + // unescape double-quotes + if in[i] == '\\' && i+1 < len(in) && in[i+1] == '"' { + out = append(out, '"') + i += 2 + continue + } + out = append(out, in[i]) + i++ + + } + return string(out) +} + +// NewPoint returns a new point with the given measurement name, tags, fields and timestamp. If +// an unsupported field value (NaN) or out of range time is passed, this function returns an error. +func NewPoint(name string, tags Tags, fields Fields, t time.Time) (Point, error) { + key, err := pointKey(name, tags, fields, t) + if err != nil { + return nil, err + } + + return &point{ + key: key, + time: t, + fields: fields.MarshalBinary(), + }, nil +} + +// pointKey checks some basic requirements for valid points, and returns the +// key, along with an possible error. +func pointKey(measurement string, tags Tags, fields Fields, t time.Time) ([]byte, error) { + if len(fields) == 0 { + return nil, ErrPointMustHaveAField + } + + if !t.IsZero() { + if err := CheckTime(t); err != nil { + return nil, err + } + } + + for key, value := range fields { + switch value := value.(type) { + case float64: + // Ensure the caller validates and handles invalid field values + if math.IsNaN(value) { + return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) + } + case float32: + // Ensure the caller validates and handles invalid field values + if math.IsNaN(float64(value)) { + return nil, fmt.Errorf("NaN is an unsupported value for field %s", key) + } + } + if len(key) == 0 { + return nil, fmt.Errorf("all fields must have non-empty names") + } + } + + key := MakeKey([]byte(measurement), tags) + if len(key) > MaxKeyLength { + return nil, fmt.Errorf("max key length exceeded: %v > %v", len(key), MaxKeyLength) + } + + return key, nil +} + +// NewPointFromBytes returns a new Point from a marshalled Point. +func NewPointFromBytes(b []byte) (Point, error) { + p := &point{} + if err := p.UnmarshalBinary(b); err != nil { + return nil, err + } + fields, err := p.Fields() + if err != nil { + return nil, err + } + if len(fields) == 0 { + return nil, ErrPointMustHaveAField + } + return p, nil +} + +// MustNewPoint returns a new point with the given measurement name, tags, fields and timestamp. If +// an unsupported field value (NaN) is passed, this function panics. +func MustNewPoint(name string, tags Tags, fields Fields, time time.Time) Point { + pt, err := NewPoint(name, tags, fields, time) + if err != nil { + panic(err.Error()) + } + return pt +} + +// Key returns the key (measurement joined with tags) of the point. +func (p *point) Key() []byte { + return p.key +} + +func (p *point) name() []byte { + _, name := scanTo(p.key, 0, ',') + return name +} + +// Name return the measurement name for the point. +func (p *point) Name() string { + if p.cachedName != "" { + return p.cachedName + } + p.cachedName = string(escape.Unescape(p.name())) + return p.cachedName +} + +// SetName updates the measurement name for the point. +func (p *point) SetName(name string) { + p.cachedName = "" + p.key = MakeKey([]byte(name), p.Tags()) +} + +// Time return the timestamp for the point. +func (p *point) Time() time.Time { + return p.time +} + +// SetTime updates the timestamp for the point. +func (p *point) SetTime(t time.Time) { + p.time = t +} + +// Round will round the timestamp of the point to the given duration. +func (p *point) Round(d time.Duration) { + p.time = p.time.Round(d) +} + +// Tags returns the tag set for the point. +func (p *point) Tags() Tags { + if p.cachedTags != nil { + return p.cachedTags + } + p.cachedTags = parseTags(p.key) + return p.cachedTags +} + +func parseTags(buf []byte) Tags { + if len(buf) == 0 { + return nil + } + + pos, name := scanTo(buf, 0, ',') + + // it's an empty key, so there are no tags + if len(name) == 0 { + return nil + } + + tags := make(Tags, 0, bytes.Count(buf, []byte(","))) + hasEscape := bytes.IndexByte(buf, '\\') != -1 + + i := pos + 1 + var key, value []byte + for { + if i >= len(buf) { + break + } + i, key = scanTo(buf, i, '=') + i, value = scanTagValue(buf, i+1) + + if len(value) == 0 { + continue + } + + if hasEscape { + tags = append(tags, Tag{Key: unescapeTag(key), Value: unescapeTag(value)}) + } else { + tags = append(tags, Tag{Key: key, Value: value}) + } + + i++ + } + + return tags +} + +// MakeKey creates a key for a set of tags. +func MakeKey(name []byte, tags Tags) []byte { + // unescape the name and then re-escape it to avoid double escaping. + // The key should always be stored in escaped form. + return append(escapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...) +} + +// SetTags replaces the tags for the point. +func (p *point) SetTags(tags Tags) { + p.key = MakeKey([]byte(p.Name()), tags) + p.cachedTags = tags +} + +// AddTag adds or replaces a tag value for a point. +func (p *point) AddTag(key, value string) { + tags := p.Tags() + tags = append(tags, Tag{Key: []byte(key), Value: []byte(value)}) + sort.Sort(tags) + p.cachedTags = tags + p.key = MakeKey([]byte(p.Name()), tags) +} + +// Fields returns the fields for the point. +func (p *point) Fields() (Fields, error) { + if p.cachedFields != nil { + return p.cachedFields, nil + } + cf, err := p.unmarshalBinary() + if err != nil { + return nil, err + } + p.cachedFields = cf + return p.cachedFields, nil +} + +// SetPrecision will round a time to the specified precision. +func (p *point) SetPrecision(precision string) { + switch precision { + case "n": + case "u": + p.SetTime(p.Time().Truncate(time.Microsecond)) + case "ms": + p.SetTime(p.Time().Truncate(time.Millisecond)) + case "s": + p.SetTime(p.Time().Truncate(time.Second)) + case "m": + p.SetTime(p.Time().Truncate(time.Minute)) + case "h": + p.SetTime(p.Time().Truncate(time.Hour)) + } +} + +// String returns the string representation of the point. +func (p *point) String() string { + if p.Time().IsZero() { + return string(p.Key()) + " " + string(p.fields) + } + return string(p.Key()) + " " + string(p.fields) + " " + strconv.FormatInt(p.UnixNano(), 10) +} + +// AppendString appends the string representation of the point to buf. +func (p *point) AppendString(buf []byte) []byte { + buf = append(buf, p.key...) + buf = append(buf, ' ') + buf = append(buf, p.fields...) + + if !p.time.IsZero() { + buf = append(buf, ' ') + buf = strconv.AppendInt(buf, p.UnixNano(), 10) + } + + return buf +} + +// StringSize returns the length of the string that would be returned by String(). +func (p *point) StringSize() int { + size := len(p.key) + len(p.fields) + 1 + + if !p.time.IsZero() { + digits := 1 // even "0" has one digit + t := p.UnixNano() + if t < 0 { + // account for negative sign, then negate + digits++ + t = -t + } + for t > 9 { // already accounted for one digit + digits++ + t /= 10 + } + size += digits + 1 // digits and a space + } + + return size +} + +// MarshalBinary returns a binary representation of the point. +func (p *point) MarshalBinary() ([]byte, error) { + if len(p.fields) == 0 { + return nil, ErrPointMustHaveAField + } + + tb, err := p.time.MarshalBinary() + if err != nil { + return nil, err + } + + b := make([]byte, 8+len(p.key)+len(p.fields)+len(tb)) + i := 0 + + binary.BigEndian.PutUint32(b[i:], uint32(len(p.key))) + i += 4 + + i += copy(b[i:], p.key) + + binary.BigEndian.PutUint32(b[i:i+4], uint32(len(p.fields))) + i += 4 + + i += copy(b[i:], p.fields) + + copy(b[i:], tb) + return b, nil +} + +// UnmarshalBinary decodes a binary representation of the point into a point struct. +func (p *point) UnmarshalBinary(b []byte) error { + var n int + + // Read key length. + if len(b) < 4 { + return io.ErrShortBuffer + } + n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:] + + // Read key. + if len(b) < n { + return io.ErrShortBuffer + } + p.key, b = b[:n], b[n:] + + // Read fields length. + if len(b) < 4 { + return io.ErrShortBuffer + } + n, b = int(binary.BigEndian.Uint32(b[:4])), b[4:] + + // Read fields. + if len(b) < n { + return io.ErrShortBuffer + } + p.fields, b = b[:n], b[n:] + + // Read timestamp. + if err := p.time.UnmarshalBinary(b); err != nil { + return err + } + return nil +} + +// PrecisionString returns a string representation of the point. If there +// is a timestamp associated with the point then it will be specified in the +// given unit. +func (p *point) PrecisionString(precision string) string { + if p.Time().IsZero() { + return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) + } + return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), + p.UnixNano()/GetPrecisionMultiplier(precision)) +} + +// RoundedString returns a string representation of the point. If there +// is a timestamp associated with the point, then it will be rounded to the +// given duration. +func (p *point) RoundedString(d time.Duration) string { + if p.Time().IsZero() { + return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) + } + return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), + p.time.Round(d).UnixNano()) +} + +func (p *point) unmarshalBinary() (Fields, error) { + iter := p.FieldIterator() + fields := make(Fields, 8) + for iter.Next() { + if len(iter.FieldKey()) == 0 { + continue + } + switch iter.Type() { + case Float: + v, err := iter.FloatValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + fields[string(iter.FieldKey())] = v + case Integer: + v, err := iter.IntegerValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + fields[string(iter.FieldKey())] = v + case String: + fields[string(iter.FieldKey())] = iter.StringValue() + case Boolean: + v, err := iter.BooleanValue() + if err != nil { + return nil, fmt.Errorf("unable to unmarshal field %s: %s", string(iter.FieldKey()), err) + } + fields[string(iter.FieldKey())] = v + } + } + return fields, nil +} + +// HashID returns a non-cryptographic checksum of the point's key. +func (p *point) HashID() uint64 { + h := NewInlineFNV64a() + h.Write(p.key) + sum := h.Sum64() + return sum +} + +// UnixNano returns the timestamp of the point as nanoseconds since Unix epoch. +func (p *point) UnixNano() int64 { + return p.Time().UnixNano() +} + +// Split will attempt to return multiple points with the same timestamp whose +// string representations are no longer than size. Points with a single field or +// a point without a timestamp may exceed the requested size. +func (p *point) Split(size int) []Point { + if p.time.IsZero() || len(p.String()) <= size { + return []Point{p} + } + + // key string, timestamp string, spaces + size -= len(p.key) + len(strconv.FormatInt(p.time.UnixNano(), 10)) + 2 + + var points []Point + var start, cur int + + for cur < len(p.fields) { + end, _ := scanTo(p.fields, cur, '=') + end, _ = scanFieldValue(p.fields, end+1) + + if cur > start && end-start > size { + points = append(points, &point{ + key: p.key, + time: p.time, + fields: p.fields[start : cur-1], + }) + start = cur + } + + cur = end + 1 + } + + points = append(points, &point{ + key: p.key, + time: p.time, + fields: p.fields[start:], + }) + + return points +} + +// Tag represents a single key/value tag pair. +type Tag struct { + Key []byte + Value []byte +} + +// Clone returns a shallow copy of Tag. +// +// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed. +// Use Clone to create a Tag with new byte slices that do not refer to the argument to ParsePointsWithPrecision. +func (t Tag) Clone() Tag { + other := Tag{ + Key: make([]byte, len(t.Key)), + Value: make([]byte, len(t.Value)), + } + + copy(other.Key, t.Key) + copy(other.Value, t.Value) + + return other +} + +// Tags represents a sorted list of tags. +type Tags []Tag + +// NewTags returns a new Tags from a map. +func NewTags(m map[string]string) Tags { + if len(m) == 0 { + return nil + } + a := make(Tags, 0, len(m)) + for k, v := range m { + a = append(a, Tag{Key: []byte(k), Value: []byte(v)}) + } + sort.Sort(a) + return a +} + +// Clone returns a copy of the slice where the elements are a result of calling `Clone` on the original elements +// +// Tags associated with a Point created by ParsePointsWithPrecision will hold references to the byte slice that was parsed. +// Use Clone to create Tags with new byte slices that do not refer to the argument to ParsePointsWithPrecision. +func (a Tags) Clone() Tags { + if len(a) == 0 { + return nil + } + + others := make(Tags, len(a)) + for i := range a { + others[i] = a[i].Clone() + } + + return others +} + +// Len implements sort.Interface. +func (a Tags) Len() int { return len(a) } + +// Less implements sort.Interface. +func (a Tags) Less(i, j int) bool { return bytes.Compare(a[i].Key, a[j].Key) == -1 } + +// Swap implements sort.Interface. +func (a Tags) Swap(i, j int) { a[i], a[j] = a[j], a[i] } + +// Get returns the value for a key. +func (a Tags) Get(key []byte) []byte { + // OPTIMIZE: Use sort.Search if tagset is large. + + for _, t := range a { + if bytes.Equal(t.Key, key) { + return t.Value + } + } + return nil +} + +// GetString returns the string value for a string key. +func (a Tags) GetString(key string) string { + return string(a.Get([]byte(key))) +} + +// Set sets the value for a key. +func (a *Tags) Set(key, value []byte) { + for _, t := range *a { + if bytes.Equal(t.Key, key) { + t.Value = value + return + } + } + *a = append(*a, Tag{Key: key, Value: value}) + sort.Sort(*a) +} + +// SetString sets the string value for a string key. +func (a *Tags) SetString(key, value string) { + a.Set([]byte(key), []byte(value)) +} + +// Delete removes a tag by key. +func (a *Tags) Delete(key []byte) { + for i, t := range *a { + if bytes.Equal(t.Key, key) { + copy((*a)[i:], (*a)[i+1:]) + (*a)[len(*a)-1] = Tag{} + *a = (*a)[:len(*a)-1] + return + } + } +} + +// Map returns a map representation of the tags. +func (a Tags) Map() map[string]string { + m := make(map[string]string, len(a)) + for _, t := range a { + m[string(t.Key)] = string(t.Value) + } + return m +} + +// Merge merges the tags combining the two. If both define a tag with the +// same key, the merged value overwrites the old value. +// A new map is returned. +func (a Tags) Merge(other map[string]string) Tags { + merged := make(map[string]string, len(a)+len(other)) + for _, t := range a { + merged[string(t.Key)] = string(t.Value) + } + for k, v := range other { + merged[k] = v + } + return NewTags(merged) +} + +// HashKey hashes all of a tag's keys. +func (a Tags) HashKey() []byte { + // Empty maps marshal to empty bytes. + if len(a) == 0 { + return nil + } + + escaped := make(Tags, 0, len(a)) + for _, t := range a { + ek := escapeTag(t.Key) + ev := escapeTag(t.Value) + + if len(ev) > 0 { + escaped = append(escaped, Tag{Key: ek, Value: ev}) + } + } + + // Extract keys and determine final size. + sz := len(escaped) + (len(escaped) * 2) // separators + keys := make([][]byte, len(escaped)+1) + for i, t := range escaped { + keys[i] = t.Key + sz += len(t.Key) + len(t.Value) + } + keys = keys[:len(escaped)] + sort.Sort(byteSlices(keys)) + + // Generate marshaled bytes. + b := make([]byte, sz) + buf := b + idx := 0 + for i, k := range keys { + buf[idx] = ',' + idx++ + copy(buf[idx:idx+len(k)], k) + idx += len(k) + buf[idx] = '=' + idx++ + v := escaped[i].Value + copy(buf[idx:idx+len(v)], v) + idx += len(v) + } + return b[:idx] +} + +// Fields represents a mapping between a Point's field names and their +// values. +type Fields map[string]interface{} + +// FieldIterator retuns a FieldIterator that can be used to traverse the +// fields of a point without constructing the in-memory map. +func (p *point) FieldIterator() FieldIterator { + p.Reset() + return p +} + +type fieldIterator struct { + start, end int + key, keybuf []byte + valueBuf []byte + fieldType FieldType +} + +// Next indicates whether there any fields remaining. +func (p *point) Next() bool { + p.it.start = p.it.end + if p.it.start >= len(p.fields) { + return false + } + + p.it.end, p.it.key = scanTo(p.fields, p.it.start, '=') + if escape.IsEscaped(p.it.key) { + p.it.keybuf = escape.AppendUnescaped(p.it.keybuf[:0], p.it.key) + p.it.key = p.it.keybuf + } + + p.it.end, p.it.valueBuf = scanFieldValue(p.fields, p.it.end+1) + p.it.end++ + + if len(p.it.valueBuf) == 0 { + p.it.fieldType = Empty + return true + } + + c := p.it.valueBuf[0] + + if c == '"' { + p.it.fieldType = String + return true + } + + if strings.IndexByte(`0123456789-.nNiI`, c) >= 0 { + if p.it.valueBuf[len(p.it.valueBuf)-1] == 'i' { + p.it.fieldType = Integer + p.it.valueBuf = p.it.valueBuf[:len(p.it.valueBuf)-1] + } else { + p.it.fieldType = Float + } + return true + } + + // to keep the same behavior that currently exists, default to boolean + p.it.fieldType = Boolean + return true +} + +// FieldKey returns the key of the current field. +func (p *point) FieldKey() []byte { + return p.it.key +} + +// Type returns the FieldType of the current field. +func (p *point) Type() FieldType { + return p.it.fieldType +} + +// StringValue returns the string value of the current field. +func (p *point) StringValue() string { + return unescapeStringField(string(p.it.valueBuf[1 : len(p.it.valueBuf)-1])) +} + +// IntegerValue returns the integer value of the current field. +func (p *point) IntegerValue() (int64, error) { + n, err := parseIntBytes(p.it.valueBuf, 10, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse integer value %q: %v", p.it.valueBuf, err) + } + return n, nil +} + +// BooleanValue returns the boolean value of the current field. +func (p *point) BooleanValue() (bool, error) { + b, err := parseBoolBytes(p.it.valueBuf) + if err != nil { + return false, fmt.Errorf("unable to parse bool value %q: %v", p.it.valueBuf, err) + } + return b, nil +} + +// FloatValue returns the float value of the current field. +func (p *point) FloatValue() (float64, error) { + f, err := parseFloatBytes(p.it.valueBuf, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse floating point value %q: %v", p.it.valueBuf, err) + } + return f, nil +} + +// Delete deletes the current field. +func (p *point) Delete() { + switch { + case p.it.end == p.it.start: + case p.it.end >= len(p.fields): + p.fields = p.fields[:p.it.start] + case p.it.start == 0: + p.fields = p.fields[p.it.end:] + default: + p.fields = append(p.fields[:p.it.start], p.fields[p.it.end:]...) + } + + p.it.end = p.it.start + p.it.key = nil + p.it.valueBuf = nil + p.it.fieldType = Empty +} + +// Reset resets the iterator to its initial state. +func (p *point) Reset() { + p.it.fieldType = Empty + p.it.key = nil + p.it.valueBuf = nil + p.it.start = 0 + p.it.end = 0 +} + +// MarshalBinary encodes all the fields to their proper type and returns the binary +// represenation +// NOTE: uint64 is specifically not supported due to potential overflow when we decode +// again later to an int64 +// NOTE2: uint is accepted, and may be 64 bits, and is for some reason accepted... +func (p Fields) MarshalBinary() []byte { + var b []byte + keys := make([]string, 0, len(p)) + + for k := range p { + keys = append(keys, k) + } + + // Not really necessary, can probably be removed. + sort.Strings(keys) + + for i, k := range keys { + if i > 0 { + b = append(b, ',') + } + b = appendField(b, k, p[k]) + } + + return b +} + +func appendField(b []byte, k string, v interface{}) []byte { + b = append(b, []byte(escape.String(k))...) + b = append(b, '=') + + // check popular types first + switch v := v.(type) { + case float64: + b = strconv.AppendFloat(b, v, 'f', -1, 64) + case int64: + b = strconv.AppendInt(b, v, 10) + b = append(b, 'i') + case string: + b = append(b, '"') + b = append(b, []byte(EscapeStringField(v))...) + b = append(b, '"') + case bool: + b = strconv.AppendBool(b, v) + case int32: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int16: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int8: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case int: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint32: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint16: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case uint8: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + // TODO: 'uint' should be considered just as "dangerous" as a uint64, + // perhaps the value should be checked and capped at MaxInt64? We could + // then include uint64 as an accepted value + case uint: + b = strconv.AppendInt(b, int64(v), 10) + b = append(b, 'i') + case float32: + b = strconv.AppendFloat(b, float64(v), 'f', -1, 32) + case []byte: + b = append(b, v...) + case nil: + // skip + default: + // Can't determine the type, so convert to string + b = append(b, '"') + b = append(b, []byte(EscapeStringField(fmt.Sprintf("%v", v)))...) + b = append(b, '"') + + } + + return b +} + +type byteSlices [][]byte + +func (a byteSlices) Len() int { return len(a) } +func (a byteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 } +func (a byteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] } diff --git a/vendor/github.com/influxdata/influxdb/models/rows.go b/vendor/github.com/influxdata/influxdb/models/rows.go new file mode 100644 index 000000000..c087a4882 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/rows.go @@ -0,0 +1,62 @@ +package models + +import ( + "sort" +) + +// Row represents a single row returned from the execution of a statement. +type Row struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Columns []string `json:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty"` + Partial bool `json:"partial,omitempty"` +} + +// SameSeries returns true if r contains values for the same series as o. +func (r *Row) SameSeries(o *Row) bool { + return r.tagsHash() == o.tagsHash() && r.Name == o.Name +} + +// tagsHash returns a hash of tag key/value pairs. +func (r *Row) tagsHash() uint64 { + h := NewInlineFNV64a() + keys := r.tagsKeys() + for _, k := range keys { + h.Write([]byte(k)) + h.Write([]byte(r.Tags[k])) + } + return h.Sum64() +} + +// tagKeys returns a sorted list of tag keys. +func (r *Row) tagsKeys() []string { + a := make([]string, 0, len(r.Tags)) + for k := range r.Tags { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// Rows represents a collection of rows. Rows implements sort.Interface. +type Rows []*Row + +// Len implements sort.Interface. +func (p Rows) Len() int { return len(p) } + +// Less implements sort.Interface. +func (p Rows) Less(i, j int) bool { + // Sort by name first. + if p[i].Name != p[j].Name { + return p[i].Name < p[j].Name + } + + // Sort by tag set hash. Tags don't have a meaningful sort order so we + // just compute a hash and sort by that instead. This allows the tests + // to receive rows in a predictable order every time. + return p[i].tagsHash() < p[j].tagsHash() +} + +// Swap implements sort.Interface. +func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/vendor/github.com/influxdata/influxdb/models/statistic.go b/vendor/github.com/influxdata/influxdb/models/statistic.go new file mode 100644 index 000000000..553e9d09f --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/statistic.go @@ -0,0 +1,42 @@ +package models + +// Statistic is the representation of a statistic used by the monitoring service. +type Statistic struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Values map[string]interface{} `json:"values"` +} + +// NewStatistic returns an initialized Statistic. +func NewStatistic(name string) Statistic { + return Statistic{ + Name: name, + Tags: make(map[string]string), + Values: make(map[string]interface{}), + } +} + +// StatisticTags is a map that can be merged with others without causing +// mutations to either map. +type StatisticTags map[string]string + +// Merge creates a new map containing the merged contents of tags and t. +// If both tags and the receiver map contain the same key, the value in tags +// is used in the resulting map. +// +// Merge always returns a usable map. +func (t StatisticTags) Merge(tags map[string]string) map[string]string { + // Add everything in tags to the result. + out := make(map[string]string, len(tags)) + for k, v := range tags { + out[k] = v + } + + // Only add values from t that don't appear in tags. + for k, v := range t { + if _, ok := tags[k]; !ok { + out[k] = v + } + } + return out +} diff --git a/vendor/github.com/influxdata/influxdb/models/time.go b/vendor/github.com/influxdata/influxdb/models/time.go new file mode 100644 index 000000000..e98f2cb33 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/models/time.go @@ -0,0 +1,74 @@ +package models + +// Helper time methods since parsing time can easily overflow and we only support a +// specific time range. + +import ( + "fmt" + "math" + "time" +) + +const ( + // MinNanoTime is the minumum time that can be represented. + // + // 1677-09-21 00:12:43.145224194 +0000 UTC + // + // The two lowest minimum integers are used as sentinel values. The + // minimum value needs to be used as a value lower than any other value for + // comparisons and another separate value is needed to act as a sentinel + // default value that is unusable by the user, but usable internally. + // Because these two values need to be used for a special purpose, we do + // not allow users to write points at these two times. + MinNanoTime = int64(math.MinInt64) + 2 + + // MaxNanoTime is the maximum time that can be represented. + // + // 2262-04-11 23:47:16.854775806 +0000 UTC + // + // The highest time represented by a nanosecond needs to be used for an + // exclusive range in the shard group, so the maximum time needs to be one + // less than the possible maximum number of nanoseconds representable by an + // int64 so that we don't lose a point at that one time. + MaxNanoTime = int64(math.MaxInt64) - 1 +) + +var ( + minNanoTime = time.Unix(0, MinNanoTime).UTC() + maxNanoTime = time.Unix(0, MaxNanoTime).UTC() + + // ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch. + ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime) +) + +// SafeCalcTime safely calculates the time given. Will return error if the time is outside the +// supported range. +func SafeCalcTime(timestamp int64, precision string) (time.Time, error) { + mult := GetPrecisionMultiplier(precision) + if t, ok := safeSignedMult(timestamp, mult); ok { + tme := time.Unix(0, t).UTC() + return tme, CheckTime(tme) + } + + return time.Time{}, ErrTimeOutOfRange +} + +// CheckTime checks that a time is within the safe range. +func CheckTime(t time.Time) error { + if t.Before(minNanoTime) || t.After(maxNanoTime) { + return ErrTimeOutOfRange + } + return nil +} + +// Perform the multiplication and check to make sure it didn't overflow. +func safeSignedMult(a, b int64) (int64, bool) { + if a == 0 || b == 0 || a == 1 || b == 1 { + return a * b, true + } + if a == MinNanoTime || b == MaxNanoTime { + return 0, false + } + c := a * b + return c, c/b == a +} diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go b/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go new file mode 100644 index 000000000..ac7ed5ab3 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/pkg/escape/bytes.go @@ -0,0 +1,111 @@ +// Package escape contains utilities for escaping parts of InfluxQL +// and InfluxDB line protocol. +package escape // import "github.com/influxdata/influxdb/pkg/escape" + +import ( + "bytes" + "strings" +) + +// Codes is a map of bytes to be escaped. +var Codes = map[byte][]byte{ + ',': []byte(`\,`), + '"': []byte(`\"`), + ' ': []byte(`\ `), + '=': []byte(`\=`), +} + +// Bytes escapes characters on the input slice, as defined by Codes. +func Bytes(in []byte) []byte { + for b, esc := range Codes { + in = bytes.Replace(in, []byte{b}, esc, -1) + } + return in +} + +const escapeChars = `," =` + +// IsEscaped returns whether b has any escaped characters, +// i.e. whether b seems to have been processed by Bytes. +func IsEscaped(b []byte) bool { + for len(b) > 0 { + i := bytes.IndexByte(b, '\\') + if i < 0 { + return false + } + + if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 { + return true + } + b = b[i+1:] + } + return false +} + +// AppendUnescaped appends the unescaped version of src to dst +// and returns the resulting slice. +func AppendUnescaped(dst, src []byte) []byte { + var pos int + for len(src) > 0 { + next := bytes.IndexByte(src[pos:], '\\') + if next < 0 || pos+next+1 >= len(src) { + return append(dst, src...) + } + + if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 { + if pos+next > 0 { + dst = append(dst, src[:pos+next]...) + } + src = src[pos+next+1:] + pos = 0 + } else { + pos += next + 1 + } + } + + return dst +} + +// Unescape returns a new slice containing the unescaped version of in. +func Unescape(in []byte) []byte { + if len(in) == 0 { + return nil + } + + if bytes.IndexByte(in, '\\') == -1 { + return in + } + + i := 0 + inLen := len(in) + var out []byte + + for { + if i >= inLen { + break + } + if in[i] == '\\' && i+1 < inLen { + switch in[i+1] { + case ',': + out = append(out, ',') + i += 2 + continue + case '"': + out = append(out, '"') + i += 2 + continue + case ' ': + out = append(out, ' ') + i += 2 + continue + case '=': + out = append(out, '=') + i += 2 + continue + } + } + out = append(out, in[i]) + i += 1 + } + return out +} diff --git a/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go b/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go new file mode 100644 index 000000000..db98033b0 --- /dev/null +++ b/vendor/github.com/influxdata/influxdb/pkg/escape/strings.go @@ -0,0 +1,21 @@ +package escape + +import "strings" + +var ( + escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`) + unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`) +) + +// UnescapeString returns unescaped version of in. +func UnescapeString(in string) string { + if strings.IndexByte(in, '\\') == -1 { + return in + } + return unescaper.Replace(in) +} + +// String returns the escaped version of in. +func String(in string) string { + return escaper.Replace(in) +} diff --git a/vendor/github.com/influxdb/influxdb/client/influxdb.go b/vendor/github.com/influxdb/influxdb/client/influxdb.go deleted file mode 100644 index 235beb964..000000000 --- a/vendor/github.com/influxdb/influxdb/client/influxdb.go +++ /dev/null @@ -1,180 +0,0 @@ -package client - -import ( - "bytes" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "time" - - "github.com/influxdb/influxdb/tsdb" -) - -const ( - // DefaultTimeout is the default connection timeout used to connect to an InfluxDB instance - DefaultTimeout = 0 -) - -// Config is used to specify what server to connect to. -// URL: The URL of the server connecting to. -// Username/Password are optional. They will be passed via basic auth if provided. -// UserAgent: If not provided, will default "InfluxDBClient", -// Timeout: If not provided, will default to 0 (no timeout) -type Config struct { - URL url.URL - Username string - Password string - UserAgent string - Timeout time.Duration - Precision string -} - -// NewConfig will create a config to be used in connecting to the client -func NewConfig() Config { - return Config{ - Timeout: DefaultTimeout, - } -} - -// Client is used to make calls to the server. -type Client struct { - url url.URL - username string - password string - httpClient *http.Client - userAgent string - precision string -} - -const ( - ConsistencyOne = "one" - ConsistencyAll = "all" - ConsistencyQuorum = "quorum" - ConsistencyAny = "any" -) - -// NewClient will instantiate and return a connected client to issue commands to the server. -func NewClient(c Config) (*Client, error) { - client := Client{ - url: c.URL, - username: c.Username, - password: c.Password, - httpClient: &http.Client{Timeout: c.Timeout}, - userAgent: c.UserAgent, - precision: c.Precision, - } - if client.userAgent == "" { - client.userAgent = "InfluxDBClient" - } - return &client, nil -} - -// Write takes BatchPoints and allows for writing of multiple points with defaults -// If successful, error is nil and Response is nil -// If an error occurs, Response may contain additional information if populated. -func (c *Client) Write(bp BatchPoints) (*Response, error) { - u := c.url - u.Path = "write" - - var b bytes.Buffer - for _, p := range bp.Points { - if p.Raw != "" { - if _, err := b.WriteString(p.Raw); err != nil { - return nil, err - } - } else { - for k, v := range bp.Tags { - if p.Tags == nil { - p.Tags = make(map[string]string, len(bp.Tags)) - } - p.Tags[k] = v - } - - if _, err := b.WriteString(p.MarshalString()); err != nil { - return nil, err - } - } - - if err := b.WriteByte('\n'); err != nil { - return nil, err - } - } - - req, err := http.NewRequest("POST", u.String(), &b) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "") - req.Header.Set("User-Agent", c.userAgent) - if c.username != "" { - req.SetBasicAuth(c.username, c.password) - } - params := req.URL.Query() - params.Set("db", bp.Database) - params.Set("rp", bp.RetentionPolicy) - params.Set("precision", bp.Precision) - params.Set("consistency", bp.WriteConsistency) - req.URL.RawQuery = params.Encode() - - resp, err := c.httpClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - var response Response - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK { - var err = fmt.Errorf(string(body)) - response.Err = err - return &response, err - } - - return nil, nil -} - -// Structs - -// Response represents a list of statement results. -type Response struct { - Err error -} - -// Point defines the fields that will be written to the database -// Measurement, Time, and Fields are required -// Precision can be specified if the time is in epoch format (integer). -// Valid values for Precision are n, u, ms, s, m, and h -type Point struct { - Measurement string - Tags map[string]string - Time time.Time - Fields map[string]interface{} - Precision string - Raw string -} - -func (p *Point) MarshalString() string { - return tsdb.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time).String() -} - -// BatchPoints is used to send batched data in a single write. -// Database and Points are required -// If no retention policy is specified, it will use the databases default retention policy. -// If tags are specified, they will be "merged" with all points. If a point already has that tag, it is ignored. -// If time is specified, it will be applied to any point with an empty time. -// Precision can be specified if the time is in epoch format (integer). -// Valid values for Precision are n, u, ms, s, m, and h -type BatchPoints struct { - Points []Point `json:"points,omitempty"` - Database string `json:"database,omitempty"` - RetentionPolicy string `json:"retentionPolicy,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Time time.Time `json:"time,omitempty"` - Precision string `json:"precision,omitempty"` - WriteConsistency string `json:"-"` -} diff --git a/vendor/github.com/influxdb/influxdb/tsdb/points.go b/vendor/github.com/influxdb/influxdb/tsdb/points.go deleted file mode 100644 index dd8dbb644..000000000 --- a/vendor/github.com/influxdb/influxdb/tsdb/points.go +++ /dev/null @@ -1,1392 +0,0 @@ -package tsdb - -import ( - "bytes" - "fmt" - "hash/fnv" - "regexp" - "sort" - "strconv" - "strings" - "time" -) - -// Point defines the values that will be written to the database -type Point interface { - Name() string - SetName(string) - - Tags() Tags - AddTag(key, value string) - SetTags(tags Tags) - - Fields() Fields - AddField(name string, value interface{}) - - Time() time.Time - SetTime(t time.Time) - UnixNano() int64 - - HashID() uint64 - Key() []byte - - Data() []byte - SetData(buf []byte) - - String() string -} - -// Points represents a sortable list of points by timestamp. -type Points []Point - -func (a Points) Len() int { return len(a) } -func (a Points) Less(i, j int) bool { return a[i].Time().Before(a[j].Time()) } -func (a Points) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -// point is the default implementation of Point. -type point struct { - time time.Time - - // text encoding of measurement and tags - // key must always be stored sorted by tags, if the original line was not sorted, - // we need to resort it - key []byte - - // text encoding of field data - fields []byte - - // text encoding of timestamp - ts []byte - - // binary encoded field data - data []byte - - // cached version of parsed fields from data - cachedFields map[string]interface{} - - // cached version of parsed name from key - cachedName string -} - -const ( - // the number of characters for the largest possible int64 (9223372036854775807) - maxInt64Digits = 19 - - // the number of characters for the smallest possible int64 (-9223372036854775808) - minInt64Digits = 20 - - // the number of characters required for the largest float64 before a range check - // would occur during parsing - maxFloat64Digits = 25 - - // the number of characters required for smallest float64 before a range check occur - // would occur during parsing - minFloat64Digits = 27 -) - -var ( - // Compile the regex that detects unquoted double quote sequences - quoteReplacer = regexp.MustCompile(`([^\\])"`) - - escapeCodes = map[byte][]byte{ - ',': []byte(`\,`), - '"': []byte(`\"`), - ' ': []byte(`\ `), - '=': []byte(`\=`), - } - - escapeCodesStr = map[string]string{} - - measurementEscapeCodes = map[byte][]byte{ - ',': []byte(`\,`), - ' ': []byte(`\ `), - } - - tagEscapeCodes = map[byte][]byte{ - ',': []byte(`\,`), - ' ': []byte(`\ `), - '=': []byte(`\=`), - } -) - -func init() { - for k, v := range escapeCodes { - escapeCodesStr[string(k)] = string(v) - } -} - -func ParsePointsString(buf string) ([]Point, error) { - return ParsePoints([]byte(buf)) -} - -// ParsePoints returns a slice of Points from a text representation of a point -// with each point separated by newlines. -func ParsePoints(buf []byte) ([]Point, error) { - return ParsePointsWithPrecision(buf, time.Now().UTC(), "n") -} - -func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) { - points := []Point{} - var ( - pos int - block []byte - ) - for { - pos, block = scanLine(buf, pos) - pos += 1 - - if len(block) == 0 { - break - } - - // lines which start with '#' are comments - start := skipWhitespace(block, 0) - - // If line is all whitespace, just skip it - if start >= len(block) { - continue - } - - if block[start] == '#' { - continue - } - - // strip the newline if one is present - if block[len(block)-1] == '\n' { - block = block[:len(block)-1] - } - - pt, err := parsePoint(block[start:len(block)], defaultTime, precision) - if err != nil { - return nil, fmt.Errorf("unable to parse '%s': %v", string(block[start:len(block)]), err) - } - points = append(points, pt) - - if pos >= len(buf) { - break - } - - } - return points, nil - -} - -func parsePoint(buf []byte, defaultTime time.Time, precision string) (Point, error) { - // scan the first block which is measurement[,tag1=value1,tag2=value=2...] - pos, key, err := scanKey(buf, 0) - if err != nil { - return nil, err - } - - // measurement name is required - if len(key) == 0 { - return nil, fmt.Errorf("missing measurement") - } - - // scan the second block is which is field1=value1[,field2=value2,...] - pos, fields, err := scanFields(buf, pos) - if err != nil { - return nil, err - } - - // at least one field is required - if len(fields) == 0 { - return nil, fmt.Errorf("missing fields") - } - - // scan the last block which is an optional integer timestamp - pos, ts, err := scanTime(buf, pos) - - if err != nil { - return nil, err - } - - pt := &point{ - key: key, - fields: fields, - ts: ts, - } - - if len(ts) == 0 { - pt.time = defaultTime - pt.SetPrecision(precision) - } else { - ts, err := strconv.ParseInt(string(ts), 10, 64) - if err != nil { - return nil, err - } - pt.time = time.Unix(0, ts*pt.GetPrecisionMultiplier(precision)) - } - return pt, nil -} - -// scanKey scans buf starting at i for the measurement and tag portion of the point. -// It returns the ending position and the byte slice of key within buf. If there -// are tags, they will be sorted if they are not already. -func scanKey(buf []byte, i int) (int, []byte, error) { - start := skipWhitespace(buf, i) - - i = start - - // Determines whether the tags are sort, assume they are - sorted := true - - // indices holds the indexes within buf of the start of each tag. For example, - // a buf of 'cpu,host=a,region=b,zone=c' would have indices slice of [4,11,20] - // which indicates that the first tag starts at buf[4], seconds at buf[11], and - // last at buf[20] - indices := make([]int, 100) - - // tracks how many commas we've seen so we know how many values are indices. - // Since indices is an arbitrarily large slice, - // we need to know how many values in the buffer are in use. - commas := 0 - - // tracks whether we've see an '=' - equals := 0 - - // loop over each byte in buf - for { - // reached the end of buf? - if i >= len(buf) { - if equals == 0 && commas > 0 { - return i, buf[start:i], fmt.Errorf("missing tag value") - } - - break - } - - // equals is special in the tags section. It must be escaped if part of a tag name or value. - // It does not need to be escaped if part of the measurement. - if buf[i] == '=' && commas > 0 { - if i-1 < 0 || i-2 < 0 { - return i, buf[start:i], fmt.Errorf("missing tag name") - } - - // Check for "cpu,=value" but allow "cpu,a\,=value" - if buf[i-1] == ',' && buf[i-2] != '\\' { - return i, buf[start:i], fmt.Errorf("missing tag name") - } - - // Check for "cpu,\ =value" - if buf[i-1] == ' ' && buf[i-2] != '\\' { - return i, buf[start:i], fmt.Errorf("missing tag name") - } - - i += 1 - equals += 1 - - // Check for "cpu,a=1,b= value=1" - if i < len(buf) && buf[i] == ' ' { - return i, buf[start:i], fmt.Errorf("missing tag value") - } - continue - } - - // escaped character - if buf[i] == '\\' { - i += 2 - continue - } - - // At a tag separator (comma), track it's location - if buf[i] == ',' { - if equals == 0 && commas > 0 { - return i, buf[start:i], fmt.Errorf("missing tag value") - } - i += 1 - - // grow our indices slice if we have too many tags - if commas >= len(indices) { - newIndics := make([]int, cap(indices)*2) - copy(newIndics, indices) - indices = newIndics - } - indices[commas] = i - commas += 1 - - // Check for "cpu, value=1" - if i < len(buf) && buf[i] == ' ' { - return i, buf[start:i], fmt.Errorf("missing tag key") - } - continue - } - - // reached end of the block? (next block would be fields) - if buf[i] == ' ' { - // check for "cpu,tag value=1" - if equals == 0 && commas > 0 { - return i, buf[start:i], fmt.Errorf("missing tag value") - } - if equals > 0 && commas-1 != equals-1 { - return i, buf[start:i], fmt.Errorf("missing tag value") - } - - // grow our indices slice if we have too many tags - if commas >= len(indices) { - newIndics := make([]int, cap(indices)*2) - copy(newIndics, indices) - indices = newIndics - } - - indices[commas] = i + 1 - break - } - - i += 1 - } - - // check that all field sections had key and values (e.g. prevent "a=1,b" - // We're using commas -1 because there should always be a comma after measurement - if equals > 0 && commas-1 != equals-1 { - return i, buf[start:i], fmt.Errorf("invalid tag format") - } - - // This check makes sure we actually received fields from the user. #3379 - // This will catch invalid syntax such as: `cpu,host=serverA,region=us-west` - if i >= len(buf) { - return i, buf[start:i], fmt.Errorf("missing fields") - } - - // Now we know where the key region is within buf, and the locations of tags, we - // need to deterimine if duplicate tags exist and if the tags are sorted. This iterates - // 1/2 of the list comparing each end with each other, walking towards the center from - // both sides. - for j := 0; j < commas/2; j++ { - // get the left and right tags - _, left := scanTo(buf[indices[j]:indices[j+1]-1], 0, '=') - _, right := scanTo(buf[indices[commas-j-1]:indices[commas-j]-1], 0, '=') - - // If the tags are equal, then there are duplicate tags, and we should abort - if bytes.Equal(left, right) { - return i, buf[start:i], fmt.Errorf("duplicate tags") - } - - // If left is greater than right, the tags are not sorted. We must continue - // since their could be duplicate tags still. - if bytes.Compare(left, right) > 0 { - sorted = false - } - } - - // If the tags are not sorted, then sort them. This sort is inline and - // uses the tag indices we created earlier. The actual buffer is not sorted, the - // indices are using the buffer for value comparison. After the indices are sorted, - // the buffer is reconstructed from the sorted indices. - if !sorted && commas > 0 { - // Get the measurement name for later - measurement := buf[start : indices[0]-1] - - // Sort the indices - indices := indices[:commas] - insertionSort(0, commas, buf, indices) - - // Create a new key using the measurement and sorted indices - b := make([]byte, len(buf[start:i])) - pos := copy(b, measurement) - for _, i := range indices { - b[pos] = ',' - pos += 1 - _, v := scanToSpaceOr(buf, i, ',') - pos += copy(b[pos:], v) - } - - return i, b, nil - } - - return i, buf[start:i], nil -} - -func insertionSort(l, r int, buf []byte, indices []int) { - for i := l + 1; i < r; i++ { - for j := i; j > l && less(buf, indices, j, j-1); j-- { - indices[j], indices[j-1] = indices[j-1], indices[j] - } - } -} - -func less(buf []byte, indices []int, i, j int) bool { - // This grabs the tag names for i & j, it ignores the values - _, a := scanTo(buf, indices[i], '=') - _, b := scanTo(buf, indices[j], '=') - return bytes.Compare(a, b) < 0 -} - -func isFieldEscapeChar(b byte) bool { - for c := range escapeCodes { - if c == b { - return true - } - } - return false -} - -// scanFields scans buf, starting at i for the fields section of a point. It returns -// the ending position and the byte slice of the fields within buf -func scanFields(buf []byte, i int) (int, []byte, error) { - start := skipWhitespace(buf, i) - i = start - quoted := false - - // tracks how many '=' we've seen - equals := 0 - - // tracks how many commas we've seen - commas := 0 - - for { - // reached the end of buf? - if i >= len(buf) { - break - } - - // escaped characters? - if buf[i] == '\\' && i+1 < len(buf) { - - // Is this an escape char within a string field? Only " and \ are allowed. - if quoted && (buf[i+1] == '"' || buf[i+1] == '\\') { - i += 2 - continue - // Non-string field escaped chars - } else if !quoted && isFieldEscapeChar(buf[i+1]) { - i += 2 - continue - } - } - - // If the value is quoted, scan until we get to the end quote - if buf[i] == '"' { - quoted = !quoted - i += 1 - continue - } - - // If we see an =, ensure that there is at least on char before and after it - if buf[i] == '=' && !quoted { - equals += 1 - - // check for "... =123" but allow "a\ =123" - if buf[i-1] == ' ' && buf[i-2] != '\\' { - return i, buf[start:i], fmt.Errorf("missing field name") - } - - // check for "...a=123,=456" but allow "a=123,a\,=456" - if buf[i-1] == ',' && buf[i-2] != '\\' { - return i, buf[start:i], fmt.Errorf("missing field name") - } - - // check for "... value=" - if i+1 >= len(buf) { - return i, buf[start:i], fmt.Errorf("missing field value") - } - - // check for "... value=,value2=..." - if buf[i+1] == ',' || buf[i+1] == ' ' { - return i, buf[start:i], fmt.Errorf("missing field value") - } - - if isNumeric(buf[i+1]) || buf[i+1] == '-' || buf[i+1] == 'N' || buf[i+1] == 'n' { - var err error - i, err = scanNumber(buf, i+1) - if err != nil { - return i, buf[start:i], err - } - continue - } - // If next byte is not a double-quote, the value must be a boolean - if buf[i+1] != '"' { - var err error - i, _, err = scanBoolean(buf, i+1) - if err != nil { - return i, buf[start:i], err - } - continue - } - } - - if buf[i] == ',' && !quoted { - commas += 1 - } - - // reached end of block? - if buf[i] == ' ' && !quoted { - break - } - i += 1 - } - - if quoted { - return i, buf[start:i], fmt.Errorf("unbalanced quotes") - } - - // check that all field sections had key and values (e.g. prevent "a=1,b" - if equals == 0 || commas != equals-1 { - return i, buf[start:i], fmt.Errorf("invalid field format") - } - - return i, buf[start:i], nil -} - -// scanTime scans buf, starting at i for the time section of a point. It returns -// the ending position and the byte slice of the fields within buf and error if the -// timestamp is not in the correct numeric format -func scanTime(buf []byte, i int) (int, []byte, error) { - start := skipWhitespace(buf, i) - i = start - for { - // reached the end of buf? - if i >= len(buf) { - break - } - - // Timestamps should integers, make sure they are so we don't need to actually - // parse the timestamp until needed - if buf[i] < '0' || buf[i] > '9' { - return i, buf[start:i], fmt.Errorf("bad timestamp") - } - - // reached end of block? - if buf[i] == '\n' { - break - } - i += 1 - } - return i, buf[start:i], nil -} - -func isNumeric(b byte) bool { - return (b >= '0' && b <= '9') || b == '.' -} - -// scanNumber returns the end position within buf, start at i after -// scanning over buf for an integer, or float. It returns an -// error if a invalid number is scanned. -func scanNumber(buf []byte, i int) (int, error) { - start := i - var isInt bool - - // Is negative number? - if i < len(buf) && buf[i] == '-' { - i += 1 - } - - // how many decimal points we've see - decimals := 0 - - // indicates the number is float in scientific notation - scientific := false - - for { - if i >= len(buf) { - break - } - - if buf[i] == ',' || buf[i] == ' ' { - break - } - - if buf[i] == 'i' && i > start && !isInt { - isInt = true - i += 1 - continue - } - - if buf[i] == '.' { - decimals += 1 - } - - // Can't have more than 1 decimal (e.g. 1.1.1 should fail) - if decimals > 1 { - return i, fmt.Errorf("invalid number") - } - - // `e` is valid for floats but not as the first char - if i > start && (buf[i] == 'e') { - scientific = true - i += 1 - continue - } - - // + and - are only valid at this point if they follow an e (scientific notation) - if (buf[i] == '+' || buf[i] == '-') && buf[i-1] == 'e' { - i += 1 - continue - } - - // NaN is a valid float - if i+2 < len(buf) && (buf[i] == 'N' || buf[i] == 'n') { - if (buf[i+1] == 'a' || buf[i+1] == 'A') && (buf[i+2] == 'N' || buf[i+2] == 'n') { - i += 3 - continue - } - return i, fmt.Errorf("invalid number") - } - if !isNumeric(buf[i]) { - return i, fmt.Errorf("invalid number") - } - i += 1 - } - if isInt && (decimals > 0 || scientific) { - return i, fmt.Errorf("invalid number") - } - - // It's more common that numbers will be within min/max range for their type but we need to prevent - // out or range numbers from being parsed successfully. This uses some simple heuristics to decide - // if we should parse the number to the actual type. It does not do it all the time because it incurs - // extra allocations and we end up converting the type again when writing points to disk. - if isInt { - // Make sure the last char is an 'i' for integers (e.g. 9i10 is not valid) - if buf[i-1] != 'i' { - return i, fmt.Errorf("invalid number") - } - // Parse the int to check bounds the number of digits could be larger than the max range - // We subtract 1 from the index to remove the `i` from our tests - if len(buf[start:i-1]) >= maxInt64Digits || len(buf[start:i-1]) >= minInt64Digits { - if _, err := strconv.ParseInt(string(buf[start:i-1]), 10, 64); err != nil { - return i, fmt.Errorf("unable to parse integer %s: %s", buf[start:i-1], err) - } - } - } else { - // Parse the float to check bounds if it's scientific or the number of digits could be larger than the max range - if scientific || len(buf[start:i]) >= maxFloat64Digits || len(buf[start:i]) >= minFloat64Digits { - if _, err := strconv.ParseFloat(string(buf[start:i]), 10); err != nil { - return i, fmt.Errorf("invalid float") - } - } - } - - return i, nil -} - -// scanBoolean returns the end position within buf, start at i after -// scanning over buf for boolean. Valid values for a boolean are -// t, T, true, TRUE, f, F, false, FALSE. It returns an error if a invalid boolean -// is scanned. -func scanBoolean(buf []byte, i int) (int, []byte, error) { - start := i - - if i < len(buf) && (buf[i] != 't' && buf[i] != 'f' && buf[i] != 'T' && buf[i] != 'F') { - return i, buf[start:i], fmt.Errorf("invalid boolean") - } - - i += 1 - for { - if i >= len(buf) { - break - } - - if buf[i] == ',' || buf[i] == ' ' { - break - } - i += 1 - } - - // Single char bool (t, T, f, F) is ok - if i-start == 1 { - return i, buf[start:i], nil - } - - // length must be 4 for true or TRUE - if (buf[start] == 't' || buf[start] == 'T') && i-start != 4 { - return i, buf[start:i], fmt.Errorf("invalid boolean") - } - - // length must be 5 for false or FALSE - if (buf[start] == 'f' || buf[start] == 'F') && i-start != 5 { - return i, buf[start:i], fmt.Errorf("invalid boolean") - } - - // Otherwise - valid := false - switch buf[start] { - case 't': - valid = bytes.Equal(buf[start:i], []byte("true")) - case 'f': - valid = bytes.Equal(buf[start:i], []byte("false")) - case 'T': - valid = bytes.Equal(buf[start:i], []byte("TRUE")) || bytes.Equal(buf[start:i], []byte("True")) - case 'F': - valid = bytes.Equal(buf[start:i], []byte("FALSE")) || bytes.Equal(buf[start:i], []byte("False")) - } - - if !valid { - return i, buf[start:i], fmt.Errorf("invalid boolean") - } - - return i, buf[start:i], nil - -} - -// skipWhitespace returns the end position within buf, starting at i after -// scanning over spaces in tags -func skipWhitespace(buf []byte, i int) int { - for { - if i >= len(buf) { - return i - } - - if buf[i] == ' ' || buf[i] == '\t' { - i += 1 - continue - } - break - } - return i -} - -// scanLine returns the end position in buf and the next line found within -// buf. -func scanLine(buf []byte, i int) (int, []byte) { - start := i - quoted := false - fields := false - for { - // reached the end of buf? - if i >= len(buf) { - break - } - - if buf[i] == ' ' { - fields = true - } - - // If we see a double quote, makes sure it is not escaped - if fields && buf[i] == '"' && (i-1 > 0 && buf[i-1] != '\\') { - i += 1 - quoted = !quoted - continue - } - - if buf[i] == '\n' && !quoted { - break - } - - i += 1 - } - - return i, buf[start:i] -} - -// scanTo returns the end position in buf and the next consecutive block -// of bytes, starting from i and ending with stop byte. If there are leading -// spaces or escaped chars, they are skipped. -func scanTo(buf []byte, i int, stop byte) (int, []byte) { - start := i - for { - // reached the end of buf? - if i >= len(buf) { - break - } - - if buf[i] == '\\' { - i += 2 - continue - } - - // reached end of block? - if buf[i] == stop { - break - } - i += 1 - } - - return i, buf[start:i] -} - -// scanTo returns the end position in buf and the next consecutive block -// of bytes, starting from i and ending with stop byte. If there are leading -// spaces, they are skipped. -func scanToSpaceOr(buf []byte, i int, stop byte) (int, []byte) { - start := i - for { - // reached the end of buf? - if i >= len(buf) { - break - } - - if buf[i] == '\\' { - i += 2 - continue - } - // reached end of block? - if buf[i] == stop || buf[i] == ' ' { - break - } - i += 1 - } - - return i, buf[start:i] -} - -func scanTagValue(buf []byte, i int) (int, []byte) { - start := i - for { - if i >= len(buf) { - break - } - - if buf[i] == '\\' { - i += 2 - continue - } - - if buf[i] == ',' { - break - } - i += 1 - } - return i, buf[start:i] -} - -func scanFieldValue(buf []byte, i int) (int, []byte) { - start := i - quoted := false - for { - if i >= len(buf) { - break - } - - // Only escape char for a field value is a double-quote - if buf[i] == '\\' && i+1 < len(buf) && buf[i+1] == '"' { - i += 2 - continue - } - - // Quoted value? (e.g. string) - if buf[i] == '"' { - i += 1 - quoted = !quoted - continue - } - - if buf[i] == ',' && !quoted { - break - } - i += 1 - } - return i, buf[start:i] -} - -func escapeMeasurement(in []byte) []byte { - for b, esc := range measurementEscapeCodes { - in = bytes.Replace(in, []byte{b}, esc, -1) - } - return in -} - -func unescapeMeasurement(in []byte) []byte { - for b, esc := range measurementEscapeCodes { - in = bytes.Replace(in, esc, []byte{b}, -1) - } - return in -} - -func escapeTag(in []byte) []byte { - for b, esc := range tagEscapeCodes { - in = bytes.Replace(in, []byte{b}, esc, -1) - } - return in -} - -func unescapeTag(in []byte) []byte { - for b, esc := range tagEscapeCodes { - in = bytes.Replace(in, esc, []byte{b}, -1) - } - return in -} - -func escape(in []byte) []byte { - for b, esc := range escapeCodes { - in = bytes.Replace(in, []byte{b}, esc, -1) - } - return in -} - -func escapeString(in string) string { - for b, esc := range escapeCodesStr { - in = strings.Replace(in, b, esc, -1) - } - return in -} - -func unescape(in []byte) []byte { - i := 0 - inLen := len(in) - var out []byte - - for { - if i >= inLen { - break - } - if in[i] == '\\' && i+1 < inLen { - switch in[i+1] { - case ',': - out = append(out, ',') - i += 2 - continue - case '"': - out = append(out, '"') - i += 2 - continue - case ' ': - out = append(out, ' ') - i += 2 - continue - case '=': - out = append(out, '=') - i += 2 - continue - } - } - out = append(out, in[i]) - i += 1 - } - return out -} - -func unescapeString(in string) string { - for b, esc := range escapeCodesStr { - in = strings.Replace(in, esc, b, -1) - } - return in -} - -// escapeStringField returns a copy of in with any double quotes or -// backslashes with escaped values -func escapeStringField(in string) string { - var out []byte - i := 0 - for { - if i >= len(in) { - break - } - // escape double-quotes - if in[i] == '\\' { - out = append(out, '\\') - out = append(out, '\\') - i += 1 - continue - } - // escape double-quotes - if in[i] == '"' { - out = append(out, '\\') - out = append(out, '"') - i += 1 - continue - } - out = append(out, in[i]) - i += 1 - - } - return string(out) -} - -// unescapeStringField returns a copy of in with any escaped double-quotes -// or backslashes unescaped -func unescapeStringField(in string) string { - var out []byte - i := 0 - for { - if i >= len(in) { - break - } - // unescape backslashes - if in[i] == '\\' && i+1 < len(in) && in[i+1] == '\\' { - out = append(out, '\\') - i += 2 - continue - } - // unescape double-quotes - if in[i] == '\\' && i+1 < len(in) && in[i+1] == '"' { - out = append(out, '"') - i += 2 - continue - } - out = append(out, in[i]) - i += 1 - - } - return string(out) -} - -// NewPoint returns a new point with the given measurement name, tags, fields and timestamp -func NewPoint(name string, tags Tags, fields Fields, time time.Time) Point { - return &point{ - key: MakeKey([]byte(name), tags), - time: time, - fields: fields.MarshalBinary(), - } -} - -func (p *point) Data() []byte { - return p.data -} - -func (p *point) SetData(b []byte) { - p.data = b -} - -func (p *point) Key() []byte { - return p.key -} - -func (p *point) name() []byte { - _, name := scanTo(p.key, 0, ',') - return name -} - -// Name return the measurement name for the point -func (p *point) Name() string { - if p.cachedName != "" { - return p.cachedName - } - p.cachedName = string(unescape(p.name())) - return p.cachedName -} - -// SetName updates the measurement name for the point -func (p *point) SetName(name string) { - p.cachedName = "" - p.key = MakeKey([]byte(name), p.Tags()) -} - -// Time return the timestamp for the point -func (p *point) Time() time.Time { - return p.time -} - -// SetTime updates the timestamp for the point -func (p *point) SetTime(t time.Time) { - p.time = t -} - -// Tags returns the tag set for the point -func (p *point) Tags() Tags { - tags := map[string]string{} - - if len(p.key) != 0 { - pos, name := scanTo(p.key, 0, ',') - - // it's an empyt key, so there are no tags - if len(name) == 0 { - return tags - } - - i := pos + 1 - var key, value []byte - for { - if i >= len(p.key) { - break - } - i, key = scanTo(p.key, i, '=') - i, value = scanTagValue(p.key, i+1) - - tags[string(unescapeTag(key))] = string(unescapeTag(value)) - - i += 1 - } - } - return tags -} - -func MakeKey(name []byte, tags Tags) []byte { - // unescape the name and then re-escape it to avoid double escaping. - // The key should always be stored in escaped form. - return append(escapeMeasurement(unescapeMeasurement(name)), tags.HashKey()...) -} - -// SetTags replaces the tags for the point -func (p *point) SetTags(tags Tags) { - p.key = MakeKey([]byte(p.Name()), tags) -} - -// AddTag adds or replaces a tag value for a point -func (p *point) AddTag(key, value string) { - tags := p.Tags() - tags[key] = value - p.key = MakeKey([]byte(p.Name()), tags) -} - -// Fields returns the fields for the point -func (p *point) Fields() Fields { - if p.cachedFields != nil { - return p.cachedFields - } - p.cachedFields = p.unmarshalBinary() - return p.cachedFields -} - -// AddField adds or replaces a field value for a point -func (p *point) AddField(name string, value interface{}) { - fields := p.Fields() - fields[name] = value - p.fields = fields.MarshalBinary() - p.cachedFields = nil -} - -// SetPrecision will round a time to the specified precision -func (p *point) SetPrecision(precision string) { - switch precision { - case "n": - case "u": - p.SetTime(p.Time().Truncate(time.Microsecond)) - case "ms": - p.SetTime(p.Time().Truncate(time.Millisecond)) - case "s": - p.SetTime(p.Time().Truncate(time.Second)) - case "m": - p.SetTime(p.Time().Truncate(time.Minute)) - case "h": - p.SetTime(p.Time().Truncate(time.Hour)) - } -} - -// GetPrecisionMultiplier will return a multiplier for the precision specified -func (p *point) GetPrecisionMultiplier(precision string) int64 { - d := time.Nanosecond - switch precision { - case "u": - d = time.Microsecond - case "ms": - d = time.Millisecond - case "s": - d = time.Second - case "m": - d = time.Minute - case "h": - d = time.Hour - } - return int64(d) -} - -func (p *point) String() string { - if p.Time().IsZero() { - return fmt.Sprintf("%s %s", p.Key(), string(p.fields)) - } - return fmt.Sprintf("%s %s %d", p.Key(), string(p.fields), p.UnixNano()) -} - -func (p *point) unmarshalBinary() Fields { - return newFieldsFromBinary(p.fields) -} - -func (p *point) HashID() uint64 { - h := fnv.New64a() - h.Write(p.key) - sum := h.Sum64() - return sum -} - -func (p *point) UnixNano() int64 { - return p.Time().UnixNano() -} - -type Tags map[string]string - -func (t Tags) HashKey() []byte { - // Empty maps marshal to empty bytes. - if len(t) == 0 { - return nil - } - - escaped := Tags{} - for k, v := range t { - ek := escapeTag([]byte(k)) - ev := escapeTag([]byte(v)) - escaped[string(ek)] = string(ev) - } - - // Extract keys and determine final size. - sz := len(escaped) + (len(escaped) * 2) // separators - keys := make([]string, len(escaped)+1) - i := 0 - for k, v := range escaped { - keys[i] = k - i += 1 - sz += len(k) + len(v) - } - keys = keys[:i] - sort.Strings(keys) - // Generate marshaled bytes. - b := make([]byte, sz) - buf := b - idx := 0 - for _, k := range keys { - buf[idx] = ',' - idx += 1 - copy(buf[idx:idx+len(k)], k) - idx += len(k) - buf[idx] = '=' - idx += 1 - v := escaped[k] - copy(buf[idx:idx+len(v)], v) - idx += len(v) - } - return b[:idx] -} - -type Fields map[string]interface{} - -func parseNumber(val []byte) (interface{}, error) { - if val[len(val)-1] == 'i' { - val = val[:len(val)-1] - return strconv.ParseInt(string(val), 10, 64) - } - for i := 0; i < len(val); i++ { - // If there is a decimal or an N (NaN), I (Inf), parse as float - if val[i] == '.' || val[i] == 'N' || val[i] == 'n' || val[i] == 'I' || val[i] == 'i' || val[i] == 'e' { - return strconv.ParseFloat(string(val), 64) - } - if val[i] < '0' && val[i] > '9' { - return string(val), nil - } - } - return strconv.ParseFloat(string(val), 64) -} - -func newFieldsFromBinary(buf []byte) Fields { - fields := Fields{} - var ( - i int - name, valueBuf []byte - value interface{} - err error - ) - for { - if i >= len(buf) { - break - } - - i, name = scanTo(buf, i, '=') - if len(name) == 0 { - continue - } - name = unescape(name) - - i, valueBuf = scanFieldValue(buf, i+1) - if len(valueBuf) == 0 { - fields[string(name)] = nil - continue - } - - // If the first char is a double-quote, then unmarshal as string - if valueBuf[0] == '"' { - value = unescapeStringField(string(valueBuf[1 : len(valueBuf)-1])) - // Check for numeric characters and special NaN or Inf - } else if (valueBuf[0] >= '0' && valueBuf[0] <= '9') || valueBuf[0] == '-' || valueBuf[0] == '+' || valueBuf[0] == '.' || - valueBuf[0] == 'N' || valueBuf[0] == 'n' || // NaN - valueBuf[0] == 'I' || valueBuf[0] == 'i' { // Inf - - value, err = parseNumber(valueBuf) - if err != nil { - panic(fmt.Sprintf("unable to parse number value '%v': %v", string(valueBuf), err)) - } - - // Otherwise parse it as bool - } else { - value, err = strconv.ParseBool(string(valueBuf)) - if err != nil { - panic(fmt.Sprintf("unable to parse bool value '%v': %v\n", string(valueBuf), err)) - } - } - fields[string(name)] = value - i += 1 - } - return fields -} - -// MarshalBinary encodes all the fields to their proper type and returns the binary -// represenation -// NOTE: uint64 is specifically not supported due to potential overflow when we decode -// again later to an int64 -func (p Fields) MarshalBinary() []byte { - b := []byte{} - keys := make([]string, len(p)) - i := 0 - for k, _ := range p { - keys[i] = k - i += 1 - } - sort.Strings(keys) - - for _, k := range keys { - v := p[k] - b = append(b, []byte(escapeString(k))...) - b = append(b, '=') - switch t := v.(type) { - case int: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case int8: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case int16: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case int32: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case int64: - b = append(b, []byte(strconv.FormatInt(t, 10))...) - b = append(b, 'i') - case uint: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case uint8: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case uint16: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case uint32: - b = append(b, []byte(strconv.FormatInt(int64(t), 10))...) - b = append(b, 'i') - case float32: - val := []byte(strconv.FormatFloat(float64(t), 'f', -1, 32)) - b = append(b, val...) - case float64: - val := []byte(strconv.FormatFloat(t, 'f', -1, 64)) - b = append(b, val...) - case bool: - b = append(b, []byte(strconv.FormatBool(t))...) - case []byte: - b = append(b, t...) - case string: - b = append(b, '"') - b = append(b, []byte(escapeStringField(t))...) - b = append(b, '"') - case nil: - // skip - default: - // Can't determine the type, so convert to string - b = append(b, '"') - b = append(b, []byte(escapeStringField(fmt.Sprintf("%v", v)))...) - b = append(b, '"') - - } - b = append(b, ',') - } - if len(b) > 0 { - return b[0 : len(b)-1] - } - return b -} - -type indexedSlice struct { - indices []int - b []byte -} - -func (s *indexedSlice) Less(i, j int) bool { - _, a := scanTo(s.b, s.indices[i], '=') - _, b := scanTo(s.b, s.indices[j], '=') - return bytes.Compare(a, b) < 0 -} - -func (s *indexedSlice) Swap(i, j int) { - s.indices[i], s.indices[j] = s.indices[j], s.indices[i] -} - -func (s *indexedSlice) Len() int { - return len(s.indices) -} diff --git a/vendor/vendor.json b/vendor/vendor.json index c33c029cb..7858b09d8 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -445,14 +445,19 @@ "revisionTime": "2016-10-07T00:41:22Z" }, { - "path": "github.com/influxdb/influxdb/client", - "revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b", - "revisionTime": "2015-09-16T14:41:53+02:00" + "path": "github.com/influxdata/influxdb/client/v2", + "revision": "15e594fc09f112cb696c084a20beaca25538a5fa", + "revisionTime": "2017-03-31T16:09:02-05:00" }, { - "path": "github.com/influxdb/influxdb/tsdb", - "revision": "291aaeb9485b43b16875c238482b2f7d0a22a13b", - "revisionTime": "2015-09-16T14:41:53+02:00" + "path": "github.com/influxdata/influxdb/models", + "revision": "15e594fc09f112cb696c084a20beaca25538a5fa", + "revisionTime": "2017-03-31T16:09:02-05:00" + }, + { + "path": "github.com/influxdata/influxdb/pkg/escape", + "revision": "15e594fc09f112cb696c084a20beaca25538a5fa", + "revisionTime": "2017-03-31T16:09:02-05:00" }, { "checksumSHA1": "0ZrwvB6KoGPj2PoDNSEJwxQ6Mog=",