diff --git a/lib/storage/metadata/MetadataWrapper.js b/lib/storage/metadata/MetadataWrapper.js index c869dc723..8cf7dbfed 100644 --- a/lib/storage/metadata/MetadataWrapper.js +++ b/lib/storage/metadata/MetadataWrapper.js @@ -85,6 +85,7 @@ class MetadataWrapper { replicaSetHosts: params.mongodb.replicaSetHosts, writeConcern: params.mongodb.writeConcern, replicaSet: params.mongodb.replicaSet, + enableSharding: params.mongodb.enableSharding, readPreference: params.mongodb.readPreference, database: params.mongodb.database, replicationGroupId: params.replicationGroupId, diff --git a/lib/storage/metadata/mongoclient/LogConsumer.js b/lib/storage/metadata/mongoclient/LogConsumer.js index 4a065beca..8b906dde2 100644 --- a/lib/storage/metadata/mongoclient/LogConsumer.js +++ b/lib/storage/metadata/mongoclient/LogConsumer.js @@ -17,10 +17,13 @@ class LogConsumer { * @param {string} logger - logger */ constructor(mongoConfig, logger) { - const { authCredentials, replicaSetHosts, replicaSet, database } = mongoConfig; + const { authCredentials, replicaSetHosts, replicaSet, database, + enableSharding } = mongoConfig; const cred = MongoUtils.credPrefix(authCredentials); this._mongoUrl = `mongodb://${cred}${replicaSetHosts}/`; this._replicaSet = replicaSet; + this._enableSharding = + enableSharding !== undefined ? enableSharding : false; this._logger = logger; this._oplogNsRegExp = new RegExp(`^${database}\\.`); // oplog collection @@ -36,10 +39,15 @@ class LogConsumer { * @return {undefined} */ connectMongo(done) { - MongoClient.connect(this._mongoUrl, { - replicaSet: this._replicaSet, + const options = { useNewUrlParser: true, - }, + }; + if (!this._enableSharding) { + // XXX real fix is with change streams + options.replicaSet = this._replicaSet; + } + MongoClient.connect(this._mongoUrl, + options, (err, client) => { if (err) { this._logger.error('Unable to connect to MongoDB', diff --git a/lib/storage/metadata/mongoclient/MongoClientInterface.js b/lib/storage/metadata/mongoclient/MongoClientInterface.js index 4eb0ea9ba..86b97d1d5 100644 --- a/lib/storage/metadata/mongoclient/MongoClientInterface.js +++ b/lib/storage/metadata/mongoclient/MongoClientInterface.js @@ -91,11 +91,15 @@ function generatePHDVersion(versionId) { class MongoClientInterface { constructor(params) { const { replicaSetHosts, writeConcern, replicaSet, readPreference, path, - database, logger, replicationGroupId, authCredentials, + database, enableSharding, logger, + replicationGroupId, authCredentials, isLocationTransient } = params; const cred = MongoUtils.credPrefix(authCredentials); + this._enableSharding = + enableSharding !== undefined ? enableSharding : false; this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` + - `?w=${writeConcern}&replicaSet=${replicaSet}` + + `?w=${writeConcern}` + + `${!this._enableSharding && !!replicaSet ? `&replicaSet=${replicaSet}` : ''}` + `&readPreference=${readPreference}`; this.logger = logger; this.client = null; @@ -142,6 +146,7 @@ class MongoClientInterface { this.db = client.db(this.database, { ignoreUndefined: true, }); + this.adminDb = client.db('admin'); return this.usersBucketHack(cb); }); } @@ -219,6 +224,21 @@ class MongoClientInterface { { error: err.message }); return cb(errors.InternalError); } + if (this._enableSharding) { + const cmd = { + shardCollection: `${this.database}.${bucketName}`, + key: { _id: 1 }, + }; + return this.adminDb.command(cmd, {}, err => { + if (err) { + log.error( + 'createBucket: enabling sharding', + { error: err.message }); + return cb(errors.InternalError); + } + return cb(); + }); + } return cb(); }); }