From 777d16dad30638feee3d073b18576691f0bc5054 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Sun, 1 Dec 2024 17:23:48 -0300 Subject: [PATCH] improve lock perf by using pubsub --- apps/api/src/lock.ts | 276 ++++++++++++----------- apps/api/src/schedule/index.ts | 54 ++--- apps/api/src/yjs/v2/executor/ai/index.ts | 3 +- apps/api/src/yjs/v2/executor/executor.ts | 3 +- 4 files changed, 177 insertions(+), 159 deletions(-) diff --git a/apps/api/src/lock.ts b/apps/api/src/lock.ts index 61dfbd77..74bc94d3 100644 --- a/apps/api/src/lock.ts +++ b/apps/api/src/lock.ts @@ -1,12 +1,19 @@ +import crypto from 'crypto' import { v4 as uuidv4 } from 'uuid' -import prisma from '@briefer/database' +import prisma, { publish, subscribe } from '@briefer/database' import { logger } from './logger.js' import { z } from 'zod' import { exhaustiveCheck } from '@briefer/types' const EXPIRATION_TIME = 1000 * 5 // 5 seconds -const MAX_RETRY_TIMEOUT = 500 // 500ms -const DEFAULT_ACQUIRE_TIMEOUT = Infinity +const RETRY_TIMEOUT = 1000 * 30 // 30 seconds in case pubsub fails +const NUM_PARTITIONS = 32 + +function getPartition(name: string): number { + const hash = crypto.createHash('md5').update(name).digest('hex') + const hashValue = parseInt(hash.slice(0, 8), 16) // Use first 8 hex characters + return hashValue % NUM_PARTITIONS +} class AlreadyAcquiredError extends Error { constructor(public readonly lockName: string) { @@ -15,146 +22,163 @@ class AlreadyAcquiredError extends Error { } } -export class AcquireLockTimeoutError extends Error { - constructor( - public readonly name: string, - public readonly ownerId: string, - public readonly startTime: number, - public readonly acquireTimeout: number, - public readonly attempt: number - ) { - super( - `Failed to acquire lock ${name} with ownerId ${ownerId} after ${acquireTimeout}ms and ${attempt} attempts.` - ) - this.name = 'AcquireLockTimeoutError' - } -} - export async function acquireLock( name: string, - cb: () => Promise, - { acquireTimeout = DEFAULT_ACQUIRE_TIMEOUT }: { acquireTimeout?: number } = {} + cb: () => Promise ): Promise { - const startTime = Date.now() const ownerId = uuidv4() + let acquired = false + let attempt = 0 + let timeout: NodeJS.Timeout | null = null - const inner = async (attempt: number): Promise => { - if (Date.now() - startTime > acquireTimeout) { - throw new AcquireLockTimeoutError( - name, - ownerId, - startTime, - acquireTimeout, - attempt - ) - } + const channel = `lock_releases_${getPartition(name)}` - logger().trace({ name, ownerId, attempt }, 'Acquiring lock') - try { - const lock = await prisma().lock.findFirst({ - where: { - name, - }, - }) - if (!lock) { - // this is safe because if someone else creates the lock in the meantime - // this will raise a unique constraint error that we catch below to retry - await prisma().lock.create({ - data: { - name, - isLocked: true, - ownerId, - expiresAt: new Date(Date.now() + EXPIRATION_TIME), - acquiredAt: new Date(), - }, - }) - } else if (!lock.isLocked || lock.expiresAt < new Date()) { - // this is safe because if someone else updates the lock in the meantime - // this will fail to find the lock to update because expiresAt will be changed - // that will raise a not found error that we catch below to retry - await prisma().lock.update({ + return new Promise(async (resolve, reject) => { + const cleanSubscription = await subscribe(channel, async (event) => { + if (acquired) { + return + } + + if (event === name) { + logger().trace( + { name, ownerId, channel }, + 'Got lock released message. Anticipating lock acquisition attempt' + ) + tryAcquire() + } + }) + + const tryAcquire = async () => { + if (acquired) { + return + } + + attempt++ + logger().trace({ name, ownerId, attempt, channel }, 'Acquiring lock') + try { + const lock = await prisma().lock.findFirst({ where: { - id: lock.id, - expiresAt: lock.expiresAt, - }, - data: { - isLocked: true, - ownerId, - expiresAt: new Date(Date.now() + EXPIRATION_TIME), - acquiredAt: new Date(), + name, }, }) - } else { - // lock is already acquired - throw new AlreadyAcquiredError(name) - } - } catch (err) { - let code = '' - if (err instanceof AlreadyAcquiredError) { - code = 'AlreadyAcquiredError' - } else { - const parsed = z - .object({ code: z.union([z.literal('P2002'), z.literal('P2025')]) }) - .safeParse(err) - if (parsed.success) { - switch (parsed.data.code) { - case 'P2002': - code = 'UniqueConstraintError' - break - case 'P2025': - code = 'NotFound' - break - default: - exhaustiveCheck(parsed.data.code) + if (!lock) { + // this is safe because if someone else creates the lock in the meantime + // this will raise a unique constraint error that we catch below to retry + await prisma().lock.create({ + data: { + name, + isLocked: true, + ownerId, + expiresAt: new Date(Date.now() + EXPIRATION_TIME), + acquiredAt: new Date(), + }, + }) + } else if (!lock.isLocked || lock.expiresAt < new Date()) { + // this is safe because if someone else updates the lock in the meantime + // this will fail to find the lock to update because expiresAt will be changed + // that will raise a not found error that we catch below to retry + await prisma().lock.update({ + where: { + id: lock.id, + expiresAt: lock.expiresAt, + }, + data: { + isLocked: true, + ownerId, + expiresAt: new Date(Date.now() + EXPIRATION_TIME), + acquiredAt: new Date(), + }, + }) + } else { + // lock is already acquired + throw new AlreadyAcquiredError(name) + } + } catch (err) { + let code = '' + if (err instanceof AlreadyAcquiredError) { + code = 'AlreadyAcquiredError' + } else { + const parsed = z + .object({ code: z.union([z.literal('P2002'), z.literal('P2025')]) }) + .safeParse(err) + if (parsed.success) { + switch (parsed.data.code) { + case 'P2002': + code = 'UniqueConstraintError' + break + case 'P2025': + code = 'NotFound' + break + default: + exhaustiveCheck(parsed.data.code) + } } } - } - if (code !== '') { - const timeout = Math.min(MAX_RETRY_TIMEOUT, Math.pow(2, attempt) * 100) - logger().trace( - { name, ownerId, attempt, code, timeout }, - 'Lock is already acquired. Retrying.' + if (code !== '') { + logger().trace( + { + name, + ownerId, + attempt, + code, + retryTimeout: RETRY_TIMEOUT, + channel, + }, + `Lock is already acquired. Retrying in ${RETRY_TIMEOUT}.` + ) + if (timeout) { + clearTimeout(timeout) + } + timeout = setTimeout(tryAcquire, RETRY_TIMEOUT) + return + } + + logger().error( + { name, ownerId, channel, err }, + 'Failed to acquire lock' ) - await new Promise((resolve) => setTimeout(resolve, timeout)) - return inner(attempt + 1) + reject(err) + return } - logger().error({ name, ownerId, err }, 'Failed to acquire lock') - throw err - } - - const extendExpirationInterval = setInterval(async () => { - await prisma().lock.updateMany({ - where: { - name, - ownerId, - }, - data: { - expiresAt: new Date(Date.now() + EXPIRATION_TIME), - }, - }) - }, EXPIRATION_TIME / 3) + const extendExpirationInterval = setInterval(async () => { + await prisma().lock.updateMany({ + where: { + name, + ownerId, + }, + data: { + expiresAt: new Date(Date.now() + EXPIRATION_TIME), + }, + }) + }, EXPIRATION_TIME / 3) - logger().debug({ name, ownerId }, 'Lock acquired') + logger().debug({ name, ownerId, channel }, 'Lock acquired') + acquired = true + await cleanSubscription() - try { - return await cb() - } finally { - logger().trace({ name, ownerId }, 'Releasing lock') - clearInterval(extendExpirationInterval) - await prisma().lock.updateMany({ - where: { - name, - ownerId, - }, - data: { - isLocked: false, - }, - }) - logger().debug({ name, ownerId }, 'Lock released') + try { + resolve(await cb()) + } catch (err) { + reject(err) + } finally { + logger().trace({ name, ownerId, channel }, 'Releasing lock') + clearInterval(extendExpirationInterval) + await prisma().lock.updateMany({ + where: { + name, + ownerId, + }, + data: { + isLocked: false, + }, + }) + await publish(channel, name) + logger().debug({ name, ownerId, channel }, 'Lock released') + } } - } - return inner(0) + tryAcquire() + }) } diff --git a/apps/api/src/schedule/index.ts b/apps/api/src/schedule/index.ts index ffc4691a..bd2831bc 100644 --- a/apps/api/src/schedule/index.ts +++ b/apps/api/src/schedule/index.ts @@ -201,40 +201,36 @@ export async function runSchedule(socketServer: IOServer) { let stop = false const loop = new Promise(async (resolve) => { logger().trace('Acquiring lock to be the schedule executor') - await acquireLock( - 'schedule-executor', - async () => { + await acquireLock('schedule-executor', async () => { + if (stop) { + logger().trace( + 'Schedule executor lock acquired but server is shutting down' + ) + return + } + + logger().trace('Schedule executor lock acquired') + while (true) { if (stop) { - logger().trace( - 'Schedule executor lock acquired but server is shutting down' - ) - return + break } - logger().trace('Schedule executor lock acquired') - while (true) { - if (stop) { - break - } - - try { - await updateSchedule() - } catch (err) { - logger().error( - { err, module: 'schedule' }, - 'Failed to update schedule' - ) - } - - if (stop) { - break - } + try { + await updateSchedule() + } catch (err) { + logger().error( + { err, module: 'schedule' }, + 'Failed to update schedule' + ) + } - await new Promise((resolve) => setTimeout(resolve, 5000)) + if (stop) { + break } - }, - { acquireTimeout: Infinity } - ) + + await new Promise((resolve) => setTimeout(resolve, 5000)) + } + }) resolve() }) diff --git a/apps/api/src/yjs/v2/executor/ai/index.ts b/apps/api/src/yjs/v2/executor/ai/index.ts index ece3dbf9..8aa175f8 100644 --- a/apps/api/src/yjs/v2/executor/ai/index.ts +++ b/apps/api/src/yjs/v2/executor/ai/index.ts @@ -144,8 +144,7 @@ export class AIExecutor { } tick() - }), - { acquireTimeout: Infinity } + }) ) } catch (err) { logger().error( diff --git a/apps/api/src/yjs/v2/executor/executor.ts b/apps/api/src/yjs/v2/executor/executor.ts index 80c05791..58632a77 100644 --- a/apps/api/src/yjs/v2/executor/executor.ts +++ b/apps/api/src/yjs/v2/executor/executor.ts @@ -197,8 +197,7 @@ export class Executor { } tick() - }), - { acquireTimeout: Infinity } + }) ) } catch (err) { logger().error(