From e2f4850fea78d9c902dc8faec20f71eff5701b1d Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Sat, 11 Nov 2017 12:06:13 +0000 Subject: [PATCH] Refactor main.go with oklog/pkg/group actors pattern --- cmd/prometheus/main.go | 274 ++++++++++++------ vendor/github.com/oklog/oklog/LICENSE | 201 +++++++++++++ .../github.com/oklog/oklog/pkg/group/group.go | 62 ++++ vendor/vendor.json | 6 + web/web.go | 14 +- 5 files changed, 462 insertions(+), 95 deletions(-) create mode 100644 vendor/github.com/oklog/oklog/LICENSE create mode 100644 vendor/github.com/oklog/oklog/pkg/group/group.go diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 75f1a3177..e0f03e6f8 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -31,6 +31,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/oklog/oklog/pkg/group" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -216,12 +217,6 @@ func main() { level.Info(logger).Log("build_context", version.BuildContext()) level.Info(logger).Log("host_details", Uname()) - // Make sure that sighup handler is registered with a redirect to the channel before the potentially - // long and synchronous tsdb init. - hup := make(chan os.Signal) - hupReady := make(chan bool) - signal.Notify(hup, syscall.SIGHUP) - var ( localStorage = &tsdb.ReadyStorage{} remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime) @@ -282,105 +277,196 @@ func main() { notifier, } - // Wait for reload or termination signals. Start the handler for SIGHUP as - // early as possible, but ignore it until we are ready to handle reloading - // our config. - go func() { - <-hupReady - for { - select { - case <-hup: - if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { - level.Error(logger).Log("msg", "Error reloading config", "err", err) - } - case rc := <-webHandler.Reload(): - if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { - level.Error(logger).Log("msg", "Error reloading config", "err", err) - rc <- err - } else { - rc <- nil - } - } - } - }() + prometheus.MustRegister(configSuccess) + prometheus.MustRegister(configSuccessTime) // Start all components while we wait for TSDB to open but only load // initial config and mark ourselves as ready after it completed. dbOpen := make(chan struct{}) + // Wait until the server is ready to handle reloading + reloadReady := make(chan struct{}) - go func() { - defer close(dbOpen) - - level.Info(logger).Log("msg", "Starting TSDB") - - db, err := tsdb.Open( - cfg.localStoragePath, - log.With(logger, "component", "tsdb"), - prometheus.DefaultRegisterer, - &cfg.tsdb, + var g group.Group + { + term := make(chan os.Signal) + signal.Notify(term, os.Interrupt, syscall.SIGTERM) + cancel := make(chan struct{}) + g.Add( + func() error { + select { + case <-term: + level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...") + case <-webHandler.Quit(): + level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...") + case <-cancel: + break + } + return nil + }, + func(err error) { + close(cancel) + }, ) - if err != nil { - level.Error(logger).Log("msg", "Opening storage failed", "err", err) - os.Exit(1) - } - level.Info(logger).Log("msg", "TSDB started") - - startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) - localStorage.Set(db, startTimeMargin) - }() - - prometheus.MustRegister(configSuccess) - prometheus.MustRegister(configSuccessTime) - - // The notifier is a dependency of the rule manager. It has to be - // started before and torn down afterwards. - go notifier.Run() - defer notifier.Stop() - - go ruleManager.Run() - defer ruleManager.Stop() - - go targetManager.Run() - defer targetManager.Stop() - - // Shutting down the query engine before the rule manager will cause pending queries - // to be canceled and ensures a quick shutdown of the rule manager. - defer cancelCtx() - - errc := make(chan error) - go func() { errc <- webHandler.Run(ctx) }() - - <-dbOpen - - if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { - level.Error(logger).Log("msg", "Error loading config", "err", err) - os.Exit(1) } + { + // Make sure that sighup handler is registered with a redirect to the channel before the potentially + // long and synchronous tsdb init. + hup := make(chan os.Signal) + signal.Notify(hup, syscall.SIGHUP) + cancel := make(chan struct{}) + g.Add( + func() error { + select { + case <-reloadReady: + break + // In case a shutdown is initiated before the reloadReady is released. + case <-cancel: + return nil + } - defer func() { - if err := fanoutStorage.Close(); err != nil { - level.Error(logger).Log("msg", "Error stopping storage", "err", err) - } - }() + for { + select { + case <-hup: + if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { + level.Error(logger).Log("msg", "Error reloading config", "err", err) + } + case rc := <-webHandler.Reload(): + if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { + level.Error(logger).Log("msg", "Error reloading config", "err", err) + rc <- err + } else { + rc <- nil + } + case <-cancel: + return nil + } + } - // Wait for reload or termination signals. - close(hupReady) // Unblock SIGHUP handler. - - // Set web server to ready. - webHandler.Ready() - level.Info(logger).Log("msg", "Server is ready to receive requests.") - - term := make(chan os.Signal, 1) - signal.Notify(term, os.Interrupt, syscall.SIGTERM) - select { - case <-term: - level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...") - case <-webHandler.Quit(): - level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...") - case err := <-errc: - level.Error(logger).Log("msg", "Error starting web server, exiting gracefully", "err", err) + }, + func(err error) { + close(cancel) + }, + ) } + { + cancel := make(chan struct{}) + g.Add( + func() error { + select { + case <-dbOpen: + break + // In case a shutdown is initiated before the dbOpen is released + case <-cancel: + return nil + } + if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil { + return fmt.Errorf("Error loading config %s", err) + } + + close(reloadReady) + webHandler.Ready() + level.Info(logger).Log("msg", "Server is ready to receive requests.") + <-cancel + return nil + }, + func(err error) { + close(cancel) + }, + ) + } + { + cancel := make(chan struct{}) + g.Add( + func() error { + level.Info(logger).Log("msg", "Starting TSDB ...") + db, err := tsdb.Open( + cfg.localStoragePath, + log.With(logger, "component", "tsdb"), + prometheus.DefaultRegisterer, + &cfg.tsdb, + ) + if err != nil { + return fmt.Errorf("Opening storage failed %s", err) + } + level.Info(logger).Log("msg", "TSDB started") + + startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) + localStorage.Set(db, startTimeMargin) + close(dbOpen) + <-cancel + return nil + }, + func(err error) { + if err := fanoutStorage.Close(); err != nil { + level.Error(logger).Log("msg", "Error stopping storage", "err", err) + } + close(cancel) + }, + ) + } + { + g.Add( + func() error { + if err := webHandler.Run(ctx); err != nil { + return fmt.Errorf("Error starting web server: %s", err) + } + return nil + }, + func(err error) { + // Keep this interrupt before the ruleManager.Stop(). + // Shutting down the query engine before the rule manager will cause pending queries + // to be canceled and ensures a quick shutdown of the rule manager. + cancelCtx() + }, + ) + } + { + // TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel. + cancel := make(chan struct{}) + g.Add( + func() error { + ruleManager.Run() + <-cancel + return nil + }, + func(err error) { + ruleManager.Stop() + close(cancel) + }, + ) + } + { + // Calling notifier.Stop() before ruleManager.Stop() will cause a panic if the ruleManager isn't running, + // so keep this interrupt after the ruleManager.Stop(). + g.Add( + func() error { + notifier.Run() + return nil + }, + func(err error) { + notifier.Stop() + }, + ) + } + { + // TODO(krasi) refactor targetManager.Run() to be blocking to avoid using an extra blocking channel. + cancel := make(chan struct{}) + g.Add( + func() error { + targetManager.Run() + <-cancel + return nil + }, + func(err error) { + targetManager.Stop() + close(cancel) + }, + ) + } + if err := g.Run(); err != nil { + level.Error(logger).Log("err", err) + } level.Info(logger).Log("msg", "See you next time!") } diff --git a/vendor/github.com/oklog/oklog/LICENSE b/vendor/github.com/oklog/oklog/LICENSE new file mode 100644 index 000000000..8dada3eda --- /dev/null +++ b/vendor/github.com/oklog/oklog/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/oklog/oklog/pkg/group/group.go b/vendor/github.com/oklog/oklog/pkg/group/group.go new file mode 100644 index 000000000..72b69f4b3 --- /dev/null +++ b/vendor/github.com/oklog/oklog/pkg/group/group.go @@ -0,0 +1,62 @@ +// Package group implements an actor-runner with deterministic teardown. It is +// somewhat similar to package errgroup, except it does not require actor +// goroutines to understand context semantics. This makes it suitable for use +// in more circumstances; for example, goroutines which are handling +// connections from net.Listeners, or scanning input from a closable io.Reader. +package group + +// Group collects actors (functions) and runs them concurrently. +// When one actor (function) returns, all actors are interrupted. +// The zero value of a Group is useful. +type Group struct { + actors []actor +} + +// Add an actor (function) to the group. Each actor must be pre-emptable by an +// interrupt function. That is, if interrupt is invoked, execute should return. +// Also, it must be safe to call interrupt even after execute has returned. +// +// The first actor (function) to return interrupts all running actors. +// The error is passed to the interrupt functions, and is returned by Run. +func (g *Group) Add(execute func() error, interrupt func(error)) { + g.actors = append(g.actors, actor{execute, interrupt}) +} + +// Run all actors (functions) concurrently. +// When the first actor returns, all others are interrupted. +// Run only returns when all actors have exited. +// Run returns the error returned by the first exiting actor. +func (g *Group) Run() error { + if len(g.actors) == 0 { + return nil + } + + // Run each actor. + errors := make(chan error, len(g.actors)) + for _, a := range g.actors { + go func(a actor) { + errors <- a.execute() + }(a) + } + + // Wait for the first actor to stop. + err := <-errors + + // Signal all actors to stop. + for _, a := range g.actors { + a.interrupt(err) + } + + // Wait for all actors to stop. + for i := 1; i < cap(errors); i++ { + <-errors + } + + // Return the original error. + return err +} + +type actor struct { + execute func() error + interrupt func(error) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 0766c91a7..f3ec812e3 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -665,6 +665,12 @@ "revision": "1d49c987357a327b5b03aa84cbddd582c328615d", "revisionTime": "2016-09-28T00:14:32Z" }, + { + "checksumSHA1": "gkyBg/2hcIWR/8qGEeGVoHwOyfo=", + "path": "github.com/oklog/oklog/pkg/group", + "revision": "f857583a70c345341d679b3f27aa542c8db70a21", + "revisionTime": "2017-09-18T07:00:58Z" + }, { "checksumSHA1": "B1iGaUz7NrjEmCjVdIgH5pvkTe8=", "path": "github.com/oklog/ulid", diff --git a/web/web.go b/web/web.go index 93e37dcb4..1ded9650e 100644 --- a/web/web.go +++ b/web/web.go @@ -460,7 +460,19 @@ func (h *Handler) Run(ctx context.Context) error { } }() - return m.Serve() + errCh := make(chan error) + go func() { + errCh <- m.Serve() + }() + + select { + case e := <-errCh: + return e + case <-ctx.Done(): + httpSrv.Shutdown(ctx) + grpcSrv.GracefulStop() + return nil + } } func (h *Handler) alerts(w http.ResponseWriter, r *http.Request) {