Skip to content

Commit 8b52fa6

Browse files
committed
feat: add processing time on gcs partition
1 parent de5d3bb commit 8b52fa6

File tree

5 files changed

+65
-11
lines changed

5 files changed

+65
-11
lines changed

src/main/java/com/gotocompany/firehose/config/BlobSinkConfig.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,10 @@ public interface BlobSinkConfig extends AppConfig {
4949
@Key("SINK_BLOB_FILE_PARTITION_PROTO_TIMESTAMP_FIELD_NAME")
5050
String getFilePartitionProtoTimestampFieldName();
5151

52+
/**
 * Flag bound to the {@code SINK_BLOB_FILE_PARTITION_PROCESSING_TIME_ENABLED} key; defaults to
 * {@code false}.
 * <p>
 * When {@code true}, {@code Record#getLocalDateTime} returns {@code LocalDateTime.now()} instead
 * of the timestamp read from the message field.
 *
 * @return {@code true} if processing time is enabled for file partitioning
 */
@Key("SINK_BLOB_FILE_PARTITION_PROCESSING_TIME_ENABLED")
53+
@DefaultValue("false")
54+
boolean getFilePartitionProcessingTimeEnabled();
55+
5256
@Key("SINK_BLOB_FILE_PARTITION_TIME_GRANULARITY_TYPE")
5357
@DefaultValue("day")
5458
@ConverterClass(BlobSinkFilePartitionTypeConverter.class)

src/main/java/com/gotocompany/firehose/sink/blob/message/Record.java

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,11 +2,14 @@
22

33
import com.google.protobuf.Descriptors;
44
import com.google.protobuf.DynamicMessage;
5+
import com.gotocompany.firehose.config.BlobSinkConfig;
56
import com.gotocompany.firehose.sink.blob.proto.KafkaMetadataProtoMessage;
67
import lombok.AllArgsConstructor;
78
import lombok.Data;
89

910
import java.time.Instant;
11+
import java.time.LocalDateTime;
12+
import java.time.ZoneId;
1013

1114
@AllArgsConstructor
1215
@Data
@@ -34,4 +37,14 @@ public Instant getTimestamp(String fieldName) {
3437
int nanos = (int) timestamp.getField(timestamp.getDescriptorForType().findFieldByName("nanos"));
3538
return Instant.ofEpochSecond(seconds, nanos);
3639
}
40+
41+
/**
 * Returns the date-time for this record, chosen according to the sink configuration.
 * <p>
 * If {@code config.getFilePartitionProcessingTimeEnabled()} is {@code true}, the result is
 * {@code LocalDateTime.now()}. Otherwise the timestamp field named by
 * {@code config.getFilePartitionProtoTimestampFieldName()} is read through
 * {@link #getTimestamp(String)} and converted with the zone id given by
 * {@code config.getFilePartitionProtoTimestampTimezone()}.
 *
 * @param config blob sink configuration supplying the flag, the field name and the timezone
 * @return the current date-time in processing-time mode, otherwise the message timestamp
 *         expressed in the configured timezone
 */
public LocalDateTime getLocalDateTime(BlobSinkConfig config) {
42+
if (config.getFilePartitionProcessingTimeEnabled()) {
43+
// NOTE(review): now() without arguments uses the JVM default time zone, whereas the
// else-branch uses the configured timezone — confirm this difference is intended.
return LocalDateTime.now();
44+
} else {
45+
// Message-time mode: convert the Instant read from the message into the configured zone.
return LocalDateTime.ofInstant(
46+
getTimestamp(config.getFilePartitionProtoTimestampFieldName()),
47+
ZoneId.of(config.getFilePartitionProtoTimestampTimezone()));
48+
}
49+
}
3750
}

src/main/java/com/gotocompany/firehose/sink/blob/writer/local/path/TimePartitionedPathUtils.java

Lines changed: 6 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -7,11 +7,7 @@
77

88
import java.nio.file.Path;
99
import java.nio.file.Paths;
10-
import java.time.Instant;
11-
import java.time.LocalDate;
1210
import java.time.LocalDateTime;
13-
import java.time.LocalTime;
14-
import java.time.ZoneId;
1511
import java.time.format.DateTimeFormatter;
1612

