Skip to content

Commit 48ef858

Browse files
committed
bf: search with aggregate instead of find
The search was using find().sort() and was disrupting user defined search queries and custom indexes. The sort() is needed to implement a stateless paging system. The combo of user defined query and sort is now implemented with a 2 stage aggregate on server side. We always limit the execution time maxTimeMs to 5mn (tunable by an environment variable). The result is staged in a temporary bucket and cached for paging. We rely on an external job to cleanup the searches (e.g. daily).
1 parent 78d6263 commit 48ef858

File tree

2 files changed

+127
-10
lines changed

2 files changed

+127
-10
lines changed

lib/storage/metadata/mongoclient/MongoClientInterface.js

+124-10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* We use proper atomic operations when needed.
1111
*/
1212
const async = require('async');
13+
const crypto = require('crypto');
1314

1415
const constants = require('../../../constants');
1516

@@ -39,6 +40,10 @@ const CONNECT_TIMEOUT_MS = 5000;
3940
// MongoDB default
4041
const SOCKET_TIMEOUT_MS = 360000;
4142
const CONCURRENT_CURSORS = 10;
43+
// Search
44+
const MAX_TIME_MS = 300000;
45+
const TEMP_SEARCH_PREFIX = '__temp_search';
46+
const SEARCH_PREFIX = '__search';
4247

4348
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
4449

@@ -109,6 +114,11 @@ class MongoClientInterface {
109114
!Number.isNaN(process.env.CONCURRENT_CURSORS))
110115
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
111116
: CONCURRENT_CURSORS;
117+
118+
this.maxTimeMs = (process.env.MAX_TIME_MS &&
119+
!Number.isNaN(process.env.MAX_TIME_MS))
120+
? Number.parseInt(process.env.MAX_TIME_MS, 10)
121+
: MAX_TIME_MS;
112122
}
113123

114124
setup(cb) {
@@ -939,9 +949,10 @@ class MongoClientInterface {
939949
params, log, cb);
940950
}
941951

