// Copyright 2021 The Prometheus Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package xds import ( "context" "io" "net/http" "net/http/httptest" "testing" "time" v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "go.uber.org/goleak" "google.golang.org/protobuf/types/known/anypb" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/targetgroup" ) var sdConf = SDConfig{ Server: "http://127.0.0.1", RefreshInterval: model.Duration(10 * time.Second), ClientID: "test-id", } func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } type discoveryResponder func(request *v3.DiscoveryRequest) (*v3.DiscoveryResponse, error) func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.Server { return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Validate req MIME types. require.Equal(t, "application/json", r.Header.Get("Content-Type")) require.Equal(t, "application/json", r.Header.Get("Accept")) body, err := io.ReadAll(r.Body) defer func() { _, _ = io.Copy(io.Discard, r.Body) _ = r.Body.Close() }() require.NotEmpty(t, body) require.NoError(t, err) // Validate discovery request. discoveryReq := &v3.DiscoveryRequest{} err = protoJSONUnmarshalOptions.Unmarshal(body, discoveryReq) require.NoError(t, err) discoveryRes, err := responder(discoveryReq) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } if discoveryRes == nil { w.WriteHeader(http.StatusNotModified) return } w.WriteHeader(http.StatusOK) data, err := protoJSONMarshalOptions.Marshal(discoveryRes) require.NoError(t, err) _, err = w.Write(data) require.NoError(t, err) })) } func constantResourceParser(targets []model.LabelSet, err error) resourceParser { return func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) { return targets, err } } var nopLogger = log.NewNopLogger() type testResourceClient struct { resourceTypeURL string server string protocolVersion ProtocolVersion fetch func(ctx context.Context) (*v3.DiscoveryResponse, error) } func (rc testResourceClient) ResourceTypeURL() string { return rc.resourceTypeURL } func (rc testResourceClient) Server() string { return rc.server } func (rc testResourceClient) Fetch(ctx context.Context) (*v3.DiscoveryResponse, error) { return rc.fetch(ctx) } func (rc testResourceClient) ID() string { return "test-client" } func (rc testResourceClient) Close() { } func TestPollingRefreshSkipUpdate(t *testing.T) { rc := &testResourceClient{ fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { return nil, nil }, } cfg := &SDConfig{} reg := prometheus.NewRegistry() refreshMetrics := discovery.NewRefreshMetrics(reg) metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics) require.NoError(t, metrics.Register()) xdsMetrics, ok := metrics.(*xdsMetrics) if !ok { require.Fail(t, "invalid discovery metrics type") } pd := &fetchDiscovery{ client: rc, logger: nopLogger, metrics: xdsMetrics, } ctx, cancel := context.WithCancel(context.Background()) go func() { time.Sleep(1 * time.Second) cancel() }() ch := make(chan []*targetgroup.Group, 1) pd.poll(ctx, ch) select { case <-ctx.Done(): return case <-ch: require.Fail(t, "no update expected") } metrics.Unregister() refreshMetrics.Unregister() } func TestPollingRefreshAttachesGroupMetadata(t *testing.T) { server := "http://198.161.2.0" source := "test" rc := &testResourceClient{ server: server, protocolVersion: ProtocolV3, fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { return &v3.DiscoveryResponse{}, nil }, } reg := prometheus.NewRegistry() refreshMetrics := discovery.NewRefreshMetrics(reg) metrics := newDiscovererMetrics(reg, refreshMetrics) require.NoError(t, metrics.Register()) xdsMetrics, ok := metrics.(*xdsMetrics) require.True(t, ok) pd := &fetchDiscovery{ source: source, client: rc, logger: nopLogger, parseResources: constantResourceParser([]model.LabelSet{ { "__meta_custom_xds_label": "a-value", "__address__": "10.1.4.32:9090", "instance": "prometheus-01", }, { "__meta_custom_xds_label": "a-value", "__address__": "10.1.5.32:9090", "instance": "prometheus-02", }, }, nil), metrics: xdsMetrics, } ch := make(chan []*targetgroup.Group, 1) pd.poll(context.Background(), ch) groups := <-ch require.NotNil(t, groups) require.Len(t, groups, 1) group := groups[0] require.Equal(t, source, group.Source) require.Len(t, group.Targets, 2) target2 := group.Targets[1] require.Contains(t, target2, model.LabelName("__meta_custom_xds_label")) require.Equal(t, model.LabelValue("a-value"), target2["__meta_custom_xds_label"]) metrics.Unregister() refreshMetrics.Unregister() } func TestPollingDisappearingTargets(t *testing.T) { server := "http://198.161.2.0" source := "test" rc := &testResourceClient{ server: server, protocolVersion: ProtocolV3, fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) { return &v3.DiscoveryResponse{}, nil }, } // On the first poll, send back two targets. On the next, send just one. counter := 0 parser := func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) { counter++ if counter == 1 { return []model.LabelSet{ { "__meta_custom_xds_label": "a-value", "__address__": "10.1.4.32:9090", "instance": "prometheus-01", }, { "__meta_custom_xds_label": "a-value", "__address__": "10.1.5.32:9090", "instance": "prometheus-02", }, }, nil } return []model.LabelSet{ { "__meta_custom_xds_label": "a-value", "__address__": "10.1.4.32:9090", "instance": "prometheus-01", }, }, nil } cfg := &SDConfig{} reg := prometheus.NewRegistry() refreshMetrics := discovery.NewRefreshMetrics(reg) metrics := cfg.NewDiscovererMetrics(reg, refreshMetrics) require.NoError(t, metrics.Register()) xdsMetrics, ok := metrics.(*xdsMetrics) if !ok { require.Fail(t, "invalid discovery metrics type") } pd := &fetchDiscovery{ source: source, client: rc, logger: nopLogger, parseResources: parser, metrics: xdsMetrics, } ch := make(chan []*targetgroup.Group, 1) pd.poll(context.Background(), ch) groups := <-ch require.NotNil(t, groups) require.Len(t, groups, 1) require.Equal(t, source, groups[0].Source) require.Len(t, groups[0].Targets, 2) pd.poll(context.Background(), ch) groups = <-ch require.NotNil(t, groups) require.Len(t, groups, 1) require.Equal(t, source, groups[0].Source) require.Len(t, groups[0].Targets, 1) metrics.Unregister() refreshMetrics.Unregister() }