Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion state/protocol/badger/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (m *FollowerState) ExtendCertified(ctx context.Context, certified *flow.Cer
return fmt.Errorf("failed to determine the lastest sealed block in fork: %w", err)
}
deferredBlockPersist.AddNextOperation(func(lctx lockctx.Proof, blockID flow.Identifier, rw storage.ReaderBatchWriter) error {
return operation.IndexLatestSealAtBlock(lctx, rw.Writer(), blockID, latestSeal.ID())
return operation.IndexingLatestSealAtBlock(blockID, latestSeal.ID())(lctx, rw)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mentioned that any database operation requiring a lock context could be refactored using the functor pattern, but I think this case might be an exception—even after applying the refactor.

The functor isn’t particularly useful here since we don’t have the block ID until we start executing the deferred database operations.

I went ahead and refactored it anyway to illustrate my point, but in this case, it doesn’t provide any performance benefits over the original version and only adds unnecessary complexity.

Thoughts?

})

// TODO: we might not need the deferred db updates, because the candidate passed into
Expand Down
47 changes: 8 additions & 39 deletions storage/operation/approvals.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
package operation

import (
"errors"
"fmt"

"github.com/jordanschalm/lockctx"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
)

Expand Down Expand Up @@ -37,45 +33,18 @@ func RetrieveResultApproval(r storage.Reader, approvalID flow.Identifier, approv
//
// Expected error returns:
// - `storage.ErrDataMismatch` if a *different* approval for the same key pair (ExecutionResultID, chunk index) is already indexed
func InsertAndIndexResultApproval(approval *flow.ResultApproval) func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
func InsertAndIndexResultApproval(approval *flow.ResultApproval) Functor {
approvalID := approval.ID()
resultID := approval.Body.ExecutionResultID
chunkIndex := approval.Body.ChunkIndex

// the following functors allow encoding to be done before acquiring the lock
inserting := Upserting(MakePrefix(codeResultApproval, approvalID), approval)
indexing := Upserting(MakePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID)

return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
if !lctx.HoldsLock(storage.LockIndexResultApproval) {
return fmt.Errorf("missing lock for index result approval for result: %v", resultID)
}

var storedApprovalID flow.Identifier
err := LookupResultApproval(rw.GlobalReader(), resultID, chunkIndex, &storedApprovalID)
if err == nil {
if storedApprovalID != approvalID {
return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. %w",
resultID, chunkIndex, approvalID, storedApprovalID, storage.ErrDataMismatch)
}
return nil // already stored and indexed
}
if !errors.Is(err, storage.ErrNotFound) { // `storage.ErrNotFound` is expected, as this indicates that no receipt is indexed yet; anything else is an exception
return fmt.Errorf("could not lookup result approval ID: %w", irrecoverable.NewException(err))
}

err = inserting(rw.Writer())
if err != nil {
return fmt.Errorf("could not store result approval: %w", err)
}

err = indexing(rw.Writer())
if err != nil {
return fmt.Errorf("could not index result approval: %w", err)
}

return nil
}
errmsg := fmt.Sprintf("InsertAndIndexResultApproval failed with approvalID %v, chunkIndex %v, resultID %v",
approvalID, chunkIndex, resultID)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding the err message, the functors like Overwriting and InsertingWIthMismatchCheck is too general that doesn't have the context. So I used WrapError to include more context.

return WrapError(errmsg, BindFunctors(
HoldingLock(storage.LockIndexResultApproval),
Overwriting(MakePrefix(codeResultApproval, approvalID), approval),
InsertingWithMismatchCheck(MakePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID),
))
}

// LookupResultApproval finds a ResultApproval by result ID and chunk index.
Expand Down
281 changes: 281 additions & 0 deletions storage/operation/functor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
// Package operation provides functional programming utilities for database operations
// with lock context support. It defines functors that can be composed to create
// complex database operations while ensuring proper lock acquisition and error handling.
package operation

