diff --git a/state/protocol/badger/mutator.go b/state/protocol/badger/mutator.go index 07bc9eaceb2..7ad5a441c09 100644 --- a/state/protocol/badger/mutator.go +++ b/state/protocol/badger/mutator.go @@ -195,7 +195,7 @@ func (m *FollowerState) ExtendCertified(ctx context.Context, certified *flow.Cer return fmt.Errorf("failed to determine the lastest sealed block in fork: %w", err) } deferredBlockPersist.AddNextOperation(func(lctx lockctx.Proof, blockID flow.Identifier, rw storage.ReaderBatchWriter) error { - return operation.IndexLatestSealAtBlock(lctx, rw.Writer(), blockID, latestSeal.ID()) + return operation.IndexingLatestSealAtBlock(blockID, latestSeal.ID())(lctx, rw) }) // TODO: we might not need the deferred db updates, because the candidate passed into diff --git a/storage/operation/approvals.go b/storage/operation/approvals.go index 08903484a34..70f06ee40aa 100644 --- a/storage/operation/approvals.go +++ b/storage/operation/approvals.go @@ -1,13 +1,9 @@ package operation import ( - "errors" "fmt" - "github.com/jordanschalm/lockctx" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/storage" ) @@ -37,45 +33,18 @@ func RetrieveResultApproval(r storage.Reader, approvalID flow.Identifier, approv // // Expected error returns: // - `storage.ErrDataMismatch` if a *different* approval for the same key pair (ExecutionResultID, chunk index) is already indexed -func InsertAndIndexResultApproval(approval *flow.ResultApproval) func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { +func InsertAndIndexResultApproval(approval *flow.ResultApproval) Functor { approvalID := approval.ID() resultID := approval.Body.ExecutionResultID chunkIndex := approval.Body.ChunkIndex - // the following functors allow encoding to be done before acquiring the lock - inserting := Upserting(MakePrefix(codeResultApproval, approvalID), approval) - indexing := Upserting(MakePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID) - - return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { - if !lctx.HoldsLock(storage.LockIndexResultApproval) { - return fmt.Errorf("missing lock for index result approval for result: %v", resultID) - } - - var storedApprovalID flow.Identifier - err := LookupResultApproval(rw.GlobalReader(), resultID, chunkIndex, &storedApprovalID) - if err == nil { - if storedApprovalID != approvalID { - return fmt.Errorf("attempting to store conflicting approval (result: %v, chunk index: %d): storing: %v, stored: %v. 
%w", - resultID, chunkIndex, approvalID, storedApprovalID, storage.ErrDataMismatch) - } - return nil // already stored and indexed - } - if !errors.Is(err, storage.ErrNotFound) { // `storage.ErrNotFound` is expected, as this indicates that no receipt is indexed yet; anything else is an exception - return fmt.Errorf("could not lookup result approval ID: %w", irrecoverable.NewException(err)) - } - - err = inserting(rw.Writer()) - if err != nil { - return fmt.Errorf("could not store result approval: %w", err) - } - - err = indexing(rw.Writer()) - if err != nil { - return fmt.Errorf("could not index result approval: %w", err) - } - - return nil - } + errmsg := fmt.Sprintf("InsertAndIndexResultApproval failed with approvalID %v, chunkIndex %v, resultID %v", + approvalID, chunkIndex, resultID) + return WrapError(errmsg, BindFunctors( + HoldingLock(storage.LockIndexResultApproval), + Overwriting(MakePrefix(codeResultApproval, approvalID), approval), + InsertingWithMismatchCheck(MakePrefix(codeIndexResultApprovalByChunk, resultID, chunkIndex), approvalID), + )) } // LookupResultApproval finds a ResultApproval by result ID and chunk index. diff --git a/storage/operation/functor.go b/storage/operation/functor.go new file mode 100644 index 00000000000..9db016fbafa --- /dev/null +++ b/storage/operation/functor.go @@ -0,0 +1,281 @@ +// Package operation provides functional programming utilities for database operations +// with lock context support. It defines functors that can be composed to create +// complex database operations while ensuring proper lock acquisition and error handling. +package operation + +import ( + "bytes" + "errors" + "fmt" + + "github.com/jordanschalm/lockctx" + "github.com/vmihailenco/msgpack/v4" + + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/utils/merr" +) + +// Functor represents a database operation that requires a lock context proof +// and a batch writer. It encapsulates the pattern of acquiring locks before +// performing database operations and ensures proper error handling. +type Functor func(lockctx.Proof, storage.ReaderBatchWriter) error + +// BindFunctors composes multiple functors into a single functor that executes +// them sequentially. If any functor fails, the execution stops and returns the error. +// This enables functional composition of database operations. +func BindFunctors(functors ...Functor) Functor { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + for _, fn := range functors { + err := fn(lctx, rw) + if err != nil { + return err + } + } + return nil + } +} + +// HoldingLock creates a functor that validates the lock context holds the specified lock. +// This is used as a guard to ensure operations are only performed when the required lock is held. +// Returns an error if the lock is not held, otherwise returns nil. +func HoldingLock(lockID string) Functor { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + if !lctx.HoldsLock(lockID) { + return fmt.Errorf("missing required lock: %s", lockID) + } + return nil + } +} + +// WrapError creates a functor that wraps any error returned by the provided functor +// with additional context. This is useful for providing more descriptive error messages +// when composing complex operations. 
+func WrapError(wrapMsg string, fn Functor) Functor { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + err := fn(lctx, rw) + if err != nil { + return fmt.Errorf("%s: %w", wrapMsg, err) + } + return nil + } +} + +// Overwriting returns a functor that overwrites a key-value pair in the storage. +// The value is serialized using msgpack encoding. If the key already exists, +// the value will be overwritten without any checks. +// +// This is typically used for operations where we want to update existing data +// or where we don't care about potential conflicts. +func Overwriting(key []byte, val interface{}) Functor { + value, err := msgpack.Marshal(val) + if err != nil { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + } + + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + err = rw.Writer().Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + +// OverwritingMul returns a functor that overwrites multiple key-value pairs in the storage. +// The values are serialized using msgpack encoding. If any of the keys already exist, +// the values will be overwritten without any checks. +// +// This is the batch version of Overwriting, useful for operations where we want to +// update multiple entries atomically or where we don't care about potential conflicts. +// The keys and vals slices must have the same length. +func OverwritingMul(keys [][]byte, vals []any) Functor { + if len(keys) != len(vals) { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return irrecoverable.NewExceptionf("keys and vals length mismatch: %d vs %d", len(keys), len(vals)) + } + } + + values := make([][]byte, len(vals)) + for i, val := range vals { + value, err := msgpack.Marshal(val) + if err != nil { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return irrecoverable.NewExceptionf("failed to encode value at index %d: %w", i, err) + } + } + values[i] = value + } + + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + for i, key := range keys { + err := rw.Writer().Set(key, values[i]) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data at index %d: %w", i, err) + } + } + + return nil + } +} + +// InsertingWithExistenceCheck returns a functor that inserts a key-value pair +// only if the key does not already exist. If the key exists, it returns +// storage.ErrAlreadyExists error. +// +// This is used for operations where we want to ensure uniqueness and prevent +// accidental overwrites of existing data. 
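+//
+// A minimal usage sketch (key and value are placeholders):
+//
+//	insert := InsertingWithExistenceCheck(MakePrefix(codeGuarantee, guaranteeID), guarantee)
+//	err := insert(lctx, rw)
+//	if errors.Is(err, storage.ErrAlreadyExists) {
+//		// an entry already exists under this key; nothing was written
+//	}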
+func InsertingWithExistenceCheck(key []byte, val interface{}) Functor { + value, err := msgpack.Marshal(val) + if err != nil { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + } + + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + exist, err := KeyExists(rw.GlobalReader(), key) + if err != nil { + return irrecoverable.NewExceptionf("failed to check for existing key: %w", err) + } + + if exist { + return fmt.Errorf("attempting to insert existing key: %x: %w", key, storage.ErrAlreadyExists) + } + + err = rw.Writer().Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } +} + +// InsertingWithMismatchCheck returns a functor that inserts a key-value pair +// with conflict detection. If the key already exists, it compares the existing +// value with the new value. If they differ, it returns storage.ErrDataMismatch. +// If they are the same, the operation succeeds without modification. +// +// This is used for operations where we want to ensure data consistency and +// detect potential race conditions or conflicting updates. +func InsertingWithMismatchCheck(key []byte, val interface{}) Functor { + value, err := msgpack.Marshal(val) + if err != nil { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return irrecoverable.NewExceptionf("failed to encode value: %w", err) + } + } + + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) (errToReturn error) { + existing, closer, err := rw.GlobalReader().Get(key) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return irrecoverable.NewExceptionf("could not load existing data when inserting new data: %w", err) + } + + // no existing data stored under this key, proceed with insert + err = rw.Writer().Set(key, value) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data: %w", err) + } + + return nil + } + + defer func() { + errToReturn = merr.CloseAndMergeError(closer, errToReturn) + }() + + if !bytes.Equal(existing, value) { + return fmt.Errorf("attempting to insert existing key with different value: %x: %w", key, storage.ErrDataMismatch) + } + + return nil + } +} + +// InsertingMulWithMismatchCheck returns a functor that inserts multiple key-value pairs +// with conflict detection. For each key, if it already exists, it compares the existing +// value with the new value. If they differ, it returns storage.ErrDataMismatch. +// If they are the same, the operation succeeds without modification. +// +// This is the batch version of InsertingWithMismatchCheck, useful for operations where +// we want to ensure data consistency and detect potential race conditions or conflicting +// updates across multiple entries atomically. The keys and vals slices must have the same length. 
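+//
+// A usage sketch with two hypothetical index entries (collID1, collID2,
+// guaranteeID1, and guaranteeID2 are placeholders):
+//
+//	keys := [][]byte{
+//		MakePrefix(codeGuaranteeByCollectionID, collID1),
+//		MakePrefix(codeGuaranteeByCollectionID, collID2),
+//	}
+//	vals := []any{guaranteeID1, guaranteeID2}
+//	err := InsertingMulWithMismatchCheck(keys, vals)(lctx, rw)
+//	// err wraps storage.ErrDataMismatch if any key already holds a different value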
+func InsertingMulWithMismatchCheck(keys [][]byte, vals []any) Functor { + if len(keys) != len(vals) { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return irrecoverable.NewExceptionf("keys and vals length mismatch: %d vs %d", len(keys), len(vals)) + } + } + + values := make([][]byte, len(vals)) + for i, val := range vals { + value, err := msgpack.Marshal(val) + if err != nil { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return irrecoverable.NewExceptionf("failed to encode value at index %d: %w", i, err) + } + } + values[i] = value + } + + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) (errToReturn error) { + for i, key := range keys { + existing, closer, err := rw.GlobalReader().Get(key) + if err != nil { + if !errors.Is(err, storage.ErrNotFound) { + return irrecoverable.NewExceptionf("could not load existing data when inserting new data at index %d: %x: %w", + i, key, err) + } + + // no existing data stored under this key, proceed with insert + err = rw.Writer().Set(key, values[i]) + if err != nil { + return irrecoverable.NewExceptionf("failed to store data at index %d: %x: %w", i, key, err) + } + + continue + } + + errToReturn = merr.CloseAndMergeError(closer, errToReturn) + if errToReturn != nil { + return errToReturn + } + + if !bytes.Equal(existing, values[i]) { + return fmt.Errorf("attempting to insert existing key with different value at index %d: %x: %w", + i, key, storage.ErrDataMismatch) + } + } + + return nil + } + +} + +// OnCommitSucceedFunctor returns a functor that registers a callback to be executed +// when the database transaction commits successfully. The callback is executed after +// the transaction is committed but before the batch writer is closed. +// +// This is useful for operations that need to perform additional actions (like notifications +// or cache updates) only after the database changes are permanently stored. +func OnCommitSucceedFunctor(callback func()) Functor { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + storage.OnCommitSucceed(rw, callback) + return nil + } +} + +// NoOpFunctor returns a functor that performs no operation and always succeeds. +// This is useful as a placeholder in functor compositions or when a conditional +// operation needs to be skipped without affecting the overall composition. 
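+//
+// For example, a conditional step can be expressed without special-casing the
+// composition (shouldIndex, key, val, and lockID are placeholders):
+//
+//	indexing := NoOpFunctor()
+//	if shouldIndex {
+//		indexing = InsertingWithMismatchCheck(key, val)
+//	}
+//	return BindFunctors(HoldingLock(lockID), indexing)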
+func NoOpFunctor() Functor { + return func(lctx lockctx.Proof, rw storage.ReaderBatchWriter) error { + return nil + } +} diff --git a/storage/operation/functor_test.go b/storage/operation/functor_test.go new file mode 100644 index 00000000000..bb253cc11d7 --- /dev/null +++ b/storage/operation/functor_test.go @@ -0,0 +1,530 @@ +package operation_test + +import ( + "testing" + + "github.com/jordanschalm/lockctx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/storage/operation" + "github.com/onflow/flow-go/storage/operation/dbtest" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestFunctorBindFunctors tests the composition of multiple functors +func TestFunctorBindFunctors(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + // Test successful composition + t.Run("successful_composition", func(t *testing.T) { + key1 := []byte("success_key1") + key2 := []byte("success_key2") + value1 := "value1" + value2 := "value2" + + composed := operation.BindFunctors( + operation.Overwriting(key1, value1), + operation.Overwriting(key2, value2), + ) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return composed(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify both values were stored + var retrieved1, retrieved2 string + err = operation.RetrieveByKey(db.Reader(), key1, &retrieved1) + require.NoError(t, err) + assert.Equal(t, value1, retrieved1) + + err = operation.RetrieveByKey(db.Reader(), key2, &retrieved2) + require.NoError(t, err) + assert.Equal(t, value2, retrieved2) + }) + + // Test composition with failure + t.Run("composition_with_failure", func(t *testing.T) { + key1 := []byte("composition_key1") + key2 := []byte("composition_key2") + value1 := "value1" + value2 := "value2" + + // First insert key2 to cause conflict + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertingWithExistenceCheck(key2, value2)(lctx, rw) + }) + }) + require.NoError(t, err) + + // Test that the first operation alone works + err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.Overwriting(key1, value1)(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify the first operation worked + var retrieved1 string + err = operation.RetrieveByKey(db.Reader(), key1, &retrieved1) + require.NoError(t, err) + assert.Equal(t, value1, retrieved1) + + // Now test the composition where the second operation fails + composed := operation.BindFunctors( + operation.Overwriting(key1, "new_value1"), + operation.InsertingWithExistenceCheck(key2, "different_value"), + ) + + err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return composed(lctx, rw) + }) + }) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrAlreadyExists) + + // The first operation should have been executed before the second one failed + // Since the transaction was rolled back, key1 should still have the original value + var 
retrieved1After string + err = operation.RetrieveByKey(db.Reader(), key1, &retrieved1After) + require.NoError(t, err) + assert.Equal(t, value1, retrieved1After) // Should be the original value, not "new_value1" + }) + }) +} + +// TestFunctorHoldingLock tests the lock validation functor +func TestFunctorHoldingLock(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + t.Run("valid_lock", func(t *testing.T) { + lockValidator := operation.HoldingLock(storage.LockInsertBlock) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return lockValidator(lctx, rw) + }) + }) + require.NoError(t, err) + }) + + t.Run("missing_lock", func(t *testing.T) { + lockValidator := operation.HoldingLock(storage.LockInsertBlock) + + err := unittest.WithLock(t, lockManager, storage.LockFinalizeBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return lockValidator(lctx, rw) + }) + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "missing required lock") + assert.Contains(t, err.Error(), storage.LockInsertBlock) + }) + }) +} + +// TestFunctorWrapError tests error wrapping functionality +func TestFunctorWrapError(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + t.Run("wrap_successful_operation", func(t *testing.T) { + key := []byte("test_key") + value := "test_value" + + wrapped := operation.WrapError("test operation", operation.Overwriting(key, value)) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return wrapped(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify the operation succeeded + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, value, retrieved) + }) + + t.Run("wrap_failing_operation", func(t *testing.T) { + key := []byte("existing_key") + value := "test_value" + + // First insert the key + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertingWithExistenceCheck(key, value)(lctx, rw) + }) + }) + require.NoError(t, err) + + // Now try to insert again with wrapped error + wrapped := operation.WrapError("duplicate insert", operation.InsertingWithExistenceCheck(key, "different_value")) + + err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return wrapped(lctx, rw) + }) + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "duplicate insert") + assert.ErrorIs(t, err, storage.ErrAlreadyExists) + }) + }) +} + +// TestFunctorOverwriting tests the overwriting functor +func TestFunctorOverwriting(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + t.Run("overwrite_new_key", func(t *testing.T) { + key := []byte("new_key") + value := "new_value" + + overwrite := operation.Overwriting(key, value) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx 
lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return overwrite(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify the value was stored + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, value, retrieved) + }) + + t.Run("overwrite_existing_key", func(t *testing.T) { + key := []byte("existing_key") + originalValue := "original_value" + newValue := "new_value" + + // First insert the original value + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.Overwriting(key, originalValue)(lctx, rw) + }) + }) + require.NoError(t, err) + + // Now overwrite with new value + overwrite := operation.Overwriting(key, newValue) + + err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return overwrite(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify the value was overwritten + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, newValue, retrieved) + }) + + t.Run("serialization_error", func(t *testing.T) { + key := []byte("test_key") + // Create a value that cannot be serialized (channel) + type Unserializable struct { + Channel chan int + } + unserializable := &Unserializable{Channel: make(chan int)} + + overwrite := operation.Overwriting(key, unserializable) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return overwrite(lctx, rw) + }) + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to encode value") + }) + }) +} + +// TestFunctorInsertingWithExistenceCheck tests the existence check functor +func TestFunctorInsertingWithExistenceCheck(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + t.Run("insert_new_key", func(t *testing.T) { + key := []byte("new_key") + value := "new_value" + + insert := operation.InsertingWithExistenceCheck(key, value) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return insert(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify the value was stored + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, value, retrieved) + }) + + t.Run("insert_existing_key", func(t *testing.T) { + key := []byte("existing_key") + value := "existing_value" + + // First insert the key + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertingWithExistenceCheck(key, value)(lctx, rw) + }) + }) + require.NoError(t, err) + + // Try to insert the same key again + insert := operation.InsertingWithExistenceCheck(key, "different_value") + + err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return 
insert(lctx, rw) + }) + }) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrAlreadyExists) + assert.Contains(t, err.Error(), "attempting to insert existing key") + + // Verify original value is unchanged + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, value, retrieved) + }) + }) +} + +// TestFunctorInsertingWithMismatchCheck tests the mismatch check functor +func TestFunctorInsertingWithMismatchCheck(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + t.Run("insert_new_key", func(t *testing.T) { + key := []byte("new_key") + value := "new_value" + + insert := operation.InsertingWithMismatchCheck(key, value) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return insert(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify the value was stored + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, value, retrieved) + }) + + t.Run("insert_same_value", func(t *testing.T) { + key := []byte("existing_key") + value := "same_value" + + // First insert the key + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertingWithMismatchCheck(key, value)(lctx, rw) + }) + }) + require.NoError(t, err) + + // Try to insert the same value again + insert := operation.InsertingWithMismatchCheck(key, value) + + err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return insert(lctx, rw) + }) + }) + require.NoError(t, err) // Should succeed since values are the same + + // Verify value is unchanged + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, value, retrieved) + }) + + t.Run("insert_different_value", func(t *testing.T) { + key := []byte("conflict_key") + originalValue := "original_value" + conflictValue := "conflict_value" + + // First insert the original value + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return operation.InsertingWithMismatchCheck(key, originalValue)(lctx, rw) + }) + }) + require.NoError(t, err) + + // Try to insert a different value + insert := operation.InsertingWithMismatchCheck(key, conflictValue) + + err = unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return insert(lctx, rw) + }) + }) + require.Error(t, err) + assert.ErrorIs(t, err, storage.ErrDataMismatch) + assert.Contains(t, err.Error(), "attempting to insert existing key with different value") + + // Verify original value is unchanged + var retrieved string + err = operation.RetrieveByKey(db.Reader(), key, &retrieved) + require.NoError(t, err) + assert.Equal(t, originalValue, retrieved) + }) + }) +} + +// TestFunctorComplexComposition tests complex functor compositions +func TestFunctorComplexComposition(t *testing.T) { + dbtest.RunWithDB(t, func(t 
*testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + t.Run("complex_successful_operation", func(t *testing.T) { + key1 := []byte("key1") + key2 := []byte("key2") + key3 := []byte("key3") + value1 := "value1" + value2 := "value2" + value3 := "value3" + + // Create a complex operation that: + // 1. Validates lock is held + // 2. Inserts key1 with existence check + // 3. Overwrites key2 + // 4. Inserts key3 with mismatch check + complexOp := operation.BindFunctors( + operation.HoldingLock(storage.LockInsertBlock), + operation.InsertingWithExistenceCheck(key1, value1), + operation.Overwriting(key2, value2), + operation.InsertingWithMismatchCheck(key3, value3), + ) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return complexOp(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify all operations succeeded + var retrieved1, retrieved2, retrieved3 string + err = operation.RetrieveByKey(db.Reader(), key1, &retrieved1) + require.NoError(t, err) + assert.Equal(t, value1, retrieved1) + + err = operation.RetrieveByKey(db.Reader(), key2, &retrieved2) + require.NoError(t, err) + assert.Equal(t, value2, retrieved2) + + err = operation.RetrieveByKey(db.Reader(), key3, &retrieved3) + require.NoError(t, err) + assert.Equal(t, value3, retrieved3) + }) + + t.Run("complex_operation_with_wrapped_errors", func(t *testing.T) { + key1 := []byte("complex_key1") + key2 := []byte("complex_key2") + value1 := "value1" + value2 := "value2" + + // Create a complex operation with wrapped errors + complexOp := operation.BindFunctors( + operation.WrapError("lock validation", operation.HoldingLock(storage.LockInsertBlock)), + operation.WrapError("first insert", operation.InsertingWithExistenceCheck(key1, value1)), + operation.WrapError("second insert", operation.InsertingWithExistenceCheck(key2, value2)), + ) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return complexOp(lctx, rw) + }) + }) + require.NoError(t, err) + + // Verify both values were stored + var retrieved1, retrieved2 string + err = operation.RetrieveByKey(db.Reader(), key1, &retrieved1) + require.NoError(t, err) + assert.Equal(t, value1, retrieved1) + + err = operation.RetrieveByKey(db.Reader(), key2, &retrieved2) + require.NoError(t, err) + assert.Equal(t, value2, retrieved2) + }) + }) +} + +// TestFunctorErrorHandling tests error handling in various scenarios +func TestFunctorErrorHandling(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + + t.Run("irrecoverable_exception_on_serialization", func(t *testing.T) { + key := []byte("test_key") + // Create a value that causes serialization to fail + type Unserializable struct { + Channel chan int + } + unserializable := &Unserializable{Channel: make(chan int)} + + overwrite := operation.Overwriting(key, unserializable) + + err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + return overwrite(lctx, rw) + }) + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to encode value") + }) + + t.Run("irrecoverable_exception_on_storage", func(t *testing.T) { + // This test is more theoretical since we 
can't easily simulate storage failures
+			// in the test environment, but we can test that the error path exists
+			key := []byte("test_key")
+			value := "test_value"
+
+			overwrite := operation.Overwriting(key, value)
+
+			// The operation should succeed in normal conditions
+			err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
+				return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
+					return overwrite(lctx, rw)
+				})
+			})
+			require.NoError(t, err)
+		})
+	})
+}
diff --git a/storage/operation/guarantees.go b/storage/operation/guarantees.go
index c518d394b36..7e3a73abee3 100644
--- a/storage/operation/guarantees.go
+++ b/storage/operation/guarantees.go
@@ -10,6 +10,45 @@ import (
 	"github.com/onflow/flow-go/storage"
 )
 
+// InsertAndIndexGuarantee creates a functor that atomically inserts a collection guarantee
+// and creates an index mapping from collection ID to guarantee ID.
+// The caller must ensure guaranteeID equals guarantee.ID() and must hold the
+// [storage.LockInsertBlock] lock.
+// It returns [storage.ErrDataMismatch] if a different guarantee is already indexed for the collection.
+func InsertAndIndexGuarantee(guaranteeID flow.Identifier, guarantee *flow.CollectionGuarantee) Functor {
+	errmsg := fmt.Sprintf("InsertAndIndexGuarantee failed with guaranteeID %v, collectionID %v",
+		guaranteeID, guarantee.CollectionID)
+	return WrapError(errmsg, BindFunctors(
+		HoldingLock(storage.LockInsertBlock),
+		Overwriting(MakePrefix(codeGuarantee, guaranteeID), guarantee),
+		InsertingWithMismatchCheck(MakePrefix(codeGuaranteeByCollectionID, guarantee.CollectionID), guaranteeID),
+	))
+}
+
+// CollectionGuaranteeWithID pairs a collection guarantee with its precomputed ID,
+// so that the ID has to be computed only once.
+type CollectionGuaranteeWithID struct {
+	GuaranteeID flow.Identifier
+	*flow.CollectionGuarantee
+}
+
+// InsertAndIndexGuarantees is the batch variant of InsertAndIndexGuarantee. The same
+// caller obligations apply to every element, and it likewise returns
+// [storage.ErrDataMismatch] if a different guarantee is already indexed for any of
+// the collections.
+func InsertAndIndexGuarantees(guaranteesWithID []*CollectionGuaranteeWithID) Functor {
+	guaranteeIDKeys, guarantees := make([][]byte, 0, len(guaranteesWithID)), make([]any, 0, len(guaranteesWithID))
+	collectionIDKeys, guaranteeIDs := make([][]byte, 0, len(guaranteesWithID)), make([]any, 0, len(guaranteesWithID))
+
+	for _, g := range guaranteesWithID {
+		guaranteeIDKeys = append(guaranteeIDKeys, MakePrefix(codeGuarantee, g.GuaranteeID))
+		guarantees = append(guarantees, g.CollectionGuarantee)
+
+		collectionIDKeys = append(collectionIDKeys, MakePrefix(codeGuaranteeByCollectionID, g.CollectionID))
+		guaranteeIDs = append(guaranteeIDs, g.GuaranteeID)
+	}
+
+	return WrapError("InsertAndIndexGuarantees failed", BindFunctors(
+		HoldingLock(storage.LockInsertBlock),
+		WrapError("insert guarantee failed", OverwritingMul(guaranteeIDKeys, guarantees)),
+		WrapError("index guarantee failed", InsertingMulWithMismatchCheck(collectionIDKeys, guaranteeIDs)),
+	))
+}
+
 // InsertGuarantee inserts a collection guarantee by ID.
 //
 // CAUTION: The caller must ensure guaranteeID is a collision-resistant hash of the provided
diff --git a/storage/operation/payload.go b/storage/operation/payload.go
index e7f10bbdd7a..60d933f39e6 100644
--- a/storage/operation/payload.go
+++ b/storage/operation/payload.go
@@ -177,6 +177,13 @@ func IndexLatestSealAtBlock(lctx lockctx.Proof, w storage.Writer, blockID flow.I
 	return UpsertByKey(w, MakePrefix(codeBlockIDToLatestSealID, blockID), sealID)
 }
 
+// IndexingLatestSealAtBlock returns a functor that indexes sealID as the latest seal
+// in the fork up to (and including) blockID. The caller must hold the
+// [storage.LockInsertBlock] lock.
+func IndexingLatestSealAtBlock(blockID flow.Identifier, sealID flow.Identifier) Functor {
+	return BindFunctors(
+		HoldingLock(storage.LockInsertBlock),
+		Overwriting(MakePrefix(codeBlockIDToLatestSealID, blockID), sealID),
+	)
+}
+
 // LookupLatestSealAtBlock finds the highest seal that was included in the fork up to (and including) blockID.
 // Frequently, the highest seal included in this block's payload. However, if there are no seals in
 // this block, sealID should reference the highest seal in blockID's ancestors.
diff --git a/storage/operation/writes.go b/storage/operation/writes.go
index 920cc232d3d..b46d45c706b 100644
--- a/storage/operation/writes.go
+++ b/storage/operation/writes.go
@@ -30,24 +30,6 @@ func UpsertByKey(w storage.Writer, key []byte, val interface{}) error {
 	return nil
 }
 
-// Upserting returns a functor, whose execution will append the given key-value-pair to the provided
-// storage writer (typically a pending batch of database writes).
-func Upserting(key []byte, val interface{}) func(storage.Writer) error {
-	value, err := msgpack.Marshal(val)
-	return func(w storage.Writer) error {
-		if err != nil {
-			return irrecoverable.NewExceptionf("failed to encode value: %w", err)
-		}
-
-		err = w.Set(key, value)
-		if err != nil {
-			return irrecoverable.NewExceptionf("failed to store data: %w", err)
-		}
-
-		return nil
-	}
-}
-
 // RemoveByKey removes the entity with the given key, if it exists. If it doesn't
 // exist, this is a no-op.
// Error returns: diff --git a/storage/store/approvals.go b/storage/store/approvals.go index e53ccd7e8b8..e90842e954e 100644 --- a/storage/store/approvals.go +++ b/storage/store/approvals.go @@ -70,10 +70,6 @@ func (r *ResultApprovals) StoreMyApproval(approval *flow.ResultApproval) func(lc storing := operation.InsertAndIndexResultApproval(approval) return func(lctx lockctx.Proof) error { - if !lctx.HoldsLock(storage.LockIndexResultApproval) { - return fmt.Errorf("missing lock for index result approval") - } - return r.db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { storage.OnCommitSucceed(rw, func() { // the success callback is called after the lock is released, so diff --git a/storage/store/guarantees.go b/storage/store/guarantees.go index 45e5b5eab85..e098d1faec0 100644 --- a/storage/store/guarantees.go +++ b/storage/store/guarantees.go @@ -3,8 +3,6 @@ package store import ( "fmt" - "github.com/jordanschalm/lockctx" - "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/metrics" @@ -34,24 +32,12 @@ func NewGuarantees( byCollectionIDCacheSize uint, ) *Guarantees { - storeByGuaranteeIDWithLock := func(rw storage.ReaderBatchWriter, guaranteeID flow.Identifier, guarantee *flow.CollectionGuarantee) error { - return operation.InsertGuarantee(rw.Writer(), guaranteeID, guarantee) - } - retrieveByGuaranteeID := func(r storage.Reader, guaranteeID flow.Identifier) (*flow.CollectionGuarantee, error) { var guarantee flow.CollectionGuarantee err := operation.RetrieveGuarantee(r, guaranteeID, &guarantee) return &guarantee, err } - // While a collection guarantee can only be present once in the finalized chain, - // across different consensus forks we may encounter the same guarantee multiple times. - // On the happy path there is a 1:1 correspondence between CollectionGuarantees and Collections. - // However, the finalization status of guarantees is not yet verified by consensus nodes, - // nor is the possibility of byzantine collection nodes dealt with, so we check here that - // there are no conflicting guarantees for the same collection. 
- indexByCollectionID := operation.IndexGuarantee - lookupByCollectionID := func(r storage.Reader, collID flow.Identifier) (flow.Identifier, error) { var guaranteeID flow.Identifier err := operation.LookupGuarantee(r, collID, &guaranteeID) @@ -65,33 +51,56 @@ func NewGuarantees( db: db, cache: newCache(collector, metrics.ResourceGuarantee, withLimit[flow.Identifier, *flow.CollectionGuarantee](cacheSize), - withStore(storeByGuaranteeIDWithLock), withRetrieve(retrieveByGuaranteeID)), byCollectionIdCache: newCache[flow.Identifier, flow.Identifier](collector, metrics.ResourceGuaranteeByCollectionID, withLimit[flow.Identifier, flow.Identifier](byCollectionIDCacheSize), - withStoreWithLock(indexByCollectionID), withRetrieve(lookupByCollectionID)), } return g } -func (g *Guarantees) storeTx(lctx lockctx.Proof, rw storage.ReaderBatchWriter, guarantee *flow.CollectionGuarantee) error { - guaranteeID := guarantee.ID() - err := g.cache.PutTx(rw, guaranteeID, guarantee) - if err != nil { - return err +func (g *Guarantees) storeGuarantees(guarantees []*flow.CollectionGuarantee) operation.Functor { + if len(guarantees) == 0 { + return operation.NoOpFunctor() } - err = g.byCollectionIdCache.PutWithLockTx(lctx, rw, guarantee.CollectionID, guaranteeID) - if err != nil { - return fmt.Errorf("could not index guarantee %x under collection %x: %w", - guaranteeID, guarantee.CollectionID[:], err) + guaranteesWithID := make([]*operation.CollectionGuaranteeWithID, 0, len(guarantees)) + for _, guarantee := range guarantees { + guaranteesWithID = append(guaranteesWithID, &operation.CollectionGuaranteeWithID{ + GuaranteeID: guarantee.ID(), + CollectionGuarantee: guarantee, + }) } - return nil + // While a collection guarantee can only be present once in the finalized chain, + // across different consensus forks we may encounter the same guarantee multiple times. + // On the happy path there is a 1:1 correspondence between CollectionGuarantees and Collections. + // However, the finalization status of guarantees is not yet verified by consensus nodes, + // nor is the possibility of byzantine collection nodes dealt with, so we check here that + // there are no conflicting guarantees for the same collection. 
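+	// InsertAndIndexGuarantees surfaces such a conflict as [storage.ErrDataMismatch],
+	// letting callers distinguish a benign re-insertion from conflicting data.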
+	return operation.BindFunctors(
+		operation.InsertAndIndexGuarantees(guaranteesWithID),
+		operation.OnCommitSucceedFunctor(func() {
+			for _, gd := range guaranteesWithID {
+				g.cache.Insert(gd.GuaranteeID, gd.CollectionGuarantee)
+				g.byCollectionIdCache.Insert(gd.CollectionGuarantee.CollectionID, gd.GuaranteeID)
+			}
+		}),
+	)
 }
 
 func (g *Guarantees) retrieveTx(guaranteeID flow.Identifier) (*flow.CollectionGuarantee, error) {
 	val, err := g.cache.Get(g.db.Reader(), guaranteeID)
 	if err != nil {
diff --git a/storage/store/payloads.go b/storage/store/payloads.go
index fc5562d7f36..63e91ad2252 100644
--- a/storage/store/payloads.go
+++ b/storage/store/payloads.go
@@ -63,11 +63,9 @@ func (p *Payloads) storeTx(lctx lockctx.Proof, rw storage.ReaderBatchWriter, blo
 	}
 
 	// make sure all payload guarantees are stored
-	for _, guarantee := range payload.Guarantees {
-		err := p.guarantees.storeTx(lctx, rw, guarantee)
-		if err != nil {
-			return fmt.Errorf("could not store guarantee: %w", err)
-		}
+	err = p.guarantees.storeGuarantees(payload.Guarantees)(lctx, rw)
+	if err != nil {
+		return fmt.Errorf("could not store guarantees: %w", err)
 	}
 
 	// make sure all payload seals are stored
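// The pieces above compose as follows. This is a standalone sketch, not part of the
// diff; key, val, and the cache variable are placeholders that mirror the setup in
// functor_test.go:
//
//	storing := operation.BindFunctors(
//		operation.HoldingLock(storage.LockInsertBlock),
//		operation.InsertingWithMismatchCheck(key, val),
//		operation.OnCommitSucceedFunctor(func() { cache.Insert(key, val) }),
//	)
//	err := unittest.WithLock(t, lockManager, storage.LockInsertBlock, func(lctx lockctx.Context) error {
//		return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error {
//			return storing(lctx, rw) // the batch commits only if every functor succeeds
//		})
//	})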