diff --git a/packages/cronjob/lib/adapter/adapter.ts b/packages/cronjob/lib/adapter/adapter.ts index 8576fe7..79b930f 100644 --- a/packages/cronjob/lib/adapter/adapter.ts +++ b/packages/cronjob/lib/adapter/adapter.ts @@ -2,13 +2,16 @@ import { type CreateTask, type Task } from '../cronjob' import { kAdapter } from '../symbols' export interface AdapterOptions { + resetOnInit?: boolean } export class Adapter { readonly #options: AdapterOptions + applicationName: string constructor (options: AdapterOptions) { this.#options = options + this.applicationName = '' } static [kAdapter]: any = true diff --git a/packages/cronjob/lib/adapter/mongodb.ts b/packages/cronjob/lib/adapter/mongodb.ts index ee6ff2c..470b457 100644 --- a/packages/cronjob/lib/adapter/mongodb.ts +++ b/packages/cronjob/lib/adapter/mongodb.ts @@ -4,6 +4,7 @@ import { Adapter, type AdapterOptions } from './adapter' export interface MongoDBAdapterOptions extends AdapterOptions { db: Db + name?: string } async function ensureIndex (collection: Collection, indexSpec: IndexSpecification, options: CreateIndexesOptions): Promise { @@ -13,23 +14,34 @@ async function ensureIndex (collection: Collection, indexSpec: IndexSpecificatio } export class MongoDBAdapter extends Adapter { db: Db + name: string collection: Collection collectionLock: Collection + resetOnInit: boolean constructor (options: MongoDBAdapterOptions) { super(options) this.db = options.db - this.collection = this.db.collection('__cron__.task') - this.collectionLock = this.db.collection('__cron__.lock') + this.name = options.name ?? 'tasks' + this.collection = this.db.collection(`${this.name}.task`) + this.collectionLock = this.db.collection(`${this.name}.lock`) + this.resetOnInit = options.resetOnInit ?? false } async prepare (): Promise { - await ensureIndex(this.collection, { uid: 1 }, { unique: true, background: false }) - await ensureIndex(this.collection, { uid: 1, isDeleted: 1 }, { unique: false, background: false }) - await ensureIndex(this.collection, { executeAt: 1 }, { unique: false, background: false }) + // we use all settled for error-free control + await Promise.allSettled([ + ensureIndex(this.collection, { uid: 1 }, { unique: true, background: false }), + ensureIndex(this.collection, { uid: 1, isDeleted: 1 }, { unique: false, background: false }), + ensureIndex(this.collection, { executeAt: 1 }, { unique: false, background: false }), - await ensureIndex(this.collectionLock, { expireAt: 1 }, { unique: false, expireAfterSeconds: 1, background: false }) - await ensureIndex(this.collectionLock, { application: 1 }, { unique: true, background: false }) + ensureIndex(this.collectionLock, { expireAt: 1 }, { unique: false, expireAfterSeconds: 1, background: false }), + ensureIndex(this.collectionLock, { application: 1 }, { unique: true, background: false }) + ]) + if (this.resetOnInit) { + this.collection.deleteMany({ once: false }).catch(() => {}) + this.collectionLock.deleteMany({ application: this.applicationName }).catch(() => {}) + } } async fetchTasks (executeAt: number): Promise { @@ -42,32 +54,20 @@ export class MongoDBAdapter extends Adapter { } async createTask (task: CreateTask): Promise { - const _task = await this.collection.findOne({ uid: task.uid }) const executeAt = Date.now() + task.delay - if (_task === null) { - await this.collection.insertOne({ - uid: task.uid, - once: task.once ?? false, + await this.collection.updateOne({ + uid: task.uid + }, { + $set: { delay: task.delay, executeAt, isDeleted: false, - }) - } else { - const $set: any = { isDeleted: false } - if (_task.delay !== task.delay) { - $set.delay = task.delay - } - if (_task.executeAt !== executeAt) { - $set.executeAt = executeAt - } - if ($set !== null) { - await this.collection.updateOne({ - uid: task.uid, - }, { - $set, - }) + }, + $setOnInsert: { + uid: task.uid, + once: task.once ?? false, } - } + }, { upsert: true }) } async updateTasks (uids: string[], executeAt: number): Promise { @@ -99,10 +99,17 @@ export class MongoDBAdapter extends Adapter { } async aquireLock (name: string, expireAt: number): Promise { - const result = await this.collectionLock.findOne({ application: name }) - if (result !== null) return false - await this.collectionLock.insertOne({ application: name, expireAt }) - return true + const { acknowledged, upsertedId } = await this.collectionLock.updateOne({ + application: name + }, { + $set: { application: name }, + $setOnInsert: { expireAt } + }, { + upsert: true + }) + // when upsertedId is non-null, it means the document is inserted + // when upsertedId is null, it means the document already exists + return acknowledged && upsertedId !== null } async releaseLock (name: string): Promise { diff --git a/packages/cronjob/lib/cronjob.ts b/packages/cronjob/lib/cronjob.ts index 39f99e1..42524a0 100644 --- a/packages/cronjob/lib/cronjob.ts +++ b/packages/cronjob/lib/cronjob.ts @@ -3,7 +3,7 @@ import EventEmitter from 'events' import { type Adapter } from './adapter/adapter' type _TaskExecutor = () => Promise -export type TaskExecutor = (context: Context) => Promise +export type TaskExecutor = (context: Context) => void | Promise export interface Task { uid: string @@ -54,11 +54,13 @@ export class CronJob extends EventEmitter { this.isDestroyed = false this.isLocked = false this.adapter = options.adapter + this.adapter.applicationName = this.application this.#context = options.context as any this.minTickMS = options?.minTickMS ?? 128 this.maxTickMS = options?.maxTickMS ?? 768 this.maxExecutionMS = options?.maxExecutionMS ?? 900_000 + this.adapter.prepare() this.#tick() } @@ -151,22 +153,20 @@ export class CronJob extends EventEmitter { ): Promise { const nextExecuteAt = Number(parseExpression(cron).next().toDate()) const ms = nextExecuteAt - Date.now() - const _uid = `timeout-cron-${uid}` - if (fresh) this.#deleted[_uid] = false - if (this.#deleted[_uid]) return '' + !uid.startsWith('timeout-cron-') && (uid = `timeout-cron-${uid}`) + if (fresh) this.#deleted[uid] = false + if (this.#deleted[uid]) return '' return await this.setTimeout(async (context) => { - if (this.#deleted[_uid]) return - setImmediate(() => { - // we execute immediately for the next task - Promise.race([ - executor(context), - this.#setCronJob(executor, cron, uid, false, context), - ]).catch((err) => { - this.emit('error', err) - }) - }) - }, ms, _uid, context) + if (this.#deleted[uid]) return + const [, result] = await Promise.allSettled([ + this.#setCronJob(executor, cron, uid, false, context), + executor(context) + ]) + if (result.status === 'rejected') { + this.emit('error', result.reason) + } + }, ms, uid, context) } async setLoopTask( @@ -183,17 +183,19 @@ export class CronJob extends EventEmitter { fresh: boolean, context?: Context ): Promise { - const _uid = `immediate-loop-${uid}` - if (fresh) this.#deleted[_uid] = false - if (this.#deleted[_uid]) return '' + !uid.startsWith('immediate-loop-') && (uid = `immediate-loop-${uid}`) + if (fresh) this.#deleted[uid] = false + if (this.#deleted[uid]) return '' return await this.setImmediate(async (context) => { - if (this.#deleted[_uid]) return + if (this.#deleted[uid]) return try { await executor(context) + } catch (err) { + this.emit('error', err) } finally { await this.#setLoopTask(executor, uid, false, context) } - }, _uid, context) + }, uid, context) } clearInterval (uid: string): void { diff --git a/packages/cronjob/package.json b/packages/cronjob/package.json index f2c6f1b..8131d5f 100644 --- a/packages/cronjob/package.json +++ b/packages/cronjob/package.json @@ -58,6 +58,7 @@ "@types/node": "^22.7.5", "c8": "^10.1.2", "cross-env": "^7.0.3", + "dotenv": "^16.4.5", "eslint": "^9.12.0", "fastify": "^5.0.0", "mongodb": "^6.9.0", diff --git a/packages/cronjob/test/adapter/mongodb.test.ts b/packages/cronjob/test/adapter/mongodb.test.ts index 5afe7d0..a2c48ec 100644 --- a/packages/cronjob/test/adapter/mongodb.test.ts +++ b/packages/cronjob/test/adapter/mongodb.test.ts @@ -1,14 +1,15 @@ -import { test } from '@kakang/unit' import { parseExpression } from 'cron-parser' -import Fastify from 'fastify' +import Fastify, { FastifyInstance } from 'fastify' import { MongoClient } from 'mongodb' +import { after, before, test, TestContext } from 'node:test' +import { setTimeout } from 'node:timers/promises' import { fastifyCronJob } from '../../lib' import { MongoDBAdapter, type MongoDBAdapterOptions } from '../../lib/adapter/mongodb' +import { MONGODB_URL } from '../config' -const MONGODB_URI = 'mongodb://127.0.0.1:27017/?replicaSet=rs0' const minTickMS = 32 const maxTickMS = 256 -const RANDOM_GAP = (maxTickMS - minTickMS) + 3072 +const RANDOM_GAP = (maxTickMS - minTickMS) + (1024 * 1.5) function createDeferredPromise (): { promise: Promise, resolve: () => void, reject: () => void } { const promise: any = {} @@ -19,19 +20,16 @@ function createDeferredPromise (): { promise: Promise, resolve: () => void return promise } -function isJob (uid: unknown, expected: string): boolean { - return String(uid).includes(expected) -} - -test('MongoDBAdapter', async function (t) { - const client = new MongoClient(MONGODB_URI) - await client.connect() - - const db = client.db('foobar') - const fastify = Fastify() +let client: MongoClient +let fastify: FastifyInstance +before(async function (t) { + client = new MongoClient(MONGODB_URL) + const db = client.db('cicd') + fastify = Fastify() const adapterOption: MongoDBAdapterOptions = { db, + resetOnInit: true } await fastify.register(fastifyCronJob, { application: t.name, @@ -41,167 +39,242 @@ test('MongoDBAdapter', async function (t) { maxTickMS, maxExecutionMS: 8000, }) +}) - fastify.cronjob.on('executed', function (task) { - ticks[task.uid]++ - if (timestamps[task.uid].length < 2) { - timestamps[task.uid].push(task.timestamp as number) - } else { - timestamps[task.uid][1] = task.timestamp as number - } - - const now = Date.now() - - if ( - ( - isJob(task.uid, 'interval') || - isJob(task.uid, 'timeout') || - isJob(task.uid, 'immediate') - ) && timestamps[task.uid].length === 2 - ) { - const expected = timestamps[task.uid][0] - const _from = expected - RANDOM_GAP - const _to = expected + RANDOM_GAP - const diff = now - expected - - if (isJob(task.uid, 'interval')) { - if (ticks[task.uid] >= 2) { - fastify.cronjob.clearInterval(task.uid as string) - t.equal(ticks[task.uid], 2) - dones[task.uid]() - } else { - timestamps[task.uid][0] = now + task.delay - t.equal(ticks[task.uid], 1) - } - } else if (isJob(task.uid, 'cron')) { - if (ticks[task.uid] >= 2) { - fastify.cronjob.clearInterval(task.uid as string) - t.equal(ticks[task.uid] >= 2, true) - dones[task.uid]() - } else { - timestamps[task.uid][0] = now + task.delay - t.equal(ticks[task.uid], 1) - } - } else if (isJob(task.uid, 'loop')) { - fastify.cronjob.clearInterval(task.uid as string) - t.equal(ticks[task.uid] >= 1, true) - dones[task.uid]() - } else { - fastify.cronjob.clearInterval(task.uid as string) - t.equal(ticks[task.uid], 1) - dones[task.uid]() - } - - if (!isJob(task.uid, 'loop') && !isJob(task.uid, 'cron')) { - t.equal(_from < now && now < _to, true) - t.equal(diff < RANDOM_GAP, true) - } - } - }) +after(async function () { + await fastify.close() + await setTimeout(500) + await client.close() +}) - const dones: Record void> = {} - const timestamps: Record = {} - const ticks: Record = {} +test('properly registered', function (t: TestContext) { + t.plan(4) + t.assert.ok(fastify.cronjob) + t.assert.equal(typeof fastify.cronjob.setTimeout, 'function') + t.assert.equal(typeof fastify.cronjob.setInterval, 'function') + t.assert.equal(typeof fastify.cronjob.setImmediate, 'function') +}) - const checkInterval = async function (interval: number): Promise { +test('timeout', async function (t: TestContext) { + // [setup, ...runat] + const tasks: Record = {} + function setupTimeout (delay: number) { const promise = createDeferredPromise() - const uid = await fastify.cronjob.setInterval(async () => {}, interval, '' + interval) - dones[uid] = promise.resolve - timestamps[uid] = [Date.now() + interval] - ticks[uid] = 0 - await promise.promise + const uid = `timeout-${delay}` + tasks[delay] = [Date.now()] + fastify.cronjob.setTimeout(() => { + tasks[delay].push(Date.now()) + promise.resolve() + }, delay, uid) + + return t.test(`${delay}ms`, async function (t: TestContext) { + await promise.promise + + // check + const timestamps = tasks[delay] + const expected = (timestamps[0] + delay) + const diff = timestamps[1] - expected + const from = expected - RANDOM_GAP + const to = expected + RANDOM_GAP + t.assert.equal(timestamps.length, 2, 'executed one time') + // within expected range + t.assert.equal(timestamps[1] >= from, true, 'within random gap lower bound') + t.assert.equal(timestamps[1] <= to, true, 'within random gap upper bound') + // within expected diff + t.assert.equal(diff < RANDOM_GAP, true, 'within random gap') + }) } - const checkTimeout = async function (interval: number): Promise { + await Promise.all([ + setupTimeout(384), + setupTimeout(512), + setupTimeout(640), + setupTimeout(768), + setupTimeout(778), + setupTimeout(788), + setupTimeout(789), + setupTimeout(800), + setupTimeout(801), + setupTimeout(802), + ]) +}) + +test('interval', async function (t: TestContext) { + // [setup, ...runat] + const tasks: Record = {} + function setupInterval (delay: number) { const promise = createDeferredPromise() - const uid = await fastify.cronjob.setTimeout(async () => {}, interval, '' + interval) - dones[uid] = promise.resolve - timestamps[uid] = [Date.now() + interval] - ticks[uid] = 0 - await promise.promise + const uid = `interval-${delay}` + tasks[delay] = [Date.now()] + fastify.cronjob.setInterval(() => { + tasks[delay].push(Date.now()) + if (tasks[delay].length === 3) { + promise.resolve() + fastify.cronjob.clearInterval(uid) + } + }, delay, uid) + + return t.test(`${delay}ms`, async function (t: TestContext) { + await promise.promise + + // check + const timestamps = tasks[delay] + t.assert.equal(timestamps.length, 3, 'executed two time') + for (let i = 1; i < timestamps.length; i++) { + const expected = (timestamps[i - 1] + delay) + const diff = timestamps[i] - expected + const from = expected - RANDOM_GAP + const to = expected + RANDOM_GAP + // within expected range + t.assert.equal(timestamps[i] >= from, true, 'within random gap lower bound') + t.assert.equal(timestamps[i] <= to, true, 'within random gap upper bound') + // within expected diff + t.assert.equal(diff < RANDOM_GAP, true, 'within random gap') + } + }) } - const checkCronJob = async function (cron: string): Promise { - const next = parseExpression(cron).next().toDate() + await Promise.all([ + setupInterval(384), + setupInterval(512), + setupInterval(640), + setupInterval(768), + setupInterval(778), + setupInterval(788), + setupInterval(789), + setupInterval(800), + setupInterval(801), + setupInterval(802), + ]) +}) + +test('immediate', async function (t: TestContext) { + // [setup, ...runat] + const tasks: Record = {} + function setupImmediate (name: string) { const promise = createDeferredPromise() - const uid = await fastify.cronjob.setCronJob(async () => {}, cron, cron) - dones[uid] = promise.resolve - timestamps[uid] = [+next] - ticks[uid] = 0 - await promise.promise + const uid = `immediate-${name}` + tasks[name] = [Date.now()] + fastify.cronjob.setImmediate(() => { + tasks[name].push(Date.now()) + promise.resolve() + }, uid) + + return t.test(`${name}`, async function (t: TestContext) { + await promise.promise + + // check + const timestamps = tasks[name] + const diff = timestamps[1] - timestamps[0] + const from = timestamps[0] - RANDOM_GAP + const to = timestamps[0] + RANDOM_GAP + t.assert.equal(timestamps.length, 2, 'executed one time') + // within expected range + t.assert.equal(timestamps[1] >= from, true, 'within random gap lower bound') + t.assert.equal(timestamps[1] <= to, true, 'within random gap upper bound') + // within expected diff + t.assert.equal(diff < RANDOM_GAP, true, 'within random gap') + }) } - const checkLoopTask = async function (name: number): Promise { + await Promise.all([ + setupImmediate('foo'), + setupImmediate('bar'), + setupImmediate('baz'), + setupImmediate('hello'), + setupImmediate('world'), + ]) +}) + +test('cronjob', async function (t: TestContext) { + // [setup, ...runat] + const tasks: Record = {} + const expecteds: Record = {} + function setupCronJob (name: string, gap: number) { const promise = createDeferredPromise() - const uid = await fastify.cronjob.setLoopTask(async () => {}, '' + name) - dones[uid] = promise.resolve - timestamps[uid] = [Date.now()] - ticks[uid] = 0 - await promise.promise - } + const uid = `timeout-cron-${name}` + tasks[name] = [Date.now()] + expecteds[name] = [+parseExpression(name).next().toDate()] + fastify.cronjob.setCronJob(() => { + tasks[name].push(Date.now()) + expecteds[name].push(+parseExpression(name).next().toDate()) + if (tasks[name].length === 3) { + promise.resolve() + fastify.cronjob.clearTimeout(uid) + } + }, name, uid) - t.after(async function () { - await fastify.close() - await db.dropDatabase() - await client.close(true) - }) + return t.test(`${name}`, async function (t: TestContext) { + await promise.promise - t.test('fastify.cronjob', function (t, done) { - const ok: typeof t.ok = t.ok - ok(fastify.cronjob) - t.equal(typeof fastify.cronjob.setTimeout, 'function') - t.equal(typeof fastify.cronjob.setInterval, 'function') - t.equal(typeof fastify.cronjob.setImmediate, 'function') - done() - }) + // check + const timestamps = tasks[name] + t.assert.equal(timestamps.length, 3, 'executed two time') + for (let i = 1; i < timestamps.length; i++) { + const expected = expecteds[name][i - 1] + const diff = timestamps[i] - expected + const from = expected - RANDOM_GAP + const to = expected + RANDOM_GAP + // within expected range + t.assert.equal(timestamps[i] >= from, true, 'within random gap lower bound') + t.assert.equal(timestamps[i] <= to, true, 'within random gap upper bound') + // within expected diff + t.assert.equal(diff < RANDOM_GAP, true, 'within random gap') + } + }) + } - t.test('interval', async function () { - const promises = [ - checkInterval(384), - checkInterval(512), - checkInterval(640), - checkInterval(768), - checkInterval(778), - checkInterval(788), - checkInterval(789), - checkInterval(800), - checkInterval(801), - checkInterval(802), - ] - await Promise.allSettled(promises) - }) + await Promise.all([ + setupCronJob('* * * * * *', 1000), + setupCronJob('*/2 * * * * *', 2000), + setupCronJob('*/3 * * * * *', 3000), + setupCronJob('*/4 * * * * *', 4000), + setupCronJob('*/5 * * * * *', 5000), + ]) +}) - t.test('timeout', async function () { - const promises = [ - checkTimeout(384), - checkTimeout(512), - checkTimeout(640), - checkTimeout(768), - checkTimeout(778), - checkTimeout(788), - checkTimeout(789), - checkTimeout(800), - checkTimeout(801), - checkTimeout(802), - ] - await Promise.allSettled(promises) - }) +test('loop', async function (t: TestContext) { + // [setup, ...runat] + const tasks: Record = {} + function setupLoopTask (name: string, gap: number) { + const promise = createDeferredPromise() + const uid = `immediate-loop-${name}` + tasks[name] = [Date.now()] + fastify.cronjob.setLoopTask(async () => { + tasks[name].push(Date.now()) + if (tasks[name].length === 4) { + promise.resolve() + fastify.cronjob.clearTimeout(uid) + } + await setTimeout(gap) + tasks[name].push(Date.now()) + }, name, uid) - t.test('cronjob', async function () { - const promises = [ - checkCronJob('* * * * * *'), - checkCronJob('*/2 * * * * *'), - checkCronJob('*/3 * * * * *'), - checkCronJob('*/4 * * * * *'), - checkCronJob('*/5 * * * * *'), - ] - await Promise.allSettled(promises) - }) + return t.test(`${name}`, async function (t: TestContext) { + await promise.promise - t.test('looptask', async function () { - const promises = [ - checkLoopTask(384), - ] - await Promise.allSettled(promises) - }) + // check + const timestamps = tasks[name] + t.assert.equal(timestamps.length, 4, 'executed two time') + for (let i = 1; i < timestamps.length; i += 2) { + const expected = timestamps[i - 1] + const diff = timestamps[i] - expected + const from = expected - RANDOM_GAP + const to = expected + RANDOM_GAP + // within expected diff + t.assert.equal(diff < RANDOM_GAP, true, 'within random gap') + // within expected range + t.assert.equal(timestamps[i] >= from, true, 'within random gap lower bound') + t.assert.equal(timestamps[i] <= to, true, 'within random gap upper bound') + } + }) + } + + await Promise.all([ + setupLoopTask('1s', 1000), + setupLoopTask('2s', 2000), + setupLoopTask('3s', 3000), + setupLoopTask('4s', 4000), + setupLoopTask('5s', 5000), + ]) }) diff --git a/packages/cronjob/test/config.ts b/packages/cronjob/test/config.ts new file mode 100644 index 0000000..463f47f --- /dev/null +++ b/packages/cronjob/test/config.ts @@ -0,0 +1,6 @@ +import dotenv from 'dotenv' +dotenv.config() + +// Use environment variables for local +// Use fallback value for Github Actions +export const MONGODB_URL = process.env.MONGODB_URL ?? 'mongodb://127.0.0.1:27017/?replicaSet=rs0' diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1ac97bc..e2d2399 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -72,6 +72,9 @@ importers: cross-env: specifier: ^7.0.3 version: 7.0.3 + dotenv: + specifier: ^16.4.5 + version: 16.4.5 eslint: specifier: ^9.12.0 version: 9.12.0