Extract head serialization into Head method
This commit is contained in:
parent
b08f82fa4e
commit
bad93d8d57
40
db.go
40
db.go
|
@ -333,6 +333,8 @@ func (s *Shard) persist() error {
|
|||
|
||||
s.mtx.Unlock()
|
||||
|
||||
// Only allow another persistence to be triggered after the current one
|
||||
// has completed (successful or not.)
|
||||
defer func() {
|
||||
<-s.persistCh
|
||||
}()
|
||||
|
@ -345,11 +347,11 @@ func (s *Shard) persist() error {
|
|||
return err
|
||||
}
|
||||
|
||||
sf, err := os.Create(filepath.Join(p, "series"))
|
||||
sf, err := os.Create(chunksFileName(p))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
xf, err := os.Create(filepath.Join(p, "index"))
|
||||
xf, err := os.Create(indexFileName(p))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -360,36 +362,24 @@ func (s *Shard) persist() error {
|
|||
defer sw.Close()
|
||||
defer iw.Close()
|
||||
|
||||
for ref, cd := range head.index.forward {
|
||||
if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := iw.WriteStats(s.head.stats); err != nil {
|
||||
if err := head.persist(sw, iw); err != nil {
|
||||
return err
|
||||
}
|
||||
for n, v := range head.index.values {
|
||||
s := make([]string, 0, len(v))
|
||||
for x := range v {
|
||||
s = append(s, x)
|
||||
}
|
||||
|
||||
if err := iw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for t := range head.index.postings.m {
|
||||
if err := iw.WritePostings(t.name, t.value, head.index.postings.get(t)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sz := fmt.Sprintf("%.2fMiB", float64(sw.Size()+iw.Size())/1024/1024)
|
||||
|
||||
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
|
||||
|
||||
// Reopen block as persisted block for querying.
|
||||
pb, err := newPersistedBlock(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mtx.Lock()
|
||||
s.persisted = append(s.persisted, pb)
|
||||
s.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
30
head.go
30
head.go
|
@ -134,3 +134,33 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *HeadBlock) persist(sw SeriesWriter, iw IndexWriter) error {
|
||||
for ref, cd := range h.index.forward {
|
||||
if err := sw.WriteSeries(ref, cd.lset, []*chunkDesc{cd}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := iw.WriteStats(h.stats); err != nil {
|
||||
return err
|
||||
}
|
||||
for n, v := range h.index.values {
|
||||
s := make([]string, 0, len(v))
|
||||
for x := range v {
|
||||
s = append(s, x)
|
||||
}
|
||||
|
||||
if err := iw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for t := range h.index.postings.m {
|
||||
if err := iw.WritePostings(t.name, t.value, h.index.postings.get(t)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue