52 changes: 27 additions & 25 deletions scheduler/queue/worker.go
@@ -14,6 +14,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/rs/zerolog"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -140,37 +141,38 @@ func (w *worker) resolveResource(ctx context.Context, table *schema.Table, clien
go func() {
defer close(resourcesChan)
var wg sync.WaitGroup
for i := range resourcesSlice {
i := i
chunks := [][]any{resourcesSlice}
if table.PreResourceChunkResolver != nil {
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
}
for i := range chunks {
wg.Add(1)
go func() {
defer wg.Done()
resolvedResource := resolvers.ResolveSingleResource(ctx, w.logger, w.metrics, table, client, parent, resourcesSlice[i], w.caser)
if resolvedResource == nil {
return
}

if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
w.metrics.AddErrors(ctx, 1, selector)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
resolvedResources := resolvers.ResolveResourcesChunk(ctx, w.logger, w.metrics, table, client, parent, chunks[i], w.caser)
for _, resolvedResource := range resolvedResources {
if err := resolvedResource.CalculateCQID(w.deterministicCQID); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
w.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
w.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
w.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
w.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
}
}
}()
}
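For context, `lo.Chunk` from github.com/samber/lo splits a slice into consecutive sub-slices of at most the given size, with the final chunk holding whatever remains; a minimal sketch of the behavior the worker relies on:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	items := []any{"a", "b", "c", "d", "e"}
	// Chunk returns sub-slices of at most 2 elements; the last chunk may be shorter.
	chunks := lo.Chunk(items, 2)
	fmt.Println(len(chunks)) // 3
	fmt.Println(chunks)      // [[a b] [c d] [e]]
}
```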
41 changes: 29 additions & 12 deletions scheduler/resolvers/resolvers.go
@@ -41,11 +41,14 @@ func resolveColumn(ctx context.Context, logger zerolog.Logger, m *metrics.Metric
}
}

func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, item any, c *caser.Caser) *schema.Resource {
func ResolveResourcesChunk(ctx context.Context, logger zerolog.Logger, m *metrics.Metrics, table *schema.Table, client schema.ClientMeta, parent *schema.Resource, chunk []any, c *caser.Caser) []*schema.Resource {
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()

resource := schema.NewResourceData(table, parent, item)
resources := make([]*schema.Resource, len(chunk))
for i, item := range chunk {
resources[i] = schema.NewResourceData(table, parent, item)
}
objectStartTime := time.Now()

clientID := client.ID()
@@ -60,25 +63,39 @@ func ResolveSingleResource(ctx context.Context, logger zerolog.Logger, m *metric
m.AddPanics(ctx, 1, selector)
}
}()
if table.PreResourceResolver != nil {
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Err(err).Msg("pre resource resolver failed")

if table.PreResourceChunkResolver != nil {
if err := table.PreResourceChunkResolver.RowsResolver(ctx, client, resources); err != nil {
tableLogger.Error().Stack().Err(err).Msg("pre resource chunk resolver finished with error")
m.AddErrors(ctx, 1, selector)
return nil
}
}

for _, column := range table.Columns {
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
if table.PreResourceResolver != nil {
for _, resource := range resources {
if err := table.PreResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Err(err).Msg("pre resource resolver failed")
m.AddErrors(ctx, 1, selector)
return nil
}
}
}
for _, resource := range resources {
for _, column := range table.Columns {
resolveColumn(ctx, tableLogger, m, selector, client, resource, column, c)
}
}

if table.PostResourceResolver != nil {
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
m.AddErrors(ctx, 1, selector)
for _, resource := range resources {
if err := table.PostResourceResolver(ctx, client, resource); err != nil {
tableLogger.Error().Stack().Err(err).Msg("post resource resolver finished with error")
m.AddErrors(ctx, 1, selector)
}
}
}

m.AddResources(ctx, 1, selector)
return resource
m.AddResources(ctx, int64(len(resources)), selector)
return resources
}
51 changes: 28 additions & 23 deletions scheduler/scheduler_dfs.go
@@ -13,6 +13,7 @@ import (
"github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
"github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
"github.com/cloudquery/plugin-sdk/v4/schema"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -157,8 +158,11 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
go func() {
defer close(resourcesChan)
var wg sync.WaitGroup
for i := range resourcesSlice {
i := i
chunks := [][]any{resourcesSlice}
if table.PreResourceChunkResolver != nil {
chunks = lo.Chunk(resourcesSlice, table.PreResourceChunkResolver.ChunkSize)
}
for i := range chunks {
resourceConcurrencyKey := table.Name + "-" + client.ID() + "-" + "resource"
resourceSemVal, _ := s.scheduler.singleTableConcurrency.LoadOrStore(resourceConcurrencyKey, semaphore.NewWeighted(s.scheduler.singleResourceMaxConcurrency))
resourceSem := resourceSemVal.(*semaphore.Weighted)
@@ -183,33 +187,34 @@
defer resourceSem.Release(1)
defer s.scheduler.resourceSem.Release(1)
defer wg.Done()
//nolint:all
resolvedResource := resolvers.ResolveSingleResource(ctx, s.logger, s.metrics, table, client, parent, resourcesSlice[i], s.scheduler.caser)
if resolvedResource == nil {
resolvedResources := resolvers.ResolveResourcesChunk(ctx, s.logger, s.metrics, table, client, parent, chunks[i], s.scheduler.caser)
if len(resolvedResources) == 0 {
return
}

if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
s.metrics.AddErrors(ctx, 1, selector)
return
}
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
for _, resolvedResource := range resolvedResources {
if err := resolvedResource.CalculateCQID(s.deterministicCQID); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with primary key calculation error")
s.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
if err := resolvedResource.StoreCQClientID(client.ID()); err != nil {
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("failed to store _cq_client_id")
}
if err := resolvedResource.Validate(); err != nil {
switch err.(type) {
case *schema.PKError:
s.logger.Error().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation error")
s.metrics.AddErrors(ctx, 1, selector)
return
case *schema.PKComponentError:
s.logger.Warn().Err(err).Str("table", table.Name).Str("client", client.ID()).Msg("resource resolver finished with validation warning")
}
}
select {
case resourcesChan <- resolvedResource:
case <-ctx.Done():
}
}
}()
}
8 changes: 8 additions & 0 deletions schema/table.go
@@ -25,6 +25,11 @@ type TableResolver func(ctx context.Context, meta ClientMeta, parent *Resource,

type RowResolver func(ctx context.Context, meta ClientMeta, resource *Resource) error

// RowsChunkResolver configures chunked resolution: RowsResolver is invoked once per
// chunk of at most ChunkSize resources, before the per-resource resolvers run.
type RowsChunkResolver struct {
ChunkSize int
RowsResolver func(ctx context.Context, meta ClientMeta, resourcesChunk []*Resource) error
}

type Multiplexer func(meta ClientMeta) []ClientMeta

type Transform func(table *Table) error
@@ -86,6 +91,9 @@ type Table struct {
// PreResourceResolver is called before all columns are resolved but after Resource is created. The ordering of resolvers is:
// (Table) Resolver → PreResourceResolver → ColumnResolvers → PostResourceResolver
PreResourceResolver RowResolver `json:"-"`

// PreResourceChunkResolver, if set, splits the resources returned by the table Resolver into
// chunks of ChunkSize and calls RowsResolver once per chunk, before PreResourceResolver,
// ColumnResolvers and PostResourceResolver run on each resource.
PreResourceChunkResolver *RowsChunkResolver `json:"-"`

// IsIncremental is a flag that indicates if the table is incremental or not. This flag mainly affects how the table is
// documented.
IsIncremental bool `json:"is_incremental"`
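For illustration, a minimal sketch of how a plugin table might opt into the new chunked resolution; `exampleTable` and `batchFetchTags` are hypothetical names used here for the example, not part of this change:

```go
package example

import (
	"context"

	"github.com/cloudquery/plugin-sdk/v4/schema"
)

// batchFetchTags is a hypothetical helper that enriches an entire chunk of
// resources with a single backend call instead of one call per resource.
func batchFetchTags(ctx context.Context, meta schema.ClientMeta, resources []*schema.Resource) error {
	// Issue one batched API request covering every resource in the chunk.
	return nil
}

// exampleTable opts into chunked resolution: RowsResolver runs once per chunk
// of up to 100 resources, before the per-resource and column resolvers.
var exampleTable = &schema.Table{
	Name: "example_items",
	// Resolver, Columns, etc. omitted for brevity.
	PreResourceChunkResolver: &schema.RowsChunkResolver{
		ChunkSize:    100,
		RowsResolver: batchFetchTags,
	},
}
```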