Skip to content

Commit

Permalink
Buffer pool over allocation fix
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeniy Belyi <[email protected]>
(cherry picked from commit 6b941a2)
  • Loading branch information
jeniawhite committed Nov 11, 2021
1 parent 180ba75 commit 3d110ff
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 24 deletions.
8 changes: 5 additions & 3 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ class NamespaceFS {

// allocate or reuse buffer
const remain_size = Math.max(0, end - pos);
const { buffer, callback } = await buffers_pool.get_buffer(remain_size);
const { buffer, callback } = await buffers_pool.get_buffer();
buffer_pool_cleanup = callback;

// read from file
Expand Down Expand Up @@ -637,6 +637,8 @@ class NamespaceFS {
fs_xattr = await this._copy_stream(source_file_path, upload_path, fs_account_config, fs_xattr);
}
} else {
// TODO: Take up only as much as we need (requires fine-tune of the semaphore inside the _upload_stream)
// Currently we are taking config.NSFS_BUF_SIZE for any sized upload (1KB upload will take a full buffer from semaphore)
fs_xattr = await buffers_pool_sem.surround_count(
config.NSFS_BUF_SIZE,
async () => this._upload_stream(params.source_stream, upload_path, fs_account_config, object_sdk.rpc_client, fs_xattr)
Expand Down Expand Up @@ -689,7 +691,7 @@ class NamespaceFS {
//Reading the source_file and writing into the target_file
let read_pos = 0;
for (;;) {
const { buffer, callback } = await buffers_pool.get_buffer(config.NSFS_BUF_SIZE);
const { buffer, callback } = await buffers_pool.get_buffer();
buffer_pool_cleanup = callback;
const bytesRead = await source_file.read(fs_account_config, buffer, 0, config.NSFS_BUF_SIZE, read_pos);
if (!bytesRead) {
Expand Down Expand Up @@ -891,7 +893,7 @@ class NamespaceFS {
}
let read_pos = 0;
for (;;) {
const { buffer, callback } = await buffers_pool.get_buffer(config.NSFS_BUF_SIZE);
const { buffer, callback } = await buffers_pool.get_buffer();
buffer_pool_cleanup = callback;
const bytesRead = await read_file.read(fs_account_config, buffer, 0, config.NSFS_BUF_SIZE, read_pos);
if (!bytesRead) {
Expand Down
1 change: 1 addition & 0 deletions src/test/unit_tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require('./test_kmeans');
require('./test_sensitive_wrapper');
// require('./test_debug_module');
require('./test_range_stream');
require('./test_buffer_pool');

// // STORES
require('./test_md_store');
Expand Down
47 changes: 47 additions & 0 deletions src/test/unit_tests/test_buffer_pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/* Copyright (C) 2016 NooBaa */
'use strict';
const mocha = require('mocha');
const assert = require('assert');
const buffer_utils = require('../../util/buffer_utils');
const Semaphore = require('../../util/semaphore');

mocha.describe('Test buffers pool', function() {

mocha.it('Work parallel with buf_size and respect semaphore', async function() {
const SEM_LIMIT = 64 * 1024 * 1024;
const BUF_LIMIT = 8 * 1024 * 1024;
const MAX_POOL_ALLOWED = SEM_LIMIT / BUF_LIMIT;
const SLEEP_BEFORE_RELEASE = 264;
const buffers_pool_sem = new Semaphore(SEM_LIMIT);
const buffers_pool = new buffer_utils.BuffersPool({
buf_size: BUF_LIMIT,
sem: buffers_pool_sem,
warning_timeout: false
});
// Allocate the buffers in a lazy fashion and verify
const lazy_fill = new Array(MAX_POOL_ALLOWED).fill(0);
const from_pool_fill = new Array(MAX_POOL_ALLOWED * 2).fill(0);
const lazy_buffer_allocation = lazy_fill.map(async () => {
const { buffer, callback } = await buffers_pool.get_buffer();
await new Promise((resolve, reject) => setTimeout(resolve, SLEEP_BEFORE_RELEASE));
console.log('Lazy allocation', buffer.length, buffers_pool.buffers.length, buffers_pool.sem.value);
callback();
});
const lazy_buffers = await Promise.all(lazy_buffer_allocation);
assert(lazy_buffers.length === MAX_POOL_ALLOWED, 'Allocated more buffers than requested');
assert(buffers_pool.buffers.length === MAX_POOL_ALLOWED, 'Buffer pool allocated more than semaphore allows');
assert(buffers_pool.sem.value === SEM_LIMIT, 'Smepahore did not deallocate after buffers release');
// Re-use buffer pool and verify that we do not allocate new buffers and respect the semaphore
const from_pool_allocation = from_pool_fill.map(async () => {
const { buffer, callback } = await buffers_pool.get_buffer();
await new Promise((resolve, reject) => setTimeout(resolve, SLEEP_BEFORE_RELEASE));
console.log('From pool allocations', buffer.length, buffers_pool.buffers.length, buffers_pool.sem.value);
callback();
});
const from_pool_buffers = await Promise.all(from_pool_allocation);
assert(from_pool_buffers.length === MAX_POOL_ALLOWED * 2, 'Allocated more buffers than requested');
assert(buffers_pool.buffers.length === MAX_POOL_ALLOWED, 'Buffer pool allocated more than semaphore allows');
assert(buffers_pool.sem.value === SEM_LIMIT, 'Smepahore did not deallocate after buffers release');
});

});
29 changes: 8 additions & 21 deletions src/util/buffer_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,29 +184,22 @@ class BuffersPool {
}

/**
* @param {number} len
* @returns {Promise<{
* buffer: Buffer,
* callback: () => void,
* }>}
*/
async get_buffer(len) {
async get_buffer() {
dbg.log1('BufferPool.get_buffer', this);
let buffer = null;
let should_release = 0;
let should_pool = false;
let warning_timer;
if (len < this.buf_size / 4) {
await this.sem.wait(len);
should_release = len;
buffer = Buffer.allocUnsafe(len);
} else if (this.buffers.length) {
// Lazy allocation of buffers pool, first cycle will take up buffers
// Will cause semaphore to be empty with actual buffers allocated and waiting to be used
// Any buffer that is in usage (allocated from this.buffers) will be accounted in the semaphore
await this.sem.wait(this.buf_size);
if (this.buffers.length) {
buffer = this.buffers.shift();
should_pool = true;
} else {
await this.sem.wait(this.buf_size);
should_release = this.buf_size;
should_pool = true;
buffer = Buffer.allocUnsafeSlow(this.buf_size);
}
if (this.warning_timeout) {
Expand All @@ -218,14 +211,8 @@ class BuffersPool {
}
const callback = () => {
if (warning_timer) clearTimeout(warning_timer);
if (should_release) {
this.sem.release(should_release);
should_release = 0;
}
if (should_pool) {
this.buffers.push(buffer);
should_pool = false;
}
this.buffers.push(buffer);
this.sem.release(this.buf_size);
};
return { buffer, callback };
}
Expand Down

0 comments on commit 3d110ff

Please sign in to comment.