Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve lock perf by using pubsub #274

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 150 additions & 126 deletions apps/api/src/lock.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import crypto from 'crypto'
import { v4 as uuidv4 } from 'uuid'
import prisma from '@briefer/database'
import prisma, { publish, subscribe } from '@briefer/database'
import { logger } from './logger.js'
import { z } from 'zod'
import { exhaustiveCheck } from '@briefer/types'

const EXPIRATION_TIME = 1000 * 5 // 5 seconds
const MAX_RETRY_TIMEOUT = 500 // 500ms
const DEFAULT_ACQUIRE_TIMEOUT = Infinity
const RETRY_TIMEOUT = 1000 * 30 // 30 seconds in case pubsub fails
// Number of pubsub channels that lock names are spread across.
const NUM_PARTITIONS = 32

/**
 * Maps a lock name to a stable partition index in [0, NUM_PARTITIONS).
 *
 * The index is derived from the MD5 digest of the name: the first four
 * digest bytes are read as an unsigned big-endian 32-bit integer (the same
 * value as parsing the first 8 hex characters) and reduced modulo
 * NUM_PARTITIONS.
 *
 * @param name - lock name to hash
 * @returns the partition index for `name`
 */
function getPartition(name: string): number {
  const digestBytes = crypto.createHash('md5').update(name).digest()
  const leadingWord = digestBytes.readUInt32BE(0)
  return leadingWord % NUM_PARTITIONS
}

class AlreadyAcquiredError extends Error {
constructor(public readonly lockName: string) {
Expand All @@ -15,146 +22,163 @@ class AlreadyAcquiredError extends Error {
}
}

/**
 * Error describing a lock acquisition that did not succeed within its
 * time budget.
 *
 * The first constructor argument is the lock name. It is exposed as
 * `lockName` rather than as a `name` parameter property, because `name`
 * is the standard `Error.name` field and is set to the class name below.
 * Declaring the lock name as `name` would have it silently overwritten.
 */
export class AcquireLockTimeoutError extends Error {
  /** Name of the lock that could not be acquired. */
  public readonly lockName: string

