fix: kafka error producer not connected #338

Merged: 17 commits, Feb 28, 2025

Changes from 14 commits
1 change: 0 additions & 1 deletion .dockerignore
@@ -16,7 +16,6 @@
# workaround for wildcards not working in exclusions (requires docker engine >23 https://docs.docker.com/engine/release-notes/23.0/#bug-fixes-and-enhancements-1)
# find . -name 'package.json' -not -path './.yarn/*' -not -path './packages/*' -maxdepth 3 | sed -e 's/^.\//!/'
!services/auth/package.json
!services/dummy-node/package.json
!services/event-cannon/package.json
!services/aztec-listener/package.json
!services/ethereum-listener/package.json
32 changes: 32 additions & 0 deletions .github/workflows/build_on_pr.yaml
@@ -0,0 +1,32 @@
name: Build on PR

on:
  pull_request:
    types: [opened, synchronize, reopened]

jobs:
  build:
    if: ${{ github.event.pull_request.draft == false }}
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v2

      - name: Set up Node.js
        uses: actions/setup-node@v2
        with:
          node-version: "20"

      - name: Install dependencies
        run: yarn install

      - name: Build
        run: yarn build

      # TODO: fix linter-errors
      #- name: Lint
      #  run: yarn lint

      - name: Test
        run: yarn test
2 changes: 2 additions & 0 deletions .gitignore
@@ -14,3 +14,5 @@ ignore/
!.yarn/releases
!.yarn/sdks
!.yarn/versions

!.github/workflows/*
39 changes: 0 additions & 39 deletions k8s/local/dummy-node/deployment.yaml

This file was deleted.

24 changes: 0 additions & 24 deletions k8s/local/dummy-node/ingress.yaml

This file was deleted.

13 changes: 0 additions & 13 deletions k8s/local/dummy-node/service.yaml

This file was deleted.

1 change: 1 addition & 0 deletions package.json
@@ -26,6 +26,7 @@
"lint:packages": "yarn workspaces foreach --parallel --all --topological-dev --verbose --include '@chicmoz-pkg/*' run lint",
"build": "yarn workspaces foreach --parallel --all --topological-dev --verbose run build",
"lint": "yarn workspaces foreach --parallel --all --topological-dev --verbose run lint",
"test": "yarn workspaces foreach --parallel --all --topological-dev --verbose run test-once",
"g:lint": "cd $INIT_CWD && eslint --fix",
"postinstall": "husky install",
"prepare": "husky install"
3 changes: 2 additions & 1 deletion packages/contract-verification/package.json
@@ -12,7 +12,8 @@
"start": "yarn node build/index.js",
"lint": "yarn run lint-base --ext .ts .",
"lint-base": "yarn run g:lint",
"test": "yarn run vitest"
"test": "yarn run vitest",
"test-once": "yarn run vitest run"
},
"dependencies": {
"@aztec/aztec.js": "0.76.4",
34 changes: 19 additions & 15 deletions packages/message-bus/src/class.ts
@@ -28,6 +28,7 @@ export type MBConsumer = {
export class MessageBus {
#client: Kafka;
#producer: Producer | undefined;
#producerConnectingPromise: Promise<void> | undefined;
#consumers: Record<string, MBConsumer | undefined>;
#shutdown = false;
logger: Logger;
@@ -39,16 +40,16 @@ export class MessageBus {
brokers: options.connection.split(","),
sasl: options.saslConfig,
logCreator: () => {
return ({ log }) => {
return ({ log, label }) => {
const { message, error, stack, retryCount } = log;
if (stack) this.logger.error(`Kafka: ${stack}`);
else if (error) this.logger.error(`Kafka: ${error} (message: ${message})`);
else if (retryCount) this.logger.warn(`Kafka: ${message} (retrying ${retryCount}...)`);
else this.logger.info(`Kafka: ${message}`);
if (stack) {this.logger.error(`Kafka: ${stack}`);}
else if (error) {this.logger.error(`Kafka: ${error} (message: ${message})`);}
else if (retryCount) {this.logger.warn(`Kafka: ${message} (retrying ${retryCount}...)`);}
else {this.logger.info(`Kafka[${label}]: ${message}`);}
};
},
};
if (options.ssl) kafkaConfig.ssl = true;
if (options.ssl) {kafkaConfig.ssl = true;}

this.#client = new Kafka(kafkaConfig);
this.#consumers = {};
@@ -69,9 +70,12 @@
);
}

private async connectProducer() {
this.#producer = this.#client.producer();
await this.#producer.connect();
private async ensureProducerConnected() {
if (!this.#producerConnectingPromise) {
this.#producer = this.#client.producer();
this.#producerConnectingPromise = this.#producer.connect();
}
await this.#producerConnectingPromise;
}

private async connectConsumer(groupId: string) {
@@ -83,13 +87,13 @@
}

async publish<T>(topic: string, ...messages: T[]) {
if (this.#shutdown) throw new Error("MessageBus is already shutdown");
if (this.#shutdown) {throw new Error("MessageBus is already shutdown");}

if (!this.#producer) await this.connectProducer();
await this.ensureProducerConnected();

const kafkaMessages: Message[] = [];
for (const m of messages)
kafkaMessages.push({ value: Buffer.from(BSON.serialize({ data: m })) }); // double check
{kafkaMessages.push({ value: Buffer.from(BSON.serialize({ data: m })) });} // double check

await this.#producer!.send({ topic, messages: kafkaMessages });
}
@@ -101,7 +105,7 @@
cb: ((event: T) => Promise<void>) | ((event: T) => void)
) {
this.logger.info(`Kafka (sub): connecting to consumer group ${groupId}`);
if (!this.#consumers[groupId]) await this.connectConsumer(groupId);
if (!this.#consumers[groupId]) {await this.connectConsumer(groupId);}

this.logger.info(`Kafka (sub): subscribing to topic ${topic}`);
await this.#consumers[groupId]!.consumer.subscribe({
@@ -149,7 +153,7 @@

async runConsumer(groupId: string) {
if (!this.#consumers[groupId])
throw new Error(`No consumer exists with groupId: ${groupId}`);
{throw new Error(`No consumer exists with groupId: ${groupId}`);}

this.nonRetriableWrapper(groupId);

@@ -184,6 +188,6 @@
this.#shutdown = true;
await this.#producer?.disconnect();
for (const consumer of Object.values(this.#consumers))
await consumer?.consumer.disconnect();
{await consumer?.consumer.disconnect();}
}
}
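
The heart of this fix is in `publish()`: the old `if (!this.#producer) await this.connectProducer();` set `#producer` synchronously and then awaited `connect()`, so a second concurrent `publish()` could see `#producer` already defined and call `send()` before the connection was established, hitting the "producer not connected" error named in the PR title. `ensureProducerConnected()` instead memoizes the in-flight `connect()` promise so every caller awaits the same connection. Below is a minimal standalone sketch of that pattern; the broker address and the `publishJson` helper are illustrative, not part of this PR.

```ts
import { Kafka, Producer } from "kafkajs";

// Minimal sketch of the memoized-connect pattern; values are placeholders.
const kafka = new Kafka({ clientId: "sketch", brokers: ["localhost:9092"] });

let producer: Producer | undefined;
let connecting: Promise<void> | undefined;

// Every caller awaits the same in-flight connect() promise, so a second
// concurrent publish can no longer race ahead and send() before the
// connection has actually been established.
async function ensureProducerConnected(): Promise<void> {
  if (!connecting) {
    producer = kafka.producer();
    connecting = producer.connect();
  }
  await connecting;
}

export async function publishJson(topic: string, payload: unknown): Promise<void> {
  await ensureProducerConnected();
  // producer is guaranteed to be set once the shared connect() promise resolves
  await producer!.send({
    topic,
    messages: [{ value: JSON.stringify(payload) }],
  });
}
```
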
4 changes: 3 additions & 1 deletion packages/message-bus/src/environment.ts
@@ -1,6 +1,8 @@
export const KAFKA_CONNECTION = process.env.KAFKA_CONNECTION ?? "kafka:9092";
export const KAFKA_SASL_USERNAME = process.env.KAFKA_SASL_USERNAME ?? "controller_user";
export const KAFKA_SASL_USERNAME =
process.env.KAFKA_SASL_USERNAME ?? "controller_user";
export const KAFKA_SASL_PASSWORD = process.env.KAFKA_SASL_PASSWORD ?? "test";
process.env.KAFKAJS_NO_PARTITIONER_WARNING = "1";

export const getConfigStr = () => {
return `KAFKA
18 changes: 14 additions & 4 deletions packages/message-bus/src/svc.ts
@@ -9,6 +9,7 @@ import {
type MicroserviceBaseSvc,
} from "@chicmoz-pkg/microservice-base";
import { backOff } from "exponential-backoff";
import { KafkaJSProtocolError } from "kafkajs";
import { MessageBus } from "./class.js";
import {
KAFKA_CONNECTION,
@@ -37,11 +38,11 @@ export const init = async (instanceName: string, logger: Logger) => {
const checkReady = () => {
const state = getSvcState(svcId);
if (state === MicroserviceBaseSvcState.SHUTTING_DOWN)
throw new Error("MessageBus is shutting down");
{throw new Error("MessageBus is shutting down");}
if (state === MicroserviceBaseSvcState.DOWN)
throw new Error("MessageBus is down");
{throw new Error("MessageBus is down");}
if (state === MicroserviceBaseSvcState.INITIALIZING)
throw new Error("MessageBus is initializing");
{throw new Error("MessageBus is initializing");}
return state;
};

@@ -77,8 +78,17 @@ export const startSubscribe = async (
logger.info("Trying to subscribe before MessageBus is initialized...");
return false;
}
if (
(e as KafkaJSProtocolError).message ===
"This server does not host this topic-partition"
) {
logger.info(
"If this happens during the first mins of starting the cluster it's fine..."
);
} else {
logger.warn(e);
}
// TODO: probably not infinite retries?
logger.warn(e);
logger.info(`Retrying attempt ${attemptNumber}...`);
return true;
},
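
The diff above also adjusts the retry path in `startSubscribe`: an error whose message is "This server does not host this topic-partition" is now logged at info level, since it is expected while the broker is still creating topics shortly after cluster start-up, while other errors are still warned. A hedged sketch of that retry shape with `exponential-backoff`; `trySubscribe` and the option values are placeholders, not the PR's actual code.

```ts
import { backOff } from "exponential-backoff";

// Sketch only: retry a subscribe attempt with exponential backoff, treating
// the "topic not hosted yet" error as expected noise during start-up.
async function subscribeWithRetry(
  trySubscribe: () => Promise<void>,
  logger: { info: (msg: string) => void; warn: (msg: string) => void }
): Promise<void> {
  await backOff(trySubscribe, {
    startingDelay: 1_000,
    retry: (e: unknown, attemptNumber: number) => {
      const message = e instanceof Error ? e.message : String(e);
      if (message === "This server does not host this topic-partition") {
        // Expected while the broker is still creating topics right after
        // cluster start-up, so keep it at info level.
        logger.info("Topic not available yet, retrying...");
      } else {
        logger.warn(message);
      }
      logger.info(`Retrying attempt ${attemptNumber}...`);
      return true; // keep retrying (the PR leaves a TODO about capping this)
    },
  });
}
```
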
6 changes: 3 additions & 3 deletions packages/types/src/ethereum/contract-events.ts
@@ -8,7 +8,7 @@ export const chicmozL1L2BlockProposedSchema = z.object({
l1ContractAddress: ethAddressSchema,
l2BlockNumber: z.coerce.bigint(),
l1BlockNumber: z.coerce.bigint(),
l1BlockTimestamp: z.number(),
l1BlockTimestamp: z.coerce.date(),
l1BlockHash: z.string().startsWith("0x"),
isFinalized: z.boolean().default(false),
archive: frSchema,
@@ -22,7 +22,7 @@ export const chicmozL1L2ProofVerifiedSchema = z.object({
l1ContractAddress: ethAddressSchema,
l2BlockNumber: z.coerce.bigint(),
l1BlockNumber: z.coerce.bigint(),
l1BlockTimestamp: z.number(),
l1BlockTimestamp: z.coerce.date(),
l1BlockHash: z.string().startsWith("0x"),
isFinalized: z.boolean().default(false),
proverId: frSchema,
@@ -39,7 +39,7 @@ export const chicmozL1GenericContractEventSchema = z.object({
eventArgs: z.record(z.unknown()).optional(),
l1BlockNumber: z.coerce.bigint(),
l1BlockHash: z.string().startsWith("0x"),
l1BlockTimestamp: z.number(),
l1BlockTimestamp: z.coerce.date(),
l1ContractAddress: z.string(),
isFinalized: z.boolean().default(false),
l1TransactionHash: z.string().startsWith("0x").optional().nullable(),
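
All three L1 event schemas now coerce `l1BlockTimestamp` into a `Date` instead of keeping a raw number. With zod, `z.coerce.date()` runs the input through `new Date(...)` and rejects invalid dates, so both millisecond epochs and ISO strings validate. A small illustrative sketch; the schema below is trimmed down, not the real one.

```ts
import { z } from "zod";

// Trimmed-down, illustrative schema: the real schemas in this PR carry many
// more fields alongside l1BlockTimestamp.
const timestampedEvent = z.object({
  l1BlockTimestamp: z.coerce.date(),
});

// z.coerce.date() feeds the input to `new Date(...)` and rejects invalid
// dates, so millisecond epochs and ISO strings both become Date instances.
console.log(timestampedEvent.parse({ l1BlockTimestamp: 1740700800000 }));
console.log(timestampedEvent.parse({ l1BlockTimestamp: "2025-02-28T00:00:00Z" }));

// Caveat: a Unix timestamp in *seconds* would still parse, but into a
// 1970-era date; it would need to be multiplied by 1000 before validation.
```
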
5 changes: 0 additions & 5 deletions services/auth/load_auth.sh

This file was deleted.

5 changes: 3 additions & 2 deletions services/auth/package.json
@@ -47,9 +47,10 @@
"build": "yarn tsc",
"postbuild": "ln -s ../public build/public",
"predev": "yarn build",
"dev": "yarn node build/index.js",
"start": "yarn node build/index.js",
"dev": "yarn node build/src/index.js",
"start": "yarn node build/src/index.js",
"test": "yarn run vitest",
"test-once": "yarn run vitest run",
"lint": "yarn run lint-base --ext .ts .",
"lint-base": "yarn run g:lint"
},
14 changes: 8 additions & 6 deletions services/auth/src/core.unit.test.ts
@@ -7,20 +7,23 @@ import { RateLimitDb } from "./database/rate-limit.js";
describe("core unit tests", () => {
// FIXME: Mock these things better most likely
const infoMock = vi.fn(() => "");

const logger = {
info: infoMock,
} as unknown as Logger;

const mockApiKey = "9ccca684-28f8-4897-af71-5f85fcbd60bd";

const registerMock = vi.fn(() => [{ id: "1", apiKey: mockApiKey }, true]);
const registerWithDiscordIdMock = vi.fn(() => [{ id: "2", apiKey: mockApiKey }, true]);
const registerWithDiscordIdMock = vi.fn(() => [
{ id: "2", apiKey: mockApiKey },
true,
]);
const deleteMock = vi.fn(() => 1);
const deleteMockRateLimitDb = vi.fn(() => Promise<void>);

const setApiKeySubscriptionMock = vi.fn();

const rateLimitDb = {
setApiKeySubscription: setApiKeySubscriptionMock,
deleteApiKey: deleteMockRateLimitDb,
@@ -32,7 +35,6 @@ describe("core unit tests", () => {
delete: deleteMock,
} as unknown as DB;


describe("extractApiKey", () => {
test("success; multiple sub-paths", () => {
const url = "/v1/4aedfbf7-4d0f-4033-9bc8-50443d5be2cd/latest/height";
Expand Down