diff --git a/README.md b/README.md index ef871ef..dd19a25 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # kafka-wrapper -A simple kafka wrapper for `node-rdkafka` client. +A simple kafka wrapper for `@confluentinc/kafka-javascript` client. diff --git a/lib/admin.d.ts b/lib/admin.d.ts new file mode 100644 index 0000000..18561cf --- /dev/null +++ b/lib/admin.d.ts @@ -0,0 +1,34 @@ +/// +import Client from './client'; +import { GlobalConfig } from '@confluentinc/kafka-javascript'; +import EventEmitter from 'events'; +declare class KafkaAdmin extends Client { + private adminClient; + /** + * Initialzes a KafkaAdmin client with config. + * Requires using connect() function after initalizing. + * @param {string} clientId - id of client performing request + * @param {object} config - global kafka config + * @param {object} emitter - emitter to emit log event + */ + constructor(clientId: string, config: GlobalConfig, emitter: EventEmitter); + /** + * Connect to kafka server as admin. + */ + connect(): Promise; + createTopic(topic: any, timeout: any, actionPostTopicCreation: any): void; + deleteTopic(topic: any, timeout: any, actionPostTopicDeletion: any): void; + /** + * Create new partitions for a topic. + * @param {string} `topic + * @param {number} totalPartitions: The total number of partitions topic should have after request. + * @param {number} timeout + * @param {function} actionPostPartitionCreation + */ + createPartitions(topic: any, totalPartitions: any, timeout: any, actionPostPartitionCreation: any): void; + /** + * Synchronous method. + */ + disconnect(): void; +} +export default KafkaAdmin; diff --git a/lib/admin.js b/lib/admin.js new file mode 100644 index 0000000..f5c58ab --- /dev/null +++ b/lib/admin.js @@ -0,0 +1,77 @@ +"use strict"; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +const client_1 = __importDefault(require("./client")); +const node_rdkafka_1 = require("@confluentinc/kafka-javascript"); +class KafkaAdmin extends client_1.default { + adminClient; + /** + * Initialzes a KafkaAdmin client with config. + * Requires using connect() function after initalizing. + * @param {string} clientId - id of client performing request + * @param {object} config - global kafka config + * @param {object} emitter - emitter to emit log event + */ + constructor(clientId, config, emitter) { + super(clientId, 'admin', config, {}, emitter); + this.adminClient = null; + } + /** + * Connect to kafka server as admin. + */ + async connect() { + try { + if (this.adminClient === null) { + this.adminClient = await node_rdkafka_1.AdminClient.create(this.config); + } + this.success('Successfully connected to kafka as admin'); + } + catch (err) { + this.error('Encountered error while connecting to kafka as admin', err); + } + } + createTopic(topic, timeout, actionPostTopicCreation) { + try { + this.adminClient.createTopic(topic, timeout, actionPostTopicCreation); + this.success('Successfully created new topic.', topic.topic); + } + catch (err) { + this.error(`Encountered error while creating topic=${topic}:`, err); + } + } + deleteTopic(topic, timeout, actionPostTopicDeletion) { + try { + this.adminClient.deleteTopic(topic, timeout, actionPostTopicDeletion); + this.success('Successfully deleted a topic.', topic); + } + catch (err) { + this.error(`Encountered error while deleting topic=${topic}.`, err); + } + } + /** + * Create new partitions for a topic. + * @param {string} `topic + * @param {number} totalPartitions: The total number of partitions topic should have after request. + * @param {number} timeout + * @param {function} actionPostPartitionCreation + */ + createPartitions(topic, totalPartitions, timeout, actionPostPartitionCreation) { + try { + this.adminClient.createPartitions(topic, totalPartitions, timeout, actionPostPartitionCreation); + this.success(`Successfully created new topic partitons: topic=${topic}, totalParitions=${totalPartitions}`); + } + catch (err) { + this.error(`Encountered error while creating new partitions for topic: topic=${topic}, totalPartitons=${totalPartitions}`, err); + } + } + /** + * Synchronous method. + */ + disconnect() { + this.adminClient.disconnect(); + } +} +exports.default = KafkaAdmin; +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiYWRtaW4uanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi9zcmMvYWRtaW4udHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6Ijs7Ozs7QUFBQSxzREFBOEI7QUFDOUIsK0NBQXlEO0FBR3pELE1BQU0sVUFBVyxTQUFRLGdCQUFNO0lBQ25CLFdBQVcsQ0FBQztJQUVwQjs7Ozs7O09BTUc7SUFDSCxZQUFZLFFBQWdCLEVBQUUsTUFBb0IsRUFBRSxPQUFxQjtRQUNyRSxLQUFLLENBQUMsUUFBUSxFQUFFLE9BQU8sRUFBRSxNQUFNLEVBQUUsRUFBRSxFQUFFLE9BQU8sQ0FBQyxDQUFDO1FBQzlDLElBQUksQ0FBQyxXQUFXLEdBQUcsSUFBSSxDQUFDO0lBQzVCLENBQUM7SUFFRDs7T0FFRztJQUNILEtBQUssQ0FBQyxPQUFPO1FBQ1QsSUFBSTtZQUNBLElBQUksSUFBSSxDQUFDLFdBQVcsS0FBSyxJQUFJLEVBQUU7Z0JBQzNCLElBQUksQ0FBQyxXQUFXLEdBQUcsTUFBTSwwQkFBVyxDQUFDLE1BQU0sQ0FBQyxJQUFJLENBQUMsTUFBTSxDQUFDLENBQUM7YUFDNUQ7WUFDRCxJQUFJLENBQUMsT0FBTyxDQUFDLDBDQUEwQyxDQUFDLENBQUM7U0FDNUQ7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQUMsc0RBQXNELEVBQUUsR0FBRyxDQUFDLENBQUM7U0FDM0U7SUFDTCxDQUFDO0lBRUQsV0FBVyxDQUFDLEtBQUssRUFBRSxPQUFPLEVBQUUsdUJBQXVCO1FBQy9DLElBQUk7WUFDQSxJQUFJLENBQUMsV0FBVyxDQUFDLFdBQVcsQ0FBQyxLQUFLLEVBQUUsT0FBTyxFQUFFLHVCQUF1QixDQUFDLENBQUM7WUFDdEUsSUFBSSxDQUFDLE9BQU8sQ0FBQyxpQ0FBaUMsRUFBRSxLQUFLLENBQUMsS0FBSyxDQUFDLENBQUM7U0FDaEU7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQUMsMENBQTBDLEtBQUssR0FBRyxFQUFFLEdBQUcsQ0FBQyxDQUFDO1NBQ3ZFO0lBQ0wsQ0FBQztJQUVELFdBQVcsQ0FBQyxLQUFLLEVBQUUsT0FBTyxFQUFFLHVCQUF1QjtRQUMvQyxJQUFJO1lBQ0EsSUFBSSxDQUFDLFdBQVcsQ0FBQyxXQUFXLENBQUMsS0FBSyxFQUFFLE9BQU8sRUFBRSx1QkFBdUIsQ0FBQyxDQUFDO1lBQ3RFLElBQUksQ0FBQyxPQUFPLENBQUMsK0JBQStCLEVBQUUsS0FBSyxDQUFDLENBQUM7U0FDeEQ7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQUMsMENBQTBDLEtBQUssR0FBRyxFQUFFLEdBQUcsQ0FBQyxDQUFDO1NBQ3ZFO0lBRUwsQ0FBQztJQUVEOzs7Ozs7T0FNRztJQUNILGdCQUFnQixDQUFDLEtBQUssRUFBRSxlQUFlLEVBQUUsT0FBTyxFQUFFLDJCQUEyQjtRQUN6RSxJQUFJO1lBQ0EsSUFBSSxDQUFDLFdBQVcsQ0FBQyxnQkFBZ0IsQ0FBQyxLQUFLLEVBQUUsZUFBZSxFQUFFLE9BQU8sRUFBRSwyQkFBMkIsQ0FBQyxDQUFDO1lBQ2hHLElBQUksQ0FBQyxPQUFPLENBQUMsbURBQW1ELEtBQUssb0JBQW9CLGVBQWUsRUFBRSxDQUFDLENBQUM7U0FDL0c7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQ04sb0VBQW9FLEtBQUssb0JBQW9CLGVBQWUsRUFBRSxFQUM5RyxHQUFHLENBQ04sQ0FBQztTQUNMO0lBQ0wsQ0FBQztJQUVEOztPQUVHO0lBQ0gsVUFBVTtRQUNOLElBQUksQ0FBQyxXQUFXLENBQUMsVUFBVSxFQUFFLENBQUM7SUFDbEMsQ0FBQztDQUNKO0FBRUQsa0JBQWUsVUFBVSxDQUFDIn0= \ No newline at end of file diff --git a/lib/client.d.ts b/lib/client.d.ts new file mode 100644 index 0000000..db8804b --- /dev/null +++ b/lib/client.d.ts @@ -0,0 +1,16 @@ +/// +import EventEmitter from "events"; +import { GlobalConfig, LibrdKafkaError, TopicConfig } from "@confluentinc/kafka-javascript"; +export default class Client { + private clientId; + private clientType; + protected config: GlobalConfig; + protected topicConfig: TopicConfig; + private emitter; + constructor(clientId: string, clientType: string, config: GlobalConfig, topicConfig: TopicConfig, emitter: EventEmitter); + _logMessage(msgType: 'log' | 'success' | 'error', message: string, data: any): void; + log(message: string, data?: any): void; + success(message: string, data?: any): void; + error(err: string, data?: any): void; +} +export declare type ErrorHandlingFunction = (err: LibrdKafkaError) => void; diff --git a/lib/client.js b/lib/client.js new file mode 100644 index 0000000..5074914 --- /dev/null +++ b/lib/client.js @@ -0,0 +1,53 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +class Client { + clientId; + clientType; + config; + topicConfig; + emitter; + constructor(clientId, clientType, config, topicConfig, emitter) { + this.clientId = clientId; + this.clientType = clientType; + this.config = config; + this.topicConfig = topicConfig; + this.emitter = emitter; + this.clientId = clientId; + this.clientType = clientType; + // common config defaults should go here. + this.config = Object.assign({ + 'metadata.broker.list': 'localhost:9092', + 'socket.keepalive.enable': true, + }, config, { 'client.id': clientId }); + // commong topic configs defaults should go here. + this.topicConfig = topicConfig; + this.emitter = emitter; + } + _logMessage(msgType, message, data) { + if (this.emitter != null) { + this.emitter.emit(msgType, { + clientId: this.clientId, + clientType: this.clientType, + message, + data, + }); + } + else if (msgType === 'error') { + console.error(this.clientId, this.clientType, message, typeof data !== 'undefined' ? data : ''); + } + else { + console.log(this.clientId, this.clientType, message, typeof data !== 'undefined' ? data : ''); + } + } + log(message, data) { + this._logMessage('log', message, data); + } + success(message, data) { + this._logMessage('success', message, data); + } + error(err, data) { + this._logMessage('error', err, data); + } +} +exports.default = Client; +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiY2xpZW50LmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiLi4vc3JjL2NsaWVudC50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiOztBQUdBLE1BQXFCLE1BQU07SUFDSDtJQUNSO0lBQThCO0lBQWdDO0lBQWtDO0lBRDVHLFlBQW9CLFFBQWdCLEVBQ3hCLFVBQWtCLEVBQVksTUFBb0IsRUFBWSxXQUF3QixFQUFVLE9BQXFCO1FBRDdHLGFBQVEsR0FBUixRQUFRLENBQVE7UUFDeEIsZUFBVSxHQUFWLFVBQVUsQ0FBUTtRQUFZLFdBQU0sR0FBTixNQUFNLENBQWM7UUFBWSxnQkFBVyxHQUFYLFdBQVcsQ0FBYTtRQUFVLFlBQU8sR0FBUCxPQUFPLENBQWM7UUFDN0gsSUFBSSxDQUFDLFFBQVEsR0FBRyxRQUFRLENBQUM7UUFDekIsSUFBSSxDQUFDLFVBQVUsR0FBRyxVQUFVLENBQUM7UUFFN0IseUNBQXlDO1FBQ3pDLElBQUksQ0FBQyxNQUFNLEdBQUcsTUFBTSxDQUFDLE1BQU0sQ0FBQztZQUN4QixzQkFBc0IsRUFBRSxnQkFBZ0I7WUFDeEMseUJBQXlCLEVBQUUsSUFBSTtTQUNoQyxFQUNELE1BQU0sRUFDTixFQUFFLFdBQVcsRUFBRSxRQUFRLEVBQUUsQ0FDMUIsQ0FBQztRQUNGLGtEQUFrRDtRQUNsRCxJQUFJLENBQUMsV0FBVyxHQUFHLFdBQVcsQ0FBQztRQUMvQixJQUFJLENBQUMsT0FBTyxHQUFHLE9BQU8sQ0FBQztJQUMzQixDQUFDO0lBRUQsV0FBVyxDQUFDLE9BQW9DLEVBQUUsT0FBZSxFQUFFLElBQVM7UUFDeEUsSUFBSSxJQUFJLENBQUMsT0FBTyxJQUFJLElBQUksRUFBRTtZQUN0QixJQUFJLENBQUMsT0FBTyxDQUFDLElBQUksQ0FBQyxPQUFPLEVBQUU7Z0JBQ3ZCLFFBQVEsRUFBRSxJQUFJLENBQUMsUUFBUTtnQkFDdkIsVUFBVSxFQUFFLElBQUksQ0FBQyxVQUFVO2dCQUMzQixPQUFPO2dCQUNQLElBQUk7YUFDUCxDQUFDLENBQUM7U0FDTjthQUFNLElBQUksT0FBTyxLQUFLLE9BQU8sRUFBRTtZQUM1QixPQUFPLENBQUMsS0FBSyxDQUFDLElBQUksQ0FBQyxRQUFRLEVBQUUsSUFBSSxDQUFDLFVBQVUsRUFBRSxPQUFPLEVBQUUsT0FBTyxJQUFJLEtBQUssV0FBVyxDQUFDLENBQUMsQ0FBQyxJQUFJLENBQUMsQ0FBQyxDQUFDLEVBQUUsQ0FBQyxDQUFDO1NBQ25HO2FBQU07WUFDSCxPQUFPLENBQUMsR0FBRyxDQUFDLElBQUksQ0FBQyxRQUFRLEVBQUUsSUFBSSxDQUFDLFVBQVUsRUFBRSxPQUFPLEVBQUUsT0FBTyxJQUFJLEtBQUssV0FBVyxDQUFDLENBQUMsQ0FBQyxJQUFJLENBQUMsQ0FBQyxDQUFDLEVBQUUsQ0FBQyxDQUFDO1NBQ2pHO0lBQ0wsQ0FBQztJQUVELEdBQUcsQ0FBQyxPQUFlLEVBQUUsSUFBVTtRQUMzQixJQUFJLENBQUMsV0FBVyxDQUFDLEtBQUssRUFBRSxPQUFPLEVBQUUsSUFBSSxDQUFDLENBQUM7SUFDM0MsQ0FBQztJQUVELE9BQU8sQ0FBQyxPQUFlLEVBQUUsSUFBVTtRQUMvQixJQUFJLENBQUMsV0FBVyxDQUFDLFNBQVMsRUFBRSxPQUFPLEVBQUUsSUFBSSxDQUFDLENBQUM7SUFDL0MsQ0FBQztJQUVELEtBQUssQ0FBQyxHQUFXLEVBQUUsSUFBVTtRQUN6QixJQUFJLENBQUMsV0FBVyxDQUFDLE9BQU8sRUFBRSxHQUFHLEVBQUUsSUFBSSxDQUFDLENBQUM7SUFDekMsQ0FBQztDQUNKO0FBN0NELHlCQTZDQyJ9 \ No newline at end of file diff --git a/lib/consumer.d.ts b/lib/consumer.d.ts new file mode 100644 index 0000000..20c1301 --- /dev/null +++ b/lib/consumer.d.ts @@ -0,0 +1,71 @@ +/// +import { ConsumerGlobalConfig, ConsumerTopicConfig, LibrdKafkaError, Message, SubscribeTopicList } from '@confluentinc/kafka-javascript'; +import { EventEmitter } from 'stream'; +import Client from './client'; +export declare type ConsumeActionFunction = (err: LibrdKafkaError, messages: Message[]) => void; +export declare type ListenActionFunction = (arg: Message) => void; +declare class KafkaConsumer extends Client { + private consumer; + /** + * Initializes a KafkaConsumer. + * @param {String} clientId: id to identify a client consuming the message. + * @param {String} groupId: consumer group id, the consumer belongs to. + * @param {import('@confluentinc/kafka-javascript').ConsumerGlobalConfig} config: configs for consumer. + * @param {import('@confluentinc/kafka-javascript').ConsumerTopicConfig} topicConfig: topic configs + * @param {EventEmitter} emitter: to emit log events + */ + constructor(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter); + /** + * Asynchronous function which connects to kafka cluster. + * Resolves when connection is ready. + * + * @returns {Promise} + */ + connect(): Promise; + /** + * Subscribe to topics. + * @param {import('@confluentinc/kafka-javascript').SubscribeTopicList} topics: array of topic names. + * @returns {KafkaConsumer} + */ + subscribe(topics: SubscribeTopicList): this; + /** + * Unsubscribe from all the subscribed topics.s + * @returns {KafkaConsumer} + */ + unsubscribe(): this; + /** + * Consumes message one-by-one and executes actionsOnData callback + * on the message read. + * + * NOTE: Needs to be called in infinite loop to have it consuming messages continuously. + * + * @param {Function} actionOnData: callback to return when message is read. + */ + consume(actionOnData: ConsumeActionFunction): void; + /** + * Consumes messages in a batch and executes actionsOnData callback + * on the message read. + * + * NOTE: Needs to be called in infinite loop to have it consuming messages continuously. + * + * @param {Number} msgCount: number of messages to read. + * @param {Function} actionOnData: callback to be executed for each message. + */ + consumeBatch(msgCount: number, actionOnData: ConsumeActionFunction): void; + /** + * Listens to subscribed topic in flowing mode. Triggers a thread in background which keeps polling for events. + * + * @param {Function} actionOnData + */ + listen(actionOnData: ListenActionFunction): void; + _wrapConsumeCallbackWrapper(actionOnData: any): (err: any, msgs: any) => void; + _wrapListenCallbackWrapper(actionOnData: any): (msg: any) => void; + /** + * Parses message before passing it to consumer callback. + * @param {Object} msg - expects it to be in @confluentinc/kafka-javascript msg format. + * @returns + */ + _parseMessage(msg: any): any; +} +declare function getKafkaConsumer(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter, createNew?: boolean): KafkaConsumer; +export default getKafkaConsumer; diff --git a/lib/consumer.js b/lib/consumer.js new file mode 100644 index 0000000..cc6a2d8 --- /dev/null +++ b/lib/consumer.js @@ -0,0 +1,201 @@ +"use strict"; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +const node_rdkafka_1 = __importDefault(require("@confluentinc/kafka-javascript")); +const client_1 = __importDefault(require("./client")); +let _kafkaConsumer = null; +class KafkaConsumer extends client_1.default { + consumer; + /** + * Initializes a KafkaConsumer. + * @param {String} clientId: id to identify a client consuming the message. + * @param {String} groupId: consumer group id, the consumer belongs to. + * @param {import('@confluentinc/kafka-javascript').ConsumerGlobalConfig} config: configs for consumer. + * @param {import('@confluentinc/kafka-javascript').ConsumerTopicConfig} topicConfig: topic configs + * @param {EventEmitter} emitter: to emit log events + */ + constructor(clientId, groupId, config, topicConfig, emitter) { + // consumer specific default configs we would like to have + config = Object.assign({ + 'allow.auto.create.topics': true, + }, config, { + 'group.id': groupId, + }); + super(clientId, 'consumer', config, topicConfig, emitter); + this.consumer = new node_rdkafka_1.default.KafkaConsumer(this.config, this.topicConfig); + } + /** + * Asynchronous function which connects to kafka cluster. + * Resolves when connection is ready. + * + * @returns {Promise} + */ + connect() { + return new Promise((resolve, reject) => { + try { + this.consumer + .connect() + .on('ready', (info, metadata) => { + this.success('Consumer connected to kafka cluster....', { + name: info.name, + }); + resolve(this); + }) + .on('connection.failure', (err, clientMetrics) => { + this.error('Consumer encountered error while connecting to Kafka.', JSON.stringify(err)); + reject(err); + }) + .on('event.error', (err) => { + this.error('Consumer encountered error.', JSON.stringify(err)); + reject(err); + }) + .on('event.log', (eventData) => this.log('Logging consumer event: ', eventData)) + .on('disconnected', (metrics) => { + this.log('Consumer disconnected. Client metrics are: ' + metrics.connectionOpened); + }) + .on('offset.commit', (err, topicPartitions) => { + if (err) { + this.error('Encountered error while committing offset.', JSON.stringify(err)); + return; + } + this.log('Commited offset for topic-partitions: ' + JSON.stringify(topicPartitions)); + }) + .on('subscribed', (topics) => { + this.log('Subscribed to topics: ' + JSON.stringify(topics)); + }); + } + catch (err) { + this.error('Consumer encountered while connecting to kafka server.', err); + reject(err); + } + }); + } + /** + * Subscribe to topics. + * @param {import('@confluentinc/kafka-javascript').SubscribeTopicList} topics: array of topic names. + * @returns {KafkaConsumer} + */ + subscribe(topics) { + try { + this.consumer.subscribe(topics); + } + catch (err) { + this.error(`Consumer encountered error while subscribing to topics=${topics}`, err); + } + return this; + } + /** + * Unsubscribe from all the subscribed topics.s + * @returns {KafkaConsumer} + */ + unsubscribe() { + try { + this.consumer.unsubscribe(); + } + catch (err) { + this.error('Consumer encountered error while unsubscribing', err); + } + return this; + } + /** + * Consumes message one-by-one and executes actionsOnData callback + * on the message read. + * + * NOTE: Needs to be called in infinite loop to have it consuming messages continuously. + * + * @param {Function} actionOnData: callback to return when message is read. + */ + consume(actionOnData) { + try { + // reset 'data' event listener to no-op callback. + this.consumer.removeAllListeners('data'); + this.consumer.consume(this._wrapConsumeCallbackWrapper(actionOnData)); + } + catch (err) { + this.error('Consumer encountered error while consuming messages', err); + } + } + /** + * Consumes messages in a batch and executes actionsOnData callback + * on the message read. + * + * NOTE: Needs to be called in infinite loop to have it consuming messages continuously. + * + * @param {Number} msgCount: number of messages to read. + * @param {Function} actionOnData: callback to be executed for each message. + */ + consumeBatch(msgCount, actionOnData) { + try { + // reset 'data' event listener to no-op callback. + this.consumer.removeAllListeners('data'); + this.consumer.consume(msgCount, this._wrapConsumeCallbackWrapper(actionOnData)); + } + catch (err) { + this.error(`Consumer encountered error while consuming messages in batch of size=${msgCount}`, err); + } + } + /** + * Listens to subscribed topic in flowing mode. Triggers a thread in background which keeps polling for events. + * + * @param {Function} actionOnData + */ + listen(actionOnData) { + try { + this.consumer.on('data', this._wrapListenCallbackWrapper(actionOnData)); + this.consumer.consume(); + } + catch (err) { + this.error('Consumer encountered error while starting to listen to messages.', err); + } + } + _wrapConsumeCallbackWrapper(actionOnData) { + const wrapper = (err, msgs) => { + if (err) { + actionOnData(err, msgs); + return; + } + if (!Array.isArray(msgs)) { + msgs = [msgs]; + } + const parsedMsgs = msgs.map((msg) => this._parseMessage(msg)); + actionOnData(err, parsedMsgs); + }; + return wrapper; + } + _wrapListenCallbackWrapper(actionOnData) { + const wrapper = (msg) => { + try { + msg = this._parseMessage(msg); + actionOnData(msg); + } + catch (e) { + this.error(e); + } + }; + return wrapper; + } + /** + * Parses message before passing it to consumer callback. + * @param {Object} msg - expects it to be in @confluentinc/kafka-javascript msg format. + * @returns + */ + _parseMessage(msg) { + msg.value = msg.value == null ? null : JSON.parse(msg.value.toString()); + msg.key = msg.key != null && Buffer.isBuffer(msg.key) ? msg.key.toString() : msg.key; + return msg; + } +} +function getKafkaConsumer(clientId, groupId, config, topicConfig, emitter, createNew = false) { + if (createNew) { + const consumer = new KafkaConsumer(clientId, groupId, config, topicConfig, emitter); + return consumer; + } + if (!_kafkaConsumer) { + _kafkaConsumer = new KafkaConsumer(clientId, groupId, config, topicConfig, emitter); + } + return _kafkaConsumer; +} +exports.default = getKafkaConsumer; +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiY29uc3VtZXIuanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi9zcmMvY29uc3VtZXIudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6Ijs7Ozs7QUFBQSxnRUFBOEg7QUFFOUgsc0RBQThCO0FBTTlCLElBQUksY0FBYyxHQUFrQixJQUFJLENBQUM7QUFFekMsTUFBTSxhQUFjLFNBQVEsZ0JBQU07SUFDdEIsUUFBUSxDQUFzQjtJQUV0Qzs7Ozs7OztPQU9HO0lBQ0gsWUFBWSxRQUFnQixFQUFFLE9BQWUsRUFBRSxNQUE0QixFQUFFLFdBQWdDLEVBQUUsT0FBcUI7UUFDaEksMERBQTBEO1FBQzFELE1BQU0sR0FBRyxNQUFNLENBQUMsTUFBTSxDQUFDO1lBQ25CLDBCQUEwQixFQUFFLElBQUk7U0FDbkMsRUFDRyxNQUFNLEVBQ047WUFDSSxVQUFVLEVBQUUsT0FBTztTQUN0QixDQUFDLENBQUM7UUFDUCxLQUFLLENBQUMsUUFBUSxFQUFFLFVBQVUsRUFBRSxNQUFNLEVBQUUsV0FBVyxFQUFFLE9BQU8sQ0FBQyxDQUFDO1FBQzFELElBQUksQ0FBQyxRQUFRLEdBQUcsSUFBSSxzQkFBSyxDQUFDLGFBQWEsQ0FBQyxJQUFJLENBQUMsTUFBTSxFQUFFLElBQUksQ0FBQyxXQUFXLENBQUMsQ0FBQztJQUMzRSxDQUFDO0lBRUQ7Ozs7O09BS0c7SUFDSCxPQUFPO1FBQ0gsT0FBTyxJQUFJLE9BQU8sQ0FBQyxDQUFDLE9BQU8sRUFBRSxNQUFNLEVBQUUsRUFBRTtZQUNuQyxJQUFJO2dCQUNBLElBQUksQ0FBQyxRQUFRO3FCQUNSLE9BQU8sRUFBRTtxQkFDVCxFQUFFLENBQUMsT0FBTyxFQUFFLENBQUMsSUFBSSxFQUFFLFFBQVEsRUFBRSxFQUFFO29CQUM1QixJQUFJLENBQUMsT0FBTyxDQUFDLHlDQUF5QyxFQUFFO3dCQUNwRCxJQUFJLEVBQUUsSUFBSSxDQUFDLElBQUk7cUJBQ2xCLENBQUMsQ0FBQztvQkFDSCxPQUFPLENBQUMsSUFBSSxDQUFDLENBQUM7Z0JBQ2xCLENBQUMsQ0FBQztxQkFDRCxFQUFFLENBQUMsb0JBQW9CLEVBQUUsQ0FBQyxHQUFHLEVBQUUsYUFBYSxFQUFFLEVBQUU7b0JBQzdDLElBQUksQ0FBQyxLQUFLLENBQUMsdURBQXVELEVBQUUsSUFBSSxDQUFDLFNBQVMsQ0FBQyxHQUFHLENBQUMsQ0FBQyxDQUFDO29CQUN6RixNQUFNLENBQUMsR0FBRyxDQUFDLENBQUM7Z0JBQ2hCLENBQUMsQ0FBQztxQkFDRCxFQUFFLENBQUMsYUFBYSxFQUFFLENBQUMsR0FBRyxFQUFFLEVBQUU7b0JBQ3ZCLElBQUksQ0FBQyxLQUFLLENBQUMsNkJBQTZCLEVBQUUsSUFBSSxDQUFDLFNBQVMsQ0FBQyxHQUFHLENBQUMsQ0FBQyxDQUFDO29CQUMvRCxNQUFNLENBQUMsR0FBRyxDQUFDLENBQUM7Z0JBQ2hCLENBQUMsQ0FBQztxQkFDRCxFQUFFLENBQUMsV0FBVyxFQUFFLENBQUMsU0FBUyxFQUFFLEVBQUUsQ0FBQyxJQUFJLENBQUMsR0FBRyxDQUFDLDBCQUEwQixFQUFFLFNBQVMsQ0FBQyxDQUFDO3FCQUMvRSxFQUFFLENBQUMsY0FBYyxFQUFFLENBQUMsT0FBTyxFQUFFLEVBQUU7b0JBQzVCLElBQUksQ0FBQyxHQUFHLENBQUMsNkNBQTZDLEdBQUcsT0FBTyxDQUFDLGdCQUFnQixDQUFDLENBQUE7Z0JBQ3RGLENBQUMsQ0FBQztxQkFDRCxFQUFFLENBQUMsZUFBZSxFQUFFLENBQUMsR0FBRyxFQUFFLGVBQWUsRUFBRSxFQUFFO29CQUMxQyxJQUFJLEdBQUcsRUFBRTt3QkFDTCxJQUFJLENBQUMsS0FBSyxDQUFDLDRDQUE0QyxFQUFFLElBQUksQ0FBQyxTQUFTLENBQUMsR0FBRyxDQUFDLENBQUMsQ0FBQzt3QkFDOUUsT0FBTztxQkFDVjtvQkFDRCxJQUFJLENBQUMsR0FBRyxDQUFDLHdDQUF3QyxHQUFHLElBQUksQ0FBQyxTQUFTLENBQUMsZUFBZSxDQUFDLENBQUMsQ0FBQztnQkFDekYsQ0FBQyxDQUFDO3FCQUNELEVBQUUsQ0FBQyxZQUFZLEVBQUUsQ0FBQyxNQUFNLEVBQUUsRUFBRTtvQkFDekIsSUFBSSxDQUFDLEdBQUcsQ0FBQyx3QkFBd0IsR0FBRyxJQUFJLENBQUMsU0FBUyxDQUFDLE1BQU0sQ0FBQyxDQUFDLENBQUM7Z0JBQ2hFLENBQUMsQ0FBQyxDQUFDO2FBQ1Y7WUFBQyxPQUFPLEdBQUcsRUFBRTtnQkFDVixJQUFJLENBQUMsS0FBSyxDQUFDLHdEQUF3RCxFQUFFLEdBQUcsQ0FBQyxDQUFDO2dCQUMxRSxNQUFNLENBQUMsR0FBRyxDQUFDLENBQUM7YUFDZjtRQUNMLENBQUMsQ0FBQyxDQUFDO0lBQ1AsQ0FBQztJQUVEOzs7O09BSUc7SUFDSCxTQUFTLENBQUMsTUFBMEI7UUFDaEMsSUFBSTtZQUNBLElBQUksQ0FBQyxRQUFRLENBQUMsU0FBUyxDQUFDLE1BQU0sQ0FBQyxDQUFDO1NBQ25DO1FBQUMsT0FBTyxHQUFHLEVBQUU7WUFDVixJQUFJLENBQUMsS0FBSyxDQUFDLDBEQUEwRCxNQUFNLEVBQUUsRUFBRSxHQUFHLENBQUMsQ0FBQztTQUN2RjtRQUNELE9BQU8sSUFBSSxDQUFDO0lBQ2hCLENBQUM7SUFFRDs7O09BR0c7SUFDSCxXQUFXO1FBQ1AsSUFBSTtZQUNBLElBQUksQ0FBQyxRQUFRLENBQUMsV0FBVyxFQUFFLENBQUM7U0FDL0I7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQUMsZ0RBQWdELEVBQUUsR0FBRyxDQUFDLENBQUM7U0FDckU7UUFDRCxPQUFPLElBQUksQ0FBQztJQUNoQixDQUFDO0lBRUQ7Ozs7Ozs7T0FPRztJQUNILE9BQU8sQ0FBQyxZQUFtQztRQUN2QyxJQUFJO1lBQ0Esa0RBQWtEO1lBQ2xELElBQUksQ0FBQyxRQUFRLENBQUMsa0JBQWtCLENBQUMsTUFBTSxDQUFDLENBQUM7WUFDekMsSUFBSSxDQUFDLFFBQVEsQ0FBQyxPQUFPLENBQUMsSUFBSSxDQUFDLDJCQUEyQixDQUFDLFlBQVksQ0FBQyxDQUFDLENBQUM7U0FDekU7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQUMscURBQXFELEVBQUUsR0FBRyxDQUFDLENBQUM7U0FDMUU7SUFDTCxDQUFDO0lBRUQ7Ozs7Ozs7O09BUUc7SUFDSCxZQUFZLENBQUMsUUFBZ0IsRUFBRSxZQUFtQztRQUM5RCxJQUFJO1lBQ0Esa0RBQWtEO1lBQ2xELElBQUksQ0FBQyxRQUFRLENBQUMsa0JBQWtCLENBQUMsTUFBTSxDQUFDLENBQUM7WUFDekMsSUFBSSxDQUFDLFFBQVEsQ0FBQyxPQUFPLENBQUMsUUFBUSxFQUFFLElBQUksQ0FBQywyQkFBMkIsQ0FBQyxZQUFZLENBQUMsQ0FBQyxDQUFDO1NBQ25GO1FBQUMsT0FBTyxHQUFHLEVBQUU7WUFDVixJQUFJLENBQUMsS0FBSyxDQUFDLHdFQUF3RSxRQUFRLEVBQUUsRUFBRSxHQUFHLENBQUMsQ0FBQTtTQUN0RztJQUNMLENBQUM7SUFFRDs7OztPQUlHO0lBQ0gsTUFBTSxDQUFDLFlBQWtDO1FBQ3JDLElBQUk7WUFDQSxJQUFJLENBQUMsUUFBUSxDQUFDLEVBQUUsQ0FBQyxNQUFNLEVBQUUsSUFBSSxDQUFDLDBCQUEwQixDQUFDLFlBQVksQ0FBQyxDQUFDLENBQUM7WUFDeEUsSUFBSSxDQUFDLFFBQVEsQ0FBQyxPQUFPLEVBQUUsQ0FBQztTQUMzQjtRQUFDLE9BQU8sR0FBRyxFQUFFO1lBQ1YsSUFBSSxDQUFDLEtBQUssQ0FBQyxrRUFBa0UsRUFBRSxHQUFHLENBQUMsQ0FBQztTQUN2RjtJQUNMLENBQUM7SUFFRCwyQkFBMkIsQ0FBQyxZQUFZO1FBQ3BDLE1BQU0sT0FBTyxHQUFHLENBQUMsR0FBRyxFQUFFLElBQUksRUFBRSxFQUFFO1lBQzFCLElBQUksR0FBRyxFQUFFO2dCQUNMLFlBQVksQ0FBQyxHQUFHLEVBQUUsSUFBSSxDQUFDLENBQUM7Z0JBQ3hCLE9BQU87YUFDVjtZQUNELElBQUksQ0FBQyxLQUFLLENBQUMsT0FBTyxDQUFDLElBQUksQ0FBQyxFQUFFO2dCQUN0QixJQUFJLEdBQUcsQ0FBQyxJQUFJLENBQUMsQ0FBQzthQUNqQjtZQUNELE1BQU0sVUFBVSxHQUFHLElBQUksQ0FBQyxHQUFHLENBQUMsQ0FBQyxHQUFHLEVBQUUsRUFBRSxDQUFDLElBQUksQ0FBQyxhQUFhLENBQUMsR0FBRyxDQUFDLENBQUMsQ0FBQztZQUM5RCxZQUFZLENBQUMsR0FBRyxFQUFFLFVBQVUsQ0FBQyxDQUFDO1FBQ2xDLENBQUMsQ0FBQztRQUNGLE9BQU8sT0FBTyxDQUFDO0lBQ25CLENBQUM7SUFFRCwwQkFBMEIsQ0FBQyxZQUFZO1FBQ25DLE1BQU0sT0FBTyxHQUFHLENBQUMsR0FBRyxFQUFFLEVBQUU7WUFDcEIsSUFBSTtnQkFDQSxHQUFHLEdBQUcsSUFBSSxDQUFDLGFBQWEsQ0FBQyxHQUFHLENBQUMsQ0FBQztnQkFDOUIsWUFBWSxDQUFDLEdBQUcsQ0FBQyxDQUFDO2FBQ3JCO1lBQUMsT0FBTyxDQUFDLEVBQUU7Z0JBQ1IsSUFBSSxDQUFDLEtBQUssQ0FBQyxDQUFDLENBQUMsQ0FBQzthQUNqQjtRQUNMLENBQUMsQ0FBQztRQUNGLE9BQU8sT0FBTyxDQUFDO0lBQ25CLENBQUM7SUFFRDs7OztPQUlHO0lBQ0gsYUFBYSxDQUFDLEdBQUc7UUFDYixHQUFHLENBQUMsS0FBSyxHQUFHLEdBQUcsQ0FBQyxLQUFLLElBQUksSUFBSSxDQUFDLENBQUMsQ0FBQyxJQUFJLENBQUMsQ0FBQyxDQUFDLElBQUksQ0FBQyxLQUFLLENBQUMsR0FBRyxDQUFDLEtBQUssQ0FBQyxRQUFRLEVBQUUsQ0FBQyxDQUFDO1FBQ3hFLEdBQUcsQ0FBQyxHQUFHLEdBQUcsR0FBRyxDQUFDLEdBQUcsSUFBSSxJQUFJLElBQUksTUFBTSxDQUFDLFFBQVEsQ0FBQyxHQUFHLENBQUMsR0FBRyxDQUFDLENBQUMsQ0FBQyxDQUFDLEdBQUcsQ0FBQyxHQUFHLENBQUMsUUFBUSxFQUFFLENBQUMsQ0FBQyxDQUFDLEdBQUcsQ0FBQyxHQUFHLENBQUM7UUFFckYsT0FBTyxHQUFHLENBQUM7SUFDZixDQUFDO0NBQ0o7QUFFRCxTQUFTLGdCQUFnQixDQUFDLFFBQWdCLEVBQUUsT0FBZSxFQUFFLE1BQTRCLEVBQUUsV0FBZ0MsRUFBRSxPQUFxQixFQUFFLFlBQXFCLEtBQUs7SUFDMUssSUFBSSxTQUFTLEVBQUU7UUFDWCxNQUFNLFFBQVEsR0FBRyxJQUFJLGFBQWEsQ0FBQyxRQUFRLEVBQUUsT0FBTyxFQUFFLE1BQU0sRUFBRSxXQUFXLEVBQUUsT0FBTyxDQUFDLENBQUM7UUFDcEYsT0FBTyxRQUFRLENBQUM7S0FDbkI7SUFFRCxJQUFJLENBQUMsY0FBYyxFQUFFO1FBQ2pCLGNBQWMsR0FBRyxJQUFJLGFBQWEsQ0FBQyxRQUFRLEVBQUUsT0FBTyxFQUFFLE1BQU0sRUFBRSxXQUFXLEVBQUUsT0FBTyxDQUFDLENBQUM7S0FDdkY7SUFDRCxPQUFPLGNBQWMsQ0FBQztBQUMxQixDQUFDO0FBRUQsa0JBQWUsZ0JBQWdCLENBQUMifQ== \ No newline at end of file diff --git a/lib/index.d.ts b/lib/index.d.ts new file mode 100644 index 0000000..21a4953 --- /dev/null +++ b/lib/index.d.ts @@ -0,0 +1,20 @@ +import getKafkaProducer, { ProduceParameters } from './producer'; +import getKafkaConsumer, { ConsumeActionFunction, ListenActionFunction } from './consumer'; +import KafkaAdmin from './admin'; +import { ClientMetrics, LibrdKafkaError, NumberNullUndefined, SubscribeTopicList } from '@confluentinc/kafka-javascript'; +import { ErrorHandlingFunction } from './client'; +interface KafkaConsumer { + connect(): Promise; + subscribe(topics: SubscribeTopicList): this; + unsubscribe(): this; + consume(actionOnData: ConsumeActionFunction): void; + consumeBatch(msgCount: number, actionOnData: ConsumeActionFunction): void; + listen(actionOnData: ListenActionFunction): void; +} +interface KafkaProducer { + connect(): Promise; + produce(args: ProduceParameters): boolean | number; + flush(timeout?: NumberNullUndefined, postFlushAction?: ErrorHandlingFunction): this; + disconnect(postDisconnectAction?: (err: any, data: ClientMetrics) => any): this; +} +export { getKafkaConsumer, getKafkaProducer, KafkaAdmin, KafkaConsumer, KafkaProducer, }; diff --git a/lib/index.js b/lib/index.js new file mode 100644 index 0000000..a7c948f --- /dev/null +++ b/lib/index.js @@ -0,0 +1,13 @@ +"use strict"; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +exports.KafkaAdmin = exports.getKafkaProducer = exports.getKafkaConsumer = void 0; +const producer_1 = __importDefault(require("./producer")); +exports.getKafkaProducer = producer_1.default; +const consumer_1 = __importDefault(require("./consumer")); +exports.getKafkaConsumer = consumer_1.default; +const admin_1 = __importDefault(require("./admin")); +exports.KafkaAdmin = admin_1.default; +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoiaW5kZXguanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi9zcmMvaW5kZXgudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6Ijs7Ozs7O0FBQUEsMERBQWdFO0FBd0I5RCwyQkF4Qkssa0JBQWdCLENBd0JMO0FBdkJsQiwwREFBMkY7QUFzQnpGLDJCQXRCSyxrQkFBZ0IsQ0FzQkw7QUFyQmxCLG9EQUFpQztBQXVCL0IscUJBdkJLLGVBQVUsQ0F1QkwifQ== \ No newline at end of file diff --git a/lib/producer.d.ts b/lib/producer.d.ts new file mode 100644 index 0000000..ad1d7ce --- /dev/null +++ b/lib/producer.d.ts @@ -0,0 +1,55 @@ +/// +import EventEmitter from 'events'; +import { ClientMetrics, LibrdKafkaError, MessageKey, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig } from '@confluentinc/kafka-javascript'; +import Client, { ErrorHandlingFunction } from './client'; +export interface ProduceParameters { + topic: string; + message: any; + partition?: NumberNullUndefined; + key?: MessageKey; + timestamp?: NumberNullUndefined; +} +declare class KafkaProducer extends Client { + private producer; + /** + * Initializes a KafkaProducer. + * @param {String} clientId: id to identify a client producing the message. + * @param {import('@confluentinc/kafka-javascript').ProducerGlobalConfig} config: configs for producer. + * @param {import('@confluentinc/kafka-javascript').ProducerTopicConfig} topicConfig: topic configs. + * @param {EventEmitter} emitter: to emit log messages + */ + constructor(clientId: string, config: ProducerGlobalConfig, topicConfig: ProducerTopicConfig, emitter: EventEmitter); + /** + * Asynchronous function which connects to kafka cluster. + * Resolves when connection is ready. + * + * @returns {Promise} + */ + connect(): Promise; + /** + * Produce a message to a topic-partition. + * @param {String} topic: name of topic + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} partition: partition number to produce to. + * @param {any} message: message to be produced. + * @param {import('@confluentinc/kafka-javascript').MessageKey} key: key associated with the message. + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} timestamp: timestamp to send with the message. + * @returns {import('../types').BooleanOrNumber}: returns boolean or librdkafka error code. + */ + produce({ topic, message, partition, key, timestamp }: ProduceParameters): boolean | number; + /** + * Flush everything on the internal librdkafka buffer. + * Good to perform before disconnect. + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined}} timeout + * @param {import('../types').ErrorHandlingFunction} postFlushAction + * @returns {KafkaProducer} + */ + flush(timeout?: NumberNullUndefined, postFlushAction?: ErrorHandlingFunction): this; + /** + * Disconnects producer. + * @param {import('../types').DisconnectFunction} postDisconnectAction + * @returns {KafkaProducer} + */ + disconnect(postDisconnectAction?: (err: any, data: ClientMetrics) => any): this; +} +declare function getKafkaProducer(clientId: string, config: ProducerGlobalConfig, topicConfig: ProducerTopicConfig, emitter: EventEmitter, createNew?: boolean): KafkaProducer; +export default getKafkaProducer; diff --git a/lib/producer.js b/lib/producer.js new file mode 100644 index 0000000..cf3b2e4 --- /dev/null +++ b/lib/producer.js @@ -0,0 +1,137 @@ +"use strict"; +var __importDefault = (this && this.__importDefault) || function (mod) { + return (mod && mod.__esModule) ? mod : { "default": mod }; +}; +Object.defineProperty(exports, "__esModule", { value: true }); +const node_rdkafka_1 = __importDefault(require("@confluentinc/kafka-javascript")); +const client_1 = __importDefault(require("./client")); +let _kafkaProducer = null; +class KafkaProducer extends client_1.default { + producer; + /** + * Initializes a KafkaProducer. + * @param {String} clientId: id to identify a client producing the message. + * @param {import('@confluentinc/kafka-javascript').ProducerGlobalConfig} config: configs for producer. + * @param {import('@confluentinc/kafka-javascript').ProducerTopicConfig} topicConfig: topic configs. + * @param {EventEmitter} emitter: to emit log messages + */ + constructor(clientId, config, topicConfig, emitter) { + // producer config defaults should go here. + config = Object.assign({ + 'retry.backoff.ms': 200, + 'message.send.max.retries': 10, + 'queue.buffering.max.messages': 100000, + 'queue.buffering.max.ms': 1000, + 'batch.num.messages': 1000000, + 'dr_cb': true + }, config); + // producer topic config defaults should go here. + topicConfig = Object.assign({ 'acks': 1 }, topicConfig); + super(clientId, 'producer', config, topicConfig, emitter); + this.producer = new node_rdkafka_1.default.Producer(this.config, this.topicConfig); + } + /** + * Asynchronous function which connects to kafka cluster. + * Resolves when connection is ready. + * + * @returns {Promise} + */ + connect() { + return new Promise((resolve, reject) => { + try { + this.producer + .connect() + .on('ready', (info, metadata) => { + this.success('Producer connected to kafka cluster...', { + name: info.name, + }); + // set automating polling to every second for delivery reports + this.producer.setPollInterval(1000); + resolve(this); + }) + .on('delivery-report', (err, report) => { + if (err) { + this.error('Error producing message: ', err); + } + else { + this.log(`Produced event: key=${report.key}, timestamp=${report.timestamp}.`); + } + }) + .on('event.error', (err) => { + this.error('Producer encountered error: ', err); + reject(err); + }) + .on('event.log', (eventData) => this.log('Logging consumer event: ', eventData)) + .on('disconnected', (metrics) => { + this.log('Producer disconnected. Client metrics are: ', metrics.connectionOpened); + }); + } + catch (err) { + this.error('Producer encountered while connecting to kafka server.', err); + reject(err); + } + }); + } + /** + * Produce a message to a topic-partition. + * @param {String} topic: name of topic + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} partition: partition number to produce to. + * @param {any} message: message to be produced. + * @param {import('@confluentinc/kafka-javascript').MessageKey} key: key associated with the message. + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} timestamp: timestamp to send with the message. + * @returns {import('../types').BooleanOrNumber}: returns boolean or librdkafka error code. + */ + produce({ topic, message, partition = null, key = null, timestamp = null }) { + try { + const stringifiedMsg = JSON.stringify(message); + const isSuccess = this.producer.produce(topic, partition, Buffer.from(stringifiedMsg), key, timestamp, null); + return isSuccess; + } + catch (err) { + this.error(`Producer encountered error while producing message to topic=${topic}, partition=${partition} with key=${key}`, err); + return false; + } + } + /** + * Flush everything on the internal librdkafka buffer. + * Good to perform before disconnect. + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined}} timeout + * @param {import('../types').ErrorHandlingFunction} postFlushAction + * @returns {KafkaProducer} + */ + flush(timeout, postFlushAction) { + try { + this.producer.flush(timeout, postFlushAction); + } + catch (err) { + this.error('Producer encountered error while flusing events.', err); + } + return this; + } + /** + * Disconnects producer. + * @param {import('../types').DisconnectFunction} postDisconnectAction + * @returns {KafkaProducer} + */ + disconnect(postDisconnectAction) { + try { + this.producer.disconnect(postDisconnectAction); + } + catch (err) { + this.error('Producer encountered error while disconnecting.', err); + } + return this; + } +} +function getKafkaProducer(clientId, config, topicConfig, emitter, createNew = false) { + if (createNew) { + const producer = new KafkaProducer(clientId, config, topicConfig, emitter); + return producer; + } + if (!_kafkaProducer) { + _kafkaProducer = new KafkaProducer(clientId, config, topicConfig, emitter); + } + return _kafkaProducer; +} +exports.default = getKafkaProducer; +//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoicHJvZHVjZXIuanMiLCJzb3VyY2VSb290IjoiIiwic291cmNlcyI6WyIuLi9zcmMvcHJvZHVjZXIudHMiXSwibmFtZXMiOltdLCJtYXBwaW5ncyI6Ijs7Ozs7QUFDQSxnRUFBaUo7QUFDakosc0RBQXlEO0FBVXpELElBQUksY0FBYyxHQUFrQixJQUFJLENBQUM7QUFFekMsTUFBTSxhQUFjLFNBQVEsZ0JBQU07SUFDdEIsUUFBUSxDQUFpQjtJQUVqQzs7Ozs7O09BTUc7SUFDSCxZQUFZLFFBQWdCLEVBQUUsTUFBNEIsRUFBRSxXQUFnQyxFQUFFLE9BQXFCO1FBQy9HLDJDQUEyQztRQUMzQyxNQUFNLEdBQUcsTUFBTSxDQUFDLE1BQU0sQ0FBQztZQUNuQixrQkFBa0IsRUFBRSxHQUFHO1lBQ3ZCLDBCQUEwQixFQUFFLEVBQUU7WUFDOUIsOEJBQThCLEVBQUUsTUFBTTtZQUN0Qyx3QkFBd0IsRUFBRSxJQUFJO1lBQzlCLG9CQUFvQixFQUFFLE9BQU87WUFDN0IsT0FBTyxFQUFFLElBQUk7U0FDZCxFQUNELE1BQU0sQ0FDUCxDQUFDO1FBQ0YsaURBQWlEO1FBQ2pELFdBQVcsR0FBRyxNQUFNLENBQUMsTUFBTSxDQUFDLEVBQUUsTUFBTSxFQUFHLENBQUMsRUFBRSxFQUFFLFdBQVcsQ0FBQyxDQUFDO1FBRXpELEtBQUssQ0FBQyxRQUFRLEVBQUUsVUFBVSxFQUFFLE1BQU0sRUFBRSxXQUFXLEVBQUUsT0FBTyxDQUFDLENBQUM7UUFDMUQsSUFBSSxDQUFDLFFBQVEsR0FBRyxJQUFJLHNCQUFLLENBQUMsUUFBUSxDQUFDLElBQUksQ0FBQyxNQUFNLEVBQUUsSUFBSSxDQUFDLFdBQVcsQ0FBQyxDQUFDO0lBQ3RFLENBQUM7SUFFRDs7Ozs7T0FLRztJQUNILE9BQU87UUFDSCxPQUFPLElBQUksT0FBTyxDQUFDLENBQUMsT0FBTyxFQUFFLE1BQU0sRUFBRSxFQUFFO1lBQ25DLElBQUk7Z0JBQ0EsSUFBSSxDQUFDLFFBQVE7cUJBQ1osT0FBTyxFQUFFO3FCQUNULEVBQUUsQ0FBQyxPQUFPLEVBQUUsQ0FBQyxJQUFJLEVBQUUsUUFBUSxFQUFFLEVBQUU7b0JBQzVCLElBQUksQ0FBQyxPQUFPLENBQUMsd0NBQXdDLEVBQUU7d0JBQ25ELElBQUksRUFBRSxJQUFJLENBQUMsSUFBSTtxQkFDbEIsQ0FBQyxDQUFDO29CQUNILDhEQUE4RDtvQkFDOUQsSUFBSSxDQUFDLFFBQVEsQ0FBQyxlQUFlLENBQUMsSUFBSSxDQUFDLENBQUM7b0JBQ3BDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztnQkFDbEIsQ0FBQyxDQUFDO3FCQUNELEVBQUUsQ0FBQyxpQkFBaUIsRUFBRSxDQUFDLEdBQUcsRUFBRSxNQUFNLEVBQUUsRUFBRTtvQkFDbkMsSUFBSSxHQUFHLEVBQUU7d0JBQ0wsSUFBSSxDQUFDLEtBQUssQ0FBQywyQkFBMkIsRUFBRSxHQUFHLENBQUMsQ0FBQztxQkFDaEQ7eUJBQU07d0JBQ0gsSUFBSSxDQUFDLEdBQUcsQ0FBQyx1QkFBdUIsTUFBTSxDQUFDLEdBQUcsZUFBZSxNQUFNLENBQUMsU0FBUyxHQUFHLENBQUMsQ0FBQztxQkFDakY7Z0JBQ0wsQ0FBQyxDQUFDO3FCQUNELEVBQUUsQ0FBQyxhQUFhLEVBQUUsQ0FBQyxHQUFHLEVBQUUsRUFBRTtvQkFDdkIsSUFBSSxDQUFDLEtBQUssQ0FBQyw4QkFBOEIsRUFBRSxHQUFHLENBQUMsQ0FBQztvQkFDaEQsTUFBTSxDQUFDLEdBQUcsQ0FBQyxDQUFDO2dCQUNoQixDQUFDLENBQUM7cUJBQ0QsRUFBRSxDQUFDLFdBQVcsRUFBRyxDQUFDLFNBQVMsRUFBRSxFQUFFLENBQUMsSUFBSSxDQUFDLEdBQUcsQ0FBQywwQkFBMEIsRUFBRSxTQUFTLENBQUMsQ0FBQztxQkFDaEYsRUFBRSxDQUFDLGNBQWMsRUFBRSxDQUFDLE9BQU8sRUFBRSxFQUFFO29CQUM1QixJQUFJLENBQUMsR0FBRyxDQUFDLDZDQUE2QyxFQUFFLE9BQU8sQ0FBQyxnQkFBZ0IsQ0FBQyxDQUFDO2dCQUN0RixDQUFDLENBQUMsQ0FBQzthQUNOO1lBQUMsT0FBTyxHQUFHLEVBQUU7Z0JBQ1YsSUFBSSxDQUFDLEtBQUssQ0FBQyx3REFBd0QsRUFBRSxHQUFHLENBQUMsQ0FBQztnQkFDMUUsTUFBTSxDQUFDLEdBQUcsQ0FBQyxDQUFDO2FBQ2Y7UUFDTCxDQUFDLENBQUMsQ0FBQztJQUNQLENBQUM7SUFFRDs7Ozs7Ozs7T0FRRztJQUNILE9BQU8sQ0FBQyxFQUFFLEtBQUssRUFBRSxPQUFPLEVBQUUsU0FBUyxHQUFHLElBQUksRUFBRSxHQUFHLEdBQUcsSUFBSSxFQUFFLFNBQVMsR0FBRyxJQUFJLEVBQXFCO1FBQ3pGLElBQUk7WUFDQSxNQUFNLGNBQWMsR0FBRyxJQUFJLENBQUMsU0FBUyxDQUFDLE9BQU8sQ0FBQyxDQUFDO1lBQy9DLE1BQU0sU0FBUyxHQUFHLElBQUksQ0FBQyxRQUFRLENBQUMsT0FBTyxDQUFDLEtBQUssRUFBRSxTQUFTLEVBQUUsTUFBTSxDQUFDLElBQUksQ0FBQyxjQUFjLENBQUMsRUFBRSxHQUFHLEVBQUUsU0FBUyxFQUFFLElBQUksQ0FBQyxDQUFDO1lBQzdHLE9BQU8sU0FBUyxDQUFDO1NBQ3BCO1FBQUMsT0FBTyxHQUFHLEVBQUU7WUFDVixJQUFJLENBQUMsS0FBSyxDQUFDLCtEQUErRCxLQUFLLGVBQWUsU0FBUyxhQUFhLEdBQUcsRUFBRSxFQUFFLEdBQUcsQ0FBQyxDQUFDO1lBQ2hJLE9BQU8sS0FBSyxDQUFDO1NBQ2hCO0lBQ0wsQ0FBQztJQUVEOzs7Ozs7T0FNRztJQUNILEtBQUssQ0FBQyxPQUE2QixFQUFFLGVBQXVDO1FBQ3hFLElBQUk7WUFDQSxJQUFJLENBQUMsUUFBUSxDQUFDLEtBQUssQ0FBQyxPQUFPLEVBQUUsZUFBZSxDQUFDLENBQUM7U0FDakQ7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQUMsa0RBQWtELEVBQUUsR0FBRyxDQUFDLENBQUM7U0FDdkU7UUFDRCxPQUFPLElBQUksQ0FBQztJQUNoQixDQUFDO0lBRUQ7Ozs7T0FJRztJQUNILFVBQVUsQ0FBQyxvQkFBNkQ7UUFDcEUsSUFBSTtZQUNBLElBQUksQ0FBQyxRQUFRLENBQUMsVUFBVSxDQUFDLG9CQUFvQixDQUFDLENBQUM7U0FDbEQ7UUFBQyxPQUFPLEdBQUcsRUFBRTtZQUNWLElBQUksQ0FBQyxLQUFLLENBQUMsaURBQWlELEVBQUUsR0FBRyxDQUFDLENBQUM7U0FDdEU7UUFDRCxPQUFPLElBQUksQ0FBQztJQUNoQixDQUFDO0NBQ0o7QUFFRCxTQUFTLGdCQUFnQixDQUFDLFFBQWdCLEVBQUUsTUFBNEIsRUFBRSxXQUFnQyxFQUFFLE9BQXFCLEVBQUUsWUFBcUIsS0FBSztJQUN6SixJQUFJLFNBQVMsRUFBRTtRQUNYLE1BQU0sUUFBUSxHQUFHLElBQUksYUFBYSxDQUFDLFFBQVEsRUFBRSxNQUFNLEVBQUUsV0FBVyxFQUFFLE9BQU8sQ0FBQyxDQUFDO1FBQzNFLE9BQU8sUUFBUSxDQUFDO0tBQ25CO0lBQ0QsSUFBSSxDQUFDLGNBQWMsRUFBRTtRQUNqQixjQUFjLEdBQUcsSUFBSSxhQUFhLENBQUMsUUFBUSxFQUFFLE1BQU0sRUFBRSxXQUFXLEVBQUUsT0FBTyxDQUFDLENBQUM7S0FDOUU7SUFDRCxPQUFPLGNBQWMsQ0FBQztBQUMxQixDQUFDO0FBRUQsa0JBQWUsZ0JBQWdCLENBQUMifQ== \ No newline at end of file diff --git a/package.json b/package.json index 0d7ea7e..fe029df 100644 --- a/package.json +++ b/package.json @@ -1,10 +1,10 @@ { - "name": "kafka-wrapper", - "version": "0.1.0", + "name": "@quizizz/kafka", + "version": "1.0.3", "description": "A simple kafka client to produce and consume messages to/from kafka cluster.", - "main": "index.js", - "types": "types/index.d.ts", + "main": "lib/index.js", "scripts": { + "build": "tsc", "test": "echo \"Error: no test specified\" && exit 1" }, "repository": { @@ -18,6 +18,10 @@ ], "license": "ISC", "dependencies": { - "node-rdkafka": "^2.12.0" + "@confluentinc/kafka-javascript": "^1.3.1" + }, + "devDependencies": { + "@types/node": "17.0.21", + "typescript": "4.6.2" } } diff --git a/src/admin.js b/src/admin.ts similarity index 86% rename from src/admin.js rename to src/admin.ts index 4fbcec9..8705419 100644 --- a/src/admin.js +++ b/src/admin.ts @@ -1,7 +1,9 @@ -const Client = require('./client'); -const Kafka = require('node-rdkafka'); +import Client from './client'; +import { AdminClient, GlobalConfig } from '@confluentinc/kafka-javascript'; +import EventEmitter from 'events'; class KafkaAdmin extends Client { + private adminClient; /** * Initialzes a KafkaAdmin client with config. @@ -10,7 +12,7 @@ class KafkaAdmin extends Client { * @param {object} config - global kafka config * @param {object} emitter - emitter to emit log event */ - constructor(clientId, config, emitter) { + constructor(clientId: string, config: GlobalConfig, emitter: EventEmitter) { super(clientId, 'admin', config, {}, emitter); this.adminClient = null; } @@ -18,10 +20,10 @@ class KafkaAdmin extends Client { /** * Connect to kafka server as admin. */ - async connect() { + async connect(): Promise { try { if (this.adminClient === null) { - this.adminClient = await Kafka.AdminClient.create(this.config); + this.adminClient = await AdminClient.create(this.config); } this.success('Successfully connected to kafka as admin'); } catch (err) { @@ -75,4 +77,4 @@ class KafkaAdmin extends Client { } } -module.exports = KafkaAdmin; \ No newline at end of file +export default KafkaAdmin; diff --git a/src/client.js b/src/client.ts similarity index 65% rename from src/client.js rename to src/client.ts index 38edeb3..e5d0c14 100644 --- a/src/client.js +++ b/src/client.ts @@ -1,5 +1,9 @@ -class Client { - constructor(clientId, clientType, config, topicConfig, emitter) { +import EventEmitter from "events"; +import { GlobalConfig, LibrdKafkaError, TopicConfig } from "@confluentinc/kafka-javascript"; + +export default class Client { + constructor(private clientId: string, + private clientType: string, protected config: GlobalConfig, protected topicConfig: TopicConfig, private emitter: EventEmitter) { this.clientId = clientId; this.clientType = clientType; @@ -16,7 +20,7 @@ class Client { this.emitter = emitter; } - _logMessage(msgType, message, data) { + _logMessage(msgType: 'log' | 'success' | 'error', message: string, data: any) { if (this.emitter != null) { this.emitter.emit(msgType, { clientId: this.clientId, @@ -31,17 +35,17 @@ class Client { } } - log(message, data) { + log(message: string, data?: any) { this._logMessage('log', message, data); } - success(message, data) { + success(message: string, data?: any) { this._logMessage('success', message, data); } - error(err, data) { + error(err: string, data?: any) { this._logMessage('error', err, data); } } -module.exports = Client; \ No newline at end of file +export type ErrorHandlingFunction = (err: LibrdKafkaError) => void; diff --git a/src/consumer.js b/src/consumer.ts similarity index 77% rename from src/consumer.js rename to src/consumer.ts index 4f65097..f9e43a0 100644 --- a/src/consumer.js +++ b/src/consumer.ts @@ -1,17 +1,25 @@ -const Kafka = require('node-rdkafka'); -const Client = require('./client'); +import Kafka, { ConsumerGlobalConfig, ConsumerTopicConfig, LibrdKafkaError, Message, SubscribeTopicList } from '@confluentinc/kafka-javascript'; +import { EventEmitter } from 'stream'; +import Client from './client'; + +export type ConsumeActionFunction = (err: LibrdKafkaError, messages: Message[]) => void; + +export type ListenActionFunction = (arg: Message) => void; + +let _kafkaConsumer: KafkaConsumer = null; class KafkaConsumer extends Client { + private consumer: Kafka.KafkaConsumer; /** * Initializes a KafkaConsumer. * @param {String} clientId: id to identify a client consuming the message. * @param {String} groupId: consumer group id, the consumer belongs to. - * @param {import('node-rdkafka').ConsumerGlobalConfig} config: configs for consumer. - * @param {import('node-rdkafka').ConsumerTopicConfig} topicConfig: topic configs + * @param {import('@confluentinc/kafka-javascript').ConsumerGlobalConfig} config: configs for consumer. + * @param {import('@confluentinc/kafka-javascript').ConsumerTopicConfig} topicConfig: topic configs * @param {EventEmitter} emitter: to emit log events */ - constructor(clientId, groupId, config, topicConfig, emitter) { + constructor(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter) { // consumer specific default configs we would like to have config = Object.assign({ 'allow.auto.create.topics': true, @@ -30,7 +38,7 @@ class KafkaConsumer extends Client { * * @returns {Promise} */ - connect() { + connect(): Promise { return new Promise((resolve, reject) => { try { this.consumer @@ -72,10 +80,10 @@ class KafkaConsumer extends Client { /** * Subscribe to topics. - * @param {import('node-rdkafka').SubscribeTopicList} topics: array of topic names. + * @param {import('@confluentinc/kafka-javascript').SubscribeTopicList} topics: array of topic names. * @returns {KafkaConsumer} */ - subscribe(topics) { + subscribe(topics: SubscribeTopicList): this { try { this.consumer.subscribe(topics); } catch (err) { @@ -88,7 +96,7 @@ class KafkaConsumer extends Client { * Unsubscribe from all the subscribed topics.s * @returns {KafkaConsumer} */ - unsubscribe() { + unsubscribe(): this { try { this.consumer.unsubscribe(); } catch (err) { @@ -105,7 +113,7 @@ class KafkaConsumer extends Client { * * @param {Function} actionOnData: callback to return when message is read. */ - consume(actionOnData) { + consume(actionOnData: ConsumeActionFunction): void { try { // reset 'data' event listener to no-op callback. this.consumer.removeAllListeners('data'); @@ -124,7 +132,7 @@ class KafkaConsumer extends Client { * @param {Number} msgCount: number of messages to read. * @param {Function} actionOnData: callback to be executed for each message. */ - consumeBatch(msgCount, actionOnData) { + consumeBatch(msgCount: number, actionOnData: ConsumeActionFunction): void { try { // reset 'data' event listener to no-op callback. this.consumer.removeAllListeners('data'); @@ -139,7 +147,7 @@ class KafkaConsumer extends Client { * * @param {Function} actionOnData */ - listen(actionOnData) { + listen(actionOnData: ListenActionFunction): void { try { this.consumer.on('data', this._wrapListenCallbackWrapper(actionOnData)); this.consumer.consume(); @@ -177,7 +185,7 @@ class KafkaConsumer extends Client { /** * Parses message before passing it to consumer callback. - * @param {Object} msg - expects it to be in node-rdkafka msg format. + * @param {Object} msg - expects it to be in @confluentinc/kafka-javascript msg format. * @returns */ _parseMessage(msg) { @@ -188,4 +196,16 @@ class KafkaConsumer extends Client { } } -module.exports = KafkaConsumer; \ No newline at end of file +function getKafkaConsumer(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: EventEmitter, createNew: boolean = false): KafkaConsumer { + if (createNew) { + const consumer = new KafkaConsumer(clientId, groupId, config, topicConfig, emitter); + return consumer; + } + + if (!_kafkaConsumer) { + _kafkaConsumer = new KafkaConsumer(clientId, groupId, config, topicConfig, emitter); + } + return _kafkaConsumer; +} + +export default getKafkaConsumer; diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..6521f4a --- /dev/null +++ b/src/index.ts @@ -0,0 +1,29 @@ +import getKafkaProducer, { ProduceParameters } from './producer' +import getKafkaConsumer, { ConsumeActionFunction, ListenActionFunction } from './consumer'; +import KafkaAdmin from './admin'; +import { ClientMetrics, LibrdKafkaError, NumberNullUndefined, SubscribeTopicList } from '@confluentinc/kafka-javascript'; +import { ErrorHandlingFunction } from './client'; + +interface KafkaConsumer { + connect(): Promise; + subscribe(topics: SubscribeTopicList): this; + unsubscribe(): this; + consume(actionOnData: ConsumeActionFunction): void; + consumeBatch(msgCount: number, actionOnData: ConsumeActionFunction): void; + listen(actionOnData: ListenActionFunction): void; +} + +interface KafkaProducer { + connect(): Promise; + produce(args: ProduceParameters): boolean | number; + flush(timeout?: NumberNullUndefined, postFlushAction?: ErrorHandlingFunction): this; + disconnect(postDisconnectAction?: (err: any, data: ClientMetrics) => any): this; +} + +export { + getKafkaConsumer, + getKafkaProducer, + KafkaAdmin, + KafkaConsumer, + KafkaProducer, +} diff --git a/src/producer.js b/src/producer.ts similarity index 66% rename from src/producer.js rename to src/producer.ts index d9bdb0b..c7e76f2 100644 --- a/src/producer.js +++ b/src/producer.ts @@ -1,16 +1,28 @@ -const Kafka = require('node-rdkafka'); -const Client = require('./client'); +import EventEmitter from 'events'; +import Kafka, { ClientMetrics, LibrdKafkaError, MessageKey, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig } from '@confluentinc/kafka-javascript'; +import Client, { ErrorHandlingFunction } from './client'; + +export interface ProduceParameters{ + topic: string; + message: any; + partition?: NumberNullUndefined; + key?: MessageKey; + timestamp?: NumberNullUndefined; +} + +let _kafkaProducer: KafkaProducer = null; class KafkaProducer extends Client { + private producer: Kafka.Producer; /** * Initializes a KafkaProducer. * @param {String} clientId: id to identify a client producing the message. - * @param {import('node-rdkafka').ProducerGlobalConfig} config: configs for producer. - * @param {import('node-rdkafka').ProducerTopicConfig} topicConfig: topic configs. + * @param {import('@confluentinc/kafka-javascript').ProducerGlobalConfig} config: configs for producer. + * @param {import('@confluentinc/kafka-javascript').ProducerTopicConfig} topicConfig: topic configs. * @param {EventEmitter} emitter: to emit log messages */ - constructor(clientId, config, topicConfig, emitter) { + constructor(clientId: string, config: ProducerGlobalConfig, topicConfig: ProducerTopicConfig, emitter: EventEmitter) { // producer config defaults should go here. config = Object.assign({ 'retry.backoff.ms': 200, @@ -35,7 +47,7 @@ class KafkaProducer extends Client { * * @returns {Promise} */ - connect() { + connect(): Promise { return new Promise((resolve, reject) => { try { this.producer @@ -73,13 +85,13 @@ class KafkaProducer extends Client { /** * Produce a message to a topic-partition. * @param {String} topic: name of topic - * @param {import('node-rdkafka').NumberNullUndefined} partition: partition number to produce to. + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} partition: partition number to produce to. * @param {any} message: message to be produced. - * @param {import('node-rdkafka').MessageKey} key: key associated with the message. - * @param {import('node-rdkafka').NumberNullUndefined} timestamp: timestamp to send with the message. + * @param {import('@confluentinc/kafka-javascript').MessageKey} key: key associated with the message. + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined} timestamp: timestamp to send with the message. * @returns {import('../types').BooleanOrNumber}: returns boolean or librdkafka error code. */ - produce({ topic, message, partition = null, key = null, timestamp = null }) { + produce({ topic, message, partition = null, key = null, timestamp = null }: ProduceParameters): boolean | number { try { const stringifiedMsg = JSON.stringify(message); const isSuccess = this.producer.produce(topic, partition, Buffer.from(stringifiedMsg), key, timestamp, null); @@ -93,11 +105,11 @@ class KafkaProducer extends Client { /** * Flush everything on the internal librdkafka buffer. * Good to perform before disconnect. - * @param {import('node-rdkafka').NumberNullUndefined}} timeout + * @param {import('@confluentinc/kafka-javascript').NumberNullUndefined}} timeout * @param {import('../types').ErrorHandlingFunction} postFlushAction * @returns {KafkaProducer} */ - flush(timeout, postFlushAction) { + flush(timeout?: NumberNullUndefined, postFlushAction?: ErrorHandlingFunction): this { try { this.producer.flush(timeout, postFlushAction); } catch (err) { @@ -111,7 +123,7 @@ class KafkaProducer extends Client { * @param {import('../types').DisconnectFunction} postDisconnectAction * @returns {KafkaProducer} */ - disconnect(postDisconnectAction) { + disconnect(postDisconnectAction?: (err: any, data: ClientMetrics) => any): this { try { this.producer.disconnect(postDisconnectAction); } catch (err) { @@ -121,4 +133,15 @@ class KafkaProducer extends Client { } } -module.exports = KafkaProducer; \ No newline at end of file +function getKafkaProducer(clientId: string, config: ProducerGlobalConfig, topicConfig: ProducerTopicConfig, emitter: EventEmitter, createNew: boolean = false): KafkaProducer { + if (createNew) { + const producer = new KafkaProducer(clientId, config, topicConfig, emitter); + return producer; + } + if (!_kafkaProducer) { + _kafkaProducer = new KafkaProducer(clientId, config, topicConfig, emitter); + } + return _kafkaProducer; +} + +export default getKafkaProducer; diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..80a2d7b --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "module": "commonjs", + "target": "esnext", + "declaration": true, + "outDir": "./lib", + "esModuleInterop": true, + "inlineSourceMap": true + }, + "include": [ + "src/**/*" + ] +} \ No newline at end of file diff --git a/types/index.d.ts b/types/index.d.ts deleted file mode 100644 index ce05dc5..0000000 --- a/types/index.d.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { ClientMetrics, ConsumerGlobalConfig, ConsumerTopicConfig, GlobalConfig, LibrdKafkaError, Message, MessageKey, NewTopic, NumberNullUndefined, ProducerGlobalConfig, ProducerTopicConfig, SubscribeTopicList } from "node-rdkafka"; - -export type ConsumeActionFunction = (err: LibrdKafkaError, messages: Message[]) => void; - -export type ListenActionFunction = (arg: Message) => void; - -export type ErrorHandlingFunction = (err: LibrdKafkaError) => void; - -export type DisconnectFunction = (err: any, data: ClientMetrics) => any; - -export type BooleanOrNumber = boolean | number; - -export interface ProduceParameters{ - topic: string; - message: any; - partition?: NumberNullUndefined; - key?: MessageKey; - timestamp?: NumberNullUndefined; -} - -export class KafkaConsumer { - constructor(clientId: string, groupId: string, config: ConsumerGlobalConfig, topicConfig: ConsumerTopicConfig, emitter: any); - connect(): Promise; - subscribe(topics: SubscribeTopicList): this; - unsubscribe(): this; - consume(actionOnData: ConsumeActionFunction): void; - consumeBatch(msgCount: number, actionOnData: ConsumeActionFunction): void; - listen(actionOnData: ListenActionFunction): void; -} - -export class KafkaProducer { - constructor(clientId: string, config: ProducerGlobalConfig, topicConfig: ProducerTopicConfig, emitter: any); - connect(): Promise; - produce(args: ProduceParameters): BooleanOrNumber; - flush(timeout?: NumberNullUndefined, postFlushAction?: ErrorHandlingFunction): this; - disconnect(postDisconnectAction?: DisconnectFunction): this; -} - -export class KafkaAdmin { - constructor(clientId: string, config: GlobalConfig, emitter: any); - connect(): void; - createTopic(topic: NewTopic, actionPostTopicCreation?: ErrorHandlingFunction): void; - createTopic(topic: NewTopic, timeout?: number, actionPostTopicCreation?: ErrorHandlingFunction): void; - deleteTopic(topic: string, actionPostTopicDeletion?: ErrorHandlingFunction): void; - deleteTopic(topic: string, timeout?: number, actionPostTopicDeletion?: ErrorHandlingFunction): void; - createPartitions(topic: string, totalPartitions: number, actionPostPartitionCreation?: ErrorHandlingFunction): void; - createPartitions(topic: string, totalPartitions: number, timeout?: number, actionPostPartitionCreation?: ErrorHandlingFunction): void; - disconnect(): void; -} \ No newline at end of file diff --git a/types/tsconfig.json b/types/tsconfig.json deleted file mode 100644 index a9061b7..0000000 --- a/types/tsconfig.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "compilerOptions": { - "module": "commonjs", - "target": "esnext", - "noImplicitAny": true, - "noImplicitThis": true, - "strictNullChecks": true, - "baseUrl": "../", - "typeRoots": ["../"], - "types": [], - "noEmit": true, - "forceConsistentCasingInFileNames": true, - "strictFunctionTypes": true, - "esModuleInterop": true - }, - "files": [ - "index.d.ts" - ] -} \ No newline at end of file