diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 1a29215b5..19fa01a29 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -206,6 +206,11 @@ func init() { &cfg.remote.InfluxdbDatabase, "storage.remote.influxdb.database", "prometheus", "The name of the database to use for storing samples in InfluxDB.", ) + cfg.fs.StringVar( + &cfg.remote.GenericAddress, "storage.remote.generic-address", "", + "The address of the generic remote server to send samples to via gRPC. None, if empty.", + ) + cfg.fs.DurationVar( &cfg.remote.StorageTimeout, "storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to the remote storage.", diff --git a/documentation/examples/remote_storage_generic/README.md b/documentation/examples/remote_storage_generic/README.md new file mode 100644 index 000000000..6c97de380 --- /dev/null +++ b/documentation/examples/remote_storage_generic/README.md @@ -0,0 +1,17 @@ +## Generic Remote Storage Example + +This is a simple example of how to write a server to +recieve samples from the generic remote storage output. + +To use it: + +``` +go build +remote_storage_generic +``` + +and then run Prometheus as: + +``` +./prometheus -storage.remote.generic-url http://localhost:1234/remote +``` diff --git a/documentation/examples/remote_storage_generic/server.go b/documentation/examples/remote_storage_generic/server.go new file mode 100644 index 000000000..7ae2f31fe --- /dev/null +++ b/documentation/examples/remote_storage_generic/server.go @@ -0,0 +1,53 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "fmt" + "log" + "net" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/prometheus/prometheus/storage/remote/generic" +) + +type server struct{} + +func (server *server) Write(ctx context.Context, req *generic.GenericWriteRequest) (*generic.GenericWriteResponse, error) { + for _, ts := range req.Timeseries { + fmt.Printf("%s", ts.Name) + for _, l := range ts.Labels { + fmt.Printf(" %s=%s", l.Name, l.Value) + } + fmt.Printf("\n") + + for _, s := range ts.Samples { + fmt.Printf(" %f %d\n", s.Value, s.TimestampMs) + } + } + + return &generic.GenericWriteResponse{}, nil +} + +func main() { + lis, err := net.Listen("tcp", ":1234") + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + s := grpc.NewServer() + generic.RegisterGenericWriteServer(s, &server{}) + s.Serve(lis) +} diff --git a/storage/remote/generic/generic.go b/storage/remote/generic/generic.go new file mode 100644 index 000000000..d9458a3f3 --- /dev/null +++ b/storage/remote/generic/generic.go @@ -0,0 +1,77 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package generic + +import ( + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/prometheus/common/model" +) + +// Client allows sending batches of Prometheus samples to a http endpoint. +type Client struct { + conn *grpc.ClientConn + timeout time.Duration +} + +// NewClient creates a new Client. +func NewClient(address string, timeout time.Duration) *Client { + // TODO: Do something with this error. + conn, _ := grpc.Dial(address, grpc.WithInsecure()) + return &Client{ + conn: conn, + timeout: timeout, + } +} + +// Store sends a batch of samples to the http endpoint. +func (c *Client) Store(samples model.Samples) error { + req := &GenericWriteRequest{} + for _, s := range samples { + ts := &TimeSeries{ + Name: string(s.Metric[model.MetricNameLabel]), + } + for k, v := range s.Metric { + if k != model.MetricNameLabel { + ts.Labels = append(ts.Labels, + &LabelPair{ + Name: string(k), + Value: string(v), + }) + } + } + ts.Samples = []*Sample{ + &Sample{ + Value: float64(s.Value), + TimestampMs: int64(s.Timestamp), + }, + } + req.Timeseries = append(req.Timeseries, ts) + } + client := NewGenericWriteClient(c.conn) + ctxt, _ := context.WithTimeout(context.Background(), c.timeout) + _, err := client.Write(ctxt, req) + if err != nil { + return err + } + return nil +} + +// Name identifies the client as a genric client. +func (c Client) Name() string { + return "generic" +} diff --git a/storage/remote/generic/generic.pb.go b/storage/remote/generic/generic.pb.go new file mode 100644 index 000000000..628fb52e7 --- /dev/null +++ b/storage/remote/generic/generic.pb.go @@ -0,0 +1,211 @@ +// Code generated by protoc-gen-go. +// source: generic.proto +// DO NOT EDIT! + +/* +Package generic is a generated protocol buffer package. + +It is generated from these files: + generic.proto + +It has these top-level messages: + Sample + LabelPair + TimeSeries + GenericWriteRequest + GenericWriteResponse +*/ +package generic + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Sample struct { + Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"` +} + +func (m *Sample) Reset() { *m = Sample{} } +func (m *Sample) String() string { return proto.CompactTextString(m) } +func (*Sample) ProtoMessage() {} +func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type LabelPair struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` +} + +func (m *LabelPair) Reset() { *m = LabelPair{} } +func (m *LabelPair) String() string { return proto.CompactTextString(m) } +func (*LabelPair) ProtoMessage() {} +func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type TimeSeries struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + Labels []*LabelPair `protobuf:"bytes,2,rep,name=labels" json:"labels,omitempty"` + // Sorted by time, oldest sample first. + Samples []*Sample `protobuf:"bytes,3,rep,name=samples" json:"samples,omitempty"` +} + +func (m *TimeSeries) Reset() { *m = TimeSeries{} } +func (m *TimeSeries) String() string { return proto.CompactTextString(m) } +func (*TimeSeries) ProtoMessage() {} +func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *TimeSeries) GetLabels() []*LabelPair { + if m != nil { + return m.Labels + } + return nil +} + +func (m *TimeSeries) GetSamples() []*Sample { + if m != nil { + return m.Samples + } + return nil +} + +type GenericWriteRequest struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *GenericWriteRequest) Reset() { *m = GenericWriteRequest{} } +func (m *GenericWriteRequest) String() string { return proto.CompactTextString(m) } +func (*GenericWriteRequest) ProtoMessage() {} +func (*GenericWriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *GenericWriteRequest) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +type GenericWriteResponse struct { +} + +func (m *GenericWriteResponse) Reset() { *m = GenericWriteResponse{} } +func (m *GenericWriteResponse) String() string { return proto.CompactTextString(m) } +func (*GenericWriteResponse) ProtoMessage() {} +func (*GenericWriteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func init() { + proto.RegisterType((*Sample)(nil), "generic.Sample") + proto.RegisterType((*LabelPair)(nil), "generic.LabelPair") + proto.RegisterType((*TimeSeries)(nil), "generic.TimeSeries") + proto.RegisterType((*GenericWriteRequest)(nil), "generic.GenericWriteRequest") + proto.RegisterType((*GenericWriteResponse)(nil), "generic.GenericWriteResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion3 + +// Client API for GenericWrite service + +type GenericWriteClient interface { + Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error) +} + +type genericWriteClient struct { + cc *grpc.ClientConn +} + +func NewGenericWriteClient(cc *grpc.ClientConn) GenericWriteClient { + return &genericWriteClient{cc} +} + +func (c *genericWriteClient) Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error) { + out := new(GenericWriteResponse) + err := grpc.Invoke(ctx, "/generic.GenericWrite/Write", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for GenericWrite service + +type GenericWriteServer interface { + Write(context.Context, *GenericWriteRequest) (*GenericWriteResponse, error) +} + +func RegisterGenericWriteServer(s *grpc.Server, srv GenericWriteServer) { + s.RegisterService(&_GenericWrite_serviceDesc, srv) +} + +func _GenericWrite_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GenericWriteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GenericWriteServer).Write(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/generic.GenericWrite/Write", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GenericWriteServer).Write(ctx, req.(*GenericWriteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _GenericWrite_serviceDesc = grpc.ServiceDesc{ + ServiceName: "generic.GenericWrite", + HandlerType: (*GenericWriteServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Write", + Handler: _GenericWrite_Write_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: fileDescriptor0, +} + +func init() { proto.RegisterFile("generic.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 264 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x91, 0x4f, 0x4b, 0xc3, 0x40, + 0x10, 0xc5, 0x4d, 0x63, 0x53, 0xfa, 0x5a, 0x11, 0xa6, 0x45, 0x82, 0x28, 0xe8, 0x9e, 0xaa, 0x87, + 0x1e, 0x5a, 0xfc, 0x00, 0x5e, 0x14, 0x44, 0x41, 0xb6, 0xa2, 0x47, 0xd9, 0xca, 0x20, 0x81, 0xfc, + 0x33, 0xbb, 0xd5, 0xaf, 0xef, 0x66, 0xb7, 0xdd, 0x56, 0xa8, 0xb7, 0x9d, 0x79, 0x2f, 0xbf, 0x79, + 0x8f, 0xe0, 0xe8, 0x93, 0x4b, 0x6e, 0xb2, 0x8f, 0x69, 0xdd, 0x54, 0xa6, 0xa2, 0xde, 0x7a, 0x14, + 0xb7, 0x48, 0x16, 0xaa, 0xa8, 0x73, 0xa6, 0x31, 0xba, 0xdf, 0x2a, 0x5f, 0x71, 0x1a, 0x5d, 0x44, + 0x93, 0x48, 0xfa, 0x81, 0x2e, 0x31, 0x34, 0x59, 0xc1, 0xda, 0x58, 0xd3, 0x7b, 0xa1, 0xd3, 0x8e, + 0x15, 0x63, 0x39, 0x08, 0xbb, 0x27, 0x2d, 0x6e, 0xd0, 0x7f, 0x54, 0x4b, 0xce, 0x9f, 0x55, 0xd6, + 0x10, 0xe1, 0xb0, 0x54, 0x85, 0x87, 0xf4, 0xa5, 0x7b, 0x6f, 0xc9, 0x1d, 0xb7, 0xf4, 0x83, 0xf8, + 0x01, 0x5e, 0x2c, 0x65, 0x61, 0x63, 0xb0, 0xde, 0xfb, 0xdd, 0x35, 0x92, 0xbc, 0x05, 0xb7, 0x57, + 0xe3, 0xc9, 0x60, 0x46, 0xd3, 0x4d, 0x89, 0x70, 0x4f, 0xae, 0x1d, 0x74, 0x85, 0x9e, 0x76, 0x3d, + 0x74, 0x1a, 0x3b, 0xf3, 0x71, 0x30, 0xfb, 0x7e, 0x72, 0xa3, 0x8b, 0x07, 0x8c, 0xee, 0xbd, 0xf4, + 0xd6, 0x64, 0x86, 0x25, 0x7f, 0xad, 0x6c, 0x17, 0x9a, 0x03, 0xae, 0x95, 0xcb, 0x63, 0x73, 0xb4, + 0x90, 0x51, 0x80, 0x6c, 0xa3, 0xca, 0x1d, 0x9b, 0x38, 0xc1, 0xf8, 0x2f, 0x4b, 0xd7, 0x55, 0xa9, + 0x79, 0xf6, 0x8a, 0xe1, 0xee, 0x9e, 0xee, 0xd0, 0xf5, 0x8f, 0xb3, 0x40, 0xdc, 0x93, 0xe1, 0xf4, + 0xfc, 0x1f, 0xd5, 0x53, 0xc5, 0xc1, 0x32, 0x71, 0xbf, 0x6f, 0xfe, 0x1b, 0x00, 0x00, 0xff, 0xff, + 0x40, 0x1e, 0x0f, 0xf8, 0xcf, 0x01, 0x00, 0x00, +} diff --git a/storage/remote/generic/generic.proto b/storage/remote/generic/generic.proto new file mode 100644 index 000000000..9c92d55dc --- /dev/null +++ b/storage/remote/generic/generic.proto @@ -0,0 +1,45 @@ +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package generic; + + +message Sample { + double value = 1; + int64 timestamp_ms = 2; +} + +message LabelPair { + string name = 1; + string value = 2; +} + +message TimeSeries { + string name = 1; + repeated LabelPair labels = 2; + // Sorted by time, oldest sample first. + repeated Sample samples = 3; +} + +message GenericWriteRequest { + repeated TimeSeries timeseries = 1; +} + +message GenericWriteResponse { +} + +service GenericWrite { + rpc Write(GenericWriteRequest) returns (GenericWriteResponse) {} +} diff --git a/storage/remote/remote.go b/storage/remote/remote.go index e4b222837..5a28e7843 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -24,6 +24,7 @@ import ( influx "github.com/influxdb/influxdb/client" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage/remote/generic" "github.com/prometheus/prometheus/storage/remote/graphite" "github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/opentsdb" @@ -69,6 +70,10 @@ func New(o *Options) *Storage { prometheus.MustRegister(c) s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) } + if o.GenericAddress != "" { + c := generic.NewClient(o.GenericAddress, o.StorageTimeout) + s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) + } if len(s.queues) == 0 { return nil } @@ -87,6 +92,7 @@ type Options struct { GraphiteAddress string GraphiteTransport string GraphitePrefix string + GenericAddress string } // Run starts the background processing of the storage queues.