942-
internalListObject(bucketName, params, extension, log, cb) {
943-
const c = this.getCollection(bucketName);
944-
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
952+
internalListObject(c, params, extension, log, cb) {
953+
// eslint-disable-next-line
954+
params.maxTimeMs = this.maxTimeMs;
955+
const stream = new MongoReadStream(c, params, params.searchOptions);
945956
const skip = new Skip({
946957
extension,
947958
gte: params.gte,
@@ -963,7 +974,7 @@ class MongoClientInterface {
963974
newParams.gte = range;
964975

965976
// then continue listing the next key range
966-
this.internalListObject(bucketName, newParams, extension, log, cb);
977+
this.internalListObject(c, newParams, extension, log, cb);
967978
});
968979

969980
stream
@@ -993,22 +1004,125 @@ class MongoClientInterface {
9931004
return undefined;
9941005
}
9951006

1007+
/*
1008+
* Execute the user-defined query in a stage then sort it for
1009+
* stateless paging. The output is stored in a temporary
1010+
* collection in a special namespace that will be periodically
1011+
* erased (e.g. once a day).
1012+
*
1013+
* All search queries are bounded by MAX_TIME_MS env (default is
1014+
* 5mn).
1015+
*/
1016+
doSearch(c, searchCollection, params, extension, searchOptions, log, cb) {
1017+
// use temp name to avoid races
1018+
const tempCollection =
1019+
TEMP_SEARCH_PREFIX +
1020+
crypto.randomBytes(16).toString('hex');
1021+
const _cursor = c.aggregate([
1022+
{ $match: searchOptions }, // user query
1023+
{ $out: tempCollection }, // a job will clean it up
1024+
],
1025+
{
1026+
maxTimeMs: this.maxTimeMs, // always limit
1027+
allowDiskUse: true, // stage large queries on disk
1028+
},
1029+
null);
1030+
_cursor.toArray(err => {
1031+
if (err) {
1032+
log.error('doSearch: error', {
1033+
error: err.message,
1034+
});
1035+
return cb(err);
1036+
}
1037+
// final rename
1038+
this.db.renameCollection(
1039+
tempCollection,
1040+
searchCollection,
1041+
{
1042+
dropTarget: true,
1043+
},
1044+
err => {
1045+
if (err) {
1046+
log.error('doSearch: renaming', {
1047+
error: err.message,
1048+
tempCollection,
1049+
searchCollection,
1050+
});
1051+
return cb(err);
1052+
}
1053+
log.info('doSearch: done', {
1054+
searchCollection,
1055+
});
1056+
return undefined;
1057+
});
1058+
// fallthrough
1059+
// eslint-disable-next-line
1060+
params.searchOptions = searchOptions;
1061+
return this.internalListObject(
1062+
c,
1063+
params, extension,
1064+
log, cb);
1065+
});
1066+
}
1067+
1068+
/*
1069+
* Check if the used defined query has been cached otherwise
1070+
* launch the search
1071+
*/
1072+
prepareSearch(bucketName, params, extension, searchOptions, log, cb) {
1073+
const c = this.getCollection(bucketName);
1074+
// generate the search collection name
1075+
const searchCollection =
1076+
SEARCH_PREFIX +
1077+
crypto.createHash('md5').
1078+
update(JSON.stringify(searchOptions)).
1079+
digest('hex');
1080+
this.db.listCollections({
1081+
name: searchCollection,
1082+
}).toArray((err, items) => {
1083+
if (err) {
1084+
log.error('prepareSearch: listing collection', {
1085+
error: err.message,
1086+
});
1087+
return cb(err);
1088+
}
1089+
if (items.length > 0) {
1090+
// result is cached
1091+
return this.internalListObject(
1092+
this.db.collection(searchCollection),
1093+
params, extension,
1094+
log, cb);
1095+
}
1096+
return this.doSearch(
1097+
c, searchCollection,
1098+
params, extension, searchOptions,
1099+
log, cb);
1100+
});
1101+
}
1102+
9961103
listObject(bucketName, params, log, cb) {
9971104
const extName = params.listingType;
9981105
const extension = new listAlgos[extName](params, log);
9991106
const internalParams = extension.genMDParams();
1000-
internalParams.mongifiedSearch = params.mongifiedSearch;
1001-
return this.internalListObject(bucketName, internalParams, extension,
1002-
log, cb);
1107+
if (params.mongifiedSearch) {
1108+
return this.prepareSearch(
1109+
bucketName, internalParams, extension,
1110+
params.mongifiedSearch, log, cb);
1111+
}
1112+
return this.internalListObject(
1113+
this.getCollection(bucketName),
1114+
internalParams, extension,
1115+
log, cb);
10031116
}
10041117

10051118
listMultipartUploads(bucketName, params, log, cb) {
10061119
const extName = params.listingType;
10071120
const extension = new listAlgos[extName](params, log);
10081121
const internalParams = extension.genMDParams();
1009-
internalParams.mongifiedSearch = params.mongifiedSearch;
1010-
return this.internalListObject(bucketName, internalParams, extension,
1011-
log, cb);
1122+
return this.internalListObject(
1123+
this.getCollection(bucketName),
1124+
internalParams, extension,
1125+
log, cb);
10121126
}
10131127

10141128
checkHealth(implName, log, cb) {

lib/storage/metadata/mongoclient/readStream.js

+3
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ class MongoReadStream extends Readable {
6969
if (options.limit && options.limit !== -1) {
7070
this._cursor = this._cursor.limit(options.limit);
7171
}
72+
if (options.maxTimeMs && options.maxTimeMs !== -1) {
73+
this._cursor = this._cursor.maxTimeMS(options.maxTimeMs);
74+
}
7275
this._options = options;
7376
this._destroyed = false;
7477
this.on('end', this._cleanup.bind(this));

0 commit comments

Comments
 (0)