From e0bfbbd21e08fc7e846212520509a28b9a0bc897 Mon Sep 17 00:00:00 2001 From: ZhouXing19 Date: Thu, 14 Nov 2024 10:06:44 -0500 Subject: [PATCH] draft --- internal/sinktest/mocks/collector.go | 1 + internal/source/objstore/conn.go | 7 +++++- internal/source/objstore/conn_test.go | 23 +++++++++++++++++++- internal/source/objstore/eventproc/local.go | 9 +++++--- internal/source/objstore/integration_test.go | 1 + 5 files changed, 36 insertions(+), 5 deletions(-) diff --git a/internal/sinktest/mocks/collector.go b/internal/sinktest/mocks/collector.go index c74baf09..6cae5a98 100644 --- a/internal/sinktest/mocks/collector.go +++ b/internal/sinktest/mocks/collector.go @@ -46,6 +46,7 @@ var _ eventproc.Processor = &Collector{} func NewCollector(capacity int) *Collector { c := &Collector{} c.mu.batch = make([]string, 0, capacity) + c.mu.batchChan = make(chan *types.BatchCursor) return c } diff --git a/internal/source/objstore/conn.go b/internal/source/objstore/conn.go index 431fc101..164e7481 100644 --- a/internal/source/objstore/conn.go +++ b/internal/source/objstore/conn.go @@ -391,7 +391,12 @@ func (c *Conn) processBatch(ctx *stopper.Context, resolved hlc.Time, files []str err := c.retry(operation, fmt.Sprintf("processing %s", file)) processDuration.WithLabelValues(c.config.bucketName). Observe(float64(time.Since(start).Seconds())) - return errors.Wrapf(err, "process %q failed", file) + if err != nil { + return errors.Wrapf(err, "process %q failed", file) + } + // log.Infof("successfully processed %q", file) + return nil + }) } err := g.Wait() diff --git a/internal/source/objstore/conn_test.go b/internal/source/objstore/conn_test.go index 8b54e73f..4c858541 100644 --- a/internal/source/objstore/conn_test.go +++ b/internal/source/objstore/conn_test.go @@ -119,11 +119,31 @@ func TestApplyWithinRange(t *testing.T) { timeRange := timeRange(low, high) conn, tracker, err := buildTimestampVerifier(rootFS) r.NoError(err) + // Configure the time range we are interested in. conn.config.MinTimestamp = timeRange.Min() conn.config.timeRange = timeRange + conn.config.prefix = baseDir + + //stop.Go(func(ctx *stopper.Context) error { + // resChan, err := conn.Read(ctx) + // if err != nil { + // return err + // } + // for { + // select { + // case res, ok := <-resChan: + // if !ok { + // } + // t.Logf("received %d muts", res.Batch.Count()) + // case <-ctx.Stopping(): + // } + // } + //}) + stop.Go( func(ctx *stopper.Context) error { + // Calling state.setlast. err := conn.apply(ctx, baseDir) return err }) @@ -172,6 +192,7 @@ func TestApplyRange(t *testing.T) { r.NoError(err) conn, _, err := buildConn(rootFS, defaultProbTransientError) r.NoError(err) + for _, rg := range ranges { conveyor := ×tampTracker{} collector := mocks.NewCollector(defaultUpperLimit) @@ -415,7 +436,7 @@ func buildTimestampVerifier(fs fs.FS) (*Conn, *timestampTracker, error) { if err != nil { return nil, nil, err } - processor := eventproc.NewLocal(conveyor, bucket, parser, ident.MustSchema(ident.Public)) + processor := eventproc.NewLocal(bucket, parser, ident.MustSchema(ident.Public)) return &Conn{ bucket: bucket, config: &Config{ diff --git a/internal/source/objstore/eventproc/local.go b/internal/source/objstore/eventproc/local.go index 100ee1ca..bf8bff56 100644 --- a/internal/source/objstore/eventproc/local.go +++ b/internal/source/objstore/eventproc/local.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/replicator/internal/util/hlc" "github.com/cockroachdb/replicator/internal/util/ident" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" ) // Acceptor receives batches for processing. @@ -51,9 +52,10 @@ func (c *localProcessor) BatchChan() <-chan *types.BatchCursor { // NewLocal creates a local processor. func NewLocal(bucket bucket.Bucket, parser *cdcjson.NDJsonParser, schema ident.Schema) Processor { return &localProcessor{ - bucket: bucket, - parser: parser, - schema: schema, + bucket: bucket, + parser: parser, + schema: schema, + batchChan: make(chan *types.BatchCursor), } } @@ -79,6 +81,7 @@ func (c *localProcessor) Process( if err != nil { return errors.Wrapf(err, "failed to parse %s", path) } + log.Infof("pushing to cursor chan") // Send the batch downstream to the target. for _, tempBatch := range batch.Data { diff --git a/internal/source/objstore/integration_test.go b/internal/source/objstore/integration_test.go index ed74d189..ec3f4e1b 100644 --- a/internal/source/objstore/integration_test.go +++ b/internal/source/objstore/integration_test.go @@ -201,6 +201,7 @@ func TestWorkload(t *testing.T) { } func testWorkload(t *testing.T, fc *fixtureConfig) { + log.SetLevel(log.TraceLevel) r := require.New(t) fixture, err := all.NewFixture(t)