diff --git a/.gitignore b/.gitignore index 1d9ef55b1..2fe870e94 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,5 @@ benchmark.txt !/circle.yml !/.travis.yml !/.promu.yml +/documentation/examples/remote_storage/remote_storage_adapter/remote_storage_adapter +/documentation/examples/remote_storage/example_write_adapter/example_writer_adapter diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index 54a09321c..38cf0dff9 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -27,7 +27,13 @@ import ( func main() { http.HandleFunc("/receive", func(w http.ResponseWriter, r *http.Request) { - reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return diff --git a/documentation/examples/remote_storage/remote_storage_adapter/main.go b/documentation/examples/remote_storage/remote_storage_adapter/main.go index 5bcf02662..87112b596 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/main.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/main.go @@ -184,7 +184,13 @@ func buildClients(cfg *config) ([]writer, []reader) { func serve(addr string, writers []writer, readers []reader) error { http.HandleFunc("/write", func(w http.ResponseWriter, r *http.Request) { - reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -211,7 +217,13 @@ func serve(addr string, writers []writer, readers []reader) error { }) http.HandleFunc("/read", func(w http.ResponseWriter, r *http.Request) { - reqBuf, err := ioutil.ReadAll(snappy.NewReader(r.Body)) + compressed, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + reqBuf, err := snappy.Decode(nil, compressed) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -245,7 +257,10 @@ func serve(addr string, writers []writer, readers []reader) error { } w.Header().Set("Content-Type", "application/x-protobuf") - if _, err := snappy.NewWriter(w).Write(data); err != nil { + w.Header().Set("Content-Encoding", "snappy") + + compressed = snappy.Encode(nil, data) + if _, err := w.Write(compressed); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/storage/remote/client.go b/storage/remote/client.go index 98acf9633..1447a3e9f 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -94,12 +94,8 @@ func (c *Client) Store(samples model.Samples) error { return err } - buf := bytes.Buffer{} - if _, err := snappy.NewWriter(&buf).Write(data); err != nil { - return err - } - - httpReq, err := http.NewRequest("POST", c.url.String(), &buf) + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(compressed)) if err != nil { // Errors from NewRequest are from unparseable URLs, so are not // recoverable. @@ -107,7 +103,7 @@ func (c *Client) Store(samples model.Samples) error { } httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Set("Content-Type", "application/x-protobuf") - httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.0.1") + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() @@ -151,17 +147,14 @@ func (c *Client) Read(ctx context.Context, from, through model.Time, matchers me return nil, fmt.Errorf("unable to marshal read request: %v", err) } - buf := bytes.Buffer{} - if _, err := snappy.NewWriter(&buf).Write(data); err != nil { - return nil, err - } - - httpReq, err := http.NewRequest("POST", c.url.String(), &buf) + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewBuffer(compressed)) if err != nil { return nil, fmt.Errorf("unable to create request: %v", err) } + httpReq.Header.Add("Content-Encoding", "snappy") httpReq.Header.Set("Content-Type", "application/x-protobuf") - httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.0.1") + httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") ctx, cancel := context.WithTimeout(ctx, c.timeout) defer cancel() @@ -175,12 +168,18 @@ func (c *Client) Read(ctx context.Context, from, through model.Time, matchers me return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status) } - if data, err = ioutil.ReadAll(snappy.NewReader(httpResp.Body)); err != nil { + compressed, err = ioutil.ReadAll(httpResp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response: %v", err) + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { return nil, fmt.Errorf("error reading response: %v", err) } var resp ReadResponse - err = proto.Unmarshal(data, &resp) + err = proto.Unmarshal(uncompressed, &resp) if err != nil { return nil, fmt.Errorf("unable to unmarshal response body: %v", err) }