diff --git a/internal/conveyor/conveyor.go b/internal/conveyor/conveyor.go
index 1d1dcd5bf..cf36d1d48 100644
--- a/internal/conveyor/conveyor.go
+++ b/internal/conveyor/conveyor.go
@@ -259,13 +259,19 @@ func (c *Conveyor) metrics(ctx *stopper.Context) {
 	max := resolvedMaxTimestamp.WithLabelValues(labels...)
 	sourceLag := sourceLagDuration.WithLabelValues(labels...)
 	targetLag := targetLagDuration.WithLabelValues(labels...)
+	sourceLagH := sourceLagHist.WithLabelValues(labels...)
+	targetLagH := targetLagHist.WithLabelValues(labels...)
 	_, err := stopvar.DoWhenChangedOrInterval(ctx,
 		hlc.RangeEmpty(), &c.resolvingRange, tick,
 		func(ctx *stopper.Context, _, new hlc.Range) error {
+			// Sample the clock once so the gauge and the histogram
+			// record the same instant.
+			now := time.Now().UnixNano()
 			min.Set(float64(new.Min().Nanos()) / 1e9)
-			targetLag.Set(float64(time.Now().UnixNano()-new.Min().Nanos()) / 1e9)
+			targetLag.Set(float64(now-new.Min().Nanos()) / 1e9)
+			targetLagH.Observe(float64(now-new.Min().Nanos()) / 1e9)
 			max.Set(float64(new.MaxInclusive().Nanos()) / 1e9)
-			sourceLag.Set(float64(time.Now().UnixNano()-new.MaxInclusive().Nanos()) / 1e9)
+			sourceLag.Set(float64(now-new.MaxInclusive().Nanos()) / 1e9)
+			sourceLagH.Observe(float64(now-new.MaxInclusive().Nanos()) / 1e9)
 			return nil
 		})
 	return err
diff --git a/internal/conveyor/metrics.go b/internal/conveyor/metrics.go
index 4aa80174f..76b3955c4 100644
--- a/internal/conveyor/metrics.go
+++ b/internal/conveyor/metrics.go
@@ -17,6 +17,7 @@
 package conveyor
 
 import (
+	"github.com/cockroachdb/replicator/internal/util/metrics"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
@@ -38,10 +39,20 @@
 		Name: "source_lag_seconds",
 		Help: "the age of the most recently received checkpoint",
 	}, []string{"kind", "target"})
+	sourceLagHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "source_lag_hist_seconds",
+		Buckets: metrics.LatencyBuckets,
+		Help:    "the age of the most recently received checkpoint",
+	}, []string{"kind", "target"})
 	targetLagDuration = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "target_lag_seconds",
 		Help: "the age of the data applied to the table",
 	}, []string{"kind", "target"})
+	targetLagHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
+		Name:    "target_lag_hist_seconds",
+		Buckets: metrics.LatencyBuckets,
+		Help:    "the age of the data applied to the table",
+	}, []string{"kind", "target"})
 	resolvedMinTimestamp = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "target_applied_timestamp_seconds",
 		Help: "the wall time of the most recent applied resolved timestamp",
diff --git a/internal/sequencer/core/round.go b/internal/sequencer/core/round.go
index 292d3e4f5..deac6839a 100644
--- a/internal/sequencer/core/round.go
+++ b/internal/sequencer/core/round.go
@@ -118,6 +118,7 @@ func (r *round) scheduleCommit(
 	// retried, mark them as poisoned.
 	if r.targetPool.IsDeferrable(err) {
 		finalReport = false
+		log.WithError(err).Debug("deferred event")
 		return lockset.RetryAtHead(err).Or(func() {
 			r.poisoned.MarkPoisoned(r.batch)
 			close(progressReport)
diff --git a/internal/staging/stage/gzip.go b/internal/staging/stage/gzip.go
index 73e2dba26..abd3187b0 100644
--- a/internal/staging/stage/gzip.go
+++ b/internal/staging/stage/gzip.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"compress/gzip"
 	"io"
+	"sync"
 
 	"github.com/pkg/errors"
 )
@@ -29,6 +30,20 @@
 // with some benefit for tables that have very wide rows or values.
 const gzipMinSize = 1024
 
+// gzipWriterPool recycles gzip.Writer instances across calls to
+// maybeGZip; each use must Reset the writer first.
+var gzipWriterPool = sync.Pool{
+	New: func() any {
+		return gzip.NewWriter(nil)
+	},
+}
+
+// gzipReaderPool recycles gzip.Reader instances across calls to
+// maybeGunzip; each use must Reset the reader first.
+var gzipReaderPool = sync.Pool{
+	New: func() any {
+		return &gzip.Reader{}
+	},
+}
+
 // maybeGZip compresses the given data if it is larger than gzipMinSize.
 func maybeGZip(data []byte) ([]byte, error) {
 	if len(data) <= gzipMinSize {
@@ -36,7 +51,10 @@
 	}
 
 	var buf bytes.Buffer
-	gzWriter := gzip.NewWriter(&buf)
+	gzWriter := gzipWriterPool.Get().(*gzip.Writer)
+	gzWriter.Reset(&buf)
+	defer gzipWriterPool.Put(gzWriter)
+
 	if _, err := gzWriter.Write(data); err != nil {
 		return nil, errors.WithStack(err)
 	}
@@ -55,9 +73,12 @@
 	if len(data) < 2 || data[0] != 0x1f || data[1] != 0x8b {
 		return data, nil
 	}
-	r, err := gzip.NewReader(bytes.NewReader(data))
-	if err != nil {
+
+	gzReader := gzipReaderPool.Get().(*gzip.Reader)
+	defer gzipReaderPool.Put(gzReader)
+	if err := gzReader.Reset(bytes.NewReader(data)); err != nil {
 		return nil, errors.WithStack(err)
 	}
-	return io.ReadAll(r)
+	defer gzReader.Close()
+	return io.ReadAll(gzReader)
 }
diff --git a/internal/util/stdpool/ora.go b/internal/util/stdpool/ora.go
index 824f9226a..d54c7aa5a 100644
--- a/internal/util/stdpool/ora.go
+++ b/internal/util/stdpool/ora.go
@@ -42,6 +42,7 @@ func oraErrorDeferrable(err error) bool {
 	if !ok {
 		return false
 	}
+	log.WithField("code", code).Trace("oracle error code")
 	switch code {
 	case "1": // ORA-0001 unique constraint violated
 		// The MERGE that we execute uses read-committed reads, so