Skip to content

Commit

Permalink
fix persistor race coditions
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Nov 27, 2024
1 parent 6960369 commit 68ee779
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 128 deletions.
49 changes: 30 additions & 19 deletions apps/api/src/lock.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
import prisma, { getPGInstance } from '@briefer/database'
import { logger } from './logger.js'
import { z } from 'zod'

export async function acquireLock<T>(
name: string,
cb: () => Promise<T>
): Promise<T> {
const { pool } = await getPGInstance()

const lock = await prisma().lock2.upsert({
where: {
name,
},
update: {},
create: { name },
})
let lockId = BigInt(-1)
while (true) {
try {
const lock = await prisma().lock2.upsert({
where: {
name,
},
update: {},
create: { name },
})
lockId = lock.id

try {
// acquire lock
logger().trace({ name, id: lock.id }, 'Acquiring lock')
await pool.query('SELECT pg_advisory_lock($1)', [lock.id])
logger().trace({ name, id: lock.id }, 'Lock acquired')
// acquire lock
logger().trace({ name, id: lock.id }, 'Acquiring lock')
await pool.query('SELECT pg_advisory_lock($1)', [lock.id])
logger().trace({ name, id: lock.id }, 'Lock acquired')

// run callback
return await cb()
} finally {
// release lock
logger().trace({ name, id: lock.id }, 'Releasing lock')
await pool.query('SELECT pg_advisory_unlock($1)', [lock.id])
logger().trace({ name, id: lock.id }, 'Lock released')
// run callback
return await cb()
} catch (err) {
if (z.object({ code: z.literal('P2002') }).safeParse(err).success) {
continue
}

throw err
} finally {
// release lock
logger().trace({ name, id: lockId }, 'Releasing lock')
await pool.query('SELECT pg_advisory_unlock($1)', [lockId])
logger().trace({ name, id: lockId }, 'Lock released')
}
}
}
24 changes: 11 additions & 13 deletions apps/api/src/yjs/v2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ export function setupYJSSocketServerV2(
const wsReadyStateConnecting = 0
const wsReadyStateOpen = 1

const DOCUMENT_COLLECTION_INTERVAL = 1000 * 60 // 1 minute
const DOCUMENT_COLLECTION_INTERVAL = 1000 * 20 // 20 seconds
export const docs = new Map<string, WSSharedDocV2>()

export const docsCache = new LRUCache<string, WSSharedDocV2>({
Expand Down Expand Up @@ -411,17 +411,15 @@ export class WSSharedDocV2 {
}

public async init() {
await acquireLock(`yjs-${this.documentId}`, async () => {
this.subscription = await subscribe(
this.getPubSubChannel(),
this.onSubMessage
)
this.subscription = await subscribe(
this.getPubSubChannel(),
this.onSubMessage
)

this.ydoc.on('update', this.updateHandler)
this.awareness.on('update', this.awarenessHandler)
this.executor.start()
this.aiExecutor.start()
})
this.ydoc.on('update', this.updateHandler)
this.awareness.on('update', this.awarenessHandler)
this.executor.start()
this.aiExecutor.start()
}

private onSubMessage = async (message?: string) => {
Expand Down Expand Up @@ -507,7 +505,7 @@ export class WSSharedDocV2 {
}

public async replaceState(state: Buffer) {
const result = await this.persistor.replaceState(state)
const result = await this.persistor.replaceState(this.id, state)
this.reset(result.ydoc, result.clock, result.byteLength)
const updateId = await this.persistor.persistUpdate(this, state)
await publish(
Expand Down Expand Up @@ -832,7 +830,7 @@ export class WSSharedDocV2 {
persistor: Persistor,
tx?: PrismaTransaction
): Promise<WSSharedDocV2> {
const loadStateResult = await persistor.load(tx)
const loadStateResult = await persistor.load(id, tx)
const doc = new WSSharedDocV2(
id,
documentId,
Expand Down
193 changes: 97 additions & 96 deletions apps/api/src/yjs/v2/persistors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ export type LoadStateResult = {
byteLength: number
}
export interface Persistor {
load: (tx?: PrismaTransaction) => Promise<LoadStateResult>
load: (id: string, tx?: PrismaTransaction) => Promise<LoadStateResult>
persistUpdate: (ydoc: WSSharedDocV2, update: Uint8Array) => Promise<string>
canWrite: (
decoder: decoding.Decoder,
doc: WSSharedDocV2,
transactionOrigin: TransactionOrigin
) => boolean
replaceState: (newState: Buffer) => Promise<LoadStateResult>
replaceState: (id: string, newState: Buffer) => Promise<LoadStateResult>
}

export class DocumentPersistor implements Persistor {
Expand All @@ -56,9 +56,9 @@ export class DocumentPersistor implements Persistor {
Y.applyUpdate(ydoc, update)
}

public async load(tx?: PrismaTransaction) {
public async load(id: string, tx?: PrismaTransaction) {
try {
return acquireLock(`document:${this.documentId}`, async () => {
return acquireLock(`document-persistor:${id}`, async () => {
const ydoc = new Y.Doc()
const dbDoc = await (tx ?? prisma()).yjsDocument.findUnique({
where: { documentId: this.documentId },
Expand Down Expand Up @@ -133,13 +133,13 @@ export class DocumentPersistor implements Persistor {
}

public async persistUpdate(doc: WSSharedDocV2, update: Uint8Array) {
let yjsDoc = await prisma().yjsDocument.findUnique({
select: { id: true, clock: true },
where: { documentId: this.documentId },
})
if (!yjsDoc) {
yjsDoc = await acquireLock(`document:${this.documentId}`, () =>
prisma().yjsDocument.upsert({
return acquireLock(`document-persistor:${doc.id}`, async () => {
let yjsDoc = await prisma().yjsDocument.findUnique({
select: { id: true, clock: true },
where: { documentId: this.documentId },
})
if (!yjsDoc) {
yjsDoc = await prisma().yjsDocument.upsert({
where: { documentId: this.documentId },
create: {
documentId: this.documentId,
Expand All @@ -148,19 +148,19 @@ export class DocumentPersistor implements Persistor {
update: {},
select: { id: true, clock: true },
})
)
}
}

const { id } = await prisma().yjsUpdate.create({
data: {
yjsDocumentId: yjsDoc.id,
update: Buffer.from(update),
clock: yjsDoc.clock,
},
select: { id: true },
})
const { id } = await prisma().yjsUpdate.create({
data: {
yjsDocumentId: yjsDoc.id,
update: Buffer.from(update),
clock: yjsDoc.clock,
},
select: { id: true },
})

return id
return id
})
}

public canWrite(
Expand All @@ -177,25 +177,27 @@ export class DocumentPersistor implements Persistor {
}
}

public async replaceState(newState: Buffer) {
const clock = await prisma().yjsDocument.update({
where: { documentId: this.documentId },
data: {
state: newState,
clock: {
increment: 1,
public async replaceState(id: string, newState: Buffer) {
return acquireLock(`document-persistor:${id}`, async () => {
const clock = await prisma().yjsDocument.update({
where: { documentId: this.documentId },
data: {
state: newState,
clock: {
increment: 1,
},
},
},
})
})

const ydoc = new Y.Doc()
this.applyUpdate(ydoc, newState)
const ydoc = new Y.Doc()
this.applyUpdate(ydoc, newState)

return {
ydoc,
clock: clock.clock,
byteLength: newState.length,
}
return {
ydoc,
clock: clock.clock,
byteLength: newState.length,
}
})
}
}

Expand All @@ -210,9 +212,9 @@ export class AppPersistor implements Persistor {
Y.applyUpdate(ydoc, update)
}

public async load(tx?: PrismaTransaction | undefined) {
public async load(id: string, tx?: PrismaTransaction | undefined) {
try {
return acquireLock(`app:${this.yjsAppDocumentId}`, async () => {
return acquireLock(`app-persistor:${id}`, async () => {
const bind = async (tx: PrismaTransaction) => {
const yjsAppDoc = await tx.yjsAppDocument.findFirstOrThrow({
where: {
Expand Down Expand Up @@ -373,64 +375,79 @@ export class AppPersistor implements Persistor {
}

public async persistUpdate(doc: WSSharedDocV2, update: Uint8Array) {
if (this.userId) {
await prisma().userYjsAppDocument.upsert({
where: {
yjsAppDocumentId_userId: {
return acquireLock(`app-persistor:${doc.id}`, async () => {
if (this.userId) {
await prisma().userYjsAppDocument.upsert({
where: {
yjsAppDocumentId_userId: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
},
},
create: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
state: Buffer.from(Y.encodeStateAsUpdate(doc.ydoc)),
clock: doc.clock,
},
},
create: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
state: Buffer.from(Y.encodeStateAsUpdate(doc.ydoc)),
},
update: {},
})
update: {},
})

const { id } = await prisma().yjsUpdate.create({
data: {
userYjsAppDocumentUserId: this.userId,
userYjsAppDocumentYjsAppDocumentId: this.yjsAppDocumentId,
update: Buffer.from(update),
clock: doc.clock,
},
select: { id: true },
})
return id
}

const { id } = await prisma().yjsUpdate.create({
data: {
userYjsAppDocumentUserId: this.userId,
userYjsAppDocumentYjsAppDocumentId: this.yjsAppDocumentId,
yjsAppDocumentId: this.yjsAppDocumentId,
update: Buffer.from(update),
clock: doc.clock,
},
select: { id: true },
})
return id
}

const { id } = await prisma().yjsUpdate.create({
data: {
yjsAppDocumentId: this.yjsAppDocumentId,
update: Buffer.from(update),
clock: doc.clock,
},
select: { id: true },
})
return id
}

public async replaceState(newState: Buffer) {
return acquireLock(`app:${this.yjsAppDocumentId}`, () =>
this._replaceState(newState)
)
}

private async _replaceState(newState: Buffer) {
const ydoc = new Y.Doc()
public async replaceState(id: string, newState: Buffer) {
return acquireLock(`app-persistor:${id}`, async () => {
const ydoc = new Y.Doc()

this.applyUpdate(ydoc, newState)
this.applyUpdate(ydoc, newState)

if (this.userId) {
const clock = await prisma().userYjsAppDocument.update({
where: {
yjsAppDocumentId_userId: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
if (this.userId) {
const clock = await prisma().userYjsAppDocument.update({
where: {
yjsAppDocumentId_userId: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
},
},
},
data: {
state: newState,
clock: {
increment: 1,
},
},
})

return {
ydoc,
clock: clock.clock,
byteLength: newState.length,
}
}

const clock = await prisma().yjsAppDocument.update({
where: { id: this.yjsAppDocumentId },
data: {
state: newState,
clock: {
Expand All @@ -444,23 +461,7 @@ export class AppPersistor implements Persistor {
clock: clock.clock,
byteLength: newState.length,
}
}

const clock = await prisma().yjsAppDocument.update({
where: { id: this.yjsAppDocumentId },
data: {
state: newState,
clock: {
increment: 1,
},
},
})

return {
ydoc,
clock: clock.clock,
byteLength: newState.length,
}
}

public canWrite(
Expand Down

0 comments on commit 68ee779

Please sign in to comment.