Skip to content

Commit

Permalink
[issue-48] Builder API (#125)
Browse files Browse the repository at this point in the history
* [issue-48] Builder API for FlinkPravegaReader, FlinkPravegaInputFormat, FlinkPravegaTableSource, FlinkPravegaWriter, FlinkPravegaTableSink

- introduced PravegaConfig to capture high-level client configuration
- general stream cut support
- support for environment-provided default Pravega scope

- builder for FlinkPravegaInputFormat

- builder for FlinkPravegaReader
- rename FlinkPravegaReaderTest to FlinkPravegaReaderITCase
- unit test for FlinkPravegaReader

- implement batch table API support
- builder for FlinkPravegaTableSource
- introduce FlinkPravegaJsonTableSource
- treat FlinkPravegaTableSource as an internal class

- deprecated StreamId in favor of io.pravega.client.stream.Stream
- deprecated FlinkPravegaParams in favor of PravegaConfig, builder classes
- moved IntegerDeserializationSchema to utils
- implemented StreamSourceOperatorTestHarness

Signed-off-by: Eron Wright <[email protected]>
  • Loading branch information
EronWright authored May 14, 2018
1 parent fb1b5ca commit a60cfac
Show file tree
Hide file tree
Showing 43 changed files with 2,718 additions and 1,023 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ test {
}

dependencies {
compileOnly group: 'com.google.code.findbugs', name: 'annotations', version: findbugsVersion

compile(group: 'io.pravega', name: 'pravega-client', version: pravegaVersion) {
exclude group: 'org.slf4j', module: 'slf4j-api'

Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<allow pkg="lombok"/>
<allow pkg="com.fasterxml.jackson"/>
<allow pkg="org.apache"/>
<allow pkg="edu.umd.cs.findbugs"/>

<!-- flink dependencies -->
<allow pkg="scala"/>
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ shadowGradlePlugin=2.0.3
slf4jApiVersion=1.7.25
junitVersion=4.12
mockitoVersion=1.10.19
findbugsVersion=3.0.1

# Version and base tags can be overridden at build time.
connectorVersion=0.3.0-SNAPSHOT
Expand Down
2 changes: 1 addition & 1 deletion pravega
Submodule pravega updated 313 files
169 changes: 169 additions & 0 deletions src/main/java/io/pravega/connectors/flink/AbstractReaderBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.connectors.flink;

import io.pravega.client.stream.Stream;
import io.pravega.client.stream.StreamCut;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import lombok.Data;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* A base builder for connectors that consume a Pravega stream.
*
* @param <B> the builder class.
*/
public abstract class AbstractReaderBuilder<B extends AbstractReaderBuilder> implements Serializable {

private static final long serialVersionUID = 1L;

private final List<StreamSpec> streams;

private PravegaConfig pravegaConfig;

protected AbstractReaderBuilder() {
this.streams = new ArrayList<>(1);
this.pravegaConfig = PravegaConfig.fromDefaults();
}

/**
* Set the Pravega client configuration, which includes connection info, security info, and a default scope.
*
* The default client configuration is obtained from {@code PravegaConfig.fromDefaults()}.
*
* @param pravegaConfig the configuration to use.
*/
public B withPravegaConfig(PravegaConfig pravegaConfig) {
this.pravegaConfig = pravegaConfig;
return builder();
}

/**
* Add a stream to be read by the source, from the given start position in the stream.
*
* @param streamSpec the unqualified or qualified name of the stream.
* @param startStreamCut Start {@link StreamCut}
* @return A builder to configure and create a reader.
*/
public B forStream(final String streamSpec, final StreamCut startStreamCut) {
return forStream(streamSpec, startStreamCut, StreamCut.UNBOUNDED);
}

/**
* Add a stream to be read by the source, from the given start position in the stream.
*
* @param streamSpec the unqualified or qualified name of the stream.
* @param startStreamCut Start {@link StreamCut}
* @param endStreamCut End {@link StreamCut}
* @return A builder to configure and create a reader.
*/
public B forStream(final String streamSpec, final StreamCut startStreamCut, final StreamCut endStreamCut) {
streams.add(StreamSpec.of(streamSpec, startStreamCut, endStreamCut));
return builder();
}

/**
* Add a stream to be read by the source, from the earliest available position in the stream.
*
* @param streamSpec the unqualified or qualified name of the stream.
* @return A builder to configure and create a reader.
*/
public B forStream(final String streamSpec) {
return forStream(streamSpec, StreamCut.UNBOUNDED);
}

/**
* Add a stream to be read by the source, from the given start position in the stream.
*
* @param stream Stream.
* @param startStreamCut Start {@link StreamCut}
* @return A builder to configure and create a reader.
*/
public B forStream(final Stream stream, final StreamCut startStreamCut) {
return forStream(stream, startStreamCut, StreamCut.UNBOUNDED);
}

/**
* Add a stream to be read by the source, from the given start position in the stream to the given end position.
*
* @param stream Stream.
* @param startStreamCut Start {@link StreamCut}
* @param endStreamCut End {@link StreamCut}
* @return A builder to configure and create a reader.
*/
public B forStream(final Stream stream, final StreamCut startStreamCut, final StreamCut endStreamCut) {
streams.add(StreamSpec.of(stream, startStreamCut, endStreamCut));
return builder();
}

/**
* Add a stream to be read by the source, from the earliest available position in the stream.
*
* @param stream Stream.
* @return A builder to configure and create a reader.
*/
public B forStream(final Stream stream) {
return forStream(stream, StreamCut.UNBOUNDED);
}

/**
* Gets the Pravega configuration.
*/
protected PravegaConfig getPravegaConfig() {
Preconditions.checkState(pravegaConfig != null, "A Pravega configuration must be supplied.");
return pravegaConfig;
}

/**
* Resolves the streams to be provided to the reader, based on the configured default scope.
*/
protected List<StreamWithBoundaries> resolveStreams() {
Preconditions.checkState(!streams.isEmpty(), "At least one stream must be supplied.");
PravegaConfig pravegaConfig = getPravegaConfig();
return streams.stream()
.map(s -> StreamWithBoundaries.of(pravegaConfig.resolve(s.streamSpec), s.from, s.to))
.collect(Collectors.toList());
}

protected abstract B builder();

/**
* A Pravega stream with optional boundaries based on stream cuts.
*/
@Data
private static class StreamSpec implements Serializable {

private static final long serialVersionUID = 1L;

private final String streamSpec;
private final StreamCut from;
private final StreamCut to;

public static StreamSpec of(String streamSpec, StreamCut from, StreamCut to) {
Preconditions.checkNotNull(streamSpec, "streamSpec");
Preconditions.checkNotNull(streamSpec, "from");
Preconditions.checkNotNull(streamSpec, "to");
return new StreamSpec(streamSpec, from, to);
}

public static StreamSpec of(Stream stream, StreamCut from, StreamCut to) {
Preconditions.checkNotNull(stream, "stream");
Preconditions.checkNotNull(stream, "from");
Preconditions.checkNotNull(stream, "to");
return new StreamSpec(stream.getScopedName(), from, to);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.connectors.flink;

import com.google.common.base.Preconditions;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.connectors.flink.util.FlinkPravegaUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;

import java.util.Optional;

/**
* An abstract streaming reader builder.
*
* The builder is abstracted to act as the base for both the {@link FlinkPravegaReader} and {@link FlinkPravegaTableSource} builders.
*
* @param <T> the element type.
* @param <B> the builder type.
*/
abstract class AbstractStreamingReaderBuilder<T, B extends AbstractStreamingReaderBuilder> extends AbstractReaderBuilder<B> {

private static final Time DEFAULT_EVENT_READ_TIMEOUT = Time.seconds(1);
private static final Time DEFAULT_CHECKPOINT_INITIATE_TIMEOUT = Time.seconds(5);

protected String uid;
protected String readerGroupScope;
protected String readerGroupName;
protected Time readerGroupRefreshTime;
protected Time checkpointInitiateTimeout;
protected Time eventReadTimeout;

protected AbstractStreamingReaderBuilder() {
this.checkpointInitiateTimeout = DEFAULT_CHECKPOINT_INITIATE_TIMEOUT;
this.eventReadTimeout = DEFAULT_EVENT_READ_TIMEOUT;
}

/**
* Configures the source uid to identify the checkpoint state of this source.
* <p>
* The default value is generated based on other inputs.
*
* @param uid the uid to use.
*/
public B uid(String uid) {
this.uid = uid;
return builder();
}

/**
* Configures the reader group scope for synchronization purposes.
* <p>
* The default value is taken from the {@link PravegaConfig} {@code defaultScope} property.
*
* @param scope the scope name.
*/
public B withReaderGroupScope(String scope) {
this.readerGroupScope = Preconditions.checkNotNull(scope);
return builder();
}

/**
* Configures the reader group name.
*
* @param readerGroupName the reader group name.
*/
public B withReaderGroupName(String readerGroupName) {
this.readerGroupName = Preconditions.checkNotNull(readerGroupName);
return builder();
}

/**
* Sets the group refresh time, with a default of 1 second.
*
* @param groupRefreshTime The group refresh time
*/
public B withReaderGroupRefreshTime(Time groupRefreshTime) {
this.readerGroupRefreshTime = groupRefreshTime;
return builder();
}

/**
* Sets the timeout for initiating a checkpoint in Pravega.
*
* @param checkpointInitiateTimeout The timeout
*/
public B withCheckpointInitiateTimeout(Time checkpointInitiateTimeout) {
Preconditions.checkArgument(checkpointInitiateTimeout.getSize() > 0, "timeout must be > 0");
this.checkpointInitiateTimeout = checkpointInitiateTimeout;
return builder();
}

/**
* Sets the timeout for the call to read events from Pravega. After the timeout
* expires (without an event being returned), another call will be made.
*
* @param eventReadTimeout The timeout
*/
public B withEventReadTimeout(Time eventReadTimeout) {
Preconditions.checkArgument(eventReadTimeout.getSize() > 0, "timeout must be > 0");
this.eventReadTimeout = eventReadTimeout;
return builder();
}

protected abstract DeserializationSchema<T> getDeserializationSchema();

/**
* Builds a {@link FlinkPravegaReader} based on the configuration.
*
* Note that the {@link FlinkPravegaTableSource} supports both the batch and streaming API, and so creates both
* a source function and an input format and then uses one or the other.
*
* Be sure to call {@code initialize()} before returning the reader to user code.
*
* @throws IllegalStateException if the configuration is invalid.
* @return an uninitiailized reader as a source function.
*/
FlinkPravegaReader<T> buildSourceFunction() {

// rgConfig
ReaderGroupConfig.ReaderGroupConfigBuilder rgConfigBuilder = ReaderGroupConfig
.builder()
.disableAutomaticCheckpoints();
if (this.readerGroupRefreshTime != null) {
rgConfigBuilder.groupRefreshTimeMillis(this.readerGroupRefreshTime.toMilliseconds());
}
resolveStreams().forEach(s -> rgConfigBuilder.stream(s.getStream(), s.getFrom(), s.getTo()));
final ReaderGroupConfig rgConfig = rgConfigBuilder.build();

// rgScope
final String rgScope = Optional.ofNullable(this.readerGroupScope).orElseGet(() -> {
Preconditions.checkState(getPravegaConfig().getDefaultScope() != null, "A reader group scope or default scope must be configured");
return getPravegaConfig().getDefaultScope();
});

// rgName
final String rgName = Optional.ofNullable(this.readerGroupName).orElseGet(FlinkPravegaUtils::generateRandomReaderGroupName);

return new FlinkPravegaReader<>(
Optional.ofNullable(this.uid).orElseGet(this::generateUid),
getPravegaConfig().getClientConfig(),
rgConfig,
rgScope,
rgName,
getDeserializationSchema(),
this.eventReadTimeout,
this.checkpointInitiateTimeout);
}

/**
* Generate a UID for the source, to distinguish the state associated with the checkpoint hook. A good generated UID will:
* 1. be stable across savepoints for the same inputs
* 2. disambiguate one source from another (e.g. in a program that uses numerous instances of {@link FlinkPravegaReader})
* 3. allow for reconfiguration of the timeouts
*/
String generateUid() {
StringBuilder sb = new StringBuilder();
sb.append(readerGroupScope).append('\n');
resolveStreams().forEach(s -> sb
.append(s.getStream().getScopedName())
.append('/').append(s.getFrom().hashCode())
.append('/').append(s.getTo().hashCode())
.append('\n'));
return Integer.toString(sb.toString().hashCode());
}
}
Loading

0 comments on commit a60cfac

Please sign in to comment.