wal: reduce in-use bytes in scenarios with many (and possibly large) writes (#562)

* wal: remove reference to logRequest when popping from slice

This allows the GC to collect logRequests that are not in use.
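
A minimal, self-contained sketch of why the popped slot has to be cleared (illustrative names like item/itemQueue, not the actual frostdb types): after re-slicing, the backing array still holds the pointer, so the GC keeps the element alive until the slot is overwritten.

package main

import "container/heap"

// item stands in for frostdb's logRequest; the names here are illustrative.
type item struct {
	tx uint64
}

type itemQueue []*item

func (q itemQueue) Len() int           { return len(q) }
func (q itemQueue) Less(i, j int) bool { return q[i].tx < q[j].tx }
func (q itemQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *itemQueue) Push(x any)        { *q = append(*q, x.(*item)) }

func (q *itemQueue) Pop() any {
	old := *q
	n := len(old)
	x := old[n-1]
	// Clear the slot: the backing array would otherwise keep a reference to
	// the popped element, so the GC would treat it as still reachable even
	// though the caller is done with it (and may return it to a sync.Pool).
	old[n-1] = nil
	*q = old[0 : n-1]
	return x
}

func main() {
	q := &itemQueue{}
	heap.Push(q, &item{tx: 2})
	heap.Push(q, &item{tx: 1})
	_ = heap.Pop(q).(*item) // pops tx=1; its slot in the backing array is nil'd
}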

* wal: size batches according to segment size and track WAL queue

Previously, we weren't controlling the total size of WAL entries written to
the WAL in a single batch. The underlying WAL does not rotate a file until after
a batch is written, so it could allocate a lot of memory (we've seen over a GB
in production) for the commit buffer.

This commit should reduce in-use bytes in cases where many (possibly large)
entries are written.
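
A rough illustration of the batching change (assumed names such as drainBatch and request, not the actual frostdb code): the run loop now stops pulling requests off the queue once the accumulated payload reaches the segment size, so a single flush can no longer buffer an unbounded amount of data.

package main

import "fmt"

// request stands in for a queued WAL log request; illustrative only.
type request struct{ data []byte }

// drainBatch pops queued requests until the queue is empty or the accumulated
// payload reaches segmentSize, mirroring the batching loop described above.
func drainBatch(queue []*request, segmentSize int) (batch, rest []*request) {
	batchSize := 0
	for len(queue) > 0 && batchSize < segmentSize {
		r := queue[0]
		queue = queue[1:]
		batch = append(batch, r)
		batchSize += len(r.data)
	}
	return batch, queue
}

func main() {
	queue := []*request{
		{data: make([]byte, 4<<10)},
		{data: make([]byte, 4<<10)},
		{data: make([]byte, 4<<10)},
	}
	// With an 8 KiB "segment", only the first two requests are batched; the
	// third stays queued for the next tick instead of inflating the commit buffer.
	batch, rest := drainBatch(queue, 8<<10)
	fmt.Println(len(batch), len(rest)) // 2 1
}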
asubiotto authored Oct 12, 2023
1 parent 8627989 commit ae16844
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions wal/wal.go
@@ -64,6 +64,7 @@ type fileWALMetrics struct {
walRepairs prometheus.Counter
walRepairsLostRecords prometheus.Counter
walCloseTimeouts prometheus.Counter
walQueueSize prometheus.Gauge
}

const dirPerms = os.FileMode(0o750)
@@ -90,6 +91,10 @@ type FileWAL struct {
nextTx uint64
}

// segmentSize indicates what the underlying WAL segment size is. This helps
// the run goroutine size batches more or less appropriately.
segmentSize int

cancel func()
shutdownCh chan struct{}
closeTimeout time.Duration
@@ -118,6 +123,10 @@ func (q *logRequestQueue) Pop() any {
old := *q
n := len(old)
x := old[n-1]
// Remove this reference to a logRequest since the GC considers the popped
// element still accessible otherwise. Since these are sync pooled, we want
// to defer object lifetime management to the pool without interfering.
old[n-1] = nil
*q = old[0 : n-1]
return x
}
@@ -132,7 +141,8 @@ func Open(
}

reg = prometheus.WrapRegistererWithPrefix("frostdb_wal_", reg)
logStore, err := wal.Open(path, wal.WithLogger(logger), wal.WithMetricsRegisterer(reg))
segmentSize := wal.DefaultSegmentSize
logStore, err := wal.Open(path, wal.WithLogger(logger), wal.WithMetricsRegisterer(reg), wal.WithSegmentSize(segmentSize))
if err != nil {
return nil, err
}
@@ -181,8 +191,13 @@ func Open(
Name: "close_timeouts_total",
Help: "The number of times the WAL failed to close due to a timeout",
}),
walQueueSize: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "queue_size",
Help: "The number of unprocessed requests in the WAL queue",
}),
},
shutdownCh: make(chan struct{}),
segmentSize: segmentSize,
shutdownCh: make(chan struct{}),
}

w.protected.nextTx = lastIndex + 1
@@ -242,7 +257,8 @@ func (w *FileWAL) run(ctx context.Context) {
case <-ticker.C:
batch := batch[:0]
w.protected.Lock()
for w.protected.queue.Len() > 0 {
batchSize := 0
for w.protected.queue.Len() > 0 && batchSize < w.segmentSize {
if minTx := w.protected.queue[0].tx; minTx != w.protected.nextTx {
if minTx < w.protected.nextTx {
// The next entry must be dropped otherwise progress
@@ -254,11 +270,14 @@
"found", minTx,
)
_ = heap.Pop(&w.protected.queue)
w.metrics.walQueueSize.Sub(1)
}
break
}
r := heap.Pop(&w.protected.queue).(*logRequest)
w.metrics.walQueueSize.Sub(1)
batch = append(batch, r)
batchSize += len(r.data)
w.protected.nextTx++
}
// truncateTx will be non-zero if we either are about to log a
@@ -319,7 +338,16 @@ func (w *FileWAL) run(ctx context.Context) {
}
}

for _, r := range batch {
// Remove references to a logRequest since the GC considers the
// popped element still accessible otherwise. Since these are sync
// pooled, we want to defer object lifetime management to the pool
// without interfering.
for i := range walBatch {
walBatch[i].Data = nil
}

for i, r := range batch {
batch[i] = nil
w.logRequestPool.Put(r)
}

@@ -380,6 +408,7 @@ func (w *FileWAL) Log(tx uint64, record *walpb.Record) error {

w.protected.Lock()
heap.Push(&w.protected.queue, r)
w.metrics.walQueueSize.Add(1)
w.protected.Unlock()

return nil
@@ -438,6 +467,7 @@ func (w *FileWAL) LogRecord(tx uint64, txnMetadata []byte, table string, record

w.protected.Lock()
heap.Push(&w.protected.queue, r)
w.metrics.walQueueSize.Add(1)
w.protected.Unlock()

return nil
