From a83014f53c64cf6ae101faebaf38645106d52c95 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 26 May 2017 08:44:24 +0200 Subject: [PATCH 1/2] retrieval: fix memory leak and consumption for caches --- pkg/textparse/parse.go | 4 +- retrieval/scrape.go | 82 +++++++++++++++++++++++++++------------- retrieval/scrape_test.go | 28 +++++++------- 3 files changed, 72 insertions(+), 42 deletions(-) diff --git a/pkg/textparse/parse.go b/pkg/textparse/parse.go index b3cdd6fcd..c540b627f 100644 --- a/pkg/textparse/parse.go +++ b/pkg/textparse/parse.go @@ -99,7 +99,7 @@ func (p *Parser) Err() error { } // Metric writes the labels of the current sample into the passed labels. -func (p *Parser) Metric(l *labels.Labels) { +func (p *Parser) Metric(l *labels.Labels) string { // Allocate the full immutable string immediately, so we just // have to create references on it below. s := string(p.l.b[p.l.mstart:p.l.mend]) @@ -118,6 +118,8 @@ func (p *Parser) Metric(l *labels.Labels) { } sort.Sort((*l)[1:]) + + return s } func yoloString(b []byte) string { diff --git a/retrieval/scrape.go b/retrieval/scrape.go index b646c5d99..2180307f8 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -416,25 +416,31 @@ type loop interface { type lsetCacheEntry struct { lset labels.Labels - str string + hash uint64 +} + +type refEntry struct { + ref string + lastIter uint64 } type scrapeLoop struct { scraper scraper l log.Logger + iter uint64 // scrape iteration + appender func() storage.Appender reportAppender func() storage.Appender - // TODO: Keep only the values from the last scrape to avoid a memory leak. - refCache map[string]string // Parsed string to ref. - lsetCache map[string]lsetCacheEntry // Ref to labelset and string + refCache map[string]*refEntry // Parsed string to ref. + lsetCache map[string]*lsetCacheEntry // Ref to labelset and string // seriesCur and seriesPrev store the labels of series that were seen // in the current and previous scrape. // We hold two maps and swap them out to save allocations. - seriesCur map[string]labels.Labels - seriesPrev map[string]labels.Labels + seriesCur map[uint64]labels.Labels + seriesPrev map[uint64]labels.Labels ctx context.Context scrapeCtx context.Context @@ -450,10 +456,10 @@ func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storag scraper: sc, appender: app, reportAppender: reportApp, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, stopped: make(chan struct{}), ctx: ctx, l: l, @@ -481,6 +487,8 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { mainLoop: for { + sl.iter++ + buf.Reset() select { case <-sl.ctx.Done(): @@ -526,6 +534,16 @@ mainLoop: sl.report(start, time.Since(start), total, added, err) last = start + // refCache and lsetCache may grow over time through series churn + // or multiple string representation of the same metric. Clean up entries + // that haven't appeared in the last scrape. + for s, e := range sl.refCache { + if e.lastIter < sl.iter { + delete(sl.refCache, s) + delete(sl.lsetCache, e.ref) + } + } + select { case <-sl.ctx.Done(): close(sl.stopped) @@ -637,13 +655,17 @@ loop: } mets := yoloString(met) - ref, ok := sl.refCache[mets] + re, ok := sl.refCache[mets] if ok { - switch err = app.AddFast(ref, t, v); err { + re.lastIter = sl.iter + + switch err = app.AddFast(re.ref, t, v); err { case nil: if tp == nil { + e := sl.lsetCache[re.ref] + // Bypass staleness logic if there is an explicit timestamp. - sl.seriesCur[sl.lsetCache[ref].str] = sl.lsetCache[ref].lset + sl.seriesCur[e.hash] = e.lset } case storage.ErrNotFound: ok = false @@ -652,10 +674,10 @@ loop: continue case storage.ErrOutOfOrderSample: sl.l.With("timeseries", string(met)).Debug("Out of order sample") - numOutOfOrder += 1 + numOutOfOrder++ continue case storage.ErrDuplicateSampleForTimestamp: - numDuplicates += 1 + numDuplicates++ sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue default: @@ -664,8 +686,9 @@ loop: } if !ok { var lset labels.Labels - p.Metric(&lset) + mets = p.Metric(&lset) + var ref string ref, err = app.Add(lset, t, v) // TODO(fabxc): also add a dropped-cache? switch err { @@ -676,24 +699,27 @@ loop: case storage.ErrOutOfOrderSample: err = nil sl.l.With("timeseries", string(met)).Debug("Out of order sample") - numOutOfOrder += 1 + numOutOfOrder++ continue case storage.ErrDuplicateSampleForTimestamp: err = nil - numDuplicates += 1 + numDuplicates++ sl.l.With("timeseries", string(met)).Debug("Duplicate sample for timestamp") continue default: break loop } - // Allocate a real string. - mets = string(met) - sl.refCache[mets] = ref - str := lset.String() - sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str} + + sl.refCache[mets] = &refEntry{ref: ref, lastIter: sl.iter} + + // mets is the raw string the metric was ingested as and ambigious as it might + // not be sorted. Construct the authoritative string for the label set. + h := lset.Hash() + + sl.lsetCache[ref] = &lsetCacheEntry{lset: lset, hash: h} if tp == nil { // Bypass staleness logic if there is an explicit timestamp. - sl.seriesCur[str] = lset + sl.seriesCur[h] = lset } } added++ @@ -807,10 +833,12 @@ func (sl *scrapeLoop) reportStale(start time.Time) error { } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { - ref, ok := sl.refCache[s] + re, ok := sl.refCache[s] if ok { - err := app.AddFast(ref, t, v) + re.lastIter = sl.iter + + err := app.AddFast(re.ref, t, v) switch err { case nil: return nil @@ -830,7 +858,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ref, err := app.Add(met, t, v) switch err { case nil: - sl.refCache[s] = ref + sl.refCache[s] = &refEntry{ref: ref, lastIter: sl.iter} return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index c76a70da3..d9a36e04e 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -604,10 +604,10 @@ func TestScrapeLoopAppend(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, } now := time.Now() @@ -645,10 +645,10 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, } now := time.Now() @@ -691,8 +691,8 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, } now := time.Now() @@ -740,10 +740,10 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { sl := &scrapeLoop{ appender: func() storage.Appender { return app }, reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]string{}, - lsetCache: map[string]lsetCacheEntry{}, - seriesCur: map[string]labels.Labels{}, - seriesPrev: map[string]labels.Labels{}, + refCache: map[string]*refEntry{}, + lsetCache: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, l: log.Base(), } From bc7aff8cef1bc669c8473c541363566c5daa0b68 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 26 May 2017 10:44:48 +0200 Subject: [PATCH 2/2] retrieval: extract scrape cache --- pkg/textparse/parse.go | 1 + retrieval/scrape.go | 199 ++++++++++++++++++++++++--------------- retrieval/scrape_test.go | 65 ++++++------- 3 files changed, 151 insertions(+), 114 deletions(-) diff --git a/pkg/textparse/parse.go b/pkg/textparse/parse.go index c540b627f..cfb0a4ce4 100644 --- a/pkg/textparse/parse.go +++ b/pkg/textparse/parse.go @@ -99,6 +99,7 @@ func (p *Parser) Err() error { } // Metric writes the labels of the current sample into the passed labels. +// It returns the string from which the metric was parsed. func (p *Parser) Metric(l *labels.Labels) string { // Allocate the full immutable string immediately, so we just // have to create references on it below. diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 2180307f8..b01c8bb69 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -120,6 +120,16 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable // Any errors that could occur here should be caught during config validation. log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err) } + + newLoop := func( + ctx context.Context, + s scraper, + app, reportApp func() storage.Appender, + l log.Logger, + ) loop { + return newScrapeLoop(ctx, s, app, reportApp, l) + } + return &scrapePool{ appendable: app, config: cfg, @@ -127,7 +137,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable client: client, targets: map[uint64]*Target{}, loops: map[uint64]loop{}, - newLoop: newScrapeLoop, + newLoop: newLoop, } } @@ -427,39 +437,110 @@ type refEntry struct { type scrapeLoop struct { scraper scraper l log.Logger - - iter uint64 // scrape iteration + cache *scrapeCache appender func() storage.Appender reportAppender func() storage.Appender - refCache map[string]*refEntry // Parsed string to ref. - lsetCache map[string]*lsetCacheEntry // Ref to labelset and string - - // seriesCur and seriesPrev store the labels of series that were seen - // in the current and previous scrape. - // We hold two maps and swap them out to save allocations. - seriesCur map[uint64]labels.Labels - seriesPrev map[uint64]labels.Labels - ctx context.Context scrapeCtx context.Context cancel func() stopped chan struct{} } -func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender, l log.Logger) loop { +// scrapeCache tracks mappings of exposed metric strings to label sets and +// storage references. Additionally, it tracks staleness of series between +// scrapes. +type scrapeCache struct { + iter uint64 // Current scrape iteration. + + refs map[string]*refEntry // Parsed string to ref. + lsets map[string]*lsetCacheEntry // Ref to labelset and string. + + // seriesCur and seriesPrev store the labels of series that were seen + // in the current and previous scrape. + // We hold two maps and swap them out to save allocations. + seriesCur map[uint64]labels.Labels + seriesPrev map[uint64]labels.Labels +} + +func newScrapeCache() *scrapeCache { + return &scrapeCache{ + refs: map[string]*refEntry{}, + lsets: map[string]*lsetCacheEntry{}, + seriesCur: map[uint64]labels.Labels{}, + seriesPrev: map[uint64]labels.Labels{}, + } +} + +func (c *scrapeCache) iterDone() { + // refCache and lsetCache may grow over time through series churn + // or multiple string representations of the same metric. Clean up entries + // that haven't appeared in the last scrape. + for s, e := range c.refs { + if e.lastIter < c.iter { + delete(c.refs, s) + delete(c.lsets, e.ref) + } + } + + // Swap current and previous series. + c.seriesPrev, c.seriesCur = c.seriesCur, c.seriesPrev + + // We have to delete every single key in the map. + for k := range c.seriesCur { + delete(c.seriesCur, k) + } + + c.iter++ +} + +func (c *scrapeCache) getRef(met string) (string, bool) { + e, ok := c.refs[met] + if !ok { + return "", false + } + e.lastIter = c.iter + return e.ref, true +} + +func (c *scrapeCache) addRef(met, ref string, lset labels.Labels) { + c.refs[met] = &refEntry{ref: ref, lastIter: c.iter} + // met is the raw string the metric was ingested as. The label set is not ordered + // and thus it's not suitable to uniquely identify cache entries. + // We store a hash over the label set instead. + c.lsets[ref] = &lsetCacheEntry{lset: lset, hash: lset.Hash()} +} + +func (c *scrapeCache) trackStaleness(ref string) { + e := c.lsets[ref] + c.seriesCur[e.hash] = e.lset +} + +func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { + for h, lset := range c.seriesPrev { + if _, ok := c.seriesCur[h]; !ok { + if !f(lset) { + break + } + } + } +} + +func newScrapeLoop( + ctx context.Context, + sc scraper, + app, reportApp func() storage.Appender, + l log.Logger, +) *scrapeLoop { if l == nil { l = log.Base() } sl := &scrapeLoop{ scraper: sc, appender: app, + cache: newScrapeCache(), reportAppender: reportApp, - refCache: map[string]*refEntry{}, - lsetCache: map[string]*lsetCacheEntry{}, - seriesCur: map[uint64]labels.Labels{}, - seriesPrev: map[uint64]labels.Labels{}, stopped: make(chan struct{}), ctx: ctx, l: l, @@ -487,8 +568,6 @@ func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { mainLoop: for { - sl.iter++ - buf.Reset() select { case <-sl.ctx.Done(): @@ -534,16 +613,6 @@ mainLoop: sl.report(start, time.Since(start), total, added, err) last = start - // refCache and lsetCache may grow over time through series churn - // or multiple string representation of the same metric. Clean up entries - // that haven't appeared in the last scrape. - for s, e := range sl.refCache { - if e.lastIter < sl.iter { - delete(sl.refCache, s) - delete(sl.lsetCache, e.ref) - } - } - select { case <-sl.ctx.Done(): close(sl.stopped) @@ -654,18 +723,12 @@ loop: t = *tp } - mets := yoloString(met) - re, ok := sl.refCache[mets] + ref, ok := sl.cache.getRef(yoloString(met)) if ok { - re.lastIter = sl.iter - - switch err = app.AddFast(re.ref, t, v); err { + switch err = app.AddFast(ref, t, v); err { case nil: if tp == nil { - e := sl.lsetCache[re.ref] - - // Bypass staleness logic if there is an explicit timestamp. - sl.seriesCur[e.hash] = e.lset + sl.cache.trackStaleness(ref) } case storage.ErrNotFound: ok = false @@ -686,7 +749,7 @@ loop: } if !ok { var lset labels.Labels - mets = p.Metric(&lset) + mets := p.Metric(&lset) var ref string ref, err = app.Add(lset, t, v) @@ -710,16 +773,11 @@ loop: break loop } - sl.refCache[mets] = &refEntry{ref: ref, lastIter: sl.iter} + sl.cache.addRef(mets, ref, lset) - // mets is the raw string the metric was ingested as and ambigious as it might - // not be sorted. Construct the authoritative string for the label set. - h := lset.Hash() - - sl.lsetCache[ref] = &lsetCacheEntry{lset: lset, hash: h} if tp == nil { // Bypass staleness logic if there is an explicit timestamp. - sl.seriesCur[h] = lset + sl.cache.trackStaleness(ref) } } added++ @@ -734,25 +792,19 @@ loop: sl.l.With("numDropped", numDuplicates).Warn("Error on ingesting samples with different value but same timestamp") } if err == nil { - for metric, lset := range sl.seriesPrev { - if _, ok := sl.seriesCur[metric]; !ok { - // Series no longer exposed, mark it stale. - _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) - switch err { - case nil: - case errSeriesDropped: - err = nil - continue - case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: - // Do not count these in logging, as this is expected if a target - // goes away and comes back again with a new scrape loop. - err = nil - continue - default: - break - } + sl.cache.forEachStale(func(lset labels.Labels) bool { + // Series no longer exposed, mark it stale. + _, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN)) + switch err { + case errSeriesDropped: + err = nil + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not count these in logging, as this is expected if a target + // goes away and comes back again with a new scrape loop. + err = nil } - } + return err == nil + }) } if err != nil { app.Rollback() @@ -762,13 +814,7 @@ loop: return total, 0, err } - // Swap current and previous series. - sl.seriesPrev, sl.seriesCur = sl.seriesCur, sl.seriesPrev - - // We have to delete every single key in the map. - for k := range sl.seriesCur { - delete(sl.seriesCur, k) - } + sl.cache.iterDone() return total, added, nil } @@ -833,12 +879,13 @@ func (sl *scrapeLoop) reportStale(start time.Time) error { } func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error { - re, ok := sl.refCache[s] + // Suffix s with the invalid \xff unicode rune to avoid collisions + // with scraped metrics. + s2 := s + "\xff" + ref, ok := sl.cache.getRef(s2) if ok { - re.lastIter = sl.iter - - err := app.AddFast(re.ref, t, v) + err := app.AddFast(ref, t, v) switch err { case nil: return nil @@ -858,7 +905,7 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ref, err := app.Add(met, t, v) switch err { case nil: - sl.refCache[s] = &refEntry{ref: ref, lastIter: sl.iter} + sl.cache.addRef(s2, ref, met) return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index d9a36e04e..3f5565530 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -373,11 +373,9 @@ func TestScrapeLoopStop(t *testing.T) { // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { - numScrapes += 1 + numScrapes++ if numScrapes == 2 { - go func() { - sl.stop() - }() + go sl.stop() } w.Write([]byte("metric_a 42\n")) return nil @@ -398,7 +396,7 @@ func TestScrapeLoopStop(t *testing.T) { t.Fatalf("Appended samples not as expected. Wanted: at least %d samples Got: %d", 2, len(appender.result)) } if !value.IsStaleNaN(appender.result[len(appender.result)-1].v) { - t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)].v)) + t.Fatalf("Appended last sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.result[len(appender.result)-1].v)) } if len(reportAppender.result) < 8 { @@ -515,7 +513,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { - numScrapes += 1 + numScrapes++ + if numScrapes == 1 { w.Write([]byte("metric_a 42\n")) return nil @@ -564,7 +563,8 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { // Succeed once, several failures, then stop. scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { - numScrapes += 1 + numScrapes++ + if numScrapes == 1 { w.Write([]byte("metric_a 42\n")) return nil @@ -601,15 +601,12 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { func TestScrapeLoopAppend(t *testing.T) { app := &collectResultAppender{} - sl := &scrapeLoop{ - appender: func() storage.Appender { return app }, - reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]*refEntry{}, - lsetCache: map[string]*lsetCacheEntry{}, - seriesCur: map[uint64]labels.Labels{}, - seriesPrev: map[uint64]labels.Labels{}, - } + sl := newScrapeLoop(context.Background(), nil, + func() storage.Appender { return app }, + func() storage.Appender { return nopAppender{} }, + nil, + ) now := time.Now() _, _, err := sl.append([]byte("metric_a 1\nmetric_b NaN\n"), now) if err != nil { @@ -642,14 +639,11 @@ func TestScrapeLoopAppend(t *testing.T) { func TestScrapeLoopAppendStaleness(t *testing.T) { app := &collectResultAppender{} - sl := &scrapeLoop{ - appender: func() storage.Appender { return app }, - reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]*refEntry{}, - lsetCache: map[string]*lsetCacheEntry{}, - seriesCur: map[uint64]labels.Labels{}, - seriesPrev: map[uint64]labels.Labels{}, - } + sl := newScrapeLoop(context.Background(), nil, + func() storage.Appender { return app }, + func() storage.Appender { return nopAppender{} }, + nil, + ) now := time.Now() _, _, err := sl.append([]byte("metric_a 1\n"), now) @@ -688,12 +682,11 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { app := &collectResultAppender{} - sl := &scrapeLoop{ - appender: func() storage.Appender { return app }, - reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]*refEntry{}, - lsetCache: map[string]*lsetCacheEntry{}, - } + sl := newScrapeLoop(context.Background(), nil, + func() storage.Appender { return app }, + func() storage.Appender { return nopAppender{} }, + nil, + ) now := time.Now() _, _, err := sl.append([]byte("metric_a 1 1000\n"), now) @@ -737,15 +730,11 @@ func (app *errorAppender) AddFast(ref string, t int64, v float64) error { func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrder(t *testing.T) { app := &errorAppender{} - sl := &scrapeLoop{ - appender: func() storage.Appender { return app }, - reportAppender: func() storage.Appender { return nopAppender{} }, - refCache: map[string]*refEntry{}, - lsetCache: map[string]*lsetCacheEntry{}, - seriesCur: map[uint64]labels.Labels{}, - seriesPrev: map[uint64]labels.Labels{}, - l: log.Base(), - } + sl := newScrapeLoop(context.Background(), nil, + func() storage.Appender { return app }, + func() storage.Appender { return nopAppender{} }, + nil, + ) now := time.Unix(1, 0) _, _, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\n"), now)