bf: search with aggregate instead of find #1213

Status: Open — wants to merge 1 commit into base: development/8.1
134 changes: 124 additions & 10 deletions lib/storage/metadata/mongoclient/MongoClientInterface.js
@@ -10,6 +10,7 @@
* We use proper atomic operations when needed.
*/
const async = require('async');
const crypto = require('crypto');

const constants = require('../../../constants');

@@ -39,6 +40,10 @@ const CONNECT_TIMEOUT_MS = 5000;
// MongoDB default
const SOCKET_TIMEOUT_MS = 360000;
const CONCURRENT_CURSORS = 10;
// Search
const MAX_TIME_MS = 300000;
const TEMP_SEARCH_PREFIX = '__temp_search';
const SEARCH_PREFIX = '__search';

const initialInstanceID = process.env.INITIAL_INSTANCE_ID;

@@ -109,6 +114,11 @@ class MongoClientInterface {
!Number.isNaN(process.env.CONCURRENT_CURSORS))
? Number.parseInt(process.env.CONCURRENT_CURSORS, 10)
: CONCURRENT_CURSORS;

this.maxTimeMs = (process.env.MAX_TIME_MS &&
!Number.isNaN(process.env.MAX_TIME_MS))
? Number.parseInt(process.env.MAX_TIME_MS, 10)
: MAX_TIME_MS;
}

setup(cb) {
@@ -939,9 +949,8 @@
params, log, cb);
}

internalListObject(bucketName, params, extension, log, cb) {
const c = this.getCollection(bucketName);
const stream = new MongoReadStream(c, params, params.mongifiedSearch);
internalListObject(c, params, extension, log, cb) {
const stream = new MongoReadStream(c, params);
const skip = new Skip({
extension,
gte: params.gte,
@@ -963,7 +972,7 @@
newParams.gte = range;

// then continue listing the next key range
this.internalListObject(bucketName, newParams, extension, log, cb);
this.internalListObject(c, newParams, extension, log, cb);
});

stream
@@ -993,22 +1002,127 @@
return undefined;
}

/*
* Execute the user-defined query in a stage then sort it for
* stateless paging. The output is stored in a temporary
* collection in a special namespace that will be periodically
* erased (e.g. once a day).
*
* All search queries are bounded by the MAX_TIME_MS env
* var (default is 5 minutes).
*/
doSearch(c, searchCollection, params, extension, searchOptions, log, cb) {
// use temp name to avoid races
const tempCollection =
TEMP_SEARCH_PREFIX +
crypto.randomBytes(16).toString('hex');
log.info('doSearch: launching aggregate', {
searchCollection,
});
const _cursor = c.aggregate([
{ $match: searchOptions }, // user query
{ $out: tempCollection }, // a job will clean it up
],
{
maxTimeMS: this.maxTimeMs, // always limit (the driver option key is maxTimeMS)
allowDiskUse: true, // stage large queries on disk
},
null);
_cursor.toArray(err => {
[Inline review thread on this line]

alexanderchan-scality (Contributor), Jul 16, 2020:
    This can possibly lead to out of memory if the aggregate results become too large.

Author (Contributor):
    No, the result is empty because there is a $out.

Author (Contributor):
    But one problem of this approach is that it returns nothing until the query is done. This is weird from the API client perspective. Any suggestion?

rahulreddy (Collaborator), Jul 16, 2020:
    If it's streaming the output, maybe use a combined approach of streaming 1000 documents back to the client and starting a background async job that writes to the temporary collection:
    https://mongodb.github.io/node-mongodb-native/3.6/reference/cursors/#stream-api
    It sounds complex and dirty; ideally I would delegate the task to some other worker, but I can't see how.

Author (Contributor):
    Yes, we could do basically the user query in an aggregate AND in a find() at the same time. But it will execute the query twice... just to avoid sessions, that's a bit too much...

Author (Contributor):
    Actually the $sort here is also killing the query plan, and the good news is... no need to do the $sort, because in the $out collection the keys will already be sorted...

[End of review thread]

if (err) {
log.error('doSearch: error', {
error: err.message,
});
return cb(err);
}
// final rename
this.db.renameCollection(
tempCollection,
searchCollection,
{
dropTarget: true,
},
err => {
if (err) {
log.error('doSearch: renaming', {
error: err.message,
tempCollection,
searchCollection,
});
return cb(err);
}
log.info('doSearch: aggregate done', {
searchCollection,
});
return this.internalListObject(
this.db.collection(searchCollection),
params, extension,
log, cb);
});
return undefined;
});
}

/*
 * Check if the user-defined query has been cached, otherwise
 * launch the search.
 */
prepareSearch(bucketName, params, extension, searchOptions, log, cb) {
const c = this.getCollection(bucketName);
// generate the search collection name
const searchCollection =
SEARCH_PREFIX +
crypto.createHash('md5').
update(JSON.stringify(searchOptions)).
digest('hex');
this.db.listCollections({
name: searchCollection,
}).toArray((err, items) => {
if (err) {
log.error('prepareSearch: listing collection', {
error: err.message,
});
return cb(err);
}
if (items.length > 0) {
log.info('prepareSearch: using cache', {
searchCollection,
});
return this.internalListObject(
this.db.collection(searchCollection),
params, extension,
log, cb);
}
return this.doSearch(
c, searchCollection,
params, extension, searchOptions,
log, cb);
});
}

listObject(bucketName, params, log, cb) {
const extName = params.listingType;
const extension = new listAlgos[extName](params, log);
const internalParams = extension.genMDParams();
internalParams.mongifiedSearch = params.mongifiedSearch;
return this.internalListObject(bucketName, internalParams, extension,
log, cb);
if (params.mongifiedSearch) {
return this.prepareSearch(
bucketName, internalParams, extension,
params.mongifiedSearch, log, cb);
}
return this.internalListObject(
this.getCollection(bucketName),
internalParams, extension,
log, cb);
}

listMultipartUploads(bucketName, params, log, cb) {
const extName = params.listingType;
const extension = new listAlgos[extName](params, log);
const internalParams = extension.genMDParams();
internalParams.mongifiedSearch = params.mongifiedSearch;
return this.internalListObject(bucketName, internalParams, extension,
log, cb);
return this.internalListObject(
this.getCollection(bucketName),
internalParams, extension,
log, cb);
}

checkHealth(implName, log, cb) {
6 changes: 1 addition & 5 deletions lib/storage/metadata/mongoclient/readStream.js
@@ -2,7 +2,7 @@ const Readable = require('stream').Readable;
const MongoUtils = require('./utils');

class MongoReadStream extends Readable {
constructor(c, options, searchOptions) {
constructor(c, options) {
super({
objectMode: true,
highWaterMark: 0,
@@ -59,10 +59,6 @@ class MongoReadStream extends Readable {
delete query._id;
}

if (searchOptions) {
Object.assign(query, searchOptions);
}

this._cursor = c.find(query).sort({
_id: options.reverse ? -1 : 1,
});
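A side note on the MAX_TIME_MS override added in the first file: it copies the CONCURRENT_CURSORS pattern, but `Number.isNaN(process.env.MAX_TIME_MS)` tests the raw string, and `Number.isNaN` is only true for the actual NaN value, so the guard accepts any defined string and non-numeric input reaches `Number.parseInt`. A hedged sketch of a stricter variant (the helper name `envNumber` is hypothetical, not part of the patch):

```javascript
// Coerce first, then test: parseInt yields NaN for non-numeric input,
// so Number.isNaN on the parsed value is a reliable guard.
function envNumber(value, fallback) {
    const parsed = Number.parseInt(value, 10);
    return Number.isNaN(parsed) ? fallback : parsed;
}

// Usage matching the patch's intent:
// this.maxTimeMs = envNumber(process.env.MAX_TIME_MS, MAX_TIME_MS);
```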