From 729634a1a6923a0e16532ad187d3e41190c4656c Mon Sep 17 00:00:00 2001
From: erezrokah
Date: Thu, 25 Sep 2025 10:48:48 +0100
Subject: [PATCH] feat: Support chunks in list/details patterns

Tables can set PreResourceChunkResolver to pre-resolve resources in batches of
ChunkSize. Tables without one keep resolving one resource per goroutine, and a
ChunkSize below 1 is treated as 1. A resource that fails validation or its
PreResourceResolver is skipped without dropping the rest of its chunk.
---
 scheduler/queue/worker.go        | 62 ++++++++++++++++------------
 scheduler/resolvers/resolvers.go | 49 +++++++++++++++++-----
 scheduler/scheduler_dfs.go       | 61 +++++++++++++++++-----------
 schema/table.go                  | 15 ++++++++
 4 files changed, 121 insertions(+), 66 deletions(-)

diff --git a/scheduler/queue/worker.go b/scheduler/queue/worker.go
index afec99ee59..be4864d628 100644
--- a/scheduler/queue/worker.go
+++ b/scheduler/queue/worker.go
@@ -14,6 +14,7 @@ import (
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
 	"github.com/cloudquery/plugin-sdk/v4/schema"
 	"github.com/rs/zerolog"
+	"github.com/samber/lo"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -140,37 +141,42 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
 	go func() {
 		defer close(resourcesChan)
 		var wg sync.WaitGroup
-		for i := range resourcesSlice {
-			i := i
+		// Default to one resource per goroutine; tables with a PreResourceChunkResolver are batched instead.
+		// lo.Chunk panics on a non-positive size, so a ChunkSize below 1 falls back to 1.
+		chunkSize := 1
+		if table.PreResourceChunkResolver != nil && table.PreResourceChunkResolver.ChunkSize > 0 {
+			chunkSize = table.PreResourceChunkResolver.ChunkSize
+		}
+		chunks := lo.Chunk(resourcesSlice, chunkSize)
+		for i := range chunks {
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				resolvedResource := resolvers.ResolveSingleResource(ctx, w.logger, w.metrics, table, client, parent, resourcesSlice[i], w.caser)
-				if resolvedResource == nil {
-					return
-				}
-
-				if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
-					w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
-					w.metrics.AddErrors(ctx, 1, selector)
-					return
-				}
-				if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
-					w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
-				}
-				if err := resolvedResource.Validate(); err != nil {
-					switch err.(type) {
-					case *schema.PKError:
-						w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
-						w.metrics.AddErrors(ctx, 1, selector)
-						return
-					case *schema.PKComponentError:
-						w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
-					}
-				}
-				select {
-				case resourcesChan <- resolvedResource:
-				case <-ctx.Done():
+				resolvedResources := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser)
+				for _, resolvedResource := range resolvedResources {
+					if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
+						w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
+						w.metrics.AddErrors(ctx, 1, selector)
+						// Skip only this resource; the rest of the chunk is still valid.
+						continue
+					}
+					if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
+						w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
+					}
+					if err := resolvedResource.Validate(); err != nil {
+						switch err.(type) {
+						case *schema.PKError:
+							w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
+							w.metrics.AddErrors(ctx, 1, selector)
+							continue
+						case *schema.PKComponentError:
+							w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
+						}
+					}
+					select {
+					case resourcesChan <- resolvedResource:
+					case <-ctx.Done():
+					}
 				}
 			}()
 		}
diff --git a/scheduler/resolvers/resolvers.go b/scheduler/resolvers/resolvers.go
index 56f8ead9e6..2d20aee9eb 100644
--- a/scheduler/resolvers/resolvers.go
+++ b/scheduler/resolvers/resolvers.go
@@ -41,11 +41,18 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric
 	}
 }
 
-func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource {
+// ResolveResourcesChunk builds a schema.Resource for every item in chunk and runs the table's
+// PreResourceChunkResolver, PreResourceResolver, column resolvers and PostResourceResolver on them.
+// It returns nil when the chunk resolver fails; a resource whose PreResourceResolver fails is
+// dropped on its own. All resolvers share a single 10 minute timeout.
+func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, chunk []any, c *caser.Caser) []*schema.Resource {
 	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
 	defer cancel()
 
-	resource := schema.NewResourceData(table, parent, item)
+	resources := make([]*schema.Resource, len(chunk))
+	for i, item := range chunk {
+		resources[i] = schema.NewResourceData(table, parent, item)
+	}
 	objectStartTime := time.Now()
 
 	clientID := client.ID()
@@ -60,25 +67,43 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric
 			m.AddPanics(ctx, 1, selector)
 		}
 	}()
