Skip to content

Commit

Permalink
chore: more params details
Browse files Browse the repository at this point in the history
  • Loading branch information
brucexc committed Nov 12, 2024
1 parent 7157bb9 commit 4cd7d0b
Showing 1 changed file with 4 additions and 24 deletions.
28 changes: 4 additions & 24 deletions internal/database/dialer/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func (c *client) Migrate(ctx context.Context) error {

// WithTransaction executes a transaction.
func (c *client) WithTransaction(ctx context.Context, transactionFunction func(ctx context.Context, client database.Client) error, transactionOptions ...*sql.TxOptions) error {
zap.L().Debug("starting database transaction")

transactionFunc := func() error {
transaction, err := c.Begin(ctx, transactionOptions...)
if err != nil {
Expand All @@ -76,8 +74,6 @@ func (c *client) WithTransaction(ctx context.Context, transactionFunction func(c
zap.L().Debug("transaction began successfully")

if err := transactionFunction(ctx, transaction); err != nil {
zap.L().Warn("rolling back transaction due to error", zap.Error(err))

_ = transaction.Rollback()

return fmt.Errorf("execute transaction: %w", err)
Expand All @@ -87,19 +83,12 @@ func (c *client) WithTransaction(ctx context.Context, transactionFunction func(c
return fmt.Errorf("commit transaction: %w", err)
}

zap.L().Debug("transaction committed successfully")

return nil
}

retryIfFunc := func(err error) bool {
// https://www.cockroachlabs.com/docs/stable/transaction-retry-error-reference#retry_serializable
shouldRetry := strings.Contains(err.Error(), "TransactionRetryWithProtoRefreshError: TransactionRetryError:")
if shouldRetry {
zap.L().Debug("transaction needs retry", zap.Error(err))
}

return shouldRetry
return strings.Contains(err.Error(), "TransactionRetryWithProtoRefreshError: TransactionRetryError:")
}

onRetryFunc := func(n uint, err error) {
Expand Down Expand Up @@ -168,9 +157,7 @@ func (c *client) LoadCheckpoint(ctx context.Context, id string, network networkx
}

zap.L().Debug("successfully loaded checkpoint",
zap.String("id", id),
zap.String("network", network.String()),
zap.String("worker", worker))
zap.Any("value", value))

return value.Export()
}
Expand Down Expand Up @@ -224,19 +211,14 @@ func (c *client) LoadCheckpoints(ctx context.Context, id string, network network
}

zap.L().Debug("successfully loaded checkpoints",
zap.Int("count", len(result)),
zap.String("id", id),
zap.String("network", network.String()),
zap.String("worker", worker))
zap.Any("checkpoints", result))

return result, nil
}

func (c *client) SaveCheckpoint(ctx context.Context, checkpoint *engine.Checkpoint) error {
zap.L().Debug("saving checkpoint",
zap.String("id", checkpoint.ID),
zap.String("state", string(checkpoint.State)),
zap.Int64("index_count", checkpoint.IndexCount))
zap.Any("checkpoint", checkpoint))

spanStartOptions := []trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindClient),
Expand Down Expand Up @@ -291,8 +273,6 @@ func (c *client) SaveActivities(ctx context.Context, activities []*activityx.Act
return nil
}

zap.L().Warn("saving activities without partition is not implemented")

return fmt.Errorf("not implemented")
}

Expand Down

0 comments on commit 4cd7d0b

Please sign in to comment.