
Commit de2c45a

Niraj Patel committed:
upgrading kafka, breaking out configuration
1 parent f52fd03 commit de2c45a

15 files changed: +501 additions, −363 deletions

README.md

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ For every message, a SourceRecord is created, having the following schema:
 * **database**: database in which the operation took place
 * **object**: inserted/updated/deleted object
 
-## Sample Configuration
+## Source Configuration
 ```ini
 name=mongodb-source-connector
 connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector

pom.xml

Lines changed: 15 additions & 4 deletions
@@ -8,6 +8,11 @@
     <artifactId>connect-mongodb</artifactId>
     <version>1.0</version>
 
+    <properties>
+        <kafka.version>0.10.0.0</kafka.version>
+        <mongodb.version>3.2.1</mongodb.version>
+    </properties>
+
     <build>
         <plugins>
             <plugin>
@@ -16,8 +21,8 @@
                 <version>3.3</version>
                 <inherited>true</inherited>
                 <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                 </configuration>
             </plugin>
             <plugin>
@@ -46,12 +51,18 @@
         <dependency>
             <groupId>org.mongodb</groupId>
             <artifactId>mongodb-driver</artifactId>
-            <version>3.2.1</version>
+            <version>${mongodb.version}</version>
         </dependency>
         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
-           <version>0.9.0.0</version>
+           <version>${kafka.version}</version>
+           <scope>provided</scope>
+       </dependency>
+       <dependency>
+           <groupId>org.apache.kafka</groupId>
+           <artifactId>kafka-clients</artifactId>
+           <version>${kafka.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>

src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java

Lines changed: 20 additions & 6 deletions
@@ -2,6 +2,7 @@
 
 import com.mongodb.CursorType;
 import com.mongodb.MongoClient;
+import com.mongodb.ServerAddress;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
@@ -14,7 +15,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
 
 /**
  * Reads mutation from a mongodb database
@@ -23,8 +27,7 @@
  */
 public class DatabaseReader implements Runnable {
     Logger log = LoggerFactory.getLogger(DatabaseReader.class);
-    private String host;
-    private Integer port;
+    private String hosts;
     private String db;
     private String start;
 
@@ -33,9 +36,8 @@ public class DatabaseReader implements Runnable {
     private MongoCollection<Document> oplog;
     private Bson query;
 
-    public DatabaseReader(String host, Integer port, String db, String start, ConcurrentLinkedQueue<Document> messages) {
-        this.host = host;
-        this.port = port;
+    public DatabaseReader(String hosts, String db, String start, ConcurrentLinkedQueue<Document> messages) {
+        this.hosts = hosts;
         this.db = db;
         this.start = start;
         this.messages = messages;
@@ -81,7 +83,19 @@ private void init() {
      * @return the oplog collection
      */
     private MongoCollection readCollection() {
-        MongoClient mongoClient = new MongoClient(host, port);
+        List<ServerAddress> addresses = Arrays.stream(hosts.split(",")).map(hostUrl -> {
+            try {
+                String[] hostAndPort = hostUrl.split(":");
+                String host = hostAndPort[0];
+                int port = Integer.parseInt(hostAndPort[1]);
+                return new ServerAddress(host, port);
+            } catch (ArrayIndexOutOfBoundsException aioobe) {
+                throw new ConnectException("hosts must be in host:port format");
+            } catch (NumberFormatException nfe) {
+                throw new ConnectException("port in the hosts field must be an integer");
+            }
+        }).collect(Collectors.toList());
+        MongoClient mongoClient = new MongoClient(addresses);
         MongoDatabase db = mongoClient.getDatabase("local");
         return db.getCollection("oplog.rs");
     }
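With this change `DatabaseReader` takes a comma-separated `hosts` string of `host:port` pairs instead of a single host and port, so a whole replica set can be listed. Below is a standalone sketch of the same parsing step; it mirrors the lambda added above but swaps Connect's `ConnectException` for `IllegalArgumentException` so it runs with only the MongoDB driver on the classpath (the class and method names here are illustrative, not part of the commit):

```java
import com.mongodb.ServerAddress;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HostsParser {
    // Parses "host1:27017,host2:27018" into a list of ServerAddress instances.
    public static List<ServerAddress> parse(String hosts) {
        return Arrays.stream(hosts.split(",")).map(hostUrl -> {
            try {
                String[] hostAndPort = hostUrl.split(":");
                return new ServerAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
            } catch (ArrayIndexOutOfBoundsException e) {
                throw new IllegalArgumentException("hosts must be in host:port format");
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("port in the hosts field must be an integer");
            }
        }).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // e.g. two members of a local replica set
        parse("localhost:27017,localhost:27018").forEach(System.out::println);
    }
}
```

A value such as `localhost:27017,localhost:27018` resolves to two `ServerAddress` entries handed to the `MongoClient(List<ServerAddress>)` constructor.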

src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java renamed to src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java

Lines changed: 9 additions & 12 deletions
@@ -15,19 +15,17 @@
  *
  * @author Andrea Patelli
  */
-public class MongodbReader {
-    private static final Logger log = LoggerFactory.getLogger(MongodbReader.class);
+public class MongoDBReader {
+    private static final Logger log = LoggerFactory.getLogger(MongoDBReader.class);
 
     protected ConcurrentLinkedQueue<Document> messages;
 
     private List<String> dbs;
-    private String host;
-    private Integer port;
+    private String hosts;
     private Map<Map<String, String>, Map<String, Object>> start;
 
-    public MongodbReader(String host, Integer port, List<String> dbs, Map<Map<String, String>, Map<String, Object>> start) {
-        this.host = host;
-        this.port = port;
+    public MongoDBReader(String hosts, List<String> dbs, Map<Map<String, String>, Map<String, Object>> start) {
+        this.hosts = hosts;
         this.dbs = new ArrayList<>(0);
         this.dbs.addAll(dbs);
         this.start = start;
@@ -36,7 +34,7 @@ public MongodbReader(String host, Integer port, List<String> dbs, Map<Map<String
 
     public void run() {
         // for every database to watch
-        for (String db : dbs) {
+        dbs.stream().forEach(db -> {
             String start;
             // get the last message that was read
             Map<String, Object> dbOffset = this.start.get(Collections.singletonMap("mongodb", db));
@@ -46,13 +44,12 @@ public void run() {
                 start = (String) this.start.get(Collections.singletonMap("mongodb", db)).get(db);
 
             log.trace("Starting database reader with configuration: ");
-            log.trace("host: {}", host);
-            log.trace("port: {}", port);
+            log.trace("hosts: {}", hosts);
             log.trace("db: {}", db);
             log.trace("start: {}", start);
             // start a new thread for reading mutation of the specific database
-            DatabaseReader reader = new DatabaseReader(host, port, db, start, messages);
+            DatabaseReader reader = new DatabaseReader(hosts, db, start, messages);
             new Thread(reader).start();
-        }
+        });
     }
 }
src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkConnector.java renamed to src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkConnector.java
@@ -1,39 +1,36 @@
 package org.apache.kafka.connect.mongodb;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.mongodb.configuration.MongoDBSinkConfiguration;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.util.ConnectorUtils;
 import org.apache.kafka.connect.utils.LogUtils;
 import org.apache.kafka.connect.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
 
 /**
- * MongodbSinkConnector implement the Connector interface to send Kafka
+ * MongoDBSinkConnector implement the Connector interface to send Kafka
  * data to Mongodb
  *
  * @author Andrea Patelli
+ * @author Niraj Patel
  */
-public class MongodbSinkConnector extends SinkConnector {
-    private final static Logger log = LoggerFactory.getLogger(MongodbSinkConnector.class);
+public class MongoDBSinkConnector extends SinkConnector {
 
-    public static final String PORT = "port";
-    public static final String HOST = "host";
-    public static final String BULK_SIZE = "bulk.size";
-    public static final String DATABASE = "mongodb.database";
-    public static final String COLLECTIONS = "mongodb.collections";
-    public static final String TOPICS = "topics";
+    private final static Logger log = LoggerFactory.getLogger(MongoDBSinkConnector.class);
 
-    private String port;
-    private String host;
-    private String bulkSize;
-    private String database;
-    private String collections;
-    private String topics;
+    private Map<String, String> configuration;
 
     /**
      * Get the version of this connector.
@@ -49,33 +46,20 @@ public String version() {
      * Start this Connector. This method will only be called on a clean Connector, i.e. it has
      * either just been instantiated and initialized or {@link #stop()} has been invoked.
      *
-     * @param map configuration settings
+     * @param configuration configuration settings
      */
     @Override
-    public void start(Map<String, String> map) {
-        log.trace("Parsing configuration");
-
-        port = map.get(PORT);
-        if (port == null || port.isEmpty())
-            throw new ConnectException("Missing " + PORT + " config");
-
-        bulkSize = map.get(BULK_SIZE);
-        if (bulkSize == null || bulkSize.isEmpty())
-            throw new ConnectException("Missing " + BULK_SIZE + " config");
-
-        host = map.get(HOST);
-        if (host == null || host.isEmpty())
-            throw new ConnectException("Missing " + HOST + " config");
-
-        database = map.get(DATABASE);
-        collections = map.get(COLLECTIONS);
-        topics = map.get(TOPICS);
+    public void start(Map<String, String> configuration) {
+        MongoDBSinkConfiguration sinkConfiguration = new MongoDBSinkConfiguration(configuration);
+        this.configuration = sinkConfiguration.originalsStrings();
 
+        String collections = configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG);
+        String topics = configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG);
         if (collections.split(",").length != topics.split(",").length) {
             throw new ConnectException("The number of topics should be the same as the number of collections");
         }
 
-        LogUtils.dumpConfiguration(map, log);
+        LogUtils.dumpConfiguration(configuration, log);
     }
 
     /**
@@ -85,7 +69,7 @@ public void start(Map<String, String> map) {
      */
     @Override
     public Class<? extends Task> taskClass() {
-        return MongodbSinkTask.class;
+        return MongoDBSinkTask.class;
     }
 
     /**
@@ -98,22 +82,21 @@ public Class<? extends Task> taskClass() {
     @Override
     public List<Map<String, String>> taskConfigs(int maxTasks) {
         List<Map<String, String>> configs = new ArrayList<>();
-        List<String> coll = Arrays.asList(collections.split(","));
+        List<String> coll = Arrays.asList(configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG).split(","));
         int numGroups = Math.min(coll.size(), maxTasks);
         List<List<String>> dbsGrouped = ConnectorUtils.groupPartitions(coll, numGroups);
-        List<String> topics = Arrays.asList(this.topics.split(","));
+        List<String> topics = Arrays.asList(configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG).split(","));
         List<List<String>> topicsGrouped = ConnectorUtils.groupPartitions(topics, numGroups);
 
-        for (int i = 0; i < numGroups; i++) {
+        IntStream.range(0, numGroups).forEach(i -> {
             Map<String, String> config = new HashMap<>();
-            config.put(PORT, port);
-            config.put(BULK_SIZE, bulkSize);
-            config.put(HOST, host);
-            config.put(DATABASE, database);
-            config.put(COLLECTIONS, StringUtils.join(dbsGrouped.get(i), ","));
-            config.put(TOPICS, StringUtils.join(topicsGrouped.get(i), ","));
+            config.put(MongoDBSinkConfiguration.BULK_SIZE_CONFIG, configuration.get(MongoDBSinkConfiguration.BULK_SIZE_CONFIG));
+            config.put(MongoDBSinkConfiguration.HOST_URLS_CONFIG, configuration.get(MongoDBSinkConfiguration.HOST_URLS_CONFIG));
+            config.put(MongoDBSinkConfiguration.DATABASE_CONFIG, configuration.get(MongoDBSinkConfiguration.DATABASE_CONFIG));
+            config.put(MongoDBSinkConfiguration.COLLECTIONS_CONFIG, StringUtils.join(dbsGrouped.get(i), ","));
+            config.put(MongoDBSinkConfiguration.TOPICS_CONFIG, StringUtils.join(topicsGrouped.get(i), ","));
             configs.add(config);
-        }
+        });
         return configs;
     }
 
@@ -122,6 +105,12 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
      */
     @Override
     public void stop() {
+        // not implemented
+    }
 
+    @Override
+    public ConfigDef config() {
+        return MongoDBSinkConfiguration.config;
     }
+
 }
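The `MongoDBSinkConfiguration` class the connector now delegates to is added elsewhere in this commit and is not part of this excerpt. Below is a minimal sketch of what a `ConfigDef`-based configuration of that shape could look like, assuming it extends Kafka's `AbstractConfig`; the class name, package, and `*_CONFIG` constants match what the connector above references, while the literal property strings, types, defaults, and documentation text are assumptions:

```java
package org.apache.kafka.connect.mongodb.configuration;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Map;

// Hypothetical sketch: the real MongoDBSinkConfiguration is added elsewhere in this commit.
public class MongoDBSinkConfiguration extends AbstractConfig {

    // Constant names match those referenced by MongoDBSinkConnector above;
    // the literal property strings, types, defaults, and doc text are assumptions.
    public static final String HOST_URLS_CONFIG = "host.urls";
    public static final String BULK_SIZE_CONFIG = "bulk.size";
    public static final String DATABASE_CONFIG = "mongodb.database";
    public static final String COLLECTIONS_CONFIG = "mongodb.collections";
    public static final String TOPICS_CONFIG = "topics";

    // Static ConfigDef returned by MongoDBSinkConnector.config().
    public static final ConfigDef config = new ConfigDef()
            .define(HOST_URLS_CONFIG, Type.STRING, Importance.HIGH,
                    "Comma-separated list of MongoDB hosts in host:port format")
            .define(BULK_SIZE_CONFIG, Type.INT, 100, Importance.MEDIUM,
                    "Number of records to write per bulk operation")
            .define(DATABASE_CONFIG, Type.STRING, Importance.HIGH,
                    "Target MongoDB database")
            .define(COLLECTIONS_CONFIG, Type.STRING, Importance.HIGH,
                    "Comma-separated list of target collections")
            .define(TOPICS_CONFIG, Type.STRING, Importance.HIGH,
                    "Comma-separated list of Kafka topics to sink");

    public MongoDBSinkConfiguration(Map<String, String> originals) {
        super(config, originals);
    }
}
```

Moving the keys into a `ConfigDef` lets the Connect framework validate and document the settings, replacing the hand-rolled null checks removed from `start()` above.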
