Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: engine core module log #642

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions internal/engine/worker/decentralized/core/arweave/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/rss3-network/protocol-go/schema/typex"
"github.com/samber/lo"
"github.com/shopspring/decimal"
"go.uber.org/zap"
)

var _ engine.Worker = (*worker)(nil)
Expand Down Expand Up @@ -67,6 +68,8 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
return nil, fmt.Errorf("invalid task type: %T", task)
}

zap.L().Debug("transforming arweave task", zap.String("task_id", arweaveTask.ID()))

// Build the activity.
activity, err := task.BuildActivity()
if err != nil {
Expand All @@ -85,6 +88,8 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
activity.Actions = append(activity.Actions, action)
}

zap.L().Debug("successfully transformed arweave task")

return activity, nil
}

Expand Down Expand Up @@ -119,6 +124,11 @@ func (w *worker) handleArweaveNativeTransferTransaction(ctx context.Context, tas

// buildArweaveTransactionTransferAction returns the native transfer transaction action.
func (w *worker) buildArweaveTransactionTransferAction(_ context.Context, from, to string, tokenValue *big.Int) (*activityx.Action, error) {
zap.L().Debug("building arweave transaction transfer action",
zap.String("from", from),
zap.String("to", to),
zap.Any("token_value", tokenValue))

action := activityx.Action{
Type: typex.TransactionTransfer,
From: from,
Expand All @@ -128,6 +138,8 @@ func (w *worker) buildArweaveTransactionTransferAction(_ context.Context, from,
},
}

zap.L().Debug("successfully built arweave transaction transfer action")

return &action, nil
}

Expand Down
111 changes: 111 additions & 0 deletions internal/engine/worker/decentralized/core/ethereum/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/sourcegraph/conc/pool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

var _ engine.Worker = (*worker)(nil)
Expand Down Expand Up @@ -116,6 +117,8 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
return nil, fmt.Errorf("invalid task type: %T", task)
}

zap.L().Debug("transforming ethereum task", zap.String("task_id", ethereumTask.ID()))

activity, err := task.BuildActivity()
if err != nil {
return nil, fmt.Errorf("build activity: %w", err)
Expand All @@ -129,6 +132,8 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
}

