From 6edc0d645d821361f1b0eaf5c924decb6ea899e1 Mon Sep 17 00:00:00 2001 From: xuanyu <307100890@qq.com> Date: Sun, 8 Oct 2023 20:42:02 +0800 Subject: [PATCH] fix: fix bug with polyfill fetch --- src/fetch.ts | 2 +- src/parse.ts | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/fetch.ts b/src/fetch.ts index 162ea45..03617bf 100644 --- a/src/fetch.ts +++ b/src/fetch.ts @@ -110,7 +110,7 @@ export function fetchEventSource(input: RequestInfo, { await onopen(response); - await getBytes(response.body!, getLines(getMessages(id => { + await getBytes(response, getLines(getMessages(id => { if (id) { // store the id and send it back on the next retry: headers[LastEventId] = id; diff --git a/src/parse.ts b/src/parse.ts index c13e3c4..9550b74 100644 --- a/src/parse.ts +++ b/src/parse.ts @@ -19,11 +19,16 @@ export interface EventSourceMessage { * @param onChunk A function that will be called on each new byte chunk in the stream. * @returns {Promise} A promise that will be resolved when the stream closes. */ -export async function getBytes(stream: ReadableStream, onChunk: (arr: Uint8Array) => void) { - const reader = stream.getReader(); - let result: ReadableStreamDefaultReadResult; - while (!(result = await reader.read()).done) { - onChunk(result.value); +export async function getBytes(response: Response, onChunk: (arr: Uint8Array) => void) { + const reader = response.body?.getReader(); + if (reader) { + let result = await reader.read(); + while (!result.done) { + onChunk(result.value); + result = await reader.read(); + } + } else { + onChunk(new Uint8Array(await response.arrayBuffer())); } } @@ -64,10 +69,10 @@ export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void if (buffer[position] === ControlChars.NewLine) { lineStart = ++position; // skip to next char } - + discardTrailingNewline = false; } - + // start looking forward till the end of line: let lineEnd = -1; // index of the \r or \n char for (; position < bufLength && lineEnd === -1; ++position) {