Skip to content

Commit 5f7abbb

Browse files
authored
fix: kafka error producer not connected (#338)
1 parent 76e6552 commit 5f7abbb

File tree

4 files changed

+76
-35
lines changed

4 files changed

+76
-35
lines changed

packages/message-bus/src/class.ts

+43-19
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ export type MBConsumer = {
2828
export class MessageBus {
2929
#client: Kafka;
3030
#producer: Producer | undefined;
31+
#producerConnectingPromise: Promise<void> | undefined;
3132
#consumers: Record<string, MBConsumer | undefined>;
3233
#shutdown = false;
3334
logger: Logger;
@@ -39,16 +40,23 @@ export class MessageBus {
3940
brokers: options.connection.split(","),
4041
sasl: options.saslConfig,
4142
logCreator: () => {
42-
return ({ log }) => {
43+
return ({ log, label }) => {
4344
const { message, error, stack, retryCount } = log;
44-
if (stack) this.logger.error(`Kafka: ${stack}`);
45-
else if (error) this.logger.error(`Kafka: ${error} (message: ${message})`);
46-
else if (retryCount) this.logger.warn(`Kafka: ${message} (retrying ${retryCount}...)`);
47-
else this.logger.info(`Kafka: ${message}`);
45+
if (stack) {
46+
this.logger.error(`Kafka: ${stack}`);
47+
} else if (error) {
48+
this.logger.error(`Kafka: ${error} (message: ${message})`);
49+
} else if (retryCount) {
50+
this.logger.warn(`Kafka: ${message} (retrying ${retryCount}...)`);
51+
} else {
52+
this.logger.info(`Kafka[${label}]: ${message}`);
53+
}
4854
};
4955
},
5056
};
51-
if (options.ssl) kafkaConfig.ssl = true;
57+
if (options.ssl) {
58+
kafkaConfig.ssl = true;
59+
}
5260

5361
this.#client = new Kafka(kafkaConfig);
5462
this.#consumers = {};
@@ -69,9 +77,12 @@ export class MessageBus {
6977
);
7078
}
7179

72-
private async connectProducer() {
73-
this.#producer = this.#client.producer();
74-
await this.#producer.connect();
80+
private async ensureProducerConnected() {
81+
if (!this.#producerConnectingPromise) {
82+
this.#producer = this.#client.producer();
83+
this.#producerConnectingPromise = this.#producer.connect();
84+
}
85+
await this.#producerConnectingPromise;
7586
}
7687

7788
private async connectConsumer(groupId: string) {
@@ -83,13 +94,16 @@ export class MessageBus {
8394
}
8495

8596
async publish<T>(topic: string, ...messages: T[]) {
86-
if (this.#shutdown) throw new Error("MessageBus is already shutdown");
97+
if (this.#shutdown) {
98+
throw new Error("MessageBus is already shutdown");
99+
}
87100

88-
if (!this.#producer) await this.connectProducer();
101+
await this.ensureProducerConnected();
89102

90103
const kafkaMessages: Message[] = [];
91-
for (const m of messages)
92-
kafkaMessages.push({ value: Buffer.from(BSON.serialize({ data: m })) }); // double check
104+
for (const m of messages) {
105+
kafkaMessages.push({ value: Buffer.from(BSON.serialize({ data: m })) });
106+
} // double check
93107

94108
await this.#producer!.send({ topic, messages: kafkaMessages });
95109
}
@@ -101,7 +115,9 @@ export class MessageBus {
101115
cb: ((event: T) => Promise<void>) | ((event: T) => void)
102116
) {
103117
this.logger.info(`Kafka (sub): connecting to consumer group ${groupId}`);
104-
if (!this.#consumers[groupId]) await this.connectConsumer(groupId);
118+
if (!this.#consumers[groupId]) {
119+
await this.connectConsumer(groupId);
120+
}
105121

106122
this.logger.info(`Kafka (sub): subscribing to topic ${topic}`);
107123
await this.#consumers[groupId]!.consumer.subscribe({
@@ -119,7 +135,11 @@ export class MessageBus {
119135
// eslint-disable-next-line @typescript-eslint/no-misused-promises
120136
this.#consumers[groupId]!.consumer.on("consumer.crash", async (payload) => {
121137
if (this.shouldCrash(payload.payload.error)) {
122-
this.logger.error(`FATAL: not recoverable error: ${JSON.stringify(payload.payload.error)}`);
138+
this.logger.error(
139+
`FATAL: not recoverable error: ${JSON.stringify(
140+
payload.payload.error
141+
)}`
142+
);
123143
process.kill(process.pid, "SIGTERM");
124144
return;
125145
}
@@ -148,8 +168,9 @@ export class MessageBus {
148168
}
149169

150170
async runConsumer(groupId: string) {
151-
if (!this.#consumers[groupId])
171+
if (!this.#consumers[groupId]) {
152172
throw new Error(`No consumer exists with groupId: ${groupId}`);
173+
}
153174

154175
this.nonRetriableWrapper(groupId);
155176

@@ -171,8 +192,10 @@ export class MessageBus {
171192
`Provided callback for topic ${topic} failed: ${e.stack}`
172193
);
173194
} else {
174-
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
175-
this.logger.warn(`Provided callback for topic ${topic} failed with non-Error: ${e}`);
195+
this.logger.warn(
196+
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
197+
`Provided callback for topic ${topic} failed with non-Error: ${e}`
198+
);
176199
}
177200
}
178201
}
@@ -183,7 +206,8 @@ export class MessageBus {
183206
async disconnect() {
184207
this.#shutdown = true;
185208
await this.#producer?.disconnect();
186-
for (const consumer of Object.values(this.#consumers))
209+
for (const consumer of Object.values(this.#consumers)) {
187210
await consumer?.consumer.disconnect();
211+
}
188212
}
189213
}

packages/message-bus/src/environment.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
export const KAFKA_CONNECTION = process.env.KAFKA_CONNECTION ?? "kafka:9092";
2-
export const KAFKA_SASL_USERNAME = process.env.KAFKA_SASL_USERNAME ?? "controller_user";
2+
export const KAFKA_SASL_USERNAME =
3+
process.env.KAFKA_SASL_USERNAME ?? "controller_user";
34
export const KAFKA_SASL_PASSWORD = process.env.KAFKA_SASL_PASSWORD ?? "test";
5+
process.env.KAFKAJS_NO_PARTITIONER_WARNING = "1";
46

57
export const getConfigStr = () => {
68
return `KAFKA

packages/message-bus/src/svc.ts

+17-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
type MicroserviceBaseSvc,
1010
} from "@chicmoz-pkg/microservice-base";
1111
import { backOff } from "exponential-backoff";
12+
import { KafkaJSProtocolError } from "kafkajs";
1213
import { MessageBus } from "./class.js";
1314
import {
1415
KAFKA_CONNECTION,
@@ -36,12 +37,15 @@ export const init = async (instanceName: string, logger: Logger) => {
3637

3738
const checkReady = () => {
3839
const state = getSvcState(svcId);
39-
if (state === MicroserviceBaseSvcState.SHUTTING_DOWN)
40+
if (state === MicroserviceBaseSvcState.SHUTTING_DOWN) {
4041
throw new Error("MessageBus is shutting down");
41-
if (state === MicroserviceBaseSvcState.DOWN)
42+
}
43+
if (state === MicroserviceBaseSvcState.DOWN) {
4244
throw new Error("MessageBus is down");
43-
if (state === MicroserviceBaseSvcState.INITIALIZING)
45+
}
46+
if (state === MicroserviceBaseSvcState.INITIALIZING) {
4447
throw new Error("MessageBus is initializing");
48+
}
4549
return state;
4650
};
4751

@@ -77,8 +81,17 @@ export const startSubscribe = async (
7781
logger.info("Trying to subscribe before MessageBus is initialized...");
7882
return false;
7983
}
84+
if (
85+
(e as KafkaJSProtocolError).message ===
86+
"This server does not host this topic-partition"
87+
) {
88+
logger.info(
89+
"If this happens during the first mins of starting the cluster it's fine..."
90+
);
91+
} else {
92+
logger.warn(e);
93+
}
8094
// TODO: probably not infinite retries?
81-
logger.warn(e);
8295
logger.info(`Retrying attempt ${attemptNumber}...`);
8396
return true;
8497
},

services/explorer-api/src/events/received/index.ts

+13-11
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@ import { pendingTxHandler } from "./on-pending-txs.js";
1515
import { sequencerInfoHandler } from "./on-sequencer-info.js";
1616

1717
export const subscribeHandlers = async () => {
18-
await startSubscribe(chainInfoHandler);
19-
await startSubscribe(sequencerInfoHandler);
20-
await startSubscribe(l2RpcNodeAliveHandler);
21-
await startSubscribe(l2RpcNodeErrorHandler);
22-
await startSubscribe(blockHandler);
23-
await startSubscribe(catchupHandler);
24-
await startSubscribe(pendingTxHandler);
25-
await startSubscribe(l1L2ValidatorHandler);
26-
await startSubscribe(l1L2BlockProposedHandler);
27-
await startSubscribe(l1L2ProofVerifiedHandler);
28-
await startSubscribe(l1GenericContractEventHandler);
18+
await Promise.all([
19+
startSubscribe(chainInfoHandler),
20+
startSubscribe(sequencerInfoHandler),
21+
startSubscribe(l2RpcNodeAliveHandler),
22+
startSubscribe(l2RpcNodeErrorHandler),
23+
startSubscribe(blockHandler),
24+
startSubscribe(catchupHandler),
25+
startSubscribe(pendingTxHandler),
26+
startSubscribe(l1L2ValidatorHandler),
27+
startSubscribe(l1L2BlockProposedHandler),
28+
startSubscribe(l1L2ProofVerifiedHandler),
29+
startSubscribe(l1GenericContractEventHandler),
30+
]);
2931
};

0 commit comments

Comments
 (0)