Merge pull request #13015 from bboreham/smaller-txring
tsdb: make transaction isolation data structures smaller
This commit is contained in:
commit
3f30ad3cc2
|
@ -2079,7 +2079,7 @@ func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled
|
||||||
nextAt: math.MinInt64,
|
nextAt: math.MinInt64,
|
||||||
}
|
}
|
||||||
if !isolationDisabled {
|
if !isolationDisabled {
|
||||||
s.txs = newTxRing(4)
|
s.txs = newTxRing(0)
|
||||||
}
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
|
@ -682,7 +682,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *
|
||||||
|
|
||||||
// Removing the extra transactionIDs that are relevant for samples that
|
// Removing the extra transactionIDs that are relevant for samples that
|
||||||
// come after this chunk, from the total transactionIDs.
|
// come after this chunk, from the total transactionIDs.
|
||||||
appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))
|
appendIDsToConsider := int(s.txs.txIDCount) - (totalSamples - (previousSamples + numSamples))
|
||||||
|
|
||||||
// Iterate over the appendIDs, find the first one that the isolation state says not
|
// Iterate over the appendIDs, find the first one that the isolation state says not
|
||||||
// to return.
|
// to return.
|
||||||
|
|
|
@ -2552,7 +2552,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
||||||
|
|
||||||
ok, _ := s.append(0, 0, 0, cOpts)
|
ok, _ := s.append(0, 0, 0, cOpts)
|
||||||
require.True(t, ok, "Series append failed.")
|
require.True(t, ok, "Series append failed.")
|
||||||
require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
|
require.Equal(t, 0, int(s.txs.txIDCount), "Series should not have an appendID after append with appendID=0.")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHeadSeriesChunkRace(t *testing.T) {
|
func TestHeadSeriesChunkRace(t *testing.T) {
|
||||||
|
|
|
@ -240,8 +240,8 @@ func (i *isolation) closeAppend(appendID uint64) {
|
||||||
// The transactionID ring buffer.
|
// The transactionID ring buffer.
|
||||||
type txRing struct {
|
type txRing struct {
|
||||||
txIDs []uint64
|
txIDs []uint64
|
||||||
txIDFirst int // Position of the first id in the ring.
|
txIDFirst uint32 // Position of the first id in the ring.
|
||||||
txIDCount int // How many ids in the ring.
|
txIDCount uint32 // How many ids in the ring.
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTxRing(capacity int) *txRing {
|
func newTxRing(capacity int) *txRing {
|
||||||
|
@ -251,21 +251,28 @@ func newTxRing(capacity int) *txRing {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (txr *txRing) add(appendID uint64) {
|
func (txr *txRing) add(appendID uint64) {
|
||||||
if txr.txIDCount == len(txr.txIDs) {
|
if int(txr.txIDCount) == len(txr.txIDs) {
|
||||||
// Ring buffer is full, expand by doubling.
|
// Ring buffer is full, expand by doubling.
|
||||||
newRing := make([]uint64, txr.txIDCount*2)
|
newLen := txr.txIDCount * 2
|
||||||
|
if newLen == 0 {
|
||||||
|
newLen = 4
|
||||||
|
}
|
||||||
|
newRing := make([]uint64, newLen)
|
||||||
idx := copy(newRing, txr.txIDs[txr.txIDFirst:])
|
idx := copy(newRing, txr.txIDs[txr.txIDFirst:])
|
||||||
copy(newRing[idx:], txr.txIDs[:txr.txIDFirst])
|
copy(newRing[idx:], txr.txIDs[:txr.txIDFirst])
|
||||||
txr.txIDs = newRing
|
txr.txIDs = newRing
|
||||||
txr.txIDFirst = 0
|
txr.txIDFirst = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
txr.txIDs[(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID
|
txr.txIDs[int(txr.txIDFirst+txr.txIDCount)%len(txr.txIDs)] = appendID
|
||||||
txr.txIDCount++
|
txr.txIDCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
|
func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
|
||||||
pos := txr.txIDFirst
|
if len(txr.txIDs) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pos := int(txr.txIDFirst)
|
||||||
|
|
||||||
for txr.txIDCount > 0 {
|
for txr.txIDCount > 0 {
|
||||||
if txr.txIDs[pos] < bound {
|
if txr.txIDs[pos] < bound {
|
||||||
|
@ -281,7 +288,7 @@ func (txr *txRing) cleanupAppendIDsBelow(bound uint64) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
txr.txIDFirst %= len(txr.txIDs)
|
txr.txIDFirst %= uint32(len(txr.txIDs))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (txr *txRing) iterator() *txRingIterator {
|
func (txr *txRing) iterator() *txRingIterator {
|
||||||
|
@ -296,7 +303,7 @@ func (txr *txRing) iterator() *txRingIterator {
|
||||||
type txRingIterator struct {
|
type txRingIterator struct {
|
||||||
ids []uint64
|
ids []uint64
|
||||||
|
|
||||||
pos int
|
pos uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
func (it *txRingIterator) At() uint64 {
|
func (it *txRingIterator) At() uint64 {
|
||||||
|
@ -305,7 +312,7 @@ func (it *txRingIterator) At() uint64 {
|
||||||
|
|
||||||
func (it *txRingIterator) Next() {
|
func (it *txRingIterator) Next() {
|
||||||
it.pos++
|
it.pos++
|
||||||
if it.pos == len(it.ids) {
|
if int(it.pos) == len(it.ids) {
|
||||||
it.pos = 0
|
it.pos = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue