Stop CMUX and GRPC servers even with stale connections (#7810)

I have received a recent report for a user which confirms that sometime
the GRPC server does not stop propery.

It appears that there are 2 issues:

1. The cmux server can refuse to stop if there are stale connections.
For that we set the ReadTimeout.
2. The GRPC server graceful stop can never finish.

What this PR avoids is:

```
goroutine 227 [semacquire, 2 minutes]:
sync.runtime_Semacquire(0xc00059a75c)
    /usr/local/go/src/runtime/sema.go:56 +0x42
sync.(*WaitGroup).Wait(0xc00059a75c)
    /usr/local/go/src/sync/waitgroup.go:130 +0x64
google.golang.org/grpc.(*Server).GracefulStop(0xc00059a600)
```

This PR stops the GRPC server after 15s. Related to the go routing dumps
in #6747.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2020-08-17 10:50:32 +02:00 committed by GitHub
parent dca84112a9
commit e96d786fb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 98 additions and 3 deletions

View File

@ -540,6 +540,10 @@ func (h *Handler) Run(ctx context.Context) error {
httpl = m.Match(cmux.HTTP1Fast()) httpl = m.Match(cmux.HTTP1Fast())
grpcSrv = grpc.NewServer() grpcSrv = grpc.NewServer()
) )
// Prevent open connections to block the shutdown of the handler.
m.SetReadTimeout(h.options.ReadTimeout)
av2 := api_v2.New( av2 := api_v2.New(
h.options.LocalStorage, h.options.LocalStorage,
h.options.TSDBDir, h.options.TSDBDir,
@ -603,11 +607,27 @@ func (h *Handler) Run(ctx context.Context) error {
return e return e
case <-ctx.Done(): case <-ctx.Done():
httpSrv.Shutdown(ctx) httpSrv.Shutdown(ctx)
grpcSrv.GracefulStop() stopGRPCSrv(grpcSrv)
return nil return nil
} }
} }
// stopGRPCSrv stops a given GRPC server. An attempt to stop the server
// gracefully is made first. After 15s, the server to forced to stop.
func stopGRPCSrv(srv *grpc.Server) {
stop := make(chan struct{})
go func() {
srv.GracefulStop()
close(stop)
}()
select {
case <-time.After(15 * time.Second):
srv.Stop()
case <-stop:
}
}
func (h *Handler) alerts(w http.ResponseWriter, r *http.Request) { func (h *Handler) alerts(w http.ResponseWriter, r *http.Request) {
var groups []*rules.Group var groups []*rules.Group

View File

@ -17,6 +17,7 @@ import (
"context" "context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
@ -138,8 +139,10 @@ func TestReadyAndHealthy(t *testing.T) {
webHandler.config = &config.Config{} webHandler.config = &config.Config{}
webHandler.notifier = &notifier.Manager{} webHandler.notifier = &notifier.Manager{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { go func() {
err := webHandler.Run(context.Background()) err := webHandler.Run(ctx)
if err != nil { if err != nil {
panic(fmt.Sprintf("Can't start web handler:%s", err)) panic(fmt.Sprintf("Can't start web handler:%s", err))
} }
@ -323,8 +326,10 @@ func TestRoutePrefix(t *testing.T) {
opts.Flags = map[string]string{} opts.Flags = map[string]string{}
webHandler := New(nil, opts) webHandler := New(nil, opts)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() { go func() {
err := webHandler.Run(context.Background()) err := webHandler.Run(ctx)
if err != nil { if err != nil {
panic(fmt.Sprintf("Can't start web handler:%s", err)) panic(fmt.Sprintf("Can't start web handler:%s", err))
} }
@ -459,3 +464,73 @@ func TestHTTPMetrics(t *testing.T) {
testutil.Equals(t, 2, int(prom_testutil.ToFloat64(counter.WithLabelValues("/-/ready", strconv.Itoa(http.StatusOK))))) testutil.Equals(t, 2, int(prom_testutil.ToFloat64(counter.WithLabelValues("/-/ready", strconv.Itoa(http.StatusOK)))))
testutil.Equals(t, 1, int(prom_testutil.ToFloat64(counter.WithLabelValues("/-/ready", strconv.Itoa(http.StatusServiceUnavailable))))) testutil.Equals(t, 1, int(prom_testutil.ToFloat64(counter.WithLabelValues("/-/ready", strconv.Itoa(http.StatusServiceUnavailable)))))
} }
func TestShutdownWithStaleConnection(t *testing.T) {
dbDir, err := ioutil.TempDir("", "tsdb-ready")
testutil.Ok(t, err)
defer testutil.Ok(t, os.RemoveAll(dbDir))
db, err := tsdb.Open(dbDir, nil, nil, nil)
testutil.Ok(t, err)
timeout := 10 * time.Second
opts := &Options{
ListenAddress: ":9090",
ReadTimeout: timeout,
MaxConnections: 512,
Context: nil,
Storage: nil,
LocalStorage: &dbAdapter{db},
TSDBDir: dbDir,
QueryEngine: nil,
ScrapeManager: &scrape.Manager{},
RuleManager: &rules.Manager{},
Notifier: nil,
RoutePrefix: "/",
ExternalURL: &url.URL{
Scheme: "http",
Host: "localhost:9090",
Path: "/",
},
Version: &PrometheusVersion{},
Gatherer: prometheus.DefaultGatherer,
}
opts.Flags = map[string]string{}
webHandler := New(nil, opts)
webHandler.config = &config.Config{}
webHandler.notifier = &notifier.Manager{}
closed := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := webHandler.Run(ctx)
if err != nil {
panic(fmt.Sprintf("Can't start web handler:%s", err))
}
close(closed)
}()
// Give some time for the web goroutine to run since we need the server
// to be up before starting tests.
time.Sleep(5 * time.Second)
// Open a socket, and don't use it. This connection should then be closed
// after the ReadTimeout.
c, err := net.Dial("tcp", "localhost:9090")
testutil.Ok(t, err)
t.Cleanup(func() { testutil.Ok(t, c.Close()) })
// Stop the web handler.
cancel()
select {
case <-closed:
case <-time.After(timeout + 5*time.Second):
t.Fatalf("Server still running after read timeout.")
}
}