Skip to content

Commit d919134

Browse files
committed
Added schema cache
1 parent 2e33435 commit d919134

File tree

2 files changed

+65
-35
lines changed

2 files changed

+65
-35
lines changed

src/main/kotlin/org/phpinnacle/toblerone/RadixTransform.kt

Lines changed: 55 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.phpinnacle.toblerone
22

3+
import org.apache.kafka.common.cache.LRUCache
4+
import org.apache.kafka.common.cache.SynchronizedCache
35
import org.apache.kafka.common.config.ConfigDef
46
import org.apache.kafka.connect.connector.ConnectRecord
57
import org.apache.kafka.connect.data.Schema
@@ -30,6 +32,8 @@ abstract class RadixTransform<R : ConnectRecord<R>?> : Transformation<R> {
3032
)
3133

3234
private const val PURPOSE = "base-convert-transform"
35+
36+
private val cache = SynchronizedCache(LRUCache<Schema, Schema>(16))
3337
}
3438

3539
private lateinit var fields: Map<String, Int>
@@ -76,30 +80,20 @@ abstract class RadixTransform<R : ConnectRecord<R>?> : Transformation<R> {
7680
val value = Requirements.requireStruct(operatingValue(record), PURPOSE)
7781
val schema = operatingSchema(record) ?: return record
7882

79-
val outputValues = mutableMapOf<String, Any>()
80-
val outputSchema = SchemaUtil.copySchemaBasics(schema)
83+
val outputSchema = copySchema(schema)
84+
val outputValues = Struct(outputSchema)
8185

8286
for (field in schema.fields()) {
8387
val name = field.name()
8488

85-
if (name !in fields.keys) {
86-
outputValues[name] = value.get(field)
87-
outputSchema.field(name, field.schema())
88-
89-
continue
89+
if (name in fields.keys) {
90+
outputValues.put(name, convert(name, field.schema(), value))
91+
} else {
92+
outputValues.put(name, value.get(field))
9093
}
91-
92-
val (newSchema, newValue) = convert(name, field.schema(), value)
93-
94-
outputSchema.field(name, newSchema)
95-
outputValues[name] = newValue
9694
}
9795

98-
val outputStruct = Struct(outputSchema)
99-
100-
outputValues.forEach { outputStruct.put(it.key, it.value) }
101-
102-
return newRecord(record, outputSchema.schema(), outputStruct)
96+
return newRecord(record, outputSchema.schema(), outputValues)
10397
}
10498

10599
private fun convert(key: String, value: Any): Any {
@@ -114,19 +108,55 @@ abstract class RadixTransform<R : ConnectRecord<R>?> : Transformation<R> {
114108
}
115109
}
116110

117-
private fun convert(key: String, schema: Schema, value: Struct): Pair<Schema, Any> {
118-
val radix = fields[key] ?: return Pair(schema, value)
111+
private fun convert(key: String, schema: Schema, value: Struct): Any {
112+
val radix = fields[key] ?: return value
119113

120114
return when (schema.type()) {
121-
Schema.Type.INT8 -> Pair(Schema.STRING_SCHEMA, value.getInt16(key).toString(radix))
122-
Schema.Type.INT16 -> Pair(Schema.STRING_SCHEMA, value.getInt16(key).toString(radix))
123-
Schema.Type.INT32 -> Pair(Schema.STRING_SCHEMA, value.getInt32(key).toString(radix))
124-
Schema.Type.INT64 -> Pair(Schema.STRING_SCHEMA, value.getInt64(key).toString(radix))
125-
Schema.Type.STRING -> Pair(Schema.INT32_SCHEMA, value.getString(key).trim().toInt(radix))
126-
else -> Pair(schema, value)
115+
Schema.Type.INT8 -> value.getInt16(key).toString(radix)
116+
Schema.Type.INT16 -> value.getInt16(key).toString(radix)
117+
Schema.Type.INT32 -> value.getInt32(key).toString(radix)
118+
Schema.Type.INT64 -> value.getInt64(key).toString(radix)
119+
Schema.Type.STRING -> value.getString(key).trim().toInt(radix)
120+
else -> value
121+
}
122+
}
123+
124+
private fun infer(schema: Schema): Schema {
125+
return when (schema.type()) {
126+
Schema.Type.INT8 -> Schema.STRING_SCHEMA
127+
Schema.Type.INT16 -> Schema.STRING_SCHEMA
128+
Schema.Type.INT32 -> Schema.STRING_SCHEMA
129+
Schema.Type.INT64 -> Schema.STRING_SCHEMA
130+
Schema.Type.STRING -> Schema.INT32_SCHEMA
131+
else -> schema
127132
}
128133
}
129134

135+
private fun copySchema(schema: Schema): Schema
136+
{
137+
val cached = cache.get(schema)
138+
139+
if (cached != null) {
140+
return cached
141+
}
142+
143+
val output = SchemaUtil.copySchemaBasics(schema)
144+
145+
for (field in schema.fields()) {
146+
val name = field.name()
147+
148+
if (name in fields.keys) {
149+
output.field(name, infer(field.schema()))
150+
} else {
151+
output.field(name, field.schema())
152+
}
153+
}
154+
155+
cache.put(schema, output)
156+
157+
return output
158+
}
159+
130160
class Key<R : ConnectRecord<R>?> : RadixTransform<R>() {
131161
override fun operatingSchema(record: R?): Schema? = record?.keySchema()
132162

src/test/kotlin/org/phpinnacle/toblerone/RadixTransformTest.kt

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,20 +57,20 @@ internal class RadixTransformTest {
5757

5858
@Test
5959
fun copyValueSchemaAndConvertFields() {
60-
configure(xformValue, "int32:8,string:36")
60+
configure(xformValue, "id:36")
6161

6262
val schema = SchemaBuilder
6363
.struct()
6464
.name("name")
6565
.version(1)
6666
.doc("doc")
67+
.field("id", Schema.STRING_SCHEMA)
6768
.field("int32", Schema.INT32_SCHEMA)
68-
.field("string", Schema.STRING_SCHEMA)
6969
.build()
7070

7171
val expected = Struct(schema)
72+
.put("id", " PJV ")
7273
.put("int32", 42)
73-
.put("string", " PJV ")
7474

7575
val original = SourceRecord(null, null, "test", 0, schema, expected)
7676
val transformed: SourceRecord = xformValue.apply(original)
@@ -79,26 +79,26 @@ internal class RadixTransformTest {
7979
assertEquals(schema.version(), transformed.valueSchema().version())
8080
assertEquals(schema.doc(), transformed.valueSchema().doc())
8181

82-
assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema().field("int32").schema())
83-
assertEquals("52", (transformed.value() as Struct).getString("int32"))
82+
assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("id").schema())
83+
assertEquals(33115, (transformed.value() as Struct).getInt32("id"))
8484

85-
assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("string").schema())
86-
assertEquals(33115, (transformed.value() as Struct).getInt32("string"))
85+
assertEquals(Schema.INT32_SCHEMA, transformed.valueSchema().field("int32").schema())
86+
assertEquals(42, (transformed.value() as Struct).getInt32("int32"))
8787
}
8888

8989
@Test
9090
fun schemalessValueConvertField() {
91-
configure(xformValue, "string:36")
91+
configure(xformValue, "id:36")
9292
val original = mapOf(
93+
"id" to " PJV ",
9394
"int32" to 42,
94-
"string" to "PJV"
9595
)
9696

9797
val record = SourceRecord(null, null, "test", 0, null, original)
9898
val transformed = xformValue.apply(record).value() as Map<*, *>
9999

100+
assertEquals(33115, transformed["id"])
100101
assertEquals(42, transformed["int32"])
101-
assertEquals(33115, transformed["string"])
102102
}
103103

104104
@Test

0 commit comments

Comments
 (0)