diff --git a/build.gradle b/build.gradle
index 8b599437..a221e925 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,6 +84,8 @@ test {
}
dependencies {
+ compileOnly group: 'com.google.code.findbugs', name: 'annotations', version: findbugsVersion
+
compile(group: 'io.pravega', name: 'pravega-client', version: pravegaVersion) {
exclude group: 'org.slf4j', module: 'slf4j-api'
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index cf758c1c..a6ee408c 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -16,6 +16,7 @@
+
diff --git a/gradle.properties b/gradle.properties
index 2cb22cb9..7e924330 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -12,6 +12,7 @@ shadowGradlePlugin=2.0.3
slf4jApiVersion=1.7.25
junitVersion=4.12
mockitoVersion=1.10.19
+findbugsVersion=3.0.1
# Version and base tags can be overridden at build time.
connectorVersion=0.3.0-SNAPSHOT
diff --git a/pravega b/pravega
index b8e1e2bb..2347210d 160000
--- a/pravega
+++ b/pravega
@@ -1 +1 @@
-Subproject commit b8e1e2bbc63ddbd010b7faad1d2e3017cad16c82
+Subproject commit 2347210d1f7263b4428dc99594f71e81aaf17a7c
diff --git a/src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java
new file mode 100644
index 00000000..72b9cacb
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java
@@ -0,0 +1,169 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.StreamCut;
+import io.pravega.connectors.flink.util.StreamWithBoundaries;
+import lombok.Data;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A base builder for connectors that consume a Pravega stream.
+ *
+ * @param <B> the builder class.
+ */
+public abstract class AbstractReaderBuilder<B extends AbstractReaderBuilder> implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<StreamSpec> streams;
+
+ private PravegaConfig pravegaConfig;
+
+ protected AbstractReaderBuilder() {
+ this.streams = new ArrayList<>(1);
+ this.pravegaConfig = PravegaConfig.fromDefaults();
+ }
+
+ /**
+ * Set the Pravega client configuration, which includes connection info, security info, and a default scope.
+ *
+ * The default client configuration is obtained from {@code PravegaConfig.fromDefaults()}.
+ *
+ * @param pravegaConfig the configuration to use.
+ */
+ public B withPravegaConfig(PravegaConfig pravegaConfig) {
+ this.pravegaConfig = pravegaConfig;
+ return builder();
+ }
+
+ /**
+ * Add a stream to be read by the source, from the given start position in the stream.
+ *
+ * @param streamSpec the unqualified or qualified name of the stream.
+ * @param startStreamCut Start {@link StreamCut}
+ * @return A builder to configure and create a reader.
+ */
+ public B forStream(final String streamSpec, final StreamCut startStreamCut) {
+ return forStream(streamSpec, startStreamCut, StreamCut.UNBOUNDED);
+ }
+
+ /**
+ * Add a stream to be read by the source, from the given start position to the given end position.
+ *
+ * @param streamSpec the unqualified or qualified name of the stream.
+ * @param startStreamCut Start {@link StreamCut}
+ * @param endStreamCut End {@link StreamCut}
+ * @return A builder to configure and create a reader.
+ */
+ public B forStream(final String streamSpec, final StreamCut startStreamCut, final StreamCut endStreamCut) {
+ streams.add(StreamSpec.of(streamSpec, startStreamCut, endStreamCut));
+ return builder();
+ }
+
+ /**
+ * Add a stream to be read by the source, from the earliest available position in the stream.
+ *
+ * @param streamSpec the unqualified or qualified name of the stream.
+ * @return A builder to configure and create a reader.
+ */
+ public B forStream(final String streamSpec) {
+ return forStream(streamSpec, StreamCut.UNBOUNDED);
+ }
+
+ /**
+ * Add a stream to be read by the source, from the given start position in the stream.
+ *
+ * @param stream Stream.
+ * @param startStreamCut Start {@link StreamCut}
+ * @return A builder to configure and create a reader.
+ */
+ public B forStream(final Stream stream, final StreamCut startStreamCut) {
+ return forStream(stream, startStreamCut, StreamCut.UNBOUNDED);
+ }
+
+ /**
+ * Add a stream to be read by the source, from the given start position in the stream to the given end position.
+ *
+ * @param stream Stream.
+ * @param startStreamCut Start {@link StreamCut}
+ * @param endStreamCut End {@link StreamCut}
+ * @return A builder to configure and create a reader.
+ */
+ public B forStream(final Stream stream, final StreamCut startStreamCut, final StreamCut endStreamCut) {
+ streams.add(StreamSpec.of(stream, startStreamCut, endStreamCut));
+ return builder();
+ }
+
+ /**
+ * Add a stream to be read by the source, from the earliest available position in the stream.
+ *
+ * @param stream Stream.
+ * @return A builder to configure and create a reader.
+ */
+ public B forStream(final Stream stream) {
+ return forStream(stream, StreamCut.UNBOUNDED);
+ }
+
+ /**
+ * Gets the Pravega configuration.
+ */
+ protected PravegaConfig getPravegaConfig() {
+ Preconditions.checkState(pravegaConfig != null, "A Pravega configuration must be supplied.");
+ return pravegaConfig;
+ }
+
+ /**
+ * Resolves the streams to be provided to the reader, based on the configured default scope.
+ */
+ protected List<StreamWithBoundaries> resolveStreams() {
+ Preconditions.checkState(!streams.isEmpty(), "At least one stream must be supplied.");
+ PravegaConfig pravegaConfig = getPravegaConfig();
+ return streams.stream()
+ .map(s -> StreamWithBoundaries.of(pravegaConfig.resolve(s.streamSpec), s.from, s.to))
+ .collect(Collectors.toList());
+ }
+
+ protected abstract B builder();
+
+ /**
+ * A Pravega stream with optional boundaries based on stream cuts.
+ */
+ @Data
+ private static class StreamSpec implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String streamSpec;
+ private final StreamCut from;
+ private final StreamCut to;
+
+ public static StreamSpec of(String streamSpec, StreamCut from, StreamCut to) {
+ Preconditions.checkNotNull(streamSpec, "streamSpec");
+ Preconditions.checkNotNull(streamSpec, "from");
+ Preconditions.checkNotNull(streamSpec, "to");
+ return new StreamSpec(streamSpec, from, to);
+ }
+
+ public static StreamSpec of(Stream stream, StreamCut from, StreamCut to) {
+ Preconditions.checkNotNull(stream, "stream");
+ Preconditions.checkNotNull(stream, "from");
+ Preconditions.checkNotNull(stream, "to");
+ return new StreamSpec(stream.getScopedName(), from, to);
+ }
+ }
+
+}
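Reviewer note: the recursive `B`/`builder()` generic above is the self-typed builder pattern; every fluent setter returns the concrete subclass so chained calls keep their type. A minimal standalone sketch of the pattern (class names here are illustrative, not part of this PR):

    // Minimal sketch of the self-typed builder pattern used by AbstractReaderBuilder.
    abstract class BaseBuilder<B extends BaseBuilder<B>> {
        private String scope;

        public B withScope(String scope) {
            this.scope = scope;
            return builder(); // the subclass returns itself, preserving its type
        }

        protected abstract B builder();
    }

    final class ExampleBuilder extends BaseBuilder<ExampleBuilder> {
        @Override
        protected ExampleBuilder builder() {
            return this;
        }

        public String build() {
            return "built";
        }
    }

With this shape, `new ExampleBuilder().withScope("examples").build()` compiles without casts, which is why `forStream` and `withPravegaConfig` can return `B`.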
diff --git a/src/main/java/io/pravega/connectors/flink/AbstractStreamingReaderBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractStreamingReaderBuilder.java
new file mode 100644
index 00000000..57b86076
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/AbstractStreamingReaderBuilder.java
@@ -0,0 +1,173 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import com.google.common.base.Preconditions;
+import io.pravega.client.stream.ReaderGroupConfig;
+import io.pravega.connectors.flink.util.FlinkPravegaUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.time.Time;
+
+import java.util.Optional;
+
+/**
+ * An abstract streaming reader builder.
+ *
+ * The builder is abstracted to act as the base for both the {@link FlinkPravegaReader} and {@link FlinkPravegaTableSource} builders.
+ *
+ * @param <T> the element type.
+ * @param <B> the builder type.
+ */
+abstract class AbstractStreamingReaderBuilder<T, B extends AbstractStreamingReaderBuilder> extends AbstractReaderBuilder<B> {
+
+ private static final Time DEFAULT_EVENT_READ_TIMEOUT = Time.seconds(1);
+ private static final Time DEFAULT_CHECKPOINT_INITIATE_TIMEOUT = Time.seconds(5);
+
+ protected String uid;
+ protected String readerGroupScope;
+ protected String readerGroupName;
+ protected Time readerGroupRefreshTime;
+ protected Time checkpointInitiateTimeout;
+ protected Time eventReadTimeout;
+
+ protected AbstractStreamingReaderBuilder() {
+ this.checkpointInitiateTimeout = DEFAULT_CHECKPOINT_INITIATE_TIMEOUT;
+ this.eventReadTimeout = DEFAULT_EVENT_READ_TIMEOUT;
+ }
+
+ /**
+ * Configures the source uid to identify the checkpoint state of this source.
+ *
+ * The default value is generated based on other inputs.
+ *
+ * @param uid the uid to use.
+ */
+ public B uid(String uid) {
+ this.uid = uid;
+ return builder();
+ }
+
+ /**
+ * Configures the reader group scope for synchronization purposes.
+ *
+ * The default value is taken from the {@link PravegaConfig} {@code defaultScope} property.
+ *
+ * @param scope the scope name.
+ */
+ public B withReaderGroupScope(String scope) {
+ this.readerGroupScope = Preconditions.checkNotNull(scope);
+ return builder();
+ }
+
+ /**
+ * Configures the reader group name.
+ *
+ * @param readerGroupName the reader group name.
+ */
+ public B withReaderGroupName(String readerGroupName) {
+ this.readerGroupName = Preconditions.checkNotNull(readerGroupName);
+ return builder();
+ }
+
+ /**
+ * Sets the group refresh time, with a default of 1 second.
+ *
+ * @param groupRefreshTime The group refresh time
+ */
+ public B withReaderGroupRefreshTime(Time groupRefreshTime) {
+ this.readerGroupRefreshTime = groupRefreshTime;
+ return builder();
+ }
+
+ /**
+ * Sets the timeout for initiating a checkpoint in Pravega.
+ *
+ * @param checkpointInitiateTimeout The timeout
+ */
+ public B withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) {
+ Preconditions.checkArgument(checkpointInitiateTimeout.getSize() > 0, "timeout must be > 0");
+ this.checkpointInitiateTimeout = checkpointInitiateTimeout;
+ return builder();
+ }
+
+ /**
+ * Sets the timeout for the call to read events from Pravega. After the timeout
+ * expires (without an event being returned), another call will be made.
+ *
+ * @param eventReadTimeout The timeout
+ */
+ public B withEventReadTimeout(Time eventReadTimeout) {
+ Preconditions.checkArgument(eventReadTimeout.getSize() > 0, "timeout must be > 0");
+ this.eventReadTimeout = eventReadTimeout;
+ return builder();
+ }
+
+ protected abstract DeserializationSchema<T> getDeserializationSchema();
+
+ /**
+ * Builds a {@link FlinkPravegaReader} based on the configuration.
+ *
+ * Note that the {@link FlinkPravegaTableSource} supports both the batch and streaming API, and so creates both
+ * a source function and an input format and then uses one or the other.
+ *
+ * Be sure to call {@code initialize()} before returning the reader to user code.
+ *
+ * @throws IllegalStateException if the configuration is invalid.
+ * @return an uninitialized reader as a source function.
+ */
+ FlinkPravegaReader<T> buildSourceFunction() {
+
+ // rgConfig
+ ReaderGroupConfig.ReaderGroupConfigBuilder rgConfigBuilder = ReaderGroupConfig
+ .builder()
+ .disableAutomaticCheckpoints();
+ if (this.readerGroupRefreshTime != null) {
+ rgConfigBuilder.groupRefreshTimeMillis(this.readerGroupRefreshTime.toMilliseconds());
+ }
+ resolveStreams().forEach(s -> rgConfigBuilder.stream(s.getStream(), s.getFrom(), s.getTo()));
+ final ReaderGroupConfig rgConfig = rgConfigBuilder.build();
+
+ // rgScope
+ final String rgScope = Optional.ofNullable(this.readerGroupScope).orElseGet(() -> {
+ Preconditions.checkState(getPravegaConfig().getDefaultScope() != null, "A reader group scope or default scope must be configured");
+ return getPravegaConfig().getDefaultScope();
+ });
+
+ // rgName
+ final String rgName = Optional.ofNullable(this.readerGroupName).orElseGet(FlinkPravegaUtils::generateRandomReaderGroupName);
+
+ return new FlinkPravegaReader<>(
+ Optional.ofNullable(this.uid).orElseGet(this::generateUid),
+ getPravegaConfig().getClientConfig(),
+ rgConfig,
+ rgScope,
+ rgName,
+ getDeserializationSchema(),
+ this.eventReadTimeout,
+ this.checkpointInitiateTimeout);
+ }
+
+ /**
+ * Generate a UID for the source, to distinguish the state associated with the checkpoint hook. A good generated UID will:
+ * 1. be stable across savepoints for the same inputs
+ * 2. disambiguate one source from another (e.g. in a program that uses numerous instances of {@link FlinkPravegaReader})
+ * 3. allow for reconfiguration of the timeouts
+ */
+ String generateUid() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(readerGroupScope).append('\n');
+ resolveStreams().forEach(s -> sb
+ .append(s.getStream().getScopedName())
+ .append('/').append(s.getFrom().hashCode())
+ .append('/').append(s.getTo().hashCode())
+ .append('\n'));
+ return Integer.toString(sb.toString().hashCode());
+ }
+}
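Reviewer note: `generateUid()` makes the default uid a pure function of the reader group scope, the resolved streams, and their stream cuts, so an unchanged configuration re-derives the same uid across savepoints. A standalone sketch of that property (scope and stream names are illustrative):

    public class UidSketch {
        static String uidFor(String scope, String scopedStream, int fromHash, int toHash) {
            StringBuilder sb = new StringBuilder();
            sb.append(scope).append('\n');
            sb.append(scopedStream).append('/').append(fromHash).append('/').append(toHash).append('\n');
            return Integer.toString(sb.toString().hashCode());
        }

        public static void main(String[] args) {
            String a = uidFor("examples", "examples/mystream", 0, 0);
            String b = uidFor("examples", "examples/mystream", 0, 0);
            System.out.println(a.equals(b)); // true: same inputs, same uid
        }
    }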
diff --git a/src/main/java/io/pravega/connectors/flink/AbstractStreamingWriterBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractStreamingWriterBuilder.java
new file mode 100644
index 00000000..ce04f476
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/AbstractStreamingWriterBuilder.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * An abstract streaming writer builder.
+ *
+ * The builder is abstracted to act as the base for both the {@link FlinkPravegaWriter} and {@link FlinkPravegaTableSink} builders.
+ *
+ * @param <T> the element type.
+ * @param <B> the builder type.
+ */
+public abstract class AbstractStreamingWriterBuilder<T, B extends AbstractStreamingWriterBuilder> extends AbstractWriterBuilder<B> {
+
+ // the numbers below are picked somewhat arbitrarily at this point
+ private static final long DEFAULT_TXN_TIMEOUT_MILLIS = 2 * 60 * 60 * 1000; // 2 hours
+ private static final long DEFAULT_TX_SCALE_GRACE_MILLIS = 10 * 60 * 1000; // 10 minutes
+
+ protected PravegaWriterMode writerMode;
+ protected Time txnTimeout;
+ protected Time txnGracePeriod;
+
+ protected AbstractStreamingWriterBuilder() {
+ writerMode = PravegaWriterMode.ATLEAST_ONCE;
+ txnTimeout = Time.milliseconds(DEFAULT_TXN_TIMEOUT_MILLIS);
+ txnGracePeriod = Time.milliseconds(DEFAULT_TX_SCALE_GRACE_MILLIS);
+ }
+
+ /**
+ * Sets the writer mode to provide at-least-once or exactly-once guarantees.
+ *
+ * @param writerMode The writer mode of {@code BEST_EFFORT}, {@code ATLEAST_ONCE}, or {@code EXACTLY_ONCE}.
+ */
+ public B withWriterMode(PravegaWriterMode writerMode) {
+ this.writerMode = writerMode;
+ return builder();
+ }
+
+ /**
+ * Sets the transaction timeout.
+ *
+ * When the writer mode is set to {@code EXACTLY_ONCE}, transactions are used to persist events to the Pravega stream.
+ * The timeout refers to the maximum amount of time that a transaction may remain uncommitted, after which the
+ * transaction will be aborted. The default timeout is 2 hours.
+ *
+ * @param timeout the timeout
+ */
+ public B withTxnTimeout(Time timeout) {
+ Preconditions.checkArgument(timeout.getSize() > 0, "The timeout must be a positive value.");
+ this.txnTimeout = timeout;
+ return builder();
+ }
+
+ /**
+ * Sets the transaction grace period.
+ *
+ * The grace period is the maximum amount of time for which a transaction may
+ * remain active, after a scale operation has been initiated on the underlying stream.
+ *
+ * @param timeout the timeout
+ */
+ public B withTxnGracePeriod(Time timeout) {
+ Preconditions.checkArgument(timeout.getSize() > 0, "The timeout must be a positive value.");
+ this.txnGracePeriod = timeout;
+ return builder();
+ }
+
+ /**
+ * Creates the sink function for the current builder state.
+ *
+ * @param serializationSchema the serialization schema to use.
+ * @param eventRouter the event router to use.
+ */
+ FlinkPravegaWriter<T> createSinkFunction(SerializationSchema<T> serializationSchema, PravegaEventRouter<T> eventRouter) {
+ Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+ Preconditions.checkNotNull(eventRouter, "eventRouter");
+ return new FlinkPravegaWriter<>(
+ getPravegaConfig().getClientConfig(),
+ resolveStream(),
+ serializationSchema,
+ eventRouter,
+ writerMode,
+ txnTimeout.toMilliseconds(),
+ txnGracePeriod.toMilliseconds());
+ }
+}
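Reviewer note: the default constants above can be cross-checked against Flink's `Time` helper; a small sketch:

    import org.apache.flink.api.common.time.Time;

    public class WriterDefaults {
        public static void main(String[] args) {
            System.out.println(Time.hours(2).toMilliseconds());    // 7200000 == DEFAULT_TXN_TIMEOUT_MILLIS
            System.out.println(Time.minutes(10).toMilliseconds()); // 600000  == DEFAULT_TX_SCALE_GRACE_MILLIS
        }
    }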
diff --git a/src/main/java/io/pravega/connectors/flink/AbstractWriterBuilder.java b/src/main/java/io/pravega/connectors/flink/AbstractWriterBuilder.java
new file mode 100644
index 00000000..06a1cf02
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/AbstractWriterBuilder.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import io.pravega.client.stream.Stream;
+import lombok.Data;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * A base builder for connectors that emit a Pravega stream.
+ *
+ * @param <B> the builder class.
+ */
+public abstract class AbstractWriterBuilder<B extends AbstractWriterBuilder> implements Serializable {
+
+ private PravegaConfig pravegaConfig;
+
+ private StreamSpec stream;
+
+ public AbstractWriterBuilder() {
+ this.pravegaConfig = PravegaConfig.fromDefaults();
+ }
+
+ /**
+ * Set the Pravega client configuration, which includes connection info, security info, and a default scope.
+ *
+ * The default client configuration is obtained from {@code PravegaConfig.fromDefaults()}.
+ *
+ * @param pravegaConfig the configuration to use.
+ */
+ public B withPravegaConfig(PravegaConfig pravegaConfig) {
+ this.pravegaConfig = pravegaConfig;
+ return builder();
+ }
+
+ /**
+ * Add a stream to be written to by the writer.
+ *
+ * @param streamSpec the unqualified or qualified name of the stream.
+ * @return A builder to configure and create a writer.
+ */
+ public B forStream(final String streamSpec) {
+ this.stream = StreamSpec.of(streamSpec);
+ return builder();
+ }
+
+ /**
+ * Add a stream to be written to by the writer.
+ *
+ * @param stream the stream.
+ * @return A builder to configure and create a writer.
+ */
+ public B forStream(final Stream stream) {
+ this.stream = StreamSpec.of(stream);
+ return builder();
+ }
+
+ /**
+ * Gets the Pravega configuration.
+ */
+ protected PravegaConfig getPravegaConfig() {
+ Preconditions.checkState(pravegaConfig != null, "A Pravega configuration must be supplied.");
+ return pravegaConfig;
+ }
+
+ /**
+ * Resolves the stream to be provided to the writer, based on the configured default scope.
+ */
+ protected Stream resolveStream() {
+ Preconditions.checkState(stream != null, "A stream must be supplied.");
+ PravegaConfig pravegaConfig = getPravegaConfig();
+ return pravegaConfig.resolve(stream.streamSpec);
+ }
+
+ protected abstract B builder();
+
+ /**
+ * A Pravega stream.
+ */
+ @Data
+ private static class StreamSpec implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String streamSpec;
+
+ public static StreamSpec of(String streamSpec) {
+ Preconditions.checkNotNull(streamSpec, "streamSpec");
+ return new StreamSpec(streamSpec);
+ }
+
+ public static StreamSpec of(Stream stream) {
+ Preconditions.checkNotNull(stream, "stream");
+ return new StreamSpec(stream.getScopedName());
+ }
+ }
+}
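Reviewer note: `resolveStream()` defers scope qualification to `PravegaConfig.resolve`. A hedged sketch of that behavior; `withDefaultScope(String)` is an assumed PravegaConfig method (only `getDefaultScope()` and `resolve()` appear in this diff):

    import io.pravega.client.stream.Stream;
    import io.pravega.connectors.flink.PravegaConfig;

    public class ScopeResolutionSketch {
        public static void main(String[] args) {
            // withDefaultScope(...) is assumed, not shown in this PR
            PravegaConfig config = PravegaConfig.fromDefaults().withDefaultScope("examples");
            Stream unqualified = config.resolve("mystream");   // -> examples/mystream
            Stream qualified = config.resolve("other/stream"); // explicit scope is kept
            System.out.println(unqualified.getScopedName());
            System.out.println(qualified.getScopedName());
        }
    }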
diff --git a/src/main/java/io/pravega/connectors/flink/CheckpointSerializer.java b/src/main/java/io/pravega/connectors/flink/CheckpointSerializer.java
index 028cea3d..910cdd00 100644
--- a/src/main/java/io/pravega/connectors/flink/CheckpointSerializer.java
+++ b/src/main/java/io/pravega/connectors/flink/CheckpointSerializer.java
@@ -10,10 +10,10 @@
package io.pravega.connectors.flink;
import io.pravega.client.stream.Checkpoint;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import java.io.IOException;
+import java.nio.ByteBuffer;
/**
* Simple serializer for {@link Checkpoint} objects.
@@ -23,7 +23,7 @@
*/
class CheckpointSerializer implements SimpleVersionedSerializer<Checkpoint> {
- private static final int VERSION = 1;
+ private static final int VERSION = 2;
@Override
public int getVersion() {
@@ -32,7 +32,10 @@ public int getVersion() {
@Override
public byte[] serialize(Checkpoint checkpoint) throws IOException {
- return SerializationUtils.serialize(checkpoint);
+ ByteBuffer buf = checkpoint.toBytes();
+ byte[] b = new byte[buf.remaining()];
+ buf.get(b);
+ return b;
}
@Override
@@ -40,7 +43,6 @@ public Checkpoint deserialize(int version, byte[] bytes) throws IOException {
if (version != VERSION) {
throw new IOException("Invalid format version for serialized Pravega Checkpoint: " + version);
}
-
- return (Checkpoint) SerializationUtils.deserialize(bytes);
+ return Checkpoint.fromBytes(ByteBuffer.wrap(bytes));
}
}
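Reviewer note: `serialize()` deliberately copies only the buffer's remaining bytes, so it stays correct even if `Checkpoint.toBytes()` ever returns a buffer whose position is non-zero. A standalone demonstration of the copy pattern; note also that the `VERSION` bump to 2 makes restores from the old Java-serialized format fail fast in `deserialize()`:

    import java.nio.ByteBuffer;

    public class RemainingBytesDemo {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
            buf.get(); // simulate a partially consumed buffer
            byte[] b = new byte[buf.remaining()];
            buf.get(b); // copies only bytes 2..4, exactly like serialize()
            System.out.println(b.length); // 3
        }
    }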
diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java
index e9481aef..4a8e16c2 100644
--- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java
+++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaInputFormat.java
@@ -12,16 +12,16 @@
import com.google.common.base.Preconditions;
+import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.batch.BatchClient;
import io.pravega.client.batch.SegmentIterator;
import io.pravega.client.batch.SegmentRange;
-import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.Serializer;
-import io.pravega.client.stream.Stream;
import io.pravega.connectors.flink.serialization.WrappingSerializer;
import io.pravega.connectors.flink.util.FlinkPravegaUtils;
+import io.pravega.connectors.flink.util.StreamWithBoundaries;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
@@ -32,11 +32,9 @@
import org.apache.flink.core.io.InputSplitAssigner;
import java.io.IOException;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
/**
* A Flink {@link InputFormat} that can be added as a source to read from Pravega in a Flink batch job.
@@ -46,17 +44,19 @@ public class FlinkPravegaInputFormat<T> extends RichInputFormat<T, PravegaInputSplit> {
- // The supplied event deserializer.
- private final DeserializationSchema<T> deserializationSchema;
+ private static final String DEFAULT_CLIENT_SCOPE_NAME = "__NOT_USED";
- // The pravega controller endpoint.
- private final URI controllerURI;
+ // The Pravega client configuration.
+ private final ClientConfig clientConfig;
- // The scope name of the destination stream.
- private final String scopeName;
+ // The scope name to use to construct the Pravega client.
+ private final String clientScope;
// The names of Pravega streams to read.
- private final Set<String> streamNames;
+ private final List<StreamWithBoundaries> streams;
+
+ // The supplied event deserializer.
+ private final DeserializationSchema<T> deserializationSchema;
// The factory used to create Pravega clients; closing this will also close all Pravega connections.
private transient ClientFactory clientFactory;
@@ -70,30 +70,18 @@ public class FlinkPravegaInputFormat<T> extends RichInputFormat<T, PravegaInputSplit> {
- * <p>The number of created input splits is equivalent to the parallelism of the source. For each input split,
- * a Pravega reader will be created to read from the specified Pravega streams. Each input split is closed when
- * the next read event returns {@code null} on {@link EventRead#getEvent()}.
- *
- * @param controllerURI The pravega controller endpoint address.
- * @param scope The destination stream's scope name.
- * @param streamNames The list of stream names to read events from.
+ * @param clientConfig The pravega client configuration.
+ * @param streams The list of streams to read events from.
* @param deserializationSchema The implementation to deserialize events from pravega streams.
*/
- public FlinkPravegaInputFormat(
- final URI controllerURI,
- final String scope,
- final Set<String> streamNames,
- final DeserializationSchema<T> deserializationSchema) {
-
- Preconditions.checkNotNull(controllerURI, "controllerURI");
- Preconditions.checkNotNull(scope, "scope");
- Preconditions.checkNotNull(streamNames, "streamNames");
- Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
-
- this.controllerURI = controllerURI;
- this.scopeName = scope;
- this.deserializationSchema = deserializationSchema;
- this.streamNames = streamNames;
+ protected FlinkPravegaInputFormat(
+ ClientConfig clientConfig,
+ List<StreamWithBoundaries> streams,
+ DeserializationSchema<T> deserializationSchema) {
+ this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
+ this.clientScope = DEFAULT_CLIENT_SCOPE_NAME;
+ this.streams = Preconditions.checkNotNull(streams, "streams");
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
}
// ------------------------------------------------------------------------
@@ -104,7 +92,7 @@ public FlinkPravegaInputFormat(
public void openInputFormat() throws IOException {
super.openInputFormat();
- this.clientFactory = ClientFactory.withScope(scopeName, controllerURI);
+ this.clientFactory = ClientFactory.withScope(clientScope, clientConfig);
this.batchClient = clientFactory.createBatchClient();
}
@@ -131,20 +119,19 @@ public PravegaInputSplit[] createInputSplits(int minNumSplits) throws IOExceptio
// createInputSplits() is called in the JM, so we have to establish separate
// short-living connections to Pravega here to retrieve the segments list
- try (ClientFactory clientFactory = ClientFactory.withScope(scopeName, controllerURI)) {
+ try (ClientFactory clientFactory = ClientFactory.withScope(clientScope, clientConfig)) {
BatchClient batchClient = clientFactory.createBatchClient();
- for (String stream : streamNames) {
-
+ for (StreamWithBoundaries stream : streams) {
Iterator<SegmentRange> segmentRangeIterator =
- batchClient.getSegments(Stream.of(scopeName, stream), null, null).getIterator();
+ batchClient.getSegments(stream.getStream(), stream.getFrom(), stream.getTo()).getIterator();
while (segmentRangeIterator.hasNext()) {
- splits.add(new PravegaInputSplit(splits.size(),
- segmentRangeIterator.next()));
+ splits.add(new PravegaInputSplit(splits.size(), segmentRangeIterator.next()));
}
}
}
+ log.info("Prepared {} input splits", splits.size());
return splits.toArray(new PravegaInputSplit[splits.size()]);
}
@@ -183,4 +170,45 @@ public T nextRecord(T t) throws IOException {
public void close() throws IOException {
this.segmentIterator.close();
}
+
+ /**
+ * Gets a builder for {@link FlinkPravegaInputFormat} to read Pravega streams using the Flink batch API.
+ * @param <T> the element type.
+ */
+ public static <T> Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ /**
+ * A builder for {@link FlinkPravegaInputFormat} to read Pravega streams using the Flink batch API.
+ *
+ * @param <T> the element type.
+ */
+ public static class Builder<T> extends AbstractReaderBuilder<Builder<T>> {
+
+ private DeserializationSchema<T> deserializationSchema;
+
+ protected Builder<T> builder() {
+ return this;
+ }
+
+ /**
+ * Sets the deserialization schema.
+ *
+ * @param deserializationSchema The deserialization schema
+ */
+ public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ return builder();
+ }
+
+ protected DeserializationSchema<T> getDeserializationSchema() {
+ Preconditions.checkState(deserializationSchema != null, "Deserialization schema must not be null.");
+ return deserializationSchema;
+ }
+
+ public FlinkPravegaInputFormat<T> build() {
+ return new FlinkPravegaInputFormat<>(getPravegaConfig().getClientConfig(), resolveStreams(), getDeserializationSchema());
+ }
+ }
}
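Reviewer note: a hedged end-to-end sketch of the new batch builder (the scope/stream name is illustrative, and the builder falls back to `PravegaConfig.fromDefaults()` when no config is supplied):

    import io.pravega.connectors.flink.FlinkPravegaInputFormat;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class BatchReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            FlinkPravegaInputFormat<String> inputFormat = FlinkPravegaInputFormat.<String>builder()
                    .forStream("examples/mystream") // hypothetical scope/stream
                    .withDeserializationSchema(new SimpleStringSchema())
                    .build();

            DataSet<String> events = env.createInput(inputFormat, BasicTypeInfo.STRING_TYPE_INFO);
            events.print();
        }
    }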
diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSink.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSink.java
new file mode 100644
index 00000000..16a74661
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSink.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import io.pravega.connectors.flink.serialization.JsonRowSerializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.function.Function;
+
+/**
+ * An append-only table sink to emit a streaming table as a Pravega stream containing JSON-formatted events.
+ */
+public class FlinkPravegaJsonTableSink extends FlinkPravegaTableSink {
+ private FlinkPravegaJsonTableSink(Function<TableSinkConfiguration, FlinkPravegaWriter<Row>> writerFactory) {
+ super(writerFactory);
+ }
+
+ @Override
+ protected FlinkPravegaTableSink createCopy() {
+ return new FlinkPravegaJsonTableSink(writerFactory);
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * A builder for {@link FlinkPravegaJsonTableSink}.
+ */
+ public static class Builder extends AbstractTableSinkBuilder<Builder> {
+
+ protected Builder builder() {
+ return this;
+ }
+
+ @Override
+ protected SerializationSchema<Row> getSerializationSchema(String[] fieldNames) {
+ return new JsonRowSerializationSchema(fieldNames);
+ }
+
+ /**
+ * Builds the {@link FlinkPravegaJsonTableSink}.
+ */
+ public FlinkPravegaJsonTableSink build() {
+ return new FlinkPravegaJsonTableSink(this::createSinkFunction);
+ }
+ }
+}
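Reviewer note: a hedged usage sketch for the JSON sink (stream and field names are illustrative; `Table.writeToSink` is the Table API call of this Flink generation):

    import io.pravega.connectors.flink.FlinkPravegaJsonTableSink;
    import org.apache.flink.table.api.Table;

    public class JsonSinkSketch {
        static void emit(Table table) {
            FlinkPravegaJsonTableSink sink = FlinkPravegaJsonTableSink.builder()
                    .forStream("examples/out")   // hypothetical scope/stream
                    .withRoutingKeyField("user") // must be a STRING field of the table
                    .build();
            table.writeToSink(sink);
        }
    }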
diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSource.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSource.java
new file mode 100644
index 00000000..44ae49e8
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSource.java
@@ -0,0 +1,95 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import io.pravega.connectors.flink.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+
+import java.util.function.Supplier;
+
+/**
+ * A {@link TableSource} to read JSON-formatted Pravega streams using the Flink Table API.
+ */
+public class FlinkPravegaJsonTableSource extends FlinkPravegaTableSource {
+
+ protected FlinkPravegaJsonTableSource(
+ Supplier<FlinkPravegaReader<Row>> readerFactory,
+ Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory,
+ TableSchema tableSchema) {
+ super(readerFactory, inputFormatFactory, tableSchema, jsonSchemaToReturnType(tableSchema));
+ }
+
+ @Override
+ public String explainSource() {
+ return "FlinkPravegaJsonTableSource";
+ }
+
+ /**
+ * A builder for {@link FlinkPravegaJsonTableSource} to read Pravega streams using the Flink Table API.
+ */
+ public static FlinkPravegaJsonTableSource.Builder builder() {
+ return new Builder();
+ }
+
+ /** Converts the JSON schema into the return type. */
+ private static RowTypeInfo jsonSchemaToReturnType(TableSchema jsonSchema) {
+ return new RowTypeInfo(jsonSchema.getTypes(), jsonSchema.getColumnNames());
+ }
+
+ /**
+ * A builder for {@link FlinkPravegaJsonTableSource} to read JSON-formatted Pravega streams using the Flink Table API.
+ */
+ public static class Builder
+ extends FlinkPravegaTableSource.BuilderBase<FlinkPravegaJsonTableSource, Builder> {
+
+ private boolean failOnMissingField = false;
+
+ @Override
+ protected Builder builder() {
+ return this;
+ }
+
+ /**
+ * Sets flag whether to fail if a field is missing.
+ *
+ * @param failOnMissingField If set to true, the TableSource fails if a field is missing.
+ * If set to false, a missing field is set to null.
+ * @return The builder.
+ */
+ public Builder failOnMissingField(boolean failOnMissingField) {
+ this.failOnMissingField = failOnMissingField;
+ return builder();
+ }
+
+ @Override
+ protected JsonRowDeserializationSchema getDeserializationSchema() {
+ JsonRowDeserializationSchema deserSchema = new JsonRowDeserializationSchema(jsonSchemaToReturnType(getTableSchema()));
+ deserSchema.setFailOnMissingField(failOnMissingField);
+ return deserSchema;
+ }
+
+ /**
+ * Builds a {@link FlinkPravegaReader} based on the configuration.
+ *
+ * @throws IllegalStateException if the configuration is invalid.
+ */
+ public FlinkPravegaJsonTableSource build() {
+ FlinkPravegaJsonTableSource tableSource = new FlinkPravegaJsonTableSource(
+ this::buildSourceFunction,
+ this::buildInputFormat,
+ getTableSchema());
+ configureTableSource(tableSource);
+ return tableSource;
+ }
+ }
+}
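Reviewer note: a hedged usage sketch for the JSON source. `withSchema(TableSchema)` is assumed to live on `FlinkPravegaTableSource.BuilderBase`, which is not part of this hunk; the rest comes from the builder above:

    import io.pravega.connectors.flink.FlinkPravegaJsonTableSource;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.TableSchema;

    public class JsonSourceSketch {
        public static void main(String[] args) {
            TableSchema schema = new TableSchema(
                    new String[]{"user", "count"},
                    new TypeInformation<?>[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO});

            FlinkPravegaJsonTableSource source = FlinkPravegaJsonTableSource.builder()
                    .forStream("examples/events") // hypothetical scope/stream
                    .withSchema(schema)           // assumed BuilderBase method, not shown in this diff
                    .failOnMissingField(true)
                    .build();
            System.out.println(source.explainSource());
        }
    }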
diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
index 89ea49ef..61b7377f 100644
--- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
+++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
@@ -10,7 +10,7 @@
package io.pravega.connectors.flink;
import com.google.common.base.Preconditions;
-
+import io.pravega.client.ClientConfig;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.EventRead;
@@ -18,30 +18,19 @@
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
-
-import io.pravega.client.stream.Stream;
-import io.pravega.client.stream.StreamCut;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.flink.api.common.functions.StoppableFunction;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.util.FlinkException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
import static io.pravega.connectors.flink.util.FlinkPravegaUtils.createPravegaReader;
-import static io.pravega.connectors.flink.util.FlinkPravegaUtils.generateRandomReaderGroupName;
-import static io.pravega.connectors.flink.util.FlinkPravegaUtils.getDefaultReaderName;
/**
* Flink source implementation for reading from pravega storage.
@@ -55,41 +44,37 @@ public class FlinkPravegaReader
private static final long serialVersionUID = 1L;
- private static final long DEFAULT_EVENT_READ_TIMEOUT = 1000;
-
- private static final long DEFAULT_CHECKPOINT_INITIATE_TIMEOUT = 5000;
-
// ----- configuration fields -----
- // The supplied event deserializer.
- private final DeserializationSchema deserializationSchema;
+ // the uuid of the checkpoint hook, used to store state and resume existing state from savepoints
+ final String hookUid;
- // The pravega controller endpoint.
- private final URI controllerURI;
+ // The Pravega client config.
+ final ClientConfig clientConfig;
- // The scope name of the destination stream.
- private final String scopeName;
+ // The Pravega reader group config.
+ final ReaderGroupConfig readerGroupConfig;
- // The readergroup name to coordinate the parallel readers. This should be unique for a Flink job.
- private final String readerGroupName;
+ // The scope name of the reader group.
+ final String readerGroupScope;
- // The stream names of the associated scope
- private final Set<String> streamNames;
+ // The readergroup name to coordinate the parallel readers. This should be unique for a Flink job.
+ final String readerGroupName;
- // the name of the reader, used to store state and resume existing state from savepoints
- private final String readerName;
+ // The supplied event deserializer.
+ final DeserializationSchema<T> deserializationSchema;
// the timeout for reading events from Pravega
- private long eventReadTimeout = DEFAULT_EVENT_READ_TIMEOUT;
+ final Time eventReadTimeout;
// the timeout for call that initiates the Pravega checkpoint
- private long checkpointInitiateTimeout = DEFAULT_CHECKPOINT_INITIATE_TIMEOUT;
+ final Time checkpointInitiateTimeout;
// ----- runtime fields -----
// Flag to terminate the source. volatile, because 'stop()' and 'cancel()'
// may be called asynchronously
- private volatile boolean running = true;
+ volatile boolean running = true;
// checkpoint trigger callback, invoked when a checkpoint event is received.
// no need to be volatile, the source is driven by only one thread
@@ -97,27 +82,6 @@ public class FlinkPravegaReader
// ------------------------------------------------------------------------
- /**
- * Creates a new Flink Pravega reader instance which can be added as a source to a Flink job.
- *
- * <p>The reader will use a random name under which it stores its state in a checkpoint. While
- * checkpoints still work, this means that matching the state into another Flink jobs
- * (when resuming from a savepoint) will not be possible. Thus it is generally recommended
- * to give a reader name to each reader.
- *
- * @param controllerURI The pravega controller endpoint address.
- * @param scope The destination stream's scope name.
- * @param streamNames The list of stream names to read events from.
- * @param startTime The start time from when to read events from.
- * Use 0 to read all stream events from the beginning.
- * @param deserializationSchema The implementation to deserialize events from pravega streams.
- */
- public FlinkPravegaReader(final URI controllerURI, final String scope, final Set<String> streamNames,
- final long startTime, final DeserializationSchema<T> deserializationSchema) {
-
- this(controllerURI, scope, streamNames, startTime, deserializationSchema, null);
- }
-
/**
* Creates a new Flink Pravega reader instance which can be added as a source to a Flink job.
*
@@ -129,95 +93,37 @@ public FlinkPravegaReader(final URI controllerURI, final String scope, final Set
*
* <p>Without specifying a {@code readerName}, the job will correctly checkpoint and recover,
* but new instances of the job can typically not resume this reader's state (positions).
*
- * @param controllerURI The pravega controller endpoint address.
- * @param scope The destination stream's scope name.
- * @param streamNames The list of stream names to read events from.
- * @param startTime The start time from when to read events from.
- * Use 0 to read all stream events from the beginning.
- * @param deserializationSchema The implementation to deserialize events from pravega streams.
- * @param readerName The name of the reader, used to store state and resume existing
- * state from savepoints.
+ * @param hookUid The UID of the source hook in the job graph.
+ * @param clientConfig The Pravega client configuration.
+ * @param readerGroupConfig The Pravega reader group configuration.
+ * @param readerGroupScope The reader group scope name.
+ * @param readerGroupName The reader group name.
+ * @param deserializationSchema The implementation to deserialize events from Pravega streams.
+ * @param eventReadTimeout The event read timeout.
+ * @param checkpointInitiateTimeout The checkpoint initiation timeout.
*/
- public FlinkPravegaReader(final URI controllerURI, final String scope, final Set<String> streamNames,
- final long startTime, final DeserializationSchema<T> deserializationSchema,
- final String readerName) {
-
- Preconditions.checkNotNull(controllerURI, "controllerURI");
- Preconditions.checkNotNull(scope, "scope");
- Preconditions.checkNotNull(streamNames, "streamNames");
- Preconditions.checkArgument(startTime >= 0, "start time must be >= 0");
- Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
- if (readerName == null) {
- this.readerName = getDefaultReaderName(scope, streamNames);
- } else {
- this.readerName = readerName;
- }
-
- this.controllerURI = controllerURI;
- this.scopeName = scope;
- this.deserializationSchema = deserializationSchema;
- this.readerGroupName = generateRandomReaderGroupName();
- this.streamNames = streamNames;
-
- // TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
- // See https://github.com/pravega/pravega/issues/553.
- log.info("Creating reader group: {} for the Flink job", this.readerGroupName);
-
- createReaderGroup();
-
+ protected FlinkPravegaReader(String hookUid, ClientConfig clientConfig,
+ ReaderGroupConfig readerGroupConfig, String readerGroupScope, String readerGroupName,
+ DeserializationSchema<T> deserializationSchema, Time eventReadTimeout, Time checkpointInitiateTimeout) {
+
+ this.hookUid = Preconditions.checkNotNull(hookUid, "hookUid");
+ this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
+ this.readerGroupConfig = Preconditions.checkNotNull(readerGroupConfig, "readerGroupConfig");
+ this.readerGroupScope = Preconditions.checkNotNull(readerGroupScope, "readerGroupScope");
+ this.readerGroupName = Preconditions.checkNotNull(readerGroupName, "readerGroupName");
+ this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
+ this.eventReadTimeout = Preconditions.checkNotNull(eventReadTimeout, "eventReadTimeout");
+ this.checkpointInitiateTimeout = Preconditions.checkNotNull(checkpointInitiateTimeout, "checkpointInitiateTimeout");
}
- // ------------------------------------------------------------------------
- // properties
- // ------------------------------------------------------------------------
-
/**
- * Sets the timeout for initiating a checkpoint in Pravega.
- *
- * <p>This timeout is applied to the future returned by
- * {@link io.pravega.client.stream.ReaderGroup#initiateCheckpoint(String, ScheduledExecutorService)}.
- *
- * @param checkpointInitiateTimeout The timeout, in milliseconds
+ * Initializes the reader.
*/
- public void setCheckpointInitiateTimeout(long checkpointInitiateTimeout) {
- Preconditions.checkArgument(checkpointInitiateTimeout > 0, "timeout must be >= 0");
- this.checkpointInitiateTimeout = checkpointInitiateTimeout;
- }
-
- /**
- * Gets the timeout for initiating a checkpoint in Pravega.
- *
- * <p>This timeout is applied to the future returned by
- * {@link io.pravega.client.stream.ReaderGroup#initiateCheckpoint(String, ScheduledExecutorService)}.
- *
- * @return The timeout, in milliseconds
- */
- public long getCheckpointInitiateTimeout() {
- return checkpointInitiateTimeout;
- }
-
- /**
- * Gets the timeout for the call to read events from Pravega. After the timeout
- * expires (without an event being returned), another call will be made.
- *
- * <p>This timeout is passed to {@link EventStreamReader#readNextEvent(long)}.
- *
- * @param eventReadTimeout The timeout, in milliseconds
- */
- public void setEventReadTimeout(long eventReadTimeout) {
- Preconditions.checkArgument(eventReadTimeout > 0, "timeout must be >= 0");
- this.eventReadTimeout = eventReadTimeout;
- }
-
- /**
- * Gets the timeout for the call to read events from Pravega.
- *
- * <p>This timeout is the value passed to {@link EventStreamReader#readNextEvent(long)}.
- *
- * @return The timeout, in milliseconds
- */
- public long getEventReadTimeout() {
- return eventReadTimeout;
+ void initialize() {
+ // TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
+ // See https://github.com/pravega/flink-connectors/issues/130.
+ log.info("Creating reader group: {}/{} for the Flink job", this.readerGroupScope, this.readerGroupName);
+ createReaderGroup();
}
// ------------------------------------------------------------------------
@@ -230,21 +136,15 @@ public void run(SourceContext ctx) throws Exception {
final String readerId = getRuntimeContext().getTaskNameWithSubtasks();
log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}",
- getRuntimeContext().getTaskNameWithSubtasks(), readerId, this.controllerURI);
+ getRuntimeContext().getTaskNameWithSubtasks(), readerId, this.clientConfig.getControllerURI());
- try (EventStreamReader<T> pravegaReader = createPravegaReader(
- this.scopeName,
- this.controllerURI,
- readerId,
- this.readerGroupName,
- this.deserializationSchema,
- ReaderConfig.builder().build())) {
+ try (EventStreamReader<T> pravegaReader = createEventStreamReader(readerId)) {
- log.info("Starting Pravega reader '{}' for controller URI {}", readerId, this.controllerURI);
+ log.info("Starting Pravega reader '{}' for controller URI {}", readerId, this.clientConfig.getControllerURI());
// main work loop, which this task is running
while (this.running) {
- final EventRead<T> eventRead = pravegaReader.readNextEvent(eventReadTimeout);
+ final EventRead<T> eventRead = pravegaReader.readNextEvent(eventReadTimeout.toMilliseconds());
final T event = eventRead.getEvent();
// emit the event, if one was carried
@@ -291,7 +191,7 @@ public TypeInformation<T> getProducedType() {
@Override
public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
- return new ReaderCheckpointHook(this.readerName, createReaderGroup(), this.checkpointInitiateTimeout);
+ return new ReaderCheckpointHook(this.hookUid, createReaderGroup(), this.checkpointInitiateTimeout);
}
@Override
@@ -320,23 +220,89 @@ private void triggerCheckpoint(String checkpointIdentifier) throws FlinkExceptio
checkpointTrigger.triggerCheckpoint(checkpointId);
}
- /*
- * create reader group for the associated stream names and scope
+ // ------------------------------------------------------------------------
+ // utility
+ // ------------------------------------------------------------------------
+
+ /**
+ * Create the {@link ReaderGroup} for the current configuration.
+ */
+ protected ReaderGroup createReaderGroup() {
+ ReaderGroupManager readerGroupManager = createReaderGroupManager();
+ readerGroupManager.createReaderGroup(this.readerGroupName, readerGroupConfig);
+ return readerGroupManager.getReaderGroup(this.readerGroupName);
+ }
+
+ /**
+ * Create the {@link ReaderGroupManager} for the current configuration.
+ */
+ protected ReaderGroupManager createReaderGroupManager() {
+ return ReaderGroupManager.withScope(readerGroupScope, clientConfig);
+ }
+
+ /**
+ * Create the {@link EventStreamReader} for the current configuration.
+ * @param readerId the readerID to use.
+ */
+ protected EventStreamReader<T> createEventStreamReader(String readerId) {
+ return createPravegaReader(
+ this.clientConfig,
+ readerId,
+ this.readerGroupScope,
+ this.readerGroupName,
+ this.deserializationSchema,
+ ReaderConfig.builder().build());
+ }
+
+ // ------------------------------------------------------------------------
+ // configuration
+ // ------------------------------------------------------------------------
+
+ /**
+ * Gets a builder for {@link FlinkPravegaReader} to read Pravega streams using the Flink streaming API.
+ * @param <T> the element type.
*/
- private ReaderGroup createReaderGroup() {
- Map<Stream, StreamCut> streamConfigMap = new HashMap<>();
- for (String stream: streamNames) {
- streamConfigMap.put(Stream.of(scopeName, stream), StreamCut.UNBOUNDED);
+ public static <T> FlinkPravegaReader.Builder<T> builder() {
+ return new Builder<>();
+ }
+
+ /**
+ * A builder for {@link FlinkPravegaReader}.
+ *
+ * @param <T> the element type.
+ */
+ public static class Builder<T> extends AbstractStreamingReaderBuilder<T, Builder<T>> {
+
+ private DeserializationSchema<T> deserializationSchema;
+
+ protected Builder<T> builder() {
+ return this;
}
- ReaderGroupConfig groupConfig = ReaderGroupConfig.builder()
- .disableAutomaticCheckpoints()
- .startFromStreamCuts(streamConfigMap)
- .build();
+ /**
+ * Sets the deserialization schema.
+ *
+ * @param deserializationSchema The deserialization schema
+ */
+ public Builder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
+ this.deserializationSchema = deserializationSchema;
+ return builder();
+ }
- ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scopeName, controllerURI);
- readerGroupManager.createReaderGroup(this.readerGroupName, groupConfig);
+ @Override
+ protected DeserializationSchema<T> getDeserializationSchema() {
+ Preconditions.checkState(deserializationSchema != null, "Deserialization schema must not be null.");
+ return deserializationSchema;
+ }
- return readerGroupManager.getReaderGroup(this.readerGroupName);
+ /**
+ * Builds a {@link FlinkPravegaReader} based on the configuration.
+ * @throws IllegalStateException if the configuration is invalid.
+ */
+ public FlinkPravegaReader<T> build() {
+ FlinkPravegaReader<T> reader = buildSourceFunction();
+ reader.initialize();
+ return reader;
+ }
}
}
\ No newline at end of file
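Reviewer note: a hedged streaming usage sketch for the rebuilt reader. `build()` calls `initialize()`, which creates the reader group via the controller, so it must run where the controller is reachable. Names are illustrative:

    import io.pravega.connectors.flink.FlinkPravegaReader;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StreamingReadSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            FlinkPravegaReader<String> reader = FlinkPravegaReader.<String>builder()
                    .forStream("examples/mystream") // hypothetical scope/stream
                    .withDeserializationSchema(new SimpleStringSchema())
                    .uid("my-reader")               // stable uid across savepoints
                    .build();                       // also creates the reader group

            DataStream<String> events = env.addSource(reader);
            events.print();
            env.execute("pravega-read");
        }
    }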
diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java
index 98e6bbfb..a3314a45 100644
--- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java
+++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSink.java
@@ -7,10 +7,10 @@
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
-
package io.pravega.connectors.flink;
-import io.pravega.connectors.flink.util.StreamId;
+import lombok.Getter;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -20,7 +20,6 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
-import java.net.URI;
import java.util.Arrays;
import java.util.function.Function;
@@ -29,64 +28,32 @@
import static org.apache.flink.util.Preconditions.checkState;
/**
- * An append-only table sink to emit a streaming table as a Pravaga stream.
+ * An append-only table sink to emit a streaming table as a Pravega stream.
*/
-public class FlinkPravegaTableSink implements AppendStreamTableSink<Row> {
-
- /** The Pravega controller endpoint. */
- protected final URI controllerURI;
-
- /** The Pravega stream to use. */
- protected final StreamId stream;
-
- protected SerializationSchema<Row> serializationSchema;
- protected PravegaEventRouter<Row> eventRouter;
- protected String[] fieldNames;
- protected TypeInformation<?>[] fieldTypes;
+public abstract class FlinkPravegaTableSink implements AppendStreamTableSink<Row> {
- /** Serialization schema to use for Pravega stream events. */
- private final Function<String[], SerializationSchema<Row>> serializationSchemaFactory;
+ /** A factory for the stream writer. */
+ protected final Function<TableSinkConfiguration, FlinkPravegaWriter<Row>> writerFactory;
- private final String routingKeyFieldName;
+ /** The effective table sink configuration */
+ private TableSinkConfiguration tableSinkConfiguration;
/**
* Creates a Pravega {@link AppendStreamTableSink}.
*
- * <p>The {@code serializationSchemaFactory} supplies a {@link SerializationSchema}
- * based on the output field names.
- *
*
* <p>Each row is written to a Pravega stream with a routing key based on the {@code routingKeyFieldName}.
* The specified field must be of type {@code STRING}.
*
- * @param controllerURI The pravega controller endpoint address.
- * @param stream The stream to write events to.
- * @param serializationSchemaFactory A factory for the serialization schema to use for stream events.
- * @param routingKeyFieldName The field name to use as a Pravega event routing key.
+ * @param writerFactory A factory for the stream writer.
*/
- public FlinkPravegaTableSink(
- URI controllerURI,
- StreamId stream,
- Function<String[], SerializationSchema<Row>> serializationSchemaFactory,
- String routingKeyFieldName) {
- this.controllerURI = controllerURI;
- this.stream = stream;
- this.serializationSchemaFactory = serializationSchemaFactory;
- this.routingKeyFieldName = routingKeyFieldName;
+ protected FlinkPravegaTableSink(Function<TableSinkConfiguration, FlinkPravegaWriter<Row>> writerFactory) {
+ this.writerFactory = Preconditions.checkNotNull(writerFactory, "writerFactory");
}
/**
* Creates a copy of the sink for configuration purposes.
*/
- protected FlinkPravegaTableSink createCopy() {
- return new FlinkPravegaTableSink(controllerURI, stream, serializationSchemaFactory, routingKeyFieldName);
- }
-
- /**
- * Returns the low-level writer.
- */
- protected FlinkPravegaWriter<Row> createFlinkPravegaWriter() {
- return new FlinkPravegaWriter<>(controllerURI, stream.getScope(), stream.getName(), serializationSchema, eventRouter);
- }
+ protected abstract FlinkPravegaTableSink createCopy();
/**
* NOTE: This method is for internal use only for defining a TableSink.
@@ -94,12 +61,8 @@ protected FlinkPravegaWriter<Row> createFlinkPravegaWriter() {
*/
@Override
public void emitDataStream(DataStream<Row> dataStream) {
- checkState(fieldNames != null, "Table sink is not configured");
- checkState(fieldTypes != null, "Table sink is not configured");
- checkState(serializationSchema != null, "Table sink is not configured");
- checkState(eventRouter != null, "Table sink is not configured");
-
- FlinkPravegaWriter<Row> writer = createFlinkPravegaWriter();
+ checkState(tableSinkConfiguration != null, "Table sink is not configured");
+ FlinkPravegaWriter<Row> writer = writerFactory.apply(tableSinkConfiguration);
dataStream.addSink(writer);
}
@@ -109,35 +72,48 @@ public TypeInformation<Row> getOutputType() {
}
public String[] getFieldNames() {
- return fieldNames;
+ checkState(tableSinkConfiguration != null, "Table sink is not configured");
+ return tableSinkConfiguration.fieldNames;
}
@Override
public TypeInformation<?>[] getFieldTypes() {
- return fieldTypes;
+ checkState(tableSinkConfiguration != null, "Table sink is not configured");
+ return tableSinkConfiguration.fieldTypes;
}
@Override
public FlinkPravegaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-
// called to configure the sink with a specific subset of fields
-
- FlinkPravegaTableSink copy = createCopy();
- copy.fieldNames = checkNotNull(fieldNames, "fieldNames");
- copy.fieldTypes = checkNotNull(fieldTypes, "fieldTypes");
+ checkNotNull(fieldNames, "fieldNames");
+ checkNotNull(fieldTypes, "fieldTypes");
Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
"Number of provided field names and types does not match.");
- copy.serializationSchema = serializationSchemaFactory.apply(fieldNames);
- copy.eventRouter = new RowBasedRouter(routingKeyFieldName, fieldNames, fieldTypes);
-
+ FlinkPravegaTableSink copy = createCopy();
+ copy.tableSinkConfiguration = new TableSinkConfiguration(fieldNames, fieldTypes);
return copy;
}
+ /**
+ * The table sink configuration which is provided by the table environment via {@code TableSink::configure}.
+ */
+ @Getter
+ protected static class TableSinkConfiguration {
+ // the set of projected fields and their types
+ private String[] fieldNames;
+ private TypeInformation<?>[] fieldTypes;
+
+ public TableSinkConfiguration(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+ this.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
+ this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
+ }
+ }
+
/**
* An event router that extracts the routing key from a {@link Row} by field name.
*/
- private static class RowBasedRouter implements PravegaEventRouter<Row> {
+ static class RowBasedRouter implements PravegaEventRouter<Row> {
private final int keyIndex;
@@ -156,5 +132,54 @@ public RowBasedRouter(String keyFieldName, String[] fieldNames, TypeInformation<
public String getRoutingKey(Row event) {
return (String) event.getField(keyIndex);
}
+
+ int getKeyIndex() {
+ return keyIndex;
+ }
+ }
+
+ /**
+ * An abstract {@link FlinkPravegaTableSink} builder.
+ * @param <B> the builder type.
+ */
+ @Internal
+ public abstract static class AbstractTableSinkBuilder<B extends AbstractTableSinkBuilder> extends AbstractStreamingWriterBuilder<Row, B> {
+
+ private String routingKeyFieldName;
+
+ /**
+ * Sets the field name to use as a Pravega event routing key.
+ *
+ * Each row is written to a Pravega stream with a routing key based on the given field name.
+ * The specified field must be of type {@code STRING}.
+ *
+ * @param fieldName the field name.
+ */
+ public B withRoutingKeyField(String fieldName) {
+ this.routingKeyFieldName = fieldName;
+ return builder();
+ }
+
+ // region Internal
+
+ /**
+ * Gets a serialization schema based on the given output field names.
+ * @param fieldNames the field names to emit.
+ */
+ protected abstract SerializationSchema<Row> getSerializationSchema(String[] fieldNames);
+
+ /**
+ * Creates the sink function based on the given table sink configuration and current builder state.
+ *
+ * @param configuration the table sink configuration, incl. projected fields
+ */
+ protected FlinkPravegaWriter<Row> createSinkFunction(TableSinkConfiguration configuration) {
+ Preconditions.checkState(routingKeyFieldName != null, "The routing key field must be provided.");
+ SerializationSchema<Row> serializationSchema = getSerializationSchema(configuration.getFieldNames());
+ PravegaEventRouter<Row> eventRouter = new RowBasedRouter(routingKeyFieldName, configuration.getFieldNames(), configuration.getFieldTypes());
+ return createSinkFunction(serializationSchema, eventRouter);
+ }
+
+ // endregion
}
}
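A concrete builder for this sink only has to supply a serialization schema for the projected fields; routing-key handling and writer construction come from the base classes above. A minimal hypothetical subclass could look like this sketch (the JsonRowSerializationSchema helper and its constructor are assumptions for illustration, not part of this diff):

    // hypothetical concrete builder that serializes each projected row as JSON
    public static class JsonTableSinkBuilder extends AbstractTableSinkBuilder<JsonTableSinkBuilder> {
        @Override
        protected JsonTableSinkBuilder builder() {
            return this;
        }

        @Override
        protected SerializationSchema<Row> getSerializationSchema(String[] fieldNames) {
            // assumed helper: writes a JSON object keyed by the projected field names
            return new JsonRowSerializationSchema(fieldNames);
        }
    }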
diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSource.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSource.java
index 797842b0..657c50e2 100644
--- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSource.java
+++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaTableSource.java
@@ -10,66 +10,58 @@
package io.pravega.connectors.flink;
-import io.pravega.connectors.flink.util.StreamId;
-
+import io.pravega.client.ClientConfig;
+import io.pravega.connectors.flink.util.StreamWithBoundaries;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.StreamTableSource;
+import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
-import java.net.URI;
-import java.util.Collections;
-import java.util.function.Function;
+import java.util.List;
+import java.util.function.Supplier;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * A table source to produce a streaming table from a Pravega stream.
+ * A {@link TableSource} to read Pravega streams using the Flink Table API.
+ *
+ * Supports both stream and batch environments.
*/
-public class FlinkPravegaTableSource implements StreamTableSource<Row> {
-
- /** The Pravega controller endpoint. */
- private final URI controllerURI;
+public abstract class FlinkPravegaTableSource implements StreamTableSource<Row>, BatchTableSource<Row> {
- /** The Pravega stream to use. */
- private final StreamId stream;
+ private final Supplier<FlinkPravegaReader<Row>> sourceFunctionFactory;
- /** The start time from when to read events from. */
- private final long startTime;
+ private final Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory;
- /** Deserialization schema to use for Pravega stream events. */
- private final DeserializationSchema<Row> deserializationSchema;
+ private final TableSchema schema;
/** Type information describing the result type. */
- private final TypeInformation<Row> typeInfo;
+ private final TypeInformation<Row> returnType;
/**
- * Creates a Pravega {@link StreamTableSource}.
- *
- * The {@code deserializationSchemaFactory} supplies a {@link DeserializationSchema}
- * based on the result type information.
- *
- * @param controllerURI The pravega controller endpoint address.
- * @param stream The stream to read events from.
- * @param startTime The start time from when to read events from.
- * @param deserializationSchemaFactory The deserialization schema to use for stream events.
- * @param typeInfo The type information describing the result type.
+ * Creates a Pravega {@link TableSource}.
+ * @param sourceFunctionFactory a factory for the {@link FlinkPravegaReader} to implement {@link StreamTableSource}
+ * @param inputFormatFactory a factory for the {@link FlinkPravegaInputFormat} to implement {@link BatchTableSource}
+ * @param schema the table schema
+ * @param returnType the return type based on the table schema
*/
- public FlinkPravegaTableSource(
- final URI controllerURI,
- final StreamId stream,
- final long startTime,
- Function<TypeInformation<Row>, DeserializationSchema<Row>> deserializationSchemaFactory,
- TypeInformation<Row> typeInfo) {
- this.controllerURI = controllerURI;
- this.stream = stream;
- this.startTime = startTime;
- checkNotNull(deserializationSchemaFactory, "Deserialization schema factory");
- this.typeInfo = checkNotNull(typeInfo, "Type information");
- this.deserializationSchema = deserializationSchemaFactory.apply(typeInfo);
+ protected FlinkPravegaTableSource(
+ Supplier<FlinkPravegaReader<Row>> sourceFunctionFactory,
+ Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory,
+ TableSchema schema,
+ TypeInformation<Row> returnType) {
+ this.sourceFunctionFactory = checkNotNull(sourceFunctionFactory, "sourceFunctionFactory");
+ this.inputFormatFactory = checkNotNull(inputFormatFactory, "inputFormatFactory");
+ this.schema = checkNotNull(schema, "schema");
+ this.returnType = checkNotNull(returnType, "returnType");
}
/**
@@ -78,42 +70,85 @@ public FlinkPravegaTableSource(
*/
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
- FlinkPravegaReader<Row> reader = createFlinkPravegaReader();
+ FlinkPravegaReader<Row> reader = sourceFunctionFactory.get();
+ reader.initialize();
return env.addSource(reader);
}
+ /**
+ * NOTE: This method is for internal use only for defining a TableSource.
+ * Do not use it in Table API programs.
+ */
@Override
- public TypeInformation<Row> getReturnType() {
- return typeInfo;
+ public DataSet<Row> getDataSet(ExecutionEnvironment env) {
+ FlinkPravegaInputFormat<Row> inputFormat = inputFormatFactory.get();
+ return env.createInput(inputFormat);
}
@Override
- public TableSchema getTableSchema() {
- return TableSchema.fromTypeInfo(typeInfo);
+ public TypeInformation<Row> getReturnType() {
+ return returnType;
}
- /**
- * Returns the low-level reader.
- */
- protected FlinkPravegaReader<Row> createFlinkPravegaReader() {
- return new FlinkPravegaReader<>(
- controllerURI,
- stream.getScope(), Collections.singleton(stream.getName()),
- startTime,
- deserializationSchema);
+ @Override
+ public TableSchema getTableSchema() {
+ return schema;
}
/**
- * Returns the deserialization schema.
+ * A base builder for {@link FlinkPravegaTableSource} to read Pravega streams using the Flink Table API.
*
- * @return The deserialization schema
+ * @param <T> the table source type.
+ * @param <B> the builder type.
*/
- protected DeserializationSchema<Row> getDeserializationSchema() {
- return deserializationSchema;
- }
-
- @Override
- public String explainSource() {
- return "";
+ public abstract static class BuilderBase<T extends FlinkPravegaTableSource, B extends BuilderBase>
+ extends AbstractStreamingReaderBuilder<Row, B> {
+
+ private TableSchema schema;
+
+ /**
+ * Sets the schema of the produced table.
+ *
+ * @param schema The schema of the produced table.
+ * @return The builder.
+ */
+ public B withSchema(TableSchema schema) {
+ Preconditions.checkNotNull(schema, "Schema must not be null.");
+ Preconditions.checkArgument(this.schema == null, "Schema has already been set.");
+ this.schema = schema;
+ return builder();
+ }
+
+ /**
+ * Returns the configured table schema.
+ *
+ * @return the configured table schema.
+ */
+ protected TableSchema getTableSchema() {
+ Preconditions.checkState(this.schema != null, "Schema hasn't been set.");
+ return this.schema;
+ }
+
+ /**
+ * Applies a configuration to the table source.
+ *
+ * @param source the table source.
+ */
+ protected void configureTableSource(T source) {
+ }
+
+ /**
+ * Builds a {@link FlinkPravegaInputFormat} for using the Table API in a Flink batch environment.
+ *
+ * @return an input format; the builder configuration is validated eagerly when this method is called.
+ */
+ FlinkPravegaInputFormat<Row> buildInputFormat() {
+
+ final List<StreamWithBoundaries> streams = resolveStreams();
+ final ClientConfig clientConfig = getPravegaConfig().getClientConfig();
+ final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema();
+
+ return new FlinkPravegaInputFormat<>(clientConfig, streams, deserializationSchema);
+ }
}
}
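With the source now implementing both StreamTableSource and BatchTableSource, a single instance can back streaming and batch queries. A hedged usage sketch, based on the builder calls exercised in the tests later in this diff (stream name and schema are illustrative):

    FlinkPravegaJsonTableSource source = FlinkPravegaJsonTableSource.builder()
            .forStream("examples/mystream")
            .withPravegaConfig(pravegaConfig)
            .withSchema(TableSchema.builder()
                    .field("category", Types.STRING)
                    .field("value", Types.INT)
                    .build())
            .build();
    // the same source works in a StreamTableEnvironment or a BatchTableEnvironment
    tableEnv.registerTableSource("events", source);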
diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java
index 9c47ec98..34dffd77 100644
--- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java
+++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaWriter.java
@@ -9,14 +9,15 @@
*/
package io.pravega.connectors.flink;
-import com.google.common.base.Preconditions;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
+import io.pravega.client.stream.Stream;
import io.pravega.client.stream.Transaction;
import io.pravega.common.Exceptions;
-import io.pravega.connectors.flink.util.StreamId;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
@@ -26,10 +27,10 @@
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.io.Serializable;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -57,97 +58,63 @@ public class FlinkPravegaWriter
private static final long serialVersionUID = 1L;
- // the numbers below are picked somewhat arbitrarily at this point
-
- private static final long DEFAULT_TXN_TIMEOUT_MILLIS = 2 * 60 * 60 * 1000; // 2 hours
-
- private static final long DEFAULT_TX_SCALE_GRACE_MILLIS = 10 * 60 * 1000; // 10 minutes
-
// Writer interface to assist exactly-once and at-least-once functionality
@VisibleForTesting
transient AbstractInternalWriter writer = null;
// ----------- configuration fields -----------
+ // The Pravega client config.
+ final ClientConfig clientConfig;
+
// The supplied event serializer.
- private final SerializationSchema<T> serializationSchema;
+ final SerializationSchema<T> serializationSchema;
// The router used to partition events within a stream.
- private final PravegaEventRouter<T> eventRouter;
-
- // The pravega controller endpoint.
- private final URI controllerURI;
-
- // The scope name of the destination stream.
- private final String scopeName;
+ final PravegaEventRouter<T> eventRouter;
- // The pravega stream name to write events to.
- private final String streamName;
+ // The destination stream.
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ final Stream stream;
+ // Various timeouts
private final long txnTimeoutMillis;
private final long txnGracePeriodMillis;
// The sink's mode of operation. This is used to provide different guarantees for the written events.
- private PravegaWriterMode writerMode = PravegaWriterMode.ATLEAST_ONCE;
+ private PravegaWriterMode writerMode;
// Client factory for PravegaWriter instances
private transient ClientFactory clientFactory = null;
/**
- * The flink pravega writer instance which can be added as a sink to a flink job.
- *
- * @param controllerURI The pravega controller endpoint address.
- * @param scope The destination stream's scope name.
- * @param streamName The destination stream Name.
- * @param serializationSchema The implementation for serializing every event into pravega's storage format.
- * @param router The implementation to extract the partition key from the event.
- */
- public FlinkPravegaWriter(
- final URI controllerURI,
- final String scope,
- final String streamName,
- final SerializationSchema<T> serializationSchema,
- final PravegaEventRouter<T> router) {
-
- this(controllerURI, scope, streamName, serializationSchema, router,
- DEFAULT_TXN_TIMEOUT_MILLIS, DEFAULT_TX_SCALE_GRACE_MILLIS);
- }
-
- /**
- * The flink pravega writer instance which can be added as a sink to a flink job.
+ * The flink pravega writer instance which can be added as a sink to a Flink job.
*
- * @param controllerURI The pravega controller endpoint address.
- * @param scope The destination stream's scope name.
- * @param streamName The destination stream Name.
+ * @param clientConfig The Pravega client configuration.
+ * @param stream The destination stream.
* @param serializationSchema The implementation for serializing every event into pravega's storage format.
- * @param router The implementation to extract the partition key from the event.
+ * @param eventRouter The implementation to extract the partition key from the event.
+ * @param writerMode The Pravega writer mode.
* @param txnTimeoutMillis The number of milliseconds after which the transaction will be aborted.
* @param txnGracePeriodMillis The maximum amount of time, in milliseconds, until which transaction may
* remain active, after a scale operation has been initiated on the underlying stream.
*/
- public FlinkPravegaWriter(
- final URI controllerURI,
- final String scope,
- final String streamName,
+ protected FlinkPravegaWriter(
+ final ClientConfig clientConfig,
+ final Stream stream,
final SerializationSchema<T> serializationSchema,
- final PravegaEventRouter<T> router,
+ final PravegaEventRouter<T> eventRouter,
+ final PravegaWriterMode writerMode,
final long txnTimeoutMillis,
final long txnGracePeriodMillis) {
- Preconditions.checkNotNull(controllerURI, "controllerURI");
- Preconditions.checkNotNull(scope, "scope");
- Preconditions.checkNotNull(streamName, "streamName");
- Preconditions.checkNotNull(serializationSchema, "serializationSchema");
- Preconditions.checkNotNull(router, "router");
+ this.clientConfig = Preconditions.checkNotNull(clientConfig, "clientConfig");
+ this.stream = Preconditions.checkNotNull(stream, "stream");
+ this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "serializationSchema");
+ this.eventRouter = Preconditions.checkNotNull(eventRouter, "eventRouter");
+ this.writerMode = Preconditions.checkNotNull(writerMode, "writerMode");
Preconditions.checkArgument(txnTimeoutMillis > 0, "txnTimeoutMillis must be > 0");
Preconditions.checkArgument(txnGracePeriodMillis > 0, "txnGracePeriodMillis must be > 0");
-
- this.controllerURI = controllerURI;
- this.scopeName = scope;
- this.streamName = streamName;
- this.serializationSchema = serializationSchema;
- this.eventRouter = router;
-
this.txnTimeoutMillis = txnTimeoutMillis;
this.txnGracePeriodMillis = txnGracePeriodMillis;
}
@@ -166,24 +133,14 @@ public PravegaWriterMode getPravegaWriterMode() {
return this.writerMode;
}
- /**
- * Sets this writer's operating mode.
- *
- * @param writerMode The mode of operation.
- */
- public void setPravegaWriterMode(PravegaWriterMode writerMode) {
- Preconditions.checkNotNull(writerMode);
- this.writerMode = writerMode;
- }
-
// ------------------------------------------------------------------------
@Override
public void open(Configuration parameters) throws Exception {
initializeInternalWriter();
writer.open();
- log.info("Initialized Pravega writer for stream: {}/{} with controller URI: {}", scopeName,
- streamName, controllerURI);
+ log.info("Initialized Pravega writer for stream: {} with controller URI: {}", stream,
+ clientConfig.getControllerURI());
}
@Override
@@ -255,8 +212,8 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
// ------------------------------------------------------------------------
@VisibleForTesting
- protected ClientFactory createClientFactory(String scopeName, URI controllerURI) {
- return ClientFactory.withScope(scopeName, controllerURI);
+ protected ClientFactory createClientFactory(String scopeName, ClientConfig clientConfig) {
+ return ClientFactory.withScope(scopeName, clientConfig);
}
@VisibleForTesting
@@ -285,7 +242,7 @@ private void initializeInternalWriter() {
throw new UnsupportedOperationException("Enable checkpointing to use the exactly-once writer mode.");
}
- this.clientFactory = createClientFactory(scopeName, controllerURI);
+ this.clientFactory = createClientFactory(stream.getScope(), clientConfig);
this.writer = createInternalWriter();
}
@@ -297,6 +254,10 @@ protected String name() {
return getRuntimeContext().getTaskNameWithSubtasks();
}
+ public static <T> FlinkPravegaWriter.Builder<T> builder() {
+ return new Builder<>();
+ }
+
// ------------------------------------------------------------------------
// serializer
// ------------------------------------------------------------------------
@@ -364,7 +325,7 @@ abstract class AbstractInternalWriter {
.transactionTimeoutTime(txnTimeoutMillis)
.transactionTimeoutScaleGracePeriod(txnGracePeriodMillis)
.build();
- pravegaWriter = clientFactory.createEventWriter(streamName, eventSerializer, writerConfig);
+ pravegaWriter = clientFactory.createEventWriter(stream.getStreamName(), eventSerializer, writerConfig);
}
abstract void open() throws Exception;
@@ -466,7 +427,9 @@ public List snapshotState(long checkpointId, long checkpoint
log.debug("{} - storing pending transactions {}", name(), txnsPendingCommit);
// store all pending transactions in the checkpoint state
- return txnsPendingCommit.stream().map(v -> new PendingTransaction(v.transaction().getTxnId(), scopeName, streamName)).collect(Collectors.toList());
+ return txnsPendingCommit.stream()
+ .map(v -> new PendingTransaction(v.transaction().getTxnId(), stream.getScope(), stream.getStreamName()))
+ .collect(Collectors.toList());
}
@Override
@@ -549,16 +512,16 @@ public void restoreState(List pendingTransactionList) throws
Exception exception = null;
- Map<StreamId, List<PendingTransaction>> pendingTransactionsMap =
- pendingTransactionList.stream().collect(groupingBy(s -> new StreamId(s.getScope(), s.getStream())));
+ Map<Stream, List<PendingTransaction>> pendingTransactionsMap =
+ pendingTransactionList.stream().collect(groupingBy(s -> Stream.of(s.getScope(), s.getStream())));
log.debug("pendingTransactionsMap:: " + pendingTransactionsMap);
- for (Map.Entry<StreamId, List<PendingTransaction>> transactionsEntry: pendingTransactionsMap.entrySet()) {
+ for (Map.Entry<Stream, List<PendingTransaction>> transactionsEntry: pendingTransactionsMap.entrySet()) {
- StreamId streamId = transactionsEntry.getKey();
+ Stream streamId = transactionsEntry.getKey();
String scope = streamId.getScope();
- String stream = streamId.getName();
+ String stream = streamId.getStreamName();
Serializer<T> eventSerializer = new FlinkSerializer<>(serializationSchema);
EventWriterConfig writerConfig = EventWriterConfig.builder()
@@ -567,7 +530,7 @@ public void restoreState(List pendingTransactionList) throws
.build();
try (
- ClientFactory restoreClientFactory = createClientFactory(scope, controllerURI);
+ ClientFactory restoreClientFactory = createClientFactory(scope, clientConfig);
- EventStreamWriter<T> restorePravegaWriter = restoreClientFactory.createEventWriter(stream,
eventSerializer,
writerConfig);
@@ -764,4 +727,52 @@ public String getStream() {
}
}
+ // ------------------------------------------------------------------------
+ // builder
+ // ------------------------------------------------------------------------
+
+ /**
+ * A builder for {@link FlinkPravegaWriter}.
+ *
+ * @param <T> the element type.
+ */
+ public static class Builder<T> extends AbstractStreamingWriterBuilder<T, Builder<T>> {
+
+ private SerializationSchema<T> serializationSchema;
+
+ private PravegaEventRouter<T> eventRouter;
+
+ protected Builder<T> builder() {
+ return this;
+ }
+
+ /**
+ * Sets the serialization schema.
+ *
+ * @param serializationSchema The serialization schema
+ */
+ public Builder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
+ this.serializationSchema = serializationSchema;
+ return builder();
+ }
+
+ /**
+ * Sets the event router.
+ *
+ * @param eventRouter the event router which produces a key per event.
+ */
+ public Builder<T> withEventRouter(PravegaEventRouter<T> eventRouter) {
+ this.eventRouter = eventRouter;
+ return builder();
+ }
+
+ /**
+ * Builds the {@link FlinkPravegaWriter}.
+ */
+ public FlinkPravegaWriter<T> build() {
+ Preconditions.checkState(eventRouter != null, "Event router must be supplied.");
+ Preconditions.checkState(serializationSchema != null, "Serialization schema must be supplied.");
+ return createSinkFunction(serializationSchema, eventRouter);
+ }
+ }
}
\ No newline at end of file
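With the public constructors gone, writers are obtained through the builder. A minimal sketch of the intended call pattern (MyEvent and its getKey() accessor are hypothetical; forStream and withPravegaConfig are assumed to be inherited from AbstractStreamingWriterBuilder, which is not shown in this diff):

    FlinkPravegaWriter<MyEvent> writer = FlinkPravegaWriter.<MyEvent>builder()
            .forStream("examples/out")                 // assumed inherited setter
            .withPravegaConfig(PravegaConfig.fromDefaults())
            .withSerializationSchema(PravegaSerialization.serializationFor(MyEvent.class))
            .withEventRouter(event -> event.getKey())  // assumes PravegaEventRouter stays a single-method interface
            .build();
    dataStream.addSink(writer);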
diff --git a/src/main/java/io/pravega/connectors/flink/PravegaConfig.java b/src/main/java/io/pravega/connectors/flink/PravegaConfig.java
new file mode 100644
index 00000000..eac588e6
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/PravegaConfig.java
@@ -0,0 +1,189 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import io.pravega.client.ClientConfig;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.impl.Credentials;
+import lombok.Data;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * The Pravega client configuration.
+ */
+public class PravegaConfig implements Serializable {
+
+ static final PravegaParameter CONTROLLER_PARAM = new PravegaParameter("controller", "pravega.controller.uri", "PRAVEGA_CONTROLLER_URI");
+ static final PravegaParameter SCOPE_PARAM = new PravegaParameter("scope", "pravega.scope", "PRAVEGA_SCOPE");
+
+ private static final long serialVersionUID = 1L;
+
+ private URI controllerURI;
+ private String defaultScope;
+ private Credentials credentials;
+ private boolean validateHostname = true;
+
+ // region Factory methods
+ PravegaConfig(Properties properties, Map<String, String> env, ParameterTool params) {
+ this.controllerURI = CONTROLLER_PARAM.resolve(params, properties, env).map(URI::create).orElse(null);
+ this.defaultScope = SCOPE_PARAM.resolve(params, properties, env).orElse(null);
+ }
+
+ /**
+ * Gets a configuration based on defaults obtained from the local environment.
+ */
+ public static PravegaConfig fromDefaults() {
+ return new PravegaConfig(System.getProperties(), System.getenv(), ParameterTool.fromMap(Collections.emptyMap()));
+ }
+
+ /**
+ * Gets a configuration based on defaults obtained from the local environment plus the given program parameters.
+ *
+ * @param params the parameters to use.
+ */
+ public static PravegaConfig fromParams(ParameterTool params) {
+ return new PravegaConfig(System.getProperties(), System.getenv(), params);
+ }
+
+ // endregion
+
+ /**
+ * Gets the {@link ClientConfig} to use with the Pravega client.
+ */
+ public ClientConfig getClientConfig() {
+ ClientConfig.ClientConfigBuilder builder = ClientConfig.builder()
+ .validateHostName(validateHostname);
+ if (controllerURI != null) {
+ builder.controllerURI(controllerURI);
+ }
+ if (credentials != null) {
+ builder.credentials(credentials);
+ }
+ return builder.build();
+ }
+
+ /**
+ * Resolves the given stream name.
+ *
+ * The scope name is resolved in the following order:
+ * 1. from the stream name (if fully-qualified)
+ * 2. from the program argument {@code --scope} (if program arguments were provided to the {@link PravegaConfig})
+ * 3. from the system property {@code pravega.scope}
+ * 4. from the system environment variable {@code PRAVEGA_SCOPE}
+ *
+ * @param streamSpec a qualified or unqualified stream name
+ * @return a fully-qualified stream name
+ * @throws IllegalStateException if an unqualified stream name is supplied but the scope is not configured.
+ */
+ public Stream resolve(String streamSpec) {
+ Preconditions.checkNotNull(streamSpec, "streamSpec");
+ String[] split = streamSpec.split("/", 2);
+ if (split.length == 1) {
+ // unqualified
+ Preconditions.checkState(defaultScope != null, "The default scope is not configured.");
+ return Stream.of(defaultScope, split[0]);
+ } else {
+ // qualified
+ assert split.length == 2;
+ return Stream.of(split[0], split[1]);
+ }
+ }
+
+ // region Discovery
+
+ /**
+ * Configures the Pravega controller RPC URI.
+ *
+ * @param controllerURI The URI.
+ */
+ public PravegaConfig withControllerURI(URI controllerURI) {
+ this.controllerURI = controllerURI;
+ return this;
+ }
+
+ /**
+ * Configures the default Pravega scope, to resolve unqualified stream names and to support reader groups.
+ *
+ * @param scope The scope to use.
+ */
+ public PravegaConfig withDefaultScope(String scope) {
+ this.defaultScope = scope;
+ return this;
+ }
+
+ /**
+ * Gets the default Pravega scope.
+ */
+ @Nullable
+ public String getDefaultScope() {
+ return defaultScope;
+ }
+
+ // endregion
+
+ // region Security
+
+ /**
+ * Configures the Pravega credentials to use.
+ *
+ * @param credentials a credentials object.
+ */
+ public PravegaConfig withCredentials(Credentials credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ /**
+ * Enables or disables TLS hostname validation (default: true).
+ *
+ * @param validateHostname a boolean indicating whether to validate the hostname on incoming requests.
+ */
+ public PravegaConfig withHostnameValidation(boolean validateHostname) {
+ this.validateHostname = validateHostname;
+ return this;
+ }
+
+ // endregion
+
+ /**
+ * A configuration parameter resolvable via command-line parameters, system properties, or OS environment variables.
+ */
+ @Data
+ static class PravegaParameter implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String parameterName;
+ private final String propertyName;
+ private final String variableName;
+
+ public Optional<String> resolve(ParameterTool parameters, Properties properties, Map<String, String> variables) {
+ if (parameters != null && parameters.has(parameterName)) {
+ return Optional.of(parameters.get(parameterName));
+ }
+ if (properties != null && properties.containsKey(propertyName)) {
+ return Optional.of(properties.getProperty(propertyName));
+ }
+ if (variables != null && variables.containsKey(variableName)) {
+ return Optional.of(variables.get(variableName));
+ }
+ return Optional.empty();
+ }
+ }
+}
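To make the layered resolution concrete, here is a short usage sketch (scope and stream names are illustrative): an explicit scope in the stream name always wins, and the default scope only fills in for unqualified names.

    PravegaConfig config = PravegaConfig.fromParams(ParameterTool.fromArgs(args))
            .withDefaultScope("examples");

    Stream qualified = config.resolve("other/stream"); // scope "other" overrides the default
    Stream unqualified = config.resolve("mystream");   // resolves to "examples/mystream"
    ClientConfig clientConfig = config.getClientConfig();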
diff --git a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java b/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java
index 0ec3d399..93bf5700 100644
--- a/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java
+++ b/src/main/java/io/pravega/connectors/flink/ReaderCheckpointHook.java
@@ -14,6 +14,7 @@
import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
@@ -41,7 +42,7 @@ class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {
/** The logical name of the operator. This is different from the (randomly generated)
* reader group name, because it is used to identify the state in a checkpoint/savepoint
* when resuming the checkpoint/savepoint with another job. */
- private final String readerName;
+ private final String hookUid;
/** The serializer for Pravega checkpoints, to store them in Flink checkpoints */
private final CheckpointSerializer checkpointSerializer;
@@ -50,12 +51,12 @@ class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {
private final ReaderGroup readerGroup;
/** The timeout on the future returned by the 'initiateCheckpoint()' call */
- private final long triggerTimeout;
+ private final Time triggerTimeout;
- ReaderCheckpointHook(String readerName, ReaderGroup readerGroup, long triggerTimeout) {
+ ReaderCheckpointHook(String hookUid, ReaderGroup readerGroup, Time triggerTimeout) {
- this.readerName = checkNotNull(readerName);
+ this.hookUid = checkNotNull(hookUid);
this.readerGroup = checkNotNull(readerGroup);
this.triggerTimeout = triggerTimeout;
this.checkpointSerializer = new CheckpointSerializer();
@@ -65,7 +66,7 @@ class ReaderCheckpointHook implements MasterTriggerRestoreHook {
@Override
public String getIdentifier() {
- return this.readerName;
+ return this.hookUid;
}
@Override
@@ -80,13 +81,13 @@ public CompletableFuture<Checkpoint> triggerCheckpoint(
// (we should change that by adding a shutdown() method to these hooks)
// the shutdown of the executor is handled below, once the checkpoint future completes
- final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+ final ScheduledExecutorService scheduledExecutorService = createScheduledExecutorService();
final CompletableFuture<Checkpoint> checkpointResult =
this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService);
// Add a timeout to the future, to prevent long blocking calls
- scheduledExecutorService.schedule(() -> checkpointResult.cancel(false), triggerTimeout, TimeUnit.MILLISECONDS);
+ scheduledExecutorService.schedule(() -> checkpointResult.cancel(false), triggerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
// we make sure the executor is shut down after the future completes
checkpointResult.handle((success, failure) -> scheduledExecutorService.shutdownNow());
@@ -113,6 +114,10 @@ public SimpleVersionedSerializer<Checkpoint> createCheckpointDataSerializer() {
// utils
// ------------------------------------------------------------------------
+ protected ScheduledExecutorService createScheduledExecutorService() {
+ return Executors.newSingleThreadScheduledExecutor();
+ }
+
static long parseCheckpointId(String checkpointName) {
checkArgument(checkpointName.startsWith(PRAVEGA_CHECKPOINT_NAME_PREFIX));
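The trigger path above uses a general pattern worth spelling out: schedule a cancellation of the pending future, then shut the scheduler down once the future settles either way. A standalone sketch of that pattern (class and method names are illustrative, not connector API):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class FutureTimeouts {
        private FutureTimeouts() {}

        public static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutMillis) {
            final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            // cancel(false): complete the future exceptionally if still pending, without interrupting
            scheduler.schedule(() -> future.cancel(false), timeoutMillis, TimeUnit.MILLISECONDS);
            // runs whether the future succeeded, failed, or was cancelled
            future.handle((result, failure) -> scheduler.shutdownNow());
            return future;
        }
    }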
diff --git a/src/main/java/io/pravega/connectors/flink/serialization/JsonRowDeserializationSchema.java b/src/main/java/io/pravega/connectors/flink/serialization/JsonRowDeserializationSchema.java
index 8e977617..a2f7703c 100644
--- a/src/main/java/io/pravega/connectors/flink/serialization/JsonRowDeserializationSchema.java
+++ b/src/main/java/io/pravega/connectors/flink/serialization/JsonRowDeserializationSchema.java
@@ -110,4 +110,10 @@ public void setFailOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
}
+ /**
+ * Gets the failure behavior if a JSON field is missing.
+ */
+ public boolean getFailOnMissingField() {
+ return this.failOnMissingField;
+ }
}
diff --git a/src/main/java/io/pravega/connectors/flink/util/FlinkPravegaParams.java b/src/main/java/io/pravega/connectors/flink/util/FlinkPravegaParams.java
deleted file mode 100644
index 00294e16..00000000
--- a/src/main/java/io/pravega/connectors/flink/util/FlinkPravegaParams.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.pravega.connectors.flink.util;
-
-import com.google.common.collect.Sets;
-import io.pravega.client.admin.StreamManager;
-import io.pravega.client.stream.ScalingPolicy;
-import io.pravega.client.stream.StreamConfiguration;
-import io.pravega.connectors.flink.FlinkPravegaReader;
-import io.pravega.connectors.flink.FlinkPravegaWriter;
-import io.pravega.connectors.flink.PravegaEventRouter;
-import io.pravega.connectors.flink.serialization.PravegaSerialization;
-import java.io.Serializable;
-import java.net.URI;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-
-/**
- * Convenience class for extracting pravega parameters from flink job parameters.
- *
- * As a convention, the pravega controller uri will be passed as a parameter named 'controller'.
- *
- * Operations are provided to create streams, readers and writers based on stream names as
- * parameters.
- *
- * @see StreamId
- */
-public class FlinkPravegaParams {
- public static final String DEFAULT_CONTROLLER_URI = "tcp://127.0.0.1:9090";
- public static final String CONTROLLER_PARAM_NAME = "controller";
-
- private ParameterTool params;
-
- public FlinkPravegaParams(ParameterTool params) {
- this.params = params;
- }
-
- /**
- * Gets the controller URI from the 'controller' job parameter. If this parameter
- * is not specified this defaults to DEFAULT_CONTROLLER_URI.
- */
- public URI getControllerUri() {
- return URI.create(params.get(CONTROLLER_PARAM_NAME, DEFAULT_CONTROLLER_URI));
- }
-
- /**
- * Gets a StreamId from the flink job parameters.
- *
- * @param streamParamName Parameter name that contains the stream
- * @param defaultStreamSpec Default stream in the format [scope]/[stream] if the parameter was not found.
- * @return stream found in the parameter or the default if no parameter is found.
- */
- public StreamId getStreamFromParam(final String streamParamName, final String defaultStreamSpec) {
- return StreamId.fromSpec(params.get(streamParamName, defaultStreamSpec));
- }
-
- /**
- * Constructs a new reader using stream/scope name from job parameters. Uses PravegaSerialization to only require
- * event class type to be specified.
- *
- * @param stream Stream to read from.
- * @param startTime The start time from when to read events from. Use 0 to read all stream events from the beginning.
- * @param eventType Class type for events on this stream.
- * @param <T> Type for events on this stream.
- * @see PravegaSerialization
- */
- public <T extends Serializable> FlinkPravegaReader<T> newReader(final StreamId stream,
- final long startTime,
- final Class<T> eventType) {
- return newReader(stream, startTime, PravegaSerialization.deserializationFor(eventType));
- }
-
- /**
- * Constructs a new reader using stream/scope name from job parameters.
- *
- * @param stream Stream to read from.
- * @param startTime The start time from when to read events from. Use 0 to read all stream events from the beginning.
- * @param deserializationSchema The implementation to deserialize events from pravega streams.
- * @param <T> Type for events on this stream.
- */
- public <T> FlinkPravegaReader<T> newReader(final StreamId stream,
- final long startTime,
- final DeserializationSchema<T> deserializationSchema) {
- return new FlinkPravegaReader<>(getControllerUri(), stream.getScope(), Sets.newHashSet(stream.getName()),
- startTime, deserializationSchema);
- }
-
- /**
- * Constructs a new writer using stream/scope name from job parameters. Uses PravegaSerialization to only require
- * event class type to be specified.
- *
- * @param stream Stream to read from.
- * @param eventType Class type for events on this stream.
- * @param router The implementation to extract the partition key from the event.
- * @param <T> Type for events on this stream.
- * @see PravegaSerialization
- */
- public <T extends Serializable> FlinkPravegaWriter<T> newWriter(final StreamId stream,
- final Class<T> eventType,
- final PravegaEventRouter<T> router) {
- return newWriter(stream, PravegaSerialization.serializationFor(eventType), router);
- }
-
- /**
- * Constructs a new writer using stream/scope name from job parameters.
- *
- * @param stream Stream to read from.
- * @param serializationSchema The implementation for serializing every event into pravega's storage format.
- * @param router The implementation to extract the partition key from the event.
- * @param <T> Type for events on this stream.
- */
- public <T> FlinkPravegaWriter<T> newWriter(final StreamId stream,
- final SerializationSchema<T> serializationSchema,
- final PravegaEventRouter<T> router) {
- return new FlinkPravegaWriter<>(getControllerUri(), stream.getScope(), stream.getName(),
- serializationSchema, router);
- }
-
- /**
- * Ensures a stream is created.
- *
- * @param streamParamName Parameter name that contains the stream
- * @param defaultStreamSpec Default stream in the format [scope]/[stream]
- */
- public StreamId createStreamFromParam(final String streamParamName, final String defaultStreamSpec) {
- StreamId streamId = getStreamFromParam(streamParamName, defaultStreamSpec);
- createStream(streamId);
- return streamId;
- }
-
- public void createStream(final StreamId streamId) {
- createStream(streamId, ScalingPolicy.fixed(1));
- }
-
- public void createStream(final StreamId streamId, final ScalingPolicy scalingPolicy) {
- StreamManager streamManager = StreamManager.create(getControllerUri());
- streamManager.createScope(streamId.getScope());
-
- StreamConfiguration streamConfig = StreamConfiguration.builder().scalingPolicy(scalingPolicy).build();
- streamManager.createStream(streamId.getScope(), streamId.getName(), streamConfig);
- }
-}
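Since FlinkPravegaParams is removed outright, callers migrate to PravegaConfig plus the reader/writer builders. A hedged before/after sketch (MyEvent is an illustrative Serializable event type):

    // before (removed API):
    //   FlinkPravegaParams pravega = new FlinkPravegaParams(params);
    //   FlinkPravegaReader<MyEvent> reader = pravega.newReader(stream, 0, MyEvent.class);

    // after (this change):
    PravegaConfig config = PravegaConfig.fromParams(params);
    FlinkPravegaReader<MyEvent> reader = FlinkPravegaReader.<MyEvent>builder()
            .withPravegaConfig(config)
            .forStream("examples/mystream")
            .withDeserializationSchema(PravegaSerialization.deserializationFor(MyEvent.class))
            .build();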
diff --git a/src/main/java/io/pravega/connectors/flink/util/FlinkPravegaUtils.java b/src/main/java/io/pravega/connectors/flink/util/FlinkPravegaUtils.java
index d200fd88..7e2f4c65 100644
--- a/src/main/java/io/pravega/connectors/flink/util/FlinkPravegaUtils.java
+++ b/src/main/java/io/pravega/connectors/flink/util/FlinkPravegaUtils.java
@@ -9,6 +9,7 @@
*/
package io.pravega.connectors.flink.util;
+import io.pravega.client.ClientConfig;
import io.pravega.client.ClientFactory;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
@@ -23,7 +24,6 @@
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.api.common.serialization.DeserializationSchema;
-import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.stream.Collectors;
@@ -83,9 +83,9 @@ public static String generateRandomReaderGroupName() {
/**
* Creates a Pravega {@link EventStreamReader}.
*
- * @param scopeName The destination stream's scope name.
- * @param controllerURI The Pravega controller endpoint address.
+ * @param clientConfig The Pravega client configuration.
* @param readerId The id of the Pravega reader.
+ * @param readerGroupScopeName The reader group scope name.
* @param readerGroupName The reader group name.
* @param deserializationSchema The implementation to deserialize events from pravega streams.
* @param readerConfig The reader configuration.
@@ -93,9 +93,9 @@ public static String generateRandomReaderGroupName() {
* @return the created Pravega reader.
*/
public static <T> EventStreamReader<T> createPravegaReader(
- String scopeName,
- URI controllerURI,
+ ClientConfig clientConfig,
String readerId,
+ String readerGroupScopeName,
String readerGroupName,
DeserializationSchema<T> deserializationSchema,
ReaderConfig readerConfig) {
@@ -106,7 +106,7 @@ public static <T> EventStreamReader<T> createPravegaReader(
? ((WrappingSerializer) deserializationSchema).getWrappedSerializer()
: new FlinkDeserializer<>(deserializationSchema);
- return ClientFactory.withScope(scopeName, controllerURI)
+ return ClientFactory.withScope(readerGroupScopeName, clientConfig)
.createReader(readerId, readerGroupName, deserializer, readerConfig);
}
diff --git a/src/main/java/io/pravega/connectors/flink/util/StreamId.java b/src/main/java/io/pravega/connectors/flink/util/StreamId.java
deleted file mode 100644
index bb0e1b4d..00000000
--- a/src/main/java/io/pravega/connectors/flink/util/StreamId.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- */
-package io.pravega.connectors.flink.util;
-
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Objects;
-
-/**
- * Captures the fully qualified name of a stream. The convention to represent this as a
- * single string is using [scope]/[stream].
- */
-public class StreamId {
- public static final char STREAM_SPEC_SEPARATOR = '/';
-
- private String scope;
- private String name;
-
- public StreamId(String scope, String name) {
- this.scope = scope;
- this.name = name;
- }
-
- public String getScope() {
- return scope;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StreamId streamId = (StreamId) o;
- return Objects.equals(scope, streamId.scope) &&
- Objects.equals(name, streamId.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(scope, name);
- }
-
- public String getName() {
- return name;
- }
-
- /**
- * Creates a StreamId from a stream specification in the form [scope]/[stream].
- *
- * @param streamSpec the stream specification.
- */
- public static StreamId fromSpec(String streamSpec) {
- String[] parts = StringUtils.split(streamSpec, STREAM_SPEC_SEPARATOR);
- if (parts.length != 2) {
- throw new IllegalArgumentException("Stream spec must be in the form [scope]/[stream]");
- }
- return new StreamId(parts[0], parts[1]);
- }
-
- @Override
- public String toString() {
- return scope + STREAM_SPEC_SEPARATOR + name;
- }
-}
diff --git a/src/main/java/io/pravega/connectors/flink/util/StreamWithBoundaries.java b/src/main/java/io/pravega/connectors/flink/util/StreamWithBoundaries.java
new file mode 100644
index 00000000..ca16ef56
--- /dev/null
+++ b/src/main/java/io/pravega/connectors/flink/util/StreamWithBoundaries.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink.util;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.StreamCut;
+import lombok.Data;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * A Pravega stream with optional boundaries based on stream cuts.
+ */
+@Data
+@Internal
+public class StreamWithBoundaries implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ private final Stream stream;
+ private final StreamCut from;
+ private final StreamCut to;
+
+ public static StreamWithBoundaries of(Stream stream, StreamCut from, StreamCut to) {
+ Preconditions.checkNotNull(stream, "stream");
+ Preconditions.checkNotNull(from, "from");
+ Preconditions.checkNotNull(to, "to");
+ return new StreamWithBoundaries(stream, from, to);
+ }
+}
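A quick construction sketch (stream name illustrative): StreamCut.UNBOUNDED serves as the open-boundary sentinel on either end.

    Stream stream = Stream.of("examples", "mystream");
    // no lower or upper bound: read the whole stream
    StreamWithBoundaries all = StreamWithBoundaries.of(stream, StreamCut.UNBOUNDED, StreamCut.UNBOUNDED);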
diff --git a/src/test/java/io/pravega/connectors/flink/EventTimeOrderingOperatorTest.java b/src/test/java/io/pravega/connectors/flink/EventTimeOrderingOperatorTest.java
index 2f934e69..22e054ca 100644
--- a/src/test/java/io/pravega/connectors/flink/EventTimeOrderingOperatorTest.java
+++ b/src/test/java/io/pravega/connectors/flink/EventTimeOrderingOperatorTest.java
@@ -28,6 +28,9 @@
import static org.junit.Assert.assertEquals;
+/**
+ * Unit tests for {@link EventTimeOrderingOperator}.
+ */
@Slf4j
public class EventTimeOrderingOperatorTest {
diff --git a/src/test/java/io/pravega/connectors/flink/FlinkPravegaInputFormatITCase.java b/src/test/java/io/pravega/connectors/flink/FlinkPravegaInputFormatITCase.java
index 7e6f351d..78464635 100644
--- a/src/test/java/io/pravega/connectors/flink/FlinkPravegaInputFormatITCase.java
+++ b/src/test/java/io/pravega/connectors/flink/FlinkPravegaInputFormatITCase.java
@@ -11,6 +11,7 @@
package io.pravega.connectors.flink;
import io.pravega.client.stream.EventStreamWriter;
+import io.pravega.connectors.flink.utils.IntegerDeserializationSchema;
import io.pravega.connectors.flink.utils.SetupUtils;
import io.pravega.connectors.flink.utils.ThrottledIntegerWriter;
import lombok.extern.slf4j.Slf4j;
@@ -21,7 +22,6 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -29,9 +29,6 @@
import org.junit.Test;
import org.junit.rules.Timeout;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -110,11 +107,12 @@ public void testBatchInput() throws Exception {
// simple pipeline that reads from Pravega and collects the events
DataSet<Integer> integers = env.createInput(
- new FlinkPravegaInputFormat<>(
- SETUP_UTILS.getControllerUri(),
- SETUP_UTILS.getScope(),
- streams,
- new IntDeserializer()),
+ FlinkPravegaInputFormat.builder()
+ .forStream(streamName1)
+ .forStream(streamName2)
+ .withPravegaConfig(SETUP_UTILS.getPravegaConfig())
+ .withDeserializationSchema(new IntegerDeserializationSchema())
+ .build(),
BasicTypeInfo.INT_TYPE_INFO
);
@@ -158,11 +156,11 @@ public void testBatchInputWithFailure() throws Exception {
// simple pipeline that reads from Pravega and collects the events
List<Integer> integers = env.createInput(
- new FlinkPravegaInputFormat<>(
- SETUP_UTILS.getControllerUri(),
- SETUP_UTILS.getScope(),
- Collections.singleton(streamName),
- new IntDeserializer()),
+ FlinkPravegaInputFormat.builder()
+ .forStream(streamName)
+ .withPravegaConfig(SETUP_UTILS.getPravegaConfig())
+ .withDeserializationSchema(new IntegerDeserializationSchema())
+ .build(),
BasicTypeInfo.INT_TYPE_INFO
).map(new FailOnceMapper(numElements / 2)).collect();
@@ -203,17 +201,4 @@ public Integer map(Integer value) throws Exception {
return value;
}
}
-
- private static class IntDeserializer extends AbstractDeserializationSchema<Integer> {
-
- @Override
- public Integer deserialize(byte[] message) throws IOException {
- return ByteBuffer.wrap(message).getInt();
- }
-
- @Override
- public boolean isEndOfStream(Integer nextElement) {
- return false;
- }
- }
}
\ No newline at end of file
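The inline IntDeserializer deleted above is replaced by the shared IntegerDeserializationSchema test utility. Its source is outside this diff, but judging from the deleted code it presumably amounts to the following sketch:

    import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    public class IntegerDeserializationSchema extends AbstractDeserializationSchema<Integer> {
        @Override
        public Integer deserialize(byte[] message) throws IOException {
            // test events are 4-byte big-endian integers
            return ByteBuffer.wrap(message).getInt();
        }

        @Override
        public boolean isEndOfStream(Integer nextElement) {
            return false;
        }
    }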
diff --git a/src/test/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSourceTest.java b/src/test/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSourceTest.java
new file mode 100644
index 00000000..0ed66a31
--- /dev/null
+++ b/src/test/java/io/pravega/connectors/flink/FlinkPravegaJsonTableSourceTest.java
@@ -0,0 +1,53 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import io.pravega.connectors.flink.serialization.JsonRowDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link FlinkPravegaJsonTableSource} and its builder.
+ */
+public class FlinkPravegaJsonTableSourceTest {
+
+ private static final TableSchema SAMPLE_SCHEMA = TableSchema.builder()
+ .field("category", Types.STRING)
+ .field("value", Types.INT)
+ .build();
+
+ @Test
+ public void testReturnType() {
+ FlinkPravegaJsonTableSource source = FlinkPravegaJsonTableSource.builder()
+ .withReaderGroupScope("scope")
+ .forStream("scope/stream")
+ .withSchema(SAMPLE_SCHEMA)
+ .build();
+ TypeInformation<Row> expected = new RowTypeInfo(SAMPLE_SCHEMA.getTypes(), SAMPLE_SCHEMA.getColumnNames());
+ assertEquals(expected, source.getReturnType());
+ }
+
+ @Test
+ public void testGetDeserializationSchema() {
+ FlinkPravegaJsonTableSource.Builder builder = new FlinkPravegaJsonTableSource.Builder();
+ builder
+ .withSchema(SAMPLE_SCHEMA)
+ .failOnMissingField(true);
+ JsonRowDeserializationSchema deserializer = builder.getDeserializationSchema();
+ assertTrue(deserializer.getFailOnMissingField());
+ }
+}
diff --git a/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderITCase.java b/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderITCase.java
new file mode 100644
index 00000000..dfc4fe49
--- /dev/null
+++ b/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderITCase.java
@@ -0,0 +1,171 @@
+/**
+ * Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.pravega.connectors.flink;
+
+import io.pravega.client.stream.EventStreamWriter;
+import io.pravega.connectors.flink.utils.FailingMapper;
+import io.pravega.connectors.flink.utils.IntSequenceExactlyOnceValidator;
+import io.pravega.connectors.flink.utils.IntegerDeserializationSchema;
+import io.pravega.connectors.flink.utils.NotifyingMapper;
+import io.pravega.connectors.flink.utils.SetupUtils;
+import io.pravega.connectors.flink.utils.SuccessException;
+import io.pravega.connectors.flink.utils.ThrottledIntegerWriter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration tests for {@link FlinkPravegaReader}.
+ */
+@Slf4j
+public class FlinkPravegaReaderITCase extends StreamingMultipleProgramsTestBase {
+
+ // Number of events to produce into the test stream.
+ private static final int NUM_STREAM_ELEMENTS = 10000;
+
+ // Setup utility.
+ private static final SetupUtils SETUP_UTILS = new SetupUtils();
+
+ //Ensure each test completes within 120 seconds.
+ @Rule
+ public final Timeout globalTimeout = new Timeout(120, TimeUnit.SECONDS);
+
+ @BeforeClass
+ public static void setupPravega() throws Exception {
+ SETUP_UTILS.startAllServices();
+ }
+
+ @AfterClass
+ public static void tearDownPravega() throws Exception {
+ SETUP_UTILS.stopAllServices();
+ }
+
+ @Test
+ public void testOneSourceOneSegment() throws Exception {
+ runTest(1, 1, NUM_STREAM_ELEMENTS);
+ }
+
+ @Test
+ public void testOneSourceMultipleSegments() throws Exception {
+ runTest(1, 4, NUM_STREAM_ELEMENTS);
+ }
+
+ // this test currently does not work, see https://github.com/pravega/pravega/issues/1152
+ //@Test
+ //public void testMultipleSourcesOneSegment() throws Exception {
+ // runTest(4, 1, NUM_STREAM_ELEMENTS);
+ //}
+
+ @Test
+ public void testMultipleSourcesMultipleSegments() throws Exception {
+ runTest(4, 4, NUM_STREAM_ELEMENTS);
+ }
+
+
+ private static void runTest(
+ final int sourceParallelism,
+ final int numPravegaSegments,
+ final int numElements) throws Exception {
+
+ // set up the stream
+ final String streamName = RandomStringUtils.randomAlphabetic(20);
+ SETUP_UTILS.createTestStream(streamName, numPravegaSegments);
+
+ try (
+ final EventStreamWriter<Integer> eventWriter = SETUP_UTILS.getIntegerWriter(streamName);
+
+ // create the producer that writes to the stream
+ final ThrottledIntegerWriter producer = new ThrottledIntegerWriter(
+ eventWriter,
+ numElements,
+ numElements / 2, // the latest point by which a checkpoint must have happened
+ 1 // the initial sleep time per element
+ )
+
+ ) {
+ producer.start();
+
+ // the producer is throttled so that we don't run the (however small) risk of pumping
+ // all elements through before completing the first checkpoint (that would make the test senseless)
+
+ // to speed the test up, we un-throttle the producer as soon as the first checkpoint
+ // has gone through. Rather than implementing a complicated observer that polls the status
+ // from Flink, we simply forward the 'checkpoint complete' notification from the user functions
+ // to the throttler, via a static variable
+ NotifyingMapper.TO_CALL_ON_COMPLETION.set(producer::unthrottle);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setParallelism(sourceParallelism);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0L));
+
+ // we currently need this to work around the case where tasks are
+ // started too late, a checkpoint was already triggered, and some tasks
+ // never see the checkpoint event
+ env.getCheckpointConfig().setCheckpointTimeout(2000);
+
+ // the Pravega reader
+ final FlinkPravegaReader<Integer> pravegaSource = FlinkPravegaReader.<Integer>builder()
+ .forStream(streamName)
+ .withPravegaConfig(SETUP_UTILS.getPravegaConfig())
+ .withDeserializationSchema(new IntegerDeserializationSchema())
+ .build();
+
+ env
+ .addSource(pravegaSource)
+
+ // this mapper throws an exception at 2/3rd of the data stream,
+ // which is strictly after the checkpoint happened (the latest at 1/2 of the stream)
+
+ // to make sure that this is not affected by how fast subtasks of the source
+ // manage to pull data from pravega, we make this task non-parallel
+ .map(new FailingMapper<>(numElements * 2 / 3))
+ .setParallelism(1)
+
+ // hook in the notifying mapper
+ .map(new NotifyingMapper<>())
+ .setParallelism(1)
+
+ // the sink validates that the exactly-once semantics hold
+ // it must be non-parallel so that it sees all elements and can trivially
+ // check for duplicates
+ .addSink(new IntSequenceExactlyOnceValidator(numElements))
+ .setParallelism(1);
+
+ final long executeStart = System.nanoTime();
+
+ // if these calls complete without exception, then the test passes
+ try {
+ env.execute();
+ } catch (Exception e) {
+ if (!(ExceptionUtils.getRootCause(e) instanceof SuccessException)) {
+ throw e;
+ }
+ }
+
+ // this method forwards exception thrown in the data generator thread
+ producer.sync();
+
+ final long executeEnd = System.nanoTime();
+ System.out.println(String.format("Test execution took %d ms", (executeEnd - executeStart) / 1_000_000));
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderSavepointTest.java b/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderSavepointITCase.java
similarity index 87%
rename from src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderSavepointTest.java
rename to src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderSavepointITCase.java
index bbd9e3a7..9db10b35 100644
--- a/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderSavepointTest.java
+++ b/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderSavepointITCase.java
@@ -12,15 +12,13 @@
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.connectors.flink.utils.FlinkMiniClusterWithSavepointCommand;
import io.pravega.connectors.flink.utils.IntSequenceExactlyOnceValidator;
+import io.pravega.connectors.flink.utils.IntegerDeserializationSchema;
import io.pravega.connectors.flink.utils.NotifyingMapper;
import io.pravega.connectors.flink.utils.SetupUtils;
import io.pravega.connectors.flink.utils.SuccessException;
import io.pravega.connectors.flink.utils.ThrottledIntegerWriter;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.lang3.RandomStringUtils;
-
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.testutils.CheckedThread;
@@ -30,9 +28,7 @@
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
import org.apache.flink.util.TestLogger;
-
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -41,17 +37,15 @@
import org.junit.rules.Timeout;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertNotNull;
/**
- * Automated tests for {@link FlinkPravegaReader}.
+ * Integration tests for {@link FlinkPravegaReader} focused on savepoint integration.
*/
@Slf4j
-public class FlinkPravegaReaderSavepointTest extends TestLogger {
+public class FlinkPravegaReaderSavepointITCase extends TestLogger {
// Number of events to produce into the test stream.
private static final int NUM_STREAM_ELEMENTS = 10000;
@@ -198,13 +192,12 @@ private JobGraph getFlinkJob(
env.setStateBackend(new FsStateBackend(tmpFolder.newFolder().toURI(), 1024 * 1024, false));
// the Pravega reader
- final FlinkPravegaReader<Integer> pravegaSource = new FlinkPravegaReader<>(
- SETUP_UTILS.getControllerUri(),
- SETUP_UTILS.getScope(),
- Collections.singleton(streamName),
- 0,
- new IntDeserializer(),
- "my_reader_name");
+ final FlinkPravegaReader<Integer> pravegaSource = FlinkPravegaReader.<Integer>builder()
+ .forStream(streamName)
+ .withPravegaConfig(SETUP_UTILS.getPravegaConfig())
+ .withDeserializationSchema(new IntegerDeserializationSchema())
+ .uid("my_reader_name")
+ .build();
env
.addSource(pravegaSource)
@@ -221,21 +214,4 @@ private JobGraph getFlinkJob(
return env.getStreamGraph().getJobGraph();
}
-
- // ----------------------------------------------------------------------------
- // utilities
- // ----------------------------------------------------------------------------
-
- private static class IntDeserializer extends AbstractDeserializationSchema<Integer> {
-
- @Override
- public Integer deserialize(byte[] message) throws IOException {
- return ByteBuffer.wrap(message).getInt();
- }
-
- @Override
- public boolean isEndOfStream(Integer nextElement) {
- return false;
- }
- }
}
\ No newline at end of file
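The ad-hoc `IntDeserializer` deleted above is superseded by the shared `IntegerDeserializationSchema` test utility. Judging from the code it replaces, the shared schema presumably performs the same 4-byte big-endian decode; a sketch under that assumption (the actual utility class in `io.pravega.connectors.flink.utils` may differ in detail):

```java
// Sketch of the shared IntegerDeserializationSchema, reconstructed from the
// IntDeserializer it replaces; the actual utility class may differ in detail.
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

import java.io.IOException;
import java.nio.ByteBuffer;

public class IntegerDeserializationSchema extends AbstractDeserializationSchema<Integer> {

    @Override
    public Integer deserialize(byte[] message) throws IOException {
        // events are written as 4-byte big-endian integers
        return ByteBuffer.wrap(message).getInt();
    }

    @Override
    public boolean isEndOfStream(Integer nextElement) {
        // unbounded by default; tests that need a bounded run can subclass
        // and override (cf. TestDeserializationSchema.END_OF_STREAM below)
        return false;
    }
}
```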
diff --git a/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderTest.java b/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderTest.java
index 687c1b77..23dcf9e2 100644
--- a/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderTest.java
+++ b/src/test/java/io/pravega/connectors/flink/FlinkPravegaReaderTest.java
@@ -9,186 +9,331 @@
*/
package io.pravega.connectors.flink;
-import io.pravega.connectors.flink.utils.FailingMapper;
-import io.pravega.connectors.flink.utils.IntSequenceExactlyOnceValidator;
-import io.pravega.connectors.flink.utils.NotifyingMapper;
-import io.pravega.connectors.flink.utils.SetupUtils;
-import io.pravega.connectors.flink.utils.SuccessException;
-import io.pravega.connectors.flink.utils.ThrottledIntegerWriter;
-import io.pravega.client.stream.EventStreamWriter;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.commons.lang3.RandomStringUtils;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
+import io.pravega.client.ClientConfig;
+import io.pravega.client.admin.ReaderGroupManager;
+import io.pravega.client.segment.impl.Segment;
+import io.pravega.client.stream.EventPointer;
+import io.pravega.client.stream.EventRead;
+import io.pravega.client.stream.EventStreamReader;
+import io.pravega.client.stream.Position;
+import io.pravega.client.stream.ReaderGroup;
+import io.pravega.client.stream.ReaderGroupConfig;
+import io.pravega.client.stream.Sequence;
+import io.pravega.client.stream.Stream;
+import io.pravega.client.stream.StreamCut;
+import io.pravega.client.stream.impl.EventReadImpl;
+import io.pravega.client.stream.impl.StreamCutImpl;
+import io.pravega.connectors.flink.utils.IntegerDeserializationSchema;
+import io.pravega.connectors.flink.utils.StreamSourceOperatorTestHarness;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Test;
-import org.junit.rules.Timeout;
-import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.concurrent.TimeUnit;
+import java.util.Properties;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
/**
- * Automated tests for {@link FlinkPravegaReader}.
+ * Unit tests for {@link FlinkPravegaReader} and its builder.
*/
-@Slf4j
-public class FlinkPravegaReaderTest extends StreamingMultipleProgramsTestBase {
-
- // Number of events to produce into the test stream.
- private static final int NUM_STREAM_ELEMENTS = 10000;
-
- // Setup utility.
- private static final SetupUtils SETUP_UTILS = new SetupUtils();
-
- //Ensure each test completes within 120 seconds.
- @Rule
- public final Timeout globalTimeout = new Timeout(120, TimeUnit.SECONDS);
-
- @BeforeClass
- public static void setupPravega() throws Exception {
- SETUP_UTILS.startAllServices();
+public class FlinkPravegaReaderTest {
+
+ private static final String SAMPLE_SCOPE = "scope";
+ private static final Stream SAMPLE_STREAM = Stream.of(SAMPLE_SCOPE, "stream");
+ private static final Segment SAMPLE_SEGMENT = new Segment(SAMPLE_SCOPE, SAMPLE_STREAM.getStreamName(), 1);
+ private static final StreamCut SAMPLE_CUT = new StreamCutImpl(SAMPLE_STREAM, Collections.singletonMap(SAMPLE_SEGMENT, 42L));
+ private static final StreamCut SAMPLE_CUT2 = new StreamCutImpl(SAMPLE_STREAM, Collections.singletonMap(SAMPLE_SEGMENT, 1024L));
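+ // a StreamCut maps each segment to a read offset; the two cuts above mark
+ // offsets 42 and 1024 within the sample segment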
+
+ private static final Time GROUP_REFRESH_TIME = Time.seconds(10);
+ private static final String GROUP_NAME = "group";
+
+ private static final IntegerDeserializationSchema DESERIALIZATION_SCHEMA = new TestDeserializationSchema();
+ private static final Time READER_TIMEOUT = Time.seconds(1);
+ private static final Time CHKPT_TIMEOUT = Time.seconds(1);
+
+ // region Source Function Tests
+
+ /**
+ * Tests the behavior of {@code initialize()}.
+ */
+ @Test
+ public void testInitialize() {
+ TestableFlinkPravegaReader<Integer> reader = createReader();
+ reader.initialize();
+ verify(reader.readerGroupManager).createReaderGroup(GROUP_NAME, reader.readerGroupConfig);
}
- @AfterClass
- public static void tearDownPravega() throws Exception {
- SETUP_UTILS.stopAllServices();
+ /**
+ * Tests the behavior of {@code run()}.
+ */
+ @Test
+ public void testRun() throws Exception {
+ TestableFlinkPravegaReader<Integer> reader = createReader();
+
+ try (StreamSourceOperatorTestHarness<Integer, TestableFlinkPravegaReader<Integer>> testHarness = createTestHarness(reader, 1, 1, 0)) {
+ testHarness.open();
+
+ // prepare a sequence of events
+ TestEventGenerator<Integer> evts = new TestEventGenerator<>();
+ when(reader.eventStreamReader.readNextEvent(anyLong()))
+ .thenReturn(evts.event(1))
+ .thenReturn(evts.event(2))
+ .thenReturn(evts.checkpoint(42L))
+ .thenReturn(evts.idle())
+ .thenReturn(evts.event(3))
+ .thenReturn(evts.event(TestDeserializationSchema.END_OF_STREAM));
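+ // six reads in total: two events, a checkpoint marker, one idle read,
+ // a third event, then end-of-stream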
+
+ // run the source
+ testHarness.run();
+
+ // verify that the event stream was read until the end of stream
+ verify(reader.eventStreamReader, times(6)).readNextEvent(anyLong());
+ Queue