diff --git a/packages/platform-node-shared/src/internal/multipart.ts b/packages/platform-node-shared/src/internal/multipart.ts index 0ec32396a2e..25d1e31286e 100644 --- a/packages/platform-node-shared/src/internal/multipart.ts +++ b/packages/platform-node-shared/src/internal/multipart.ts @@ -116,7 +116,8 @@ class FileImpl extends PartBase implements Multipart.File { /** @internal */ export const fileToReadable = (file: Multipart.File): Readable => (file as FileImpl).file -function convertError(cause: MultipartError): Multipart.MultipartError { +function convertError(error: MultipartError): Multipart.MultipartError { + const cause = (error as { context?: MultipartError })?.context ?? error switch (cause._tag) { case "ReachedLimit": { switch (cause.limit) { diff --git a/packages/platform-node-shared/src/internal/stream.ts b/packages/platform-node-shared/src/internal/stream.ts index 5b1fd0331c9..16697372f1b 100644 --- a/packages/platform-node-shared/src/internal/stream.ts +++ b/packages/platform-node-shared/src/internal/stream.ts @@ -190,14 +190,21 @@ export const fromReadableChannel = ( unknown, E > => - Channel.suspend(() => - unsafeReadableRead( - evaluate(), + Channel.suspend(() => { + let readable: Readable | NodeJS.ReadableStream + try { + readable = evaluate() + } catch (error) { + return Channel.failSync(() => onError(error)) + } + + return unsafeReadableRead( + readable, onError, MutableRef.make(undefined), options ) - ) + }) /** @internal */ export const writeInput = ( diff --git a/packages/platform-node/test/HttpApi.test.ts b/packages/platform-node/test/HttpApi.test.ts index 83c8baa94e6..28c0bfa71ac 100644 --- a/packages/platform-node/test/HttpApi.test.ts +++ b/packages/platform-node/test/HttpApi.test.ts @@ -17,9 +17,12 @@ import { OpenApi } from "@effect/platform" import { NodeHttpServer } from "@effect/platform-node" +import * as HttpBody from "@effect/platform/HttpBody" import * as HttpLayerRouter from "@effect/platform/HttpLayerRouter" import { assert, describe, it } from "@effect/vitest" -import { Chunk, Context, DateTime, Effect, Layer, Redacted, Ref, Schema, Stream, Struct } from "effect" +import { Chunk, Context, DateTime, Effect, Layer, pipe, Redacted, Ref, Schema, Stream, Struct } from "effect" +import * as Logger from "effect/Logger" +import * as LogLevel from "effect/LogLevel" import OpenApiFixture from "./fixtures/openapi.json" with { type: "json" } describe("HttpApi", () => { @@ -91,6 +94,34 @@ describe("HttpApi", () => { length: 5 }) }).pipe(Effect.provide(HttpLive))) + + it.live("multipart stream handles incorrect content-type gracefully", () => + Effect.gen(function*() { + const client = yield* HttpApiClient.make(Api, { + transformClient: (client) => + HttpClient.mapRequestInput(client, (request) => + pipe( + HttpClientRequest.setBody(request, HttpBody.empty), + // Intentionally set incorrect content-type (should be multipart/form-data) + HttpClientRequest.setHeader("content-type", "application/json") + )) + }) + const data = new FormData() + data.append("file", new Blob(["hello"], { type: "text/plain" }), "hello.txt") + const result = yield* client.users.uploadStream({ payload: data }).pipe( + Effect.flip + ) + assert.deepStrictEqual( + result, + new Multipart.MultipartError({ reason: "Parse", cause: "{\"_tag\": \"InvalidBoundary\"}" }) + ) + }).pipe( + Effect.provide(HttpLive), + // FIXME: remove this + Effect.provide( + Logger.minimumLogLevel(LogLevel.All) + ) + )) }) describe("headers", () => { @@ -480,9 +511,12 @@ class UsersApi extends HttpApiGroup.make("users") ) .add( HttpApiEndpoint.post("uploadStream")`/uploadstream` - .setPayload(HttpApiSchema.MultipartStream(Schema.Struct({ - file: Multipart.SingleFileSchema - }))) + .setPayload(HttpApiSchema.MultipartStream( + Schema.Struct({ + file: Multipart.SingleFileSchema + }) + )) + .addError(Multipart.MultipartError) .addSuccess(Schema.Struct({ contentType: Schema.String, length: Schema.Int