1713
/**
@@ -25,22 +21,21 @@ public class TimePartitionedPathUtils {
2521

2622
public static Path getTimePartitionedPath(Record record, BlobSinkConfig sinkConfig) {
2723
String topic = record.getTopic(sinkConfig.getOutputKafkaMetadataColumnName());
28-
Instant timestamp = record.getTimestamp(sinkConfig.getFilePartitionProtoTimestampFieldName());
24+
Path path = Paths.get(topic);
2925
if (sinkConfig.getFilePartitionTimeGranularityType() == Constants.FilePartitionType.NONE) {
30-
return Paths.get(topic);
26+
return path;
3127
}
32-
LocalDate localDate = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalDate();
33-
String datePart = DATE_FORMATTER.format(localDate);
34-
LocalTime localTime = LocalDateTime.ofInstant(timestamp, ZoneId.of(sinkConfig.getFilePartitionProtoTimestampTimezone())).toLocalTime();
35-
String hourPart = HOUR_FORMATTER.format(localTime);
28+
LocalDateTime dateTime = record.getLocalDateTime(sinkConfig);
29+
String datePart = DATE_FORMATTER.format(dateTime.toLocalDate());
30+
String hourPart = HOUR_FORMATTER.format(dateTime.toLocalTime());
3631

3732
String dateSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeDatePrefix(), datePart);
3833
String hourSegment = String.format("%s%s", sinkConfig.getFilePartitionTimeHourPrefix(), hourPart);
3934

4035
String dateTimePartition;
4136
switch (sinkConfig.getFilePartitionTimeGranularityType()) {
4237
case NONE:
43-
return Paths.get(topic);
38+
return path;
4439
case DAY:
4540
dateTimePartition = String.format("%s", dateSegment);
4641
break;

src/test/java/com/gotocompany/firehose/sink/blob/message/RecordTest.java

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,15 @@
11
package com.gotocompany.firehose.sink.blob.message;
22

33
import com.google.protobuf.DynamicMessage;
4+
import com.gotocompany.firehose.config.BlobSinkConfig;
45
import com.gotocompany.firehose.sink.blob.TestUtils;
56
import org.junit.Assert;
67
import org.junit.Test;
8+
import org.mockito.Mockito;
79

810
import java.time.Instant;
11+
import java.time.LocalDateTime;
12+
import java.time.ZoneId;
913

1014
public class RecordTest {
1115

@@ -39,4 +43,33 @@ public void shouldGetTimeStampFromMessage() {
3943
Record record = new Record(message, metadata);
4044
Assert.assertEquals(defaultTimestamp, record.getTimestamp("created_time"));
4145
}
46+
47+
// Processing-time mode: with the flag mocked to true, getLocalDateTime must return a value
// that lies strictly between two LocalDateTime.now() readings taken around the call.
@Test
48+
public void shouldGetDateTimeLocally() throws InterruptedException {
49+
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
50+
Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(true);
51+
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
52+
DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
53+
Record record = new Record(message, metadata);
54+
LocalDateTime before = LocalDateTime.now();
55+
// The one-second pauses on either side of the call separate the three clock readings so the
// strict isAfter/isBefore assertions below cannot see equal values.
// NOTE(review): this adds about two seconds to the test run.
Thread.sleep(1000);
56+
LocalDateTime localDateTime = record.getLocalDateTime(config);
57+
Thread.sleep(1000);
58+
LocalDateTime after = LocalDateTime.now();
59+
Assert.assertTrue(localDateTime.isAfter(before));
60+
Assert.assertTrue(localDateTime.isBefore(after));
61+
}
62+
63+
// Message-time mode: with the flag mocked to false, getLocalDateTime must return the
// "created_time" timestamp of the message converted to the mocked "UTC" timezone.
@Test
64+
public void shouldGetDateTimeFromMessage() throws InterruptedException {
65+
BlobSinkConfig config = Mockito.mock(BlobSinkConfig.class);
66+
Mockito.when(config.getFilePartitionProcessingTimeEnabled()).thenReturn(false);
67+
Mockito.when(config.getFilePartitionProtoTimestampFieldName()).thenReturn("created_time");
68+
Mockito.when(config.getFilePartitionProtoTimestampTimezone()).thenReturn("UTC");
69+
// Both the message and the metadata are created with defaultTimestamp.
DynamicMessage message = TestUtils.createMessage(defaultTimestamp, defaultOrderNumber);
70+
DynamicMessage metadata = TestUtils.createMetadata("nested_field", defaultTimestamp, defaultOffset, defaultPartition, defaultTopic);
71+
Record record = new Record(message, metadata);
72+
LocalDateTime localDateTime = record.getLocalDateTime(config);
73+
// Expected value is built with the same Instant-to-zone conversion over defaultTimestamp.
Assert.assertEquals(LocalDateTime.ofInstant(defaultTimestamp, ZoneId.of("UTC")), localDateTime);
74+
}
4275
}

src/test/java/com/gotocompany/firehose/sink/blob/writer/WriterOrchestratorTest.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,8 @@
2424

2525
import java.io.IOException;
2626
import java.time.Instant;
27+
import java.time.LocalDateTime;
28+
import java.time.ZoneId;
2729
import java.util.HashSet;
2830
import java.util.Set;
2931

@@ -58,6 +60,7 @@ public void setUp() {
5860
MockitoAnnotations.initMocks(this);
5961
this.sinkConfig = Mockito.mock(BlobSinkConfig.class);
6062
Mockito.when(sinkConfig.getFilePartitionProtoTimestampTimezone()).thenReturn(zone);
63+
Mockito.when(sinkConfig.getFilePartitionProcessingTimeEnabled()).thenReturn(false);
6164
Mockito.when(sinkConfig.getOutputKafkaMetadataColumnName()).thenReturn("");
6265
Mockito.when(sinkConfig.getFilePartitionProtoTimestampFieldName()).thenReturn(timeStampFieldName);
6366
Mockito.when(sinkConfig.getFilePartitionTimeGranularityType()).thenReturn(Constants.FilePartitionType.HOUR);
@@ -68,6 +71,7 @@ public void setUp() {
6871
@Test
6972
public void shouldCreateLocalFileWriter() throws Exception {
7073
Record record = Mockito.mock(Record.class);
74+
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
7175
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
7276
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
7377
Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test");
@@ -82,13 +86,15 @@ public void shouldCreateLocalFileWriter() throws Exception {
8286
@Test
8387
public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
8488
Record record1 = Mockito.mock(Record.class);
89+
Mockito.when(record1.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(3600000L), ZoneId.of(zone)));
8590
Mockito.when(record1.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
8691
Mockito.when(record1.getTopic("")).thenReturn(defaultTopic);
8792
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record1, sinkConfig))).thenReturn(localFileWriter1);
8893
Mockito.when(localFileWriter1.write(record1)).thenReturn(true);
8994
Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test1");
9095

9196
Record record2 = Mockito.mock(Record.class);
97+
Mockito.when(record2.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(7200000L), ZoneId.of(zone)));
9298
Mockito.when(record2.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(7200000L));
9399
Mockito.when(record2.getTopic("")).thenReturn(defaultTopic);
94100
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record2, sinkConfig))).thenReturn(localFileWriter2);
@@ -106,6 +112,7 @@ public void shouldCreateMultipleWriterBasedOnPartition() throws Exception {
106112
@Test(expected = IOException.class)
107113
public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception {
108114
Record record = Mockito.mock(Record.class);
115+
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
109116
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
110117
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
111118
Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0));
@@ -120,6 +127,7 @@ public void shouldThrowIOExceptionWhenWriteThrowsException() throws Exception {
120127
public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception {
121128
expectedException.expect(LocalFileWriterFailedException.class);
122129
Record record = Mockito.mock(Record.class);
130+
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.now());
123131
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(3600000L));
124132
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
125133
Mockito.when(localFileWriter1.getMetadata()).thenReturn(new LocalFileMetadata("/tmp/", "/tmp/test1", 0, 0, 0));
@@ -133,6 +141,7 @@ public void shouldThrowIOExceptionWhenOpenNewWriterFailed() throws Exception {
133141
public void shouldGetEmptyFlushedPath() throws Exception {
134142
Record record = Mockito.mock(Record.class);
135143
Mockito.when(record.getTimestamp(timeStampFieldName)).thenReturn(Instant.ofEpochMilli(1L));
144+
Mockito.when(record.getLocalDateTime(sinkConfig)).thenReturn(LocalDateTime.ofInstant(Instant.ofEpochMilli(1L), ZoneId.of(zone)));
136145
Mockito.when(record.getTopic("")).thenReturn(defaultTopic);
137146
Mockito.when(localFileWriter1.getFullPath()).thenReturn("/tmp/test");
138147
Mockito.when(localStorage.createLocalFileWriter(TimePartitionedPathUtils.getTimePartitionedPath(record, sinkConfig))).thenReturn(localFileWriter1);

0 commit comments

Comments
 (0)