Skip to content

Commit 6401dc7

Browse files
committed
bf: search with aggregate instead of find
The search was using find().sort(), which disrupted user-defined search queries and custom indexes. The sort() is needed to implement a stateless paging system. The combination of a user-defined query and a sort is now implemented with a two-stage aggregate on the server side. We always limit the execution time (maxTimeMs) to 5 minutes (tunable through the MAX_TIME_MS environment variable). The result is staged in a temporary bucket (collection) and cached for paging. We rely on an external job to clean up the searches (e.g. daily).
1 parent 78d6263 commit 6401dc7

File tree

2 files changed

+96
-15
lines changed

2 files changed

+96
-15
lines changed

lib/storage/metadata/mongoclient/MongoClientInterface.js

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* We use proper atomic operations when needed.
1111
*/
1212
const async = require('async');
13+
const crypto = require('crypto');
1314

1415
const constants = require('../../../constants');
1516

@@ -39,6 +40,9 @@ const CONNECT_TIMEOUT_MS = 5000;
3940
// MongoDB default
4041
const SOCKET_TIMEOUT_MS = 360000;
4142
const CONCURRENT_CURSORS = 10;
43+
// Search
44+
const MAX_TIME_MS = 300000;
45+
const SEARCH_PREFIX = '__search';
4246

4347
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
4448

@@ -109,6 +113,11 @@ class MongoClientInterface {
109113
!Number.isNaN(process.env.CONCURRENT_CURSORS))
110114
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
111115
: CONCURRENT_CURSORS;
116+
117+
this.maxTimeMs = (process.env.MAX_TIME_MS &&
118+
!Number.isNaN(process.env.MAX_TIME_MS))
119+
? Number.parseInt(process.env.MAX_TIME_MS, 10)
120+
: MAX_TIME_MS;
112121
}
113122

114123
setup(cb) {
@@ -939,9 +948,9 @@ class MongoClientInterface {
939948
params, log, cb);
940949
}
941950

942-
internalListObject(bucketName, params, extension, log, cb) {
943-
const c = this.getCollection(bucketName);
944-
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
951+
internalListObject(c, params, extension, log, cb) {
952+
const stream =
953+
new MongoReadStream(c, params);
945954
const skip = new Skip({
946955
extension,
947956
gte: params.gte,
@@ -963,7 +972,7 @@ class MongoClientInterface {
963972
newParams.gte = range;
964973

965974
// then continue listing the next key range
966-
this.internalListObject(bucketName, newParams, extension, log, cb);
975+
this.internalListObject(c, newParams, extension, log, cb);
967976
});
968977

969978
stream
@@ -993,22 +1002,98 @@ class MongoClientInterface {
9931002
return undefined;
9941003
}
9951004

1005+
/*
1006+
* Execute the user-defined query in a stage then sort it for
1007+
* stateless paging. The output is stored in a temporary
1008+
* collection in a special namespace that will be periodically
1009+
* erased (e.g. once a day).
1010+
*
1011+
* All search queries are bounded by MAX_TIME_MS env (default is
1012+
* 5 minutes).
1013+
*/
1014+
doSearch(c, tempCollection, params, extension, searchOptions, log, cb) {
1015+
const _cursor = c.aggregate([
1016+
{ $match: searchOptions }, // user query
1017+
{ $sort: { _id: 1 } }, // needed for paging
1018+
{ $out: tempCollection }, // a job will clean it up
1019+
],
1020+
{
1021+
maxTimeMs: this.maxTimeMs, // always limit
1022+
allowDiskUse: true, // stage large queries on disk
1023+
},
1024+
null);
1025+
_cursor.toArray(err => {
1026+
if (err) {
1027+
log.error('doSearch: error', {
1028+
error: err.message,
1029+
});
1030+
return cb(err);
1031+
}
1032+
return this.internalListObject(
1033+
this.db.collection(tempCollection),
1034+
params, extension,
1035+
log, cb);
1036+
});
1037+
}
1038+
1039+
/*
1040+
* Check if the user-defined query has been cached; otherwise
1041+
* launch the search
1042+
*/
1043+
prepareSearch(bucketName, params, extension, searchOptions, log, cb) {
1044+
const c = this.getCollection(bucketName);
1045+
// generate the temp collection name
1046+
const tempCollection =
1047+
SEARCH_PREFIX +
1048+
crypto.createHash('md5').
1049+
update(JSON.stringify(searchOptions)).
1050+
digest('hex');
1051+
this.db.listCollections({
1052+
name: tempCollection,
1053+
}).toArray((err, items) => {
1054+
if (err) {
1055+
log.error('prepareSearch: listing collection', {
1056+
error: err.message,
1057+
});
1058+
return cb(err);
1059+
}
1060+
if (items.length > 0) {
1061+
// result is cached
1062+
return this.internalListObject(
1063+
this.db.collection(tempCollection),
1064+
params, extension,
1065+
log, cb);
1066+
}
1067+
return this.doSearch(
1068+
c, tempCollection,
1069+
params, extension, searchOptions,
1070+
log, cb);
1071+
});
1072+
}
1073+
9961074
listObject(bucketName, params, log, cb) {
9971075
const extName = params.listingType;
9981076
const extension = new listAlgos[extName](params, log);
9991077
const internalParams = extension.genMDParams();
1000-
internalParams.mongifiedSearch = params.mongifiedSearch;
1001-
return this.internalListObject(bucketName, internalParams, extension,
1002-
log, cb);
1078+
if (params.mongifiedSearch) {
1079+
return this.prepareSearch(
1080+
bucketName, internalParams, extension,
1081+
params.mongifiedSearch, log, cb);
1082+
}
1083+
return this.internalListObject(
1084+
this.getCollection(bucketName),
1085+
internalParams, extension,
1086+
log, cb);
10031087
}
10041088

10051089
listMultipartUploads(bucketName, params, log, cb) {
10061090
const extName = params.listingType;
10071091
const extension = new listAlgos[extName](params, log);
10081092
const internalParams = extension.genMDParams();
1009-
internalParams.mongifiedSearch = params.mongifiedSearch;
1010-
return this.internalListObject(bucketName, internalParams, extension,
1011-
log, cb);
1093+
return this.internalListObject(
1094+
this.getCollection(bucketName),
1095+
internalParams, extension,
1096+
log, cb);
10121097
}
10131098

10141099
checkHealth(implName, log, cb) {

lib/storage/metadata/mongoclient/readStream.js

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
22
const MongoUtils = require('./utils');
33

44
class MongoReadStream extends Readable {
5-
constructor(c, options, searchOptions) {
5+
constructor(c, options) {
66
super({
77
objectMode: true,
88
highWaterMark: 0,
@@ -59,10 +59,6 @@ class MongoReadStream extends Readable {
5959
delete query._id;
6060
}
6161

62-
if (searchOptions) {
63-
Object.assign(query, searchOptions);
64-
}
65-
6662
this._cursor = c.find(query).sort({
6763
_id: options.reverse ? -1 : 1,
6864
});

0 commit comments

Comments
 (0)