bugfix: S3UTILS-110 pass VersionId to PutMetadata route #265

Draft · wants to merge 11 commits into base: development/1.13
feature: S3UTILS-29 huge lint overhaul from upgraded eslint
(cherry picked from commit 724008d)
miniscruff authored and jonathan-gramain committed Oct 5, 2022

Verified: this commit was signed with the committer's verified signature (ramsay-t, Ramsay Taylor).
commit 927c1acdb2a7cdd1e3397510d5f9717326cc573b
crrExistingObjects.js (122 changes: 66 additions & 56 deletions)
@@ -1,7 +1,9 @@
 const http = require('http');

 const AWS = require('aws-sdk');
-const { doWhilst, eachSeries, eachLimit, waterfall } = require('async');
+const {
+    doWhilst, eachSeries, eachLimit, waterfall,
+} = require('async');

 const werelogs = require('werelogs');
@@ -18,32 +20,32 @@ werelogs.configure(loggerConfig);
 const log = new werelogs.Logger('s3utils::crrExistingObjects');

 const BUCKETS = process.argv[2] ? process.argv[2].split(',') : null;
-const ACCESS_KEY = process.env.ACCESS_KEY;
-const SECRET_KEY = process.env.SECRET_KEY;
-const ENDPOINT = process.env.ENDPOINT;
-const SITE_NAME = process.env.SITE_NAME;
-let STORAGE_TYPE = process.env.STORAGE_TYPE;
-let TARGET_REPLICATION_STATUS = process.env.TARGET_REPLICATION_STATUS;
-const TARGET_PREFIX = process.env.TARGET_PREFIX;
+const {
+    ACCESS_KEY, SECRET_KEY, ENDPOINT, SITE_NAME,
+} = process.env;
+let {
+    STORAGE_TYPE, TARGET_REPLICATION_STATUS,
+} = process.env;
+const { TARGET_PREFIX } = process.env;
 const WORKERS = (process.env.WORKERS &&
     Number.parseInt(process.env.WORKERS, 10)) || 10;
 const MAX_UPDATES = (process.env.MAX_UPDATES &&
     Number.parseInt(process.env.MAX_UPDATES, 10));
 const MAX_SCANNED = (process.env.MAX_SCANNED &&
     Number.parseInt(process.env.MAX_SCANNED, 10));
-let KEY_MARKER = process.env.KEY_MARKER;
-let VERSION_ID_MARKER = process.env.VERSION_ID_MARKER;
+let { KEY_MARKER } = process.env;
+let { VERSION_ID_MARKER } = process.env;

-const LISTING_LIMIT = (process.env.LISTING_LIMIT &&
-    Number.parseInt(process.env.LISTING_LIMIT, 10)) || 1000;
+const LISTING_LIMIT = (process.env.LISTING_LIMIT
+    && Number.parseInt(process.env.LISTING_LIMIT, 10)) || 1000;

 const LOG_PROGRESS_INTERVAL_MS = 10000;
 const AWS_SDK_REQUEST_RETRIES = 100;
 const AWS_SDK_REQUEST_DELAY_MS = 30;

 if (!BUCKETS || BUCKETS.length === 0) {
-    log.fatal('No buckets given as input! Please provide ' +
-        'a comma-separated list of buckets');
+    log.fatal('No buckets given as input! Please provide '
+        + 'a comma-separated list of buckets');
     process.exit(1);
 }
 if (!ENDPOINT) {
@@ -68,15 +70,15 @@ if (!TARGET_REPLICATION_STATUS) {
 const replicationStatusToProcess = TARGET_REPLICATION_STATUS.split(',');
 replicationStatusToProcess.forEach(state => {
     if (!['NEW', 'PENDING', 'COMPLETED', 'FAILED', 'REPLICA'].includes(state)) {
-        log.fatal('invalid TARGET_REPLICATION_STATUS environment: must be a ' +
-            'comma-separated list of replication statuses to requeue, ' +
-            'as NEW, PENDING, COMPLETED, FAILED or REPLICA.');
+        log.fatal('invalid TARGET_REPLICATION_STATUS environment: must be a '
+            + 'comma-separated list of replication statuses to requeue, '
+            + 'as NEW, PENDING, COMPLETED, FAILED or REPLICA.');
         process.exit(1);
     }
 });
-log.info('Objects with replication status ' +
-    `${replicationStatusToProcess.join(' or ')} ` +
-    'will be reset to PENDING to trigger CRR');
+log.info('Objects with replication status '
+    + `${replicationStatusToProcess.join(' or ')} `
+    + 'will be reset to PENDING to trigger CRR');

 const options = {
     accessKeyId: ACCESS_KEY,
@@ -134,16 +136,22 @@ const logProgressInterval = setInterval(_logProgress, LOG_PROGRESS_INTERVAL_MS);
 function _objectShouldBeUpdated(objMD) {
     return replicationStatusToProcess.some(filter => {
         if (filter === 'NEW') {
-            return (!objMD.getReplicationInfo() ||
-                objMD.getReplicationInfo().status === '');
+            return (!objMD.getReplicationInfo()
+                || objMD.getReplicationInfo().status === '');
         }
-        return (objMD.getReplicationInfo() &&
-            objMD.getReplicationInfo().status === filter);
+        return (objMD.getReplicationInfo()
+            && objMD.getReplicationInfo().status === filter);
     });
 }

-function _markObjectPending(bucket, key, versionId, storageClass,
-    repConfig, cb) {
+function _markObjectPending(
+    bucket,
+    key,
+    versionId,
+    storageClass,
+    repConfig,
+    cb,
+) {
     let objMD;
     let skip = false;
     return waterfall([
@@ -199,8 +207,8 @@ function _markObjectPending(bucket, key, versionId, storageClass,
         const { Rules, Role } = repConfig;
         const destination = Rules[0].Destination.Bucket;
         // set replication properties
-        const ops = objMD.getContentLength() === 0 ? ['METADATA'] :
-            ['METADATA', 'DATA'];
+        const ops = objMD.getContentLength() === 0 ? ['METADATA']
+            : ['METADATA', 'DATA'];
         const backends = [{
             site: storageClass,
             status: 'PENDING',
@@ -268,16 +276,14 @@ function _markPending(bucket, versions, cb) {
         const { Rules } = repConfig;
         const storageClass = Rules[0].Destination.StorageClass || SITE_NAME;
         if (!storageClass) {
-            const errMsg =
-                'missing SITE_NAME environment variable, must be set to' +
-                ' the value of "site" property in the CRR configuration';
+            const errMsg = 'missing SITE_NAME environment variable, must be set to'
+                + ' the value of "site" property in the CRR configuration';
             log.error(errMsg);
             return next(new Error(errMsg));
         }
         return eachLimit(versions, WORKERS, (i, apply) => {
             const { Key, VersionId } = i;
-            _markObjectPending(
-                bucket, Key, VersionId, storageClass, repConfig, apply);
+            _markObjectPending(bucket, Key, VersionId, storageClass, repConfig, apply);
         }, next);
     },
 ], cb);
@@ -293,25 +299,27 @@ function triggerCRROnBucket(bucketName, cb) {
         VersionIdMarker = VERSION_ID_MARKER;
         KEY_MARKER = undefined;
         VERSION_ID_MARKER = undefined;
-        log.info(`resuming at: KeyMarker=${KeyMarker} ` +
-            `VersionIdMarker=${VersionIdMarker}`);
+        log.info(`resuming at: KeyMarker=${KeyMarker} `
+            + `VersionIdMarker=${VersionIdMarker}`);
     }
     doWhilst(
-        done => _listObjectVersions(bucket, VersionIdMarker, KeyMarker,
+        done => _listObjectVersions(
+            bucket,
+            VersionIdMarker,
+            KeyMarker,
             (err, data) => {
                 if (err) {
                     log.error('error listing object versions', { error: err });
                     return done(err);
                 }
-                return _markPending(
-                    bucket, data.Versions.concat(data.DeleteMarkers), err => {
-                        if (err) {
-                            return done(err);
-                        }
-                        VersionIdMarker = data.NextVersionIdMarker;
-                        KeyMarker = data.NextKeyMarker;
-                        return done();
-                    });
+                return _markPending(bucket, data.Versions.concat(data.DeleteMarkers), err => {

Contributor commented:

Should we avoid the concat when data.DeleteMarkers is null? E.g. keep the older code, or use data.Versions.concat(data.DeleteMarkers || [])?
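
(Illustrative sketch, not part of the diff, of why a null DeleteMarkers would be a problem with a bare concat.)

    // Array.prototype.concat appends a non-array argument as a single
    // element, so concat(null) injects a null entry that would later
    // throw when destructured in _markPending's worker callback.
    const versions = [{ Key: 'a', VersionId: '1' }];
    console.log(versions.concat(null));       // [ { Key: 'a', VersionId: '1' }, null ]
    console.log(versions.concat(null || [])); // [ { Key: 'a', VersionId: '1' } ] (guarded)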


Contributor Author replied:

I believe this extra check was introduced in the Zenko version to cope with the change in API (using the metadataUtil listing instead of S3). It should not be necessary here: 1.4.1 does not have this check, since the SDK always returns an empty array when there are no delete markers.
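
(A minimal sketch of that SDK behavior, assuming the script's _listObjectVersions wraps the aws-sdk v2 S3 client; the endpoint and bucket name below are placeholders.)

    const AWS = require('aws-sdk');

    // With aws-sdk v2, S3.listObjectVersions() returns Versions and
    // DeleteMarkers as arrays ([] when empty), so a bare concat is safe.
    const s3 = new AWS.S3({ endpoint: 'http://127.0.0.1:8000' });
    s3.listObjectVersions({ Bucket: 'example-bucket', MaxKeys: 1000 }, (err, data) => {
        if (err) throw err;
        console.log(data.Versions.concat(data.DeleteMarkers).length);
    });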

+                    if (err) {
+                        return done(err);
+                    }
+                    VersionIdMarker = data.NextVersionIdMarker;
+                    KeyMarker = data.NextKeyMarker;
+                    return done();
+                });
             }),
         () => {
             if (nUpdated >= MAX_UPDATES || nProcessed >= MAX_SCANNED) {
@@ -320,22 +328,23 @@ function triggerCRROnBucket(bucketName, cb) {
             if (VersionIdMarker || KeyMarker) {
                 // next bucket to process is still the current one
                 remainingBuckets = BUCKETS.slice(
-                    BUCKETS.findIndex(bucket => bucket === bucketName));
+                    BUCKETS.findIndex(bucket => bucket === bucketName),
+                );
             } else {
                 // next bucket to process is the next in bucket list
                 remainingBuckets = BUCKETS.slice(
-                    BUCKETS.findIndex(bucket => bucket === bucketName) + 1);
+                    BUCKETS.findIndex(bucket => bucket === bucketName) + 1,
+                );
             }
-            let message =
-                'reached ' +
-                `${nUpdated >= MAX_UPDATES ? 'update' : 'scanned'} ` +
-                'count limit, resuming from this ' +
-                'point can be achieved by re-running the script with ' +
-                `the bucket list "${remainingBuckets.join(',')}"`;
+            let message = 'reached '
+                + `${nUpdated >= MAX_UPDATES ? 'update' : 'scanned'} `
+                + 'count limit, resuming from this '
+                + 'point can be achieved by re-running the script with '
+                + `the bucket list "${remainingBuckets.join(',')}"`;
             if (VersionIdMarker || KeyMarker) {
-                message += ' and the following environment variables set: ' +
-                    `KEY_MARKER=${KeyMarker} ` +
-                    `VERSION_ID_MARKER=${VersionIdMarker}`;
+                message += ' and the following environment variables set: '
+                    + `KEY_MARKER=${KeyMarker} `
+                    + `VERSION_ID_MARKER=${VersionIdMarker}`;
             }
             log.info(message);
             process.exit(0);
@@ -354,7 +363,8 @@ function triggerCRROnBucket(bucketName, cb) {
         _logProgress();
         log.info(`completed task for bucket: ${bucket}`);
         return cb();
-    });
+        },
+    );
 }

 // trigger the calls to list objects and mark them for crr