Skip to content

Commit

Permalink
introduce a lock mechanism that ensures only one executor running at …
Browse files Browse the repository at this point in the history
…a time
  • Loading branch information
vieiralucas committed Nov 22, 2024
1 parent 14d2ef5 commit b562d40
Show file tree
Hide file tree
Showing 12 changed files with 393 additions and 506 deletions.
132 changes: 43 additions & 89 deletions apps/api/src/lock.ts
Original file line number Diff line number Diff line change
@@ -1,101 +1,55 @@
import prisma from '@briefer/database'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { parse as parseUUID } from 'uuid'
import prisma, { getPGInstance } from '@briefer/database'
import { logger } from './logger.js'

const INTERVAL_MS = 5000
// PostgreSQL advisory lock key is a 64-bit integer
//
// However we use UUIDs as identifiers in our database and JavaScript
// only supports 53-bit integers
//
// So, we convert UUIDs to two 32-bit integers since PostgreSQL advisory
// lock can accept two 32-bit integers as a key
/**
 * Derives a PostgreSQL advisory lock key from a UUID.
 *
 * Only the first 8 bytes (64 bits) of the 128-bit UUID are used. They are
 * read as two big-endian signed 32-bit integers so they can be passed to the
 * two-argument form of the advisory lock functions. Two UUIDs that share
 * their first 8 bytes therefore map to the same key.
 *
 * @param uuid - UUID string; it is parsed with `parseUUID`.
 * @returns `[fst, snd]`, where `fst` is bytes 0-3 and `snd` is bytes 4-7.
 */
function uuidToPGLockKey(uuid: string): [number, number] {
  const binaryId: Uint8Array = parseUUID(uuid)

  // Honour the view's own offset and length so the correct bytes are read
  // even when the Uint8Array is a window into a larger ArrayBuffer.
  const view = new DataView(
    binaryId.buffer,
    binaryId.byteOffset,
    binaryId.byteLength
  )

  // first 32 bits (bytes 0-3)
  const fst = view.getInt32(0)

  // second 32 bits (bytes 4-7); the remaining 64 bits of the UUID are unused
  const snd = view.getInt32(4)

  return [fst, snd]
}

export async function acquireLock<T>(
name: string,
cb: () => Promise<T>,
expirationTimeMs: number = 30000
cb: () => Promise<T>
): Promise<T> {
const now = new Date()
const expiresAt = new Date(now.getTime() + expirationTimeMs)
const ownerId = uuidv4()
const { pgClient } = await getPGInstance()

let interval: NodeJS.Timeout | null = null
try {
let attempt = 1
while (true) {
logger().debug({ name, ownerId, attempt }, 'attempting to acquire lock')
try {
await prisma().lock.upsert({
where: {
name,
OR: [
{ isLocked: false },
{
expiresAt: {
lte: now,
},
},
],
},
update: {
isLocked: true,
acquiredAt: now,
expiresAt,
ownerId,
},
create: {
name,
isLocked: true,
acquiredAt: now,
expiresAt: expiresAt,
ownerId,
},
})
logger().debug({ name, ownerId }, 'lock acquired')
const lock = await prisma().lock.upsert({
where: {
name,
},
update: {},
create: { name },
})

interval = setInterval(async () => {
logger().debug({ name, ownerId }, 'incrementing lock expiration time')
await prisma().lock.update({
where: {
name,
ownerId,
},
data: {
expiresAt: new Date(new Date().getTime() + INTERVAL_MS),
},
})
}, INTERVAL_MS)
const [fst, snd] = uuidToPGLockKey(lock.id)

const r = await cb()
clearInterval(interval)
return r
} catch (err) {
// catch unique constraint violation
if (z.object({ code: z.literal('P2002') }).safeParse(err).success) {
logger().debug({ name, ownerId }, 'lock already acquired, retrying')
await new Promise((resolve) => setTimeout(resolve, 200))
attempt++
continue
}
try {
// acquire lock
logger().trace({ name, fst, snd }, 'Acquiring lock')
await pgClient.query('SELECT pg_advisory_lock($1, $2)', [fst, snd])
logger().trace({ name, fst, snd }, 'Lock acquired')

if (!interval) {
logger().error(
{
name,
ownerId,
err,
},
'error acquiring lock'
)
}
throw err
}
}
// run callback
return await cb()
} finally {
logger().debug({ name, ownerId }, 'releasing lock')
if (interval) {
clearInterval(interval)
}
await prisma().lock.deleteMany({
where: {
name,
ownerId,
},
})
logger().debug({ name, ownerId }, 'lock released')
// release lock
logger().trace({ name, fst, snd }, 'Releasing lock')
await pgClient.query('SELECT pg_advisory_unlock($1, $2)', [fst, snd])
logger().trace({ name, fst, snd }, 'Lock released')
}
}
4 changes: 1 addition & 3 deletions apps/api/src/schedule/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,7 @@ async function executeNotebook(
await updateAppState(ydoc, app, socketServer)
return false
},
app
? new AppPersistor(app.id, null) // user is null when running a schedule
: new DocumentPersistor(doc.id)
new AppPersistor(app.id, null) // user is null when running a schedule
)

if (emptyLayout) {
Expand Down
4 changes: 4 additions & 0 deletions apps/api/src/websocket/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ export async function createSocketServer(server: http.Server): Promise<Server> {
})
})

