diff --git a/src/fetch.ts b/src/fetch.ts index 162ea45..03617bf 100644 --- a/src/fetch.ts +++ b/src/fetch.ts @@ -110,7 +110,7 @@ export function fetchEventSource(input: RequestInfo, { await onopen(response); - await getBytes(response.body!, getLines(getMessages(id => { + await getBytes(response, getLines(getMessages(id => { if (id) { // store the id and send it back on the next retry: headers[LastEventId] = id; diff --git a/src/parse.ts b/src/parse.ts index c13e3c4..9550b74 100644 --- a/src/parse.ts +++ b/src/parse.ts @@ -19,11 +19,16 @@ export interface EventSourceMessage { * @param onChunk A function that will be called on each new byte chunk in the stream. * @returns {Promise} A promise that will be resolved when the stream closes. */ -export async function getBytes(stream: ReadableStream, onChunk: (arr: Uint8Array) => void) { - const reader = stream.getReader(); - let result: ReadableStreamDefaultReadResult; - while (!(result = await reader.read()).done) { - onChunk(result.value); +export async function getBytes(response: Response, onChunk: (arr: Uint8Array) => void) { + const reader = response.body?.getReader(); + if (reader) { + let result = await reader.read(); + while (!result.done) { + onChunk(result.value); + result = await reader.read(); + } + } else { + onChunk(new Uint8Array(await response.arrayBuffer())); } } @@ -64,10 +69,10 @@ export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void if (buffer[position] === ControlChars.NewLine) { lineStart = ++position; // skip to next char } - + discardTrailingNewline = false; } - + // start looking forward till the end of line: let lineEnd = -1; // index of the \r or \n char for (; position < bufLength && lineEnd === -1; ++position) {