Skip to content

Commit f6de7b0

Browse files
committed
bf: search with aggregate instead of find
The search was using find().sort() and was disrupting user defined search queries and custom indexes. The sort() is needed to implement a stateless paging system. The combo of user defined query and sort is now implemented with a 2 stage aggregate on server side. We always limit the execution time maxTimeMs to 5mn (tunable by an environment variable).
1 parent fc23f68 commit f6de7b0

File tree

2 files changed

+22
-5
lines changed

2 files changed

+22
-5
lines changed

lib/storage/metadata/mongoclient/MongoClientInterface.js

+8
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ const CONNECT_TIMEOUT_MS = 5000;
3939
// MongoDB default
4040
const SOCKET_TIMEOUT_MS = 360000;
4141
const CONCURRENT_CURSORS = 10;
42+
// Search
43+
const MAX_TIME_MS = 300000;
4244

4345
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
4446

@@ -109,6 +111,11 @@ class MongoClientInterface {
109111
!Number.isNaN(process.env.CONCURRENT_CURSORS))
110112
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
111113
: CONCURRENT_CURSORS;
114+
115+
this.maxTimeMs = (process.env.MAX_TIME_MS &&
116+
!Number.isNaN(process.env.MAX_TIME_MS))
117+
? Number.parseInt(process.env.MAX_TIME_MS, 10)
118+
: MAX_TIME_MS;
112119
}
113120

114121
setup(cb) {
@@ -941,6 +948,7 @@ class MongoClientInterface {
941948

942949
internalListObject(bucketName, params, extension, log, cb) {
943950
const c = this.getCollection(bucketName);
951+
Object.assign(params, { maxTimeMs: this.maxTimeMs });
944952
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
945953
const skip = new Skip({
946954
extension,

lib/storage/metadata/mongoclient/readStream.js

+14-5
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,21 @@ class MongoReadStream extends Readable {
6060
}
6161

6262
if (searchOptions) {
63-
Object.assign(query, searchOptions);
63+
this._cursor = c.aggregate([
64+
{ $match: searchOptions }, // user query
65+
{ $match: query }, // for paging
66+
],
67+
{
68+
cursor: {}, // use default batchSize
69+
maxTimeMs: options.maxTimeMs, // always limit
70+
allowDiskUse: true, // stage large queries on disk
71+
},
72+
null); // will return a cursor
73+
} else {
74+
this._cursor = c.find(query).sort({
75+
_id: options.reverse ? -1 : 1,
76+
});
6477
}
65-
66-
this._cursor = c.find(query).sort({
67-
_id: options.reverse ? -1 : 1,
68-
});
6978
if (options.limit && options.limit !== -1) {
7079
this._cursor = this._cursor.limit(options.limit);
7180
}

0 commit comments

Comments
 (0)