io.on('error', (err) => {
logger().error({ err }, 'Socket server error occurred')
})

return {
io: io,
shutdown: async () => {
Expand Down
37 changes: 18 additions & 19 deletions apps/api/src/yjs/v2/documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,22 @@ export async function updateAppState(
app: YjsAppDocument,
socketServer: IOServer
) {
const state = Y.encodeStateAsUpdate(ydoc.ydoc)

const usersApps = await prisma().userYjsAppDocument.findMany({
where: { yjsAppDocumentId: app.id },
select: { userId: true },
})

await Promise.all(
usersApps.map(async (userApp) =>
getYDocForUpdate(
getDocId(app.documentId, { id: app.id, userId: userApp.userId }),
socketServer,
app.documentId,
ydoc.workspaceId,
(ydoc) => ydoc.replaceState(state),
new AppPersistor(app.id, userApp.userId)
)
)
)
// TODO
// const state = Y.encodeStateAsUpdate(ydoc.ydoc)
// const usersApps = await prisma().userYjsAppDocument.findMany({
// where: { yjsAppDocumentId: app.id },
// select: { userId: true },
// })
// await Promise.all(
// usersApps.map(async (userApp) =>
// getYDocForUpdate(
// getDocId(app.documentId, { id: app.id, userId: userApp.userId }),
// socketServer,
// app.documentId,
// ydoc.workspaceId,
// (ydoc) => ydoc.replaceState(state),
// new AppPersistor(app.id, userApp.userId)
// )
// )
// )
}
103 changes: 93 additions & 10 deletions apps/api/src/yjs/v2/executor/ai/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { v4 as uuidv4 } from 'uuid'
import * as Y from 'yjs'
import PQueue from 'p-queue'
import {
Expand All @@ -17,6 +18,7 @@ import { AISQLExecutor, ISQLAIExecutor } from './sql.js'
import { ApiUser, getUserById } from '@briefer/database'
import { unknownUser } from '../executor.js'
import { UserNotebookEvents } from '../../../../events/user.js'
import { acquireLock } from '../../../../lock.js'

class UnexpectedBlockTypeError extends Error {
constructor(
Expand All @@ -37,11 +39,13 @@ class BlockNotFoundError extends Error {
}

export class AIExecutor {
private readonly id = uuidv4()
private isRunning = false
private readonly pQueue = new PQueue({ concurrency: 4 })
private timeout: NodeJS.Timeout | null = null

private constructor(
private readonly docId: string,
private readonly workspaceId: string,
private readonly documentId: string,
private readonly tasks: AITasks,
Expand All @@ -51,8 +55,8 @@ export class AIExecutor {
) {}

public start() {
this.execute()
this.isRunning = true
this.execute()
}

public async stop(): Promise<void> {
Expand All @@ -66,17 +70,95 @@ export class AIExecutor {
}

private async execute() {
await this.pQueue.onSizeLessThan(this.pQueue.concurrency)
try {
logger().debug(
{
id: this.id,
docId: this.docId,
workspaceId: this.workspaceId,
documentId: this.documentId,
},
'Acquiring AI tasks lock'
)

const next = this.tasks.next()
let timeout = 500
if (next) {
this.pQueue.add(() => this.executeItem(next))
timeout = 0
}
await acquireLock(
`ai-tasks:${this.docId}`,
() =>
new Promise<void>(async (resolve, reject) => {
if (!this.isRunning) {
logger().debug(
{
port: process.env['PORT'],
id: this.id,
docId: this.docId,
workspaceId: this.workspaceId,
documentId: this.documentId,
},
'AI tasks lock acquired but executor is not running anymore. Exiting.'
)
resolve()
return
}

logger().debug(
{
id: this.id,
docId: this.docId,
workspaceId: this.workspaceId,
documentId: this.documentId,
},
'AI tasks lock acquired. Executing AI tasks'
)

const tick = async () => {
try {
if (!this.isRunning) {
logger().debug(
{
port: process.env['PORT'],
id: this.id,
docId: this.docId,
workspaceId: this.workspaceId,
documentId: this.documentId,
},
'AI Tasks is not running. Stopping consumer loop.'
)
resolve()
return
}

if (this.isRunning) {
this.timeout = setTimeout(() => this.execute(), timeout)
await this.pQueue.onSizeLessThan(this.pQueue.concurrency)

const next = this.tasks.next()
let timeout = 500
if (next) {
this.pQueue.add(() => this.executeItem(next))
timeout = 0
}

if (this.isRunning) {
this.timeout = setTimeout(() => tick(), timeout)
}
} catch (err) {
reject(err)
}
}

tick()
})
)
} catch (err) {
logger().error(
{
id: this.id,
docId: this.docId,
workspaceId: this.workspaceId,
documentId: this.documentId,
err,
},
'Unexpected error while executing AI tasks. Retrying in 2 seconds'
)
setTimeout(() => this.execute(), 2000)
}
}

Expand Down Expand Up @@ -203,6 +285,7 @@ export class AIExecutor {

public static fromWSSharedV2(doc: WSSharedDocV2): AIExecutor {
return new AIExecutor(
doc.id,
doc.workspaceId,
doc.documentId,
AITasks.fromYjs(doc.ydoc),
Expand Down
Loading

0 comments on commit b562d40

Please sign in to comment.