From 9f29fc8a9c952324845061e9b7a6791e5bce71b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Otto=20Kr=C3=B6pke?= Date: Mon, 18 Nov 2024 02:07:03 +0100 Subject: [PATCH] service: parallelize api calls (#1744) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jan-Otto Kröpke --- internal/collector/cpu/cpu.go | 8 + internal/collector/service/service.go | 223 +++++++++++++++++--------- 2 files changed, 156 insertions(+), 75 deletions(-) diff --git a/internal/collector/cpu/cpu.go b/internal/collector/cpu/cpu.go index 4d9600af..1aa0ab4f 100644 --- a/internal/collector/cpu/cpu.go +++ b/internal/collector/cpu/cpu.go @@ -5,6 +5,7 @@ package cpu import ( "fmt" "log/slog" + "sync" "github.com/alecthomas/kingpin/v2" "github.com/prometheus-community/windows_exporter/internal/mi" @@ -25,6 +26,8 @@ type Collector struct { perfDataCollector *perfdata.Collector + mu sync.Mutex + processorRTCValues map[string]utils.Counter processorMPerfValues map[string]utils.Counter @@ -73,6 +76,8 @@ func (c *Collector) Close() error { func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error { var err error + c.mu = sync.Mutex{} + c.perfDataCollector, err = perfdata.NewCollector("Processor Information", perfdata.InstanceAll, []string{ c1TimeSeconds, c2TimeSeconds, @@ -219,6 +224,9 @@ func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error { } func (c *Collector) Collect(ch chan<- prometheus.Metric) error { + c.mu.Lock() // Lock is needed to prevent concurrent map access to c.processorRTCValues + defer c.mu.Unlock() + data, err := c.perfDataCollector.Collect() if err != nil { return fmt.Errorf("failed to collect Processor Information metrics: %w", err) diff --git a/internal/collector/service/service.go b/internal/collector/service/service.go index dbd4c073..7b7ac1e5 100644 --- a/internal/collector/service/service.go +++ b/internal/collector/service/service.go @@ -8,6 +8,7 @@ import ( "log/slog" "regexp" "strconv" + "sync" "unsafe" "github.com/alecthomas/kingpin/v2" @@ -36,11 +37,18 @@ type Collector struct { logger *slog.Logger + apiStateValues map[uint32]string + apiStartModeValues map[uint32]string + state *prometheus.Desc processID *prometheus.Desc info *prometheus.Desc startMode *prometheus.Desc + // serviceConfigPoolBytes is a pool of byte slices used to avoid allocations + // ref: https://victoriametrics.com/blog/go-sync-pool/ + serviceConfigPoolBytes sync.Pool + serviceManagerHandle *mgr.Mgr } @@ -111,6 +119,12 @@ func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error { c.logger.Warn("No filters specified for service collector. This will generate a very large number of metrics!") } + c.serviceConfigPoolBytes = sync.Pool{ + New: func() any { + return new([]byte) + }, + } + c.info = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "info"), "A metric with a constant '1' value labeled with service information", @@ -136,6 +150,24 @@ func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error { nil, ) + c.apiStateValues = map[uint32]string{ + windows.SERVICE_CONTINUE_PENDING: "continue pending", + windows.SERVICE_PAUSE_PENDING: "pause pending", + windows.SERVICE_PAUSED: "paused", + windows.SERVICE_RUNNING: "running", + windows.SERVICE_START_PENDING: "start pending", + windows.SERVICE_STOP_PENDING: "stop pending", + windows.SERVICE_STOPPED: "stopped", + } + + c.apiStartModeValues = map[uint32]string{ + windows.SERVICE_AUTO_START: "auto", + windows.SERVICE_BOOT_START: "boot", + windows.SERVICE_DEMAND_START: "manual", + windows.SERVICE_DISABLED: "disabled", + windows.SERVICE_SYSTEM_START: "system", + } + // EnumServiceStatusEx requires only SC_MANAGER_ENUM_SERVICE. handle, err := windows.OpenSCManager(nil, nil, windows.SC_MANAGER_ENUMERATE_SERVICE) if err != nil { @@ -171,73 +203,73 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) error { return nil } - // Iterate through the Services List. - for _, service := range services { - serviceName := windows.UTF16PtrToString(service.ServiceName) - if c.config.ServiceExclude.MatchString(serviceName) || - !c.config.ServiceInclude.MatchString(serviceName) { - continue - } + servicesCh := make(chan windows.ENUM_SERVICE_STATUS_PROCESS, len(services)) + wg := sync.WaitGroup{} + wg.Add(len(services)) - if err := c.collectService(ch, service); err != nil { - c.logger.Warn("failed collecting service info", - slog.Any("err", err), - slog.String("service", windows.UTF16PtrToString(service.ServiceName)), - ) - } + for range 4 { + go func(ch chan<- prometheus.Metric, wg *sync.WaitGroup) { + for service := range servicesCh { + c.collectWorker(ch, service) + wg.Done() + } + }(ch, &wg) } + for _, service := range services { + servicesCh <- service + } + + close(servicesCh) + + wg.Wait() + return nil } -var apiStateValues = map[uint32]string{ - windows.SERVICE_CONTINUE_PENDING: "continue pending", - windows.SERVICE_PAUSE_PENDING: "pause pending", - windows.SERVICE_PAUSED: "paused", - windows.SERVICE_RUNNING: "running", - windows.SERVICE_START_PENDING: "start pending", - windows.SERVICE_STOP_PENDING: "stop pending", - windows.SERVICE_STOPPED: "stopped", +func (c *Collector) collectWorker(ch chan<- prometheus.Metric, service windows.ENUM_SERVICE_STATUS_PROCESS) { + serviceName := windows.UTF16PtrToString(service.ServiceName) + + if c.config.ServiceExclude.MatchString(serviceName) || !c.config.ServiceInclude.MatchString(serviceName) { + return + } + + if err := c.collectService(ch, serviceName, service); err != nil { + c.logger.Warn("failed collecting service info", + slog.Any("err", err), + slog.String("service", serviceName), + ) + } } -var apiStartModeValues = map[uint32]string{ - windows.SERVICE_AUTO_START: "auto", - windows.SERVICE_BOOT_START: "boot", - windows.SERVICE_DEMAND_START: "manual", - windows.SERVICE_DISABLED: "disabled", - windows.SERVICE_SYSTEM_START: "system", -} - -func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows.ENUM_SERVICE_STATUS_PROCESS) error { +func (c *Collector) collectService(ch chan<- prometheus.Metric, serviceName string, service windows.ENUM_SERVICE_STATUS_PROCESS) error { // Open connection for service handler. serviceHandle, err := windows.OpenService(c.serviceManagerHandle.Handle, service.ServiceName, windows.SERVICE_QUERY_CONFIG) if err != nil { return fmt.Errorf("failed to open service: %w", err) } - serviceNameString := windows.UTF16PtrToString(service.ServiceName) - // Create handle for each service. - serviceManager := &mgr.Service{Name: serviceNameString, Handle: serviceHandle} + serviceManager := &mgr.Service{Name: serviceName, Handle: serviceHandle} defer func(serviceManager *mgr.Service) { if err := serviceManager.Close(); err != nil { c.logger.Warn("failed to close service handle", slog.Any("err", err), - slog.String("service", serviceNameString), + slog.String("service", serviceName), ) } }(serviceManager) // Get Service Configuration. - serviceConfig, err := serviceManager.Config() + serviceConfig, err := c.getServiceConfig(serviceManager) if err != nil { if !errors.Is(err, windows.ERROR_FILE_NOT_FOUND) && !errors.Is(err, windows.ERROR_MUI_FILE_NOT_FOUND) { return fmt.Errorf("failed to get service configuration: %w", err) } - c.logger.Debug("failed collecting service", + c.logger.Debug("failed collecting service config", slog.Any("err", err), - slog.String("service", serviceNameString), + slog.String("service", serviceName), ) } @@ -245,7 +277,7 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows. c.info, prometheus.GaugeValue, 1.0, - serviceNameString, + serviceName, serviceConfig.DisplayName, serviceConfig.ServiceStartName, serviceConfig.BinaryPathName, @@ -256,21 +288,21 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows. isCurrentState float64 ) - for _, startMode := range apiStartModeValues { + for _, startMode := range c.apiStartModeValues { isCurrentStartMode = 0.0 - if startMode == apiStartModeValues[serviceConfig.StartType] { + if startMode == c.apiStartModeValues[serviceConfig.StartType] { isCurrentStartMode = 1.0 } ch <- prometheus.MustNewConstMetric( c.startMode, prometheus.GaugeValue, isCurrentStartMode, - serviceNameString, + serviceName, startMode, ) } - for state, stateValue := range apiStateValues { + for state, stateValue := range c.apiStateValues { isCurrentState = 0.0 if state == service.ServiceStatusProcess.CurrentState { isCurrentState = 1.0 @@ -280,7 +312,7 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows. c.state, prometheus.GaugeValue, isCurrentState, - serviceNameString, + serviceName, stateValue, ) } @@ -297,7 +329,7 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows. c.processID, prometheus.GaugeValue, float64(processStartTime/1_000_000_000), - serviceNameString, + serviceName, processID, ) @@ -306,12 +338,12 @@ func (c *Collector) collectService(ch chan<- prometheus.Metric, service windows. if errors.Is(err, windows.ERROR_ACCESS_DENIED) { c.logger.Debug("failed to get process start time", - slog.String("service", serviceNameString), + slog.String("service", serviceName), slog.Any("err", err), ) } else { c.logger.Warn("failed to get process start time", - slog.String("service", serviceNameString), + slog.String("service", serviceName), slog.Any("err", err), ) } @@ -325,38 +357,38 @@ func (c *Collector) queryAllServices() ([]windows.ENUM_SERVICE_STATUS_PROCESS, e var ( bytesNeeded uint32 servicesReturned uint32 - resumeHandle uint32 + err error ) - if err := windows.EnumServicesStatusEx( - c.serviceManagerHandle.Handle, - windows.SC_STATUS_PROCESS_INFO, - windows.SERVICE_WIN32, - windows.SERVICE_STATE_ALL, - nil, - 0, - &bytesNeeded, - &servicesReturned, - &resumeHandle, - nil, - ); !errors.Is(err, windows.ERROR_MORE_DATA) { - return nil, fmt.Errorf("could not fetch buffer size for EnumServicesStatusEx: %w", err) - } + buf := make([]byte, 1024*100) - buf := make([]byte, bytesNeeded) - if err := windows.EnumServicesStatusEx( - c.serviceManagerHandle.Handle, - windows.SC_STATUS_PROCESS_INFO, - windows.SERVICE_WIN32, - windows.SERVICE_STATE_ALL, - &buf[0], - bytesNeeded, - &bytesNeeded, - &servicesReturned, - &resumeHandle, - nil, - ); err != nil { - return nil, fmt.Errorf("could not query windows service list: %w", err) + for { + err = windows.EnumServicesStatusEx( + c.serviceManagerHandle.Handle, + windows.SC_STATUS_PROCESS_INFO, + windows.SERVICE_WIN32, + windows.SERVICE_STATE_ALL, + &buf[0], + uint32(len(buf)), + &bytesNeeded, + &servicesReturned, + nil, + nil, + ) + + if err == nil { + break + } + + if !errors.Is(err, windows.ERROR_MORE_DATA) { + return nil, err + } + + if bytesNeeded <= uint32(len(buf)) { + return nil, err + } + + buf = make([]byte, bytesNeeded) } if servicesReturned == 0 { @@ -397,3 +429,44 @@ func (c *Collector) getProcessStartTime(pid uint32) (uint64, error) { return uint64(creation.Nanoseconds()), nil } + +// getServiceConfig is an optimized variant of [mgr.Service] that only +// retrieves the necessary information. +func (c *Collector) getServiceConfig(service *mgr.Service) (mgr.Config, error) { + var serviceConfig *windows.QUERY_SERVICE_CONFIG + + bytesNeeded := uint32(1024) + + buf, ok := c.serviceConfigPoolBytes.Get().(*[]byte) + if !ok || len(*buf) == 0 { + *buf = make([]byte, bytesNeeded) + } + + for { + serviceConfig = (*windows.QUERY_SERVICE_CONFIG)(unsafe.Pointer(&(*buf)[0])) + + err := windows.QueryServiceConfig(service.Handle, serviceConfig, bytesNeeded, &bytesNeeded) + if err == nil { + break + } + + if !errors.Is(err, windows.ERROR_INSUFFICIENT_BUFFER) { + return mgr.Config{}, err + } + + if bytesNeeded <= uint32(len(*buf)) { + return mgr.Config{}, err + } + + *buf = make([]byte, bytesNeeded) + } + + c.serviceConfigPoolBytes.Put(buf) + + return mgr.Config{ + BinaryPathName: windows.UTF16PtrToString(serviceConfig.BinaryPathName), + DisplayName: windows.UTF16PtrToString(serviceConfig.DisplayName), + StartType: serviceConfig.StartType, + ServiceStartName: windows.UTF16PtrToString(serviceConfig.ServiceStartName), + }, nil +}