From 93fb1a0dc9a5c1342837427129a85d8d4d8e041b Mon Sep 17 00:00:00 2001 From: Aidan Daly <74743624+dalyaidan1@users.noreply.github.com> Date: Wed, 11 Sep 2024 09:23:33 -0400 Subject: [PATCH 1/6] chunking passthough --- index.js | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 10 deletions(-) diff --git a/index.js b/index.js index fb2a68a..0edd567 100644 --- a/index.js +++ b/index.js @@ -4,6 +4,7 @@ const AWS = require('aws-sdk'); const optionsFromArguments = require('./lib/optionsFromArguments'); +const stream = require('stream'); const awsCredentialsDeprecationNotice = function awsCredentialsDeprecationNotice() { // eslint-disable-next-line no-console @@ -92,12 +93,11 @@ class S3Adapter { return promise; } - // For a given config object, filename, and data, store a file in S3 + / // For a given config object, filename, and data, store a file in S3 // Returns a promise containing the S3 object creation response createFile(filename, data, contentType, options = {}) { const params = { Key: this._bucketPrefix + filename, - Body: data, }; if (this._generateKey instanceof Function) { @@ -128,14 +128,105 @@ class S3Adapter { const serializedTags = serialize(options.tags); params.Tagging = serializedTags; } - return this.createBucket().then(() => new Promise((resolve, reject) => { - this._s3Client.upload(params, (err, response) => { - if (err !== null) { - return reject(err); - } - return resolve(response); - }); - })); + + // if we are dealing with a blob, we need to handle it differently + // it could be over the V8 memory limit + if ( + typeof Blob !== 'undefined' && + data instanceof Blob + ) { + return this.createBucket().then(() => new Promise((resolve, reject) => { + const passThrough = new stream.PassThrough(); // Create a PassThrough stream + + // 100MB part size + const partSize = 1024 * 1024 * 100; + let buffer = Buffer.alloc(0); + let partNumber = 1; + const uploadPromises = []; + + // Initiate multipart upload + this._s3Client.createMultipartUpload(params, (err, multipart) => { + if (err) return reject(err); + + // Handle data chunking + passThrough.on('data', chunk => { + buffer = Buffer.concat([buffer, chunk]); + + // When buffer exceeds partSize, upload that part + while (buffer.length >= partSize) { + const partParams = { + Body: buffer.subarray(0, partSize), + Bucket: multipart.Bucket, + Key: multipart.Key, + PartNumber: partNumber, + UploadId: multipart.UploadId, + }; + + uploadPromises.push( + this._s3Client.uploadPart(partParams).promise() + ); + + buffer = buffer.subarray(partSize); // Remove the uploaded part from buffer + partNumber++; + } + }); + + passThrough.on('end', () => { + // Upload the remaining buffer as the last part + if (buffer.length > 0) { + const partParams = { + Body: buffer, + Bucket: multipart.Bucket, + Key: multipart.Key, + PartNumber: partNumber, + UploadId: multipart.UploadId, + }; + uploadPromises.push( + this._s3Client.uploadPart(partParams).promise() + ); + } + + // Complete multipart upload + Promise.all(uploadPromises) + .then(parts => { + const completeParams = { + Bucket: multipart.Bucket, + Key: multipart.Key, + MultipartUpload: { + Parts: parts.map((part, index) => ({ + ETag: part.ETag, + PartNumber: index + 1, + })), + }, + UploadId: multipart.UploadId, + }; + return this._s3Client.completeMultipartUpload(completeParams).promise(); + }) + .then(resolve) + .catch(err => { + this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); + 
reject(err); + }); + }); + + passThrough.on('error', err => { + this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); + reject(err); + }); + + // make the data a stream + let readableStream = data.stream(); + + // may come in as a web stream, so we need to convert it to a node stream + if (readableStream instanceof ReadableStream) { + readableStream = stream.Readable.fromWeb(readableStream); + } + + // Pipe the data to the PassThrough stream + readableStream.pipe(passThrough); + }); + })); + } } deleteFile(filename) { From c45a1aa6407202b64c1150c7d6658ec4f7c7adf9 Mon Sep 17 00:00:00 2001 From: Aidan Daly <74743624+dalyaidan1@users.noreply.github.com> Date: Wed, 11 Sep 2024 09:39:05 -0400 Subject: [PATCH 2/6] fix comment --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 0edd567..efbd366 100644 --- a/index.js +++ b/index.js @@ -93,7 +93,7 @@ class S3Adapter { return promise; } - / // For a given config object, filename, and data, store a file in S3 + // For a given config object, filename, and data, store a file in S3 // Returns a promise containing the S3 object creation response createFile(filename, data, contentType, options = {}) { const params = { From 5b9c6dcd535f9e32ba4029869ad03ee2eca697d6 Mon Sep 17 00:00:00 2001 From: Aidan Daly <74743624+dalyaidan1@users.noreply.github.com> Date: Wed, 11 Sep 2024 16:42:06 -0400 Subject: [PATCH 3/6] chunking that doesnt work --- index.js | 198 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 108 insertions(+), 90 deletions(-) diff --git a/index.js b/index.js index efbd366..aa77efe 100644 --- a/index.js +++ b/index.js @@ -37,6 +37,39 @@ function buildDirectAccessUrl(baseUrl, baseUrlFileKey, presignedUrl, config, fil return directAccessUrl; } +class Chunker extends stream.Transform { + constructor(chunkSize) { + super(); + this.chunkSize = chunkSize; + this.currentSize = 0; // Track current chunk size + this.chunks = []; // Store chunks + } + + _transform(chunk, encoding, callback) { + this.currentSize += chunk.length; + this.chunks.push(chunk); + + // If accumulated chunks reach the chunkSize, flush them + if (this.currentSize >= this.chunkSize) { + const fullChunk = Buffer.concat(this.chunks, this.currentSize); + this.push(fullChunk); // Push the full chunk + this.chunks = []; // Reset chunk storage + this.currentSize = 0; + } + + callback(); + } + + _flush(callback) { + // Handle any remaining data that's smaller than chunkSize + if (this.currentSize > 0) { + const remainingChunk = Buffer.concat(this.chunks, this.currentSize); + this.push(remainingChunk); + } + callback(); + } +} + class S3Adapter { // Creates an S3 session. 
// Providing AWS access, secret keys and bucket are mandatory @@ -93,6 +126,80 @@ class S3Adapter { return promise; } + + _handleCreateLargeBlob(params, data) { + return this.createBucket().then(() => new Promise((resolve, reject) => { + // Initiate multipart upload + this._s3Client.createMultipartUpload(params, (err, multipart) => { + if (err) return reject(err); + + const ONE_HUNDRED_MB = 1024 * 1024 * 100; + const chunker = new Chunker(ONE_HUNDRED_MB); + let partNumber = 1; + const uploadPromises = []; + + + // Handle data chunking + chunker.on('data', chunk => { + // When buffer exceeds partSize, upload that part + const partParams = { + Body: chunk, + Bucket: multipart.Bucket, + Key: multipart.Key, + PartNumber: partNumber, + UploadId: multipart.UploadId, + }; + + uploadPromises.push( + this._s3Client.uploadPart(partParams).promise() + ); + + partNumber++; + + }); + + chunker.on('error', err => { + this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); + reject(err); + }); + + + // Complete multipart upload + Promise.all(uploadPromises) + .then(parts => { + const completeParams = { + Bucket: multipart.Bucket, + Key: multipart.Key, + MultipartUpload: { + Parts: parts.map((part, index) => ({ + ETag: part.ETag, + PartNumber: index + 1, + })), + }, + UploadId: multipart.UploadId, + }; + return this._s3Client.completeMultipartUpload(completeParams).promise(); + }) + .then(resolve) + .catch(err => { + this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); + reject(err); + }); + + // make the data a stream + let readableStream = data.stream(); + + // may come in as a web stream, so we need to convert it to a node stream + if (readableStream instanceof ReadableStream) { + readableStream = stream.Readable.fromWeb(readableStream); + } + + // Pipe the data to the PassThrough stream + readableStream.pipe(chunker); + }); + })); + } + // For a given config object, filename, and data, store a file in S3 // Returns a promise containing the S3 object creation response createFile(filename, data, contentType, options = {}) { @@ -135,97 +242,8 @@ class S3Adapter { typeof Blob !== 'undefined' && data instanceof Blob ) { - return this.createBucket().then(() => new Promise((resolve, reject) => { - const passThrough = new stream.PassThrough(); // Create a PassThrough stream - - // 100MB part size - const partSize = 1024 * 1024 * 100; - let buffer = Buffer.alloc(0); - let partNumber = 1; - const uploadPromises = []; - - // Initiate multipart upload - this._s3Client.createMultipartUpload(params, (err, multipart) => { - if (err) return reject(err); - - // Handle data chunking - passThrough.on('data', chunk => { - buffer = Buffer.concat([buffer, chunk]); - - // When buffer exceeds partSize, upload that part - while (buffer.length >= partSize) { - const partParams = { - Body: buffer.subarray(0, partSize), - Bucket: multipart.Bucket, - Key: multipart.Key, - PartNumber: partNumber, - UploadId: multipart.UploadId, - }; - - uploadPromises.push( - this._s3Client.uploadPart(partParams).promise() - ); - - buffer = buffer.subarray(partSize); // Remove the uploaded part from buffer - partNumber++; - } - }); - - passThrough.on('end', () => { - // Upload the remaining buffer as the last part - if (buffer.length > 0) { - const partParams = { - Body: buffer, - Bucket: multipart.Bucket, - Key: multipart.Key, - PartNumber: partNumber, - UploadId: multipart.UploadId, - }; - uploadPromises.push( - 
this._s3Client.uploadPart(partParams).promise() - ); - } - - // Complete multipart upload - Promise.all(uploadPromises) - .then(parts => { - const completeParams = { - Bucket: multipart.Bucket, - Key: multipart.Key, - MultipartUpload: { - Parts: parts.map((part, index) => ({ - ETag: part.ETag, - PartNumber: index + 1, - })), - }, - UploadId: multipart.UploadId, - }; - return this._s3Client.completeMultipartUpload(completeParams).promise(); - }) - .then(resolve) - .catch(err => { - this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); - reject(err); - }); - }); - - passThrough.on('error', err => { - this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); - reject(err); - }); - // make the data a stream - let readableStream = data.stream(); - - // may come in as a web stream, so we need to convert it to a node stream - if (readableStream instanceof ReadableStream) { - readableStream = stream.Readable.fromWeb(readableStream); - } - - // Pipe the data to the PassThrough stream - readableStream.pipe(passThrough); - }); - })); + return this._handleCreateLargeBlob(params, data); } } From 1177223de9eed74d8f56179c732c0fa04497b695 Mon Sep 17 00:00:00 2001 From: Aidan Daly <74743624+dalyaidan1@users.noreply.github.com> Date: Thu, 12 Sep 2024 14:00:31 -0400 Subject: [PATCH 4/6] simple stream --- index.js | 150 +++++++++++++------------------------------------------ 1 file changed, 34 insertions(+), 116 deletions(-) diff --git a/index.js b/index.js index aa77efe..b3d9009 100644 --- a/index.js +++ b/index.js @@ -37,39 +37,6 @@ function buildDirectAccessUrl(baseUrl, baseUrlFileKey, presignedUrl, config, fil return directAccessUrl; } -class Chunker extends stream.Transform { - constructor(chunkSize) { - super(); - this.chunkSize = chunkSize; - this.currentSize = 0; // Track current chunk size - this.chunks = []; // Store chunks - } - - _transform(chunk, encoding, callback) { - this.currentSize += chunk.length; - this.chunks.push(chunk); - - // If accumulated chunks reach the chunkSize, flush them - if (this.currentSize >= this.chunkSize) { - const fullChunk = Buffer.concat(this.chunks, this.currentSize); - this.push(fullChunk); // Push the full chunk - this.chunks = []; // Reset chunk storage - this.currentSize = 0; - } - - callback(); - } - - _flush(callback) { - // Handle any remaining data that's smaller than chunkSize - if (this.currentSize > 0) { - const remainingChunk = Buffer.concat(this.chunks, this.currentSize); - this.push(remainingChunk); - } - callback(); - } -} - class S3Adapter { // Creates an S3 session. 
// Providing AWS access, secret keys and bucket are mandatory @@ -126,80 +93,6 @@ class S3Adapter { return promise; } - - _handleCreateLargeBlob(params, data) { - return this.createBucket().then(() => new Promise((resolve, reject) => { - // Initiate multipart upload - this._s3Client.createMultipartUpload(params, (err, multipart) => { - if (err) return reject(err); - - const ONE_HUNDRED_MB = 1024 * 1024 * 100; - const chunker = new Chunker(ONE_HUNDRED_MB); - let partNumber = 1; - const uploadPromises = []; - - - // Handle data chunking - chunker.on('data', chunk => { - // When buffer exceeds partSize, upload that part - const partParams = { - Body: chunk, - Bucket: multipart.Bucket, - Key: multipart.Key, - PartNumber: partNumber, - UploadId: multipart.UploadId, - }; - - uploadPromises.push( - this._s3Client.uploadPart(partParams).promise() - ); - - partNumber++; - - }); - - chunker.on('error', err => { - this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); - reject(err); - }); - - - // Complete multipart upload - Promise.all(uploadPromises) - .then(parts => { - const completeParams = { - Bucket: multipart.Bucket, - Key: multipart.Key, - MultipartUpload: { - Parts: parts.map((part, index) => ({ - ETag: part.ETag, - PartNumber: index + 1, - })), - }, - UploadId: multipart.UploadId, - }; - return this._s3Client.completeMultipartUpload(completeParams).promise(); - }) - .then(resolve) - .catch(err => { - this._s3Client.abortMultipartUpload({ Bucket: multipart.Bucket, Key: multipart.Key, UploadId: multipart.UploadId }); - reject(err); - }); - - // make the data a stream - let readableStream = data.stream(); - - // may come in as a web stream, so we need to convert it to a node stream - if (readableStream instanceof ReadableStream) { - readableStream = stream.Readable.fromWeb(readableStream); - } - - // Pipe the data to the PassThrough stream - readableStream.pipe(chunker); - }); - })); - } - // For a given config object, filename, and data, store a file in S3 // Returns a promise containing the S3 object creation response createFile(filename, data, contentType, options = {}) { @@ -235,16 +128,41 @@ class S3Adapter { const serializedTags = serialize(options.tags); params.Tagging = serializedTags; } + return this.createBucket() + .then(() => new Promise((resolve, reject) => { + // if we are dealing with a blob, we need to handle it differently + // it could be over the V8 memory limit + if ( + typeof Blob !== 'undefined' && + data instanceof Blob + ) { + const passStream = new stream.PassThrough(); + + // make the data a stream + let readableStream = data.stream(); + + // may come in as a web stream, so we need to convert it to a node stream + if (readableStream instanceof ReadableStream) { + readableStream = stream.Readable.fromWeb(readableStream); + } + + // Pipe the data to the PassThrough stream + readableStream.pipe(passStream).on('error', reject) + + params.Body = passStream; + + console.log(`Uploading stream to S3 for ${filename} of type ${contentType} and size ${data.size}`); + } else { + params.Body = data; + } - // if we are dealing with a blob, we need to handle it differently - // it could be over the V8 memory limit - if ( - typeof Blob !== 'undefined' && - data instanceof Blob - ) { - - return this._handleCreateLargeBlob(params, data); - } + this._s3Client.upload(params, (err, response) => { + if (err !== null) { + return reject(err); + } + return resolve(response); + }); + })); } deleteFile(filename) { From 
efed6b36cb925c18ff94cd23a4551b272ded729b Mon Sep 17 00:00:00 2001 From: Aidan Daly <74743624+dalyaidan1@users.noreply.github.com> Date: Fri, 13 Sep 2024 15:45:46 -0400 Subject: [PATCH 5/6] remove debug --- index.js | 1 - 1 file changed, 1 deletion(-) diff --git a/index.js b/index.js index b3d9009..1a52766 100644 --- a/index.js +++ b/index.js @@ -151,7 +151,6 @@ class S3Adapter { params.Body = passStream; - console.log(`Uploading stream to S3 for ${filename} of type ${contentType} and size ${data.size}`); } else { params.Body = data; } From ea04bdb72770cf2a102fbcf596a42131f19c7670 Mon Sep 17 00:00:00 2001 From: Aidan Daly <74743624+dalyaidan1@users.noreply.github.com> Date: Sun, 3 Nov 2024 08:46:56 -0500 Subject: [PATCH 6/6] make sure things are imported --- index.js | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index 1a52766..235071a 100644 --- a/index.js +++ b/index.js @@ -5,6 +5,7 @@ const AWS = require('aws-sdk'); const optionsFromArguments = require('./lib/optionsFromArguments'); const stream = require('stream'); +const {Blob} = require('node:buffer') const awsCredentialsDeprecationNotice = function awsCredentialsDeprecationNotice() { // eslint-disable-next-line no-console @@ -132,17 +133,14 @@ class S3Adapter { .then(() => new Promise((resolve, reject) => { // if we are dealing with a blob, we need to handle it differently // it could be over the V8 memory limit - if ( - typeof Blob !== 'undefined' && - data instanceof Blob - ) { + if (data instanceof Blob) { const passStream = new stream.PassThrough(); // make the data a stream let readableStream = data.stream(); // may come in as a web stream, so we need to convert it to a node stream - if (readableStream instanceof ReadableStream) { + if (readableStream instanceof stream.ReadableStream) { readableStream = stream.Readable.fromWeb(readableStream); }
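
Taken together, patches 1-3 drive the S3 multipart API by hand (createMultipartUpload / uploadPart / completeMultipartUpload), first by accumulating a buffer inside a PassThrough 'data' handler and then through a Chunker Transform stream. The patch-3 version appears to complete the upload too early: Promise.all(uploadPromises) is evaluated right after the listeners are attached, before the chunker has emitted any parts, which matches the commit message "chunking that doesnt work". For reference, here is a minimal standalone sketch of the same idea with the steps sequenced explicitly; it assumes aws-sdk v2 and Node 18+, and the helper name and the 100 MB part size are illustrative, not part of the adapter's API.

const stream = require('stream');

// Sketch only: stream a Blob to S3 with the aws-sdk v2 multipart calls.
// S3 requires every part except the last to be at least 5 MiB, so a
// 100 MB part size comfortably clears that floor.
async function uploadBlobMultipart(s3, Bucket, Key, blob, partSize = 100 * 1024 * 1024) {
  const { UploadId } = await s3.createMultipartUpload({ Bucket, Key }).promise();
  const parts = [];
  let buffer = Buffer.alloc(0);
  let partNumber = 1;

  // Blob#stream() returns a web ReadableStream; convert it to a Node stream.
  const source = stream.Readable.fromWeb(blob.stream());

  try {
    for await (const chunk of source) {
      buffer = Buffer.concat([buffer, chunk]);
      // Upload a full part as soon as enough bytes have accumulated.
      while (buffer.length >= partSize) {
        const Body = buffer.subarray(0, partSize);
        buffer = buffer.subarray(partSize);
        const { ETag } = await s3
          .uploadPart({ Bucket, Key, UploadId, PartNumber: partNumber, Body })
          .promise();
        parts.push({ ETag, PartNumber: partNumber });
        partNumber += 1;
      }
    }
    // Flush whatever is left as the final (possibly short) part.
    if (buffer.length > 0) {
      const { ETag } = await s3
        .uploadPart({ Bucket, Key, UploadId, PartNumber: partNumber, Body: buffer })
        .promise();
      parts.push({ ETag, PartNumber: partNumber });
    }
    return await s3
      .completeMultipartUpload({ Bucket, Key, UploadId, MultipartUpload: { Parts: parts } })
      .promise();
  } catch (err) {
    // Abort so the orphaned parts do not keep accruing storage charges.
    await s3.abortMultipartUpload({ Bucket, Key, UploadId }).promise();
    throw err;
  }
}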
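
The shape the series finally lands on (patches 4-6) is simpler: instead of managing parts manually, wrap the Blob's stream in a Node PassThrough and hand that to s3.upload(), which in aws-sdk v2 already performs a managed, chunked multipart upload for stream bodies; importing Blob from node:buffer keeps the instanceof check from depending on a global. A minimal sketch of that shape, again assuming aws-sdk v2 and Node 18+ (the function name and the partSize/queueSize values are illustrative, not settings the adapter uses):

const stream = require('stream');
const { Blob } = require('node:buffer');

// Sketch only: let the SDK's managed uploader do the chunking.
function uploadViaManagedUpload(s3, Bucket, Key, data, ContentType) {
  const params = { Bucket, Key, ContentType };

  if (data instanceof Blob) {
    // Blob#stream() is a web ReadableStream; s3.upload() expects a Node
    // stream, so convert it and pipe it through a PassThrough, as the
    // adapter does in createFile().
    const pass = new stream.PassThrough();
    stream.Readable.fromWeb(data.stream()).pipe(pass);
    params.Body = pass;
  } else {
    params.Body = data; // Buffers and strings can be passed through as-is.
  }

  // partSize / queueSize tune the managed multipart upload; the SDK
  // defaults are 5 MiB parts with 4 parts uploaded concurrently.
  return s3.upload(params, { partSize: 100 * 1024 * 1024, queueSize: 2 }).promise();
}

Because s3.upload() only settles after every part has been sent and the multipart upload has been completed (or an error has occurred), the adapter's existing resolve/reject callback keeps working unchanged for both the Blob and non-Blob paths.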