-
Notifications
You must be signed in to change notification settings - Fork 0
(chore): migrate to using confluent's kafka library #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
aef7658
b773d65
774cdac
0d6b47a
57404b1
cd51791
9955283
a9309d8
078f906
dcd67cb
8119f5f
e859f49
cca14f2
eaa26e8
8f35fa4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
# kafka-wrapper | ||
A simple kafka wrapper for `node-rdkafka` client. | ||
A simple kafka wrapper for `@confluentinc/kafka-javascript` client. |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,34 @@ | ||||||
/// <reference types="node" /> | ||||||
import Client from './client'; | ||||||
import { GlobalConfig } from '@confluentinc/kafka-javascript'; | ||||||
import EventEmitter from 'events'; | ||||||
declare class KafkaAdmin extends Client { | ||||||
private adminClient; | ||||||
/** | ||||||
* Initialzes a KafkaAdmin client with config. | ||||||
* Requires using connect() function after initalizing. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a small typo in the documentation comment:
Suggested change
Spotted by Diamond |
||||||
* @param {string} clientId - id of client performing request | ||||||
* @param {object} config - global kafka config | ||||||
* @param {object} emitter - emitter to emit log event | ||||||
*/ | ||||||
constructor(clientId: string, config: GlobalConfig, emitter: EventEmitter); | ||||||
/** | ||||||
* Connect to kafka server as admin. | ||||||
*/ | ||||||
connect(): Promise<void>; | ||||||
createTopic(topic: any, timeout: any, actionPostTopicCreation: any): void; | ||||||
deleteTopic(topic: any, timeout: any, actionPostTopicDeletion: any): void; | ||||||
/** | ||||||
* Create new partitions for a topic. | ||||||
* @param {string} `topic | ||||||
* @param {number} totalPartitions: The total number of partitions topic should have after request. | ||||||
* @param {number} timeout | ||||||
* @param {function} actionPostPartitionCreation | ||||||
*/ | ||||||
createPartitions(topic: any, totalPartitions: any, timeout: any, actionPostPartitionCreation: any): void; | ||||||
/** | ||||||
* Synchronous method. | ||||||
*/ | ||||||
disconnect(): void; | ||||||
} | ||||||
export default KafkaAdmin; |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
/// <reference types="node" /> | ||
import EventEmitter from "events"; | ||
import { GlobalConfig, LibrdKafkaError, TopicConfig } from "@confluentinc/kafka-javascript"; | ||
export default class Client { | ||
private clientId; | ||
private clientType; | ||
protected config: GlobalConfig; | ||
protected topicConfig: TopicConfig; | ||
private emitter; | ||
constructor(clientId: string, clientType: string, config: GlobalConfig, topicConfig: TopicConfig, emitter: EventEmitter); | ||
_logMessage(msgType: 'log' | 'success' | 'error', message: string, data: any): void; | ||
log(message: string, data?: any): void; | ||
success(message: string, data?: any): void; | ||
error(err: string, data?: any): void; | ||
} | ||
export declare type ErrorHandlingFunction = (err: LibrdKafkaError) => void; |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,71 @@ | ||||||
/// <reference types="node" /> | ||||||
import { ConsumerGlobalConfig, ConsumerTopicConfig, LibrdKafkaError, Message, SubscribeTopicList } from '@confluentinc/kafka-javascript'; | ||||||
import { EventEmitter } from 'stream'; | ||||||
import Client from './client'; | ||||||
export declare type ConsumeActionFunction = (err: LibrdKafkaError, messages: Message[]) => void; | ||||||
export declare type ListenActionFunction = (arg: Message) => void; | ||||||
declare class KafkaConsumer extends Client { | ||||||
private consumer; | ||||||
/** | ||||||
* Initializes a KafkaConsumer. | ||||||
* @param {String} clientId: id to identify a client consuming the message. | ||||||
* @param {String} groupId: consumer group id, the consumer belongs to. | ||||||
* @param {import('@confluentinc/kafka-javascript').ConsumerGlobalConfig} config: configs for consumer. | ||||||
* @param {import('@confluentinc/kafka-javascript').ConsumerTopicConfig} topicConfig: topic configs | ||||||
* @param {EventEmitter} emitter: to emit log events | ||||||
*/ | ||||||
constructor(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter); | ||||||
/** | ||||||
* Asynchronous function which connects to kafka cluster. | ||||||
* Resolves when connection is ready. | ||||||
* | ||||||
* @returns {Promise} | ||||||
*/ | ||||||
connect(): Promise<this | LibrdKafkaError>; | ||||||
/** | ||||||
* Subscribe to topics. | ||||||
* @param {import('@confluentinc/kafka-javascript').SubscribeTopicList} topics: array of topic names. | ||||||
* @returns {KafkaConsumer} | ||||||
*/ | ||||||
subscribe(topics: SubscribeTopicList): this; | ||||||
/** | ||||||
* Unsubscribe from all the subscribed topics.s | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a small typo in the comment:
Suggested change
Spotted by Diamond |
||||||
* @returns {KafkaConsumer} | ||||||
*/ | ||||||
unsubscribe(): this; | ||||||
/** | ||||||
* Consumes message one-by-one and executes actionsOnData callback | ||||||
* on the message read. | ||||||
* | ||||||
* NOTE: Needs to be called in infinite loop to have it consuming messages continuously. | ||||||
* | ||||||
* @param {Function} actionOnData: callback to return when message is read. | ||||||
*/ | ||||||
consume(actionOnData: ConsumeActionFunction): void; | ||||||
/** | ||||||
* Consumes messages in a batch and executes actionsOnData callback | ||||||
* on the message read. | ||||||
* | ||||||
* NOTE: Needs to be called in infinite loop to have it consuming messages continuously. | ||||||
* | ||||||
* @param {Number} msgCount: number of messages to read. | ||||||
* @param {Function} actionOnData: callback to be executed for each message. | ||||||
*/ | ||||||
consumeBatch(msgCount: number, actionOnData: ConsumeActionFunction): void; | ||||||
/** | ||||||
* Listens to subscribed topic in flowing mode. Triggers a thread in background which keeps polling for events. | ||||||
* | ||||||
* @param {Function} actionOnData | ||||||
*/ | ||||||
listen(actionOnData: ListenActionFunction): void; | ||||||
_wrapConsumeCallbackWrapper(actionOnData: any): (err: any, msgs: any) => void; | ||||||
_wrapListenCallbackWrapper(actionOnData: any): (msg: any) => void; | ||||||
/** | ||||||
* Parses message before passing it to consumer callback. | ||||||
* @param {Object} msg - expects it to be in @confluentinc/kafka-javascript msg format. | ||||||
* @returns | ||||||
*/ | ||||||
_parseMessage(msg: any): any; | ||||||
} | ||||||
declare function getKafkaConsumer(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter, createNew?: boolean): KafkaConsumer; | ||||||
export default getKafkaConsumer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a small typo in the method documentation:
Initialzes
should be corrected toInitializes
.Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.