
[issue-48] Builder API #125

Merged
merged 12 commits into master from issue-48-builder
May 14, 2018

Conversation

@EronWright (Contributor) commented Apr 28, 2018

Signed-off-by: Eron Wright [email protected]

Change log description

  • introduced PravegaConfig to capture high-level client configuration

  • general stream cut support

  • support for environment-provided default Pravega scope

  • builder for FlinkPravegaInputFormat

  • builder for FlinkPravegaReader/FlinkPravegaWriter

  • rename FlinkPravegaReaderTest to FlinkPravegaReaderITCase

  • unit test for FlinkPravegaReader/FlinkPravegaWriter

  • implement batch table API support

  • builder for FlinkPravegaTableSource/FlinkPravegaTableSink

  • introduce FlinkPravegaJsonTableSource/FlinkPravegaJsonTableSink

  • treat FlinkPravegaTableSource/FlinkPravegaTableSink as an internal class

  • introduce FlinkPravegaTableSinkTest

  • removed StreamId in favor of io.pravega.client.stream.Stream

  • removed FlinkPravegaParams in favor of PravegaConfig, builder classes

  • moved IntegerDeserializationSchema to utils

  • implemented StreamSourceOperatorTestHarness

Purpose of the change
Closes #48 (builder API), #68 (reader unit test), and #62 (TableSource batch API)

What the code does
This PR introduces a new builder API for the stream, batch, and table connectors, and (trivially) completes the table batch API integration. The builder API expands the capabilities of the connector to include support for credentials and stream cuts across all connectors. It also supports environment variables/system properties for configuring the controller URI and the default scope.

Note this is a breaking change.

How to verify it
New and revised unit tests, notably:

  • FlinkPravegaReaderTest
  • FlinkPravegaWriterTest
  • PravegaConfigTest
  • FlinkPravegaTableSourceTest
  • FlinkPravegaTableSinkTest

Documentation (Draft)

Here are some details about the proposed API. Broadly speaking, there is an API for the Pravega context (e.g. controller URI, credentials), an API for the various types of readers, and an API for the various types of writers.

For readers, we're providing a builder() for FlinkPravegaReader (streaming), FlinkPravegaInputFormat (batch), and FlinkPravegaTableSource (unified), in lieu of the constructors provided today on those classes. The builders have an inheritance hierarchy, to unify the common elements.

Connecting to Pravega

PravegaConfig

A top-level config object, PravegaConfig, is provided to establish a Pravega context. It synthesizes the context from environment variables, system properties, command-line arguments, and explicit settings.

Note the configuration of a default scope, which allows unqualified stream names to be used later. An environment variable, PRAVEGA_SCOPE, is also supported to make it easy for Nautilus to apply a default scope per project.

// minimal
PravegaConfig config = PravegaConfig.fromDefaults();

// with command-line parameters parsed by Flink's ParameterTool class:
ParameterTool params = ParameterTool.fromArgs(args);
PravegaConfig config = PravegaConfig.fromParams(params);

// explicit settings
PravegaConfig config = PravegaConfig.fromDefaults()
    .withControllerURI("tcp://...")
    .withDefaultScope("scope-1")
    .withCredentials(credentials)
    .withHostnameValidation(false);
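
To illustrate the default-scope behavior, here is a sketch of resolving an unqualified stream name (the resolve helper shown is an assumption; the exact method name may differ):

// e.g. with PRAVEGA_SCOPE=scope-1 exported in the environment:
PravegaConfig config = PravegaConfig.fromDefaults();
Stream stream = config.resolve("stream-1");    // resolves to "scope-1/stream-1"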

Reading a Stream

Reader-Builder (Common)

All the reader-builders across batch, streaming, and table extend the AbstractReaderBuilder, which focuses on which streams will be read (with optional cuts). Unqualified stream names are resolved against the PravegaConfig.

AbstractReaderBuilder.builder()
    .forStream("stream-1")    // unbounded
    .forStream("stream-1", fromCut)    // bounded
    .forStream("scope-1/stream-1")    // qualified stream name
    .forStream(Stream.of("scope-1/stream-1"))   // typed stream identifier
    .withPravegaConfig(...)    // use a config from above (default: PravegaConfig.fromDefaults())
    .build();

Batch API

Create a FlinkPravegaInputFormat for the Batch API:

FlinkPravegaInputFormat<Integer> inputFormat = FlinkPravegaInputFormat.<Integer>builder()
    .forStream(...)
    .withPravegaConfig(config)
    .withDeserializationSchema(...)
    .build();
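
A usage sketch for the batch environment (hedged; the Integer event type and variable names are illustrative, while createInput with a TypeInformation argument is standard Flink):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// consume the bounded stream as a DataSet via the input format built above
DataSet<Integer> events = env.createInput(inputFormat, TypeInformation.of(Integer.class));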

Streaming API

The streaming reader (based on a Pravega reader group and a Flink checkpoint hook) is actually used in both the DataStream API and the Table API. To support this, another abstract reader-builder, AbstractStreamingReaderBuilder, is introduced that extends AbstractReaderBuilder:

AbstractStreamingReaderBuilder.builder()
    .uid(...)   // hook state identifier (default: generated)
    .withReaderGroupScope(...)   // override of RG scope (default: PravegaConfig's default scope)
    .withReaderGroupName(...)   // RG stream name
    .withReaderGroupRefreshTime(...)  
    .withCheckpointInitiateTimeout(...) 
    .withEventReadTimeout(...)

Each concrete reader has a builder that extends the appropriate abstract builder.

Create a FlinkPravegaReader for the DataStream API:

FlinkPravegaReader<Integer> pravegaSource = FlinkPravegaReader.<Integer>builder()
    .forStream(...)
    .withPravegaConfig(config)
    .withDeserializationSchema(...)
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(pravegaSource);

Table API

The Table API provides a unified API for both the Flink streaming and batch environments. As such, a TableSource implements both StreamTableSource and BatchTableSource. Our builder API must handle both scenarios.

The Table API is oriented around Flink's TableSchema class, which describes the columns. Concrete subclasses of FlinkPravegaTableSource must be provided to parse a serialized event as an instance of that schema. We provide JSON support out of the box with FlinkPravegaJsonTableSource.

Create a FlinkPravegaTableSource for the Table API:

TableSchema schema = ...;
FlinkPravegaTableSource source = FlinkPravegaJsonTableSource.builder()
    .forStream(...)
    .withPravegaConfig(config)
    .withSchema(schema)
    .build();

ExecutionEnvironment batchEnv = ...;
// OR
StreamExecutionEnvironment streamEnv = ...;
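
Either environment can then consume the source. A hedged sketch of registration, assuming the registerTableSource method of Flink's Table API at the time:

// streaming environment
StreamTableEnvironment streamTableEnv = TableEnvironment.getTableEnvironment(streamEnv);
streamTableEnv.registerTableSource("events", source);

// batch environment
BatchTableEnvironment batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv);
batchTableEnv.registerTableSource("events", source);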

Writing a Stream

Writer-Builder (Common)

All the writer-builders share a base builder, which focuses on which stream will be written to.

AbstractWriterBuilder.builder()
    .forStream(...)   // unqualified, qualified, or typed stream identifier
    .withPravegaConfig(config)

Streaming API

The streaming writer is used for both the DataStream API and the Table API. To support this, another abstract writer-builder, AbstractStreamingWriterBuilder, is introduced that extends AbstractWriterBuilder to cover writer semantics and timeouts:

AbstractStreamingWriterBuilder.builder()
    .withWriterMode(...)    // AT_LEAST_ONCE or EXACTLY_ONCE
    .withTxnTimeout(Time.seconds(30))
    .withTxnGracePeriod(Time.seconds(30))

For the DataStream API, construct a concrete sink function, FlinkPravegaWriter:

FlinkPravegaWriter<Integer> pravegaSink = FlinkPravegaWriter.<Integer>builder()
   .forStream(...)
   .withPravegaConfig(...)
   .withWriterMode(EXACTLY_ONCE)
   .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream = ...;
stream.addSink(pravegaSink);

Table API

Note: the Pravega table sink works only in the streaming environment at this time.

Create a FlinkPravegaTableSink for the Table API using JSON format:

FlinkPravegaTableSink pravegaSink = FlinkPravegaJsonTableSink.builder()
    .forStream(...)
    .withPravegaConfig(...)
    .withRoutingKeyField("field")
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
Table table = ...;
table.writeToSink(pravegaSink);

…t, FlinkPravegaTableSource

- introduced PravegaConfig to capture high-level client configuration
- general stream cut support
- support for environment-provided default Pravega scope

- builder for FlinkPravegaInputFormat

- builder for FlinkPravegaReader
- rename FlinkPravegaReaderTest to FlinkPravegaReaderITCase
- unit test for FlinkPravegaReader

- implement batch table API support
- builder for FlinkPravegaTableSource
- introduce FlinkPravegaJsonTableSource
- treat FlinkPravegaTableSource as an internal class

- deprecated StreamId in favor of io.pravega.client.stream.Stream
- deprecated FlinkPravegaParams in favor of PravegaConfig, builder classes
- moved IntegerDeserializationSchema to utils
- implemented StreamSourceOperatorTestHarness

Signed-off-by: Eron Wright <[email protected]>
@EronWright changed the title from "[issue-48] Builder API" to "[issue-48] Builder API (WIP)" Apr 28, 2018
- remove FlinkPravegaParams in favor of PravegaConfig
- remove StreamId

Signed-off-by: Eron Wright <[email protected]>
@EronWright requested review from tzulitai and vijikarthi May 1, 2018 22:50
- implement builders for sinks - FlinkPravegaTableSink, FlinkPravegaWriter

Signed-off-by: Eron Wright <[email protected]>
@EronWright changed the title from "[issue-48] Builder API (WIP)" to "[issue-48] Builder API" May 3, 2018
- update pravega submodule (w/ client-side serialization changes)

Signed-off-by: Eron Wright <[email protected]>
@fpj (Contributor) commented May 8, 2018

There is an unused import that is causing checkstyle to fail:

:checkstyleMain [ant:checkstyle] [ERROR] /home/travis/build/pravega/flink-connectors/src/main/java/io/pravega/connectors/flink/CheckpointSerializer.java:13:8: Unused import - org.apache.commons.lang3.SerializationUtils. [UnusedImports]

EronWright added 2 commits May 8, 2018 11:40
Signed-off-by: Eron Wright <[email protected]>
- suppress findbugs warnings due to Pravega issue

Signed-off-by: Eron Wright <[email protected]>
public static StreamWithBoundaries of(Stream stream, StreamCut from, StreamCut to) {
    Preconditions.checkNotNull(stream, "stream");
    Preconditions.checkNotNull(stream, "from");
    Preconditions.checkNotNull(stream, "to");

Contributor:

36:37 - please change the variable name stream to from and to, respectively.
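
For clarity, the corrected checks would read:

Preconditions.checkNotNull(stream, "stream");
Preconditions.checkNotNull(from, "from");
Preconditions.checkNotNull(to, "to");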

if (this.readerGroupRefreshTime != null) {
rgConfigBuilder.groupRefreshTimeMillis(this.readerGroupRefreshTime.toMilliseconds());
}
resolveStreams().forEach(s -> rgConfigBuilder.stream(s.getStream(), s.getFrom() /*, s.getTo() */));

Contributor:

Why is the to stream cut commented out and left to default to UNBOUNDED?

import org.apache.flink.types.Row;
import org.junit.Test;

import static org.junit.Assert.*;

Contributor:

Please replace the bulk import statement with specific package imports.
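
For example (the exact set of imports depends on which assertions the test uses):

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;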

@vijikarthi (Contributor):

Since this patch introduces breaking API changes, we need to fix the connector-dependent projects (nautilus-samples, pravega-samples, etc.) before merging this code.

EronWright added 5 commits May 8, 2018 17:11
- workaround for pravega/pravega#2519

Signed-off-by: Eron Wright <[email protected]>
- update Pravega submodule

Signed-off-by: Eron Wright <[email protected]>
Signed-off-by: Eron Wright <[email protected]>
- support for end cut

Signed-off-by: Eron Wright <[email protected]>
- cleanup import statements

Signed-off-by: Eron Wright <[email protected]>
@EronWright (Contributor, Author):

@vijikarthi OK this is ready for another review.

@fpj (Contributor) left a comment:

Really nice work, @EronWright. I have left a few comments for your consideration.

private final StreamCut from;
private final StreamCut to;

public static StreamSpec of(String streamSpec, StreamCut from, StreamCut to) {

Contributor:

Minor: the name of the first variable is a bit confusing. The name of the class is StreamSpec and it contains a stream name + from/to cuts. Consequently, the streamSpec string isn't a full stream spec according to the class definition. Perhaps call the variable streamName?

Contributor Author:

Well, the main idea here is that the streamSpec is unprocessed user input that will be parsed later (out of necessity). I avoided streamName because I don't want anyone to treat the value as an actual stream name. Also, it is a private class.

@@ -32,15 +32,17 @@ public int getVersion() {

     @Override
     public byte[] serialize(Checkpoint checkpoint) throws IOException {
-        return SerializationUtils.serialize(checkpoint);
+        ByteBuffer buf = checkpoint.toBytes();
+        byte[] b = new byte[buf.remaining()];

Contributor:

Should we be flipping the byte buffer instead of doing this?

Contributor Author:

This is the way to convert an opaque ByteBuffer to a byte[]. I would argue that toBytes should return a byte[].

Contributor:

OK, this is expecting the buffer to be in the right position, makes sense.

As for checkpoint.toBytes() returning a ByteBuffer, I agree that typically it returns a byte[]. We should consider changing it in Pravega.
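
For reference, a sketch of the conversion idiom under discussion (it assumes toBytes() returns a buffer whose position and limit already frame the serialized data):

ByteBuffer buf = checkpoint.toBytes();
byte[] b = new byte[buf.remaining()];
buf.get(b);    // copies exactly remaining() bytes; no flip needed
return b;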

@@ -183,4 +170,45 @@ public T nextRecord(T t) throws IOException {
public void close() throws IOException {
this.segmentIterator.close();
}

/**
* Gets a builder {@link FlinkPravegaInputFormat} to read Pravega streams using the Flink batch API.

Contributor:

This PR is not consistently adding a blank line between the description and the first @param. Having the extra line is good for readability.
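
For instance, the requested style (the @param shown is illustrative):

/**
 * Gets a builder for {@link FlinkPravegaInputFormat} to read Pravega streams using the Flink batch API.
 *
 * @param <T> the element type
 */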

// ----- configuration fields -----

// The supplied event deserializer.
private final DeserializationSchema<T> deserializationSchema;

Contributor:

Why are these variables not private any longer?

Contributor Author:

For the sake of the new unit test that makes assertions against them.

return eventReadTimeout;
void initialize() {
// TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
// See https://github.com/pravega/pravega/issues/553.

Contributor:

This issue is actually closed. If this is still a concern, then we need an issue against this repository.

I'm wondering if this is an actual concern, though. On the one hand, if the controller is not accessible when this environment is being constructed, then it is a good idea to fail early. On the other hand, this is just setting up configuration for submission, and creating a reader group is really part of runtime.

It also depends on when this initialize call is actually invoked. If at job submission or job execution time, then we are fine and we don't need another issue.

Contributor Author:

I will be attempting to solve this in #89. Note that this block of code is simply moving around, so let it wait a bit, OK?

As for the rationale, in Flink it is considered a best practice to defer source/sink communication until the job executes. I did some prep for this in the hook enhancement that was merged recently in Flink 1.5. It is thought that the init code can move to where the hook is instantiated.

Contributor:

That's fine, but let's at least replace the issue from the pravega queue, which is currently closed, with one in the flink-connector queue. I have created this one:

#130

* @param <T> the element type.
* @param <B> the builder type.
*/
public abstract static class AbstractStreamingWriterBuilder<T, B extends AbstractStreamingWriterBuilder> extends AbstractWriterBuilder<B> {

Contributor:

Same question about being in a separate file.

* Captures the fully qualified name of a stream. The convention to represent this as a
* single string is using [scope]/[stream].
*/
public class StreamId {

Contributor:

The description says that this is being deprecated, but the class is actually being removed rather than marked as deprecated.


//Ensure each test completes within 120 seconds.
@Rule
public final Timeout globalTimeout = new Timeout(120, TimeUnit.SECONDS);

Contributor:

It looks like these test cases have been left timeout-less.

@EronWright (Contributor, Author) commented May 11, 2018:

The unit tests don't wait for anything in an unbounded way that would necessitate a timeout. I'll review whether any of the integration tests (which I agree should use a timeout) are lacking one.
Update: reviewed, all integration tests have a timeout.

@@ -146,6 +155,63 @@ public void testEndToEnd() throws Exception {
}
}


/**
* Tests the end-to-end functionality of a batch table source & sink.

Contributor:

& -> and

* @throws Exception on exception
*/
@Test
@Ignore("[issue-124] FlinkPravegaTableSink doesn't support BatchTableSink")

Contributor:

Instead of ignoring the test case, should we remove it and add it back once #124 is done? It can go in the same PR.

Contributor Author:

I would prefer we leave it, since these integration tests are illustrative and we don't have any other samples. It will be fixed soon in #124.

protected ReaderGroup createReaderGroup() {
ReaderGroupManager readerGroupManager = createReaderGroupManager();
readerGroupManager.createReaderGroup(this.readerGroupName, readerGroupConfig);
return readerGroupManager.getReaderGroup(this.readerGroupName);

Comment:

We should invoke readerGroupManager.close(), since closing the connectionFactory does not close the connection to the controller.

Contributor:

Yeah, good point. The reader group object needs to be closed after this merge:

https://github.com/pravega/pravega/pull/2383/files#diff-67be2af2da7593612509f695490a147bR150

Looking at the checkpoint hook, it is not clear to me where it needs to happen, though.
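
A sketch of the first suggestion: close the manager once the group handle is obtained. This assumes ReaderGroupManager implements AutoCloseable and that the returned ReaderGroup does not depend on the manager's lifetime (both are assumptions here):

protected ReaderGroup createReaderGroup() {
    // close the manager deterministically; per the comment above, closing the
    // connectionFactory alone does not close its connection to the controller
    try (ReaderGroupManager readerGroupManager = createReaderGroupManager()) {
        readerGroupManager.createReaderGroup(this.readerGroupName, readerGroupConfig);
        return readerGroupManager.getReaderGroup(this.readerGroupName);
    }
}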

@fpj previously requested changes May 11, 2018

- move abstract builders to separate files
- update reference to issue pertaining to client-to-controller communication
- add whitespace in comments

Signed-off-by: Eron Wright <[email protected]>
@EronWright (Contributor, Author):

@fpj and @shrids I think I've addressed all feedback.

@EronWright dismissed fpj's stale review May 14, 2018 18:54

All feedback addressed

@EronWright merged commit a60cfac into master May 14, 2018
@EronWright deleted the issue-48-builder branch May 31, 2019 17:36