diff --git a/.gitignore b/.gitignore index 50e1295..656705b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .envrc .env.release .idea +.vscode .DS_Store .release_notes.md dist/ diff --git a/CHANGELOG.md b/CHANGELOG.md index e026567..9db0dc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,14 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +### DatabaseChanges mode improvements + +* Added support for delta update operations (`add`/`sub`/`min`/`max`/`set_if_null`) on rows for PostgreSQL. These operations allow atomic increments, decrements, and conditional updates. Requires latest [substreams-sink-database-changes](https://github.com/streamingfast/substreams-sink-database-changes) Rust crate version (`>= 4.0.0`). + +* Relaxed UpdateOp transition rules: `ADD`, `MAX`, `MIN`, and `SET_IF_NULL` operations can now be followed by a `SET` operation. Previously, non-SET operations could only be followed by the same operation type. + ## v4.11.3 ### DatabaseChanges mode improvements diff --git a/README.md b/README.md index 47b16a2..56cb779 100644 --- a/README.md +++ b/README.md @@ -106,7 +106,7 @@ The schema file should contain `CREATE TABLE IF NOT EXISTS` statements to ensure ### Network -Your Substreams manifest defines which network to connect to by default. For example, a manifest configured for `mainnet` will connect to the `mainnet.eth.streamingfast.io:443` endpoint automatically. +Your Substreams manifest defines which network to connect to by default. For example, a manifest configured for `mainnet` will connect to the `mainnet.eth.streamingfast.io:443` endpoint automatically. 
You can override the default endpoint in two ways: - **Command line flag**: Use `-e another.endpoint:443` when running the sink @@ -195,7 +195,7 @@ The `substreams-sink-sql` accepts two types of Substreams output modules: #### Database Changes Modules -For the **Database Changes** approach, your module output type must be [`sf.substreams.sink.database.v1.DatabaseChanges`](https://github.com/streamingfast/substreams-sink-database-changes/blob/develop/proto/sf/substreams/sink/database/v1/database.proto#L7). +For the **Database Changes** approach, your module output type must be [`sf.substreams.sink.database.v1.DatabaseChanges`](https://github.com/streamingfast/substreams-sink-database-changes/blob/develop/proto/sf/substreams/sink/database/v1/database.proto#L7). **Development Resources:** - **Rust**: Use the [`substreams-database-change`](https://github.com/streamingfast/substreams-database-change) crate for bindings and helpers @@ -203,6 +203,38 @@ For the **Database Changes** approach, your module output type must be [`sf.subs By convention, the module that emits `DatabaseChanges` is named `db_out`. +##### Postgres Delta Update Operations + +When using the **Database Changes** approach with PostgreSQL, you can use delta update operations to perform atomic increments, decrements, and conditional updates on numeric and nullable columns. These operations are particularly useful for aggregations and counters that need to be updated across multiple blocks. + +> [!NOTE] +> Delta operations are currently supported only on PostgreSQL. ClickHouse support is not available at this time. 
+ +**Available Operations:** + +| Operation | SQL Equivalent | Description | +|-----------|----------------|-------------| +| `add` | `column = COALESCE(column, 0) + value` | Atomically add to a column | +| `sub` | `column = COALESCE(column, 0) - value` | Atomically subtract from a column | +| `max` | `column = GREATEST(COALESCE(column, 0), value)` | Keep the maximum value (a NULL column is treated as 0) | +| `min` | `column = LEAST(COALESCE(column, 0), value)` | Keep the minimum value (a NULL column is treated as 0) | +| `set_if_null` | `column = COALESCE(column, value)` | Set only if column is NULL | + +**Rust Example:** + +```rust +tables.upsert_row("Account", id) + .set("owner", owner) + .add("balance", 100i64) // column = COALESCE(column, 0) + 100 + .sub("debt", 50i64) // column = COALESCE(column, 0) - 50 + .max("high_score", score) // column = GREATEST(COALESCE(column, 0), score) + .min("best_time", duration) // column = LEAST(COALESCE(column, 0), duration) + .set_if_null("created_at", timestamp); // column = COALESCE(column, timestamp) +``` + +> [!IMPORTANT] +> Delta update operations require [substreams-sink-database-changes](https://github.com/streamingfast/substreams-sink-database-changes) Rust crate version `>= 4.0.0`. + #### Relational Mappings Modules For the **Relational Mappings** approach, your module can output any Protobuf message type. The sink automatically extracts table and row data from your Protobuf messages using annotations and field mappings. 
diff --git a/db_changes/db/dialect_clickhouse.go b/db_changes/db/dialect_clickhouse.go index 2fb4ea7..0d182ef 100644 --- a/db_changes/db/dialect_clickhouse.go +++ b/db_changes/db/dialect_clickhouse.go @@ -262,9 +262,10 @@ func convertOpToClickhouseValues(o *Operation) ([]any, error) { values := make([]any, len(o.data)) for i, v := range columns { if col, exists := o.table.columnsByName[v]; exists { - convertedType, err := convertToType(o.data[v], col.scanType) + fieldData := o.data[v] + convertedType, err := convertToType(fieldData.Value, col.scanType) if err != nil { - return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], col.scanType, v, err) + return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", fieldData.Value, col.scanType, v, err) } values[i] = convertedType } else { diff --git a/db_changes/db/dialect_postgres.go b/db_changes/db/dialect_postgres.go index 6cf0449..6dd3c7c 100644 --- a/db_changes/db/dialect_postgres.go +++ b/db_changes/db/dialect_postgres.go @@ -385,10 +385,30 @@ func (d PostgresDialect) saveRow(op, schema, escapedTableName string, primaryKey } +// getResultCast returns the appropriate cast suffix for the result of arithmetic operations +// based on the column's scan type. TEXT columns need ::text cast, numeric types don't need cast. 
+func getResultCast(scanType reflect.Type) string { + if scanType == nil { + return "" // unknown type, let PostgreSQL handle it + } + switch scanType.Kind() { + case reflect.String: + return "::text" // TEXT columns need explicit cast from numeric + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, + reflect.Float32, reflect.Float64: + return "" // numeric types don't need cast, PostgreSQL will handle it + default: + return "" // unknown type, let PostgreSQL handle it + } +} + func (d *PostgresDialect) prepareStatement(schema string, o *Operation) (normalQuery string, undoQuery string, err error) { var columns, values []string + var updateOps []UpdateOp + var scanTypes []reflect.Type if o.opType == OperationTypeInsert || o.opType == OperationTypeUpsert || o.opType == OperationTypeUpdate { - columns, values, err = d.prepareColValues(o.table, o.data) + columns, values, updateOps, scanTypes, err = d.prepareColValues(o.table, o.data) if err != nil { return "", "", fmt.Errorf("preparing column & values: %w", err) } @@ -415,9 +435,30 @@ func (d *PostgresDialect) prepareStatement(schema string, o *Operation) (normalQ return insertQuery, "", nil case OperationTypeUpsert: + // Build per-field update expressions based on UpdateOp updates := make([]string, len(columns)) for i := range columns { - updates[i] = fmt.Sprintf("%s=EXCLUDED.%s", columns[i], columns[i]) + col := columns[i] + resultCast := getResultCast(scanTypes[i]) + switch updateOps[i] { + case UpdateOpSet: + // Direct assignment: col = EXCLUDED.col + updates[i] = fmt.Sprintf("%s=EXCLUDED.%s", col, col) + case UpdateOpAdd: + // Accumulate: col = COALESCE(col, 0) + EXCLUDED.col + updates[i] = fmt.Sprintf("%s=(COALESCE(%s.%s::numeric, 0) + EXCLUDED.%s::numeric)%s", col, o.table.nameEscaped, col, col, resultCast) + case UpdateOpMax: + // Maximum: col = GREATEST(COALESCE(col, 0), EXCLUDED.col) + updates[i] = 
fmt.Sprintf("%s=GREATEST(COALESCE(%s.%s::numeric, 0), EXCLUDED.%s::numeric)%s", col, o.table.nameEscaped, col, col, resultCast) + case UpdateOpMin: + // Minimum: col = LEAST(COALESCE(col, 0), EXCLUDED.col) + updates[i] = fmt.Sprintf("%s=LEAST(COALESCE(%s.%s::numeric, 0), EXCLUDED.%s::numeric)%s", col, o.table.nameEscaped, col, col, resultCast) + case UpdateOpSetIfNull: + // Set only if NULL (first value wins): col = COALESCE(col, EXCLUDED.col) + updates[i] = fmt.Sprintf("%s=COALESCE(%s.%s, EXCLUDED.%s)", col, o.table.nameEscaped, col, col) + default: + updates[i] = fmt.Sprintf("%s=EXCLUDED.%s", col, col) + } } // Escape primary key column names to preserve case sensitivity (e.g., camelCase) @@ -441,9 +482,31 @@ func (d *PostgresDialect) prepareStatement(schema string, o *Operation) (normalQ return insertQuery, "", nil case OperationTypeUpdate: + // Build per-field update expressions based on UpdateOp updates := make([]string, len(columns)) - for i := 0; i < len(columns); i++ { - updates[i] = fmt.Sprintf("%s=%s", columns[i], values[i]) + for i := range columns { + col := columns[i] + val := values[i] + resultCast := getResultCast(scanTypes[i]) + switch updateOps[i] { + case UpdateOpSet: + // Direct assignment: col = value + updates[i] = fmt.Sprintf("%s=%s", col, val) + case UpdateOpAdd: + // Accumulate: col = COALESCE(col, 0) + value + updates[i] = fmt.Sprintf("%s=(COALESCE(%s::numeric, 0) + %s::numeric)%s", col, col, val, resultCast) + case UpdateOpMax: + // Maximum: col = GREATEST(COALESCE(col, 0), value) + updates[i] = fmt.Sprintf("%s=GREATEST(COALESCE(%s::numeric, 0), %s::numeric)%s", col, col, val, resultCast) + case UpdateOpMin: + // Minimum: col = LEAST(COALESCE(col, 0), value) + updates[i] = fmt.Sprintf("%s=LEAST(COALESCE(%s::numeric, 0), %s::numeric)%s", col, col, val, resultCast) + case UpdateOpSetIfNull: + // Set only if NULL (first value wins): col = COALESCE(col, value) + updates[i] = fmt.Sprintf("%s=COALESCE(%s, %s)", col, col, val) + default: + 
updates[i] = fmt.Sprintf("%s=%s", col, val) + } } primaryKeySelector := getPrimaryKeyWhereClause(o.primaryKey, "") @@ -475,13 +538,15 @@ func (d *PostgresDialect) prepareStatement(schema string, o *Operation) (normalQ } } -func (d *PostgresDialect) prepareColValues(table *TableInfo, colValues map[string]string) (columns []string, values []string, err error) { +func (d *PostgresDialect) prepareColValues(table *TableInfo, colValues map[string]FieldData) (columns []string, values []string, updateOps []UpdateOp, scanTypes []reflect.Type, err error) { if len(colValues) == 0 { return } columns = make([]string, len(colValues)) values = make([]string, len(colValues)) + updateOps = make([]UpdateOp, len(colValues)) + scanTypes = make([]reflect.Type, len(colValues)) i := 0 for colName := range colValues { @@ -491,19 +556,21 @@ func (d *PostgresDialect) prepareColValues(table *TableInfo, colValues map[strin sort.Strings(columns) // sorted for determinism in tests for i, columnName := range columns { - value := colValues[columnName] + fieldData := colValues[columnName] columnInfo, found := table.columnsByName[columnName] if !found { - return nil, nil, fmt.Errorf("cannot find column %q for table %q (valid columns are %q)", columnName, table.identifier, strings.Join(maps.Keys(table.columnsByName), ", ")) + return nil, nil, nil, nil, fmt.Errorf("cannot find column %q for table %q (valid columns are %q)", columnName, table.identifier, strings.Join(maps.Keys(table.columnsByName), ", ")) } - normalizedValue, err := d.normalizeValueType(value, columnInfo.scanType) + normalizedValue, err := d.normalizeValueType(fieldData.Value, columnInfo.scanType) if err != nil { - return nil, nil, fmt.Errorf("getting sql value from table %s for column %q raw value %q: %w", table.identifier, columnName, value, err) + return nil, nil, nil, nil, fmt.Errorf("getting sql value from table %s for column %q raw value %q: %w", table.identifier, columnName, fieldData.Value, err) } values[i] = normalizedValue 
columns[i] = columnInfo.escapedName // escape the column name + updateOps[i] = fieldData.UpdateOp + scanTypes[i] = columnInfo.scanType } return } diff --git a/db_changes/db/dialect_postgres_test.go b/db_changes/db/dialect_postgres_test.go index 95173ae..1849b9d 100644 --- a/db_changes/db/dialect_postgres_test.go +++ b/db_changes/db/dialect_postgres_test.go @@ -266,4 +266,157 @@ func TestRevertOp(t *testing.T) { }) } +} + +// TestPrepareStatement_UpdateOp tests SQL generation for each UpdateOp type +func TestPrepareStatement_UpdateOp(t *testing.T) { + // Create a test table with numeric column + table := createTestTable(t, "test_table", "id", "amount") + + tests := []struct { + name string + opType OperationType + updateOp UpdateOp + value string + expectSQL string // substring to check in generated SQL + }{ + // UPSERT with different UpdateOps + { + name: "UPSERT SET", + opType: OperationTypeUpsert, + updateOp: UpdateOpSet, + value: "100", + expectSQL: `"amount"=EXCLUDED."amount"`, + }, + { + name: "UPSERT ADD", + opType: OperationTypeUpsert, + updateOp: UpdateOpAdd, + value: "100", + expectSQL: `"amount"=(COALESCE("test_table"."amount"::numeric, 0) + EXCLUDED."amount"::numeric)`, + }, + { + name: "UPSERT MAX", + opType: OperationTypeUpsert, + updateOp: UpdateOpMax, + value: "100", + expectSQL: `"amount"=GREATEST(COALESCE("test_table"."amount"::numeric, 0), EXCLUDED."amount"::numeric)`, + }, + { + name: "UPSERT MIN", + opType: OperationTypeUpsert, + updateOp: UpdateOpMin, + value: "100", + expectSQL: `"amount"=LEAST(COALESCE("test_table"."amount"::numeric, 0), EXCLUDED."amount"::numeric)`, + }, + { + name: "UPSERT SET_IF_NULL", + opType: OperationTypeUpsert, + updateOp: UpdateOpSetIfNull, + value: "100", + expectSQL: `"amount"=COALESCE("test_table"."amount", EXCLUDED."amount")`, + }, + + // UPDATE with different UpdateOps + { + name: "UPDATE SET", + opType: OperationTypeUpdate, + updateOp: UpdateOpSet, + value: "100", + expectSQL: `"amount"=100`, + }, + { + name: 
"UPDATE ADD", + opType: OperationTypeUpdate, + updateOp: UpdateOpAdd, + value: "100", + expectSQL: `"amount"=(COALESCE("amount"::numeric, 0) + 100::numeric)`, + }, + { + name: "UPDATE MAX", + opType: OperationTypeUpdate, + updateOp: UpdateOpMax, + value: "100", + expectSQL: `"amount"=GREATEST(COALESCE("amount"::numeric, 0), 100::numeric)`, + }, + { + name: "UPDATE MIN", + opType: OperationTypeUpdate, + updateOp: UpdateOpMin, + value: "100", + expectSQL: `"amount"=LEAST(COALESCE("amount"::numeric, 0), 100::numeric)`, + }, + { + name: "UPDATE SET_IF_NULL", + opType: OperationTypeUpdate, + updateOp: UpdateOpSetIfNull, + value: "100", + expectSQL: `"amount"=COALESCE("amount", 100)`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dialect := PostgresDialect{schemaName: "public"} + op := &Operation{ + table: table, + opType: tt.opType, + primaryKey: map[string]string{"id": "123"}, + data: map[string]FieldData{ + "amount": {Value: tt.value, UpdateOp: tt.updateOp}, + }, + } + + sql, _, err := dialect.prepareStatement("public", op) + require.NoError(t, err) + assert.Contains(t, sql, tt.expectSQL, "SQL should contain expected UpdateOp clause") + }) + } +} + +// TestPrepareStatement_INSERT_IgnoresUpdateOp tests that INSERT ignores UpdateOp (always direct values) +func TestPrepareStatement_INSERT_IgnoresUpdateOp(t *testing.T) { + table := createTestTable(t, "test_table", "id", "amount") + + // For INSERT, UpdateOp should not affect the SQL - it's always a direct INSERT + ops := []UpdateOp{UpdateOpSet, UpdateOpAdd, UpdateOpMax, UpdateOpMin, UpdateOpSetIfNull} + + for _, updateOp := range ops { + t.Run(updateOpName(updateOp), func(t *testing.T) { + dialect := PostgresDialect{schemaName: "public"} + op := &Operation{ + table: table, + opType: OperationTypeInsert, + primaryKey: map[string]string{"id": "123"}, + data: map[string]FieldData{ + "amount": {Value: "100", UpdateOp: updateOp}, + }, + } + + sql, _, err := dialect.prepareStatement("public", 
op) + require.NoError(t, err) + // INSERT should always be a simple INSERT regardless of UpdateOp + assert.Contains(t, sql, "INSERT INTO") + assert.Contains(t, sql, "VALUES") + assert.NotContains(t, sql, "ON CONFLICT", "INSERT should not have ON CONFLICT clause") + }) + } +} + +// createTestTable creates a TableInfo for testing with numeric columns +func createTestTable(t *testing.T, name, pkCol string, extraCols ...string) *TableInfo { + t.Helper() + columns := make(map[string]*ColumnInfo) + + // Primary key column (text) + columns[pkCol] = NewColumnInfo(pkCol, "text", "") + + // Extra columns (numeric for UpdateOp testing) + for _, col := range extraCols { + columns[col] = NewColumnInfo(col, "numeric", int64(0)) + } + + table, err := NewTableInfo("public", name, []string{pkCol}, columns) + require.NoError(t, err) + return table } \ No newline at end of file diff --git a/db_changes/db/operations.go b/db_changes/db/operations.go index 937668c..297ddba 100644 --- a/db_changes/db/operations.go +++ b/db_changes/db/operations.go @@ -3,6 +3,7 @@ package db import ( "encoding/json" "fmt" + "math/big" "reflect" "regexp" "strings" @@ -24,11 +25,28 @@ const ( OperationTypeDelete OperationType = "DELETE" ) +// UpdateOp defines the operation to apply when updating a field on conflict +type UpdateOp int32 + +const ( + UpdateOpSet UpdateOp = 0 // Direct assignment: col = value + UpdateOpAdd UpdateOp = 1 // Accumulate: col = COALESCE(col, 0) + value + UpdateOpMax UpdateOp = 2 // Maximum: col = GREATEST(COALESCE(col, 0), value) + UpdateOpMin UpdateOp = 3 // Minimum: col = LEAST(COALESCE(col, 0), value) + UpdateOpSetIfNull UpdateOp = 4 // Set only if NULL: col = COALESCE(col, value) +) + +// FieldData holds a field's value and its update operation +type FieldData struct { + Value string + UpdateOp UpdateOp +} + type Operation struct { table *TableInfo opType OperationType primaryKey map[string]string - data map[string]string + data map[string]FieldData ordinal uint64 
reversibleBlockNum *uint64 // nil if that block is known to be irreversible } @@ -37,7 +55,7 @@ func (o *Operation) String() string { return fmt.Sprintf("%s/%s (%s)", o.table.identifier, createRowUniqueID(o.primaryKey), strings.ToLower(string(o.opType))) } -func (l *Loader) newInsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) *Operation { +func (l *Loader) newInsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]FieldData, ordinal uint64, reversibleBlockNum *uint64) *Operation { return &Operation{ table: table, opType: OperationTypeInsert, @@ -48,7 +66,7 @@ func (l *Loader) newInsertOperation(table *TableInfo, primaryKey map[string]stri } } -func (l *Loader) newUpsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) *Operation { +func (l *Loader) newUpsertOperation(table *TableInfo, primaryKey map[string]string, data map[string]FieldData, ordinal uint64, reversibleBlockNum *uint64) *Operation { return &Operation{ table: table, opType: OperationTypeUpsert, @@ -59,7 +77,7 @@ func (l *Loader) newUpsertOperation(table *TableInfo, primaryKey map[string]stri } } -func (l *Loader) newUpdateOperation(table *TableInfo, primaryKey map[string]string, data map[string]string, ordinal uint64, reversibleBlockNum *uint64) *Operation { +func (l *Loader) newUpdateOperation(table *TableInfo, primaryKey map[string]string, data map[string]FieldData, ordinal uint64, reversibleBlockNum *uint64) *Operation { return &Operation{ table: table, opType: OperationTypeUpdate, @@ -80,19 +98,197 @@ func (l *Loader) newDeleteOperation(table *TableInfo, primaryKey map[string]stri } } -func (o *Operation) mergeData(newData map[string]string) error { +func (o *Operation) mergeData(newData map[string]FieldData) error { if o.opType == OperationTypeDelete { return fmt.Errorf("unable to merge data for a delete operation") } 
- for k, v := range newData { - o.data[k] = v + for k, fd := range newData { + existing, exists := o.data[k] + if !exists { + o.data[k] = fd + continue + } + + // Validate transition based on strict rules (consistent with Rust library) + // SET can be followed by any op, but non-SET ops can only be followed by same type + if err := validateOpTransition(k, existing.UpdateOp, fd.UpdateOp); err != nil { + return err + } + + // Handle each incoming operation type + switch fd.UpdateOp { + case UpdateOpSet: + // SET: latest value wins, overwrites any previous operation + o.data[k] = fd + + case UpdateOpAdd: + // ADD: accumulate values (valid after SET or ADD) + existingDec, err1 := parseDecimal(existing.Value) + newDec, err2 := parseDecimal(fd.Value) + if err1 == nil && err2 == nil { + o.data[k] = FieldData{ + Value: existingDec.Add(newDec).String(), + UpdateOp: existing.UpdateOp, // Keep existing op: SET stays SET, ADD stays ADD + } + } else { + // Non-numeric: latest value wins + o.data[k] = fd + } + + case UpdateOpMax: + // MAX: compute maximum (valid after SET or MAX) + existingDec, err1 := parseDecimal(existing.Value) + newDec, err2 := parseDecimal(fd.Value) + if err1 == nil && err2 == nil { + maxVal := existingDec + if newDec.Cmp(existingDec.Rat) > 0 { + maxVal = newDec + } + o.data[k] = FieldData{ + Value: maxVal.String(), + UpdateOp: existing.UpdateOp, // Keep existing op: SET stays SET, MAX stays MAX + } + } else { + // Non-numeric: latest value wins + o.data[k] = fd + } + + case UpdateOpMin: + // MIN: compute minimum (valid after SET or MIN) + existingDec, err1 := parseDecimal(existing.Value) + newDec, err2 := parseDecimal(fd.Value) + if err1 == nil && err2 == nil { + minVal := existingDec + if newDec.Cmp(existingDec.Rat) < 0 { + minVal = newDec + } + o.data[k] = FieldData{ + Value: minVal.String(), + UpdateOp: existing.UpdateOp, // Keep existing op: SET stays SET, MIN stays MIN + } + } else { + // Non-numeric: latest value wins + o.data[k] = fd + } + + case 
UpdateOpSetIfNull: + // SET_IF_NULL: keep existing value (first value wins) + // Field already exists, so keep it and don't overwrite + continue + } } return nil } +// validateOpTransition checks if the transition from existing to incoming op is valid. +// Returns an error for invalid transitions. +// +// Valid transitions: +// - SET → any op: OK +// - any op → SET: OK (SET always overwrites) +// - ADD → ADD: OK (accumulates) +// - MAX → MAX: OK (computes max) +// - MIN → MIN: OK (computes min) +// - SET_IF_NULL → SET_IF_NULL: OK (first value wins) +// +// All other transitions are invalid. +func validateOpTransition(fieldName string, existing, incoming UpdateOp) error { + // SET can be followed by any operation + if existing == UpdateOpSet { + return nil + } + + // Any operation can be followed by SET (SET overwrites) + if incoming == UpdateOpSet { + return nil + } + + // Non-SET ops can only be followed by the same op type + if existing == incoming { + return nil + } + + // Invalid transition + return fmt.Errorf( + "invalid UpdateOp transition for field %q: cannot apply %s after %s (only %s → %s or SET → %s is allowed)", + fieldName, + updateOpName(incoming), + updateOpName(existing), + updateOpName(existing), + updateOpName(existing), + updateOpName(incoming), + ) +} + +func updateOpName(op UpdateOp) string { + switch op { + case UpdateOpSet: + return "SET" + case UpdateOpAdd: + return "ADD" + case UpdateOpMax: + return "MAX" + case UpdateOpMin: + return "MIN" + case UpdateOpSetIfNull: + return "SET_IF_NULL" + default: + return fmt.Sprintf("UNKNOWN(%d)", op) + } +} + +func parseDecimal(s string) (decimal, error) { + // Simple decimal parsing - just use big.Rat for precision + var d decimal + _, ok := d.SetString(s) + if !ok { + return decimal{}, fmt.Errorf("invalid decimal: %s", s) + } + return d, nil +} + +// decimal is a simple wrapper around big.Rat for delta accumulation +type decimal struct { + *big.Rat +} + +func (d *decimal) SetString(s string) (*decimal, 
bool) { + if d.Rat == nil { + d.Rat = new(big.Rat) + } + _, ok := d.Rat.SetString(s) + return d, ok +} + +func (d decimal) Add(other decimal) decimal { + result := new(big.Rat) + result.Add(d.Rat, other.Rat) + return decimal{result} +} + +func (d decimal) Sub(other decimal) decimal { + result := new(big.Rat) + result.Sub(d.Rat, other.Rat) + return decimal{result} +} + +func (d decimal) Neg() decimal { + result := new(big.Rat) + result.Neg(d.Rat) + return decimal{result} +} + +func (d decimal) Sign() int { + return d.Rat.Sign() +} + +func (d decimal) String() string { + return d.Rat.FloatString(18) +} + // mergeOperation merges another operation into this one, keeping the lowest ordinal -func (o *Operation) mergeOperation(otherData map[string]string) error { +func (o *Operation) mergeOperation(otherData map[string]FieldData) error { if o.opType == OperationTypeDelete { return fmt.Errorf("unable to merge operation for a delete operation") } diff --git a/db_changes/db/operations_test.go b/db_changes/db/operations_test.go index f3861c7..abc866e 100644 --- a/db_changes/db/operations_test.go +++ b/db_changes/db/operations_test.go @@ -156,7 +156,7 @@ func TestEscapeValues(t *testing.T) { func Test_prepareColValues(t *testing.T) { type args struct { table *TableInfo - colValues map[string]string + colValues map[string]FieldData } tests := []struct { name string @@ -169,7 +169,7 @@ func Test_prepareColValues(t *testing.T) { "bool true", args{ newTable(t, "schemaName", "name", "id", NewColumnInfo("col", "bool", true)), - map[string]string{"col": "true"}, + map[string]FieldData{"col": {Value: "true", UpdateOp: UpdateOpSet}}, }, []string{`"col"`}, []string{`'true'`}, @@ -180,7 +180,7 @@ func Test_prepareColValues(t *testing.T) { t.Run(tt.name, func(t *testing.T) { dialect := PostgresDialect{} - gotColumns, gotValues, err := dialect.prepareColValues(tt.args.table, tt.args.colValues) + gotColumns, gotValues, _, _, err := dialect.prepareColValues(tt.args.table, tt.args.colValues) 
tt.assertion(t, err) assert.Equal(t, tt.wantColumns, gotColumns) assert.Equal(t, tt.wantValues, gotValues) @@ -200,3 +200,229 @@ func newTable(t *testing.T, schema, name, primaryColumn string, columnInfos ...* return table } + +// TestMergeData_ValidTransitions tests all valid UpdateOp transitions +func TestMergeData_ValidTransitions(t *testing.T) { + tests := []struct { + name string + existingOp UpdateOp + existingValue string + incomingOp UpdateOp + incomingValue string + expectedValue string + expectedOp UpdateOp + }{ + // SET → any (all allowed) + {"SET → SET", UpdateOpSet, "100", UpdateOpSet, "200", "200", UpdateOpSet}, + {"SET → ADD", UpdateOpSet, "100", UpdateOpAdd, "50", "150.000000000000000000", UpdateOpSet}, + {"SET → MAX", UpdateOpSet, "100", UpdateOpMax, "150", "150.000000000000000000", UpdateOpSet}, + {"SET → MAX (existing wins)", UpdateOpSet, "200", UpdateOpMax, "150", "200.000000000000000000", UpdateOpSet}, + {"SET → MIN", UpdateOpSet, "100", UpdateOpMin, "50", "50.000000000000000000", UpdateOpSet}, + {"SET → MIN (existing wins)", UpdateOpSet, "50", UpdateOpMin, "100", "50.000000000000000000", UpdateOpSet}, + {"SET → SET_IF_NULL", UpdateOpSet, "100", UpdateOpSetIfNull, "200", "100", UpdateOpSet}, + + // ADD → ADD (accumulates) + {"ADD → ADD", UpdateOpAdd, "100", UpdateOpAdd, "50", "150.000000000000000000", UpdateOpAdd}, + {"ADD → ADD (negative)", UpdateOpAdd, "100", UpdateOpAdd, "-30", "70.000000000000000000", UpdateOpAdd}, + + // MAX → MAX (computes max) + {"MAX → MAX (new wins)", UpdateOpMax, "100", UpdateOpMax, "150", "150.000000000000000000", UpdateOpMax}, + {"MAX → MAX (existing wins)", UpdateOpMax, "200", UpdateOpMax, "150", "200.000000000000000000", UpdateOpMax}, + + // MIN → MIN (computes min) + {"MIN → MIN (new wins)", UpdateOpMin, "100", UpdateOpMin, "50", "50.000000000000000000", UpdateOpMin}, + {"MIN → MIN (existing wins)", UpdateOpMin, "50", UpdateOpMin, "100", "50.000000000000000000", UpdateOpMin}, + + // SET_IF_NULL → SET_IF_NULL 
(first value wins) + {"SET_IF_NULL → SET_IF_NULL", UpdateOpSetIfNull, "100", UpdateOpSetIfNull, "200", "100", UpdateOpSetIfNull}, + + // any → SET (SET always overwrites) + {"ADD → SET", UpdateOpAdd, "100", UpdateOpSet, "200", "200", UpdateOpSet}, + {"MAX → SET", UpdateOpMax, "100", UpdateOpSet, "200", "200", UpdateOpSet}, + {"MIN → SET", UpdateOpMin, "100", UpdateOpSet, "200", "200", UpdateOpSet}, + {"SET_IF_NULL → SET", UpdateOpSetIfNull, "100", UpdateOpSet, "200", "200", UpdateOpSet}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := &Operation{ + opType: OperationTypeUpsert, + data: map[string]FieldData{"field": {Value: tt.existingValue, UpdateOp: tt.existingOp}}, + } + + err := op.mergeData(map[string]FieldData{"field": {Value: tt.incomingValue, UpdateOp: tt.incomingOp}}) + require.NoError(t, err) + + assert.Equal(t, tt.expectedValue, op.data["field"].Value) + assert.Equal(t, tt.expectedOp, op.data["field"].UpdateOp) + }) + } +} + +// TestMergeData_InvalidTransitions tests all invalid UpdateOp transitions return errors +func TestMergeData_InvalidTransitions(t *testing.T) { + tests := []struct { + name string + existingOp UpdateOp + incomingOp UpdateOp + }{ + // ADD → others (except ADD and SET) + {"ADD → MAX", UpdateOpAdd, UpdateOpMax}, + {"ADD → MIN", UpdateOpAdd, UpdateOpMin}, + {"ADD → SET_IF_NULL", UpdateOpAdd, UpdateOpSetIfNull}, + + // MAX → others (except MAX and SET) + {"MAX → ADD", UpdateOpMax, UpdateOpAdd}, + {"MAX → MIN", UpdateOpMax, UpdateOpMin}, + {"MAX → SET_IF_NULL", UpdateOpMax, UpdateOpSetIfNull}, + + // MIN → others (except MIN and SET) + {"MIN → ADD", UpdateOpMin, UpdateOpAdd}, + {"MIN → MAX", UpdateOpMin, UpdateOpMax}, + {"MIN → SET_IF_NULL", UpdateOpMin, UpdateOpSetIfNull}, + + // SET_IF_NULL → others (except SET_IF_NULL and SET) + {"SET_IF_NULL → ADD", UpdateOpSetIfNull, UpdateOpAdd}, + {"SET_IF_NULL → MAX", UpdateOpSetIfNull, UpdateOpMax}, + {"SET_IF_NULL → MIN", UpdateOpSetIfNull, UpdateOpMin}, + } + + 
for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := &Operation{ + opType: OperationTypeUpsert, + data: map[string]FieldData{"field": {Value: "100", UpdateOp: tt.existingOp}}, + } + + err := op.mergeData(map[string]FieldData{"field": {Value: "200", UpdateOp: tt.incomingOp}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "invalid UpdateOp transition") + }) + } +} + +// TestMergeData_NewField tests adding a new field (no existing) +func TestMergeData_NewField(t *testing.T) { + ops := []UpdateOp{UpdateOpSet, UpdateOpAdd, UpdateOpMax, UpdateOpMin, UpdateOpSetIfNull} + + for _, op := range ops { + t.Run(updateOpName(op), func(t *testing.T) { + operation := &Operation{ + opType: OperationTypeUpsert, + data: map[string]FieldData{}, + } + + err := operation.mergeData(map[string]FieldData{"field": {Value: "100", UpdateOp: op}}) + require.NoError(t, err) + assert.Equal(t, "100", operation.data["field"].Value) + assert.Equal(t, op, operation.data["field"].UpdateOp) + }) + } +} + +// TestMergeData_NonNumeric tests handling of non-numeric values +func TestMergeData_NonNumeric(t *testing.T) { + tests := []struct { + name string + existingValue string + incomingValue string + op UpdateOp + expectedValue string + }{ + {"ADD non-numeric", "hello", "world", UpdateOpAdd, "world"}, + {"MAX non-numeric", "hello", "world", UpdateOpMax, "world"}, + {"MIN non-numeric", "hello", "world", UpdateOpMin, "world"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := &Operation{ + opType: OperationTypeUpsert, + data: map[string]FieldData{"field": {Value: tt.existingValue, UpdateOp: tt.op}}, + } + + err := op.mergeData(map[string]FieldData{"field": {Value: tt.incomingValue, UpdateOp: tt.op}}) + require.NoError(t, err) + assert.Equal(t, tt.expectedValue, op.data["field"].Value) + }) + } +} + +// TestMergeData_DecimalPrecision tests high precision decimal handling +func TestMergeData_DecimalPrecision(t *testing.T) { + tests := []struct { 
+ name string + existingValue string + incomingValue string + expectedValue string + }{ + {"small decimals", "0.000000000000000001", "0.000000000000000002", "0.000000000000000003"}, + {"large numbers", "1000000000000000000", "1000000000000000000", "2000000000000000000.000000000000000000"}, + {"mixed precision", "100.5", "50.25", "150.750000000000000000"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + op := &Operation{ + opType: OperationTypeUpsert, + data: map[string]FieldData{"field": {Value: tt.existingValue, UpdateOp: UpdateOpAdd}}, + } + + err := op.mergeData(map[string]FieldData{"field": {Value: tt.incomingValue, UpdateOp: UpdateOpAdd}}) + require.NoError(t, err) + assert.Equal(t, tt.expectedValue, op.data["field"].Value) + }) + } +} + +// TestMergeData_MultipleFields tests merging multiple fields at once +func TestMergeData_MultipleFields(t *testing.T) { + op := &Operation{ + opType: OperationTypeUpsert, + data: map[string]FieldData{ + "counter": {Value: "100", UpdateOp: UpdateOpAdd}, + "max_price": {Value: "50", UpdateOp: UpdateOpMax}, + "min_price": {Value: "100", UpdateOp: UpdateOpMin}, + "first_seen": {Value: "2024-01-01", UpdateOp: UpdateOpSetIfNull}, + "name": {Value: "old", UpdateOp: UpdateOpSet}, + }, + } + + incoming := map[string]FieldData{ + "counter": {Value: "50", UpdateOp: UpdateOpAdd}, + "max_price": {Value: "75", UpdateOp: UpdateOpMax}, + "min_price": {Value: "80", UpdateOp: UpdateOpMin}, + "first_seen": {Value: "2024-02-01", UpdateOp: UpdateOpSetIfNull}, + "name": {Value: "new", UpdateOp: UpdateOpSet}, + } + + err := op.mergeData(incoming) + require.NoError(t, err) + + assert.Equal(t, "150.000000000000000000", op.data["counter"].Value) + assert.Equal(t, UpdateOpAdd, op.data["counter"].UpdateOp) + + assert.Equal(t, "75.000000000000000000", op.data["max_price"].Value) + assert.Equal(t, UpdateOpMax, op.data["max_price"].UpdateOp) + + assert.Equal(t, "80.000000000000000000", op.data["min_price"].Value) + 
assert.Equal(t, UpdateOpMin, op.data["min_price"].UpdateOp) + + assert.Equal(t, "2024-01-01", op.data["first_seen"].Value) + assert.Equal(t, UpdateOpSetIfNull, op.data["first_seen"].UpdateOp) + + assert.Equal(t, "new", op.data["name"].Value) + assert.Equal(t, UpdateOpSet, op.data["name"].UpdateOp) +} + +// TestMergeData_DeleteOperation tests that merging into a delete operation fails +func TestMergeData_DeleteOperation(t *testing.T) { + op := &Operation{ + opType: OperationTypeDelete, + data: map[string]FieldData{}, + } + + err := op.mergeData(map[string]FieldData{"field": {Value: "100", UpdateOp: UpdateOpSet}}) + require.Error(t, err) + assert.Contains(t, err.Error(), "delete operation") +} diff --git a/db_changes/db/ops.go b/db_changes/db/ops.go index 69d6df6..e9537a1 100644 --- a/db_changes/db/ops.go +++ b/db_changes/db/ops.go @@ -11,7 +11,7 @@ import ( // Insert a row in the DB, it is assumed the table exists, you can do a // check before with HasTable() -func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error { +func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map[string]FieldData, reversibleBlockNum *uint64) error { uniqueID := createRowUniqueID(primaryKey) if l.tracer.Enabled() { @@ -55,7 +55,7 @@ func (l *Loader) Insert(tableName string, primaryKey map[string]string, data map // We need to make sure to add the primary key(s) in the data so that those column get created correctly, but only if there is data for _, primary := range l.tables[tableName].primaryColumns { if dataFromPrimaryKey, ok := primaryKey[primary.name]; ok { - data[primary.name] = dataFromPrimaryKey + data[primary.name] = FieldData{Value: dataFromPrimaryKey, UpdateOp: UpdateOpSet} } } @@ -101,7 +101,7 @@ func (l *Loader) GetPrimaryKey(tableName string, pk string) (map[string]string, // Upsert a row in the DB, it is assumed the table exists, you can do a // check before with HasTable(). 
-func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error { +func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map[string]FieldData, reversibleBlockNum *uint64) error { if l.dialect.OnlyInserts() { return fmt.Errorf("update operation is not supported by the current database") } @@ -161,7 +161,7 @@ func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map // We need to make sure to add the primary key(s) in the data so that those column get created correctly, but only if there is data for _, primary := range l.tables[tableName].primaryColumns { if dataFromPrimaryKey, ok := primaryKey[primary.name]; ok { - data[primary.name] = dataFromPrimaryKey + data[primary.name] = FieldData{Value: dataFromPrimaryKey, UpdateOp: UpdateOpSet} } } @@ -171,7 +171,7 @@ func (l *Loader) Upsert(tableName string, primaryKey map[string]string, data map // Update a row in the DB, it is assumed the table exists, you can do a // check before with HasTable() -func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]string, reversibleBlockNum *uint64) error { +func (l *Loader) Update(tableName string, primaryKey map[string]string, data map[string]FieldData, reversibleBlockNum *uint64) error { if l.dialect.OnlyInserts() { return fmt.Errorf("update operation is not supported by the current database") } @@ -277,3 +277,4 @@ func (l *Loader) Delete(tableName string, primaryKey map[string]string, reversib entry.Set(uniqueID, l.newDeleteOperation(table, primaryKey, l.NextBatchOrdinal(), reversibleBlockNum)) return nil } + diff --git a/db_changes/sinker/generate_csv_sinker.go b/db_changes/sinker/generate_csv_sinker.go index e5afa88..84b130c 100644 --- a/db_changes/sinker/generate_csv_sinker.go +++ b/db_changes/sinker/generate_csv_sinker.go @@ -253,7 +253,7 @@ func (s *GenerateCSVSinker) dumpDatabaseChangesIntoCSV(dbChanges *pbdatabase.Dat case 
pbdatabase.TableChange_OPERATION_CREATE: // Add fields for _, field := range change.Fields { - fields[field.Name] = field.NewValue + fields[field.Name] = field.Value } data, _ := bundler2.CSVEncode(fields) diff --git a/db_changes/sinker/sinker.go b/db_changes/sinker/sinker.go index 0dbc1b3..d7a1b7f 100644 --- a/db_changes/sinker/sinker.go +++ b/db_changes/sinker/sinker.go @@ -274,9 +274,12 @@ func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges, return fmt.Errorf("unknown primary key type: %T", change.PrimaryKey) } - changes := map[string]string{} + changes := map[string]db2.FieldData{} for _, field := range change.Fields { - changes[field.Name] = field.NewValue + changes[field.Name] = db2.FieldData{ + Value: field.Value, + UpdateOp: protoUpdateOpToDbUpdateOp(field.UpdateOp), + } } var reversibleBlockNum *uint64 @@ -312,6 +315,22 @@ func (s *SQLSinker) applyDatabaseChanges(dbChanges *pbdatabase.DatabaseChanges, return nil } +// protoUpdateOpToDbUpdateOp converts proto Field_UpdateOp to db UpdateOp +func protoUpdateOpToDbUpdateOp(op pbdatabase.Field_UpdateOp) db2.UpdateOp { + switch op { + case pbdatabase.Field_UPDATE_OP_ADD: + return db2.UpdateOpAdd + case pbdatabase.Field_UPDATE_OP_MAX: + return db2.UpdateOpMax + case pbdatabase.Field_UPDATE_OP_MIN: + return db2.UpdateOpMin + case pbdatabase.Field_UPDATE_OP_SET_IF_NULL: + return db2.UpdateOpSetIfNull + default: + return db2.UpdateOpSet + } +} + func (s *SQLSinker) HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error { // To be moved in the base sinker library, happens usually only on integration tests where the connection // can close with "nil" error but we haven't completed the range for real yet. 
diff --git a/db_changes/sinker/sinker_test.go b/db_changes/sinker/sinker_test.go index b5fb8c3..949c497 100644 --- a/db_changes/sinker/sinker_test.go +++ b/db_changes/sinker/sinker_test.go @@ -268,8 +268,8 @@ func getFields(fieldsAndValues ...string) (out []*pbdatabase.Field) { } for i := 0; i < len(fieldsAndValues); i += 2 { out = append(out, &pbdatabase.Field{ - Name: fieldsAndValues[i], - NewValue: fieldsAndValues[i+1], + Name: fieldsAndValues[i], + Value: fieldsAndValues[i+1], }) } return @@ -363,3 +363,33 @@ func simpleCursor(num, finalNum uint64) string { HeadBlock: blk, }).ToOpaque() } + +// TestProtoUpdateOpToDbUpdateOp tests the proto-to-db UpdateOp converter +func TestProtoUpdateOpToDbUpdateOp(t *testing.T) { + tests := []struct { + name string + protoOp pbdatabase.Field_UpdateOp + expected db2.UpdateOp + }{ + {"SET (default)", pbdatabase.Field_UPDATE_OP_SET, db2.UpdateOpSet}, + {"ADD", pbdatabase.Field_UPDATE_OP_ADD, db2.UpdateOpAdd}, + {"MAX", pbdatabase.Field_UPDATE_OP_MAX, db2.UpdateOpMax}, + {"MIN", pbdatabase.Field_UPDATE_OP_MIN, db2.UpdateOpMin}, + {"SET_IF_NULL", pbdatabase.Field_UPDATE_OP_SET_IF_NULL, db2.UpdateOpSetIfNull}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := protoUpdateOpToDbUpdateOp(tt.protoOp) + assert.Equal(t, tt.expected, result) + }) + } +} + +// TestProtoUpdateOpToDbUpdateOp_UnknownValue tests that unknown proto values default to SET +func TestProtoUpdateOpToDbUpdateOp_UnknownValue(t *testing.T) { + // Test with an unknown/invalid proto value - should default to SET + unknownOp := pbdatabase.Field_UpdateOp(999) + result := protoUpdateOpToDbUpdateOp(unknownOp) + assert.Equal(t, db2.UpdateOpSet, result, "unknown proto values should default to SET") +} diff --git a/go.mod b/go.mod index 7a55350..f61a5df 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/streamingfast/logging v0.0.0-20251127143054-23a35e5bd633 github.com/streamingfast/substreams 
v1.16.7-0.20250925152521-9d7a8ef0f261 github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c - github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20250509171446-cb64cdbfea72 + github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.38.0 github.com/testcontainers/testcontainers-go/modules/clickhouse v0.38.0 diff --git a/go.sum b/go.sum index 447292b..7205897 100644 --- a/go.sum +++ b/go.sum @@ -626,8 +626,8 @@ github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261 h1:5vk github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261/go.mod h1:3s/qTXV85jh40Li1gbYTRtbAOyFzpaHN0Bi1OtuXIDA= github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c h1:LCJhSgR7XTiKxCtIBPyoFLQdYRCbFaW8EljIL8IvaWs= github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c/go.mod h1:EbwZgN7FRZY6oNBmA7ufcaHZ215nDo3ejyyasuS3xj4= -github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20250509171446-cb64cdbfea72 h1:yjEm3ypUUxCVFiaAwHr477ClRAJXHLXX2VHzk6HsxHk= -github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20250509171446-cb64cdbfea72/go.mod h1:f51ljuUsQEYuyuDdo5BB/4AfB87QDAegy5e8p5qBxis= +github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9 h1:B+YC18umjoESDnLKZScl0GZtPQoE0/U2Mz8shnfTISc= +github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9/go.mod h1:f51ljuUsQEYuyuDdo5BB/4AfB87QDAegy5e8p5qBxis= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= diff --git a/tests/integration/db_changes_postgres_test.go 
b/tests/integration/db_changes_postgres_test.go index 714bcfb..6274c7a 100644 --- a/tests/integration/db_changes_postgres_test.go +++ b/tests/integration/db_changes_postgres_test.go @@ -804,6 +804,1004 @@ func TestSinker_Integration_UndoBufferWorks(t *testing.T) { ) } +func TestSinker_Integration_DeltaUpdate_Add(t *testing.T) { + type CounterRow struct { + ID string `db:"id"` + Count int64 `db:"count"` + } + + tests := []sinkerTestCase{ + { + "add - initial upsert sets the value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "add - multiple adds accumulate within same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("100")), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 150}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "add - adds accumulate across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("25")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 125}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", 
+ }, + { + "add - negative delta subtracts", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("-30")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 70}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "add - insert then update with delta", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + updateRowSinglePK("counters", "counter1", "count", deltaAdd("50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 150}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "add - insert with delta sets initial value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", deltaAdd("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "add - insert with delta then update with delta accumulates", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", deltaAdd("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + updateRowSinglePK("counters", "counter1", "count", deltaAdd("25")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows 
:= readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 125}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "add - multiple updates with delta in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + updateRowSinglePK("counters", "counter1", "count", deltaAdd("20")), + updateRowSinglePK("counters", "counter1", "count", deltaAdd("30")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 150}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "add - set then add in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("10")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 110}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "add - set then add across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("10")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", "50"), + upsertRowSinglePK("counters", "counter1", "count", deltaAdd("10")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 60}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, 
+ { + "sub - basic subtraction", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaSub("30")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 70}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "sub - multiple subtractions accumulate", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaSub("20")), + upsertRowSinglePK("counters", "counter1", "count", deltaSub("15")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 65}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "sub - subtracting negative value adds", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaSub("-25")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 125}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runSinkerTest( + t, + sharedDbChangesPostgresContainer, + nil, + nil, + rawSQLInput(func(schema string) string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "%s".counters ( + id TEXT PRIMARY KEY, + count BIGINT DEFAULT 0 + ); + `, schema) + }), + test.responses, + test.expected, + test.expectedFinalCursor, + ) + 
}) + } +} + +func TestSinker_Integration_DeltaUpdate_Max(t *testing.T) { + type CounterRow struct { + ID string `db:"id"` + Count int64 `db:"count"` + } + + tests := []sinkerTestCase{ + { + "max - initial upsert sets the value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "max - keeps higher value in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("100")), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "max - updates to higher value in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("50")), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "max - keeps higher value across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("50")), 
+ ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "max - updates to higher value across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("50")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "max - insert then update with max", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + updateRowSinglePK("counters", "counter1", "count", deltaMax("150")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 150}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "max - insert then update with lower max keeps original", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + updateRowSinglePK("counters", "counter1", "count", deltaMax("50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 
(11a)", + }, + { + "max - insert with max sets initial value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", deltaMax("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "max - multiple updates with max in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + updateRowSinglePK("counters", "counter1", "count", deltaMax("80")), + updateRowSinglePK("counters", "counter1", "count", deltaMax("150")), + updateRowSinglePK("counters", "counter1", "count", deltaMax("120")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 150}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "max - set then max in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("150")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 150}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "max - set then max across blocks overwrites", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("150")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + // set(50) resets the value, then max(80) 
computes max(50,80)=80 + // Since UpdateOp stays SET after set→max, the flush overwrites with 80 + upsertRowSinglePK("counters", "counter1", "count", "50"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("80")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 80}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "max - handles negative values", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("-50")), + upsertRowSinglePK("counters", "counter1", "count", deltaMax("-100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: -50}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runSinkerTest( + t, + sharedDbChangesPostgresContainer, + nil, + nil, + rawSQLInput(func(schema string) string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "%s".counters ( + id TEXT PRIMARY KEY, + count BIGINT DEFAULT 0 + ); + `, schema) + }), + test.responses, + test.expected, + test.expectedFinalCursor, + ) + }) + } +} + +func TestSinker_Integration_DeltaUpdate_Min(t *testing.T) { + type CounterRow struct { + ID string `db:"id"` + Count int64 `db:"count"` + } + + tests := []sinkerTestCase{ + { + "min - initial upsert sets the value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, 
rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "min - keeps lower value in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("50")), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 50}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "min - updates to lower value in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("100")), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 50}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "min - keeps lower value across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("50")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 50}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "min - updates to lower value across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("50")), + 
), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 50}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "min - insert then update with min", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + updateRowSinglePK("counters", "counter1", "count", deltaMin("50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 50}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "min - insert then update with higher min keeps original", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + updateRowSinglePK("counters", "counter1", "count", deltaMin("150")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "min - insert with min sets initial value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", deltaMin("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "min - multiple updates with min in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + 
insertRowSinglePK("counters", "counter1", "count", "100"), + updateRowSinglePK("counters", "counter1", "count", deltaMin("120")), + updateRowSinglePK("counters", "counter1", "count", deltaMin("50")), + updateRowSinglePK("counters", "counter1", "count", deltaMin("80")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 50}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "min - set then min in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 50}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "min - set then min across blocks overwrites", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("50")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + // set(200) resets the value, then min(150) computes min(200,150)=150 + // Since UpdateOp stays SET after set→min, the flush overwrites with 150 + upsertRowSinglePK("counters", "counter1", "count", "200"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("150")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 150}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "min - handles negative values", + streamMock( + dbChangesBlockData(t, "10a", 
finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("-50")), + upsertRowSinglePK("counters", "counter1", "count", deltaMin("-100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: -100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runSinkerTest( + t, + sharedDbChangesPostgresContainer, + nil, + nil, + rawSQLInput(func(schema string) string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "%s".counters ( + id TEXT PRIMARY KEY, + count BIGINT DEFAULT 0 + ); + `, schema) + }), + test.responses, + test.expected, + test.expectedFinalCursor, + ) + }) + } +} + +func TestSinker_Integration_DeltaUpdate_SetIfNull(t *testing.T) { + type CounterRow struct { + ID string `db:"id"` + Count int64 `db:"count"` + } + + tests := []sinkerTestCase{ + { + "set_if_null - initial upsert sets the value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "set_if_null - keeps first value in same block", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) 
- LIB #10 (10a)", + }, + { + "set_if_null - keeps first value across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "set_if_null - insert then update with set_if_null keeps original", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + updateRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "set_if_null - insert with set_if_null sets initial value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + insertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "set_if_null - multiple set_if_null in same block keeps first", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + upsertRowSinglePK("counters", 
"counter1", "count", setIfNull("300")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "set_if_null - set then set_if_null in same block keeps set value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "set_if_null - set then set_if_null across blocks keeps set value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "set_if_null - set after set_if_null in same block overwrites", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + // SET_IF_NULL → SET is a valid transition in the same block + // The SET value overwrites the SET_IF_NULL value + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + upsertRowSinglePK("counters", "counter1", "count", "200"), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + 
require.Equal(t, &CounterRow{ID: "counter1", Count: 200}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + { + "set_if_null - set overwrites previous set_if_null across blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", "200"), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 200}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "set_if_null - handles negative values", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("-100")), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("-50")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: -100}, rows[0]) + }, + "Block #10 (10a) - LIB #10 (10a)", + }, + // Mixed set_if_null and set across blocks + { + "set_if_null - set in block1 then set_if_null in block2 keeps block1 value", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "set_if_null - set_if_null in block1 then set in block2 overwrites", + streamMock( + 
dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", "200"), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 200}, rows[0]) + }, + "Block #11 (11a) - LIB #11 (11a)", + }, + { + "set_if_null - alternating set and set_if_null across three blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", "200"), + ), + dbChangesBlockData(t, "12a", finalBlock("12a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("300")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + // Block1: set_if_null(100) -> 100 + // Block2: set(200) -> 200 (overwrites) + // Block3: set_if_null(300) -> 200 (keeps existing) + require.Equal(t, &CounterRow{ID: "counter1", Count: 200}, rows[0]) + }, + "Block #12 (12a) - LIB #12 (12a)", + }, + { + "set_if_null - set then set_if_null then set across three blocks", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", "100"), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + dbChangesBlockData(t, "12a", finalBlock("12a"), + upsertRowSinglePK("counters", "counter1", "count", "300"), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + // Block1: set(100) -> 100 + // 
Block2: set_if_null(200) -> 100 (keeps existing) + // Block3: set(300) -> 300 (overwrites) + require.Equal(t, &CounterRow{ID: "counter1", Count: 300}, rows[0]) + }, + "Block #12 (12a) - LIB #12 (12a)", + }, + { + "set_if_null - multiple set_if_null across blocks only first wins", + streamMock( + dbChangesBlockData(t, "10a", finalBlock("10a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("100")), + ), + dbChangesBlockData(t, "11a", finalBlock("11a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("200")), + ), + dbChangesBlockData(t, "12a", finalBlock("12a"), + upsertRowSinglePK("counters", "counter1", "count", setIfNull("300")), + ), + ), + func(t *testing.T, dbx *sqlx.DB, schema string) { + rows := readDbChangesRows[CounterRow](t, dbx, schema, "counters") + require.Len(t, rows, 1) + require.Equal(t, &CounterRow{ID: "counter1", Count: 100}, rows[0]) + }, + "Block #12 (12a) - LIB #12 (12a)", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runSinkerTest( + t, + sharedDbChangesPostgresContainer, + nil, + nil, + rawSQLInput(func(schema string) string { + return fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS "%s".counters ( + id TEXT PRIMARY KEY, + count BIGINT DEFAULT 0 + ); + `, schema) + }), + test.responses, + test.expected, + test.expectedFinalCursor, + ) + }) + } +} + type sinkerTestCase struct { name string responses []any @@ -879,7 +1877,7 @@ func runSinkerTest( HistoryTableName: setupOptions.HistoryTableName, ClickhouseCluster: setupOptions.ClickhouseCluster, BatchBlockFlushInterval: 1, - BatchRowFlushInterval: 3, + BatchRowFlushInterval: 5, LiveBlockFlushInterval: 1, OnModuleHashMismatch: setupOptions.OnModuleHashMismatch, HandleReorgs: true, @@ -928,15 +1926,47 @@ func randomSchemaName() string { return "testschema" + string(b) } -func getFields(fieldsAndValues ...string) (out []*pbdatabase.Field) { +// deltaValue wraps a value with an UpdateOp for delta updates +type deltaValue struct { + 
value string + updateOp pbdatabase.Field_UpdateOp +} + +func deltaAdd(value string) deltaValue { return deltaValue{value, pbdatabase.Field_UPDATE_OP_ADD} } +func deltaSub(value string) deltaValue { + // Negate the value for subtraction (sub is just add with negated value) + if len(value) > 0 && value[0] == '-' { + return deltaValue{value[1:], pbdatabase.Field_UPDATE_OP_ADD} // Remove the minus sign + } + return deltaValue{"-" + value, pbdatabase.Field_UPDATE_OP_ADD} // Add minus sign +} +func deltaMax(value string) deltaValue { return deltaValue{value, pbdatabase.Field_UPDATE_OP_MAX} } +func deltaMin(value string) deltaValue { return deltaValue{value, pbdatabase.Field_UPDATE_OP_MIN} } +func setIfNull(value string) deltaValue { + return deltaValue{value, pbdatabase.Field_UPDATE_OP_SET_IF_NULL} +} + +func getFields(fieldsAndValues ...any) (out []*pbdatabase.Field) { if len(fieldsAndValues)%2 != 0 { - panic("tableChangeSinglePK needs even number of fieldsAndValues") + panic("getFields needs even number of fieldsAndValues") } for i := 0; i < len(fieldsAndValues); i += 2 { - out = append(out, &pbdatabase.Field{ - Name: fieldsAndValues[i], - NewValue: fieldsAndValues[i+1], - }) + name, ok := fieldsAndValues[i].(string) + if !ok { + panic(fmt.Sprintf("field name at index %d must be a string, got %T", i, fieldsAndValues[i])) + } + + field := &pbdatabase.Field{Name: name} + switch v := fieldsAndValues[i+1].(type) { + case string: + field.Value = v + case deltaValue: + field.Value = v.value + field.UpdateOp = v.updateOp + default: + panic(fmt.Sprintf("field value at index %d must be string or deltaValue, got %T", i+1, fieldsAndValues[i+1])) + } + out = append(out, field) } return } @@ -952,7 +1982,7 @@ func compositePK(keyValuePairs ...string) map[string]string { return out } -func insertRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbdatabase.TableChange { +func insertRowSinglePK(table string, pk string, fieldsAndValues ...any) *pbdatabase.TableChange { 
return &pbdatabase.TableChange{ Table: table, PrimaryKey: &pbdatabase.TableChange_Pk{ @@ -963,7 +1993,7 @@ func insertRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbda } } -func insertRowCompositePK(table string, pk map[string]string, fieldsAndValues ...string) *pbdatabase.TableChange { +func insertRowCompositePK(table string, pk map[string]string, fieldsAndValues ...any) *pbdatabase.TableChange { return &pbdatabase.TableChange{ Table: table, PrimaryKey: &pbdatabase.TableChange_CompositePk{ @@ -976,7 +2006,7 @@ func insertRowCompositePK(table string, pk map[string]string, fieldsAndValues .. } } -func updateRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbdatabase.TableChange { +func updateRowSinglePK(table string, pk string, fieldsAndValues ...any) *pbdatabase.TableChange { return &pbdatabase.TableChange{ Table: table, PrimaryKey: &pbdatabase.TableChange_Pk{ @@ -987,7 +2017,7 @@ func updateRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbda } } -func upsertRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbdatabase.TableChange { +func upsertRowSinglePK(table string, pk string, fieldsAndValues ...any) *pbdatabase.TableChange { return &pbdatabase.TableChange{ Table: table, PrimaryKey: &pbdatabase.TableChange_Pk{ @@ -998,7 +2028,7 @@ func upsertRowSinglePK(table string, pk string, fieldsAndValues ...string) *pbda } } -func upsertRowMultiplePK(table string, pk map[string]string, fieldsAndValues ...string) *pbdatabase.TableChange { +func upsertRowMultiplePK(table string, pk map[string]string, fieldsAndValues ...any) *pbdatabase.TableChange { return &pbdatabase.TableChange{ Table: table, PrimaryKey: &pbdatabase.TableChange_CompositePk{ @@ -1011,7 +2041,7 @@ func upsertRowMultiplePK(table string, pk map[string]string, fieldsAndValues ... 
} } -func updateRowMultiplePK(table string, pk map[string]string, fieldsAndValues ...string) *pbdatabase.TableChange { +func updateRowMultiplePK(table string, pk map[string]string, fieldsAndValues ...any) *pbdatabase.TableChange { return &pbdatabase.TableChange{ Table: table, PrimaryKey: &pbdatabase.TableChange_CompositePk{