Skip to content

Commit

Permalink
introduce acquire timeout to locks
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Nov 28, 2024
1 parent 68ee779 commit b741dc1
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 63 deletions.
163 changes: 140 additions & 23 deletions apps/api/src/lock.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,160 @@
import prisma, { getPGInstance } from '@briefer/database'
import { v4 as uuidv4 } from 'uuid'
import prisma from '@briefer/database'
import { logger } from './logger.js'
import { z } from 'zod'
import { exhaustiveCheck } from '@briefer/types'

const EXPIRATION_TIME = 1000 * 5 // 5 seconds
const MAX_RETRY_TIMEOUT = 1000 * 2 // 2 seconds
const DEFAULT_ACQUIRE_TIMEOUT = 1000 * 10 // 10 seconds

class AlreadyAcquiredError extends Error {
constructor(public readonly lockName: string) {
super(`Lock ${lockName} is already acquired.`)
this.name = 'AlreadyAcquiredError'
}
}

export class AcquireLockTimeoutError extends Error {
constructor(
public readonly name: string,
public readonly ownerId: string,
public readonly startTime: number,
public readonly acquireTimeout: number,
public readonly attempt: number
) {
super(
`Failed to acquire lock ${name} with ownerId ${ownerId} after ${acquireTimeout}ms and ${attempt} attempts.`
)
this.name = 'AcquireLockTimeoutError'
}
}

