Skip to content

Commit

Permalink
fix(cronjob): missing prepare and wrong uid compute
Browse files Browse the repository at this point in the history
  • Loading branch information
climba03003 committed Oct 17, 2024
1 parent 75e7038 commit 9d366fa
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 210 deletions.
3 changes: 3 additions & 0 deletions packages/cronjob/lib/adapter/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ import { type CreateTask, type Task } from '../cronjob'
import { kAdapter } from '../symbols'

export interface AdapterOptions {
resetOnInit?: boolean
}

export class Adapter {
readonly #options: AdapterOptions
applicationName: string

constructor (options: AdapterOptions) {
this.#options = options
this.applicationName = ''
}

static [kAdapter]: any = true
Expand Down
71 changes: 39 additions & 32 deletions packages/cronjob/lib/adapter/mongodb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Adapter, type AdapterOptions } from './adapter'

export interface MongoDBAdapterOptions extends AdapterOptions {
db: Db
name?: string
}

async function ensureIndex (collection: Collection, indexSpec: IndexSpecification, options: CreateIndexesOptions): Promise<void> {
Expand All @@ -13,23 +14,34 @@ async function ensureIndex (collection: Collection, indexSpec: IndexSpecificatio
}
export class MongoDBAdapter extends Adapter {
db: Db
name: string
collection: Collection
collectionLock: Collection
resetOnInit: boolean

constructor (options: MongoDBAdapterOptions) {
super(options)
this.db = options.db
this.collection = this.db.collection('__cron__.task')
this.collectionLock = this.db.collection('__cron__.lock')
this.name = options.name ?? 'tasks'
this.collection = this.db.collection(`${this.name}.task`)
this.collectionLock = this.db.collection(`${this.name}.lock`)
this.resetOnInit = options.resetOnInit ?? false
}

async prepare (): Promise<void> {
await ensureIndex(this.collection, { uid: 1 }, { unique: true, background: false })
await ensureIndex(this.collection, { uid: 1, isDeleted: 1 }, { unique: false, background: false })
await ensureIndex(this.collection, { executeAt: 1 }, { unique: false, background: false })
// we use all settled for error-free control
await Promise.allSettled([
ensureIndex(this.collection, { uid: 1 }, { unique: true, background: false }),
ensureIndex(this.collection, { uid: 1, isDeleted: 1 }, { unique: false, background: false }),
ensureIndex(this.collection, { executeAt: 1 }, { unique: false, background: false }),

await ensureIndex(this.collectionLock, { expireAt: 1 }, { unique: false, expireAfterSeconds: 1, background: false })
await ensureIndex(this.collectionLock, { application: 1 }, { unique: true, background: false })
ensureIndex(this.collectionLock, { expireAt: 1 }, { unique: false, expireAfterSeconds: 1, background: false }),
ensureIndex(this.collectionLock, { application: 1 }, { unique: true, background: false })
])
if (this.resetOnInit) {
this.collection.deleteMany({ once: false }).catch(() => {})
this.collectionLock.deleteMany({ application: this.applicationName }).catch(() => {})
}
}

async fetchTasks (executeAt: number): Promise<Task[]> {
Expand All @@ -42,32 +54,20 @@ export class MongoDBAdapter extends Adapter {
}

async createTask (task: CreateTask): Promise<void> {
const _task = await this.collection.findOne<Task>({ uid: task.uid })
const executeAt = Date.now() + task.delay
if (_task === null) {
await this.collection.insertOne({
uid: task.uid,
once: task.once ?? false,
await this.collection.updateOne({
uid: task.uid
}, {
$set: {
delay: task.delay,
executeAt,
isDeleted: false,
})
} else {
const $set: any = { isDeleted: false }
if (_task.delay !== task.delay) {
$set.delay = task.delay
}
if (_task.executeAt !== executeAt) {
$set.executeAt = executeAt
}
if ($set !== null) {
await this.collection.updateOne({
uid: task.uid,
}, {
$set,
})
},
$setOnInsert: {
uid: task.uid,
once: task.once ?? false,
}
}
}, { upsert: true })
}

async updateTasks (uids: string[], executeAt: number): Promise<void> {
Expand Down Expand Up @@ -99,10 +99,17 @@ export class MongoDBAdapter extends Adapter {
}

async aquireLock (name: string, expireAt: number): Promise<boolean> {
const result = await this.collectionLock.findOne({ application: name })
if (result !== null) return false
await this.collectionLock.insertOne({ application: name, expireAt })
return true
const { acknowledged, upsertedId } = await this.collectionLock.updateOne({
application: name
}, {
$set: { application: name },
$setOnInsert: { expireAt }
}, {
upsert: true
})
// when upsertedId is non-null, it means the document is inserted
// when upsertedId is null, it means the document already exists
return acknowledged && upsertedId !== null
}

async releaseLock (name: string): Promise<void> {
Expand Down
42 changes: 22 additions & 20 deletions packages/cronjob/lib/cronjob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import EventEmitter from 'events'
import { type Adapter } from './adapter/adapter'

type _TaskExecutor = () => Promise<void>
export type TaskExecutor<Context = unknown> = (context: Context) => Promise<void>
export type TaskExecutor<Context = unknown> = (context: Context) => void | Promise<void>

export interface Task {
uid: string
Expand Down Expand Up @@ -54,11 +54,13 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
this.isDestroyed = false
this.isLocked = false
this.adapter = options.adapter
this.adapter.applicationName = this.application
this.#context = options.context as any

this.minTickMS = options?.minTickMS ?? 128
this.maxTickMS = options?.maxTickMS ?? 768
this.maxExecutionMS = options?.maxExecutionMS ?? 900_000
this.adapter.prepare()
this.#tick()
}

Expand Down Expand Up @@ -151,22 +153,20 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
): Promise<string> {
const nextExecuteAt = Number(parseExpression(cron).next().toDate())
const ms = nextExecuteAt - Date.now()
const _uid = `timeout-cron-${uid}`
if (fresh) this.#deleted[_uid] = false
if (this.#deleted[_uid]) return ''
!uid.startsWith('timeout-cron-') && (uid = `timeout-cron-${uid}`)
if (fresh) this.#deleted[uid] = false
if (this.#deleted[uid]) return ''

return await this.setTimeout(async (context) => {
if (this.#deleted[_uid]) return
setImmediate(() => {
// we execute immediately for the next task
Promise.race([
executor(context),
this.#setCronJob(executor, cron, uid, false, context),
]).catch((err) => {
this.emit('error', err)
})
})
}, ms, _uid, context)
if (this.#deleted[uid]) return
const [, result] = await Promise.allSettled([
this.#setCronJob(executor, cron, uid, false, context),
executor(context)
])
if (result.status === 'rejected') {
this.emit('error', result.reason)
}
}, ms, uid, context)
}

async setLoopTask<Context = RootContext>(
Expand All @@ -183,17 +183,19 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
fresh: boolean,
context?: Context
): Promise<string> {
const _uid = `immediate-loop-${uid}`
if (fresh) this.#deleted[_uid] = false
if (this.#deleted[_uid]) return ''
!uid.startsWith('immediate-loop-') && (uid = `immediate-loop-${uid}`)
if (fresh) this.#deleted[uid] = false
if (this.#deleted[uid]) return ''
return await this.setImmediate(async (context) => {
if (this.#deleted[_uid]) return
if (this.#deleted[uid]) return
try {
await executor(context)
} catch (err) {
this.emit('error', err)
} finally {
await this.#setLoopTask(executor, uid, false, context)
}
}, _uid, context)
}, uid, context)
}

clearInterval (uid: string): void {
Expand Down
1 change: 1 addition & 0 deletions packages/cronjob/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"@types/node": "^22.7.5",
"c8": "^10.1.2",
"cross-env": "^7.0.3",
"dotenv": "^16.4.5",
"eslint": "^9.12.0",
"fastify": "^5.0.0",
"mongodb": "^6.9.0",
Expand Down
Loading

0 comments on commit 9d366fa

Please sign in to comment.