diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index f17d4e01d6..2d9fe8a23c 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -518,7 +518,7 @@ class NamespaceFS { // allocate or reuse buffer const remain_size = Math.max(0, end - pos); - const { buffer, callback } = await buffers_pool.get_buffer(remain_size); + const { buffer, callback } = await buffers_pool.get_buffer(); buffer_pool_cleanup = callback; // read from file @@ -637,6 +637,8 @@ class NamespaceFS { fs_xattr = await this._copy_stream(source_file_path, upload_path, fs_account_config, fs_xattr); } } else { + // TODO: Take up only as much as we need (requires fine-tune of the semaphore inside the _upload_stream) + // Currently we are taking config.NSFS_BUF_SIZE for any sized upload (1KB upload will take a full buffer from semaphore) fs_xattr = await buffers_pool_sem.surround_count( config.NSFS_BUF_SIZE, async () => this._upload_stream(params.source_stream, upload_path, fs_account_config, object_sdk.rpc_client, fs_xattr) @@ -689,7 +691,7 @@ class NamespaceFS { //Reading the source_file and writing into the target_file let read_pos = 0; for (;;) { - const { buffer, callback } = await buffers_pool.get_buffer(config.NSFS_BUF_SIZE); + const { buffer, callback } = await buffers_pool.get_buffer(); buffer_pool_cleanup = callback; const bytesRead = await source_file.read(fs_account_config, buffer, 0, config.NSFS_BUF_SIZE, read_pos); if (!bytesRead) { @@ -891,7 +893,7 @@ class NamespaceFS { } let read_pos = 0; for (;;) { - const { buffer, callback } = await buffers_pool.get_buffer(config.NSFS_BUF_SIZE); + const { buffer, callback } = await buffers_pool.get_buffer(); buffer_pool_cleanup = callback; const bytesRead = await read_file.read(fs_account_config, buffer, 0, config.NSFS_BUF_SIZE, read_pos); if (!bytesRead) { diff --git a/src/test/unit_tests/index.js b/src/test/unit_tests/index.js index 4d5d9e8c69..9004626fe7 100644 --- a/src/test/unit_tests/index.js +++ b/src/test/unit_tests/index.js @@ -30,6 +30,7 @@ require('./test_kmeans'); require('./test_sensitive_wrapper'); // require('./test_debug_module'); require('./test_range_stream'); +require('./test_buffer_pool'); // // STORES require('./test_md_store'); diff --git a/src/test/unit_tests/test_buffer_pool.js b/src/test/unit_tests/test_buffer_pool.js new file mode 100644 index 0000000000..b184db0a1d --- /dev/null +++ b/src/test/unit_tests/test_buffer_pool.js @@ -0,0 +1,47 @@ +/* Copyright (C) 2016 NooBaa */ +'use strict'; +const mocha = require('mocha'); +const assert = require('assert'); +const buffer_utils = require('../../util/buffer_utils'); +const Semaphore = require('../../util/semaphore'); + +mocha.describe('Test buffers pool', function() { + + mocha.it('Work parallel with buf_size and respect semaphore', async function() { + const SEM_LIMIT = 64 * 1024 * 1024; + const BUF_LIMIT = 8 * 1024 * 1024; + const MAX_POOL_ALLOWED = SEM_LIMIT / BUF_LIMIT; + const SLEEP_BEFORE_RELEASE = 264; + const buffers_pool_sem = new Semaphore(SEM_LIMIT); + const buffers_pool = new buffer_utils.BuffersPool({ + buf_size: BUF_LIMIT, + sem: buffers_pool_sem, + warning_timeout: false + }); + // Allocate the buffers in a lazy fashion and verify + const lazy_fill = new Array(MAX_POOL_ALLOWED).fill(0); + const from_pool_fill = new Array(MAX_POOL_ALLOWED * 2).fill(0); + const lazy_buffer_allocation = lazy_fill.map(async () => { + const { buffer, callback } = await buffers_pool.get_buffer(); + await new Promise((resolve, reject) => setTimeout(resolve, SLEEP_BEFORE_RELEASE)); + console.log('Lazy allocation', buffer.length, buffers_pool.buffers.length, buffers_pool.sem.value); + callback(); + }); + const lazy_buffers = await Promise.all(lazy_buffer_allocation); + assert(lazy_buffers.length === MAX_POOL_ALLOWED, 'Allocated more buffers than requested'); + assert(buffers_pool.buffers.length === MAX_POOL_ALLOWED, 'Buffer pool allocated more than semaphore allows'); + assert(buffers_pool.sem.value === SEM_LIMIT, 'Smepahore did not deallocate after buffers release'); + // Re-use buffer pool and verify that we do not allocate new buffers and respect the semaphore + const from_pool_allocation = from_pool_fill.map(async () => { + const { buffer, callback } = await buffers_pool.get_buffer(); + await new Promise((resolve, reject) => setTimeout(resolve, SLEEP_BEFORE_RELEASE)); + console.log('From pool allocations', buffer.length, buffers_pool.buffers.length, buffers_pool.sem.value); + callback(); + }); + const from_pool_buffers = await Promise.all(from_pool_allocation); + assert(from_pool_buffers.length === MAX_POOL_ALLOWED * 2, 'Allocated more buffers than requested'); + assert(buffers_pool.buffers.length === MAX_POOL_ALLOWED, 'Buffer pool allocated more than semaphore allows'); + assert(buffers_pool.sem.value === SEM_LIMIT, 'Smepahore did not deallocate after buffers release'); + }); + +}); diff --git a/src/util/buffer_utils.js b/src/util/buffer_utils.js index 7a5a5cf314..2078fa8a5c 100644 --- a/src/util/buffer_utils.js +++ b/src/util/buffer_utils.js @@ -184,29 +184,22 @@ class BuffersPool { } /** - * @param {number} len * @returns {Promise<{ * buffer: Buffer, * callback: () => void, * }>} */ - async get_buffer(len) { + async get_buffer() { dbg.log1('BufferPool.get_buffer', this); let buffer = null; - let should_release = 0; - let should_pool = false; let warning_timer; - if (len < this.buf_size / 4) { - await this.sem.wait(len); - should_release = len; - buffer = Buffer.allocUnsafe(len); - } else if (this.buffers.length) { + // Lazy allocation of buffers pool, first cycle will take up buffers + // Will cause semaphore to be empty with actual buffers allocated and waiting to be used + // Any buffer that is in usage (allocated from this.buffers) will be accounted in the semaphore + await this.sem.wait(this.buf_size); + if (this.buffers.length) { buffer = this.buffers.shift(); - should_pool = true; } else { - await this.sem.wait(this.buf_size); - should_release = this.buf_size; - should_pool = true; buffer = Buffer.allocUnsafeSlow(this.buf_size); } if (this.warning_timeout) { @@ -218,14 +211,8 @@ class BuffersPool { } const callback = () => { if (warning_timer) clearTimeout(warning_timer); - if (should_release) { - this.sem.release(should_release); - should_release = 0; - } - if (should_pool) { - this.buffers.push(buffer); - should_pool = false; - } + this.buffers.push(buffer); + this.sem.release(this.buf_size); }; return { buffer, callback }; }