Skip to content

Commit

Permalink
update kafka transporter tests
Browse files Browse the repository at this point in the history
  • Loading branch information
icebob committed Jul 16, 2023
1 parent f682a87 commit 43af200
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 181 deletions.
4 changes: 2 additions & 2 deletions dev/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ const broker = new ServiceBroker({
logger: console,
logLevel: "info",
middlewares: [
//Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
//Middlewares.Transmit.Compression(),
// Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
// Middlewares.Transmit.Compression()
//Middlewares.Debugging.TransitLogger({ logPacketData: false, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
//Middlewares.Debugging.ActionLogger({ logParams: true, logResponse: true, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
//require("./RedisHeartbeat")
Expand Down
4 changes: 2 additions & 2 deletions dev/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ const broker = new ServiceBroker({
logLevel: "info",

middlewares: [
//Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
//Middlewares.Transmit.Compression(),
// Middlewares.Transmit.Encryption("moleculer", "aes-256-cbc"),
// Middlewares.Transmit.Compression()
//Middlewares.Debugging.TransitLogger({ logPacketData: false, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
//Middlewares.Debugging.ActionLogger({ logPacketData: false, /*folder: null, colors: { send: "magenta", receive: "blue"}*/ }),
//require("./RedisHeartbeat")
Expand Down
23 changes: 10 additions & 13 deletions src/transporters/kafka.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,7 @@ const toMoleculerLogLevel = level => {
};

/**
* Lightweight transporter for Kafka
*
* For test:
* 1. clone https://github.com/wurstmeister/kafka-docker.git repo
* 2. follow instructions on https://github.com/wurstmeister/kafka-docker#pre-requisites
* 3. start containers with Docker Compose
*
* docker-compose -f docker-compose-single-broker.yml up -d
* Transporter for Kafka
*
* @class KafkaTransporter
* @extends {Transporter}
Expand All @@ -47,15 +40,19 @@ class KafkaTransporter extends Transporter {
*/
constructor(opts) {
if (typeof opts === "string") {
opts = { brokers: opts.replace("kafka://", "") };
opts = { client: { brokers: [opts.replace("kafka://", "")] } };
} else if (opts == null) {
opts = {};
}

opts = defaultsDeep(opts, {
// KafkaClient options. More info: https://kafka.js.org/docs/configuration
client: {
brokers: Array.isArray(opts.brokers) ? opts.brokers : [opts.brokers],
brokers: Array.isArray(opts.brokers)
? opts.brokers
: opts.brokers
? [opts.brokers]
: null,
logLevel: 1,
logCreator:
logLevel =>
Expand Down Expand Up @@ -138,15 +135,15 @@ class KafkaTransporter extends Transporter {
*/
async disconnect() {
if (this.admin) {
await this.admin.disconnect;
await this.admin.disconnect();
this.admin = null;
}
if (this.producer) {
await this.producer.disconnect;
await this.producer.disconnect();
this.producer = null;
}
if (this.consumer) {
await this.consumer.disconnect;
await this.consumer.disconnect();
this.consumer = null;
}
}
Expand Down
30 changes: 15 additions & 15 deletions test/unit/transporters/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,42 +119,42 @@ describe("Test Transporter resolver", () => {
});

it("should resolve KafkaTransporter from connection string", () => {
let trans = Transporters.resolve("kafka://localhost:2181");
let trans = Transporters.resolve("kafka://localhost:9093");
expect(trans).toBeInstanceOf(Transporters.Kafka);
expect(trans.opts).toEqual({
host: "localhost:2181",
client: {
kafkaHost: "localhost:2181"
brokers: ["localhost:9093"],
logCreator: expect.any(Function),
logLevel: 1
},
consumer: {},
customPartitioner: undefined,
producer: {},
publish: {
attributes: 0,
consumer: {},
publish: {},
publishMessage: {
partition: 0
}
});
});

it("should resolve KafkaTransporter from obj", () => {
let options = {
host: "localhost:2181",
publish: {
client: { brokers: ["localhost:9093"] },
publishMessage: {
partition: 2
}
};
let trans = Transporters.resolve({ type: "Kafka", options });
expect(trans).toBeInstanceOf(Transporters.Kafka);
expect(trans.opts).toEqual({
host: "localhost:2181",
client: {
kafkaHost: "localhost:2181"
brokers: ["localhost:9093"],
logCreator: expect.any(Function),
logLevel: 1
},
consumer: {},
customPartitioner: undefined,
producer: {},
publish: {
attributes: 0,
consumer: {},
publish: {},
publishMessage: {
partition: 2
}
});
Expand Down
Loading

0 comments on commit 43af200

Please sign in to comment.