gzip: Use sync.Pool #1080

Open · wants to merge 2 commits into master
4 changes: 4 additions & 0 deletions internal/conveyor/conveyor.go
@@ -259,13 +259,17 @@ func (c *Conveyor) metrics(ctx *stopper.Context) {
 	max := resolvedMaxTimestamp.WithLabelValues(labels...)
 	sourceLag := sourceLagDuration.WithLabelValues(labels...)
 	targetLag := targetLagDuration.WithLabelValues(labels...)
+	sourceLagHist := sourceLagHist.WithLabelValues(labels...)
+	targetLagHist := targetLagHist.WithLabelValues(labels...)
 	_, err := stopvar.DoWhenChangedOrInterval(ctx,
 		hlc.RangeEmpty(), &c.resolvingRange, tick,
 		func(ctx *stopper.Context, _, new hlc.Range) error {
 			min.Set(float64(new.Min().Nanos()) / 1e9)
 			targetLag.Set(float64(time.Now().UnixNano()-new.Min().Nanos()) / 1e9)
+			targetLagHist.Observe(float64(time.Now().UnixNano()-new.Min().Nanos()) / 1e9)
 			max.Set(float64(new.MaxInclusive().Nanos()) / 1e9)
 			sourceLag.Set(float64(time.Now().UnixNano()-new.MaxInclusive().Nanos()) / 1e9)
+			sourceLagHist.Observe(float64(time.Now().UnixNano()-new.MaxInclusive().Nanos()) / 1e9)
 			return nil
 		})
 	return err
11 changes: 11 additions & 0 deletions internal/conveyor/metrics.go
@@ -17,6 +17,7 @@
 package conveyor

 import (
+	"github.com/cockroachdb/replicator/internal/util/metrics"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
@@ -38,10 +39,20 @@ var (
 		Name: "source_lag_seconds",
 		Help: "the age of the most recently received checkpoint",
 	}, []string{"kind", "target"})
+	sourceLagHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "source_lag_hist",
+		Buckets: metrics.LatencyBuckets,
+		Help:    "the age of the most recently received checkpoint",
+	}, []string{"kind", "target"})
 	targetLagDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "target_lag_seconds",
 		Help: "the age of the data applied to the table",
 	}, []string{"kind", "target"})
+	targetLagHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "target_lag_hist",
+		Buckets: metrics.LatencyBuckets,
+		Help:    "the age of the data applied to the table",
+	}, []string{"kind", "target"})
 	resolvedMinTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "target_applied_timestamp_seconds",
 		Help: "the wall time of the most recent applied resolved timestamp",
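The existing gauges expose only the most recent lag value at scrape time, while the new histograms record every observation, so spikes between scrapes stay visible and quantiles can be computed. A minimal standalone sketch of that distinction (metrics.LatencyBuckets is internal to this repository; the exponential bucket layout below is assumed purely for illustration):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Assumed stand-in for metrics.LatencyBuckets: 1ms up to roughly 32s.
var latencyBuckets = prometheus.ExponentialBuckets(0.001, 2, 16)

func main() {
	lagGauge := prometheus.NewGauge(prometheus.GaugeOpts{Name: "target_lag_seconds"})
	lagHist := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "target_lag_hist",
		Buckets: latencyBuckets,
	})
	for _, lag := range []float64{0.050, 4.2, 0.060} { // lag samples in seconds
		lagGauge.Set(lag)    // a scrape sees only the last value (0.060)
		lagHist.Observe(lag) // the 4.2s spike stays in the distribution
	}
	fmt.Println("observed 3 lag samples")
}
```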
1 change: 1 addition & 0 deletions internal/sequencer/core/round.go
@@ -118,6 +118,7 @@ func (r *round) scheduleCommit(
 	// retried, mark them as poisoned.
 	if r.targetPool.IsDeferrable(err) {
 		finalReport = false
+		log.WithError(err).Error("deferred event")
 		return lockset.RetryAtHead(err).Or(func() {
 			r.poisoned.MarkPoisoned(r.batch)
 			close(progressReport)
28 changes: 25 additions & 3 deletions internal/staging/stage/gzip.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"compress/gzip"
 	"io"
+	"sync"

 	"github.com/pkg/errors"
 )
@@ -29,14 +30,31 @@
 // with some benefit for tables that have very wide rows or values.
 const gzipMinSize = 1024

+// Create a sync.Pool for gzip.Writer instances.
+var gzipWriterPool = sync.Pool{
+	New: func() interface{} {
+		return gzip.NewWriter(nil)
+	},
+}
+
+// Create a sync.Pool for gzip.Reader instances.
+var gzipReaderPool = sync.Pool{
+	New: func() interface{} {
+		return &gzip.Reader{}
+	},
+}
+
 // maybeGZip compresses the given data if it is larger than gzipMinSize.
 func maybeGZip(data []byte) ([]byte, error) {
 	if len(data) <= gzipMinSize {
 		return data, nil
 	}

 	var buf bytes.Buffer
-	gzWriter := gzip.NewWriter(&buf)
+	gzWriter := gzipWriterPool.Get().(*gzip.Writer)
+	gzWriter.Reset(&buf)
+	defer gzipWriterPool.Put(gzWriter)
+
 	if _, err := gzWriter.Write(data); err != nil {
 		return nil, errors.WithStack(err)
 	}
@@ -55,9 +73,13 @@ func maybeGunzip(data []byte) ([]byte, error) {
 	if len(data) < 2 || data[0] != 0x1f || data[1] != 0x8b {
 		return data, nil
 	}
-	r, err := gzip.NewReader(bytes.NewReader(data))
+
+	gzReader := gzipReaderPool.Get().(*gzip.Reader)
+	defer gzipReaderPool.Put(gzReader)
Review thread on `defer gzipReaderPool.Put(gzReader)`:

Reviewer: If Reset() failed below, are we sure we want to put this back in the pool?

ryanluu12345 (author): Hmm, that's a good question. I'm looking at the Reset code, and I think it just calls bufio.NewReader(r) to reset it back. So the Put call is really just putting the base gzip.Reader{} object back, and Reset internally resets z.r, which is the actual reader. Does that check out with your understanding of Reset as well?

stevendanna (Dec 12, 2024): I've never looked at the inside of Reset before today :D. Looking at this code, I'd be a little concerned that this could result in us keeping data alive for longer than is necessary, because the gzipReader will still have a reference to it. Given that your profile only shows the writer being a problem, I wonder if it would be better to start by just pooling the writers and not the readers. But don't feel a need to block on my feedback here.

ryanluu12345 (author): It's a good callout. The reader doesn't show up in this profile since I think the other things dominated it. We're doing a lot of profiling work on this now, so I can certainly test that as well. The new heap profiles after this pool change don't seem to point to a big issue of data living longer than necessary. But it could also be that GC is running very aggressively due to other poorly managed allocations.

ryanluu12345 (author): [image: CPU profile taken after the pooling changes] For reference, here is a new CPU profile taken after the pooling changes. I'm highlighting where the reader shows up, which is much smaller than the writer. But there are certainly more places to look for bad allocations or things remaining on the heap that lead to that giant GC block.
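To make the reviewer's question concrete, a hypothetical regression test (not part of this PR; it assumes it lives alongside gzip.go in package stage) could check that a failed Reset does not poison the pool, since Reset reinitializes the reader's header and error state on every call:

```go
package stage

import (
	"bytes"
	"compress/gzip"
	"testing"
)

// TestPooledReaderSurvivesFailedReset exercises the failure path asked
// about above: a reader whose Reset failed is returned to the pool and
// must still be usable on its next checkout.
func TestPooledReaderSurvivesFailedReset(t *testing.T) {
	payload := bytes.Repeat([]byte("x"), 2*gzipMinSize)
	compressed, err := maybeGZip(payload)
	if err != nil {
		t.Fatal(err)
	}

	// Force a failed Reset: the input is not a gzip stream.
	r := gzipReaderPool.Get().(*gzip.Reader)
	if err := r.Reset(bytes.NewReader([]byte("not gzip"))); err == nil {
		t.Fatal("expected a header error")
	}
	gzipReaderPool.Put(r)

	// A subsequent round-trip through the pool must still succeed.
	out, err := maybeGunzip(compressed)
	if err != nil {
		t.Fatal(err)
	}
	if !bytes.Equal(out, payload) {
		t.Fatal("round-trip mismatch")
	}
}
```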

+	err := gzReader.Reset(bytes.NewReader(data))
 	if err != nil {
 		return nil, errors.WithStack(err)
 	}
-	return io.ReadAll(r)
+	defer gzReader.Close()
+	return io.ReadAll(gzReader)
 }
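The thread above floats pooling only the writers, since the CPU profile implicates only writer allocations. A standalone sketch of that variant follows (illustrative names, not the PR's code); gzip.Writer.Reset fully reinitializes the compressor against the new destination, which is what makes pooled reuse of the writer safe:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

var gzipWriterPool = sync.Pool{
	New: func() interface{} { return gzip.NewWriter(nil) },
}

// gzipBytes pools the writer, the dominant allocation in the profile.
func gzipBytes(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzipWriterPool.Get().(*gzip.Writer)
	w.Reset(&buf) // discard all state from the previous use
	defer gzipWriterPool.Put(w)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil { // flushes the gzip trailer
		return nil, err
	}
	return buf.Bytes(), nil
}

// gunzipBytes allocates a fresh reader per call, so no pooled object
// can retain a reference to the input after the call returns.
func gunzipBytes(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	in := bytes.Repeat([]byte("replicator"), 500)
	z, err := gzipBytes(in)
	if err != nil {
		panic(err)
	}
	out, err := gunzipBytes(z)
	if err != nil {
		panic(err)
	}
	fmt.Printf("in=%d compressed=%d roundtrip-ok=%v\n", len(in), len(z), bytes.Equal(in, out))
}
```

Under this variant nothing pooled ever references a caller's input buffer, which sidesteps the retention question entirely.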
8 changes: 8 additions & 0 deletions internal/target/apply/apply.go
@@ -820,6 +820,14 @@ func toDBType(colData *types.ColData, value any) (any, error) {
 		// exceeds the precision or scale for the target column.
 		return removeExponent(num).String(), nil
 	}
+
+	// if boolVal, ok := value.(bool); ok {
+	// 	if boolVal {
+	// 		return "true", nil
+	// 	}
+	// 	return "false", nil
+	// }
+
 	if value != nil && colData.Parse != nil {
 		// Target-driver specific fixups.
 		next, err := colData.Parse(value)
1 change: 1 addition & 0 deletions internal/util/stdpool/ora.go
@@ -42,6 +42,7 @@ func oraErrorDeferrable(err error) bool {
 	if !ok {
 		return false
 	}
+	log.WithField("code", code).Error("oracle error code")
 	switch code {
 	case "1": // ORA-0001 unique constraint violated
 		// The MERGE that we execute uses read-committed reads, so