import (
"bytes"
"errors"
"fmt"

"github.com/jordanschalm/lockctx"
"github.com/vmihailenco/msgpack/v4"

"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/merr"
)

// Functor represents a database operation that requires a lock context proof
// and a batch writer. It encapsulates the pattern of acquiring locks before
// performing database operations and ensures proper error handling.
type Functor func(lockctx.Proof, storage.ReaderBatchWriter) error

// BindFunctors composes multiple functors into a single functor that executes
// them sequentially. If any functor fails, the execution stops and returns the error.
// This enables functional composition of database operations.
func BindFunctors(functors ...Functor) Functor {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
for _, fn := range functors {
err := fn(lctx, rw)
if err != nil {
return err
}
}
return nil
}
}

// HoldingLock creates a functor that validates the lock context holds the specified lock.
// This is used as a guard to ensure operations are only performed when the required lock is held.
// Returns an error if the lock is not held, otherwise returns nil.
func HoldingLock(lockID string) Functor {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
if !lctx.HoldsLock(lockID) {
return fmt.Errorf("missing required lock: %s", lockID)
}
return nil
}
}

// WrapError creates a functor that wraps any error returned by the provided functor
// with additional context. This is useful for providing more descriptive error messages
// when composing complex operations.
func WrapError(wrapMsg string, fn Functor) Functor {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
err := fn(lctx, rw)
if err != nil {
return fmt.Errorf("%s: %w", wrapMsg, err)
}
return nil
}
}

// Overwriting returns a functor that overwrites a key-value pair in the storage.
// The value is serialized using msgpack encoding. If the key already exists,
// the value will be overwritten without any checks.
//
// This is typically used for operations where we want to update existing data
// or where we don't care about potential conflicts.
func Overwriting(key []byte, val interface{}) Functor {
value, err := msgpack.Marshal(val)
if err != nil {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
}
}

return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
err = rw.Writer().Set(key, value)
if err != nil {
return irrecoverable.NewExceptionf("failed to store data: %w", err)
}

return nil
}
}

// OverwritingMul returns a functor that overwrites multiple key-value pairs in the storage.
// The values are serialized using msgpack encoding. If any of the keys already exist,
// the values will be overwritten without any checks.
//
// This is the batch version of Overwriting, useful for operations where we want to
// update multiple entries atomically or where we don't care about potential conflicts.
// The keys and vals slices must have the same length.
func OverwritingMul(keys [][]byte, vals []any) Functor {
if len(keys) != len(vals) {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return irrecoverable.NewExceptionf("keys and vals length mismatch: %d vs %d", len(keys), len(vals))
}
}

values := make([][]byte, len(vals))
for i, val := range vals {
value, err := msgpack.Marshal(val)
if err != nil {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return irrecoverable.NewExceptionf("failed to encode value at index %d: %w", i, err)
}
}
values[i] = value
}

return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
for i, key := range keys {
err := rw.Writer().Set(key, values[i])
if err != nil {
return irrecoverable.NewExceptionf("failed to store data at index %d: %w", i, err)
}
}

return nil
}
}

// InsertingWithExistenceCheck returns a functor that inserts a key-value pair
// only if the key does not already exist. If the key exists, it returns
// storage.ErrAlreadyExists error.
//
// This is used for operations where we want to ensure uniqueness and prevent
// accidental overwrites of existing data.
func InsertingWithExistenceCheck(key []byte, val interface{}) Functor {
value, err := msgpack.Marshal(val)
if err != nil {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
}
}

return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
exist, err := KeyExists(rw.GlobalReader(), key)
if err != nil {
return irrecoverable.NewExceptionf("failed to check for existing key: %w", err)
}

if exist {
return fmt.Errorf("attempting to insert existing key: %x: %w", key, storage.ErrAlreadyExists)
}

