diff --git a/CHANGELOG.md b/CHANGELOG.md index 91837ca..20d9a77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,79 @@ All notable changes to this project will be documented in this file. +## [0.15.4] - 2026-05-01 + +Pluggable streaming HTTP backends + hackney 4 pull-mode bug fix. + +### Fixed + +- **Hackney 4 streaming was silently in push mode, not pull mode.** + `lib/nous/providers/http.ex:463-470` (in 0.15.0–0.15.3) passed + `[:async, :once, ...]` as separate atoms to `:hackney.request/5`. + Erlang's `proplists` resolves bare atom `:async` as `{:async, true}`, + which puts hackney into push mode; the bare `:once` atom is silently + ignored. The architectural intent of M-12 (strict pull-based + backpressure so a slow consumer cannot grow its mailbox) was + forfeited — `:hackney.stream_next/1` is a no-op in push mode, so the + receive loop appeared to work in many cases (chunks arrive in the + same shape) but the pacing came from the producer, not the consumer. + The fix is the tuple form `[{:async, :once}, ...]` per + `deps/hackney/NEWS.md:269-272`. Empirical confirmation: with the + broken form a benign Bypass server delivers 97 messages to the + caller's mailbox in 2 s without any `stream_next/1` call; with the + tuple form the mailbox holds only 2 messages (status + headers) and + body chunks gate on `stream_next/1`. Reported as part of the same + bug that caused observable timeouts against cold/slow SSE backends. + +### Added + +- **`Nous.HTTP.StreamBackend` behaviour** — pluggable streaming HTTP + layer mirroring the non-streaming `Nous.HTTP.Backend` introduced in + 0.15.1. Two impls ship: + - `Nous.HTTP.StreamBackend.Req` — the new default. Drives + `Req.post/1` with the `:into` callback. Simpler stack + (Req/Finch/Mint), marginally faster TTFB than hackney in + benchmarks against LMStudio (~130 ms vs ~133 ms mean). + - `Nous.HTTP.StreamBackend.Hackney` — opt-in. 
Strict pull-based + backpressure via `:hackney`'s `[{:async, :once}]` mode (the bug + above is fixed here). Pick this when downstream consumers can + block per chunk (LiveView fan-out under load, + persistence-on-every-chunk, slow IO). +- **`:stream_backend` per-call opt** on `Nous.Providers.HTTP.stream/4`. +- **`NOUS_HTTP_STREAM_BACKEND` env var** (`req` | `hackney` | + `My.Custom.Backend`). Resolution mirrors `NOUS_HTTP_BACKEND`: + per-call → env → app config → default. +- **`config :nous, :http_stream_backend, MyBackend`** application + config knob. + +### Changed + +- `Nous.Providers.HTTP.stream/4` now dispatches to the configured + `Nous.HTTP.StreamBackend` instead of inlining hackney plumbing. The + public API surface (return shape, event types, error tuples) is + unchanged. Provider stream normalizers (`Nous.StreamNormalizer.*`) + consume normalized events and need no changes. +- The non-streaming pluggable `Nous.HTTP.Backend` resolver is + refactored to share its `String.to_existing_atom/1` safety logic with + the streaming resolver — same C-2 protection on both paths. + +### Documentation + +- `Nous.Providers.HTTP` moduledoc rewritten around the dual + pluggable-backend model and the streaming backpressure trade-off. +- `Nous.HTTP.StreamBackend` and the two impl modules carry full + moduledocs explaining when to pick each. + +### Migration + +No code changes required for callers — the default behavior is +restored to "streaming works against any healthy SSE backend." Apps +that depend on strict pull-based backpressure should set: + + config :nous, :http_stream_backend, Nous.HTTP.StreamBackend.Hackney + +or pass `stream_backend: Nous.HTTP.StreamBackend.Hackney` per call. + ## [0.15.3] - 2026-05-01 Streaming + tool execution. 
The `Nous.Agent.run/3` loop now has a diff --git a/README.md b/README.md index d9414a9..178104a 100644 --- a/README.md +++ b/README.md @@ -110,11 +110,13 @@ IO.puts("Tokens: #{result.usage.total_tokens}") | LlamaCpp | `llamacpp:local` + `:llamacpp_model` | ✅ | | **Custom** | `custom:model` + `:base_url` | ✅ | -HTTP providers use a pluggable backend — `Req` (default, on top of Finch) or -`hackney 4` — selected per-call, via `NOUS_HTTP_BACKEND`, or via app config. -Streaming always uses `hackney`'s `:async, :once` pull-based mode for -backpressure (a slow consumer can't OOM under a fast LLM). LlamaCpp runs -in-process via NIFs. See [HTTP Backend](#http-backend) below for details. +HTTP providers use a pluggable backend on both the non-streaming and +streaming paths — `Req` (default, on top of Finch) or `hackney 4` — +selected per-call, via `NOUS_HTTP_BACKEND` / `NOUS_HTTP_STREAM_BACKEND`, +or via app config. The Hackney streaming backend uses `[{:async, :once}]` +pull-based mode for strict backpressure (a slow consumer can't grow its +mailbox under a fast LLM). LlamaCpp runs in-process via NIFs. +See [HTTP Backend](#http-backend) below for details. > **Tip**: The named local providers (`lmstudio:`, `vllm:`, `sglang:`, > `ollama:`) are the recommended way to talk to local OpenAI-compatible @@ -266,11 +268,13 @@ agent = Nous.new("openai:gpt-4", ### HTTP Backend -Non-streaming HTTP requests go through a pluggable backend. Default is -`Nous.HTTP.Backend.Req` (Req on top of Finch); `Nous.HTTP.Backend.Hackney` -is shipped as an alternative. Streaming always uses hackney's `:async, :once` -pull-based mode for backpressure — that choice is structural, not -configurable. +Both the non-streaming and streaming HTTP paths go through pluggable +backends. Defaults are `Nous.HTTP.Backend.Req` and +`Nous.HTTP.StreamBackend.Req` (both on Req + Finch). +`Nous.HTTP.Backend.Hackney` and `Nous.HTTP.StreamBackend.Hackney` are +shipped as alternatives. 
+ +#### Non-streaming (`Nous.HTTP.Backend`) Pick per-call, per-environment, or per-app: @@ -286,8 +290,37 @@ HTTP.post(url, body, headers, backend: Nous.HTTP.Backend.Hackney) config :nous, :http_backend, Nous.HTTP.Backend.Hackney ``` -Tune the shared hackney `:default` pool from app config (used by both the -Hackney backend and the streaming pipeline): +#### Streaming (`Nous.HTTP.StreamBackend`) + +Same resolution chain, separate config knob: + +```elixir +# Per-call +HTTP.stream(url, body, headers, + stream_backend: Nous.HTTP.StreamBackend.Hackney) + +# Env +# NOUS_HTTP_STREAM_BACKEND=hackney + +# App config +config :nous, :http_stream_backend, Nous.HTTP.StreamBackend.Hackney +``` + +When to pick which streaming backend: + +| Backend | Pick it when | +|---------|--------------| +| `Nous.HTTP.StreamBackend.Req` *(default)* | One HTTP stack across streaming + non-streaming. Right default for almost every app. Backpressure is bounded by parsing speed, not strict pull pacing — fine for typical LLM workloads where token rate is the bottleneck. | +| `Nous.HTTP.StreamBackend.Hackney` | Strict pull-based backpressure via `[{:async, :once}]`. Pick this when downstream consumers can block per chunk (LiveView fan-out under load, persistence-on-every-chunk, slow IO). | + +Both emit identical normalized event streams (parsed JSON maps, +`{:stream_done, _}`, `{:stream_error, _}`); switching backends needs no +other code changes. + +#### Hackney pool + +Tune the shared hackney `:default` pool from app config (used by both +the Hackney non-streaming and Hackney streaming backends): ```elixir config :nous, :hackney_pool, @@ -297,9 +330,8 @@ config :nous, :hackney_pool, See [the HTTP backend benchmark report](https://github.com/nyo16/nous/blob/master/docs/benchmarks/http_backend.md) for localhost + real-endpoint benchmark numbers and guidance on when -to switch backends. 
Headline: stick with the Req default unless you -specifically need HTTP/3 (Alt-Svc auto-upgrade) or want to consolidate -on one HTTP family. +to switch backends. Headline: stick with the Req defaults unless you +have a specific reason (strict backpressure, HTTP/3 upgrade, single-HTTP-stack consolidation). ### Timeouts diff --git a/lib/nous/http/stream_backend.ex b/lib/nous/http/stream_backend.ex new file mode 100644 index 0000000..cd4b8a6 --- /dev/null +++ b/lib/nous/http/stream_backend.ex @@ -0,0 +1,68 @@ +defmodule Nous.HTTP.StreamBackend do + @moduledoc """ + Behaviour for SSE / chunked streaming HTTP backends. + + Implemented by `Nous.HTTP.StreamBackend.Req` (default) and + `Nous.HTTP.StreamBackend.Hackney`. Selection mirrors the non-streaming + `Nous.HTTP.Backend` resolution chain (per-call → env var → app config → + default). See `Nous.Providers.HTTP.stream/4` for the resolution order. + + Pick a backend three ways, highest precedence first: + + # 1. Per-call opt + Nous.Providers.HTTP.stream(url, body, headers, + stream_backend: Nous.HTTP.StreamBackend.Hackney) + + # 2. Environment variable (req | hackney | "MyApp.MyStreamBackend") + export NOUS_HTTP_STREAM_BACKEND=hackney + + # 3. Application config + config :nous, :http_stream_backend, Nous.HTTP.StreamBackend.Hackney + + Default: `Nous.HTTP.StreamBackend.Req`. + + ## When to pick which + + - `Nous.HTTP.StreamBackend.Req` — one HTTP stack across streaming and + non-streaming, simpler dependency story. Right default for most apps. + Backpressure is bounded by parsing speed, not by `stream_next/1` + pacing — a fast LLM + slow consumer can grow the consumer's mailbox. + Acceptable for typical LLM workloads where token rate is the + bottleneck. + - `Nous.HTTP.StreamBackend.Hackney` — strict pull-based backpressure + via `:hackney`'s `{:async, :once}` mode. The consumer paces the + producer chunk-by-chunk. 
Pick this when downstream consumers can + block per chunk (LiveView assigns + diff + push under load, + persistence-on-every-chunk, slow IO). + + Both backends emit the same normalized event stream (parsed JSON maps, + `{:stream_done, reason}`, `{:stream_error, reason}`). Switching between + them does not require changes elsewhere. + + ## Custom backends + + Implement `c:stream/4` and return `{:ok, Enumerable.t()}` where the + enumerable emits parsed JSON maps, `{:stream_done, reason}` tuples, or + `{:stream_error, reason}` tuples. The stream MUST halt after the first + `{:stream_error, _}` and after `{:stream_done, _}`. + """ + + @doc """ + Issue a streaming POST request and return a lazy `Enumerable.t()` of + parsed events. + + ## Options + * `:timeout` — receive timeout in milliseconds (default: `60_000`) + * `:connect_timeout` — TCP connect timeout in milliseconds (default: `30_000`) + * `:stream_parser` — module implementing `parse_buffer/1` for non-SSE + formats (e.g. JSON-array streams). Defaults to SSE. + + Backends MAY accept additional options; unknown options should be ignored. + """ + @callback stream( + url :: String.t(), + body :: map(), + headers :: [{String.t(), String.t()}], + opts :: keyword() + ) :: {:ok, Enumerable.t()} | {:error, term()} +end diff --git a/lib/nous/http/stream_backend/hackney.ex b/lib/nous/http/stream_backend/hackney.ex new file mode 100644 index 0000000..2568cec --- /dev/null +++ b/lib/nous/http/stream_backend/hackney.ex @@ -0,0 +1,227 @@ +defmodule Nous.HTTP.StreamBackend.Hackney do + @moduledoc """ + `Nous.HTTP.StreamBackend` implementation backed by `:hackney` in + `[{:async, :once}]` mode for strict pull-based backpressure. + + The consumer calls `:hackney.stream_next/1` to ask for one more chunk; + hackney reads it off the socket and delivers it as a single message + `{:hackney_response, conn, chunk_or_done}`. 
Because the network read + only happens when the consumer asks for it, the producer literally + cannot outrun the consumer — the mailbox stays bounded at one message + no matter how slow the consumer is. + + Pick this backend when downstream consumers can block per chunk + (LiveView fan-out under load, persistence-on-every-chunk, slow IO). + For typical LLM workloads where token-generation rate is the + bottleneck, `Nous.HTTP.StreamBackend.Req` is simpler and equally fast. + + ## Hackney 4 option shape + + Hackney 4 documents the pull-based form as `[{async, once}]` — a + **tuple**. The legacy `[:async, :once]` two-atom form silently puts + hackney into push mode (`proplists` resolves bare `:async` as + `{:async, true}`), which forfeits the backpressure guarantee. This + module uses the tuple form. See `deps/hackney/NEWS.md:255-275`. + + ## TLS verification + + Passes `verify: :verify_peer` with system CAs from + `:public_key.cacerts_get/0` explicitly. Hackney's default is + `:verify_none`, which would silently accept MITM'd connections — do + not regress this. + + ## Pool + + Uses hackney's `:default` pool (50 conns, 2s idle keepalive) unless + the caller passes `:pool`. Apps that want isolation can pass + `pool: :my_pool` per call after starting the pool with + `:hackney_pool.start_pool/2`. 
+ """ + + @behaviour Nous.HTTP.StreamBackend + + require Logger + + alias Nous.Providers.HTTP + + @default_timeout 60_000 + @default_connect_timeout 30_000 + + @impl Nous.HTTP.StreamBackend + def stream(url, body, headers, opts \\ []) + + def stream(url, body, headers, opts) + when is_binary(url) and is_map(body) and is_list(headers) do + timeout = Keyword.get(opts, :timeout, @default_timeout) + connect_timeout = Keyword.get(opts, :connect_timeout, @default_connect_timeout) + pool = Keyword.get(opts, :pool, :default) + stream_parser = Keyword.get(opts, :stream_parser) + + try do + json_body = JSON.encode!(body) + + stream = + Stream.resource( + fn -> + start_streaming(url, headers, json_body, timeout, connect_timeout, pool, + stream_parser: stream_parser + ) + end, + &next_chunk/1, + &cleanup/1 + ) + + {:ok, stream} + catch + _, error -> + Logger.error("Failed to encode request body: #{inspect(error)}") + {:error, %{reason: :json_encode_error, details: error}} + end + end + + @doc false + # Public for unit-testing the regression net on the option proplist + # shape. Returns the proplist passed to `:hackney.request/5`. The + # `{:async, :once}` tuple form is what enables pull mode — a bare + # `:async` atom would put hackney into push mode (proplists resolves + # bare atoms as `{atom, true}`), forfeiting backpressure. See + # `deps/hackney/NEWS.md:269-272` and the regression test in + # `test/nous/http/stream_backend/hackney_test.exs`. + @spec request_opts(non_neg_integer(), non_neg_integer(), atom()) :: keyword() + def request_opts(timeout, connect_timeout, pool) do + [ + {:async, :once}, + {:pool, pool}, + {:recv_timeout, timeout}, + {:connect_timeout, connect_timeout}, + {:ssl_options, [verify: :verify_peer, cacerts: :public_key.cacerts_get()]} + ] + end + + # Start streaming via hackney's `[{:async, :once}]` pull-based mode. 
+ defp start_streaming(url, headers, body, timeout, connect_timeout, pool, extra) do + stream_parser = Keyword.get(extra, :stream_parser) + hackney_headers = Enum.map(headers, fn {k, v} -> {to_charlist(k), to_charlist(v)} end) + + opts = request_opts(timeout, connect_timeout, pool) + + case :hackney.request(:post, url, hackney_headers, body, opts) do + {:ok, ref} -> + %{ + ref: ref, + buffer: "", + done: false, + status: nil, + timeout: timeout, + error: nil, + stream_parser: stream_parser + } + + {:error, reason} -> + Logger.error("hackney request failed: #{inspect(reason)}") + + # Stay `done: false` here so the next_chunk error-emission clause + # fires once before halting. Setting done immediately would short- + # circuit straight to {:halt, _} and the consumer would never see + # the {:stream_error, _} event. + %{ + ref: nil, + buffer: "", + done: false, + status: nil, + timeout: timeout, + error: {:stream_error, %{reason: reason}}, + stream_parser: stream_parser + } + end + end + + # Get next chunk from the hackney `[{:async, :once}]` stream. + defp next_chunk(%{done: true} = state), do: {:halt, state} + + defp next_chunk(%{ref: nil, error: {:stream_error, _} = err} = state) do + {[err], %{state | done: true}} + end + + defp next_chunk(state) do + timeout = state.timeout + + # :hackney.stream_next/1 returns :ok unconditionally; errors arrive + # asynchronously as {:hackney_response, ref, {:error, _}}. 
+ :ok = :hackney.stream_next(state.ref) + + ref = state.ref + + receive do + {:hackney_response, ^ref, {:status, status, _reason_phrase}} + when status not in 200..299 -> + Logger.error("Hackney stream got error status #{status}") + {[{:stream_error, %{status: status}}], %{state | done: true}} + + {:hackney_response, ^ref, {:status, status, _reason_phrase}} -> + next_chunk(%{state | status: status}) + + {:hackney_response, ^ref, {:headers, _headers}} -> + next_chunk(state) + + {:hackney_response, ^ref, :done} -> + {events, _} = HTTP.flush_stream_buffer(state.buffer, state.stream_parser) + + final_events = + Enum.reject(events, fn + nil -> true + {:parse_error, _} -> true + _ -> false + end) + + if Enum.empty?(final_events) do + {:halt, %{state | done: true}} + else + {final_events, %{state | done: true, buffer: ""}} + end + + {:hackney_response, ^ref, {:error, reason}} -> + Logger.error("Hackney stream error: #{inspect(reason)}") + {[{:stream_error, reason}], %{state | done: true}} + + {:hackney_response, ^ref, chunk} when is_binary(chunk) -> + new_buffer = state.buffer <> chunk + + if byte_size(new_buffer) > HTTP.max_buffer_size() do + Logger.error("SSE buffer overflow, terminating stream") + {[{:stream_error, %{reason: :buffer_overflow}}], %{state | done: true}} + else + {events, remaining_buffer} = + HTTP.parse_stream_buffer(new_buffer, state.stream_parser) + + {valid_events, errors} = + Enum.split_with(events, fn + {:parse_error, _} -> false + _ -> true + end) + + for {:parse_error, err} <- errors do + Logger.debug("SSE parse error (ignored): #{inspect(err)}") + end + + if Enum.empty?(valid_events) do + next_chunk(%{state | buffer: remaining_buffer}) + else + {valid_events, %{state | buffer: remaining_buffer}} + end + end + after + timeout -> + Logger.error("Hackney stream timeout after #{timeout}ms") + {[{:stream_error, %{reason: :timeout, timeout_ms: timeout}}], %{state | done: true}} + end + end + + # Cleanup when the consumer halts (Stream.take/2, 
exception, normal end). + defp cleanup(%{ref: nil}), do: :ok + + defp cleanup(%{ref: ref}) do + _ = :hackney.close(ref) + :ok + end +end diff --git a/lib/nous/http/stream_backend/req.ex b/lib/nous/http/stream_backend/req.ex new file mode 100644 index 0000000..96fe252 --- /dev/null +++ b/lib/nous/http/stream_backend/req.ex @@ -0,0 +1,182 @@ +defmodule Nous.HTTP.StreamBackend.Req do + @moduledoc """ + `Nous.HTTP.StreamBackend` implementation backed by `Req` (Finch + underneath). + + Default streaming backend. Drives `Req.post/1` with the `:into` + callback so chunks are pushed into a `Task`, which forwards them to + the consuming `Stream.resource` via `send/2`. + + ## Backpressure + + Req's `:into` callback runs in the spawned `Task`. Forwarding to the + consumer process is `send/2`, so a fast producer + slow consumer can + grow the consumer's mailbox unboundedly. This is acceptable for + typical LLM workloads where token-generation rate is the bottleneck + and consumers are parsing-bound (parsing throttles naturally). + + Callers whose downstream consumers can block per chunk (LiveView + fan-out under load, persistence-on-every-chunk, slow IO) should use + `Nous.HTTP.StreamBackend.Hackney` instead, which provides strict + pull-based backpressure via `:hackney`'s `{:async, :once}` mode. + + ## TLS verification + + Req's defaults handle TLS verification via Mint/Finch (system CAs + with peer verification). No additional configuration needed. 
+ """ + + @behaviour Nous.HTTP.StreamBackend + + require Logger + + alias Nous.Providers.HTTP + + @default_timeout 60_000 + @default_connect_timeout 30_000 + + @impl Nous.HTTP.StreamBackend + def stream(url, body, headers, opts \\ []) + + def stream(url, body, headers, opts) + when is_binary(url) and is_map(body) and is_list(headers) do + timeout = Keyword.get(opts, :timeout, @default_timeout) + connect_timeout = Keyword.get(opts, :connect_timeout, @default_connect_timeout) + stream_parser = Keyword.get(opts, :stream_parser) + + parent = self() + ref = make_ref() + + task = start_request_task(url, body, headers, timeout, connect_timeout, parent, ref) + + state = %{ + ref: ref, + task: task, + task_ref: task.ref, + buffer: "", + done: false, + timeout: timeout, + stream_parser: stream_parser + } + + stream = + Stream.resource( + fn -> state end, + &next_chunk/1, + &cleanup/1 + ) + + {:ok, stream} + end + + defp start_request_task(url, body, headers, timeout, connect_timeout, parent, ref) do + Task.async(fn -> + result = + Req.post(url, + json: body, + headers: headers, + receive_timeout: timeout, + connect_options: [timeout: connect_timeout], + into: fn {:data, chunk}, {req, resp} -> + if resp.status in 200..299 do + send(parent, {ref, {:chunk, chunk}}) + {:cont, {req, resp}} + else + # Non-2xx: accumulate body locally so the post-call status + # check has the full error body to report. Do not forward. + {:cont, {req, %{resp | body: (resp.body || "") <> chunk}}} + end + end + ) + + case result do + {:ok, %Req.Response{status: status}} when status in 200..299 -> + send(parent, {ref, :done}) + + {:ok, %Req.Response{status: status, body: response_body}} -> + Logger.error("Req stream got error status #{status}") + send(parent, {ref, {:error, %{status: status, body: response_body}}}) + + {:error, reason} -> + Logger.error("Req stream error: #{inspect(reason)}") + send(parent, {ref, {:error, reason}}) + end + end) + end + + # Get the next batch of events. 
+ defp next_chunk(%{done: true} = state), do: {:halt, state} + + defp next_chunk(state) do + receive do + {ref, :done} when ref == state.ref -> + {events, _} = HTTP.flush_stream_buffer(state.buffer, state.stream_parser) + + final_events = + Enum.reject(events, fn + nil -> true + {:parse_error, _} -> true + _ -> false + end) + + if Enum.empty?(final_events) do + {:halt, %{state | done: true}} + else + {final_events, %{state | done: true, buffer: ""}} + end + + {ref, {:error, reason}} when ref == state.ref -> + {[{:stream_error, reason}], %{state | done: true}} + + # Task crashed without sending an explicit completion message — + # surface it as a stream error instead of waiting for the receive + # timeout. The :normal case here can only fire if a stale DOWN + # arrives before our explicit messages, which doesn't happen with + # Task.async monitor ordering, so any DOWN here is abnormal. + {:DOWN, task_ref, :process, _pid, reason} when task_ref == state.task_ref -> + Logger.error("Req stream task died: #{inspect(reason)}") + {[{:stream_error, %{reason: :task_died, details: reason}}], %{state | done: true}} + + {ref, {:chunk, chunk}} when ref == state.ref -> + new_buffer = state.buffer <> chunk + + if byte_size(new_buffer) > HTTP.max_buffer_size() do + Logger.error("SSE buffer overflow, terminating stream") + {[{:stream_error, %{reason: :buffer_overflow}}], %{state | done: true}} + else + {events, remaining_buffer} = + HTTP.parse_stream_buffer(new_buffer, state.stream_parser) + + {valid_events, errors} = + Enum.split_with(events, fn + {:parse_error, _} -> false + _ -> true + end) + + for {:parse_error, err} <- errors do + Logger.debug("SSE parse error (ignored): #{inspect(err)}") + end + + if Enum.empty?(valid_events) do + next_chunk(%{state | buffer: remaining_buffer}) + else + {valid_events, %{state | buffer: remaining_buffer}} + end + end + after + state.timeout -> + Logger.error("Req stream timeout after #{state.timeout}ms") + {[{:stream_error, %{reason: :timeout, 
timeout_ms: state.timeout}}], %{state | done: true}} + end + end + + defp cleanup(%{task: nil}), do: :ok + + defp cleanup(%{task: task}) do + # Brutal kill: the task may still be in Req.post pulling chunks. We + # don't care about graceful shutdown — the consumer halted the + # enumerator, which means it's done with the stream. + _ = Task.shutdown(task, :brutal_kill) + :ok + end +end diff --git a/lib/nous/providers/http.ex b/lib/nous/providers/http.ex index 9a4722c..bad54d5 100644 --- a/lib/nous/providers/http.ex +++ b/lib/nous/providers/http.ex @@ -2,36 +2,34 @@ defmodule Nous.Providers.HTTP do @moduledoc """ Shared HTTP utilities for all LLM providers. - Two HTTP families, deliberately split by use case: + Two HTTP families, both pluggable: - **Non-streaming** requests (one-shot model calls, web fetching, search - APIs) go through a pluggable `Nous.HTTP.Backend`. The default is - `Nous.HTTP.Backend.Req` (Req on top of Finch). `Nous.HTTP.Backend.Hackney` - is also shipped — pick it via per-call `:backend` opt, the - `NOUS_HTTP_BACKEND` env var, or `config :nous, :http_backend, ...`. - See `docs/benchmarks/http_backend.md` for the trade-offs. - - **Streaming** requests (SSE / chunked LLM responses) go through - `:hackney` in `:async, :once` pull-based mode. Hackney's `:async, :once` - is true pull-based streaming - the consumer calls - `:hackney.stream_next/1` to ask for one more chunk, the producer reads - it off the socket and delivers it as a single message. The consumer - paces the producer, so a slow consumer (LiveView assigns + diff + - push, slow IO, etc.) can never grow its mailbox unboundedly. - - This split fixes M-12 (streaming consumer backpressure) and H-12 (stream - lifecycle EXIT handling) from the comprehensive review by eliminating - the spawn-and-mailbox plumbing entirely - the `Stream.resource` - consumer is the only process involved. + APIs) go through a `Nous.HTTP.Backend`. 
Default is + `Nous.HTTP.Backend.Req`; `Nous.HTTP.Backend.Hackney` is also shipped. + - **Streaming** requests (SSE / chunked LLM responses) go through a + `Nous.HTTP.StreamBackend`. Default is `Nous.HTTP.StreamBackend.Req` + (Req's `:into` callback driven by Finch); `Nous.HTTP.StreamBackend.Hackney` + provides strict pull-based backpressure via `:hackney`'s `{:async, :once}` + mode for callers whose downstream consumers can block per chunk. + + Both backend layers resolve via the same precedence: per-call opt → env + var → app config → default. See `Nous.HTTP.Backend` and + `Nous.HTTP.StreamBackend` for selection details. ## Usage - # Non-streaming request (Req + Finch) + # Non-streaming request {:ok, body} = HTTP.post(url, body, headers) - # Streaming request (hackney pull-based, returns lazy stream) + # Streaming request — returns a lazy stream of parsed events {:ok, stream} = HTTP.stream(url, body, headers) Enum.each(stream, &process_event/1) + # Per-call backend override + {:ok, stream} = HTTP.stream(url, body, headers, + stream_backend: Nous.HTTP.StreamBackend.Hackney) + ## SSE Parsing SSE events follow the Server-Sent Events spec (https://html.spec.whatwg.org/multipage/server-sent-events.html): @@ -40,28 +38,27 @@ defmodule Nous.Providers.HTTP do - Multiple `data:` fields are concatenated with newlines - `[DONE]` signals stream completion (OpenAI convention) - ## Hackney pool - - Hackney's `:default` pool starts automatically when the `:hackney` - application boots. Defaults: 50 max connections per pool, 2s idle - keepalive timeout. For most LLM workloads (long-lived streams of - seconds-to-minutes) the defaults are appropriate. Apps that need a - Nous-isolated pool can pass `pool: :my_pool` per request and start - the pool with `:hackney_pool.start_pool/2` (or include - `:hackney_pool.child_spec/2` in their supervision tree). 
- - ## TLS verification - - Streaming requests pass `verify: :verify_peer` with system CAs from - `:public_key.cacerts_get/0` explicitly. Do not silently regress this - - hackney would otherwise default to `:verify_none` and accept MITM'd - connections. + The default SSE parser (`parse_sse_buffer/1`) is transport-agnostic and + shared by both stream backends. Custom parsers can be plugged in via + the `:stream_parser` opt; see `Nous.Providers.HTTP.JSONArrayParser` + for an example. + + ## Stream backpressure + + - `Nous.HTTP.StreamBackend.Req` (default): the `:into` callback runs in + a `Task` and feeds the consumer process via `send/2`. BEAM mailboxes + are unbounded, so a fast producer + slow consumer can grow the + consumer's mailbox. Acceptable for typical LLM workloads where the + consumer is parsing-bound (and parsing throttles naturally) or where + token-generation rate is the bottleneck. + - `Nous.HTTP.StreamBackend.Hackney`: strict pull-based — the consumer + calls `:hackney.stream_next/1` per chunk, so the producer literally + cannot outrun the consumer. Pick this when downstream consumers can + block per chunk (LiveView fan-out, persistence-on-every-chunk, slow IO). """ require Logger - @default_timeout 60_000 - @default_connect_timeout 30_000 # 10MB max buffer @max_buffer_size 10 * 1024 * 1024 @@ -121,84 +118,63 @@ defmodule Nous.Providers.HTTP do nil -> app_or_default() "req" -> Nous.HTTP.Backend.Req "hackney" -> Nous.HTTP.Backend.Hackney - other -> resolve_custom_backend(other) + other -> resolve_custom_backend(other, :post, 4, &app_or_default/0) end end - defp resolve_custom_backend(name) do + defp app_or_default do + Application.get_env(:nous, :http_backend, Nous.HTTP.Backend.Req) + end + + defp resolve_custom_backend(name, fun, arity, fallback) do mod = String.to_existing_atom("Elixir." 
<> name) Code.ensure_loaded?(mod) - if function_exported?(mod, :post, 4) do + if function_exported?(mod, fun, arity) do mod else - app_or_default() + fallback.() end rescue - ArgumentError -> app_or_default() - end - - defp app_or_default do - Application.get_env(:nous, :http_backend, Nous.HTTP.Backend.Req) + ArgumentError -> fallback.() end @doc """ - Make a streaming POST request with SSE parsing. + Make a streaming POST request. + + Dispatches to the configured `Nous.HTTP.StreamBackend`. Resolution + order (highest precedence first): + + 1. Per-call `:stream_backend` opt + 2. `NOUS_HTTP_STREAM_BACKEND` env var — `req`, `hackney`, or a + fully-qualified module name + 3. `Application.get_env(:nous, :http_stream_backend, ...)` + 4. Default: `Nous.HTTP.StreamBackend.Req` - Returns `{:ok, stream}` where stream is an Enumerable of parsed events. - Events are maps with string keys (parsed JSON) or `{:stream_done, reason}` tuples. + Returns `{:ok, stream}` where stream is an `Enumerable.t()` of parsed + events. Events are maps with string keys (parsed JSON), + `{:stream_done, reason}` tuples on completion, or + `{:stream_error, reason}` tuples on failure. ## Options - * `:timeout` - Receive timeout in ms (default: 60_000) — passed to - hackney as `:recv_timeout`. - * `:connect_timeout` - TCP connect timeout in ms (default: 30_000). - * `:pool` - Hackney pool name (default: `:default`). Configure the - default pool via `config :nous, :hackney_pool, max_connections: - ..., timeout: ...` or start a dedicated pool with - `:hackney_pool.start_pool/2`. - * `:stream_parser` - Module for parsing the stream buffer (default: SSE parsing). + * `:stream_backend` - Backend module (overrides env / config / default) + * `:timeout` - Receive timeout in ms (default: 60_000) + * `:connect_timeout` - TCP connect timeout in ms (default: 30_000) + * `:stream_parser` - Module for parsing the stream buffer (default: SSE). Must implement `parse_buffer/1` returning `{events, remaining_buffer}`. 
See `Nous.Providers.HTTP.JSONArrayParser` for an example. - * `:finch_name` - Ignored. Kept for source compatibility with - callers from before the 0.15.0 hackney rewrite. Will be removed - in a future release. + * `:pool` - (Hackney backend only) Hackney pool name (default: `:default`). ## Error Handling - The stream will emit `{:stream_error, reason}` on errors and then halt. + The stream emits `{:stream_error, reason}` on errors and then halts. """ @spec stream(String.t(), map(), list(), keyword()) :: {:ok, Enumerable.t()} | {:error, term()} def stream(url, body, headers, opts \\ []) def stream(url, body, headers, opts) when is_binary(url) and is_map(body) and is_list(headers) do - timeout = Keyword.get(opts, :timeout, @default_timeout) - connect_timeout = Keyword.get(opts, :connect_timeout, @default_connect_timeout) - pool = Keyword.get(opts, :pool, :default) - stream_parser = Keyword.get(opts, :stream_parser) - - try do - json_body = JSON.encode!(body) - - # Add streaming headers if not present - headers = ensure_streaming_headers(headers) - - stream = - Stream.resource( - fn -> - start_streaming(url, headers, json_body, timeout, connect_timeout, pool, - stream_parser: stream_parser - ) - end, - &next_chunk/1, - &cleanup/1 - ) - - {:ok, stream} - catch - _, error -> - Logger.error("Failed to encode request body: #{inspect(error)}") - {:error, %{reason: :json_encode_error, details: error}} - end + backend = Keyword.get(opts, :stream_backend) || configured_stream_backend() + backend.stream(url, body, ensure_streaming_headers(headers), opts) end def stream(url, body, headers, _opts) do @@ -210,8 +186,21 @@ defmodule Nous.Providers.HTTP do }} end + defp configured_stream_backend do + case System.get_env("NOUS_HTTP_STREAM_BACKEND") do + nil -> stream_app_or_default() + "req" -> Nous.HTTP.StreamBackend.Req + "hackney" -> Nous.HTTP.StreamBackend.Hackney + other -> resolve_custom_backend(other, :stream, 4, &stream_app_or_default/0) + end + end + + defp 
stream_app_or_default do + Application.get_env(:nous, :http_stream_backend, Nous.HTTP.StreamBackend.Req) + end + # ============================================================================ - # SSE Parsing (Public for testing) + # SSE Parsing (Public for testing and reuse by stream backends) # ============================================================================ @doc """ @@ -321,6 +310,47 @@ defmodule Nous.Providers.HTTP do def parse_sse_event(_), do: nil + @doc false + # Public for stream-backend reuse only — not part of the public API + # surface. Translates the new `{:error, :buffer_overflow}` tuple from + # `parse_sse_buffer/1` into the legacy `{events, buffer}` shape so + # backends can stay agnostic about the failure mode. + @spec parse_stream_buffer(String.t(), module() | nil) :: {list(), String.t()} + def parse_stream_buffer(buffer, nil) do + case parse_sse_buffer(buffer) do + {:error, :buffer_overflow} -> {[{:stream_error, %{reason: :buffer_overflow}}], ""} + result -> result + end + end + + def parse_stream_buffer(buffer, parser_mod), do: parser_mod.parse_buffer(buffer) + + @doc false + # Public for stream-backend reuse only. Flush remaining buffer at end + # of stream — SSE needs a trailing `\n\n` to force the last event + # through; custom parsers just re-parse the remaining buffer as-is. + # + # The chunk handler already enforces `@max_buffer_size` on every + # received chunk, so the buffer reaching here is by construction + # within limits. The synthetic `"\n\n"` is bookkeeping, not received + # data — bypass the public size check so a buffer at exactly the cap + # doesn't trip a false-positive overflow on the 2-byte append. Only + # surface overflow if the input itself is over. 
+ @spec flush_stream_buffer(String.t(), module() | nil) :: {list(), String.t()} + def flush_stream_buffer(buffer, nil) do + if byte_size(buffer) > @max_buffer_size do + {[{:stream_error, %{reason: :buffer_overflow}}], ""} + else + do_parse_sse_buffer(buffer <> "\n\n") + end + end + + def flush_stream_buffer(buffer, parser_mod), do: parser_mod.parse_buffer(buffer) + + @doc false + # Max buffer size — public for stream-backend reuse. + def max_buffer_size, do: @max_buffer_size + # ============================================================================ # Header Helpers (Public for testing) # ============================================================================ @@ -355,12 +385,11 @@ defmodule Nous.Providers.HTTP do def api_key_header(_, _), do: [] - # ============================================================================ - # Private Functions - # ============================================================================ - - # Ensure headers include streaming-related ones - defp ensure_streaming_headers(headers) do + @doc false + # Public for stream-backend reuse. Ensures the request carries + # `content-type: application/json` and `accept: text/event-stream` + # if the caller didn't supply them. + def ensure_streaming_headers(headers) do headers |> maybe_add_header("content-type", "application/json") |> maybe_add_header("accept", "text/event-stream") @@ -376,6 +405,10 @@ defmodule Nous.Providers.HTTP do end end + # ============================================================================ + # Private Functions + # ============================================================================ + # Parse SSE event from lines defp parse_sse_event_lines(lines) do # Collect all data fields @@ -434,192 +467,6 @@ defmodule Nous.Providers.HTTP do end end - # Start streaming via hackney's `:async, :once` pull-based mode. 
- # - # The previous implementation used `Finch.stream/5` with a callback that - # fire-and-forgets `send(parent, ...)` per chunk - push-based, with no - # backpressure. A fast LLM (e.g. Groq at 500 tok/s) feeding a slow - # consumer (LiveView assigns + diff + push, slow stdout, etc.) grew the - # consumer mailbox unboundedly until either the 10 MiB buffer cap tripped - # or the BEAM scheduler starved. - # - # `:hackney.request/5` with `[:async, :once]` returns a request ref. The - # consumer pulls chunks one at a time by calling `:hackney.stream_next/1`, - # which causes hackney to read one more chunk off the socket and deliver - # it as `{:hackney_response, ref, chunk_or_done}`. The network read only - # happens when the consumer asks for it — the producer literally cannot - # outrun the consumer. - # - # As a side-effect this also eliminates the spawn-and-monitor plumbing - # that the previous design needed (H-12), the `Task.await(:infinity)` - # blocker, and the awkward parent EXIT/DOWN handling. The Stream.resource - # consumer IS the only process; cancellation is just the consumer - # halting its enumerator and `:hackney.close/1` releasing the conn. 
- # - defp start_streaming(url, headers, body, timeout, connect_timeout, pool, extra) do - stream_parser = Keyword.get(extra, :stream_parser) - hackney_headers = Enum.map(headers, fn {k, v} -> {to_charlist(k), to_charlist(v)} end) - - opts = [ - :async, - :once, - {:pool, pool}, - {:recv_timeout, timeout}, - {:connect_timeout, connect_timeout}, - {:ssl_options, [verify: :verify_peer, cacerts: :public_key.cacerts_get()]} - ] - - case :hackney.request(:post, url, hackney_headers, body, opts) do - {:ok, ref} -> - %{ - ref: ref, - buffer: "", - done: false, - status: nil, - timeout: timeout, - error: nil, - stream_parser: stream_parser - } - - {:error, reason} -> - Logger.error("hackney request failed: #{inspect(reason)}") - - # Stay `done: false` here so the next_chunk error-emission clause - # fires once before halting. Setting done immediately would short- - # circuit straight to {:halt, _} and the consumer would never see - # the {:stream_error, _} event. - %{ - ref: nil, - buffer: "", - done: false, - status: nil, - timeout: timeout, - error: {:stream_error, %{reason: reason}}, - stream_parser: stream_parser - } - end - end - - # Get next chunk from the hackney `:async, :once` stream. - # - # Pull semantics: we call `:hackney.stream_next/1` to ask for ONE more - # message from hackney, then block in `receive` for it. No mailbox - # accumulation possible - each iteration moves exactly one chunk through. - defp next_chunk(%{done: true} = state), do: {:halt, state} - - defp next_chunk(%{ref: nil, error: {:stream_error, _} = err} = state) do - {[err], %{state | done: true}} - end - - defp next_chunk(state) do - timeout = state.timeout - - # :hackney.stream_next/1 is spec'd to return :ok unconditionally - # (errors arrive asynchronously as {:hackney_response, ref, {:error, _}}). 
- :ok = :hackney.stream_next(state.ref) - - ref = state.ref - - receive do - {:hackney_response, ^ref, {:status, status, _reason_phrase}} - when status not in 200..299 -> - Logger.error("Hackney stream got error status #{status}") - {[{:stream_error, %{status: status}}], %{state | done: true}} - - {:hackney_response, ^ref, {:status, status, _reason_phrase}} -> - next_chunk(%{state | status: status}) - - {:hackney_response, ^ref, {:headers, _headers}} -> - next_chunk(state) - - {:hackney_response, ^ref, :done} -> - # Flush any remaining buffer with the parser's end-of-stream rules. - {events, _} = flush_stream_buffer(state.buffer, state.stream_parser) - - final_events = - Enum.reject(events, fn - nil -> true - {:parse_error, _} -> true - _ -> false - end) - - if Enum.empty?(final_events) do - {:halt, %{state | done: true}} - else - {final_events, %{state | done: true, buffer: ""}} - end - - {:hackney_response, ^ref, {:error, reason}} -> - Logger.error("Hackney stream error: #{inspect(reason)}") - {[{:stream_error, reason}], %{state | done: true}} - - {:hackney_response, ^ref, chunk} when is_binary(chunk) -> - new_buffer = state.buffer <> chunk - - if byte_size(new_buffer) > @max_buffer_size do - Logger.error("SSE buffer overflow, terminating stream") - {[{:stream_error, %{reason: :buffer_overflow}}], %{state | done: true}} - else - {events, remaining_buffer} = parse_stream_buffer(new_buffer, state.stream_parser) - - {valid_events, errors} = - Enum.split_with(events, fn - {:parse_error, _} -> false - _ -> true - end) - - for {:parse_error, err} <- errors do - Logger.debug("SSE parse error (ignored): #{inspect(err)}") - end - - if Enum.empty?(valid_events) do - next_chunk(%{state | buffer: remaining_buffer}) - else - {valid_events, %{state | buffer: remaining_buffer}} - end - end - after - timeout -> - Logger.error("Hackney stream timeout after #{timeout}ms") - {[{:stream_error, %{reason: :timeout, timeout_ms: timeout}}], %{state | done: true}} - end - end - - # Parse 
stream buffer using the configured parser (default: SSE). - # Translates the new {:error, :buffer_overflow} tuple from parse_sse_buffer - # into the legacy {events, buffer} shape so existing call sites keep working. - defp parse_stream_buffer(buffer, nil) do - case parse_sse_buffer(buffer) do - {:error, :buffer_overflow} -> {[{:stream_error, %{reason: :buffer_overflow}}], ""} - result -> result - end - end - - defp parse_stream_buffer(buffer, parser_mod), do: parser_mod.parse_buffer(buffer) - - # Flush remaining buffer at end of stream - # SSE needs a trailing \n\n to force the last event through; - # custom parsers just re-parse the remaining buffer as-is. - defp flush_stream_buffer(buffer, nil) do - case parse_sse_buffer(buffer <> "\n\n") do - {:error, :buffer_overflow} -> {[{:stream_error, %{reason: :buffer_overflow}}], ""} - result -> result - end - end - - defp flush_stream_buffer(buffer, parser_mod), do: parser_mod.parse_buffer(buffer) - - # Cleanup when the consumer halts (Stream.take/2, exception, normal end). - # `:hackney.close/1` releases the connection back to the default pool - # (or closes it if no pool was configured). Idempotent and safe to call - # whether or not the stream finished. 
- defp cleanup(%{ref: nil}), do: :ok - - defp cleanup(%{ref: ref}) do - _ = :hackney.close(ref) - :ok - end - # Truncate data for logging to avoid huge log messages defp truncate_for_log(data) when is_binary(data) do if byte_size(data) > 500 do diff --git a/mix.exs b/mix.exs index 9771fab..1e47f68 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule Nous.MixProject do use Mix.Project - @version "0.15.3" + @version "0.15.4" @source_url "https://github.com/nyo16/nous" def project do diff --git a/test/nous/http/stream_backend/hackney_test.exs b/test/nous/http/stream_backend/hackney_test.exs new file mode 100644 index 0000000..978a7cf --- /dev/null +++ b/test/nous/http/stream_backend/hackney_test.exs @@ -0,0 +1,99 @@ +defmodule Nous.HTTP.StreamBackend.HackneyTest do + use ExUnit.Case, async: true + + alias Nous.HTTP.StreamBackend.Hackney + + setup do + bypass = Bypass.open() + {:ok, bypass: bypass, url: "http://localhost:#{bypass.port}/v1/sse"} + end + + defp send_sse(conn, body) do + conn + |> Plug.Conn.put_resp_content_type("text/event-stream") + |> Plug.Conn.send_resp(200, body) + end + + test "consumes a multi-event SSE response (pull-based)", %{bypass: bypass, url: url} do + body = """ + data: {"text":"hello"} + + data: {"text":" world"} + + data: [DONE] + + """ + + Bypass.expect_once(bypass, "POST", "/v1/sse", fn conn -> send_sse(conn, body) end) + + {:ok, stream} = + Hackney.stream(url, %{model: "x"}, [{"content-type", "application/json"}], []) + + events = Enum.to_list(stream) + + assert Enum.any?(events, &match?(%{"text" => "hello"}, &1)) + assert Enum.any?(events, &match?(%{"text" => " world"}, &1)) + assert Enum.any?(events, &match?({:stream_done, _}, &1)) + end + + test "emits {:stream_error, %{status: ...}} on non-2xx response", %{bypass: bypass, url: url} do + Bypass.expect_once(bypass, "POST", "/v1/sse", fn conn -> + Plug.Conn.send_resp(conn, 500, "internal error") + end) + + {:ok, stream} = Hackney.stream(url, %{}, [], []) + events = 
Enum.to_list(stream)
+
+    assert Enum.any?(events, &match?({:stream_error, %{status: 500}}, &1))
+  end
+
+  test "halts cleanly when consumer breaks early via Enum.take/2", %{bypass: bypass, url: url} do
+    Bypass.expect_once(bypass, "POST", "/v1/sse", fn conn ->
+      events =
+        1..100
+        |> Enum.map_join("\n\n", fn i -> ~s(data: {"i":#{i}}) end)
+        |> Kernel.<>("\n\n")
+
+      send_sse(conn, events)
+    end)
+
+    {:ok, stream} = Hackney.stream(url, %{}, [], [])
+
+    taken =
+      stream
+      |> Stream.reject(&match?({:stream_done, _}, &1))
+      |> Enum.take(2)
+
+    assert length(taken) == 2
+  end
+
+  test "emits stream_error when host is unreachable" do
+    {:ok, stream} = Hackney.stream("http://127.0.0.1:1/", %{}, [], [])
+    events = Enum.to_list(stream)
+
+    assert Enum.any?(events, &match?({:stream_error, _}, &1))
+  end
+
+  # Regression net for the [{:async, :once}] tuple fix. The end-to-end
+  # tests above pass in BOTH push and pull mode because hackney's push
+  # mode happens to deliver messages in the same `{:hackney_response,
+  # ref, _}` shape the receive loop tolerates. The architectural
+  # difference (and the entire reason this backend exists) is the
+  # *option proplist shape*: bare `:async` atom = push (no backpressure),
+  # `{:async, :once}` tuple = pull. Verifying the option shape directly
+  # is the only sound regression net.
+  test "request_opts/3 uses {:async, :once} tuple form (not bare atom)" do
+    opts = Hackney.request_opts(60_000, 30_000, :default)
+
+    assert {:async, :once} in opts,
+           "expected `{:async, :once}` tuple in opts: #{inspect(opts)}"
+
+    refute :async in opts,
+           "bare `:async` atom found — proplists would resolve it as `{:async, true}`, " <>
+             "putting hackney into push mode and forfeiting backpressure. " <>
+             "See deps/hackney/NEWS.md:269-272."
+
+    refute :once in opts,
+           "bare `:once` atom found — would be silently ignored by hackney's proplist parser."
+  end
+end
diff --git a/test/nous/http/stream_backend/req_test.exs b/test/nous/http/stream_backend/req_test.exs
new file mode 100644
index 0000000..1a2c8ad
--- /dev/null
+++ b/test/nous/http/stream_backend/req_test.exs
@@ -0,0 +1,97 @@
+defmodule Nous.HTTP.StreamBackend.ReqTest do
+  use ExUnit.Case, async: true
+
+  alias Nous.HTTP.StreamBackend.Req
+
+  setup do
+    bypass = Bypass.open()
+    {:ok, bypass: bypass, url: "http://localhost:#{bypass.port}/v1/sse"}
+  end
+
+  defp send_sse(conn, body) do
+    conn
+    |> Plug.Conn.put_resp_content_type("text/event-stream")
+    |> Plug.Conn.send_resp(200, body)
+  end
+
+  test "consumes a multi-event SSE response", %{bypass: bypass, url: url} do
+    body = """
+    data: {"text":"hello"}
+
+    data: {"text":" world"}
+
+    data: [DONE]
+
+    """
+
+    Bypass.expect_once(bypass, "POST", "/v1/sse", fn conn -> send_sse(conn, body) end)
+
+    {:ok, stream} =
+      Req.stream(url, %{model: "x"}, [{"content-type", "application/json"}], [])
+
+    events = Enum.to_list(stream)
+
+    assert Enum.any?(events, &match?(%{"text" => "hello"}, &1))
+    assert Enum.any?(events, &match?(%{"text" => " world"}, &1))
+    assert Enum.any?(events, &match?({:stream_done, _}, &1))
+  end
+
+  test "emits {:stream_error, %{status: ...}} on non-2xx response", %{bypass: bypass, url: url} do
+    Bypass.expect_once(bypass, "POST", "/v1/sse", fn conn ->
+      Plug.Conn.send_resp(conn, 500, ~s({"error":"boom"}))
+    end)
+
+    {:ok, stream} = Req.stream(url, %{}, [], [])
+    events = Enum.to_list(stream)
+
+    assert Enum.any?(events, &match?({:stream_error, %{status: 500}}, &1))
+  end
+
+  test "halts cleanly when consumer breaks early via Enum.take/2", %{bypass: bypass, url: url} do
+    Bypass.expect_once(bypass, "POST", "/v1/sse", fn conn ->
+      events =
+        1..100
+        |> Enum.map_join("\n\n", fn i -> ~s(data: {"i":#{i}}) end)
+        |> Kernel.<>("\n\n")
+
+      send_sse(conn, events)
+    end)
+
+    {:ok, stream} = Req.stream(url, %{}, [], [])
+
+    taken =
+      stream
+      |> Stream.reject(&match?({:stream_done, _}, &1))
+      |> 
Enum.take(2) + + assert length(taken) == 2 + end + + test "emits stream_error when host is unreachable" do + {:ok, stream} = Req.stream("http://127.0.0.1:1/", %{}, [], []) + events = Enum.to_list(stream) + + assert Enum.any?(events, &match?({:stream_error, _}, &1)) + end + + test "ignores malformed JSON in data lines and continues", %{bypass: bypass, url: url} do + body = """ + data: not-json-just-text + + data: {"text":"valid"} + + data: [DONE] + + """ + + Bypass.expect_once(bypass, "POST", "/v1/sse", fn conn -> send_sse(conn, body) end) + + {:ok, stream} = Req.stream(url, %{}, [], []) + events = Enum.to_list(stream) + + # Malformed line is filtered out as a parse_error; valid event is delivered. + assert Enum.any?(events, &match?(%{"text" => "valid"}, &1)) + assert Enum.any?(events, &match?({:stream_done, _}, &1)) + refute Enum.any?(events, &match?({:parse_error, _}, &1)) + end +end diff --git a/test/nous/http/stream_backend_resolution_test.exs b/test/nous/http/stream_backend_resolution_test.exs new file mode 100644 index 0000000..6063fb4 --- /dev/null +++ b/test/nous/http/stream_backend_resolution_test.exs @@ -0,0 +1,210 @@ +defmodule Nous.HTTP.StreamBackendResolutionTest do + # async: false — these tests mutate the NOUS_HTTP_STREAM_BACKEND env var + # and :nous app config, which are global. Keeping them in a single + # serialized module prevents cross-test interference. 
+ use ExUnit.Case, async: false + + alias Nous.HTTP.StreamBackend.{Hackney, Req} + alias Nous.Providers.HTTP + + defmodule CustomStreamBackend do + @behaviour Nous.HTTP.StreamBackend + @impl true + def stream(_url, _body, _headers, _opts) do + {:ok, [%{"who" => "custom"}, {:stream_done, "stop"}]} + end + end + + setup do + prev_env = System.get_env("NOUS_HTTP_STREAM_BACKEND") + prev_app = Application.get_env(:nous, :http_stream_backend) + + on_exit(fn -> + restore_env("NOUS_HTTP_STREAM_BACKEND", prev_env) + + case prev_app do + nil -> Application.delete_env(:nous, :http_stream_backend) + v -> Application.put_env(:nous, :http_stream_backend, v) + end + end) + + bypass = Bypass.open() + %{bypass: bypass, url: "http://localhost:#{bypass.port}/v1/stream"} + end + + defp restore_env(key, nil), do: System.delete_env(key) + defp restore_env(key, val), do: System.put_env(key, val) + + defp send_sse(conn, body) do + conn + |> Plug.Conn.put_resp_content_type("text/event-stream") + |> Plug.Conn.send_resp(200, body) + end + + defp simple_sse_body do + """ + data: {"text":"hi"} + + data: [DONE] + + """ + end + + describe "precedence" do + test "per-call :stream_backend opt wins over env, app config, and default", %{ + bypass: bypass, + url: url + } do + System.put_env("NOUS_HTTP_STREAM_BACKEND", "req") + Application.put_env(:nous, :http_stream_backend, Req) + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + send_sse(conn, simple_sse_body()) + end) + + # Override with custom backend at call time. Bypass plug must NOT + # be hit; the custom backend short-circuits. 
+ assert {:ok, stream} = + HTTP.stream(url, %{}, [], stream_backend: CustomStreamBackend) + + assert Enum.to_list(stream) == [%{"who" => "custom"}, {:stream_done, "stop"}] + + Bypass.pass(bypass) + end + + test "env var wins over app config", %{bypass: bypass, url: url} do + Application.put_env(:nous, :http_stream_backend, Hackney) + System.put_env("NOUS_HTTP_STREAM_BACKEND", "req") + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + ua = conn |> Plug.Conn.get_req_header("user-agent") |> List.first("") + assert String.contains?(ua, "req/") or String.contains?(ua, "Req/") + send_sse(conn, simple_sse_body()) + end) + + assert {:ok, stream} = HTTP.stream(url, %{}, [], []) + events = Enum.to_list(stream) + assert Enum.any?(events, &match?(%{"text" => "hi"}, &1)) + end + + test "app config wins over default when env var is unset", %{bypass: bypass, url: url} do + System.delete_env("NOUS_HTTP_STREAM_BACKEND") + Application.put_env(:nous, :http_stream_backend, Hackney) + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + ua = conn |> Plug.Conn.get_req_header("user-agent") |> List.first("") + assert String.contains?(ua, "hackney") + send_sse(conn, simple_sse_body()) + end) + + assert {:ok, stream} = HTTP.stream(url, %{}, [], []) + events = Enum.to_list(stream) + assert Enum.any?(events, &match?(%{"text" => "hi"}, &1)) + end + + test "default is Req when nothing is configured", %{bypass: bypass, url: url} do + System.delete_env("NOUS_HTTP_STREAM_BACKEND") + Application.delete_env(:nous, :http_stream_backend) + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + ua = conn |> Plug.Conn.get_req_header("user-agent") |> List.first("") + assert String.contains?(ua, "req/") or String.contains?(ua, "Req/") + send_sse(conn, simple_sse_body()) + end) + + assert {:ok, stream} = HTTP.stream(url, %{}, [], []) + events = Enum.to_list(stream) + assert Enum.any?(events, &match?(%{"text" => "hi"}, &1)) + end + end + + describe "env var values" do + 
test ~s|"req" resolves to Nous.HTTP.StreamBackend.Req|, %{bypass: bypass, url: url} do + Application.put_env(:nous, :http_stream_backend, Hackney) + System.put_env("NOUS_HTTP_STREAM_BACKEND", "req") + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + ua = conn |> Plug.Conn.get_req_header("user-agent") |> List.first("") + assert String.contains?(ua, "req/") or String.contains?(ua, "Req/") + send_sse(conn, simple_sse_body()) + end) + + assert {:ok, stream} = HTTP.stream(url, %{}, [], []) + _ = Enum.to_list(stream) + end + + test ~s|"hackney" resolves to Nous.HTTP.StreamBackend.Hackney|, %{ + bypass: bypass, + url: url + } do + Application.put_env(:nous, :http_stream_backend, Req) + System.put_env("NOUS_HTTP_STREAM_BACKEND", "hackney") + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + ua = conn |> Plug.Conn.get_req_header("user-agent") |> List.first("") + assert String.contains?(ua, "hackney") + send_sse(conn, simple_sse_body()) + end) + + assert {:ok, stream} = HTTP.stream(url, %{}, [], []) + _ = Enum.to_list(stream) + end + + test "fully-qualified custom module name resolves via String.to_existing_atom", %{ + bypass: bypass, + url: _url + } do + _ = CustomStreamBackend + + System.put_env( + "NOUS_HTTP_STREAM_BACKEND", + "Nous.HTTP.StreamBackendResolutionTest.CustomStreamBackend" + ) + + assert {:ok, stream} = + HTTP.stream("http://example.invalid/", %{}, [], []) + + assert Enum.to_list(stream) == [%{"who" => "custom"}, {:stream_done, "stop"}] + + Bypass.pass(bypass) + end + + test "unknown env var value falls back to app config (no atom DoS)", %{ + bypass: bypass, + url: url + } do + Application.put_env(:nous, :http_stream_backend, Hackney) + + System.put_env( + "NOUS_HTTP_STREAM_BACKEND", + "Definitely.Not.A.Real.StreamBackend.#{System.unique_integer()}" + ) + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + ua = conn |> Plug.Conn.get_req_header("user-agent") |> List.first("") + assert String.contains?(ua, "hackney") + 
send_sse(conn, simple_sse_body()) + end) + + assert {:ok, stream} = HTTP.stream(url, %{}, [], []) + _ = Enum.to_list(stream) + end + + test "module-name without :stream/4 falls back to app config", %{ + bypass: bypass, + url: url + } do + Application.put_env(:nous, :http_stream_backend, Hackney) + System.put_env("NOUS_HTTP_STREAM_BACKEND", "Enum") + + Bypass.expect_once(bypass, "POST", "/v1/stream", fn conn -> + ua = conn |> Plug.Conn.get_req_header("user-agent") |> List.first("") + assert String.contains?(ua, "hackney") + send_sse(conn, simple_sse_body()) + end) + + assert {:ok, stream} = HTTP.stream(url, %{}, [], []) + _ = Enum.to_list(stream) + end + end +end
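
Reviewer note: for anyone unfamiliar with the proplists behavior that `request_opts/3`'s regression test guards, the normalization is easy to see in isolation. The sketch below uses only the Erlang stdlib (`:proplists`), nothing from the Nous codebase, and mirrors the broken and fixed option shapes described in the changelog:

```elixir
# Standalone illustration of the proplists semantics behind the
# [{:async, :once}] fix. Uses only the Erlang stdlib; the option lists
# here mirror the shapes described in the changelog, not real requests.

# Broken form (0.15.0-0.15.3): proplists normalizes the bare atom
# :async to {:async, true}, so hackney sees async: true (push mode).
broken_opts = [:async, :once, {:recv_timeout, 60_000}]
true = :proplists.get_value(:async, broken_opts)

# Fixed form: the lookup under :async yields :once, hackney's strict
# pull mode, where each chunk waits on :hackney.stream_next/1.
fixed_opts = [{:async, :once}, {:recv_timeout, 60_000}]
:once = :proplists.get_value(:async, fixed_opts)

# The bare :once atom in the broken form normalizes to {:once, true},
# a key hackney never reads, so it was silently ignored.
true = :proplists.get_value(:once, broken_opts)

IO.puts("broken form: async=true (push); fixed form: async=once (pull)")
```

The pattern matches act as assertions: any mismatch raises `MatchError`, which is why verifying the option shape (rather than end-to-end delivery) is the sound regression net.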