From 5e1c258a984b4650eed07eccfc1810b90908e4f0 Mon Sep 17 00:00:00 2001 From: Bas Harenslak Date: Wed, 4 Oct 2017 21:51:34 +0200 Subject: [PATCH] Instrument WAL fsync --- db.go | 2 +- wal.go | 36 +++++++++++++++++++++++++++++++++--- wal_test.go | 18 +++++++++--------- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index 72665ae45..8b3bc6872 100644 --- a/db.go +++ b/db.go @@ -204,7 +204,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db return nil, errors.Wrap(err, "create leveled compactor") } - wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval) + wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r) if err != nil { return nil, err } diff --git a/wal.go b/wal.go index 2f6bb35bc..d98e06082 100644 --- a/wal.go +++ b/wal.go @@ -32,6 +32,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/tsdb/labels" + "github.com/prometheus/client_golang/prometheus" ) // WALEntryType indicates what data a WAL entry contains. @@ -65,6 +66,26 @@ type SeriesCB func([]RefSeries) error // is only valid until the call returns. type DeletesCB func([]Stone) error +type walMetrics struct { + fsyncDuration prometheus.Summary +} + +func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics { + m := &walMetrics{} + + m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "tsdb_wal_fsync_duration_seconds", + Help: "Duration of WAL fsync.", + }) + + if r != nil { + r.MustRegister( + m.fsyncDuration, + ) + } + return m +} + // WAL is a write ahead log that can log new series labels and samples. // It must be completely read before new entries are logged. type WAL interface { @@ -150,6 +171,7 @@ func newCRC32() hash.Hash32 { // SegmentWAL is a write ahead log for series data. type SegmentWAL struct { mtx sync.Mutex + metrics *walMetrics dirFile *os.File files []*segmentFile @@ -169,7 +191,7 @@ type SegmentWAL struct { // OpenSegmentWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. -func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) { +func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error) { if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } @@ -190,6 +212,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) segmentSize: walSegmentSizeBytes, crc32: newCRC32(), } + w.metrics = newWalMetrics(w, r) fns, err := sequenceFiles(w.dirFile.Name()) if err != nil { @@ -592,7 +615,10 @@ func (w *SegmentWAL) Sync() error { } if head != nil { // But only fsync the head segment after releasing the mutex as it will block on disk I/O. - return fileutil.Fdatasync(head.File) + start := time.Now() + err := fileutil.Fdatasync(head.File) + w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) + return err } return nil } @@ -604,7 +630,11 @@ func (w *SegmentWAL) sync() error { if w.head() == nil { return nil } - return fileutil.Fdatasync(w.head().File) + + start := time.Now() + err := fileutil.Fdatasync(w.head().File) + w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) + return err } func (w *SegmentWAL) flush() error { diff --git a/wal_test.go b/wal_test.go index b5a69ee0f..d469a888c 100644 --- a/wal_test.go +++ b/wal_test.go @@ -44,7 +44,7 @@ func TestSegmentWAL_Open(t *testing.T) { } // Initialize 5 correct segment files. - w, err := OpenSegmentWAL(tmpdir, nil, 0) + w, err := OpenSegmentWAL(tmpdir, nil, 0, nil) require.NoError(t, err) require.Equal(t, 5, len(w.files), "unexpected number of segments loaded") @@ -74,7 +74,7 @@ func TestSegmentWAL_Open(t *testing.T) { _, err = f.WriteAt([]byte{0}, 4) require.NoError(t, err) - w, err = OpenSegmentWAL(tmpdir, nil, 0) + w, err = OpenSegmentWAL(tmpdir, nil, 0, nil) require.Error(t, err, "open with corrupted segments") } @@ -84,7 +84,7 @@ func TestSegmentWAL_cut(t *testing.T) { defer os.RemoveAll(tmpdir) // This calls cut() implicitly the first time without a previous tail. - w, err := OpenSegmentWAL(tmpdir, nil, 0) + w, err := OpenSegmentWAL(tmpdir, nil, 0, nil) require.NoError(t, err) require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!"))) @@ -131,7 +131,7 @@ func TestSegmentWAL_Truncate(t *testing.T) { require.NoError(t, err) // defer os.RemoveAll(dir) - w, err := OpenSegmentWAL(dir, nil, 0) + w, err := OpenSegmentWAL(dir, nil, 0, nil) require.NoError(t, err) w.segmentSize = 10000 @@ -181,7 +181,7 @@ func TestSegmentWAL_Truncate(t *testing.T) { require.NoError(t, w.Close()) // The same again with a new WAL. - w, err = OpenSegmentWAL(dir, nil, 0) + w, err = OpenSegmentWAL(dir, nil, 0, nil) require.NoError(t, err) var readSeries []RefSeries @@ -221,7 +221,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) { // Open WAL a bunch of times, validate all previous data can be read, // write more data to it, close it. for k := 0; k < numMetrics; k += numMetrics / iterations { - w, err := OpenSegmentWAL(dir, nil, 0) + w, err := OpenSegmentWAL(dir, nil, 0, nil) require.NoError(t, err) // Set smaller segment size so we can actually write several files. @@ -390,7 +390,7 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - w, err := OpenSegmentWAL(dir, nil, 0) + w, err := OpenSegmentWAL(dir, nil, 0, nil) require.NoError(t, err) require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}})) @@ -415,7 +415,7 @@ func TestWALRestoreCorrupted(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) - w2, err := OpenSegmentWAL(dir, logger, 0) + w2, err := OpenSegmentWAL(dir, logger, 0, nil) require.NoError(t, err) r := w2.Reader() @@ -446,7 +446,7 @@ func TestWALRestoreCorrupted(t *testing.T) { // We should see the first valid entry and the new one, everything after // is truncated. - w3, err := OpenSegmentWAL(dir, logger, 0) + w3, err := OpenSegmentWAL(dir, logger, 0, nil) require.NoError(t, err) r = w3.Reader()