  /**
   * @param name - name of the lock that could not be acquired
   * @param ownerId - id of the owner that was trying to acquire the lock
   * @param startTime - timestamp (ms) at which acquisition started
   * @param acquireTimeout - time budget (ms) that was exceeded
   * @param attempt - number of attempts made before giving up
   */
  constructor(
    name: string,
    public readonly ownerId: string,
    public readonly startTime: number,
    public readonly acquireTimeout: number,
    public readonly attempt: number
  ) {
    super(
      `Failed to acquire lock ${name} with ownerId ${ownerId} after ${acquireTimeout}ms and ${attempt} attempts.`
    )
    // Keep the conventional Error.name so logs and `err.name` checks still work.
    this.name = 'AcquireLockTimeoutError'
    this.lockName = name
  }
}

export async function acquireLock<T>(
name: string,
cb: () => Promise<T>,
{ acquireTimeout = DEFAULT_ACQUIRE_TIMEOUT }: { acquireTimeout?: number } = {}
cb: () => Promise<T>
): Promise<T> {
const startTime = Date.now()
const ownerId = uuidv4()
let acquired = false
let attempt = 0
let timeout: NodeJS.Timeout | null = null

const inner = async (attempt: number): Promise<T> => {
if (Date.now() - startTime > acquireTimeout) {
throw new AcquireLockTimeoutError(
name,
ownerId,
startTime,
acquireTimeout,
attempt
)
}
const channel = `lock_releases_${getPartition(name)}`

logger().trace({ name, ownerId, attempt }, 'Acquiring lock')
try {
const lock = await prisma().lock.findFirst({
where: {
name,
},
})
if (!lock) {
// this is safe because if someone else creates the lock in the meantime
// this will raise a unique constraint error that we catch below to retry
await prisma().lock.create({
data: {
name,
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else if (!lock.isLocked || lock.expiresAt < new Date()) {
// this is safe because if someone else updates the lock in the meantime
// this will fail to find the lock to update because expiresAt will be changed
// that will raise a not found error that we catch below to retry
await prisma().lock.update({
return new Promise<T>(async (resolve, reject) => {
const cleanSubscription = await subscribe(channel, async (event) => {
if (acquired) {
return
}

if (event === name) {
logger().trace(
{ name, ownerId, channel },
'Got lock released message. Anticipating lock acquisition attempt'
)
tryAcquire()
}
})

const tryAcquire = async () => {
if (acquired) {
return
}

attempt++
logger().trace({ name, ownerId, attempt, channel }, 'Acquiring lock')
try {
const lock = await prisma().lock.findFirst({
where: {
id: lock.id,
expiresAt: lock.expiresAt,
},
data: {
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
name,
},
})
} else {
// lock is already acquired
throw new AlreadyAcquiredError(name)
}
} catch (err) {
let code = ''
if (err instanceof AlreadyAcquiredError) {
code = 'AlreadyAcquiredError'
} else {
const parsed = z
.object({ code: z.union([z.literal('P2002'), z.literal('P2025')]) })
.safeParse(err)
if (parsed.success) {
switch (parsed.data.code) {
case 'P2002':
code = 'UniqueConstraintError'
break
case 'P2025':
code = 'NotFound'
break
default:
exhaustiveCheck(parsed.data.code)
if (!lock) {
// this is safe because if someone else creates the lock in the meantime
// this will raise a unique constraint error that we catch below to retry
await prisma().lock.create({
data: {
name,
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else if (!lock.isLocked || lock.expiresAt < new Date()) {
// this is safe because if someone else updates the lock in the meantime
// this will fail to find the lock to update because expiresAt will be changed
// that will raise a not found error that we catch below to retry
await prisma().lock.update({
where: {
id: lock.id,
expiresAt: lock.expiresAt,
},
data: {
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else {
// lock is already acquired
throw new AlreadyAcquiredError(name)
}
} catch (err) {
let code = ''
if (err instanceof AlreadyAcquiredError) {
code = 'AlreadyAcquiredError'
} else {
const parsed = z
.object({ code: z.union([z.literal('P2002'), z.literal('P2025')]) })
.safeParse(err)
if (parsed.success) {
switch (parsed.data.code) {
case 'P2002':
code = 'UniqueConstraintError'
break
case 'P2025':
code = 'NotFound'
break
default:
exhaustiveCheck(parsed.data.code)
}
}
}
}

if (code !== '') {
const timeout = Math.min(MAX_RETRY_TIMEOUT, Math.pow(2, attempt) * 100)
logger().trace(
{ name, ownerId, attempt, code, timeout },
'Lock is already acquired. Retrying.'
if (code !== '') {
logger().trace(
{
name,
ownerId,
attempt,
code,
retryTimeout: RETRY_TIMEOUT,
channel,
},
`Lock is already acquired. Retrying in ${RETRY_TIMEOUT}.`
)
if (timeout) {
clearTimeout(timeout)
}
timeout = setTimeout(tryAcquire, RETRY_TIMEOUT)
return
}

logger().error(
{ name, ownerId, channel, err },
'Failed to acquire lock'
)
await new Promise((resolve) => setTimeout(resolve, timeout))
return inner(attempt + 1)
reject(err)
return
}

logger().error({ name, ownerId, err }, 'Failed to acquire lock')
throw err
}

const extendExpirationInterval = setInterval(async () => {
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
},
})
}, EXPIRATION_TIME / 3)
const extendExpirationInterval = setInterval(async () => {
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
},
})
}, EXPIRATION_TIME / 3)

logger().debug({ name, ownerId }, 'Lock acquired')
logger().debug({ name, ownerId, channel }, 'Lock acquired')
acquired = true
await cleanSubscription()

try {
return await cb()
} finally {
logger().trace({ name, ownerId }, 'Releasing lock')
clearInterval(extendExpirationInterval)
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
isLocked: false,
},
})
logger().debug({ name, ownerId }, 'Lock released')
try {
resolve(await cb())
} catch (err) {
reject(err)
} finally {
logger().trace({ name, ownerId, channel }, 'Releasing lock')
clearInterval(extendExpirationInterval)
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
isLocked: false,
},
})
await publish(channel, name)
logger().debug({ name, ownerId, channel }, 'Lock released')
}
}
}

return inner(0)
tryAcquire()
})
}
54 changes: 25 additions & 29 deletions apps/api/src/schedule/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,40 +201,36 @@ export async function runSchedule(socketServer: IOServer) {
let stop = false
const loop = new Promise<void>(async (resolve) => {
logger().trace('Acquiring lock to be the schedule executor')
await acquireLock(
'schedule-executor',
async () => {
await acquireLock('schedule-executor', async () => {
if (stop) {
logger().trace(
'Schedule executor lock acquired but server is shutting down'
)
return
}

logger().trace('Schedule executor lock acquired')
while (true) {
if (stop) {
logger().trace(
'Schedule executor lock acquired but server is shutting down'
)
return
break
}

logger().trace('Schedule executor lock acquired')
while (true) {
if (stop) {
break
}

try {
await updateSchedule()
} catch (err) {
logger().error(
{ err, module: 'schedule' },
'Failed to update schedule'
)
}

if (stop) {
break
}
try {
await updateSchedule()
} catch (err) {
logger().error(
{ err, module: 'schedule' },
'Failed to update schedule'
)
}

await new Promise((resolve) => setTimeout(resolve, 5000))
if (stop) {
break
}
},
{ acquireTimeout: Infinity }
)

await new Promise((resolve) => setTimeout(resolve, 5000))
}
})

resolve()
})
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/yjs/v2/executor/ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ export class AIExecutor {
}

tick()
}),
{ acquireTimeout: Infinity }
})
)
} catch (err) {
logger().error(
Expand Down
3 changes: 1 addition & 2 deletions apps/api/src/yjs/v2/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ export class Executor {
}

tick()
}),
{ acquireTimeout: Infinity }
})
)
} catch (err) {
logger().error(
Expand Down