diff --git a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java index 1fe86b71b..1acb412a5 100644 --- a/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java +++ b/java-connectors/kafka-connect-gcp-pubsub/src/main/java/io/lenses/streamreactor/connect/gcp/pubsub/source/mapping/SourceRecordConverter.java @@ -17,6 +17,9 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; import io.lenses.streamreactor.common.config.base.intf.Converter; @@ -34,14 +37,22 @@ public class SourceRecordConverter extends Converter { + headers.add(k, v, Schema.STRING_SCHEMA); + }); return new SourceRecord( source.getSourcePartition().toMap(), source.getSourceOffset().toMap(), source.getTargetTopicName(), + null, getKeySchema(), getKey(source), getValueSchema(), - getValue(source) + getValue(source), + System.currentTimeMillis(), + headers ); }