export async function acquireLock<T>(
name: string,
cb: () => Promise<T>
cb: () => Promise<T>,
{ acquireTimeout = DEFAULT_ACQUIRE_TIMEOUT }: { acquireTimeout?: number } = {}
): Promise<T> {
const { pool } = await getPGInstance()
const startTime = Date.now()
const ownerId = uuidv4()

const inner = async (attempt: number): Promise<T> => {
if (Date.now() - startTime > acquireTimeout) {
throw new AcquireLockTimeoutError(
name,
ownerId,
startTime,
acquireTimeout,
attempt
)
}

let lockId = BigInt(-1)
while (true) {
logger().trace({ name, ownerId, attempt }, 'Acquiring lock')
try {
const lock = await prisma().lock2.upsert({
const lock = await prisma().lock.findFirst({
where: {
name,
},
update: {},
create: { name },
})
lockId = lock.id

// acquire lock
logger().trace({ name, id: lock.id }, 'Acquiring lock')
await pool.query('SELECT pg_advisory_lock($1)', [lock.id])
logger().trace({ name, id: lock.id }, 'Lock acquired')

// run callback
return await cb()
if (!lock) {
// this is safe because if someone else creates the lock in the meantime
// this will raise a unique constraint error that we catch below to retry
await prisma().lock.create({
data: {
name,
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else if (!lock.isLocked || lock.expiresAt < new Date()) {
// this is safe because if someone else updates the lock in the meantime
// this will fail to find the lock to update because expiresAt will be changed
// that will raise a not found error that we catch below to retry
await prisma().lock.update({
where: {
id: lock.id,
expiresAt: lock.expiresAt,
},
data: {
isLocked: true,
ownerId,
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
acquiredAt: new Date(),
},
})
} else {
// lock is already acquired
throw new AlreadyAcquiredError(name)
}
} catch (err) {
if (z.object({ code: z.literal('P2002') }).safeParse(err).success) {
continue
let code = ''
if (err instanceof AlreadyAcquiredError) {
code = 'AlreadyAcquiredError'
} else {
const parsed = z
.object({ code: z.union([z.literal('P2002'), z.literal('P2025')]) })
.safeParse(err)
if (parsed.success) {
switch (parsed.data.code) {
case 'P2002':
code = 'UniqueConstraintError'
break
case 'P2025':
code = 'NotFound'
break
default:
exhaustiveCheck(parsed.data.code)
}
}
}

if (code !== '') {
const timeout = Math.min(MAX_RETRY_TIMEOUT, Math.pow(2, attempt) * 100)
logger().trace(
{ name, ownerId, attempt, code, timeout },
'Lock is already acquired. Retrying.'
)
await new Promise((resolve) => setTimeout(resolve, timeout))
return inner(attempt + 1)
}

logger().error({ name, ownerId, err }, 'Failed to acquire lock')
throw err
}

const extendExpirationInterval = setInterval(async () => {
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
expiresAt: new Date(Date.now() + EXPIRATION_TIME),
},
})
}, EXPIRATION_TIME / 3)

logger().debug({ name, ownerId }, 'Lock acquired')

try {
return await cb()
} finally {
// release lock
logger().trace({ name, id: lockId }, 'Releasing lock')
await pool.query('SELECT pg_advisory_unlock($1)', [lockId])
logger().trace({ name, id: lockId }, 'Lock released')
logger().trace({ name, ownerId }, 'Releasing lock')
clearInterval(extendExpirationInterval)
await prisma().lock.updateMany({
where: {
name,
ownerId,
},
data: {
isLocked: false,
},
})
logger().debug({ name, ownerId }, 'Lock released')
}
}

return inner(0)
}
54 changes: 29 additions & 25 deletions apps/api/src/schedule/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,36 +201,40 @@ export async function runSchedule(socketServer: IOServer) {
let stop = false
const loop = new Promise<void>(async (resolve) => {
logger().trace('Acquiring lock to be the schedule executor')
await acquireLock('schedule-executor', async () => {
if (stop) {
logger().trace(
'Schedule executor lock acquired but server is shutting down'
)
return
}

logger().trace('Schedule executor lock acquired')
while (true) {
await acquireLock(
'schedule-executor',
async () => {
if (stop) {
break
}

try {
await updateSchedule()
} catch (err) {
logger().error(
{ err, module: 'schedule' },
'Failed to update schedule'
logger().trace(
'Schedule executor lock acquired but server is shutting down'
)
return
}

if (stop) {
break
}
logger().trace('Schedule executor lock acquired')
while (true) {
if (stop) {
break
}

await new Promise((resolve) => setTimeout(resolve, 5000))
}
})
try {
await updateSchedule()
} catch (err) {
logger().error(
{ err, module: 'schedule' },
'Failed to update schedule'
)
}

if (stop) {
break
}

await new Promise((resolve) => setTimeout(resolve, 5000))
}
},
{ acquireTimeout: Infinity }
)

resolve()
})
Expand Down
3 changes: 2 additions & 1 deletion apps/api/src/yjs/v2/executor/ai/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ export class AIExecutor {
}

tick()
})
}),
{ acquireTimeout: Infinity }
)
} catch (err) {
logger().error(
Expand Down
13 changes: 4 additions & 9 deletions apps/api/src/yjs/v2/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ export function unknownUser(): ApiUser {
export class Executor {
private readonly id = uuidv4()
private isRunning: boolean = false
private timeout: NodeJS.Timeout | null = null
private currentExecution: Promise<void> | null = null
private readonly queue: ExecutionQueue

Expand Down Expand Up @@ -104,11 +103,6 @@ export class Executor {

public async stop(): Promise<void> {
this.isRunning = false

if (this.timeout) {
clearTimeout(this.timeout)
}

await this.currentExecution
}

Expand Down Expand Up @@ -174,7 +168,7 @@ export class Executor {

const currentBatch = this.queue.getCurrentBatch()
if (!currentBatch) {
this.timeout = setTimeout(() => tick(), 500)
setTimeout(() => tick(), 500)
return
}

Expand All @@ -194,15 +188,16 @@ export class Executor {
await this.currentExecution
this.currentExecution = null
if (this.isRunning) {
this.timeout = setTimeout(() => tick(), 0)
setTimeout(() => tick(), 0)
}
} catch (err) {
reject(err)
}
}

tick()
})
}),
{ acquireTimeout: Infinity }
)
} catch (err) {
logger().error(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
Warnings:
- You are about to drop the `Lock2` table. If the table is not empty, all the data it contains will be lost.
*/
-- DropTable
DROP TABLE "Lock2";
5 changes: 0 additions & 5 deletions packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -568,11 +568,6 @@ model Lock {
ownerId String @db.Uuid
}

model Lock2 {
id BigInt @id @default(autoincrement())
name String @unique
}

model WorkspaceSecrets {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
openAiApiKey String?
Expand Down

0 comments on commit b741dc1

Please sign in to comment.