Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink exactly-once Reader and Writer #4

Merged
merged 2 commits into from
May 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries.
*/
package io.pravega.connectors.flink;

import io.pravega.stream.Checkpoint;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;

/**
 * Simple serializer for {@link Checkpoint} objects.
 *
 * <p>The serializer currently uses {@link java.io.Serializable Java Serialization} to
 * serialize the checkpoint objects.
 */
class CheckpointSerializer implements SimpleVersionedSerializer<Checkpoint> {

    // Format version written alongside the bytes; bump whenever the wire format changes.
    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(Checkpoint checkpoint) throws IOException {
        // Checkpoint is java.io.Serializable, so plain Java serialization suffices here.
        return SerializationUtils.serialize(checkpoint);
    }

    @Override
    public Checkpoint deserialize(int version, byte[] bytes) throws IOException {
        // Refuse bytes written by an unknown (e.g. newer) format version.
        if (version != VERSION) {
            throw new IOException("Invalid format version for serialized Pravega Checkpoint: " + version);
        }
        return SerializationUtils.deserialize(bytes);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
/**
* Copyright (c) 2017 Dell Inc., or its subsidiaries.
*/
package io.pravega.connectors.flink;

import com.google.common.base.Preconditions;

import io.pravega.ClientFactory;
import io.pravega.ReaderGroupManager;
import io.pravega.stream.Checkpoint;
import io.pravega.stream.EventRead;
import io.pravega.stream.EventStreamReader;
import io.pravega.stream.ReaderConfig;
import io.pravega.stream.ReaderGroupConfig;
import io.pravega.stream.Serializer;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.RandomStringUtils;

import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.FlinkException;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Flink source implementation for reading from pravega storage.
 *
 * @param <T> The type of the event to be read.
 */
@Slf4j
public class FlinkExactlyOncePravegaReader<T>
        extends RichParallelSourceFunction<T>
        implements ResultTypeQueryable<T>, StoppableFunction, ExternallyInducedSource<T, Checkpoint> {

    private static final long serialVersionUID = 1L;

    private static final long DEFAULT_EVENT_READ_TIMEOUT = 1000;

    private static final long DEFAULT_CHECKPOINT_INITIATE_TIMEOUT = 5000;

    // ----- configuration fields -----

    // The supplied event deserializer.
    private final DeserializationSchema<T> deserializationSchema;

    // The pravega controller endpoint.
    private final URI controllerURI;

    // The scope name of the destination stream.
    private final String scopeName;

    // The readergroup name to coordinate the parallel readers. This should be unique for a Flink job.
    private final String readerGroupName;

    // the name of the reader, used to store state and resume existing state from savepoints
    private final String readerName;

    // the timeout for reading events from Pravega
    private long eventReadTimeout = DEFAULT_EVENT_READ_TIMEOUT;

    // the timeout for call that initiates the Pravega checkpoint
    private long checkpointInitiateTimeout = DEFAULT_CHECKPOINT_INITIATE_TIMEOUT;

    // ----- runtime fields -----

    // Flag to terminate the source. volatile, because 'stop()' and 'cancel()'
    // may be called asynchronously
    private volatile boolean running = true;

    // checkpoint trigger callback, invoked when a checkpoint event is received.
    // no need to be volatile, the source is driven by only one thread
    private transient CheckpointTrigger checkpointTrigger;

    // ------------------------------------------------------------------------

    /**
     * Creates a new Flink Pravega reader instance which can be added as a source to a Flink job.
     *
     * <p>The reader will use a random name under which it stores its state in a checkpoint. While
     * checkpoints still work, this means that matching the state into another Flink jobs
     * (when resuming from a savepoint) will not be possible. Thus it is generally recommended
     * to give a reader name to each reader.
     *
     * @param controllerURI The pravega controller endpoint address.
     * @param scope The destination stream's scope name.
     * @param streamNames The list of stream names to read events from.
     * @param startTime The start time from when to read events from.
     *                  Use 0 to read all stream events from the beginning.
     * @param deserializationSchema The implementation to deserialize events from pravega streams.
     */
    public FlinkExactlyOncePravegaReader(final URI controllerURI, final String scope, final Set<String> streamNames,
                                         final long startTime, final DeserializationSchema<T> deserializationSchema) {

        this(controllerURI, scope, streamNames, startTime, deserializationSchema, UUID.randomUUID().toString());
    }

    /**
     * Creates a new Flink Pravega reader instance which can be added as a source to a Flink job.
     *
     * <p>The reader will use the given {@code readerName} to store its state (its positions
     * in the stream segments) in Flink's checkpoints/savepoints. This name is used in a similar
     * way as the operator UIDs ({@link SingleOutputStreamOperator#uid(String)}) to identify state
     * when matching it into another job that resumes from this job's checkpoints/savepoints.
     *
     * <p>Without specifying a {@code readerName}, the job will correctly checkpoint and recover,
     * but new instances of the job can typically not resume this reader's state (positions).
     *
     * @param controllerURI The pravega controller endpoint address.
     * @param scope The destination stream's scope name.
     * @param streamNames The list of stream names to read events from.
     * @param startTime The start time from when to read events from.
     *                  Use 0 to read all stream events from the beginning.
     * @param deserializationSchema The implementation to deserialize events from pravega streams.
     * @param readerName The name of the reader, used to store state and resume existing
     *                   state from savepoints.
     */
    public FlinkExactlyOncePravegaReader(final URI controllerURI, final String scope, final Set<String> streamNames,
                                         final long startTime, final DeserializationSchema<T> deserializationSchema,
                                         final String readerName) {

        Preconditions.checkNotNull(controllerURI, "controllerURI");
        Preconditions.checkNotNull(scope, "scope");
        Preconditions.checkNotNull(streamNames, "streamNames");
        Preconditions.checkArgument(startTime >= 0, "start time must be >= 0");
        Preconditions.checkNotNull(deserializationSchema, "deserializationSchema");
        Preconditions.checkNotNull(readerName, "readerName");

        this.controllerURI = controllerURI;
        this.scopeName = scope;
        this.deserializationSchema = deserializationSchema;
        // random group name so that concurrent jobs reading the same streams do not collide
        this.readerGroupName = "flink" + RandomStringUtils.randomAlphanumeric(20).toLowerCase();
        this.readerName = readerName;

        // TODO: This will require the client to have access to the pravega controller and handle any temporary errors.
        // See https://github.com/pravega/pravega/issues/553.
        log.info("Creating reader group: {} for the Flink job", this.readerGroupName);

        ReaderGroupManager.withScope(scope, controllerURI)
                .createReaderGroup(this.readerGroupName, ReaderGroupConfig.builder().startingTime(startTime).build(),
                        streamNames);
    }

    // ------------------------------------------------------------------------
    //  properties
    // ------------------------------------------------------------------------

    /**
     * Sets the timeout for initiating a checkpoint in Pravega.
     *
     * <p>This timeout is applied to the future returned by
     * {@link io.pravega.stream.ReaderGroup#initiateCheckpoint(String, ScheduledExecutorService)}.
     *
     * @param checkpointInitiateTimeout The timeout, in milliseconds; must be &gt; 0
     */
    public void setCheckpointInitiateTimeout(long checkpointInitiateTimeout) {
        Preconditions.checkArgument(checkpointInitiateTimeout > 0, "timeout must be > 0");
        this.checkpointInitiateTimeout = checkpointInitiateTimeout;
    }

    /**
     * Gets the timeout for initiating a checkpoint in Pravega.
     *
     * <p>This timeout is applied to the future returned by
     * {@link io.pravega.stream.ReaderGroup#initiateCheckpoint(String, ScheduledExecutorService)}.
     *
     * @return The timeout, in milliseconds
     */
    public long getCheckpointInitiateTimeout() {
        return checkpointInitiateTimeout;
    }

    /**
     * Sets the timeout for the call to read events from Pravega. After the timeout
     * expires (without an event being returned), another call will be made.
     *
     * <p>This timeout is passed to {@link EventStreamReader#readNextEvent(long)}.
     *
     * @param eventReadTimeout The timeout, in milliseconds; must be &gt; 0
     */
    public void setEventReadTimeout(long eventReadTimeout) {
        // fix: validate the parameter being set (previously checked 'checkpointInitiateTimeout')
        Preconditions.checkArgument(eventReadTimeout > 0, "timeout must be > 0");
        this.eventReadTimeout = eventReadTimeout;
    }

    /**
     * Gets the timeout for the call to read events from Pravega.
     *
     * <p>This timeout is the value passed to {@link EventStreamReader#readNextEvent(long)}.
     *
     * @return The timeout, in milliseconds
     */
    public long getEventReadTimeout() {
        return eventReadTimeout;
    }

    // ------------------------------------------------------------------------
    //  source function methods
    // ------------------------------------------------------------------------

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        // the reader ID is random unique per source task
        final String readerId = "flink-reader-" + UUID.randomUUID();

        log.info("{} : Creating Pravega reader with ID '{}' for controller URI: {}",
                getRuntimeContext().getTaskNameWithSubtasks(), readerId, this.controllerURI);

        // create the adapter between Pravega's serializers and Flink's serializers
        final Serializer<T> deserializer = new FlinkDeserializer<>(this.deserializationSchema);

        // build the reader; try-with-resources guarantees it is closed on exit or failure
        try (EventStreamReader<T> pravegaReader = ClientFactory.withScope(this.scopeName, this.controllerURI)
                .createReader(readerId, this.readerGroupName, deserializer, ReaderConfig.builder().build())) {

            log.info("Starting Pravega reader '{}' for controller URI {}", readerId, this.controllerURI);

            // main work loop, which this task is running
            while (this.running) {
                final EventRead<T> eventRead = pravegaReader.readNextEvent(eventReadTimeout);
                final T event = eventRead.getEvent();

                // emit the event, if one was carried
                if (event != null) {
                    if (this.deserializationSchema.isEndOfStream(event)) {
                        // Found stream end marker.
                        // TODO: Handle scenario when reading from multiple segments. This will be cleaned up as part of:
                        // https://github.com/pravega/pravega/issues/551.
                        log.info("Reached end of stream for reader: {}", readerId);
                        return;
                    }
                    ctx.collect(event);
                }

                // if the read marks a checkpoint, trigger the checkpoint
                if (eventRead.isCheckpoint()) {
                    triggerCheckpoint(eventRead.getCheckpointName());
                }
            }
        }
    }

    @Override
    public void cancel() {
        this.running = false;
    }

    @Override
    public void stop() {
        this.running = false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    // ------------------------------------------------------------------------
    //  checkpoints
    // ------------------------------------------------------------------------

    @Override
    public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
        return new ReaderCheckpointHook(this.readerName, this.readerGroupName,
                this.scopeName, this.controllerURI, this.checkpointInitiateTimeout);
    }

    @Override
    public void setCheckpointTrigger(CheckpointTrigger checkpointTrigger) {
        this.checkpointTrigger = checkpointTrigger;
    }

    /**
     * Triggers the checkpoint in the Flink source operator.
     *
     * <p>This method assumes that the {@code checkpointIdentifier} is a string of the form
     * produced by the {@link ReaderCheckpointHook}, from which the numeric checkpoint ID
     * can be parsed via {@link ReaderCheckpointHook#parseCheckpointId(String)}.
     *
     * @param checkpointIdentifier The Pravega checkpoint name carried by the checkpoint event.
     * @throws FlinkException If the identifier cannot be parsed into a checkpoint ID.
     */
    private void triggerCheckpoint(String checkpointIdentifier) throws FlinkException {
        Preconditions.checkState(checkpointTrigger != null, "checkpoint trigger not set");

        log.debug("{} received checkpoint event for {}",
                getRuntimeContext().getTaskNameWithSubtasks(), checkpointIdentifier);

        final long checkpointId;
        try {
            checkpointId = ReaderCheckpointHook.parseCheckpointId(checkpointIdentifier);
        } catch (IllegalArgumentException e) {
            // fix: preserve the full cause chain (previously only 'e.getCause()' was attached,
            // dropping the IllegalArgumentException and its message)
            throw new FlinkException("Cannot trigger checkpoint due to invalid Pravega checkpoint name", e);
        }

        checkpointTrigger.triggerCheckpoint(checkpointId);
    }

    // ------------------------------------------------------------------------
    //  serializer
    // ------------------------------------------------------------------------

    /**
     * Adapter exposing a Flink {@link DeserializationSchema} as a Pravega {@link Serializer}.
     * Only the deserialization direction is supported; this source never serializes.
     */
    private static final class FlinkDeserializer<T> implements Serializer<T> {

        private final DeserializationSchema<T> deserializationSchema;

        FlinkDeserializer(DeserializationSchema<T> deserializationSchema) {
            this.deserializationSchema = deserializationSchema;
        }

        @Override
        public ByteBuffer serialize(T value) {
            throw new IllegalStateException("serialize() called within a deserializer");
        }

        @Override
        @SneakyThrows
        public T deserialize(ByteBuffer serializedValue) {
            // NOTE(review): uses ByteBuffer.array(), which assumes a non-read-only,
            // zero-offset heap buffer — confirm Pravega always supplies one.
            return deserializationSchema.deserialize(serializedValue.array());
        }
    }
}
Loading