diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 3eadc36dc..df69c8866 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -369,7 +369,8 @@ func main() { var ( localStorage = &readyStorage{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline)) + scraper = &readyScrapeManager{} + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) @@ -414,6 +415,8 @@ func main() { }) ) + scraper.Set(scrapeManager) + cfg.web.Context = ctxWeb cfg.web.TSDBRetentionDuration = cfg.tsdb.RetentionDuration cfg.web.TSDBMaxBytes = cfg.tsdb.MaxBytes @@ -1095,6 +1098,35 @@ func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) { return nil, tsdb.ErrNotReady } +// ErrNotReady is returned if the underlying scrape manager is not ready yet. +var ErrNotReady = errors.New("Scrape manager not ready") + +// ReadyScrapeManager allows a scrape manager to be retrieved. Even if it's set at a later point in time. +type readyScrapeManager struct { + mtx sync.RWMutex + m *scrape.Manager +} + +// Set the scrape manager. +func (rm *readyScrapeManager) Set(m *scrape.Manager) { + rm.mtx.Lock() + defer rm.mtx.Unlock() + + rm.m = m +} + +// Get the scrape manager. If is not ready, return an error. +func (rm *readyScrapeManager) Get() (*scrape.Manager, error) { + rm.mtx.RLock() + defer rm.mtx.RUnlock() + + if rm.m != nil { + return rm.m, nil + } + + return nil, ErrNotReady +} + // tsdbOptions is tsdb.Option version with defined units. // This is required as tsdb.Option fields are unit agnostic (time). type tsdbOptions struct { diff --git a/config/config.go b/config/config.go index 223c8abfe..75904f763 100644 --- a/config/config.go +++ b/config/config.go @@ -98,8 +98,9 @@ var ( // DefaultRemoteWriteConfig is the default remote write configuration. DefaultRemoteWriteConfig = RemoteWriteConfig{ - RemoteTimeout: model.Duration(30 * time.Second), - QueueConfig: DefaultQueueConfig, + RemoteTimeout: model.Duration(30 * time.Second), + QueueConfig: DefaultQueueConfig, + MetadataConfig: DefaultMetadataConfig, } // DefaultQueueConfig is the default remote queue configuration. @@ -121,6 +122,12 @@ var ( MaxBackoff: model.Duration(100 * time.Millisecond), } + // DefaultMetadataConfig is the default metadata configuration for a remote write endpoint. + DefaultMetadataConfig = MetadataConfig{ + Send: true, + SendInterval: model.Duration(1 * time.Minute), + } + // DefaultRemoteReadConfig is the default remote read configuration. DefaultRemoteReadConfig = RemoteReadConfig{ RemoteTimeout: model.Duration(1 * time.Minute), @@ -570,6 +577,7 @@ type RemoteWriteConfig struct { // values arbitrarily into the overflow maps of further-down types. HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` QueueConfig QueueConfig `yaml:"queue_config,omitempty"` + MetadataConfig MetadataConfig `yaml:"metadata_config,omitempty"` } // SetDirectory joins any relative file paths with dir. @@ -623,6 +631,15 @@ type QueueConfig struct { MaxBackoff model.Duration `yaml:"max_backoff,omitempty"` } +// MetadataConfig is the configuration for sending metadata to remote +// storage. +type MetadataConfig struct { + // Send controls whether we send metric metadata to remote storage. + Send bool `yaml:"send"` + // SendInterval controls how frequently we send metric metadata. + SendInterval model.Duration `yaml:"send_interval"` +} + // RemoteReadConfig is the configuration for reading from remote storage. type RemoteReadConfig struct { URL *config.URL `yaml:"url"` diff --git a/config/config_test.go b/config/config_test.go index 04980239a..d225d283d 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -87,13 +87,15 @@ var expectedConf = &Config{ Action: relabel.Drop, }, }, - QueueConfig: DefaultQueueConfig, + QueueConfig: DefaultQueueConfig, + MetadataConfig: DefaultMetadataConfig, }, { - URL: mustParseURL("http://remote2/push"), - RemoteTimeout: model.Duration(30 * time.Second), - QueueConfig: DefaultQueueConfig, - Name: "rw_tls", + URL: mustParseURL("http://remote2/push"), + RemoteTimeout: model.Duration(30 * time.Second), + QueueConfig: DefaultQueueConfig, + MetadataConfig: DefaultMetadataConfig, + Name: "rw_tls", HTTPClientConfig: config.HTTPClientConfig{ TLSConfig: config.TLSConfig{ CertFile: filepath.FromSlash("testdata/valid_cert_file"), diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index ba356c09a..b9f35c484 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1769,7 +1769,15 @@ queue_config: [ min_backoff: | default = 30ms ] # Maximum retry delay. [ max_backoff: | default = 100ms ] - +# Configures the sending of series metadata to remote storage. +# Metadata configuration is subject to change at any point +# or be removed in future releases. +metadata_config: + # Whether metric metadata is sent to remote storage or not. + [ send: | default = true ] + # How frequently metric metadata is sent to remote storage. + [ send_interval: | default = 1m ] + ``` There is a list of diff --git a/prompb/remote.pb.go b/prompb/remote.pb.go index 3f34afe4c..2a5bd8c8e 100644 --- a/prompb/remote.pb.go +++ b/prompb/remote.pb.go @@ -62,10 +62,11 @@ func (ReadRequest_ResponseType) EnumDescriptor() ([]byte, []int) { } type WriteRequest struct { - Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` + Metadata []MetricMetadata `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *WriteRequest) Reset() { *m = WriteRequest{} } @@ -108,6 +109,13 @@ func (m *WriteRequest) GetTimeseries() []TimeSeries { return nil } +func (m *WriteRequest) GetMetadata() []MetricMetadata { + if m != nil { + return m.Metadata + } + return nil +} + // ReadRequest represents a remote read request. type ReadRequest struct { Queries []*Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries,omitempty"` @@ -410,37 +418,38 @@ func init() { func init() { proto.RegisterFile("remote.proto", fileDescriptor_eefc82927d57d89b) } var fileDescriptor_eefc82927d57d89b = []byte{ - // 466 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0xbb, 0x4d, 0xdb, 0xa0, 0x71, 0x88, 0xc2, 0xb6, 0x25, 0xa6, 0x87, 0x34, 0xb2, 0x38, - 0x58, 0x2a, 0x0a, 0x22, 0x54, 0x9c, 0x38, 0x90, 0x96, 0x48, 0x45, 0x24, 0xfc, 0x59, 0x07, 0x81, - 0x10, 0x92, 0xe5, 0xd8, 0xa3, 0xc6, 0xa2, 0xfe, 0xd3, 0xdd, 0xb5, 0xd4, 0xbc, 0x1e, 0xa7, 0x9e, - 0x10, 0x4f, 0x80, 0x50, 0x9e, 0x04, 0xed, 0xda, 0x0e, 0x1b, 0xb8, 0x70, 0x5b, 0x7f, 0xdf, 0x37, - 0x3f, 0xef, 0x8c, 0xc7, 0xd0, 0xe2, 0x98, 0x64, 0x12, 0x07, 0x39, 0xcf, 0x64, 0x46, 0x21, 0xe7, - 0x59, 0x82, 0x72, 0x81, 0x85, 0x38, 0xb2, 0xe4, 0x32, 0x47, 0x51, 0x1a, 0x47, 0x07, 0x97, 0xd9, - 0x65, 0xa6, 0x8f, 0x8f, 0xd5, 0xa9, 0x54, 0x9d, 0x09, 0xb4, 0x3e, 0xf2, 0x58, 0x22, 0xc3, 0xeb, - 0x02, 0x85, 0xa4, 0xcf, 0x01, 0x64, 0x9c, 0xa0, 0x40, 0x1e, 0xa3, 0xb0, 0x49, 0xbf, 0xe1, 0x5a, - 0xc3, 0xfb, 0x83, 0x3f, 0xcc, 0xc1, 0x2c, 0x4e, 0xd0, 0xd3, 0xee, 0xd9, 0xce, 0xed, 0xcf, 0xe3, - 0x2d, 0x66, 0xe4, 0x9d, 0xef, 0x04, 0x2c, 0x86, 0x41, 0x54, 0xd3, 0x4e, 0xa0, 0x79, 0x5d, 0x98, - 0xa8, 0x7b, 0x26, 0xea, 0x7d, 0x81, 0x7c, 0xc9, 0xea, 0x04, 0xfd, 0x02, 0xdd, 0x20, 0x0c, 0x31, - 0x97, 0x18, 0xf9, 0x1c, 0x45, 0x9e, 0xa5, 0x02, 0x7d, 0xdd, 0x81, 0xbd, 0xdd, 0x6f, 0xb8, 0xed, - 0xe1, 0x43, 0xb3, 0xd8, 0x78, 0xcd, 0x80, 0x55, 0xe9, 0xd9, 0x32, 0x47, 0x76, 0x58, 0x43, 0x4c, - 0x55, 0x38, 0xa7, 0xd0, 0x32, 0x05, 0x6a, 0x41, 0xd3, 0x1b, 0x4d, 0xdf, 0x4d, 0xc6, 0x5e, 0x67, - 0x8b, 0x76, 0x61, 0xdf, 0x9b, 0xb1, 0xf1, 0x68, 0x3a, 0x7e, 0xe9, 0x7f, 0x7a, 0xcb, 0xfc, 0xf3, - 0x8b, 0x0f, 0x6f, 0x5e, 0x7b, 0x1d, 0xe2, 0x8c, 0x54, 0x55, 0xb0, 0x46, 0xd1, 0x27, 0xd0, 0xe4, - 0x28, 0x8a, 0x2b, 0x59, 0x37, 0xd4, 0xfd, 0xb7, 0x21, 0xed, 0xb3, 0x3a, 0xe7, 0x7c, 0x23, 0xb0, - 0xab, 0x0d, 0xfa, 0x08, 0xa8, 0x90, 0x01, 0x97, 0xbe, 0x9e, 0x98, 0x0c, 0x92, 0xdc, 0x4f, 0x14, - 0x87, 0xb8, 0x0d, 0xd6, 0xd1, 0xce, 0xac, 0x36, 0xa6, 0x82, 0xba, 0xd0, 0xc1, 0x34, 0xda, 0xcc, - 0x6e, 0xeb, 0x6c, 0x1b, 0xd3, 0xc8, 0x4c, 0x9e, 0xc2, 0x9d, 0x24, 0x90, 0xe1, 0x02, 0xb9, 0xb0, - 0x1b, 0xfa, 0x56, 0xb6, 0x79, 0xab, 0x49, 0x30, 0xc7, 0xab, 0x69, 0x19, 0x60, 0xeb, 0x24, 0x3d, - 0x81, 0xdd, 0x45, 0x9c, 0x4a, 0x61, 0xef, 0xf4, 0x89, 0x6b, 0x0d, 0x0f, 0xff, 0x1e, 0xee, 0x85, - 0x32, 0x59, 0x99, 0x71, 0xc6, 0x60, 0x19, 0xcd, 0xd1, 0x67, 0xff, 0xbf, 0x25, 0x1b, 0xfb, 0x71, - 0x03, 0xfb, 0xe7, 0x8b, 0x22, 0xfd, 0xaa, 0x3e, 0x8e, 0x31, 0xd5, 0x17, 0xd0, 0x0e, 0x4b, 0xd9, - 0xdf, 0x40, 0x3e, 0x30, 0x91, 0x55, 0x61, 0x45, 0xbd, 0x1b, 0x9a, 0x8f, 0xf4, 0x18, 0x2c, 0xb5, - 0x46, 0x4b, 0x3f, 0x4e, 0x23, 0xbc, 0xa9, 0xe6, 0x04, 0x5a, 0x7a, 0xa5, 0x94, 0xb3, 0x83, 0xdb, - 0x55, 0x8f, 0xfc, 0x58, 0xf5, 0xc8, 0xaf, 0x55, 0x8f, 0x7c, 0xde, 0x53, 0xdc, 0x7c, 0x3e, 0xdf, - 0xd3, 0x3f, 0xc1, 0xd3, 0xdf, 0x01, 0x00, 0x00, 0xff, 0xff, 0x9a, 0xb6, 0x6b, 0xcd, 0x43, 0x03, - 0x00, 0x00, + // 496 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x53, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0xee, 0x26, 0x69, 0x13, 0x8d, 0x43, 0x14, 0xb6, 0x2d, 0x09, 0x39, 0xa4, 0x91, 0xc5, 0x21, + 0x52, 0x51, 0x10, 0xa1, 0xe2, 0xd4, 0x03, 0x69, 0x89, 0x54, 0xa0, 0xe6, 0x67, 0x13, 0x04, 0x42, + 0x48, 0xd6, 0xc6, 0x1e, 0x35, 0x16, 0xf5, 0x4f, 0x77, 0xd7, 0x52, 0xf3, 0x16, 0x3c, 0x13, 0xa7, + 0x9e, 0x10, 0x4f, 0x80, 0x50, 0x9e, 0x04, 0x79, 0x6d, 0x87, 0x2d, 0x5c, 0xb8, 0xad, 0xbf, 0x3f, + 0xcf, 0xcc, 0xce, 0x42, 0x53, 0x60, 0x18, 0x2b, 0x1c, 0x25, 0x22, 0x56, 0x31, 0x85, 0x44, 0xc4, + 0x21, 0xaa, 0x25, 0xa6, 0xb2, 0x67, 0xa9, 0x55, 0x82, 0x32, 0x27, 0x7a, 0x7b, 0x17, 0xf1, 0x45, + 0xac, 0x8f, 0x8f, 0xb2, 0x53, 0x8e, 0xda, 0x5f, 0x09, 0x34, 0x3f, 0x88, 0x40, 0x21, 0xc3, 0xab, + 0x14, 0xa5, 0xa2, 0xc7, 0x00, 0x2a, 0x08, 0x51, 0xa2, 0x08, 0x50, 0x76, 0xc9, 0xa0, 0x3a, 0xb4, + 0xc6, 0xf7, 0x46, 0x7f, 0x42, 0x47, 0xf3, 0x20, 0xc4, 0x99, 0x66, 0x4f, 0x6a, 0x37, 0x3f, 0x0f, + 0xb6, 0x98, 0xa1, 0xa7, 0xc7, 0xd0, 0x08, 0x51, 0x71, 0x9f, 0x2b, 0xde, 0xad, 0x6a, 0x6f, 0xcf, + 0xf4, 0x3a, 0xa8, 0x44, 0xe0, 0x39, 0x85, 0xa2, 0xf0, 0x6f, 0x1c, 0x2f, 0x6b, 0x8d, 0x4a, 0xbb, + 0x6a, 0x7f, 0x27, 0x60, 0x31, 0xe4, 0x7e, 0x59, 0xd1, 0x21, 0xd4, 0xaf, 0x52, 0xb3, 0x9c, 0xbb, + 0x66, 0xe4, 0xbb, 0x14, 0xc5, 0x8a, 0x95, 0x0a, 0xfa, 0x19, 0x3a, 0xdc, 0xf3, 0x30, 0x51, 0xe8, + 0xbb, 0x02, 0x65, 0x12, 0x47, 0x12, 0x5d, 0x3d, 0x86, 0x6e, 0x65, 0x50, 0x1d, 0xb6, 0xc6, 0x0f, + 0x4c, 0xb3, 0xf1, 0x9b, 0x11, 0x2b, 0xd4, 0xf3, 0x55, 0x82, 0x6c, 0xbf, 0x0c, 0x31, 0x51, 0x69, + 0x1f, 0x41, 0xd3, 0x04, 0xa8, 0x05, 0xf5, 0xd9, 0xc4, 0x79, 0x7b, 0x3e, 0x9d, 0xb5, 0xb7, 0x68, + 0x07, 0x76, 0x67, 0x73, 0x36, 0x9d, 0x38, 0xd3, 0xe7, 0xee, 0xc7, 0x37, 0xcc, 0x3d, 0x3d, 0x7b, + 0xff, 0xfa, 0xd5, 0xac, 0x4d, 0xec, 0x49, 0xe6, 0xe2, 0x9b, 0x28, 0xfa, 0x18, 0xea, 0x02, 0x65, + 0x7a, 0xa9, 0xca, 0x86, 0x3a, 0xff, 0x36, 0xa4, 0x79, 0x56, 0xea, 0xec, 0x6f, 0x04, 0xb6, 0x35, + 0x41, 0x1f, 0x02, 0x95, 0x8a, 0x0b, 0xe5, 0xea, 0xa9, 0x2b, 0x1e, 0x26, 0x6e, 0x98, 0xe5, 0x90, + 0x61, 0x95, 0xb5, 0x35, 0x33, 0x2f, 0x09, 0x47, 0xd2, 0x21, 0xb4, 0x31, 0xf2, 0x6f, 0x6b, 0x2b, + 0x5a, 0xdb, 0xc2, 0xc8, 0x37, 0x95, 0x47, 0xd0, 0x08, 0xb9, 0xf2, 0x96, 0x28, 0x64, 0x71, 0x73, + 0x5d, 0xb3, 0xaa, 0x73, 0xbe, 0xc0, 0x4b, 0x27, 0x17, 0xb0, 0x8d, 0x92, 0x1e, 0xc2, 0xf6, 0x32, + 0x88, 0x94, 0xec, 0xd6, 0x06, 0x64, 0x68, 0x8d, 0xf7, 0xff, 0x1e, 0xee, 0x59, 0x46, 0xb2, 0x5c, + 0x63, 0x4f, 0xc1, 0x32, 0x9a, 0xa3, 0x4f, 0xff, 0x7f, 0xd3, 0xcc, 0x1d, 0xb3, 0xaf, 0x61, 0xf7, + 0x74, 0x99, 0x46, 0x5f, 0xb2, 0xcb, 0x31, 0xa6, 0xfa, 0x0c, 0x5a, 0x5e, 0x0e, 0xbb, 0xb7, 0x22, + 0xef, 0x9b, 0x91, 0x85, 0xb1, 0x48, 0xbd, 0xe3, 0x99, 0x9f, 0xf4, 0x00, 0xac, 0x6c, 0x8d, 0x56, + 0x6e, 0x10, 0xf9, 0x78, 0x5d, 0xcc, 0x09, 0x34, 0xf4, 0x22, 0x43, 0x4e, 0xf6, 0x6e, 0xd6, 0x7d, + 0xf2, 0x63, 0xdd, 0x27, 0xbf, 0xd6, 0x7d, 0xf2, 0x69, 0x27, 0xcb, 0x4d, 0x16, 0x8b, 0x1d, 0xfd, + 0x92, 0x9e, 0xfc, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x13, 0x18, 0x12, 0x0a, 0x88, 0x03, 0x00, 0x00, } func (m *WriteRequest) Marshal() (dAtA []byte, err error) { @@ -467,6 +476,20 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i -= len(m.XXX_unrecognized) copy(dAtA[i:], m.XXX_unrecognized) } + if len(m.Metadata) > 0 { + for iNdEx := len(m.Metadata) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Metadata[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRemote(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x1a + } + } if len(m.Timeseries) > 0 { for iNdEx := len(m.Timeseries) - 1; iNdEx >= 0; iNdEx-- { { @@ -757,6 +780,12 @@ func (m *WriteRequest) Size() (n int) { n += 1 + l + sovRemote(uint64(l)) } } + if len(m.Metadata) > 0 { + for _, e := range m.Metadata { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -942,6 +971,40 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRemote + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Metadata = append(m.Metadata, MetricMetadata{}) + if err := m.Metadata[len(m.Metadata)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRemote(dAtA[iNdEx:]) diff --git a/prompb/remote.proto b/prompb/remote.proto index ecd8f0bb1..70c6dd3fb 100644 --- a/prompb/remote.proto +++ b/prompb/remote.proto @@ -21,6 +21,10 @@ import "gogoproto/gogo.proto"; message WriteRequest { repeated prometheus.TimeSeries timeseries = 1 [(gogoproto.nullable) = false]; + // Cortex uses this field to determine the source of the write request. + // We reserve it to avoid any compatibility issues. + reserved 2; + repeated prometheus.MetricMetadata metadata = 3 [(gogoproto.nullable) = false]; } // ReadRequest represents a remote read request. diff --git a/prompb/types.pb.go b/prompb/types.pb.go index 9c6e26a2c..5e593b73d 100644 --- a/prompb/types.pb.go +++ b/prompb/types.pb.go @@ -25,6 +25,49 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package +type MetricMetadata_MetricType int32 + +const ( + MetricMetadata_UNKNOWN MetricMetadata_MetricType = 0 + MetricMetadata_COUNTER MetricMetadata_MetricType = 1 + MetricMetadata_GAUGE MetricMetadata_MetricType = 2 + MetricMetadata_HISTOGRAM MetricMetadata_MetricType = 3 + MetricMetadata_GAUGEHISTOGRAM MetricMetadata_MetricType = 4 + MetricMetadata_SUMMARY MetricMetadata_MetricType = 5 + MetricMetadata_INFO MetricMetadata_MetricType = 6 + MetricMetadata_STATESET MetricMetadata_MetricType = 7 +) + +var MetricMetadata_MetricType_name = map[int32]string{ + 0: "UNKNOWN", + 1: "COUNTER", + 2: "GAUGE", + 3: "HISTOGRAM", + 4: "GAUGEHISTOGRAM", + 5: "SUMMARY", + 6: "INFO", + 7: "STATESET", +} + +var MetricMetadata_MetricType_value = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGEHISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, +} + +func (x MetricMetadata_MetricType) String() string { + return proto.EnumName(MetricMetadata_MetricType_name, int32(x)) +} + +func (MetricMetadata_MetricType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{0, 0} +} + type LabelMatcher_Type int32 const ( @@ -53,7 +96,7 @@ func (x LabelMatcher_Type) String() string { } func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{4, 0} + return fileDescriptor_d938547f84707355, []int{5, 0} } // We require this to match chunkenc.Encoding. @@ -79,7 +122,80 @@ func (x Chunk_Encoding) String() string { } func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{6, 0} + return fileDescriptor_d938547f84707355, []int{7, 0} +} + +type MetricMetadata struct { + // Represents the metric type, these match the set from Prometheus. + // Refer to pkg/textparse/interface.go for details. + Type MetricMetadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.MetricMetadata_MetricType" json:"type,omitempty"` + MetricFamilyName string `protobuf:"bytes,2,opt,name=metric_family_name,json=metricFamilyName,proto3" json:"metric_family_name,omitempty"` + Help string `protobuf:"bytes,4,opt,name=help,proto3" json:"help,omitempty"` + Unit string `protobuf:"bytes,5,opt,name=unit,proto3" json:"unit,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *MetricMetadata) Reset() { *m = MetricMetadata{} } +func (m *MetricMetadata) String() string { return proto.CompactTextString(m) } +func (*MetricMetadata) ProtoMessage() {} +func (*MetricMetadata) Descriptor() ([]byte, []int) { + return fileDescriptor_d938547f84707355, []int{0} +} +func (m *MetricMetadata) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *MetricMetadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_MetricMetadata.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *MetricMetadata) XXX_Merge(src proto.Message) { + xxx_messageInfo_MetricMetadata.Merge(m, src) +} +func (m *MetricMetadata) XXX_Size() int { + return m.Size() +} +func (m *MetricMetadata) XXX_DiscardUnknown() { + xxx_messageInfo_MetricMetadata.DiscardUnknown(m) +} + +var xxx_messageInfo_MetricMetadata proto.InternalMessageInfo + +func (m *MetricMetadata) GetType() MetricMetadata_MetricType { + if m != nil { + return m.Type + } + return MetricMetadata_UNKNOWN +} + +func (m *MetricMetadata) GetMetricFamilyName() string { + if m != nil { + return m.MetricFamilyName + } + return "" +} + +func (m *MetricMetadata) GetHelp() string { + if m != nil { + return m.Help + } + return "" +} + +func (m *MetricMetadata) GetUnit() string { + if m != nil { + return m.Unit + } + return "" } type Sample struct { @@ -94,7 +210,7 @@ func (m *Sample) Reset() { *m = Sample{} } func (m *Sample) String() string { return proto.CompactTextString(m) } func (*Sample) ProtoMessage() {} func (*Sample) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{0} + return fileDescriptor_d938547f84707355, []int{1} } func (m *Sample) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -150,7 +266,7 @@ func (m *TimeSeries) Reset() { *m = TimeSeries{} } func (m *TimeSeries) String() string { return proto.CompactTextString(m) } func (*TimeSeries) ProtoMessage() {} func (*TimeSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{1} + return fileDescriptor_d938547f84707355, []int{2} } func (m *TimeSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -205,7 +321,7 @@ func (m *Label) Reset() { *m = Label{} } func (m *Label) String() string { return proto.CompactTextString(m) } func (*Label) ProtoMessage() {} func (*Label) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{2} + return fileDescriptor_d938547f84707355, []int{3} } func (m *Label) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -259,7 +375,7 @@ func (m *Labels) Reset() { *m = Labels{} } func (m *Labels) String() string { return proto.CompactTextString(m) } func (*Labels) ProtoMessage() {} func (*Labels) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{3} + return fileDescriptor_d938547f84707355, []int{4} } func (m *Labels) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -309,7 +425,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{4} + return fileDescriptor_d938547f84707355, []int{5} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -376,7 +492,7 @@ func (m *ReadHints) Reset() { *m = ReadHints{} } func (m *ReadHints) String() string { return proto.CompactTextString(m) } func (*ReadHints) ProtoMessage() {} func (*ReadHints) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{5} + return fileDescriptor_d938547f84707355, []int{6} } func (m *ReadHints) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -470,7 +586,7 @@ func (m *Chunk) Reset() { *m = Chunk{} } func (m *Chunk) String() string { return proto.CompactTextString(m) } func (*Chunk) ProtoMessage() {} func (*Chunk) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{6} + return fileDescriptor_d938547f84707355, []int{7} } func (m *Chunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -542,7 +658,7 @@ func (m *ChunkedSeries) Reset() { *m = ChunkedSeries{} } func (m *ChunkedSeries) String() string { return proto.CompactTextString(m) } func (*ChunkedSeries) ProtoMessage() {} func (*ChunkedSeries) Descriptor() ([]byte, []int) { - return fileDescriptor_d938547f84707355, []int{7} + return fileDescriptor_d938547f84707355, []int{8} } func (m *ChunkedSeries) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -586,8 +702,10 @@ func (m *ChunkedSeries) GetChunks() []Chunk { } func init() { + proto.RegisterEnum("prometheus.MetricMetadata_MetricType", MetricMetadata_MetricType_name, MetricMetadata_MetricType_value) proto.RegisterEnum("prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) proto.RegisterEnum("prometheus.Chunk_Encoding", Chunk_Encoding_name, Chunk_Encoding_value) + proto.RegisterType((*MetricMetadata)(nil), "prometheus.MetricMetadata") proto.RegisterType((*Sample)(nil), "prometheus.Sample") proto.RegisterType((*TimeSeries)(nil), "prometheus.TimeSeries") proto.RegisterType((*Label)(nil), "prometheus.Label") @@ -601,41 +719,104 @@ func init() { func init() { proto.RegisterFile("types.proto", fileDescriptor_d938547f84707355) } var fileDescriptor_d938547f84707355 = []byte{ - // 539 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xcd, 0x6e, 0xd3, 0x40, - 0x10, 0xee, 0xda, 0x89, 0x9d, 0x4c, 0x4a, 0x95, 0xae, 0x8a, 0x30, 0x15, 0x04, 0xcb, 0x27, 0x9f, - 0x5c, 0x35, 0x9c, 0x90, 0x38, 0x15, 0x45, 0x42, 0xa2, 0x4e, 0xd5, 0x6d, 0x11, 0x88, 0x4b, 0xb5, - 0x89, 0x17, 0xc7, 0x22, 0x5e, 0xbb, 0xde, 0x0d, 0x6a, 0x1e, 0x84, 0xc7, 0xe0, 0xc0, 0x5b, 0xf4, - 0xc8, 0x13, 0x20, 0x94, 0x27, 0x41, 0x3b, 0x76, 0x7e, 0xa4, 0x72, 0x81, 0xdb, 0xfc, 0x7c, 0xf3, - 0x7d, 0x9f, 0x77, 0xc6, 0xd0, 0xd3, 0xcb, 0x52, 0xa8, 0xa8, 0xac, 0x0a, 0x5d, 0x50, 0x28, 0xab, - 0x22, 0x17, 0x7a, 0x26, 0x16, 0xea, 0xf8, 0x28, 0x2d, 0xd2, 0x02, 0xcb, 0x27, 0x26, 0xaa, 0x11, - 0xc1, 0x6b, 0x70, 0xae, 0x78, 0x5e, 0xce, 0x05, 0x3d, 0x82, 0xf6, 0x57, 0x3e, 0x5f, 0x08, 0x8f, - 0xf8, 0x24, 0x24, 0xac, 0x4e, 0xe8, 0x33, 0xe8, 0xea, 0x2c, 0x17, 0x4a, 0xf3, 0xbc, 0xf4, 0x2c, - 0x9f, 0x84, 0x36, 0xdb, 0x16, 0x82, 0x5b, 0x80, 0xeb, 0x2c, 0x17, 0x57, 0xa2, 0xca, 0x84, 0xa2, - 0x27, 0xe0, 0xcc, 0xf9, 0x44, 0xcc, 0x95, 0x47, 0x7c, 0x3b, 0xec, 0x0d, 0x0f, 0xa3, 0xad, 0x7c, - 0x74, 0x6e, 0x3a, 0x67, 0xad, 0xfb, 0x5f, 0x2f, 0xf6, 0x58, 0x03, 0xa3, 0x43, 0x70, 0x15, 0x8a, - 0x2b, 0xcf, 0xc2, 0x09, 0xba, 0x3b, 0x51, 0xfb, 0x6a, 0x46, 0xd6, 0xc0, 0xe0, 0x14, 0xda, 0x48, - 0x45, 0x29, 0xb4, 0x24, 0xcf, 0x6b, 0xbb, 0x5d, 0x86, 0xf1, 0xf6, 0x1b, 0x2c, 0x2c, 0xd6, 0x49, - 0xf0, 0x0a, 0x9c, 0xf3, 0x5a, 0xf0, 0x5f, 0x1d, 0x06, 0xdf, 0x08, 0xec, 0x63, 0x3d, 0xe6, 0x7a, - 0x3a, 0x13, 0x15, 0x3d, 0x85, 0x96, 0x79, 0x60, 0x54, 0x3d, 0x18, 0x3e, 0x7f, 0x30, 0xdf, 0xe0, - 0xa2, 0xeb, 0x65, 0x29, 0x18, 0x42, 0x37, 0x46, 0xad, 0xbf, 0x19, 0xb5, 0x77, 0x8d, 0x86, 0xd0, - 0x32, 0x73, 0xd4, 0x01, 0x6b, 0x74, 0xd9, 0xdf, 0xa3, 0x2e, 0xd8, 0xe3, 0xd1, 0x65, 0x9f, 0x98, - 0x02, 0x1b, 0xf5, 0x2d, 0x2c, 0xb0, 0x51, 0xdf, 0x0e, 0x7e, 0x10, 0xe8, 0x32, 0xc1, 0x93, 0xb7, - 0x99, 0xd4, 0x8a, 0x3e, 0x01, 0x57, 0x69, 0x51, 0xde, 0xe4, 0x0a, 0x7d, 0xd9, 0xcc, 0x31, 0x69, - 0xac, 0x8c, 0xf4, 0xe7, 0x85, 0x9c, 0xae, 0xa5, 0x4d, 0x4c, 0x9f, 0x42, 0x47, 0x69, 0x5e, 0x69, - 0x83, 0xb6, 0x11, 0xed, 0x62, 0x1e, 0x2b, 0xfa, 0x18, 0x1c, 0x21, 0x13, 0xd3, 0x68, 0x61, 0xa3, - 0x2d, 0x64, 0x12, 0x2b, 0x7a, 0x0c, 0x9d, 0xb4, 0x2a, 0x16, 0x65, 0x26, 0x53, 0xaf, 0xed, 0xdb, - 0x61, 0x97, 0x6d, 0x72, 0x7a, 0x00, 0xd6, 0x64, 0xe9, 0x39, 0x3e, 0x09, 0x3b, 0xcc, 0x9a, 0x2c, - 0x0d, 0x7b, 0xc5, 0x65, 0x2a, 0x0c, 0x89, 0x5b, 0xb3, 0x63, 0x1e, 0xab, 0xe0, 0x3b, 0x81, 0xf6, - 0x9b, 0xd9, 0x42, 0x7e, 0xa1, 0x03, 0xe8, 0xe5, 0x99, 0xbc, 0x31, 0x77, 0xb4, 0xf5, 0xdc, 0xcd, - 0x33, 0x69, 0x8e, 0x29, 0x56, 0xd8, 0xe7, 0x77, 0x9b, 0x7e, 0x73, 0x76, 0x39, 0xbf, 0x6b, 0xfa, - 0x51, 0xb3, 0x04, 0x1b, 0x97, 0x70, 0xbc, 0xbb, 0x04, 0x14, 0x88, 0x46, 0x72, 0x5a, 0x24, 0x99, - 0x4c, 0xb7, 0x1b, 0x48, 0xb8, 0xe6, 0xf8, 0x55, 0xfb, 0x0c, 0xe3, 0xc0, 0x87, 0xce, 0x1a, 0x45, - 0x7b, 0xe0, 0xbe, 0x1f, 0xbf, 0x1b, 0x5f, 0x7c, 0x18, 0xd7, 0x8f, 0xfe, 0xf1, 0x82, 0xf5, 0x49, - 0x70, 0x0b, 0x8f, 0x90, 0x4d, 0x24, 0xff, 0x7b, 0xdf, 0x27, 0xe0, 0x4c, 0x0d, 0xc3, 0xfa, 0xbc, - 0x0f, 0x1f, 0x38, 0x5d, 0x0f, 0xd4, 0xb0, 0xb3, 0xa3, 0xfb, 0xd5, 0x80, 0xfc, 0x5c, 0x0d, 0xc8, - 0xef, 0xd5, 0x80, 0x7c, 0x72, 0x0c, 0xba, 0x9c, 0x4c, 0x1c, 0xfc, 0x55, 0x5f, 0xfe, 0x09, 0x00, - 0x00, 0xff, 0xff, 0xed, 0x99, 0x84, 0x88, 0xdb, 0x03, 0x00, 0x00, + // 690 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcd, 0x6e, 0xda, 0x40, + 0x10, 0xce, 0xfa, 0x17, 0x86, 0x04, 0x39, 0xab, 0x54, 0x75, 0xa3, 0x96, 0x22, 0x4b, 0x95, 0x38, + 0x54, 0x44, 0x49, 0x4f, 0x91, 0x7a, 0x21, 0x91, 0xf3, 0xa3, 0xc6, 0xa0, 0x2c, 0xa0, 0xfe, 0x5c, + 0xd0, 0x02, 0x1b, 0xb0, 0x8a, 0x8d, 0xe3, 0x5d, 0xaa, 0xf0, 0x20, 0xbd, 0xf5, 0x15, 0x7a, 0xe8, + 0x5b, 0xe4, 0xd8, 0x27, 0xa8, 0xaa, 0x3c, 0x49, 0xb5, 0x6b, 0x13, 0x13, 0xa5, 0x97, 0xf6, 0x36, + 0xf3, 0x7d, 0xdf, 0xfc, 0xec, 0xcc, 0xd8, 0x50, 0x11, 0xcb, 0x84, 0xf1, 0x66, 0x92, 0xce, 0xc5, + 0x1c, 0x43, 0x92, 0xce, 0x23, 0x26, 0xa6, 0x6c, 0xc1, 0x77, 0x77, 0x26, 0xf3, 0xc9, 0x5c, 0xc1, + 0x7b, 0xd2, 0xca, 0x14, 0xde, 0x37, 0x0d, 0xaa, 0x01, 0x13, 0x69, 0x38, 0x0a, 0x98, 0xa0, 0x63, + 0x2a, 0x28, 0x3e, 0x04, 0x43, 0xe6, 0x70, 0x51, 0x1d, 0x35, 0xaa, 0x07, 0xaf, 0x9a, 0x45, 0x8e, + 0xe6, 0x43, 0x65, 0xee, 0xf6, 0x96, 0x09, 0x23, 0x2a, 0x04, 0xbf, 0x06, 0x1c, 0x29, 0x6c, 0x70, + 0x45, 0xa3, 0x70, 0xb6, 0x1c, 0xc4, 0x34, 0x62, 0xae, 0x56, 0x47, 0x8d, 0x32, 0x71, 0x32, 0xe6, + 0x44, 0x11, 0x6d, 0x1a, 0x31, 0x8c, 0xc1, 0x98, 0xb2, 0x59, 0xe2, 0x1a, 0x8a, 0x57, 0xb6, 0xc4, + 0x16, 0x71, 0x28, 0x5c, 0x33, 0xc3, 0xa4, 0xed, 0x2d, 0x01, 0x8a, 0x4a, 0xb8, 0x02, 0x76, 0xbf, + 0xfd, 0xae, 0xdd, 0x79, 0xdf, 0x76, 0x36, 0xa4, 0x73, 0xdc, 0xe9, 0xb7, 0x7b, 0x3e, 0x71, 0x10, + 0x2e, 0x83, 0x79, 0xda, 0xea, 0x9f, 0xfa, 0x8e, 0x86, 0xb7, 0xa0, 0x7c, 0x76, 0xde, 0xed, 0x75, + 0x4e, 0x49, 0x2b, 0x70, 0x74, 0x8c, 0xa1, 0xaa, 0x98, 0x02, 0x33, 0x64, 0x68, 0xb7, 0x1f, 0x04, + 0x2d, 0xf2, 0xd1, 0x31, 0x71, 0x09, 0x8c, 0xf3, 0xf6, 0x49, 0xc7, 0xb1, 0xf0, 0x26, 0x94, 0xba, + 0xbd, 0x56, 0xcf, 0xef, 0xfa, 0x3d, 0xc7, 0xf6, 0xde, 0x82, 0xd5, 0xa5, 0x51, 0x32, 0x63, 0x78, + 0x07, 0xcc, 0x2f, 0x74, 0xb6, 0xc8, 0xc6, 0x82, 0x48, 0xe6, 0xe0, 0xe7, 0x50, 0x16, 0x61, 0xc4, + 0xb8, 0xa0, 0x51, 0xa2, 0xde, 0xa9, 0x93, 0x02, 0xf0, 0xae, 0x01, 0x7a, 0x61, 0xc4, 0xba, 0x2c, + 0x0d, 0x19, 0xc7, 0x7b, 0x60, 0xcd, 0xe8, 0x90, 0xcd, 0xb8, 0x8b, 0xea, 0x7a, 0xa3, 0x72, 0xb0, + 0xbd, 0x3e, 0xd9, 0x0b, 0xc9, 0x1c, 0x19, 0xb7, 0xbf, 0x5e, 0x6e, 0x90, 0x5c, 0x86, 0x0f, 0xc0, + 0xe6, 0xaa, 0x38, 0x77, 0x35, 0x15, 0x81, 0xd7, 0x23, 0xb2, 0xbe, 0xf2, 0x90, 0x95, 0xd0, 0xdb, + 0x07, 0x53, 0xa5, 0x92, 0x83, 0x54, 0xc3, 0x47, 0xd9, 0x20, 0xa5, 0x5d, 0xbc, 0x21, 0xdb, 0x48, + 0xe6, 0x78, 0x87, 0x60, 0x5d, 0x64, 0x05, 0xff, 0xb5, 0x43, 0xef, 0x2b, 0x82, 0x4d, 0x85, 0x07, + 0x54, 0x8c, 0xa6, 0x2c, 0xc5, 0xfb, 0x0f, 0x6e, 0xe7, 0xc5, 0xa3, 0xf8, 0x5c, 0xd7, 0x5c, 0xbb, + 0x99, 0x55, 0xa3, 0xda, 0xdf, 0x1a, 0xd5, 0xd7, 0x1b, 0x6d, 0x80, 0xa1, 0x2e, 0xc0, 0x02, 0xcd, + 0xbf, 0x74, 0x36, 0xb0, 0x0d, 0x7a, 0xdb, 0xbf, 0x74, 0x90, 0x04, 0x88, 0xdc, 0xba, 0x04, 0x88, + 0xef, 0xe8, 0xde, 0x0f, 0x04, 0x65, 0xc2, 0xe8, 0xf8, 0x2c, 0x8c, 0x05, 0xc7, 0x4f, 0xc1, 0xe6, + 0x82, 0x25, 0x83, 0x88, 0xab, 0xbe, 0x74, 0x62, 0x49, 0x37, 0xe0, 0xb2, 0xf4, 0xd5, 0x22, 0x1e, + 0xad, 0x4a, 0x4b, 0x1b, 0x3f, 0x83, 0x12, 0x17, 0x34, 0x15, 0x52, 0xad, 0x2b, 0xb5, 0xad, 0xfc, + 0x80, 0xe3, 0x27, 0x60, 0xb1, 0x78, 0x2c, 0x09, 0x43, 0x11, 0x26, 0x8b, 0xc7, 0x01, 0xc7, 0xbb, + 0x50, 0x9a, 0xa4, 0xf3, 0x45, 0x12, 0xc6, 0x13, 0xd7, 0xac, 0xeb, 0x8d, 0x32, 0xb9, 0xf7, 0x71, + 0x15, 0xb4, 0xe1, 0xd2, 0xb5, 0xea, 0xa8, 0x51, 0x22, 0xda, 0x70, 0x29, 0xb3, 0xa7, 0x34, 0x9e, + 0x30, 0x99, 0xc4, 0xce, 0xb2, 0x2b, 0x3f, 0xe0, 0xde, 0x77, 0x04, 0xe6, 0xf1, 0x74, 0x11, 0x7f, + 0xc6, 0x35, 0xa8, 0x44, 0x61, 0x3c, 0x90, 0x77, 0x54, 0xf4, 0x5c, 0x8e, 0xc2, 0x58, 0x1e, 0x53, + 0xc0, 0x15, 0x4f, 0x6f, 0xee, 0xf9, 0xfc, 0xec, 0x22, 0x7a, 0x93, 0xf3, 0xcd, 0x7c, 0x09, 0xba, + 0x5a, 0xc2, 0xee, 0xfa, 0x12, 0x54, 0x81, 0xa6, 0x1f, 0x8f, 0xe6, 0xe3, 0x30, 0x9e, 0x14, 0x1b, + 0x90, 0x9f, 0xb3, 0x7a, 0xd5, 0x26, 0x51, 0xb6, 0x57, 0x87, 0xd2, 0x4a, 0xf5, 0xf0, 0x8b, 0xb3, + 0x41, 0xff, 0xd0, 0x21, 0x0e, 0xf2, 0xae, 0x61, 0x4b, 0x65, 0x63, 0xe3, 0xff, 0xbd, 0xef, 0x3d, + 0xb0, 0x46, 0x32, 0xc3, 0xea, 0xbc, 0xb7, 0x1f, 0x75, 0xba, 0x0a, 0xc8, 0x64, 0x47, 0x3b, 0xb7, + 0x77, 0x35, 0xf4, 0xf3, 0xae, 0x86, 0x7e, 0xdf, 0xd5, 0xd0, 0x27, 0x4b, 0xaa, 0x93, 0xe1, 0xd0, + 0x52, 0x7f, 0xb2, 0x37, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xf3, 0xb7, 0x12, 0x44, 0xfa, 0x04, + 0x00, 0x00, +} + +func (m *MetricMetadata) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *MetricMetadata) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *MetricMetadata) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Unit) > 0 { + i -= len(m.Unit) + copy(dAtA[i:], m.Unit) + i = encodeVarintTypes(dAtA, i, uint64(len(m.Unit))) + i-- + dAtA[i] = 0x2a + } + if len(m.Help) > 0 { + i -= len(m.Help) + copy(dAtA[i:], m.Help) + i = encodeVarintTypes(dAtA, i, uint64(len(m.Help))) + i-- + dAtA[i] = 0x22 + } + if len(m.MetricFamilyName) > 0 { + i -= len(m.MetricFamilyName) + copy(dAtA[i:], m.MetricFamilyName) + i = encodeVarintTypes(dAtA, i, uint64(len(m.MetricFamilyName))) + i-- + dAtA[i] = 0x12 + } + if m.Type != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Type)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil } func (m *Sample) Marshal() (dAtA []byte, err error) { @@ -1047,6 +1228,33 @@ func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } +func (m *MetricMetadata) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Type != 0 { + n += 1 + sovTypes(uint64(m.Type)) + } + l = len(m.MetricFamilyName) + if l > 0 { + n += 1 + l + sovTypes(uint64(l)) + } + l = len(m.Help) + if l > 0 { + n += 1 + l + sovTypes(uint64(l)) + } + l = len(m.Unit) + if l > 0 { + n += 1 + l + sovTypes(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + func (m *Sample) Size() (n int) { if m == nil { return 0 @@ -1242,6 +1450,175 @@ func sovTypes(x uint64) (n int) { func sozTypes(x uint64) (n int) { return sovTypes(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *MetricMetadata) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: MetricMetadata: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: MetricMetadata: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + m.Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Type |= MetricMetadata_MetricType(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field MetricFamilyName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.MetricFamilyName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Help = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Unit = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *Sample) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/prompb/types.proto b/prompb/types.proto index de437d182..259a0d40d 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -18,6 +18,26 @@ option go_package = "prompb"; import "gogoproto/gogo.proto"; +message MetricMetadata { + enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGEHISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; + } + + // Represents the metric type, these match the set from Prometheus. + // Refer to pkg/textparse/interface.go for details. + MetricType type = 1; + string metric_family_name = 2; + string help = 4; + string unit = 5; +} + message Sample { double value = 1; int64 timestamp = 2; diff --git a/scrape/target.go b/scrape/target.go index ac9cad031..2b4c4301c 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -75,6 +75,7 @@ func (t *Target) String() string { return t.URL().String() } +// MetricMetadataStore represents a storage for metadata. type MetricMetadataStore interface { ListMetadata() []MetricMetadata GetMetadata(metric string) (MetricMetadata, bool) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 474e4e400..f8033111c 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "net/http" "sort" + "strings" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" @@ -26,6 +27,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -484,3 +486,14 @@ func labelsToLabelsProto(labels labels.Labels, buf []prompb.Label) []prompb.Labe } return result } + +// metricTypeToMetricTypeProto transforms a Prometheus metricType into prompb metricType. Since the former is a string we need to transform it to an enum. +func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_MetricType { + mt := strings.ToUpper(string(t)) + v, ok := prompb.MetricMetadata_MetricType_value[mt] + if !ok { + return prompb.MetricMetadata_UNKNOWN + } + + return prompb.MetricMetadata_MetricType(v) +} diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index ab2aa5739..bd507b7c4 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -20,6 +20,7 @@ import ( "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" ) @@ -230,3 +231,34 @@ func TestMergeLabels(t *testing.T) { require.Equal(t, tc.expected, MergeLabels(tc.primary, tc.secondary)) } } + +func TestMetricTypeToMetricTypeProto(t *testing.T) { + tc := []struct { + desc string + input textparse.MetricType + expected prompb.MetricMetadata_MetricType + }{ + { + desc: "with a single-word metric", + input: textparse.MetricTypeCounter, + expected: prompb.MetricMetadata_COUNTER, + }, + { + desc: "with a two-word metric", + input: textparse.MetricTypeStateset, + expected: prompb.MetricMetadata_STATESET, + }, + { + desc: "with an unknown metric", + input: "not-known", + expected: prompb.MetricMetadata_UNKNOWN, + }, + } + + for _, tt := range tc { + t.Run(tt.desc, func(t *testing.T) { + m := metricTypeToMetricTypeProto(tt.input) + require.Equal(t, tt.expected, m) + }) + } +} diff --git a/storage/remote/metadata_watcher.go b/storage/remote/metadata_watcher.go new file mode 100644 index 000000000..a347a3f25 --- /dev/null +++ b/storage/remote/metadata_watcher.go @@ -0,0 +1,163 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/scrape" +) + +// MetadataAppender is an interface used by the Metadata Watcher to send metadata, It is read from the scrape manager, on to somewhere else. +type MetadataAppender interface { + AppendMetadata(context.Context, []scrape.MetricMetadata) +} + +// Watchable represents from where we fetch active targets for metadata. +type Watchable interface { + TargetsActive() map[string][]*scrape.Target +} + +type noopScrapeManager struct{} + +func (noop *noopScrapeManager) Get() (*scrape.Manager, error) { + return nil, errors.New("Scrape manager not ready") +} + +// MetadataWatcher watches the Scrape Manager for a given WriteMetadataTo. +type MetadataWatcher struct { + name string + logger log.Logger + + managerGetter ReadyScrapeManager + manager Watchable + writer MetadataAppender + + interval model.Duration + deadline time.Duration + + done chan struct{} + + softShutdownCtx context.Context + softShutdownCancel context.CancelFunc + hardShutdownCancel context.CancelFunc + hardShutdownCtx context.Context +} + +// NewMetadataWatcher builds a new MetadataWatcher. +func NewMetadataWatcher(l log.Logger, mg ReadyScrapeManager, name string, w MetadataAppender, interval model.Duration, deadline time.Duration) *MetadataWatcher { + if l == nil { + l = log.NewNopLogger() + } + + if mg == nil { + mg = &noopScrapeManager{} + } + + return &MetadataWatcher{ + name: name, + logger: l, + + managerGetter: mg, + writer: w, + + interval: interval, + deadline: deadline, + + done: make(chan struct{}), + } +} + +// Start the MetadataWatcher. +func (mw *MetadataWatcher) Start() { + level.Info(mw.logger).Log("msg", "Starting scraped metadata watcher") + mw.hardShutdownCtx, mw.hardShutdownCancel = context.WithCancel(context.Background()) + mw.softShutdownCtx, mw.softShutdownCancel = context.WithCancel(mw.hardShutdownCtx) + go mw.loop() +} + +// Stop the MetadataWatcher. +func (mw *MetadataWatcher) Stop() { + level.Info(mw.logger).Log("msg", "Stopping metadata watcher...") + defer level.Info(mw.logger).Log("msg", "Scraped metadata watcher stopped") + + mw.softShutdownCancel() + select { + case <-mw.done: + return + case <-time.After(mw.deadline): + level.Error(mw.logger).Log("msg", "Failed to flush metadata") + } + + mw.hardShutdownCancel() + <-mw.done +} + +func (mw *MetadataWatcher) loop() { + ticker := time.NewTicker(time.Duration(mw.interval)) + defer ticker.Stop() + defer close(mw.done) + + for { + select { + case <-mw.softShutdownCtx.Done(): + return + case <-ticker.C: + mw.collect() + } + } +} + +func (mw *MetadataWatcher) collect() { + if !mw.ready() { + return + } + + // We create a set of the metadata to help deduplicating based on the attributes of a + // scrape.MetricMetadata. In this case, a combination of metric name, help, type, and unit. + metadataSet := map[scrape.MetricMetadata]struct{}{} + metadata := []scrape.MetricMetadata{} + for _, tset := range mw.manager.TargetsActive() { + for _, target := range tset { + for _, entry := range target.MetadataList() { + if _, ok := metadataSet[entry]; !ok { + metadata = append(metadata, entry) + metadataSet[entry] = struct{}{} + } + } + } + } + + // Blocks until the metadata is sent to the remote write endpoint or hardShutdownContext is expired. + mw.writer.AppendMetadata(mw.hardShutdownCtx, metadata) +} + +func (mw *MetadataWatcher) ready() bool { + if mw.manager != nil { + return true + } + + m, err := mw.managerGetter.Get() + if err != nil { + return false + } + + mw.manager = m + return true +} diff --git a/storage/remote/metadata_watcher_test.go b/storage/remote/metadata_watcher_test.go new file mode 100644 index 000000000..4577eb8f6 --- /dev/null +++ b/storage/remote/metadata_watcher_test.go @@ -0,0 +1,155 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/scrape" + "github.com/stretchr/testify/require" +) + +var ( + interval = model.Duration(1 * time.Millisecond) + deadline = 1 * time.Millisecond +) + +// TestMetaStore satisfies the MetricMetadataStore interface. +// It is used to inject specific metadata as part of a test case. +type TestMetaStore struct { + Metadata []scrape.MetricMetadata +} + +func (s *TestMetaStore) ListMetadata() []scrape.MetricMetadata { + return s.Metadata +} + +func (s *TestMetaStore) GetMetadata(metric string) (scrape.MetricMetadata, bool) { + for _, m := range s.Metadata { + if metric == m.Metric { + return m, true + } + } + + return scrape.MetricMetadata{}, false +} + +func (s *TestMetaStore) SizeMetadata() int { return 0 } +func (s *TestMetaStore) LengthMetadata() int { return 0 } + +type writeMetadataToMock struct { + metadataAppended int +} + +func (mwtm *writeMetadataToMock) AppendMetadata(_ context.Context, m []scrape.MetricMetadata) { + mwtm.metadataAppended += len(m) +} + +func newMetadataWriteToMock() *writeMetadataToMock { + return &writeMetadataToMock{} +} + +type scrapeManagerMock struct { + manager *scrape.Manager + ready bool +} + +func (smm *scrapeManagerMock) Get() (*scrape.Manager, error) { + if smm.ready { + return smm.manager, nil + } + + return nil, errors.New("not ready") +} + +type fakeManager struct { + activeTargets map[string][]*scrape.Target +} + +func (fm *fakeManager) TargetsActive() map[string][]*scrape.Target { + return fm.activeTargets +} + +func TestWatchScrapeManager_NotReady(t *testing.T) { + wt := newMetadataWriteToMock() + smm := &scrapeManagerMock{ + ready: false, + } + + mw := NewMetadataWatcher(nil, smm, "", wt, interval, deadline) + require.Equal(t, false, mw.ready()) + + mw.collect() + + require.Equal(t, 0, wt.metadataAppended) +} + +func TestWatchScrapeManager_ReadyForCollection(t *testing.T) { + wt := newMetadataWriteToMock() + + metadata := &TestMetaStore{ + Metadata: []scrape.MetricMetadata{ + { + Metric: "prometheus_tsdb_head_chunks_created_total", + Type: textparse.MetricTypeCounter, + Help: "Total number", + Unit: "", + }, + { + Metric: "prometheus_remote_storage_retried_samples_total", + Type: textparse.MetricTypeCounter, + Help: "Total number", + Unit: "", + }, + }, + } + metadataDup := &TestMetaStore{ + Metadata: []scrape.MetricMetadata{ + { + Metric: "prometheus_tsdb_head_chunks_created_total", + Type: textparse.MetricTypeCounter, + Help: "Total number", + Unit: "", + }, + }, + } + + target := &scrape.Target{} + target.SetMetadataStore(metadata) + targetWithDup := &scrape.Target{} + targetWithDup.SetMetadataStore(metadataDup) + + manager := &fakeManager{ + activeTargets: map[string][]*scrape.Target{ + "job": []*scrape.Target{target}, + "dup": []*scrape.Target{targetWithDup}, + }, + } + + smm := &scrapeManagerMock{ + ready: true, + } + + mw := NewMetadataWatcher(nil, smm, "", wt, interval, deadline) + mw.manager = manager + + mw.collect() + + require.Equal(t, 2, wt.metadataAppended) +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 3bc19b7b6..66d2e67b9 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -26,13 +26,14 @@ import ( "github.com/golang/snappy" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/wal" ) @@ -50,21 +51,25 @@ const ( type queueManagerMetrics struct { reg prometheus.Registerer - succeededSamplesTotal prometheus.Counter - failedSamplesTotal prometheus.Counter - retriedSamplesTotal prometheus.Counter - droppedSamplesTotal prometheus.Counter - enqueueRetriesTotal prometheus.Counter - sentBatchDuration prometheus.Histogram - highestSentTimestamp *maxTimestamp - pendingSamples prometheus.Gauge - shardCapacity prometheus.Gauge - numShards prometheus.Gauge - maxNumShards prometheus.Gauge - minNumShards prometheus.Gauge - desiredNumShards prometheus.Gauge - bytesSent prometheus.Counter - maxSamplesPerSend prometheus.Gauge + samplesTotal prometheus.Counter + metadataTotal prometheus.Counter + failedSamplesTotal prometheus.Counter + failedMetadataTotal prometheus.Counter + retriedSamplesTotal prometheus.Counter + retriedMetadataTotal prometheus.Counter + droppedSamplesTotal prometheus.Counter + enqueueRetriesTotal prometheus.Counter + sentBatchDuration prometheus.Histogram + highestSentTimestamp *maxTimestamp + pendingSamples prometheus.Gauge + shardCapacity prometheus.Gauge + numShards prometheus.Gauge + maxNumShards prometheus.Gauge + minNumShards prometheus.Gauge + desiredNumShards prometheus.Gauge + samplesBytesTotal prometheus.Counter + metadataBytesTotal prometheus.Counter + maxSamplesPerSend prometheus.Gauge } func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManagerMetrics { @@ -76,31 +81,52 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager endpoint: e, } - m.succeededSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + m.samplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "succeeded_samples_total", - Help: "Total number of samples successfully sent to remote storage.", + Name: "samples_total", + Help: "Total number of samples sent to remote storage.", + ConstLabels: constLabels, + }) + m.metadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "metadata_total", + Help: "Total number of metadata entries sent to remote storage.", ConstLabels: constLabels, }) m.failedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "failed_samples_total", + Name: "samples_failed_total", Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.", ConstLabels: constLabels, }) + m.failedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "metadata_failed_total", + Help: "Total number of metadata entries which failed on send to remote storage, non-recoverable errors.", + ConstLabels: constLabels, + }) m.retriedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "retried_samples_total", + Name: "samples_retried_total", Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.", ConstLabels: constLabels, }) + m.retriedMetadataTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "metadata_retried_total", + Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.", + ConstLabels: constLabels, + }) m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "dropped_samples_total", + Name: "samples_dropped_total", Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.", ConstLabels: constLabels, }) @@ -115,7 +141,7 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Namespace: namespace, Subsystem: subsystem, Name: "sent_batch_duration_seconds", - Help: "Duration of sample batch send calls to the remote storage.", + Help: "Duration of send calls to the remote storage.", Buckets: append(prometheus.DefBuckets, 25, 60, 120, 300), ConstLabels: constLabels, }) @@ -131,7 +157,7 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager m.pendingSamples = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "pending_samples", + Name: "samples_pending", Help: "The number of samples pending in the queues shards to be sent to the remote storage.", ConstLabels: constLabels, }) @@ -170,11 +196,18 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager Help: "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out.", ConstLabels: constLabels, }) - m.bytesSent = prometheus.NewCounter(prometheus.CounterOpts{ + m.samplesBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "sent_bytes_total", - Help: "The total number of bytes sent by the queue.", + Name: "samples_bytes_total", + Help: "The total number of bytes of samples sent by the queue after compression.", + ConstLabels: constLabels, + }) + m.metadataBytesTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "metadata_bytes_total", + Help: "The total number of bytes of metadata sent by the queue after compression.", ConstLabels: constLabels, }) m.maxSamplesPerSend = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -191,9 +224,12 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager func (m *queueManagerMetrics) register() { if m.reg != nil { m.reg.MustRegister( - m.succeededSamplesTotal, + m.samplesTotal, + m.metadataTotal, m.failedSamplesTotal, + m.failedMetadataTotal, m.retriedSamplesTotal, + m.retriedMetadataTotal, m.droppedSamplesTotal, m.enqueueRetriesTotal, m.sentBatchDuration, @@ -204,7 +240,8 @@ func (m *queueManagerMetrics) register() { m.maxNumShards, m.minNumShards, m.desiredNumShards, - m.bytesSent, + m.samplesBytesTotal, + m.metadataBytesTotal, m.maxSamplesPerSend, ) } @@ -212,9 +249,12 @@ func (m *queueManagerMetrics) register() { func (m *queueManagerMetrics) unregister() { if m.reg != nil { - m.reg.Unregister(m.succeededSamplesTotal) + m.reg.Unregister(m.samplesTotal) + m.reg.Unregister(m.metadataTotal) m.reg.Unregister(m.failedSamplesTotal) + m.reg.Unregister(m.failedMetadataTotal) m.reg.Unregister(m.retriedSamplesTotal) + m.reg.Unregister(m.retriedMetadataTotal) m.reg.Unregister(m.droppedSamplesTotal) m.reg.Unregister(m.enqueueRetriesTotal) m.reg.Unregister(m.sentBatchDuration) @@ -225,7 +265,8 @@ func (m *queueManagerMetrics) unregister() { m.reg.Unregister(m.maxNumShards) m.reg.Unregister(m.minNumShards) m.reg.Unregister(m.desiredNumShards) - m.reg.Unregister(m.bytesSent) + m.reg.Unregister(m.samplesBytesTotal) + m.reg.Unregister(m.metadataBytesTotal) m.reg.Unregister(m.maxSamplesPerSend) } } @@ -247,12 +288,14 @@ type WriteClient interface { type QueueManager struct { lastSendTimestamp atomic.Int64 - logger log.Logger - flushDeadline time.Duration - cfg config.QueueConfig - externalLabels labels.Labels - relabelConfigs []*relabel.Config - watcher *wal.Watcher + logger log.Logger + flushDeadline time.Duration + cfg config.QueueConfig + mcfg config.MetadataConfig + externalLabels labels.Labels + relabelConfigs []*relabel.Config + watcher *wal.Watcher + metadataWatcher *MetadataWatcher clientMtx sync.RWMutex storeClient WriteClient @@ -284,12 +327,14 @@ func NewQueueManager( walDir string, samplesIn *ewmaRate, cfg config.QueueConfig, + mCfg config.MetadataConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client WriteClient, flushDeadline time.Duration, interner *pool, highestRecvTimestamp *maxTimestamp, + sm ReadyScrapeManager, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -300,6 +345,7 @@ func NewQueueManager( logger: logger, flushDeadline: flushDeadline, cfg: cfg, + mcfg: mCfg, externalLabels: externalLabels, relabelConfigs: relabelConfigs, storeClient: client, @@ -323,11 +369,77 @@ func NewQueueManager( } t.watcher = wal.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, walDir) + if t.mcfg.Send { + t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) + } t.shards = t.newShards() return t } +// AppendMetadata sends metadata the remote storage. Metadata is sent all at once and is not parallelized. +func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) { + mm := make([]prompb.MetricMetadata, 0, len(metadata)) + for _, entry := range metadata { + mm = append(mm, prompb.MetricMetadata{ + MetricFamilyName: entry.Metric, + Help: entry.Help, + Type: metricTypeToMetricTypeProto(entry.Type), + Unit: entry.Unit, + }) + } + + err := t.sendMetadataWithBackoff(ctx, mm) + + if err != nil { + t.metrics.failedMetadataTotal.Add(float64(len(metadata))) + level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", len(metadata), "err", err) + } +} + +func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata) error { + // Build the WriteRequest with no samples. + req, _, err := buildWriteRequest(nil, metadata, nil) + if err != nil { + return err + } + + metadataCount := len(metadata) + + attemptStore := func(try int) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Metadata Send Batch") + defer span.Finish() + + span.SetTag("metadata", metadataCount) + span.SetTag("try", try) + span.SetTag("remote_name", t.storeClient.Name()) + span.SetTag("remote_url", t.storeClient.Endpoint()) + + begin := time.Now() + err := t.storeClient.Store(ctx, req) + t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) + + if err != nil { + span.LogKV("error", err) + ext.Error.Set(span, true) + return err + } + + return nil + } + + retry := func() { + t.metrics.retriedMetadataTotal.Add(float64(len(metadata))) + } + err = sendWriteRequestWithBackoff(ctx, t.cfg, t.client(), t.logger, req, attemptStore, retry) + if err != nil { + return err + } + t.metrics.metadataTotal.Add(float64(len(metadata))) + t.metrics.metadataBytesTotal.Add(float64(len(req))) + return nil +} + // Append queues a sample to be sent to the remote storage. Blocks until all samples are // enqueued on their shards or a shutdown signal is received. func (t *QueueManager) Append(samples []record.RefSample) bool { @@ -386,6 +498,9 @@ func (t *QueueManager) Start() { t.shards.start(t.numShards) t.watcher.Start() + if t.mcfg.Send { + t.metadataWatcher.Start() + } t.wg.Add(2) go t.updateShardsLoop() @@ -400,11 +515,14 @@ func (t *QueueManager) Stop() { close(t.quit) t.wg.Wait() - // Wait for all QueueManager routines to end before stopping shards and WAL watcher. This + // Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This // is to ensure we don't end up executing a reshard and shards.stop() at the same time, which // causes a closed channel panic. t.shards.stop() t.watcher.Stop() + if t.mcfg.Send { + t.metadataWatcher.Stop() + } // On shutdown, release the strings in the labels from the intern pool. t.seriesMtx.Lock() @@ -868,23 +986,22 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b // sendSamples to the remote storage with backoff for recoverable errors. func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, buf *[]byte) error { - req, highest, err := buildWriteRequest(samples, *buf) + // Build the WriteRequest with no metadata. + req, highest, err := buildWriteRequest(samples, nil, *buf) if err != nil { // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. return err } - backoff := s.qm.cfg.MinBackoff reqSize := len(*buf) sampleCount := len(samples) *buf = req - try := 0 // An anonymous function allows us to defer the completion of our per-try spans // without causing a memory leak, and it has the nice effect of not propagating any // parameters for sendSamplesWithBackoff/3. - attemptStore := func() error { + attemptStore := func(try int) error { span, ctx := opentracing.StartSpanFromContext(ctx, "Remote Send Batch") defer span.Finish() @@ -895,6 +1012,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti span.SetTag("remote_url", s.qm.storeClient.Endpoint()) begin := time.Now() + s.qm.metrics.samplesTotal.Add(float64(sampleCount)) err := s.qm.client().Store(ctx, *buf) s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds()) @@ -907,6 +1025,23 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti return nil } + onRetry := func() { + s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) + } + + err = sendWriteRequestWithBackoff(ctx, s.qm.cfg, s.qm.client(), s.qm.logger, req, attemptStore, onRetry) + if err != nil { + return err + } + s.qm.metrics.samplesBytesTotal.Add(float64(reqSize)) + s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) + return nil +} + +func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, s WriteClient, l log.Logger, req []byte, attempt func(int) error, onRetry func()) error { + backoff := cfg.MinBackoff + try := 0 + for { select { case <-ctx.Done(): @@ -914,37 +1049,34 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti default: } - err = attemptStore() + err := attempt(try) - if err != nil { - // If the error is unrecoverable, we should not retry. - if _, ok := err.(RecoverableError); !ok { - return err - } - - // If we make it this far, we've encountered a recoverable error and will retry. - s.qm.metrics.retriedSamplesTotal.Add(float64(sampleCount)) - level.Warn(s.qm.logger).Log("msg", "Failed to send batch, retrying", "err", err) - time.Sleep(time.Duration(backoff)) - backoff = backoff * 2 - - if backoff > s.qm.cfg.MaxBackoff { - backoff = s.qm.cfg.MaxBackoff - } - - try++ - continue + if err == nil { + return nil } - // Since we retry forever on recoverable errors, this needs to stay inside the loop. - s.qm.metrics.succeededSamplesTotal.Add(float64(sampleCount)) - s.qm.metrics.bytesSent.Add(float64(reqSize)) - s.qm.metrics.highestSentTimestamp.Set(float64(highest / 1000)) - return nil + // If the error is unrecoverable, we should not retry. + if _, ok := err.(RecoverableError); !ok { + return err + } + + // If we make it this far, we've encountered a recoverable error and will retry. + onRetry() + level.Debug(l).Log("msg", "failed to send batch, retrying", "err", err) + + time.Sleep(time.Duration(backoff)) + backoff = backoff * 2 + + if backoff > cfg.MaxBackoff { + backoff = cfg.MaxBackoff + } + + try++ + continue } } -func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, error) { +func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, buf []byte) ([]byte, int64, error) { var highest int64 for _, ts := range samples { // At the moment we only ever append a TimeSeries with a single sample in it. @@ -952,8 +1084,10 @@ func buildWriteRequest(samples []prompb.TimeSeries, buf []byte) ([]byte, int64, highest = ts.Samples[0].Timestamp } } + req := &prompb.WriteRequest{ Timeseries: samples, + Metadata: metadata, } data, err := proto.Marshal(req) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 03b88215d..d4682f63e 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -39,8 +39,10 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/tsdb/record" ) @@ -78,7 +80,7 @@ func TestSampleDelivery(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) defer s.Close() writeConfig := config.DefaultRemoteWriteConfig @@ -110,6 +112,33 @@ func TestSampleDelivery(t *testing.T) { c.waitForExpectedSamples(t) } +func TestMetadataDelivery(t *testing.T) { + c := NewTestWriteClient() + + dir, err := ioutil.TempDir("", "TestMetadataDelivery") + require.NoError(t, err) + defer os.RemoveAll(dir) + + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + + metrics := newQueueManagerMetrics(nil, "", "") + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) + m.Start() + defer m.Stop() + + m.AppendMetadata(context.Background(), []scrape.MetricMetadata{ + scrape.MetricMetadata{ + Metric: "prometheus_remote_storage_sent_metadata_bytes_total", + Type: textparse.MetricTypeCounter, + Help: "a nice help text", + Unit: "", + }, + }) + + require.Equal(t, len(c.receivedMetadata), 1) +} + func TestSampleDeliveryTimeout(t *testing.T) { // Let's send one less sample than batch size, and wait the timeout duration n := 9 @@ -117,6 +146,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { c := NewTestWriteClient() cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) @@ -127,7 +157,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -169,8 +199,11 @@ func TestSampleDeliveryOrder(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig + metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), nil) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) m.StoreSeries(series, 0) m.Start() @@ -190,9 +223,11 @@ func TestShutdown(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -231,8 +266,10 @@ func TestSeriesReset(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, deadline, newPool(), newHighestTimestampMetric(), nil) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -255,6 +292,7 @@ func TestReshard(t *testing.T) { c.expectSamples(samples, series) cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig cfg.MaxShards = 1 dir, err := ioutil.TempDir("", "TestReshard") @@ -264,7 +302,7 @@ func TestReshard(t *testing.T) { }() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) m.StoreSeries(series, 0) m.Start() @@ -294,10 +332,12 @@ func TestReshardRaceWithStop(t *testing.T) { h.Lock() + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) m.Start() h.Unlock() h.Lock() @@ -313,9 +353,11 @@ func TestReshardRaceWithStop(t *testing.T) { } func TestReleaseNoninternedString(t *testing.T) { + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) m.Start() for i := 1; i < 1000; i++ { @@ -360,10 +402,13 @@ func TestShouldReshard(t *testing.T) { expectedToReshard: true, }, } + + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig for _, c := range cases { metrics := newQueueManagerMetrics(nil, "", "") client := NewTestWriteClient() - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) m.numShards = c.startingShards m.samplesIn.incr(c.samplesIn) m.samplesOut.incr(c.samplesOut) @@ -411,19 +456,21 @@ func getSeriesNameFromRef(r record.RefSeries) string { } type TestWriteClient struct { - receivedSamples map[string][]prompb.Sample - expectedSamples map[string][]prompb.Sample - withWaitGroup bool - wg sync.WaitGroup - mtx sync.Mutex - buf []byte + receivedSamples map[string][]prompb.Sample + expectedSamples map[string][]prompb.Sample + receivedMetadata map[string][]prompb.MetricMetadata + withWaitGroup bool + wg sync.WaitGroup + mtx sync.Mutex + buf []byte } func NewTestWriteClient() *TestWriteClient { return &TestWriteClient{ - withWaitGroup: true, - receivedSamples: map[string][]prompb.Sample{}, - expectedSamples: map[string][]prompb.Sample{}, + withWaitGroup: true, + receivedSamples: map[string][]prompb.Sample{}, + expectedSamples: map[string][]prompb.Sample{}, + receivedMetadata: map[string][]prompb.MetricMetadata{}, } } @@ -510,6 +557,11 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte) error { if c.withWaitGroup { c.wg.Add(-count) } + + for _, m := range reqProto.Metadata { + c.receivedMetadata[m.MetricFamilyName] = append(c.receivedMetadata[m.MetricFamilyName], m) + } + return nil } @@ -560,6 +612,7 @@ func BenchmarkSampleDelivery(b *testing.B) { c := NewTestWriteClient() cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) cfg.MaxShards = 1 @@ -568,7 +621,7 @@ func BenchmarkSampleDelivery(b *testing.B) { defer os.RemoveAll(dir) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) m.StoreSeries(series, 0) // These should be received by the client. @@ -607,12 +660,14 @@ func BenchmarkStartup(b *testing.B) { logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) logger = log.With(logger, "caller", log.DefaultCaller) + cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig for n := 0; n < b.N; n++ { metrics := newQueueManagerMetrics(nil, "", "") c := NewTestBlockedWriteClient() m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - config.DefaultQueueConfig, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric()) + cfg, mcfg, nil, nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -654,6 +709,7 @@ func TestProcessExternalLabels(t *testing.T) { func TestCalculateDesiredShards(t *testing.T) { c := NewTestWriteClient() cfg := config.DefaultQueueConfig + mcfg := config.DefaultMetadataConfig dir, err := ioutil.TempDir("", "TestCalculateDesiredShards") require.NoError(t, err) @@ -663,7 +719,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric()) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, nil, nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index fc3794a54..646d00c46 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -95,7 +95,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { for _, tc := range cases { t.Run("", func(t *testing.T) { - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteReadConfigs: tc.cfgs, diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 5fb4bfbec..2ca540ed3 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/logging" + "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" ) @@ -40,6 +41,10 @@ const ( endpoint = "url" ) +type ReadyScrapeManager interface { + Get() (*scrape.Manager, error) +} + // startTimeCallback is a callback func that return the oldest timestamp stored in a storage. type startTimeCallback func() (int64, error) @@ -57,7 +62,7 @@ type Storage struct { } // NewStorage returns a remote.Storage. -func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration) *Storage { +func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *Storage { if l == nil { l = log.NewNopLogger() } @@ -66,7 +71,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal logger: logging.Dedupe(l, 1*time.Minute), localStartTimeCallback: stCallback, } - s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline) + s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm) return s } diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index a170bf497..29fc67e0d 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -30,7 +30,7 @@ func TestStorageLifecycle(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -69,7 +69,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, diff --git a/storage/remote/write.go b/storage/remote/write.go index 64d3be326..296ffbcef 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -52,13 +52,14 @@ type WriteStorage struct { samplesIn *ewmaRate flushDeadline time.Duration interner *pool + scraper ReadyScrapeManager // For timestampTracker. highestTimestamp *maxTimestamp } // NewWriteStorage creates and runs a WriteStorage. -func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string, flushDeadline time.Duration) *WriteStorage { +func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager) *WriteStorage { if logger == nil { logger = log.NewNopLogger() } @@ -72,6 +73,7 @@ func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, walDir string samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), walDir: walDir, interner: newPool(), + scraper: sm, highestTimestamp: &maxTimestamp{ Gauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -155,12 +157,14 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.walDir, rws.samplesIn, rwConf.QueueConfig, + rwConf.MetadataConfig, conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c, rws.flushDeadline, rws.interner, rws.highestTimestamp, + rws.scraper, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 40503a6ea..27c568e1c 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -115,7 +115,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) { } for _, tc := range cases { - s := NewWriteStorage(nil, nil, dir, time.Millisecond) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: tc.cfgs, @@ -139,7 +139,7 @@ func TestRestartOnNameChange(t *testing.T) { hash, err := toHash(cfg) require.NoError(t, err) - s := NewWriteStorage(nil, nil, dir, time.Millisecond) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -167,7 +167,7 @@ func TestUpdateWithRegisterer(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil) c1 := &config.RemoteWriteConfig{ Name: "named", URL: &common_config.URL{ @@ -211,7 +211,7 @@ func TestWriteStorageLifecycle(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -232,7 +232,7 @@ func TestUpdateExternalLabels(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil) externalLabels := labels.FromStrings("external", "true") conf := &config.Config{ @@ -265,7 +265,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, @@ -301,7 +301,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil) c0 := &config.RemoteWriteConfig{ RemoteTimeout: model.Duration(10 * time.Second), diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 6b81eb732..c313a4633 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -367,7 +367,9 @@ func TestEndpoints(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dbDir) - remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, nil, dbDir, 1*time.Second) + remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) { + return 0, nil + }, dbDir, 1*time.Second, nil) err = remote.ApplyConfig(&config.Config{ RemoteReadConfigs: []*config.RemoteReadConfig{