Commit
@tus/server: Split to multiple chunks on large incoming buffer
mitjap committed Oct 25, 2023
1 parent 0f0e5d3 commit f9a415b
Showing 2 changed files with 31 additions and 6 deletions.
13 changes: 7 additions & 6 deletions packages/server/src/models/StreamSplitter.ts
@@ -43,19 +43,20 @@ export class StreamSplitter extends stream.Writable {
     if (this.fileHandle === null) {
       await this._newChunk()
     }
-    const overflow = this.currentChunkSize + chunk.length - this.chunkSize
+
+    let overflow = this.currentChunkSize + chunk.length - this.chunkSize
     // The current chunk will be more than our defined part size if we would
     // write all of it to disk.
-    if (overflow > 0) {
+    while (overflow > 0) {
       // Only write to disk the up to our defined part size.
-      await this._writeChunk(chunk.slice(0, chunk.length - overflow))
+      await this._writeChunk(chunk.subarray(0, chunk.length - overflow))
       await this._finishChunk()
+
       // We still have some overflow left, so we write it to a new chunk.
       await this._newChunk()
-      await this._writeChunk(chunk.slice(chunk.length - overflow, chunk.length))
-      callback(null)
-      return
+      chunk = chunk.subarray(chunk.length - overflow, chunk.length)
+      overflow = this.currentChunkSize + chunk.length - this.chunkSize
     }

     // The chunk is smaller than our defined part size so we can just write it to disk.
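In short: the old code handled overflow once, writing what fit into the current part, finishing it, and dumping the entire remainder into a single new part, so an incoming buffer larger than two part sizes produced an oversized chunk. The new while loop keeps carving off full parts until the remainder fits. Below is a minimal, standalone TypeScript sketch of that loop, for illustration only; it is not the actual StreamSplitter class (which writes each piece to disk and emits events), and splitIncoming is a name invented here.

    // Standalone sketch of the splitting loop above (illustrative only):
    // given an incoming buffer and how many bytes the current chunk already
    // holds, return the pieces that would go into successive chunks.
    function splitIncoming(
      chunk: Buffer,
      chunkSize: number,
      currentChunkSize: number
    ): Buffer[] {
      const pieces: Buffer[] = []
      let overflow = currentChunkSize + chunk.length - chunkSize

      // While the incoming bytes would overflow the current chunk, keep only
      // what fits, finish that chunk, and carry the rest over to a fresh one.
      while (overflow > 0) {
        pieces.push(chunk.subarray(0, chunk.length - overflow))
        chunk = chunk.subarray(chunk.length - overflow, chunk.length)
        currentChunkSize = 0 // a freshly started chunk holds no bytes yet
        overflow = currentChunkSize + chunk.length - chunkSize
      }

      // Whatever remains fits into the current chunk.
      pieces.push(chunk)
      return pieces
    }

    // A single 7 KiB buffer with a 1 KiB part size yields seven 1 KiB pieces,
    // matching the new test below.
    console.log(splitIncoming(Buffer.alloc(7 * 1024), 1024, 0).map((p) => p.length))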
24 changes: 24 additions & 0 deletions packages/server/test/StreamSplitter.test.ts
@@ -4,6 +4,7 @@ import stream from 'node:stream/promises'
 import {strict as assert} from 'node:assert'

 import {StreamSplitter} from '../src/models'
+import {Readable} from 'node:stream'

 const fileSize = 20_971_520

@@ -25,4 +26,27 @@ describe('StreamSplitter', () => {
     await stream.pipeline(readStream, splitterStream)
     assert.equal(offset, fileSize)
   })
+
+  it('should split to multiple chunks when single buffer exceeds chunk size', async () => {
+    const optimalChunkSize = 1024
+    const expectedChunks = 7
+
+    const readStream = Readable.from([Buffer.alloc(expectedChunks * optimalChunkSize)])
+
+    let chunksStarted = 0
+    let chunksFinished = 0
+    const splitterStream = new StreamSplitter({
+      chunkSize: optimalChunkSize,
+      directory: os.tmpdir(),
+    }).on('chunkStarted', () => {
+      chunksStarted++
+    }).on('chunkFinished', () => {
+      chunksFinished++
+    })
+
+    await stream.pipeline(readStream, splitterStream)
+
+    assert.equal(chunksStarted, expectedChunks)
+    assert.equal(chunksFinished, expectedChunks)
+  })
 })
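The committed test only covers a buffer whose size is an exact multiple of the part size. A hypothetical follow-up case, not part of this commit, could cover a non-aligned buffer as sketched below; it assumes the same imports and setup as the test file, and assumes the trailing partial chunk is finished when the stream ends, which the committed test's expectation of 7 finished chunks already implies.

    // Hypothetical extra test, not in this commit: a 7.5 KiB buffer with a
    // 1 KiB part size should be split into 8 chunks, the last one only
    // partially filled.
    it('should split a buffer that is not a multiple of the chunk size', async () => {
      const optimalChunkSize = 1024
      const totalSize = 7 * optimalChunkSize + 512
      const expectedChunks = Math.ceil(totalSize / optimalChunkSize) // 8

      const readStream = Readable.from([Buffer.alloc(totalSize)])

      let chunksStarted = 0
      let chunksFinished = 0
      const splitterStream = new StreamSplitter({
        chunkSize: optimalChunkSize,
        directory: os.tmpdir(),
      })
        .on('chunkStarted', () => {
          chunksStarted++
        })
        .on('chunkFinished', () => {
          chunksFinished++
        })

      await stream.pipeline(readStream, splitterStream)

      assert.equal(chunksStarted, expectedChunks)
      // Assumes the final partial chunk is finished at end of stream, as the
      // committed test's count of 7 finished chunks for aligned input implies.
      assert.equal(chunksFinished, expectedChunks)
    })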