-	if table.PreResourceResolver != nil {
-		if err := table.PreResourceResolver(ctx, client, resource); err != nil {
-			tableLogger.Error().Err(err).Msg("pre resource resolver failed")
+
+	if table.PreResourceChunkResolver != nil {
+		if err := table.PreResourceChunkResolver.RowsResolver(ctx, client, resources); err != nil {
+			tableLogger.Error().Stack().Err(err).Msg("pre resource chunk resolver finished with error")
 			m.AddErrors(ctx, 1, selector)
 			return nil
 		}
 	}
 
-	for _, column := range table.Columns {
-		resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
+	if table.PreResourceResolver != nil {
+		// A failure only invalidates the resource it was reported for; chunk siblings are kept.
+		kept := resources[:0]
+		for _, resource := range resources {
+			if err := table.PreResourceResolver(ctx, client, resource); err != nil {
+				tableLogger.Error().Err(err).Msg("pre resource resolver failed")
+				m.AddErrors(ctx, 1, selector)
+				continue
+			}
+			kept = append(kept, resource)
+		}
+		resources = kept
+	}
+	for _, resource := range resources {
+		for _, column := range table.Columns {
+			resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
+		}
 	}
 
 	if table.PostResourceResolver != nil {
-		if err := table.PostResourceResolver(ctx, client, resource); err != nil {
-			tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
-			m.AddErrors(ctx, 1, selector)
+		for _, resource := range resources {
+			if err := table.PostResourceResolver(ctx, client, resource); err != nil {
+				tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
+				m.AddErrors(ctx, 1, selector)
+			}
 		}
 	}
 
-	m.AddResources(ctx, 1, selector)
-	return resource
+	m.AddResources(ctx, int64(len(resources)), selector)
+	return resources
 }
diff --git a/scheduler/scheduler_dfs.go b/scheduler/scheduler_dfs.go
index dfdf4703d7..515cd9bc77 100644
--- a/scheduler/scheduler_dfs.go
+++ b/scheduler/scheduler_dfs.go
@@ -13,6 +13,7 @@ import (
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
 	"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
 	"github.com/cloudquery/plugin-sdk/v4/schema"
+	"github.com/samber/lo"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
@@ -157,8 +158,14 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
 	go func() {
 		defer close(resourcesChan)
 		var wg sync.WaitGroup
-		for i := range resourcesSlice {
-			i := i
+		// Default to one resource per goroutine; tables with a PreResourceChunkResolver are batched instead.
+		// lo.Chunk panics on a non-positive size, so a ChunkSize below 1 falls back to 1.
+		chunkSize := 1
+		if table.PreResourceChunkResolver != nil && table.PreResourceChunkResolver.ChunkSize > 0 {
+			chunkSize = table.PreResourceChunkResolver.ChunkSize
+		}
+		chunks := lo.Chunk(resourcesSlice, chunkSize)
+		for i := range chunks {
 			resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
 			resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
 			resourceSem := resourceSemVal.(*semaphore.Weighted)
@@ -183,33 +190,35 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
 				defer resourceSem.Release(1)
 				defer s.scheduler.resourceSem.Release(1)
 				defer wg.Done()
-				//nolint:all
-				resolvedResource := resolvers.ResolveSingleResource(ctx, s.logger, s.metrics, table, client, parent, resourcesSlice[i], s.scheduler.caser)
-				if resolvedResource == nil {
+				resolvedResources := resolvers.ResolveResourcesChunk(ctx, s.logger, s.metrics, table, client, parent, chunks[i], s.scheduler.caser)
+				if len(resolvedResources) == 0 {
 					return
 				}
 
-				if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
-					s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
-					s.metrics.AddErrors(ctx, 1, selector)
-					return
-				}
-				if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
-					s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
-				}
-				if err := resolvedResource.Validate(); err != nil {
-					switch err.(type) {
-					case *schema.PKError:
-						s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
-						s.metrics.AddErrors(ctx, 1, selector)
-						return
-					case *schema.PKComponentError:
-						s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
-					}
-				}
-				select {
-				case resourcesChan <- resolvedResource:
-				case <-ctx.Done():
+				for _, resolvedResource := range resolvedResources {
+					if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
+						s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
+						s.metrics.AddErrors(ctx, 1, selector)
+						// Skip only this resource; the rest of the chunk is still valid.
+						continue
+					}
+					if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
+						s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
+					}
+					if err := resolvedResource.Validate(); err != nil {
+						switch err.(type) {
+						case *schema.PKError:
+							s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
+							s.metrics.AddErrors(ctx, 1, selector)
+							continue
+						case *schema.PKComponentError:
+							s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
+						}
+					}
+					select {
+					case resourcesChan <- resolvedResource:
+					case <-ctx.Done():
+					}
 				}
 			}()
 		}
diff --git a/schema/table.go b/schema/table.go
index 49c309c465..d96c4ace0c 100644
--- a/schema/table.go
+++ b/schema/table.go
@@ -25,6 +25,16 @@ type TableResolver func(ctx context.Context, meta ClientMeta, parent *Resource,
 
 type RowResolver func(ctx context.Context, meta ClientMeta, resource *Resource) error
 
+// RowsChunkResolver resolves a batch of resources in a single call. The scheduler splits the
+// resources returned by the table resolver into chunks of at most ChunkSize items.
+type RowsChunkResolver struct {
+	// ChunkSize is the maximum number of resources passed to RowsResolver per call.
+	// Values below 1 are treated as 1.
+	ChunkSize int
+	// RowsResolver is called once per chunk, before PreResourceResolver and the column resolvers.
+	RowsResolver func(ctx context.Context, meta ClientMeta, resourcesChunk []*Resource) error
+}
+
 type Multiplexer func(meta ClientMeta) []ClientMeta
 
 type Transform func(table *Table) error
@@ -86,6 +96,11 @@ type Table struct {
 	// PreResourceResolver is called before all columns are resolved but after Resource is created. The ordering of resolvers is:
 	// (Table) Resolver → PreResourceResolver → ColumnResolvers → PostResourceResolver
 	PreResourceResolver RowResolver `json:"-"`
+
+	// PreResourceChunkResolver is called once per chunk of resources, before PreResourceResolver.
+	// If it returns an error, none of the resources in that chunk are synced.
+	PreResourceChunkResolver *RowsChunkResolver `json:"-"`
+
 	// IsIncremental is a flag that indicates if the table is incremental or not. This flag mainly affects how the table is
 	// documented.
 	IsIncremental bool `json:"is_incremental"`