-
-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Description
例行检查
- 我已确认目前没有类似 issue
- 我已确认我已升级到最新版本
- 我已完整查看过项目 README,已确定现有版本无法满足需求
- 我理解并愿意跟进此 issue,协助测试和提供反馈
- 我理解并认可上述内容,并理解项目维护者精力有限,不遵循规则的 issue 可能会被无视或直接关闭
功能描述
以处理task表视频任务,定时任务的查询条件是:progress != '100%' and status != 'FAILURE' OR status != 'SUCCESS'。随着时间推移,已完成的任务越来越多,但每次仍需扫描所有记录。以10万条记录为例。可能前99000条都已经处理了。但是每次全表扫描都会扫一遍没有意义。
应用场景
我们平时处理定时任务时会引入一个lastProcessedTaskId变量,代表之前的id都已经处理了。
查询语句变为id > lastProcessedTaskId and progress != '100%' and status != 'FAILURE' OR status != 'SUCCESS'。这样只会扫描99000条之后的数据。
但是这里我们的业务比较特别,就是task不是扫描完成之后全部都提交为完成,所以会存在数据空洞的问题。如:我扫描出来了 id 位 1 2 3 4 5这五条数据。在次定时任务只有 1 2 5完成了。3和4未完成。此时lastProcessedTaskId 应该是2,而不是5。代表2之前的都已经处理了。
怎么来实现这个功能呢。我想到了消息队列中队列中的消费位点。可以做为参考实现:
1.扫描出 1 2 3 4 5五条数据
2.依次将 1 2 3 4 5存储小顶堆
3.1 2 5处理完成。将1 2 5从小顶堆中删除。此时堆顶为3
4.lastProcessedTaskId = 堆顶-1 =2
5.如果小顶堆为空时(1,2,3,4,5都处理完),那么lastProcessedTaskId= 本次批次的最大 ID(5)
lastProcessedTaskId的实现完成了,怎么进行持久化呢?
为了简单,我们可以将在处理完db后存入到redis。这会出现一个情况:
- 存入redis失败,此时redis的值是上一个lastProcessedTaskId的值下次定时任务会更新它所以不会有问题。
- Redis 持久化丢失1秒数据或redis数据全部丢失,也不会有问题,原因同第1点
所以我们只需将lastProcessedTaskId存入redis。定时任务第一次初始化的时候去redis读取即可。
还有一个问题是消费记录如果一直不满足这个条件 progress != '100%' and status != 'FAILURE' OR status != 'SUCCESS',也就是我们一直是处理中。因为我们我们定时任务每次会通过taskid去查下游服务商,来获取状态,如果长时间一直在处理下游服务商会返回FAILURE。当然我们也可以自己去实现一个判断,比如当task超过了一个阈值,比如:一小时还在处理中,那么直接将状态改为FAILURE。也可以结合https://github.com/QuantumNous/new-api/pull/2778,这里已经加入条件submit_time >= cutoff也可以自动跳过来推进消费位点。