Skip to content

Commit

Permalink
deprecate old record differ
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Sep 5, 2024
1 parent ea2e52d commit 48ca85d
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test
package io.airbyte.cdk.test.util

import java.time.Instant
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test

class MaybeRecordDifferTest {
class RecordDifferTest {
@Test
fun testBasicBehavior() {
val differ =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import java.time.Instant
import java.util.UUID

/** A record that we expect to exist in the destination, whether raw _or_ final. */
data class OutputRecord(
val rawId: UUID?,
val extractedAt: Instant,
val loadedAt: Instant?,
val generationId: Long?,
/**
* strongly-typed map, e.g. ZonedDateTime for timestamp_with_timezone. this makes destination
* test implementations easier. values can be null, b/c warehouse destinations with a JSON
* column type can be either SQL null, or JSON null, and we want to distinguish between those.
* Destinations _must_ filter out the airbyte_* fields from this map.
*/
val data: Map<String, Any?>,
val airbyteMeta: JsonNode?,
) {
/** Utility constructor with easier types to write by hand */
constructor(
rawId: String,
extractedAt: Long,
loadedAt: Long?,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
) : this(
UUID.fromString(rawId),
Instant.ofEpochMilli(extractedAt),
loadedAt?.let { Instant.ofEpochMilli(it) },
generationId,
data,
airbyteMeta?.let { ObjectMapper().readTree(it) },
)

/**
* Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the
* destination at runtime, so we don't have those when writing the test. Just generate arbitrary
* values for them.
*/
constructor(
extractedAt: Long,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
) : this(
null,
Instant.ofEpochMilli(extractedAt),
loadedAt = null,
generationId,
data,
airbyteMeta?.let { ObjectMapper().readTree(it) },
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,10 @@
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.test
package io.airbyte.cdk.test.util

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.cdk.test.RecordDiffer.MaybeRecordDiff
import java.time.Instant
import java.util.UUID
import kotlin.reflect.jvm.jvmName

/** A record that we expect to exist in the destination, whether raw _or_ final. */
data class OutputRecord(
val rawId: UUID?,
val extractedAt: Instant,
val loadedAt: Instant?,
val generationId: Long?,
/**
* strongly-typed map, e.g. ZonedDateTime for timestamp_with_timezone. this makes destination
* test implementations easier. values can be null, b/c warehouse destinations with a JSON
* column type can be either SQL null, or JSON null, and we want to distinguish between those.
* Destinations _must_ filter out the airbyte_* fields from this map.
*/
val data: Map<String, Any?>,
val airbyteMeta: JsonNode?,
) {
/** Utility constructor with easier types to write by hand */
constructor(
rawId: String,
extractedAt: Long,
loadedAt: Long?,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
) : this(
UUID.fromString(rawId),
Instant.ofEpochMilli(extractedAt),
loadedAt?.let { Instant.ofEpochMilli(it) },
generationId,
data,
airbyteMeta?.let { ObjectMapper().readTree(it) },
)

/**
* Utility constructor for "expected records". [rawId] and [loadedAt] are generated by the
* destination at runtime, so we don't have those when writing the test. Just generate arbitrary
* values for them.
*/
constructor(
extractedAt: Long,
generationId: Long?,
data: Map<String, Any?>,
airbyteMeta: String?,
) : this(
null,
Instant.ofEpochMilli(extractedAt),
loadedAt = null,
generationId,
data,
airbyteMeta?.let { ObjectMapper().readTree(it) },
)
}

class RecordDiffer(
/**
* A function to extract primary key fields from a record. Most streams will have some `id`
Expand All @@ -80,7 +23,7 @@ class RecordDiffer(
* This class implicitly also sorts records by extracted_at; this comparator does _not_ need to
* do that sorting.
*
* See [MaybeRecordDiff.generateRecordIdentifier] for why this is nullable.
* See [MatchingRecords.generateRecordIdentifier] for why this is nullable.
*/
val extractCursor: ((OutputRecord) -> Any?)? = null,
) {
Expand Down Expand Up @@ -127,7 +70,7 @@ class RecordDiffer(
// Match up all the records between the expected and actual records,
// or if there's no matching record then detect that also.
// We'll filter this list down to actual differing records later on.
val diffs = mutableListOf<MaybeRecordDiff>()
val matches = mutableListOf<MatchingRecords>()
var expectedRecordIndex = 0
var actualRecordIndex = 0
while (
Expand All @@ -141,40 +84,40 @@ class RecordDiffer(
.compare(expectedRecord, actualRecord)
if (compare == 0) {
// These records are the same underlying record
diffs.add(MaybeRecordDiff(expectedRecord, actualRecord))
matches.add(MatchingRecords(expectedRecord, actualRecord))
expectedRecordIndex++
actualRecordIndex++
} else if (compare < 0) {
// There's an extra expected record
diffs.add(MaybeRecordDiff(expectedRecord, actualRecord = null))
matches.add(MatchingRecords(expectedRecord, actualRecord = null))
expectedRecordIndex++
} else {
// There's an extra actual record
diffs.add(MaybeRecordDiff(expectedRecord = null, actualRecord))
matches.add(MatchingRecords(expectedRecord = null, actualRecord))
actualRecordIndex++
}
}

// Tail loops in case we reached the end of one list before the other.
while (expectedRecordIndex < expectedRecords.size) {
diffs.add(MaybeRecordDiff(expectedRecords[expectedRecordIndex], actualRecord = null))
matches.add(MatchingRecords(expectedRecords[expectedRecordIndex], actualRecord = null))
expectedRecordIndex++
}
while (actualRecordIndex < actualRecords.size) {
diffs.add(MaybeRecordDiff(expectedRecord = null, actualRecords[actualRecordIndex]))
matches.add(MatchingRecords(expectedRecord = null, actualRecords[actualRecordIndex]))
actualRecordIndex++
}

// We've paired up all the records, now find just the ones that are wrong.
val actualDiffs = diffs.filter { it.isMismatch() }
return if (actualDiffs.isEmpty()) {
val diffs = matches.filter { it.isMismatch() }
return if (diffs.isEmpty()) {
null
} else {
actualDiffs.joinToString("\n") { it.prettyPrintMismatch() }
diffs.joinToString("\n") { it.prettyPrintMismatch() }
}
}

private inner class MaybeRecordDiff(
private inner class MatchingRecords(
val expectedRecord: OutputRecord?,
val actualRecord: OutputRecord?,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ private val LOGGER = KotlinLogging.logger {}
*/
@Execution(ExecutionMode.CONCURRENT)
abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestinationState> {
protected var DIFFER: RecordDiffer = mock()
protected var DIFFER: LegacyRecordDiffer = mock()

/** Subclasses may use these four StreamConfigs in their tests. */
protected var incrementalDedupStream: StreamConfig = mock()
Expand Down Expand Up @@ -200,7 +200,7 @@ abstract class BaseSqlGeneratorIntegrationTest<DestinationState : MinimumDestina
AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE

DIFFER =
RecordDiffer(
LegacyRecordDiffer(
rawMetadataColumnNames,
finalMetadataColumnNames,
id1 to AirbyteProtocolType.INTEGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ private val LOGGER = KotlinLogging.logger {}
// execution.
@Execution(ExecutionMode.CONCURRENT)
abstract class BaseTypingDedupingTest {
protected var DIFFER: RecordDiffer? = null
protected var DIFFER: LegacyRecordDiffer? = null

private var randomSuffix: String? = null
protected var config: JsonNode? = null
Expand Down Expand Up @@ -205,7 +205,7 @@ abstract class BaseTypingDedupingTest {

val generator = sqlGenerator
DIFFER =
RecordDiffer(
LegacyRecordDiffer(
rawMetadataColumnNames,
finalMetadataColumnNames,
generator.buildColumnId("id1") to AirbyteProtocolType.INTEGER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ import org.junit.jupiter.api.function.Executable
/**
* Utility class to generate human-readable diffs between expected and actual records. Assumes 1s1t
* output format.
*
* Prefer [io.airbyte.cdk.test.RecordDiffer], which operates on strongly-typed objects instead of
* JsonNodes. This class is effectively deprecated; we're just keeping it around so that
* [BaseTypingDedupingTest] and [BaseSqlGeneratorIntegrationTest] continue to function. Once
* those classes are using the new RecordDiffer, we should remove this class.
*/
class RecordDiffer
class LegacyRecordDiffer
@SafeVarargs
constructor(
private val rawRecordColumnNames: Map<String, String>,
Expand Down

0 comments on commit 48ca85d

Please sign in to comment.