Skip to content

Commit 389da95

Browse files
Ensure lifecycle tasks wait for messages to be pushed
Issue: BB-641
1 parent af16d44 commit 389da95

File tree

2 files changed

+74
-53
lines changed

2 files changed

+74
-53
lines changed

extensions/lifecycle/tasks/LifecycleTask.js

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ class LifecycleTask extends BackbeatTask {
400400
});
401401
}
402402

403+
const promises = [];
404+
403405
// sending bucket entry - only once - for checking next listing
404406
if (data.IsTruncated && allVersions.length > 0 && nbRetries === 0) {
405407
// Uses last version whether Version or DeleteMarker
@@ -414,31 +416,30 @@ class LifecycleTask extends BackbeatTask {
414416
prevDate: last.LastModified,
415417
},
416418
});
417-
this._sendBucketEntry(entry, err => {
419+
promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => {
418420
if (!err) {
419421
log.debug('sent kafka entry for bucket ' +
420422
'consumption', {
421423
method: 'LifecycleTask._getObjectVersions',
422424
});
423425
}
424-
});
426+
resolve(); // safe to ignore the error, we will retry lifecycle eventually
427+
})));
425428
}
426429

427-
// if no versions to process, skip further processing for this
428-
// batch
430+
// if no versions to process, skip further processing for this batch
429431
if (allVersionsWithStaleDate.length === 0) {
430-
return done(null);
432+
return Promise.all(promises).then(() => done(), done);
431433
}
432434

433435
// for each version, get their relative rules, compare with
434436
// bucket rules, match with `staleDate` to
435437
// NoncurrentVersionExpiration Days and send expiration if
436438
// rules all apply
437-
return this._compareRulesToList(bucketData, bucketLCRules,
438-
allVersionsWithStaleDate, log, versioningStatus,
439-
err => {
439+
promises.push(new Promise((resolve, reject) => this._compareRulesToList(bucketData,
440+
bucketLCRules, allVersionsWithStaleDate, log, versioningStatus, err => {
440441
if (err) {
441-
return done(err);
442+
return reject(err);
442443
}
443444

444445
if (!data.IsTruncated) {
@@ -453,8 +454,10 @@ class LifecycleTask extends BackbeatTask {
453454
);
454455
}
455456

456-
return done();
457-
});
457+
return resolve();
458+
})));
459+
460+
return Promise.all(promises).then(() => done(), done);
458461
});
459462
}
460463

extensions/lifecycle/tasks/LifecycleTaskV2.js

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict'; // eslint-disable-line
22

33
const async = require('async');
4+
const util = require('util');
45
const { errors } = require('arsenal');
56

