From 6a30736917bdc98f17470597790d9a6771a440a5 Mon Sep 17 00:00:00 2001 From: Steven Baldwin Date: Mon, 15 Jan 2024 11:46:39 -0800 Subject: [PATCH] Propagate record timestamps --- .../java/rockset/RocksetRequestWrapper.java | 10 +++ src/main/java/rockset/RocksetSinkTask.java | 2 +- .../java/rockset/models/KafkaMessage.java | 79 +++++++++++++------ 3 files changed, 66 insertions(+), 25 deletions(-) diff --git a/src/main/java/rockset/RocksetRequestWrapper.java b/src/main/java/rockset/RocksetRequestWrapper.java index 2f98c55..5e8233c 100644 --- a/src/main/java/rockset/RocksetRequestWrapper.java +++ b/src/main/java/rockset/RocksetRequestWrapper.java @@ -14,6 +14,7 @@ import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; @@ -68,6 +69,15 @@ public void addDoc( .key(key) .offset(record.kafkaOffset()) .partition(record.kafkaPartition()); + + if(record.timestamp() != null){ + if (record.timestampType() == TimestampType.CREATE_TIME){ + message.createTime(record.timestamp()); + } else if (record.timestampType() == TimestampType.LOG_APPEND_TIME){ + message.logAppendTime(record.timestamp()); + } + } + messages.add(message); } catch (Exception e) { throw new ConnectException("Invalid JSON encountered in stream ", e); diff --git a/src/main/java/rockset/RocksetSinkTask.java b/src/main/java/rockset/RocksetSinkTask.java index 7fe51c6..85ca2ce 100644 --- a/src/main/java/rockset/RocksetSinkTask.java +++ b/src/main/java/rockset/RocksetSinkTask.java @@ -106,7 +106,7 @@ public void put(Collection records) { e -> executorService.submit( () -> - requestWrapper.addDoc( + requestWrapper.addDoc( e.getKey().topic(), e.getValue(), recordParser, diff --git a/src/main/java/rockset/models/KafkaMessage.java b/src/main/java/rockset/models/KafkaMessage.java index fceed73..1e34491 100644 --- a/src/main/java/rockset/models/KafkaMessage.java +++ b/src/main/java/rockset/models/KafkaMessage.java @@ -1,9 +1,9 @@ package rockset.models; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; import io.swagger.annotations.ApiModelProperty; -import java.util.Objects; public class KafkaMessage { @@ -23,6 +23,12 @@ public class KafkaMessage { @SerializedName("key") public Object key; + @SerializedName("create_time") + public Long createTime; + + @SerializedName("log_append_time") + public Long logAppendTime; + /* * Getters */ @@ -51,6 +57,19 @@ public Object getKey() { return this.key; } + @JsonProperty("create_time") + @ApiModelProperty(required = false, value = "Create time of the message") + public Long getCreateTime() { + return this.createTime; + } + + @JsonProperty("log_append_time") + @ApiModelProperty(required = false, value = "Log append time of the message") + public Long getLogAppendTime() { + return this.logAppendTime; + } + + /* * Setters */ @@ -71,6 +90,16 @@ public void setKey(Object key) { this.key = key; } + + public void setLogAppendTime(Long timestamp) { + this.logAppendTime = timestamp; + } + + public void setCreateTime(Long timestamp) { + this.createTime = timestamp; + } + + /* * Builders */ @@ -95,6 +124,16 @@ public KafkaMessage key(Object key) { return this; } + public KafkaMessage logAppendTime(Long timestamp) { + this.logAppendTime = timestamp; + return this; + } + + public KafkaMessage createTime(Long timestamp) { + this.createTime = timestamp; + return this; + } + /* * Utilities */ @@ -107,35 +146,27 @@ private String toIndentedString(java.lang.Object o) { } @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("class KafkaMessage {\n"); - - sb.append(" partition: ").append(this.toIndentedString(this.partition)).append("\n"); - sb.append(" offset: ").append(this.toIndentedString(this.offset)).append("\n"); - sb.append(" document: ").append(this.toIndentedString(this.document)).append("\n"); - sb.append(" key: ").append(this.toIndentedString(this.key)).append("\n"); - sb.append("}"); - return sb.toString(); + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + KafkaMessage that = (KafkaMessage) o; + return partition == that.partition && offset == that.offset && Objects.equal(document, that.document) && Objects.equal(key, that.key) && Objects.equal(createTime, that.createTime) && Objects.equal(logAppendTime, that.logAppendTime); } @Override public int hashCode() { - return Objects.hash(this.document, this.partition, this.offset, this.key); + return Objects.hashCode(document, partition, offset, key, createTime, logAppendTime); } @Override - public boolean equals(java.lang.Object o) { - if (this == o) { - return true; - } - if (o == null || this.getClass() != o.getClass()) { - return false; - } - final KafkaMessage kafkaMessage = (KafkaMessage) o; - return this.getPartition() == kafkaMessage.getPartition() - && this.getOffset() == kafkaMessage.getOffset() - && Objects.equals(this.document, kafkaMessage.document) - && Objects.equals(this.key, kafkaMessage.key); + public String toString() { + return "KafkaMessage{" + + "document=" + document + + ", partition=" + partition + + ", offset=" + offset + + ", key=" + key + + ", createTime=" + createTime + + ", logAppendTime=" + logAppendTime + + '}'; } }