if w.matchNativeTransferTransaction(ethereumTask) {
zap.L().Debug("handling native transfer transaction")

action, err := w.handleNativeTransferTransaction(ctx, ethereumTask)
if err != nil {
return nil, fmt.Errorf("handle native transfer transaction: %w", err)
Expand All @@ -152,6 +157,13 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
continue
}

zap.L().Debug("handling ethereum log",
zap.String("task_id", ethereumTask.ID()),
zap.Uint("log_index", log.Index),
zap.String("address", log.Address.String()),
zap.Any("topic", log.Topics[0]),
)

contextPool.Go(func(ctx context.Context) error {
var (
logActions []*activityx.Action
Expand All @@ -161,27 +173,49 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
switch {
// VSL Bridge
case w.matchL2StandardBridgeWithdrawalInitiatedLog(ethereumTask, log):
zap.L().Debug("handling l2 standard bridge withdrawal initiated log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.transformL2StandardBridgeWithdrawalInitiatedLog(ctx, ethereumTask, log)
case w.matchL2StandardBridgeDepositFinalizedLog(ethereumTask, log):
zap.L().Debug("handling l2 standard bridge deposit finalized log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.transformL2StandardBridgeDepositFinalizedLog(ctx, ethereumTask, log)
// VSL Staking
case w.matchStakingVSLDeposited(ethereumTask, log):
zap.L().Debug("handling staking vsl deposited log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleStakingVSLDeposited(ctx, ethereumTask, log)
case w.matchStakingVSLStaked(ethereumTask, log):
zap.L().Debug("handling staking vsl staked log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleStakingVSLStaked(ctx, ethereumTask, log)
case w.matchChipsTransfer(ethereumTask, log):
zap.L().Debug("handling chips mint log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleChipsMint(ctx, ethereumTask, log)
case w.matchERC20TransferLog(ethereumTask, log):
zap.L().Debug("handling erc20 transfer log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleERC20TransferLog(ctx, ethereumTask, log)
case w.matchERC20ApprovalLog(ethereumTask, log):
zap.L().Debug("handling erc20 approval log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleERC20ApproveLog(ctx, ethereumTask, log)
case w.matchERC721TransferLog(ethereumTask, log):
zap.L().Debug("handling erc721 transfer log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleERC721TransferLog(ctx, ethereumTask, log)
case w.matchERC721ApprovalLog(ethereumTask, log):
zap.L().Debug("handling erc721 approval log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleERC721ApproveLog(ctx, ethereumTask, log)
case w.matchERC1155TransferLog(ethereumTask, log):
zap.L().Debug("handling erc1155 transfer log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleERC1155TransferLog(ctx, ethereumTask, log)
case w.matchERC1155ApprovalLog(ethereumTask, log):
zap.L().Debug("handling erc1155 approval log", zap.String("task_id", ethereumTask.ID()))

logActions, err = w.handleERC1155ApproveLog(ctx, ethereumTask, log)
}

Expand Down Expand Up @@ -209,6 +243,8 @@ func (w *worker) Transform(ctx context.Context, task engine.Task) (*activityx.Ac
activity.Type = action.Type
}

zap.L().Debug("successfully transformed ethereum task")

return activity, nil
}

Expand Down Expand Up @@ -528,6 +564,14 @@ func (w *worker) transformL2StandardBridgeDepositFinalizedLog(ctx context.Contex
}

func (w *worker) buildTransactionTransferAction(ctx context.Context, task *source.Task, from, to common.Address, tokenAddress *common.Address, tokenValue *big.Int) (*activityx.Action, error) {
zap.L().Debug("building transaction transfer action",
zap.String("task_id", task.ID()),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.Any("token_address", tokenAddress),
zap.Any("token_value", tokenValue),
)

chainID, err := network.EthereumChainIDString(task.GetNetwork().String())
if err != nil {
return nil, fmt.Errorf("invalid chain id: %w", err)
Expand Down Expand Up @@ -558,10 +602,20 @@ func (w *worker) buildTransactionTransferAction(ctx context.Context, task *sourc
Metadata: metadata.TransactionTransfer(*tokenMetadata),
}

zap.L().Debug("successfully built transaction transfer action", zap.String("task_id", task.ID()))

return &action, nil
}

func (w *worker) buildTransactionApprovalAction(ctx context.Context, task *source.Task, from, to common.Address, tokenAddress *common.Address, tokenValue *big.Int) (*activityx.Action, error) {
zap.L().Debug("building transaction approval action",
zap.String("task_id", task.ID()),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.Any("token_address", tokenAddress),
zap.Any("token_value", tokenValue),
)

chainID, err := network.EthereumChainIDString(task.GetNetwork().String())
if err != nil {
return nil, fmt.Errorf("invalid chain id: %w", err)
Expand Down Expand Up @@ -590,10 +644,21 @@ func (w *worker) buildTransactionApprovalAction(ctx context.Context, task *sourc
},
}

zap.L().Debug("successfully built transaction approval action", zap.String("task_id", task.ID()))

return &action, nil
}

func (w *worker) buildCollectibleTransferAction(ctx context.Context, task *source.Task, from, to common.Address, tokenAddress common.Address, tokenID *big.Int, tokenValue *big.Int) (*activityx.Action, error) {
zap.L().Debug("building collectible transfer action",
zap.String("task_id", task.ID()),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.String("token_address", tokenAddress.String()),
zap.Any("token_id", tokenID),
zap.Any("token_value", tokenValue),
)

chainID, err := network.EthereumChainIDString(task.GetNetwork().String())
if err != nil {
return nil, fmt.Errorf("invalid chain id: %w", err)
Expand Down Expand Up @@ -624,10 +689,21 @@ func (w *worker) buildCollectibleTransferAction(ctx context.Context, task *sourc
Metadata: metadata.CollectibleTransfer(*tokenMetadata),
}

zap.L().Debug("successfully built collectible transfer action", zap.String("task_id", task.ID()))

return &action, nil
}

func (w *worker) buildCollectibleApprovalAction(ctx context.Context, task *source.Task, from common.Address, to common.Address, tokenAddress common.Address, id *big.Int, approved *bool) (*activityx.Action, error) {
zap.L().Debug("building collectible approval action",
zap.String("task_id", task.ID()),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.String("token_address", tokenAddress.String()),
zap.Any("id", id),
zap.Any("approved", approved),
)

chainID, err := network.EthereumChainIDString(task.GetNetwork().String())
if err != nil {
return nil, fmt.Errorf("invalid chain id: %w", err)
Expand Down Expand Up @@ -661,11 +737,21 @@ func (w *worker) buildCollectibleApprovalAction(ctx context.Context, task *sourc
},
}

zap.L().Debug("successfully built collectible approval action", zap.String("task_id", task.ID()))

return &action, nil
}

// buildExchangeStakingVSLAction builds the exchange staking VSL action.
func (w *worker) buildExchangeStakingVSLAction(ctx context.Context, task *source.Task, from, to common.Address, tokenValue *big.Int, stakingAction metadata.ExchangeStakingAction) (*activityx.Action, error) {
zap.L().Debug("building exchange staking vsl action",
zap.String("task_id", task.ID()),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.Any("token_value", tokenValue),
zap.String("staking_action", stakingAction.String()),
)

// The Token always is $RSS3.
tokenMetadata, err := w.tokenClient.Lookup(ctx, task.ChainID, nil, nil, task.Header.Number)
if err != nil {
Expand All @@ -685,18 +771,31 @@ func (w *worker) buildExchangeStakingVSLAction(ctx context.Context, task *source
},
}

zap.L().Debug("successfully built exchange staking vsl action", zap.String("task_id", task.ID()))

return &action, nil
}

// buildChipsMintAction builds the ChipsMint action.
func (w *worker) buildChipsMintAction(ctx context.Context, task *source.Task, from, to common.Address, contract common.Address, id *big.Int, value *big.Int) (*activityx.Action, error) {
zap.L().Debug("building chips mint action",
zap.String("task_id", task.ID()),
zap.String("from", from.String()),
zap.String("to", to.String()),
zap.Any("contract", contract),
zap.Any("id", id),
zap.Any("value", value),
)

tokenMetadata, err := w.tokenClient.Lookup(ctx, task.ChainID, &contract, id, task.Header.Number)
if err != nil {
return nil, fmt.Errorf("lookup token metadata: %w", err)
}

tokenMetadata.Value = lo.ToPtr(decimal.NewFromBigInt(value, 0))

zap.L().Debug("successfully built chips mint action", zap.String("task_id", task.ID()))

return &activityx.Action{
Type: typex.CollectibleMint,
Platform: w.Platform(),
Expand All @@ -707,6 +806,16 @@ func (w *worker) buildChipsMintAction(ctx context.Context, task *source.Task, fr
}

func (w *worker) buildTransactionBridgeAction(ctx context.Context, chainID uint64, sender, receiver common.Address, source, target network.Network, bridgeAction metadata.TransactionBridgeAction, tokenAddress *common.Address, tokenValue *big.Int, blockNumber *big.Int) (*activityx.Action, error) {
zap.L().Debug("building transaction bridge action",
zap.String("sender", sender.String()),
zap.String("receiver", receiver.String()),
zap.String("source", source.String()),
zap.String("target", target.String()),
zap.String("bridge_action", bridgeAction.String()),
zap.Any("token_address", tokenAddress),
zap.Any("token_value", tokenValue),
)

tokenMetadata, err := w.tokenClient.Lookup(ctx, chainID, tokenAddress, nil, blockNumber)
if err != nil {
return nil, fmt.Errorf("lookup token %s: %w", tokenAddress, err)
Expand All @@ -727,6 +836,8 @@ func (w *worker) buildTransactionBridgeAction(ctx context.Context, chainID uint6
},
}

zap.L().Debug("successfully built transaction bridge action")

return &action, nil
}

Expand Down
Loading
Loading