67
const LifecycleTask = require('./LifecycleTask');
@@ -39,33 +40,33 @@ class LifecycleTaskV2 extends LifecycleTask {
3940
* @param {array} remainings - array of { prefix, listType, beforeDate }
4041
* @param {object} bucketData - bucket data
4142
* @param {Logger.newRequestLogger} log - logger object
43+
* @param {function} done - callback(error)
4244
* @return {undefined}
4345
*/
44-
_handleRemainingListings(remainings, bucketData, log) {
45-
if (remainings && remainings.length) {
46-
remainings.forEach(l => {
47-
const {
48-
prefix,
49-
listType,
50-
beforeDate,
51-
storageClass,
52-
} = l;
53-
54-
const entry = Object.assign({}, bucketData, {
55-
contextInfo: { requestId: log.getSerializedUids() },
56-
details: { beforeDate, prefix, listType, storageClass },
57-
});
46+
_handleRemainingListings(remainings, bucketData, log, done) {
47+
async.forEach(remainings || [], (l, cb) => {
48+
const {
49+
prefix,
50+
listType,
51+
beforeDate,
52+
storageClass,
53+
} = l;
54+
55+
const entry = Object.assign({}, bucketData, {
56+
contextInfo: { requestId: log.getSerializedUids() },
57+
details: { beforeDate, prefix, listType, storageClass },
58+
});
5859

59-
this._sendBucketEntry(entry, err => {
60-
if (!err) {
61-
log.debug(
62-
'sent kafka entry for bucket consumption', {
63-
method: 'LifecycleTaskV2._getVersionList',
64-
});
65-
}
66-
});
60+
this._sendBucketEntry(entry, err => {
61+
if (!err) {
62+
log.debug(
63+
'sent kafka entry for bucket consumption', {
64+
method: 'LifecycleTaskV2._getVersionList',
65+
});
66+
}
67+
cb();
6768
});
68-
}
69+
}, done);
6970
}
7071

7172
/**
@@ -101,15 +102,19 @@ class LifecycleTaskV2 extends LifecycleTask {
101102
return process.nextTick(done);
102103
}
103104

105+
const promises = [];
106+
104107
// re-queue remaining listings only once
105108
if (nbRetries === 0) {
106-
this._handleRemainingListings(remainings, bucketData, log);
109+
promises.push(util.promisify(this._handleRemainingListings).bind(this)(
110+
remainings, bucketData, log,
111+
));
107112
}
108113

109114
return this.backbeatMetadataProxy.listLifecycle(listType, params, log,
110115
(err, contents, isTruncated, markerInfo) => {
111116
if (err) {
112-
return done(err);
117+
return Promise.all(promises).then(() => done(err), () => done(err));
113118
}
114119

115120
// re-queue truncated listing only once.
@@ -125,17 +130,22 @@ class LifecycleTaskV2 extends LifecycleTask {
125130
},
126131
});
127132

128-
this._sendBucketEntry(entry, err => {
133+
promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => {
129134
if (!err) {
130135
log.debug(
131136
'sent kafka entry for bucket consumption', {
132-
method: 'LifecycleTaskV2._getObjectList',
133-
});
137+
method: 'LifecycleTaskV2._getObjectList',
138+
});
134139
}
135-
});
140+
resolve(); // safe to ignore the error, we will retry lifecycle eventually
141+
})));
136142
}
137-
return this._compareRulesToList(bucketData, bucketLCRules,
138-
contents, log, done);
143+
144+
promises.push(util.promisify(this._compareRulesToList).bind(this)(
145+
bucketData, bucketLCRules, contents, log,
146+
));
147+
148+
return Promise.all(promises).then(() => done(), done);
139149
});
140150
}
141151

@@ -173,15 +183,19 @@ class LifecycleTaskV2 extends LifecycleTask {
173183
return process.nextTick(done);
174184
}
175185

186+
const promises = [];
187+
176188
// re-queue remaining listings only once
177189
if (nbRetries === 0) {
178-
this._handleRemainingListings(remainings, bucketData, log);
190+
promises.push(util.promisify(this._handleRemainingListings).bind(this)(
191+
remainings, bucketData, log,
192+
));
179193
}
180194

181195
return this.backbeatMetadataProxy.listLifecycle(listType, params, log,
182196
(err, contents, isTruncated, markerInfo) => {
183197
if (err) {
184-
return done(err);
198+
return Promise.all(promises).then(() => done(err), () => done(err));
185199
}
186200

187201
// create Set of unique keys not matching the next marker to
@@ -209,19 +223,21 @@ class LifecycleTaskV2 extends LifecycleTask {
209223
},
210224
});
211225

212-
this._sendBucketEntry(entry, err => {
226+
promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => {
213227
if (!err) {
214228
log.debug(
215229
'sent kafka entry for bucket consumption', {
216-
method: 'LifecycleTaskV2._getObjectList',
217-
});
230+
method: 'LifecycleTaskV2._getObjectVersions',
231+
});
218232
}
219-
});
233+
resolve(); // safe to ignore the error, we will retry lifecycle eventually
234+
})));
220235
}
221-
return this._compareRulesToList(bucketData, bucketLCRules,
222-
contents, log, err => {
236+
237+
promises.push(new Promise((resolve, reject) => this._compareRulesToList(
238+
bucketData, bucketLCRules, contents, log, err => {
223239
if (err) {
224-
return done(err);
240+
return reject(err);
225241
}
226242

227243
if (!isTruncated) {
@@ -236,8 +252,10 @@ class LifecycleTaskV2 extends LifecycleTask {
236252
);
237253
}
238254

239-
return done();
240-
});
255+
return resolve();
256+
})));
257+
258+
return Promise.all(promises).then(() => done(), done);
241259
});
242260
}
243261

0 commit comments

Comments
 (0)