From cafb26dd338ce7f47a23db1e2adc932cdfea6606 Mon Sep 17 00:00:00 2001 From: Anush008 Date: Wed, 26 Jun 2024 02:53:27 +0530 Subject: [PATCH] docs: Rephrase README.md --- CONFLUENT.md | 31 +++++++++ KAFKA.md | 44 ++++++++++++ README.md | 69 +++++++------------ archive/manifest.json | 6 +- build.gradle | 4 +- message_samples/combination.json | 48 ------------- message_samples/named_sparse_vector.json | 38 ---------- message_samples/named_vector.json | 31 --------- message_samples/unnamed_vector.json | 20 ------ .../qdrant/kafka/QdrantSinkConnectorTest.java | 4 +- 10 files changed, 107 insertions(+), 188 deletions(-) create mode 100644 CONFLUENT.md create mode 100644 KAFKA.md delete mode 100644 message_samples/combination.json delete mode 100644 message_samples/named_sparse_vector.json delete mode 100644 message_samples/named_vector.json delete mode 100644 message_samples/unnamed_vector.json diff --git a/CONFLUENT.md b/CONFLUENT.md new file mode 100644 index 0000000..10212dd --- /dev/null +++ b/CONFLUENT.md @@ -0,0 +1,31 @@ +# Usage with Kafka on Confluent Cloud + +## Installation + +1) Download the latest connector zip file from [GitHub Releases](https://github.com/qdrant/qdrant-kafka/releases). + +2) Configure an environment and cluster on Confluent and create a topic to produce messages for. + +3) Navigate to the `Connectors` section of the Confluent cluster and click `Add Plugin`. Upload the zip file with the following info. + +Screenshot 2024-06-26 at 1 51 26 AM + +4) Once installed, navigate to the connector and set the following configuration values. + +Screenshot 2024-06-26 at 1 45 57 AM + +Replace the placeholder values with your credentials. + +5) Add the Qdrant instance host to the allowed networking endpoints. + +Screenshot 2024-06-26 at 2 46 16 AM + +6) Start the connector. + +## Usage + +You can now produce messages for the configured topic and they'll be written into the configured Qdrant instance. 
+ +Screenshot 2024-06-26 at 2 50 56 AM + +Refer to the [message formats](https://github.com/qdrant/qdrant-kafka/blob/main/README.md#message-formats) for the available options when producing messages. diff --git a/KAFKA.md b/KAFKA.md new file mode 100644 index 0000000..80bf54c --- /dev/null +++ b/KAFKA.md @@ -0,0 +1,44 @@ +# Usage with Self-Hosted Kafka + +## Installation + +1) Download the latest connector zip file from [GitHub Releases](https://github.com/qdrant/qdrant-kafka/releases). + +2) Refer to the first 3 steps of the [Kafka Quickstart](https://kafka.apache.org/quickstart#quickstart_download) to set up a local Kafka instance and create a topic named `topic_0`. + +3) Navigate to the Kafka installation directory. + +4) Unzip and copy the `qdrant-kafka-xxx` directory to your Kafka installation's `libs` directory. + +5) Update the `connect-standalone.properties` file in your Kafka installation's `config` directory. + + ```properties + key.converter.schemas.enable=false + value.converter.schemas.enable=false + plugin.path=libs/qdrant-kafka-xxx + ``` + +6) Create a `qdrant-kafka.properties` file in your Kafka installation's `config` directory. + + ```properties + name=qdrant-kafka + connector.class=io.qdrant.kafka.QdrantSinkConnector + qdrant.grpc.url=https://xyz-example.eu-central.aws.cloud.qdrant.io:6334 + qdrant.api.key= + topics=topic_0 + ``` + +7) Start Kafka Connect with the configured properties. + + ```sh + bin/connect-standalone.sh config/connect-standalone.properties config/qdrant-kafka.properties + ``` + +8) You can now produce messages for the `topic_0` topic and they'll be written into the configured Qdrant instance. 
+ +```sh +bin/kafka-console-producer.sh --topic topic_0 --bootstrap-server localhost:9092 +> { "collection_name": "{collection_name}", "id": 1, "vector": [ 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8 ], "payload": { "name": "kafka", "description": "Kafka is a distributed streaming platform", "url": "https://kafka.apache.org/" } } +``` + +Refer to the [message formats](https://github.com/qdrant/qdrant-kafka/blob/main/README.md#message-formats) for the available options when producing messages. diff --git a/README.md b/README.md index 0451fc9..e2044c2 100644 --- a/README.md +++ b/README.md @@ -1,60 +1,29 @@ -# Qdrant Kafka Connector +# Qdrant Connector for Apache Kafka -Use Qdrant as a sink destination in [Kafka connect](https://docs.confluent.io/platform/current/connect/index.html). Supports streaming dense/sparse vectors into Qdrant collections. +Use Qdrant as a sink destination in [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html). Supports streaming dense/sparse vectors into Qdrant collections. -## Installation - -- Download the latest connector zip file from [Github Releases](https://github.com/qdrant/qdrant-kafka/releases). - -- Refer to the first 3 steps of the [Kafka Quickstart](https://kafka.apache.org/quickstart#quickstart_download) to set up a local Kafka instance and create a topic named `topic_0`. - -- Navigate to the Kafka installation directory. - -- Unzip and copy the `qdrant-kafka-xxx` directories to the `libs` directory of your Kafka installation. - -- Update the `connect-standalone.properties` file in the `config` directory of your Kafka installation. - - ```properties - key.converter.schemas.enable=false - value.converter.schemas.enable=false - plugin.path=libs/qdrant-kafka-xxx - ``` - -- Create a `qdrant-kafka.properties` file in the `config` directory of your Kafka installation. 
- - ```properties - name=qdrant-kafka - connector.class=io.qdrant.kafka.QdrantSinkConnnector - qdrant.grpc.url=https://xyz-example.eu-central.aws.cloud.qdrant.io:6334 - qdrant.api.key= - topics=topic_0 - ``` - -- Start the connector with the configured properties - - ```sh - bin/connect-standalone.sh config/connect-standalone.properties config/qdrant-kafka.properties - ``` - ## Usage > [!IMPORTANT] -> Before loading the data using this connector, a collection has to be [created](https://qdrant.tech/documentation/concepts/collections/#create-a-collection) in advance with the appropriate vector dimensions and configurations. +> Qdrant collections have to be [created](https://qdrant.tech/documentation/concepts/collections/#create-a-collection) in advance with the appropriate vector dimensions and configurations. -You can now produce messages with the following command to the `topic_0` topic you created and they'll be streamed to the configured Qdrant instance. +Learn to use the connector with -```sh -bin/kafka-console-producer.sh --topic topic_0 --bootstrap-server localhost:9092 -> { "collection_name": "{collection_name}", "id": 1, "vector": [ 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8 ], "payload": { "name": "kafka", "description": "Kafka is a distributed streaming platform", "url": "https://kafka.apache.org/" } } -``` +- [Kafka on Confluent Cloud](https://github.com/qdrant/qdrant-kafka/blob/main/CONFLUENT.md) + +- [Self-hosted Kafka](https://github.com/qdrant/qdrant-kafka/blob/main/KAFKA.md) + +## Message Formats -This sink connector supports ingesting multiple named/unnamed, dense/sparse vectors. +This sink connector supports messages with multiple dense/sparse vectors. _Click each to expand._
Unnamed/Default vector +Reference: [Creating a collection with a default vector](https://qdrant.tech/documentation/concepts/collections/#create-a-collection). + ```json { "collection_name": "{collection_name}", @@ -80,7 +49,9 @@ _Click each to expand._
- Named vector + Named multiple vectors + +Reference: [Creating a collection with multiple vectors](https://qdrant.tech/documentation/concepts/collections/#collection-with-multiple-vectors). ```json { @@ -121,11 +92,12 @@ _Click each to expand._
Sparse vectors +Reference: [Creating a collection with sparse vectors](https://qdrant.tech/documentation/concepts/collections/#collection-with-sparse-vectors). + ```json { "collection_name": "{collection_name}", "id": 1, - "shard_key_selector": [5235], "vector": { "some-sparse": { "indices": [ @@ -167,11 +139,16 @@ _Click each to expand._
Combination of named dense and sparse vectors +Reference: + +- [Creating a collection with multiple vectors](https://qdrant.tech/documentation/concepts/collections/#collection-with-multiple-vectors). + +- [Creating a collection with sparse vectors](https://qdrant.tech/documentation/concepts/collections/#collection-with-sparse-vectors). + ```json { "collection_name": "{collection_name}", "id": "a10435b5-2a58-427a-a3a0-a5d845b147b7", - "shard_key_selector": ["some-key"], "vector": { "some-other-dense": [ 0.1, diff --git a/archive/manifest.json b/archive/manifest.json index a0d7312..d0f8f03 100644 --- a/archive/manifest.json +++ b/archive/manifest.json @@ -1,8 +1,8 @@ { "name": "qdrant-kafka", "version": "${project.version}", - "title": "Qdrant Sink Connector for Apache Kafka", - "description": "The official Kafka Sink Connector for Qdrant.", + "title": "Qdrant Connector for Apache Kafka", + "description": "Connector to use Qdrant as a sink destination in Kafka Connect.", "owner": { "username": "qdrant", "name": "Qdrant", @@ -36,7 +36,7 @@ }, "logo": "assets/qdrant_logo.png", "documentation_url": "https://github.com/qdrant/qdrant-kafka/blob/main/README.md", - "source_url": "https://github.com/qdrant/qdrant-kafka/tree/main", + "source_url": "https://github.com/qdrant/qdrant-kafka/", "docker_image": {}, "license": [ { diff --git a/build.gradle b/build.gradle index 01fec7b..3b66e75 100644 --- a/build.gradle +++ b/build.gradle @@ -45,8 +45,9 @@ dependencies { implementation "org.apache.kafka:connect-api:$kafkaVersion" implementation 'io.qdrant:client:1.9.1' implementation 'io.grpc:grpc-protobuf:1.59.0' + implementation "io.grpc:grpc-netty-shaded:1.59.0" implementation 'com.google.guava:guava:33.2.1-jre' - implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.1' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2' implementation 'com.google.protobuf:protobuf-java-util:3.25.3' implementation 'org.slf4j:slf4j-api:2.0.13' @@ -85,6 +86,7 @@ spotless 
{ } shadowJar { + relocate 'io.grpc', 'shadow.grpc' mergeServiceFiles() archiveClassifier.set('') } diff --git a/message_samples/combination.json b/message_samples/combination.json deleted file mode 100644 index a17f0e5..0000000 --- a/message_samples/combination.json +++ /dev/null @@ -1,48 +0,0 @@ -{ - "collection_name": "kafka", - "id": "a10435b5-2a58-427a-a3a0-a5d845b147b7", - "shard_key_selector": ["some-key"], - "vector": { - "some-other-dense": [ - 0.1, - 0.2, - 0.3, - 0.4, - 0.5, - 0.6, - 0.7, - 0.8 - ], - "some-sparse": { - "indices": [ - 0, - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 9 - ], - "values": [ - 0.1, - 0.2, - 0.3, - 0.4, - 0.5, - 0.6, - 0.7, - 0.8, - 0.9, - 1.0 - ] - } - }, - "payload": { - "name": "kafka", - "description": "Kafka is a distributed streaming platform", - "url": "https://kafka.apache.org/" - } -} \ No newline at end of file diff --git a/message_samples/named_sparse_vector.json b/message_samples/named_sparse_vector.json deleted file mode 100644 index 8c53dcc..0000000 --- a/message_samples/named_sparse_vector.json +++ /dev/null @@ -1,38 +0,0 @@ -{ - "collection_name": "kafka", - "id": 1, - "shard_key_selector": [5235], - "vector": { - "some-sparse": { - "indices": [ - 0, - 1, - 2, - 3, - 4, - 5, - 6, - 7, - 8, - 9 - ], - "values": [ - 0.1, - 0.2, - 0.3, - 0.4, - 0.5, - 0.6, - 0.7, - 0.8, - 0.9, - 1.0 - ] - } - }, - "payload": { - "name": "kafka", - "description": "Kafka is a distributed streaming platform", - "url": "https://kafka.apache.org/" - } -} \ No newline at end of file diff --git a/message_samples/named_vector.json b/message_samples/named_vector.json deleted file mode 100644 index 4b2e47c..0000000 --- a/message_samples/named_vector.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "collection_name": "kafka", - "id": 1, - "vector": { - "some-dense": [ - 0.1, - 0.2, - 0.3, - 0.4, - 0.5, - 0.6, - 0.7, - 0.8 - ], - "some-other-dense": [ - 0.1, - 0.2, - 0.3, - 0.4, - 0.5, - 0.6, - 0.7, - 0.8 - ] - }, - "payload": { - "name": "kafka", - 
"description": "Kafka is a distributed streaming platform", - "url": "https://kafka.apache.org/" - } -} \ No newline at end of file diff --git a/message_samples/unnamed_vector.json b/message_samples/unnamed_vector.json deleted file mode 100644 index 06da83b..0000000 --- a/message_samples/unnamed_vector.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "collection_name": "kafka", - "id": 1, - "shard_key_selector": [5342, "some-other-key"], - "vector": [ - 0.1, - 0.2, - 0.3, - 0.4, - 0.5, - 0.6, - 0.7, - 0.8 - ], - "payload": { - "name": "kafka", - "description": "Kafka is a distributed streaming platform", - "url": "https://kafka.apache.org/" - } -} \ No newline at end of file diff --git a/src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java b/src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java index fe1d2a6..9adae95 100644 --- a/src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java +++ b/src/intTest/java/io/qdrant/kafka/QdrantSinkConnectorTest.java @@ -1,6 +1,7 @@ /* (C)2024 */ package io.qdrant.kafka; +import java.util.UUID; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,8 @@ public void testSparseVector() throws Exception { int sparseVecCount = randomPositiveInt(100); for (int i = 0; i < pointsCount; i++) { - writeSparseVector(sparseVecCollection, i, sparseVecName, sparseVecCount); + String uuid = UUID.randomUUID().toString(); + writeSparseVector(sparseVecCollection, uuid, sparseVecName, sparseVecCount); } waitForPoints(sparseVecCollection, pointsCount);