err = rw.Writer().Set(key, value)
if err != nil {
return irrecoverable.NewExceptionf("failed to store data: %w", err)
}

return nil
}
}

// InsertingWithMismatchCheck returns a functor that inserts a key-value pair
// with conflict detection. If the key already exists, it compares the existing
// value with the new value. If they differ, it returns storage.ErrDataMismatch.
// If they are the same, the operation succeeds without modification.
//
// This is used for operations where we want to ensure data consistency and
// detect potential race conditions or conflicting updates.
func InsertingWithMismatchCheck(key []byte, val interface{}) Functor {
value, err := msgpack.Marshal(val)
if err != nil {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return irrecoverable.NewExceptionf("failed to encode value: %w", err)
}
}

return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) (errToReturn error) {
existing, closer, err := rw.GlobalReader().Get(key)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return irrecoverable.NewExceptionf("could not load existing data when inserting new data: %w", err)
}

// no existing data stored under this key, proceed with insert
err = rw.Writer().Set(key, value)
if err != nil {
return irrecoverable.NewExceptionf("failed to store data: %w", err)
}

return nil
}

defer func() {
errToReturn = merr.CloseAndMergeError(closer, errToReturn)
}()

if !bytes.Equal(existing, value) {
return fmt.Errorf("attempting to insert existing key with different value: %x: %w", key, storage.ErrDataMismatch)
}

return nil
}
}

// InsertingMulWithMismatchCheck returns a functor that inserts multiple key-value pairs
// with conflict detection. For each key, if it already exists, it compares the existing
// value with the new value. If they differ, it returns storage.ErrDataMismatch.
// If they are the same, the operation succeeds without modification.
//
// This is the batch version of InsertingWithMismatchCheck, useful for operations where
// we want to ensure data consistency and detect potential race conditions or conflicting
// updates across multiple entries atomically. The keys and vals slices must have the same length.
func InsertingMulWithMismatchCheck(keys [][]byte, vals []any) Functor {
if len(keys) != len(vals) {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return irrecoverable.NewExceptionf("keys and vals length mismatch: %d vs %d", len(keys), len(vals))
}
}

values := make([][]byte, len(vals))
for i, val := range vals {
value, err := msgpack.Marshal(val)
if err != nil {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return irrecoverable.NewExceptionf("failed to encode value at index %d: %w", i, err)
}
}
values[i] = value
}

return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) (errToReturn error) {
for i, key := range keys {
existing, closer, err := rw.GlobalReader().Get(key)
if err != nil {
if !errors.Is(err, storage.ErrNotFound) {
return irrecoverable.NewExceptionf("could not load existing data when inserting new data at index %d: %x: %w",
i, key, err)
}

// no existing data stored under this key, proceed with insert
err = rw.Writer().Set(key, values[i])
if err != nil {
return irrecoverable.NewExceptionf("failed to store data at index %d: %x: %w", i, key, err)
}

continue
}

errToReturn = merr.CloseAndMergeError(closer, errToReturn)
if errToReturn != nil {
return errToReturn
}

if !bytes.Equal(existing, values[i]) {
return fmt.Errorf("attempting to insert existing key with different value at index %d: %x: %w",
i, key, storage.ErrDataMismatch)
}
}

return nil
}

}

// OnCommitSucceedFunctor returns a functor that registers a callback to be executed
// when the database transaction commits successfully. The callback is executed after
// the transaction is committed but before the batch writer is closed.
//
// This is useful for operations that need to perform additional actions (like notifications
// or cache updates) only after the database changes are permanently stored.
func OnCommitSucceedFunctor(callback func()) Functor {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
storage.OnCommitSucceed(rw, callback)
return nil
}
}

// NoOpFunctor returns a functor that performs no operation and always succeeds.
// This is useful as a placeholder in functor compositions or when a conditional
// operation needs to be skipped without affecting the overall composition.
func NoOpFunctor() Functor {
return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error {
return nil
}
}
Loading
Loading