Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix generators, use text/event-stream protocol #743

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Binary file modified bun.lockb
Binary file not shown.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
"eslint": "^8.49.0",
"eslint-plugin-security": "^2.1.0",
"eslint-plugin-sonarjs": "^0.23.0",
"eventsource-parser": "^1.1.2",
"expect-type": "^0.16.0",
"memoirist": "^0.2.0",
"prettier": "^3.3.3",
Expand Down
4 changes: 2 additions & 2 deletions src/dynamic-handle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ export const createDynamicHandler =
}
}

return (context.response = mapResponse(response, context.set))
return (context.response = await mapResponse(response, context.set))
} catch (error) {
if ((error as ElysiaErrors).status)
set.status = (error as ElysiaErrors).status
Expand Down Expand Up @@ -418,7 +418,7 @@ export const createDynamicErrorHandler =
let response = hook.fn(errorContext as any)
if (response instanceof Promise) response = await response
if (response !== undefined && response !== null)
return (context.response = mapResponse(response, context.set))
return (context.response = await mapResponse(response, context.set))
}

return new Response(
Expand Down
51 changes: 21 additions & 30 deletions src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ export const serializeCookie = (cookies: Context['set']['cookie']) => {

// return arr
// }

const handleStream = async (
generator: Generator | AsyncGenerator,
set?: Context['set'],
Expand All @@ -126,10 +125,11 @@ const handleStream = async (
let init = generator.next()
if (init instanceof Promise) init = await init

if (init.done) {
if (init?.done) {
if (set) return mapResponse(init.value, set, request)
return mapCompactResponse(init.value, request)
}


return new Response(
new ReadableStream({
Expand All @@ -146,38 +146,29 @@ const handleStream = async (
}
})

if (init.value !== undefined && init.value !== null) {
if (
typeof init.value === "object"
if (init?.value !== undefined && init?.value !== null)
controller.enqueue(
Buffer.from(
`event: message\ndata: ${JSON.stringify(init.value)}\n\n`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would make sense to check typeof object before using JSON.stringify

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't always call JSON.stringify and the user yields a string that contains 2 new lines the response will break the text/event-stream protocol. If you always call JSON.stringify you can be sure that all the new lines are encoded with \n and put in a single line.

The parsing in Eden also becomes simpler because you know messages are always a JSON encoded string instead of using heuristics.

)
)
try {
controller.enqueue(
Buffer.from(JSON.stringify(init.value))
)
} catch {
controller.enqueue(Buffer.from(init.value.toString()))
}
else controller.enqueue(Buffer.from(init.value.toString()))
}

for await (const chunk of generator) {
if (end) break
if (chunk === undefined || chunk === null) continue
try {
for await (const chunk of generator) {
if (end) break
if (chunk === undefined || chunk === null) continue

if (typeof chunk === 'object')
try {
controller.enqueue(
Buffer.from(JSON.stringify(chunk))
controller.enqueue(
Buffer.from(
`event: message\ndata: ${JSON.stringify(chunk)}\n\n`
remorses marked this conversation as resolved.
Show resolved Hide resolved
)
} catch {
controller.enqueue(Buffer.from(chunk.toString()))
}
else controller.enqueue(Buffer.from(chunk.toString()))

// Wait for the next event loop
// Otherwise the data will be mixed up
await new Promise<void>((resolve) =>
setTimeout(() => resolve(), 0)
)
}
} catch (error: any) {
controller.enqueue(
Buffer.from(
`event: error\ndata: ${JSON.stringify(error.message || error.name || 'Error')}\n\n`
)
)
}

Expand Down
125 changes: 119 additions & 6 deletions test/response/stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
import { describe, it, expect } from 'bun:test'
import { req } from '../utils'
import { createParser } from 'eventsource-parser'

import { Elysia } from '../../src'

function textEventStream(items: string[]) {
return items
.map((item) => `event: message\ndata: ${JSON.stringify(item)}\n\n`)
.join('')
}

function parseTextEventStreamItem(item: string) {
const data = item.split('data: ')[1].split('\n')[0]
return JSON.parse(data)
}

describe('Stream', () => {
it('handle stream', async () => {
const expected = ['a', 'b', 'c']
Expand Down Expand Up @@ -31,7 +43,9 @@ describe('Stream', () => {
reader.read().then(function pump({ done, value }): unknown {
if (done) return resolve(acc)

expect(value.toString()).toBe(expected.shift()!)
expect(parseTextEventStreamItem(value.toString())).toBe(
expected.shift()!
)

acc += value.toString()
return reader.read().then(pump)
Expand All @@ -41,7 +55,64 @@ describe('Stream', () => {
})

expect(expected).toHaveLength(0)
expect(response).toBe('abc')
expect(response).toBe(textEventStream(['a', 'b', 'c']))
})
it('handle errors after yield', async () => {
const app = new Elysia().get('/', async function* () {
yield 'a'
await Bun.sleep(10)

throw new Error('an error')
})

const response = await app.handle(req('/')).then((x) => x.text())

expect(response).toBe(
'event: message\ndata: "a"\n\nevent: error\ndata: "an error"\n\n'
)
})

it('handle errors before yield when aot is false', async () => {
const app = new Elysia({ aot: false })
.onError(({ error }) => {
return new Response(error.message)
})
.get('/', async function* () {
throw new Error('an error xxxx')
})

const response = await app.handle(req('/')).then((x) => x.text())

expect(response).toInclude('an error')
})

it.todo('handle errors before yield when aot is true', async () => {
const app = new Elysia({ aot: true })
.onError(({ error }) => {
return new Response(error.message)
})
.get('/', async function* () {
throw new Error('an error')
})

const response = await app.handle(req('/')).then((x) => x.text())

expect(response).toInclude('an error')
})

it.todo('handle errors before yield with onError', async () => {
const expected = 'error expected'
const app = new Elysia()
.onError(({}) => {
return new Response(expected)
})
.get('/', async function* () {
throw new Error('an error')
})

const response = await app.handle(req('/')).then((x) => x.text())

expect(response).toBe(expected)
})

it('stop stream on canceled request', async () => {
Expand Down Expand Up @@ -79,9 +150,13 @@ describe('Stream', () => {
const { promise, resolve } = Promise.withResolvers()

reader.read().then(function pump({ done, value }): unknown {
if (done) return resolve(acc)
if (done) {
return resolve(acc)
}

expect(value.toString()).toBe(expected.shift()!)
expect(parseTextEventStreamItem(value.toString())).toBe(
expected.shift()!
)

acc += value.toString()
return reader.read().then(pump)
Expand All @@ -91,7 +166,7 @@ describe('Stream', () => {
})

expect(expected).toHaveLength(0)
expect(response).toBe('ab')
expect(response).toBe(textEventStream(['a', 'b']))
})

it('mutate set before yield is called', async () => {
Expand All @@ -111,6 +186,42 @@ describe('Stream', () => {
'http://saltyaom.com'
)
})
it('handle stream with objects', async () => {
const objects = [
{ message: 'hello' },
{ response: 'world' },
{ data: [1, 2, 3] },
{ result: [4, 5, 6] }
]
const app = new Elysia().get('/', async function* ({}) {
for (const obj of objects) {
yield obj
}
})

const body = await app.handle(req('/')).then((x) => x.body)

let events = [] as any[]
const parser = createParser((event) => {
events.push(event)
})
const { promise, resolve } = Promise.withResolvers()
const reader = body?.getReader()!

reader.read().then(function pump({ done, value }): unknown {
if (done) {
return resolve()
}
const text = value.toString()
parser.feed(text)
return reader.read().then(pump)
})
await promise

expect(events.map((x) => x.data)).toEqual(
objects.map((x) => JSON.stringify(x))
)
})

it('mutate set before yield is called', async () => {
const expected = ['a', 'b', 'c']
Expand Down Expand Up @@ -216,7 +327,9 @@ describe('Stream', () => {
reader.read().then(function pump({ done, value }): unknown {
if (done) return resolve()

expect(value.toString()).toBe(JSON.stringify(expected[i++]))
expect(parseTextEventStreamItem(value.toString())).toEqual(
expected[i++]
)

return reader.read().then(pump)
})
Expand Down