Skip to content

Commit 8c82097

Browse files
committed
fix: looptask and cron implementation
1 parent 922f982 commit 8c82097

File tree

3 files changed

+112
-20
lines changed

3 files changed

+112
-20
lines changed

packages/cronjob/lib/adapter/mongodb.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ export class MongoDBAdapter extends Adapter {
4949
uid: task.uid,
5050
once: task.once ?? false,
5151
delay: task.delay,
52-
executeAt
52+
executeAt,
53+
isDeleted: false
5354
})
5455
} else {
55-
let $set: any = null
56+
const $set: any = { isDeleted: false }
5657
if (_task.delay !== task.delay) {
57-
$set = { delay: task.delay }
58+
$set.delay = task.delay
5859
}
5960
if (_task.executeAt !== executeAt) {
60-
$set ??= {}
6161
$set.executeAt = executeAt
6262
}
6363
if ($set !== null) {

packages/cronjob/lib/cronjob.ts

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ export interface CronJobOptions {
3232
export class CronJob<RootContext = unknown> extends EventEmitter {
3333
// state
3434
application: string
35-
tasks: Record<string, _TaskExecutor>
35+
#tasks: Record<string, _TaskExecutor>
36+
#deleted: Record<string, boolean>
3637
nextTick: null | NodeJS.Timeout
3738
isDestroyed: boolean
3839
isLocked: boolean
@@ -47,7 +48,8 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
4748
constructor (options: CronJobOptions) {
4849
super()
4950
this.application = options.application
50-
this.tasks = {}
51+
this.#tasks = {}
52+
this.#deleted = {}
5153
this.nextTick = null
5254
this.isDestroyed = false
5355
this.isLocked = false
@@ -69,7 +71,7 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
6971
if (this.isDestroyed) return ''
7072
// we need to prefix uid
7173
!uid.startsWith('interval-') && (uid = `interval-${uid}`)
72-
this.tasks[uid] = async () => {
74+
this.#tasks[uid] = async () => {
7375
try {
7476
await executor((context ?? this.#context) as Context)
7577
} catch (err) {
@@ -93,7 +95,7 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
9395
if (this.isDestroyed) return ''
9496
// we need to prefix uid
9597
!uid.startsWith('timeout-') && (uid = `timeout-${uid}`)
96-
this.tasks[uid] = async () => {
98+
this.#tasks[uid] = async () => {
9799
try {
98100
await executor((context ?? this.#context) as Context)
99101
} catch (err) {
@@ -116,7 +118,7 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
116118
if (this.isDestroyed) return ''
117119
// we need to prefix uid
118120
!uid.startsWith('immediate-') && (uid = `immediate-${uid}`)
119-
this.tasks[uid] = async () => {
121+
this.#tasks[uid] = async () => {
120122
try {
121123
await executor((context ?? this.#context) as Context)
122124
} catch (err) {
@@ -136,43 +138,71 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
136138
cron: string,
137139
uid: string,
138140
context?: Context
141+
): Promise<string> {
142+
return await this.#setCronJob(executor, cron, uid, true, context)
143+
}
144+
145+
async #setCronJob<Context = RootContext>(
146+
executor: TaskExecutor<Context>,
147+
cron: string,
148+
uid: string,
149+
fresh: boolean,
150+
context?: Context
139151
): Promise<string> {
140152
const nextExecuteAt = Number(parseExpression(cron).next().toDate())
141153
const ms = nextExecuteAt - Date.now()
154+
const _uid = `timeout-cron-${uid}`
155+
if (fresh) this.#deleted[_uid] = false
156+
if (this.#deleted[_uid]) return ''
142157

143158
return await this.setTimeout(async (context) => {
159+
if (this.#deleted[_uid]) return
144160
setImmediate(() => {
145161
// we execute immediately for the next task
146162
Promise.race([
147163
executor(context),
148-
this.setCronJob(executor, cron, uid, context)
164+
this.#setCronJob(executor, cron, uid, false, context)
149165
]).catch((err) => {
150166
this.emit('error', err)
151167
})
152168
})
153-
}, ms, `timeout-cron-${uid}`, context)
169+
}, ms, _uid, context)
154170
}
155171

156172
async setLoopTask<Context = RootContext>(
157173
executor: TaskExecutor<Context>,
158174
uid: string,
159175
context?: Context
160176
): Promise<string> {
177+
return await this.#setLoopTask(executor, uid, true, context)
178+
}
179+
180+
async #setLoopTask<Context = RootContext>(
181+
executor: TaskExecutor<Context>,
182+
uid: string,
183+
fresh: boolean,
184+
context?: Context
185+
): Promise<string> {
186+
const _uid = `immediate-loop-${uid}`
187+
if (fresh) this.#deleted[_uid] = false
188+
if (this.#deleted[_uid]) return ''
161189
return await this.setImmediate(async (context) => {
190+
if (this.#deleted[_uid]) return
162191
try {
163192
await executor(context)
164193
} finally {
165-
await this.setLoopTask(executor, uid, context)
194+
await this.#setLoopTask(executor, uid, false, context)
166195
}
167-
}, `timeout-loop-${uid}`, context)
196+
}, _uid, context)
168197
}
169198

170199
clearInterval (uid: string): void {
200+
this.#deleted[uid] = true
171201
this.#deleteTask(uid)
172202
}
173203

174204
clearTimeout (uid: string): void {
175-
this.#deleteTask(uid)
205+
this.clearInterval(uid)
176206
}
177207

178208
async #_deleteTask (uid: string): Promise<void> {
@@ -228,7 +258,7 @@ export class CronJob<RootContext = unknown> extends EventEmitter {
228258
await this.adapter.updateTask(task.uid, executeAt, task.once)
229259
}
230260

231-
const executor = this.tasks[task.uid]
261+
const executor = this.#tasks[task.uid]
232262
if (typeof executor !== 'function') {
233263
// when we missing runtime
234264
// we delay the task to maxExecutionMS

packages/cronjob/test/adapter/mongodb.test.ts

Lines changed: 67 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { parseExpression } from 'cron-parser'
12
import Fastify from 'fastify'
23
import { MongoClient } from 'mongodb'
34
import assert from 'node:assert/strict'
@@ -19,6 +20,10 @@ function createDeferredPromise (): { promise: Promise<void>, resolve: () => void
1920
return promise
2021
}
2122

23+
function isJob (uid: unknown, expected: string): boolean {
24+
return String(uid).includes(expected)
25+
}
26+
2227
test('MongoDBAdapter', async function (t) {
2328
const client = new MongoClient(MONGODB_URI)
2429
await client.connect()
@@ -49,15 +54,18 @@ test('MongoDBAdapter', async function (t) {
4954
const now = Date.now()
5055

5156
if (
52-
(String(task.uid).includes('interval') || String(task.uid).includes('timeout')) &&
53-
timestamps[task.uid].length === 2
57+
(
58+
isJob(task.uid, 'interval') ||
59+
isJob(task.uid, 'timeout') ||
60+
isJob(task.uid, 'immediate')
61+
) && timestamps[task.uid].length === 2
5462
) {
5563
const expected = timestamps[task.uid][0]
5664
const _from = expected - RANDOM_GAP
5765
const _to = expected + RANDOM_GAP
5866
const diff = now - expected
5967

60-
if (String(task.uid).includes('interval')) {
68+
if (isJob(task.uid, 'interval')) {
6169
if (ticks[task.uid] >= 2) {
6270
fastify.cronjob.clearInterval(task.uid as string)
6371
assert.equal(ticks[task.uid], 2)
@@ -66,12 +74,29 @@ test('MongoDBAdapter', async function (t) {
6674
timestamps[task.uid][0] = now + task.delay
6775
assert.equal(ticks[task.uid], 1)
6876
}
77+
} else if (isJob(task.uid, 'cron')) {
78+
if (ticks[task.uid] >= 2) {
79+
fastify.cronjob.clearInterval(task.uid as string)
80+
assert.equal(ticks[task.uid] >= 2, true)
81+
dones[task.uid]()
82+
} else {
83+
timestamps[task.uid][0] = now + task.delay
84+
assert.equal(ticks[task.uid], 1)
85+
}
86+
} else if (isJob(task.uid, 'loop')) {
87+
fastify.cronjob.clearInterval(task.uid as string)
88+
assert.equal(ticks[task.uid] >= 1, true)
89+
dones[task.uid]()
6990
} else {
91+
fastify.cronjob.clearInterval(task.uid as string)
7092
assert.equal(ticks[task.uid], 1)
7193
dones[task.uid]()
7294
}
73-
assert.equal(_from < now && now < _to, true)
74-
assert.equal(diff < RANDOM_GAP, true)
95+
96+
if (!isJob(task.uid, 'loop') && !isJob(task.uid, 'cron')) {
97+
assert.equal(_from < now && now < _to, true)
98+
assert.equal(diff < RANDOM_GAP, true)
99+
}
75100
}
76101
})
77102

@@ -97,6 +122,25 @@ test('MongoDBAdapter', async function (t) {
97122
await promise.promise
98123
}
99124

125+
const checkCronJob = async function (cron: string): Promise<void> {
126+
const next = parseExpression(cron).next().toDate()
127+
const promise = createDeferredPromise()
128+
const uid = await fastify.cronjob.setCronJob(async () => {}, cron, cron)
129+
dones[uid] = promise.resolve
130+
timestamps[uid] = [+next]
131+
ticks[uid] = 0
132+
await promise.promise
133+
}
134+
135+
const checkLoopTask = async function (name: number): Promise<void> {
136+
const promise = createDeferredPromise()
137+
const uid = await fastify.cronjob.setLoopTask(async () => {}, '' + name)
138+
dones[uid] = promise.resolve
139+
timestamps[uid] = [Date.now()]
140+
ticks[uid] = 0
141+
await promise.promise
142+
}
143+
100144
t.after(async function () {
101145
await fastify.close()
102146
await db.dropDatabase()
@@ -142,4 +186,22 @@ test('MongoDBAdapter', async function (t) {
142186
]
143187
await Promise.allSettled(promises)
144188
})
189+
190+
await t.test('cronjob', async function () {
191+
const promises = [
192+
checkCronJob('* * * * * *'),
193+
checkCronJob('*/2 * * * * *'),
194+
checkCronJob('*/3 * * * * *'),
195+
checkCronJob('*/4 * * * * *'),
196+
checkCronJob('*/5 * * * * *')
197+
]
198+
await Promise.allSettled(promises)
199+
})
200+
201+
await t.test('looptask', async function () {
202+
const promises = [
203+
checkLoopTask(384)
204+
]
205+
await Promise.allSettled(promises)
206+
})
145207
})

0 commit comments

Comments
 (0)