ISSUE-36 Use kafka connect standard dead letter and logger error reporter (#37)

* ISSUE-36 Use kafka connect standard dead letter and logger error reporter

* ISSUE-36 Base image points to GitHub repo Docker repository

Co-authored-by: vivetiwa <[email protected]>
vivetiwa authored Apr 11, 2022
1 parent ec2c9ee commit fc64324
Showing 16 changed files with 117 additions and 410 deletions.
7 changes: 5 additions & 2 deletions DEVELOPER_GUIDE.md
@@ -44,8 +44,6 @@ AEP Sink connector configurations can be supplied in the call register the conne
| aep.connection.auth.client.code | IMS client code | | no | |
| aep.connection.auth.client.secret | IMS client secret | | no | |
| aep.flush.bytes.kb | bytes threshold to determine the batch | 4 | no | |
| aep.error.logger | put failed message to dead letter topic or log | none | no | kafka, log, both, none |
| aep.error.topic | deadletter topic name | none | no | |

## Step-by-Step Workflow

@@ -285,6 +283,11 @@ curl -s -X POST \
}' http://localhost:8083/connectors
```

#### Dead Letter Configuration
To send error records to a dead letter topic, use the standard Kafka Connect error-handling configuration.

Kafka Connect dead letter queue documentation: `https://docs.confluent.io/platform/current/connect/concepts.html#dead-letter-queue`
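As a minimal sketch, the relevant standard properties look like the following; the topic name and replication factor are illustrative placeholders, not part of this commit:

```properties
# Sketch: standard Kafka Connect error handling with a dead letter queue.
# The topic name and replication factor below are placeholders.
errors.tolerance=all
errors.deadletterqueue.topic.name=aep-connector-dlq
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true
```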

#### Proxy host configuration
There are two ways to route requests to the AEP endpoint through a proxy server:
1. **Using Environment Variable**: Export proxyHost and proxyPort on each Kafka node, then restart the Kafka Connect node (see the sketch after this section).
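A sketch of the environment-variable approach, assuming the standard JVM proxy system properties are passed through `KAFKA_OPTS` (the host and port are placeholders):

```bash
# Sketch: route AEP requests through a proxy via JVM system properties.
# proxy.example.com:8080 is a placeholder; export on each Kafka Connect
# node, then restart the Kafka Connect service.
export KAFKA_OPTS="-Dhttps.proxyHost=proxy.example.com -Dhttps.proxyPort=8080"
```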
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -73,7 +73,7 @@ services:
- kafka-rest-proxy

kafka-connect:
image: streaming-connect
image: ghcr.io/adobe/experience-platform-streaming-connect
hostname: kafka-connect
ports:
- "8083:8083"
HttpException.java
@@ -40,4 +40,9 @@ public HttpException(String message, Throwable cause, int responseCode) {
public int getResponseCode() {
return responseCode;
}

@Override
public String getMessage() {
return String.format("Message : %s, ResponseCode : %s", super.getMessage(), this.responseCode);
}
}
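With this override, the HTTP response code is appended to the exception message. A small illustrative snippet (the message and code below are made-up values, not from this diff):

```java
// Sketch: demonstrates the new getMessage() format; values are illustrative.
HttpException exception = new HttpException("Failed to publish", null, 400);
// Prints: Message : Failed to publish, ResponseCode : 400
System.out.println(exception.getMessage());
```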
AbstractAEPPublisher.java
@@ -64,8 +64,6 @@ public abstract class AbstractAEPPublisher implements DataPublisher {

private static final String AEP_CONNECTION_AUTH_ENABLED_VALUE = "true";
private static final String AEP_CONNECTION_AUTH_DISABLED_VALUE = "false";
public static final String AEP_ERROR_LOGGER = "aep.error.logger";
public static final String AEP_ERROR_TOPIC = "aep.error.topic";

protected HttpProducer getHttpProducer(Map<String, String> props) throws AEPStreamingException {
return HttpProducer.newBuilder(getAepEndpoint(props.get(AEP_ENDPOINT)))
AbstractSinkTask.java
@@ -19,22 +19,20 @@
import com.google.gson.GsonBuilder;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.codehaus.plexus.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* @author Adobe Inc.
@@ -55,7 +53,7 @@ public abstract class AbstractSinkTask<T> extends SinkTask {
private int flushIntervalMillis;
private int flushBytesCount;
private long lastFlushMilliSec = System.currentTimeMillis();
private String bootstrapServers;
private ErrantRecordReporter errantRecordReporter;

@Override
public String version() {
@@ -65,21 +63,18 @@ public String version() {
@Override
public void initialize(SinkTaskContext context) {
super.initialize(context);
bootstrapServers = getBootstrapServers(context);
errantRecordReporter = context.errantRecordReporter();
}

@Override
public void start(Map<String, String> props) {
LOG.info("Started Sink Task with props: {}", props);
if (Objects.nonNull(bootstrapServers)) {
props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
}

try {
flushIntervalMillis = SinkUtils.getProperty(props, FLUSH_INTERVAL_SECS, DEFAULT_FLUSH_INTERVAL, MILLIS_IN_A_SEC);
flushBytesCount = SinkUtils.getProperty(props, FLUSH_BYTES_KB, DEFAULT_FLUSH_BYTES_KB, BYTES_IN_A_KB);

init(props);
init(props, errantRecordReporter);
LOG.info("Connection created with flush interval {} secs and flush bytes {} KB",
flushIntervalMillis / MILLIS_IN_A_SEC, flushBytesCount / BYTES_IN_A_KB);
} catch (AEPStreamingException aepStreamingException) {
@@ -104,7 +99,7 @@ public void put(Collection<SinkRecord> records) {

List<T> eventsToPublish = new ArrayList<>();
for (SinkRecord record : records) {
T dataToPublish = getDataToPublish(SinkUtils.getStringPayload(GSON, record));
T dataToPublish = getDataToPublish(Pair.of(SinkUtils.getStringPayload(GSON, record), record));
eventsToPublish.add(dataToPublish);
bytesRead += getPayloadLength(dataToPublish);

@@ -127,9 +122,10 @@
}
}

public abstract void init(Map<String, String> properties) throws AEPStreamingException;
public abstract void init(Map<String, String> properties,
ErrantRecordReporter errantRecordReporter) throws AEPStreamingException;

public abstract T getDataToPublish(String sinkRecord);
public abstract T getDataToPublish(Pair<String, SinkRecord> sinkRecord);

public abstract int getPayloadLength(T dataToPublish);

@@ -150,19 +146,6 @@ private void publishAndLogIfRequired(List<T> eventsToPublish) {
}
}

private String getBootstrapServers(SinkTaskContext context) {
try {
Object workerSinkTask = ReflectionUtils.getValueIncludingSuperclasses("sinkTask", context);
WorkerConfig workerConfig = (WorkerConfig) ReflectionUtils
.getValueIncludingSuperclasses("workerConfig", workerSinkTask);

return Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",");
} catch (IllegalAccessException exception) {
LOG.error("Failed to get bootstrap server.", exception);
}
return null;
}

private void reset(List<T> eventsToPublish, long tempCurrentTime) {
lastFlushMilliSec = tempCurrentTime;
bytesRead = 0;
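These hunks swap the reflection-based bootstrap-server lookup for the standard `ErrantRecordReporter` from KIP-610. Note that `SinkTaskContext.errantRecordReporter()` returns null when no error handling is configured for the connector, and the Kafka javadoc recommends a defensive lookup for pre-2.6 workers; a sketch of that pattern (not part of this commit):

```java
// Sketch only (not in this commit): tolerate pre-2.6 Kafka Connect
// workers, where errantRecordReporter() does not exist. Returns null on
// old workers or when no error handling is configured for the connector.
private ErrantRecordReporter lookupReporter(SinkTaskContext context) {
  try {
    return context.errantRecordReporter();
  } catch (NoSuchMethodError | NoClassDefFoundError e) {
    return null;
  }
}
```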
DataPublisher.java
@@ -13,6 +13,8 @@
package com.adobe.platform.streaming.sink;

import com.adobe.platform.streaming.AEPStreamingException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.List;

@@ -23,7 +25,7 @@ public interface DataPublisher {

void start();

void publishData(List<String> messages) throws AEPStreamingException;
void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStreamingException;

void stop();
}
AEPPublisher.java
@@ -18,19 +18,20 @@
import com.adobe.platform.streaming.http.HttpProducer;
import com.adobe.platform.streaming.http.HttpUtil;
import com.adobe.platform.streaming.sink.AbstractAEPPublisher;
import com.adobe.platform.streaming.sink.impl.reporter.CompositeErrorReporter;
import com.adobe.platform.streaming.sink.impl.reporter.ErrorReporter;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.entity.ContentType;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.Objects;

/**
* @author Adobe Inc.
@@ -42,24 +43,28 @@ public class AEPPublisher extends AbstractAEPPublisher {

private int count;
private final HttpProducer producer;
private final ErrorReporter<List<Future<?>>> errorReporter;
private final ErrantRecordReporter errorReporter;

AEPPublisher(Map<String, String> props) throws AEPStreamingException {
AEPPublisher(Map<String, String> props, ErrantRecordReporter errantRecordReporter) throws AEPStreamingException {
count = 0;
producer = getHttpProducer(props);
errorReporter = new CompositeErrorReporter(props);
errorReporter = errantRecordReporter;
}

@Override
public void publishData(List<String> messages) throws AEPStreamingException {
public void publishData(List<Pair<String, SinkRecord>> messages) throws AEPStreamingException {
if (CollectionUtils.isEmpty(messages)) {
LOG.debug("No messages to publish");
return;
}

try {
JSONArray jsonMessages = new JSONArray();
messages.stream().map(JSONObject::new).forEach(jsonMessages::put);
final JSONArray jsonMessages = new JSONArray();
messages.stream()
.map(Pair::getKey)
.map(JSONObject::new)
.forEach(jsonMessages::put);

JSONObject payload = new JSONObject();
payload.put(MESSAGES_KEY, jsonMessages);

@@ -74,7 +79,9 @@ public void publishData(List<String> messages) throws AEPStreamingException {
LOG.debug("Successfully published data to Adobe Experience Platform: {}", response);
} catch (HttpException httpException) {
LOG.error("Failed to publish data to Adobe Experience Platform", httpException);
errorReporter.report(messages, httpException);
if (Objects.nonNull(errorReporter)) {
messages.forEach(message -> errorReporter.report(message.getValue(), httpException));
}
if (HttpUtil.is500(httpException.getResponseCode()) || HttpUtil.isUnauthorized(httpException.getResponseCode())) {
throw new AEPStreamingException("Failed to publish", httpException);
}
AEPSinkTask.java
@@ -15,35 +15,38 @@
import com.adobe.platform.streaming.AEPStreamingException;
import com.adobe.platform.streaming.sink.AbstractSinkTask;
import com.adobe.platform.streaming.sink.DataPublisher;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.List;
import java.util.Map;

/**
* @author Adobe Inc.
*/
public class AEPSinkTask extends AbstractSinkTask<String> {
public class AEPSinkTask extends AbstractSinkTask<Pair<String, SinkRecord>> {

private DataPublisher publisher;

@Override
public void init(Map<String, String> props) throws AEPStreamingException {
publisher = new AEPPublisher(props);
public void init(Map<String, String> props, ErrantRecordReporter errantRecordReporter) throws AEPStreamingException {
publisher = new AEPPublisher(props, errantRecordReporter);
publisher.start();
}

@Override
public String getDataToPublish(String sinkRecord) {
public Pair<String, SinkRecord> getDataToPublish(Pair<String, SinkRecord> sinkRecord) {
return sinkRecord;
}

@Override
public int getPayloadLength(String dataToPublish) {
return dataToPublish.length();
public int getPayloadLength(Pair<String, SinkRecord> dataToPublish) {
return dataToPublish.getKey().length();
}

@Override
public void publishData(List<String> eventDataList) throws AEPStreamingException {
public void publishData(List<Pair<String, SinkRecord>> eventDataList) throws AEPStreamingException {
publisher.publishData(eventDataList);
}


This file was deleted.
