Commit 3a21c27

PLUGIN-80 Confluent Cloud realtime Source and Sink plugins

1 parent: d6b932f
23 files changed: +3909 -0 lines

README.md

Lines changed: 16 additions & 0 deletions
@@ -26,6 +26,22 @@ To build this plugin:
The build will create a .jar and .json file under the ``target`` directory.
These files can be used to deploy your plugins.

Run tests
---------
To run all tests:

```
mvn clean test
```

System properties required for tests:

**test.kafka_server** - Kafka broker instance address.
**test.cluster_api_key** - Confluent API key.
**test.cluster_api_secret** - Confluent API secret.
**test.schema_registry_url** - Schema Registry URL.
**test.schema_registry_api_key** - Schema Registry API key.
**test.schema_registry_api_secret** - Schema Registry API secret.
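
For example, the properties can be supplied on the Maven command line with `-D` flags (the broker address and credentials below are placeholders; substitute your own cluster and Schema Registry values):

```
mvn clean test \
  -Dtest.kafka_server=<broker-host>:9092 \
  -Dtest.cluster_api_key=<cluster-api-key> \
  -Dtest.cluster_api_secret=<cluster-api-secret> \
  -Dtest.schema_registry_url=https://<schema-registry-host> \
  -Dtest.schema_registry_api_key=<registry-api-key> \
  -Dtest.schema_registry_api_secret=<registry-api-secret>
```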

Deployment
----------
You can deploy your plugins using the CDAP CLI:
Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
# Confluent Streaming Sink


Description
-----------
This sink writes data to Confluent.
It sends a message to the specified Kafka topic for each record it receives. It can also be
configured to partition events being written to Kafka based on a configurable key.
The sink can also be configured to operate in sync or async mode and apply different
compression types to events.
It can be used with self-managed Confluent Platform or Confluent Cloud, and supports Schema Registry.

Properties
----------
**Reference Name:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.

**Kafka Brokers:** List of Kafka brokers specified in host1:port1,host2:port2 form. (Macro-enabled)

**Kafka Topic:** The Kafka topic to write to. (Macro-enabled)

**Async:** Specifies whether an acknowledgment is required from the broker that the message was received. Defaults to No.

**Compression Type:** Compression type to be applied to the message.

**Time Field:** Optional name of the field containing the read time of the message.
If this is not set, the message will be sent with the current timestamp.
If set, this field must be present in the input schema and must be a long.

**Key Field:** Optional name of the field containing the message key.
If this is not set, the message will be sent without a key.
If set, this field must be present in the schema property and must be of type bytes.

**Partition Field:** Optional name of the field containing the partition the message should be written to.
If this is not set, the default partition will be used for all messages.
If set, this field must be present in the schema property and must be an int.

**Message Format:** Optional format a structured record should be converted to.
Required if used without Schema Registry.

**Additional Kafka Producer Properties:** Additional Kafka producer properties to set.

**Cluster API Key:** The Confluent API Key used for the sink.

**Cluster API Secret:** The Confluent API Secret used for the sink.

**Schema Registry URL:** The Schema Registry endpoint URL.

**Schema Registry API Key:** The Schema Registry API Key.

**Schema Registry API Secret:** The Schema Registry API Secret.

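As a point of reference for the **Time Field**, **Key Field**, and **Partition Field** requirements above, a minimal sketch of a compatible input schema might look like the following (the field names `ts`, `key`, `part`, and `body` are hypothetical; you would set **Time Field** to `ts`, **Key Field** to `key`, and **Partition Field** to `part`):

```json
{
    "type": "record",
    "name": "message",
    "fields": [
        {"name": "ts", "type": "long"},
        {"name": "key", "type": "bytes"},
        {"name": "part", "type": "int"},
        {"name": "body", "type": "string"}
    ]
}
```
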
Example
-------
This example writes structured records to the Kafka topic 'alarm' in an asynchronous manner
using compression type 'gzip'. Events are written in CSV format to the Kafka brokers
host1.example.com:9092 and host2.example.com:9092. The Kafka partition will be decided based on the provided key field 'message'.
Additional properties such as the number of acknowledgements and the client id can also be provided.

```json
{
    "name": "Confluent",
    "type": "batchsink",
    "properties": {
        "referenceName": "Kafka",
        "brokers": "host1.example.com:9092,host2.example.com:9092",
        "topic": "alarm",
        "async": "true",
        "compressionType": "gzip",
        "format": "CSV",
        "kafkaProperties": "acks:2,client.id:myclient",
        "key": "message",
        "clusterApiKey": "",
        "clusterApiSecret": ""
    }
}
```

Lines changed: 132 additions & 0 deletions
@@ -0,0 +1,132 @@
# Confluent Streaming Source


Description
-----------
This source reads data from Confluent.
It emits a record per message from the specified Kafka topic.
It can be used with self-managed Confluent Platform or Confluent Cloud, and supports Schema Registry.

The source can be configured to parse values in the following ways:
1. User-defined format. Use the **Message Format** field to choose any format supported by CDAP.
1. Schema Registry. Requires credentials for Schema Registry to be specified.
Uses Avro schemas to deserialize Kafka messages. Use the **Get Schema** button to fetch key and value schemas from the registry.
1. Binary format. Used if no message format or Schema Registry credentials are provided.

Properties
----------
**Reference Name:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.

**Kafka Brokers:** List of Kafka brokers specified in host1:port1,host2:port2 form. (Macro-enabled)

**Kafka Topic:** The Kafka topic to read from. (Macro-enabled)

**Topic Partitions:** List of topic partitions to read from. If not specified, all partitions will be read. (Macro-enabled)

**Default Initial Offset:** The default initial offset for all topic partitions.
An offset of -2 means the smallest offset. An offset of -1 means the latest offset. Defaults to -1.
Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read.
If you wish to set different initial offsets for different partitions, use the initialPartitionOffsets property. (Macro-enabled)

**Initial Partition Offsets:** The initial offset for each topic partition. If this is not specified,
all partitions will use the same initial offset, which is determined by the defaultInitialOffset property.
Any partitions specified in the partitions property, but not in this property, will use the defaultInitialOffset.
An offset of -2 means the smallest offset. An offset of -1 means the latest offset.
Offsets are inclusive. If an offset of 5 is used, the message at offset 5 will be read. (Macro-enabled)

**Time Field:** Optional name of the field containing the read time of the batch.
If this is not set, no time field will be added to output records.
If set, this field must be present in the schema property and must be a long.

**Key Field:** Optional name of the field containing the message key.
If this is not set, no key field will be added to output records.
If set, this field must be present in the schema property and must be bytes.

**Partition Field:** Optional name of the field containing the partition the message was read from.
If this is not set, no partition field will be added to output records.
If set, this field must be present in the schema property and must be an int.

**Offset Field:** Optional name of the field containing the partition offset the message was read from.
If this is not set, no offset field will be added to output records.
If set, this field must be present in the schema property and must be a long.

**Message Format:** Optional format of the Kafka event message. Any format supported by CDAP is supported.
For example, a value of 'csv' will attempt to parse Kafka payloads as comma-separated values.
If no format is given, Kafka message payloads will be treated as bytes.

**Max Rate Per Partition:** Maximum number of records to read per second per partition. Defaults to 1000.

**Additional Kafka Consumer Properties:** Additional Kafka consumer properties to set.

**Cluster API Key:** The Confluent API Key used for the source.

**Cluster API Secret:** The Confluent API Secret used for the source.

**Schema Registry URL:** The Schema Registry endpoint URL.

**Schema Registry API Key:** The Schema Registry API Key.

**Schema Registry API Secret:** The Schema Registry API Secret.

**Value Field:** The name of the field containing the message value. Required to fetch the schema from Schema Registry.

**Schema:** Output schema of the source. If you would like the output records to contain a field with the
Kafka message key, the schema must include a field of type bytes/nullable bytes or string/nullable string, and you must
set the **Key Field** property to that field's name. Similarly, if you would like the output records to contain a field
with the timestamp of when the record was read, the schema must include a field of type long or nullable long, and you
must set the **Time Field** property to that field's name. Any field that is not the **Time Field** or **Key Field**
will be used in conjunction with the format to parse Kafka message payloads. If used with Schema Registry, the schema
should be fetched using the **Get Schema** button.

Example
-------
***Example 1:*** Read from the 'purchases' topic of a Kafka instance running
on brokers host1.example.com:9092 and host2.example.com:9092. The source will add
a time field named 'readTime' that contains a timestamp corresponding to the micro
batch when the record was read. It will also contain a field named 'key' which will have
the message key in it. It parses the Kafka messages using the 'csv' format
with 'user', 'item', 'count', and 'price' as the message schema.

```json
{
    "name": "Confluent",
    "type": "streamingsource",
    "properties": {
        "topics": "purchases",
        "brokers": "host1.example.com:9092,host2.example.com:9092",
        "format": "csv",
        "timeField": "readTime",
        "keyField": "key",
        "clusterApiKey": "",
        "clusterApiSecret": "",
        "defaultInitialOffset": "-2",
        "schema": "{
            \"type\":\"record\",
            \"name\":\"purchase\",
            \"fields\":[
                {\"name\":\"readTime\",\"type\":\"long\"},
                {\"name\":\"key\",\"type\":\"bytes\"},
                {\"name\":\"user\",\"type\":\"string\"},
                {\"name\":\"item\",\"type\":\"string\"},
                {\"name\":\"count\",\"type\":\"int\"},
                {\"name\":\"price\",\"type\":\"double\"}
            ]
        }"
    }
}
```

For each Kafka message read, it will output a record with the schema:

| field name | type   |
| ---------- | ------ |
| readTime   | long   |
| key        | bytes  |
| user       | string |
| item       | string |
| count      | int    |
| price      | double |

Note that the readTime field is not derived from the Kafka message, but from the time that the
message was read.
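
For instance, a hypothetical message with key `user1` and the CSV payload `alice,cappuccino,2,3.50` would yield a record along these lines (the values are made up for illustration, and the `key` field, which is actually bytes, is shown as text for readability):

```json
{
    "readTime": 1596510000000,
    "key": "user1",
    "user": "alice",
    "item": "cappuccino",
    "count": 2,
    "price": 3.5
}
```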