Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: kafka error producer not connected #338

Merged
merged 17 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 43 additions & 19 deletions packages/message-bus/src/class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export type MBConsumer = {
export class MessageBus {
#client: Kafka;
#producer: Producer | undefined;
#producerConnectingPromise: Promise<void> | undefined;
#consumers: Record<string, MBConsumer | undefined>;
#shutdown = false;
logger: Logger;
Expand All @@ -39,16 +40,23 @@ export class MessageBus {
brokers: options.connection.split(","),
sasl: options.saslConfig,
logCreator: () => {
return ({ log }) => {
return ({ log, label }) => {
const { message, error, stack, retryCount } = log;
if (stack) this.logger.error(`Kafka: ${stack}`);
else if (error) this.logger.error(`Kafka: ${error} (message: ${message})`);
else if (retryCount) this.logger.warn(`Kafka: ${message} (retrying ${retryCount}...)`);
else this.logger.info(`Kafka: ${message}`);
if (stack) {
this.logger.error(`Kafka: ${stack}`);
} else if (error) {
this.logger.error(`Kafka: ${error} (message: ${message})`);
} else if (retryCount) {
this.logger.warn(`Kafka: ${message} (retrying ${retryCount}...)`);
} else {
this.logger.info(`Kafka[${label}]: ${message}`);
}
};
},
};
if (options.ssl) kafkaConfig.ssl = true;
if (options.ssl) {
kafkaConfig.ssl = true;
}

this.#client = new Kafka(kafkaConfig);
this.#consumers = {};
Expand All @@ -69,9 +77,12 @@ export class MessageBus {
);
}

private async connectProducer() {
this.#producer = this.#client.producer();
await this.#producer.connect();
private async ensureProducerConnected() {
if (!this.#producerConnectingPromise) {
this.#producer = this.#client.producer();
this.#producerConnectingPromise = this.#producer.connect();
}
await this.#producerConnectingPromise;
}

private async connectConsumer(groupId: string) {
Expand All @@ -83,13 +94,16 @@ export class MessageBus {
}

async publish<T>(topic: string, ...messages: T[]) {
if (this.#shutdown) throw new Error("MessageBus is already shutdown");
if (this.#shutdown) {
throw new Error("MessageBus is already shutdown");
}

if (!this.#producer) await this.connectProducer();
await this.ensureProducerConnected();

const kafkaMessages: Message[] = [];
for (const m of messages)
kafkaMessages.push({ value: Buffer.from(BSON.serialize({ data: m })) }); // double check
for (const m of messages) {
kafkaMessages.push({ value: Buffer.from(BSON.serialize({ data: m })) });
} // double check

await this.#producer!.send({ topic, messages: kafkaMessages });
}
Expand All @@ -101,7 +115,9 @@ export class MessageBus {
cb: ((event: T) => Promise<void>) | ((event: T) => void)
) {
this.logger.info(`Kafka (sub): connecting to consumer group ${groupId}`);
if (!this.#consumers[groupId]) await this.connectConsumer(groupId);
if (!this.#consumers[groupId]) {
await this.connectConsumer(groupId);
}

this.logger.info(`Kafka (sub): subscribing to topic ${topic}`);
await this.#consumers[groupId]!.consumer.subscribe({
Expand All @@ -119,7 +135,11 @@ export class MessageBus {
// eslint-disable-next-line @typescript-eslint/no-misused-promises
this.#consumers[groupId]!.consumer.on("consumer.crash", async (payload) => {
if (this.shouldCrash(payload.payload.error)) {
this.logger.error(`FATAL: not recoverable error: ${JSON.stringify(payload.payload.error)}`);
this.logger.error(
`FATAL: not recoverable error: ${JSON.stringify(
payload.payload.error
)}`
);
process.kill(process.pid, "SIGTERM");
return;
}
Expand Down Expand Up @@ -148,8 +168,9 @@ export class MessageBus {
}

async runConsumer(groupId: string) {
if (!this.#consumers[groupId])
if (!this.#consumers[groupId]) {
throw new Error(`No consumer exists with groupId: ${groupId}`);
}

this.nonRetriableWrapper(groupId);

Expand All @@ -171,8 +192,10 @@ export class MessageBus {
`Provided callback for topic ${topic} failed: ${e.stack}`
);
} else {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
this.logger.warn(`Provided callback for topic ${topic} failed with non-Error: ${e}`);
this.logger.warn(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Provided callback for topic ${topic} failed with non-Error: ${e}`
);
}
}
}
Expand All @@ -183,7 +206,8 @@ export class MessageBus {
async disconnect() {
this.#shutdown = true;
await this.#producer?.disconnect();
for (const consumer of Object.values(this.#consumers))
for (const consumer of Object.values(this.#consumers)) {
await consumer?.consumer.disconnect();
}
}
}
4 changes: 3 additions & 1 deletion packages/message-bus/src/environment.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
export const KAFKA_CONNECTION = process.env.KAFKA_CONNECTION ?? "kafka:9092";
export const KAFKA_SASL_USERNAME = process.env.KAFKA_SASL_USERNAME ?? "controller_user";
export const KAFKA_SASL_USERNAME =
process.env.KAFKA_SASL_USERNAME ?? "controller_user";
export const KAFKA_SASL_PASSWORD = process.env.KAFKA_SASL_PASSWORD ?? "test";
process.env.KAFKAJS_NO_PARTITIONER_WARNING = "1";

export const getConfigStr = () => {
return `KAFKA
Expand Down
21 changes: 17 additions & 4 deletions packages/message-bus/src/svc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
type MicroserviceBaseSvc,
} from "@chicmoz-pkg/microservice-base";
import { backOff } from "exponential-backoff";
import { KafkaJSProtocolError } from "kafkajs";
import { MessageBus } from "./class.js";
import {
KAFKA_CONNECTION,
Expand Down Expand Up @@ -36,12 +37,15 @@ export const init = async (instanceName: string, logger: Logger) => {

const checkReady = () => {
const state = getSvcState(svcId);
if (state === MicroserviceBaseSvcState.SHUTTING_DOWN)
if (state === MicroserviceBaseSvcState.SHUTTING_DOWN) {
throw new Error("MessageBus is shutting down");
if (state === MicroserviceBaseSvcState.DOWN)
}
if (state === MicroserviceBaseSvcState.DOWN) {
throw new Error("MessageBus is down");
if (state === MicroserviceBaseSvcState.INITIALIZING)
}
if (state === MicroserviceBaseSvcState.INITIALIZING) {
throw new Error("MessageBus is initializing");
}
return state;
};

Expand Down Expand Up @@ -77,8 +81,17 @@ export const startSubscribe = async (
logger.info("Trying to subscribe before MessageBus is initialized...");
return false;
}
if (
(e as KafkaJSProtocolError).message ===
"This server does not host this topic-partition"
) {
logger.info(
"If this happens during the first mins of starting the cluster it's fine..."
);
} else {
logger.warn(e);
}
// TODO: probably not infinite retries?
logger.warn(e);
logger.info(`Retrying attempt ${attemptNumber}...`);
return true;
},
Expand Down
24 changes: 13 additions & 11 deletions services/explorer-api/src/events/received/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ import { pendingTxHandler } from "./on-pending-txs.js";
import { sequencerInfoHandler } from "./on-sequencer-info.js";

export const subscribeHandlers = async () => {
await startSubscribe(chainInfoHandler);
await startSubscribe(sequencerInfoHandler);
await startSubscribe(l2RpcNodeAliveHandler);
await startSubscribe(l2RpcNodeErrorHandler);
await startSubscribe(blockHandler);
await startSubscribe(catchupHandler);
await startSubscribe(pendingTxHandler);
await startSubscribe(l1L2ValidatorHandler);
await startSubscribe(l1L2BlockProposedHandler);
await startSubscribe(l1L2ProofVerifiedHandler);
await startSubscribe(l1GenericContractEventHandler);
await Promise.all([
startSubscribe(chainInfoHandler),
startSubscribe(sequencerInfoHandler),
startSubscribe(l2RpcNodeAliveHandler),
startSubscribe(l2RpcNodeErrorHandler),
startSubscribe(blockHandler),
startSubscribe(catchupHandler),
startSubscribe(pendingTxHandler),
startSubscribe(l1L2ValidatorHandler),
startSubscribe(l1L2BlockProposedHandler),
startSubscribe(l1L2ProofVerifiedHandler),
startSubscribe(l1GenericContractEventHandler),
]);
};