Properly close files before reopening
This commit is contained in:
parent
00a503129b
commit
1b23d62e3f
20
db.go
20
db.go
|
@ -351,27 +351,11 @@ func (s *Shard) persist() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
sf, err := os.Create(chunksFileName(p))
|
n, err := head.persist(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
xf, err := os.Create(indexFileName(p))
|
sz := fmt.Sprintf("%.2fMiB", float64(n)/1024/1024)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
iw := newIndexWriter(xf)
|
|
||||||
sw := newSeriesWriter(sf, iw, s.head.stats.MinTime)
|
|
||||||
|
|
||||||
defer sw.Close()
|
|
||||||
defer iw.Close()
|
|
||||||
|
|
||||||
if err := head.persist(sw, iw); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
sz := fmt.Sprintf("%.2fMiB", float64(sw.Size()+iw.Size())/1024/1024)
|
|
||||||
|
|
||||||
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
|
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
|
||||||
|
|
||||||
// Reopen block as persisted block for querying.
|
// Reopen block as persisted block for querying.
|
||||||
|
|
28
head.go
28
head.go
|
@ -2,6 +2,7 @@ package tsdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -135,15 +136,30 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HeadBlock) persist(sw SeriesWriter, iw IndexWriter) error {
|
func (h *HeadBlock) persist(p string) (int64, error) {
|
||||||
|
sf, err := os.Create(chunksFileName(p))
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
xf, err := os.Create(indexFileName(p))
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
iw := newIndexWriter(xf)
|
||||||
|
sw := newSeriesWriter(sf, iw, h.stats.MinTime)
|
||||||
|
|
||||||
|
defer sw.Close()
|
||||||
|
defer iw.Close()
|
||||||
|
|
||||||
for ref, cd := range h.index.forward {
|
for ref, cd := range h.index.forward {
|
||||||
if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil {
|
if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := iw.WriteStats(h.stats); err != nil {
|
if err := iw.WriteStats(h.stats); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
for n, v := range h.index.values {
|
for n, v := range h.index.values {
|
||||||
s := make([]string, 0, len(v))
|
s := make([]string, 0, len(v))
|
||||||
|
@ -152,15 +168,15 @@ func (h *HeadBlock) persist(sw SeriesWriter, iw IndexWriter) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := iw.WriteLabelIndex([]string{n}, s); err != nil {
|
if err := iw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for t := range h.index.postings.m {
|
for t := range h.index.postings.m {
|
||||||
if err := iw.WritePostings(t.name, t.value, h.index.postings.get(t)); err != nil {
|
if err := iw.WritePostings(t.name, t.value, h.index.postings.get(t)); err != nil {
|
||||||
return err
|
return 0, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return iw.Size() + sw.Size(), nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue