Skip to content

Commit

Permalink
Merge pull request pravega#4 from pravega/master
Browse files Browse the repository at this point in the history
Update
  • Loading branch information
Guangfeng-Xu authored Sep 28, 2020
2 parents 42f7369 + 947ba2b commit b790f9c
Show file tree
Hide file tree
Showing 15 changed files with 221 additions and 199 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jacocoVersion=0.8.2

# Version and base tags can be overridden at build time.
connectorVersion=0.8.0-SNAPSHOT
pravegaVersion=0.8.0-2584.c40c6b5-SNAPSHOT
pravegaVersion=0.8.0-2623.279ac21-SNAPSHOT
apacheCommonsVersion=3.7

# flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion'
Expand Down
2 changes: 1 addition & 1 deletion pravega
Submodule pravega updated 451 files

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

package io.pravega.connectors.flink;

import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.sinks.BatchTableSink;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.types.Row;

Expand All @@ -22,7 +24,8 @@
/**
* A batch table source factory implementation of {@link BatchTableSourceFactory} to access Pravega streams.
*/
public class FlinkPravegaBatchTableSourceFactory extends FlinkPravegaTableFactoryBase implements BatchTableSourceFactory<Row> {
public class FlinkPravegaBatchTableSourceSinkFactory extends FlinkPravegaTableFactoryBase implements
BatchTableSourceFactory<Row>, BatchTableSinkFactory<Row> {

@Override
public Map<String, String> requiredContext() {
Expand All @@ -39,6 +42,11 @@ public BatchTableSource<Row> createBatchTableSource(Map<String, String> properti
return createFlinkPravegaTableSource(properties);
}

@Override
public BatchTableSink<Row> createBatchTableSink(Map<String, String> properties) {
return createFlinkPravegaTableSink(properties);
}

@Override
protected String getVersion() {
return String.valueOf(CONNECTOR_VERSION_VALUE);
Expand Down
55 changes: 49 additions & 6 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.connectors.flink;

import io.pravega.client.EventStreamClientFactory;
import org.apache.flink.util.Preconditions;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.ReaderGroupManager;
Expand Down Expand Up @@ -89,6 +90,14 @@ public class FlinkPravegaReader<T>

private static final long serialVersionUID = 1L;

// ----- runtime fields -----

// Pravega Event Stream Client Factory (NOTE: MUST be closed when reader closed)
protected transient EventStreamClientFactory eventStreamClientFactory;

// Pravega Reader Group Manager (NOTE: MUST be closed when reader closed)
protected transient ReaderGroupManager readerGroupManager = null;

// ----- configuration fields -----

// the uuid of the checkpoint hook, used to store state and resume existing state from savepoints
Expand Down Expand Up @@ -181,9 +190,12 @@ protected FlinkPravegaReader(String hookUid, ClientConfig clientConfig,
* Initializes the reader.
*/
void initialize() {
createEventStreamClientFactory();

// TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
// See https://github.com/pravega/flink-connectors/issues/130.
log.info("Creating reader group: {}/{} for the Flink job", this.readerGroupScope, this.readerGroupName);
createReaderGroupManager();
createReaderGroup();
if (isEventTimeMode()) {
Preconditions.checkArgument(readerGroup.getStreamNames().size() == 1,
Expand Down Expand Up @@ -333,6 +345,8 @@ public TypeInformation<T> getProducedType() {

@Override
public void open(Configuration parameters) throws Exception {
createEventStreamClientFactory();
createReaderGroupManager();
createReaderGroup();
if (enableMetrics) {
registerMetrics();
Expand All @@ -346,7 +360,18 @@ public void open(Configuration parameters) throws Exception {

@Override
public void close() throws Exception {
if (eventStreamClientFactory != null) {
log.info("Closing Pravega eventStreamClientFactory");
eventStreamClientFactory.close();
}

if (readerGroupManager != null) {
log.info("Closing Pravega ReaderGroupManager");
readerGroupManager.close();
}

if (readerGroup != null) {
log.info("Closing Pravega ReaderGroup");
readerGroup.close();
}
}
Expand All @@ -357,7 +382,12 @@ public void close() throws Exception {

@Override
public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
return new ReaderCheckpointHook(this.hookUid, createReaderGroup(), this.checkpointInitiateTimeout, this.readerGroupConfig);
return new ReaderCheckpointHook(this.hookUid,
this.readerGroupName,
this.readerGroupScope,
this.checkpointInitiateTimeout,
this.clientConfig,
this.readerGroupConfig);
}

@Override
Expand Down Expand Up @@ -542,7 +572,6 @@ private void registerMetrics() {
* Create the {@link ReaderGroup} for the current configuration.
*/
private ReaderGroup createReaderGroup() {
ReaderGroupManager readerGroupManager = createReaderGroupManager();
readerGroupManager.createReaderGroup(this.readerGroupName, readerGroupConfig);
readerGroup = readerGroupManager.getReaderGroup(this.readerGroupName);
return readerGroup;
Expand All @@ -552,7 +581,22 @@ private ReaderGroup createReaderGroup() {
* Create the {@link ReaderGroupManager} for the current configuration.
*/
protected ReaderGroupManager createReaderGroupManager() {
return ReaderGroupManager.withScope(readerGroupScope, clientConfig);
if (readerGroupManager == null) {
readerGroupManager = ReaderGroupManager.withScope(readerGroupScope, clientConfig);
}

return readerGroupManager;
}

/**
* Create the {@link EventStreamClientFactory} for the current configuration.
*/
protected EventStreamClientFactory createEventStreamClientFactory() {
if (eventStreamClientFactory == null) {
eventStreamClientFactory = EventStreamClientFactory.withScope(readerGroupScope, clientConfig);
}

return eventStreamClientFactory;
}

/**
Expand All @@ -561,12 +605,11 @@ protected ReaderGroupManager createReaderGroupManager() {
*/
protected EventStreamReader<T> createEventStreamReader(String readerId) {
return createPravegaReader(
this.clientConfig,
readerId,
this.readerGroupScope,
this.readerGroupName,
this.deserializationSchema,
ReaderConfig.builder().build());
ReaderConfig.builder().build(),
eventStreamClientFactory);
}

// ------------------------------------------------------------------------
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

package io.pravega.connectors.flink;

import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;

Expand All @@ -24,7 +26,8 @@
/**
* A stream table source factory implementation of {@link StreamTableSourceFactory} to access Pravega streams.
*/
public class FlinkPravegaStreamTableSourceFactory extends FlinkPravegaTableFactoryBase implements StreamTableSourceFactory<Row> {
public class FlinkPravegaStreamTableSourceSinkFactory extends FlinkPravegaTableFactoryBase implements
StreamTableSourceFactory<Row>, StreamTableSinkFactory<Row> {

@Override
public Map<String, String> requiredContext() {
Expand All @@ -43,6 +46,11 @@ public StreamTableSource<Row> createStreamTableSource(Map<String, String> proper
return createFlinkPravegaTableSource(properties);
}

@Override
public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
return createFlinkPravegaTableSink(properties);
}

@Override
protected String getVersion() {
return String.valueOf(CONNECTOR_VERSION_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.connectors.flink;

import io.pravega.client.ClientConfig;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;
Expand Down Expand Up @@ -42,6 +44,12 @@ class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {

// ------------------------------------------------------------------------

/** The reader group used to trigger and restore pravega checkpoints (MUST be closed when Hook Closed) */
protected ReaderGroup readerGroup;

/** The reader group manager used to create the reader group (MUST be closed when Hook Closed) */
protected ReaderGroupManager readerGroupManager;

/** The logical name of the operator. This is different from the (randomly generated)
* reader group name, because it is used to identify the state in a checkpoint/savepoint
* when resuming the checkpoint/savepoint with another job. */
Expand All @@ -50,9 +58,6 @@ class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {
/** The serializer for Pravega checkpoints, to store them in Flink checkpoints */
private final CheckpointSerializer checkpointSerializer;

/** The reader group used to trigger and restore pravega checkpoints */
private final ReaderGroup readerGroup;

/** The timeout on the future returned by the 'initiateCheckpoint()' call */
private final Time triggerTimeout;

Expand All @@ -65,16 +70,21 @@ class ReaderCheckpointHook implements MasterTriggerRestoreHook<Checkpoint> {
@GuardedBy("scheduledExecutorLock")
private ScheduledExecutorService scheduledExecutorService;

ReaderCheckpointHook(String hookUid, ReaderGroup readerGroup, Time triggerTimeout, ReaderGroupConfig readerGroupConfig) {

ReaderCheckpointHook(String hookUid, String readerGroupName, String readerGroupScope, Time triggerTimeout, ClientConfig clientConfig, ReaderGroupConfig readerGroupConfig) {
this.hookUid = checkNotNull(hookUid);
this.readerGroup = checkNotNull(readerGroup);
this.triggerTimeout = triggerTimeout;
this.readerGroupConfig = readerGroupConfig;
this.checkpointSerializer = new CheckpointSerializer();

initializeReaderGroup(readerGroupName, readerGroupScope, clientConfig);
}

// ------------------------------------------------------------------------
protected void initializeReaderGroup(String readerGroupName, String readerGroupScope, ClientConfig clientConfig) {
readerGroupManager = ReaderGroupManager.withScope(readerGroupScope, clientConfig);
readerGroupManager.createReaderGroup(readerGroupName, readerGroupConfig);
readerGroup = readerGroupManager.getReaderGroup(readerGroupName);
}

@Override
public String getIdentifier() {
Expand Down Expand Up @@ -123,6 +133,9 @@ public void reset() {

@Override
public void close() {
log.info("closing reader group Manager");
this.readerGroupManager.close();

// close the reader group properly
log.info("closing the reader group");
this.readerGroup.close();
Expand Down
Loading

0 comments on commit b790f9c

Please sign in to comment.