From f2fbcdd33d842308cce232fc38e8b0cd3d7ece39 Mon Sep 17 00:00:00 2001
From: Alexandros Pappas <apappascs@gmail.com>
Date: Fri, 18 Apr 2025 12:24:16 +0200
Subject: [PATCH] feat(anthropic): Add support for streaming thinking events

Add necessary types and update stream processing to handle Anthropic's 'thinking' content blocks and deltas in streaming responses. This resolves an issue where an IllegalArgumentException was thrown for unhandled thinking event types.
format

Signed-off-by: Alexandros Pappas <apappascs@gmail.com>
---
 .../ai/anthropic/api/AnthropicApi.java        |  20 ++-
 .../ai/anthropic/api/StreamHelper.java        |  91 ++++++----
 .../ai/anthropic/AnthropicChatModelIT.java    |  34 +++-
 .../ai/anthropic/api/AnthropicApiIT.java      | 165 ++++++++++++++++--
 .../client/AnthropicChatClientIT.java         |   2 +-
 5 files changed, 267 insertions(+), 45 deletions(-)

diff --git a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java
index 87098c6d938..8dde9aad10f 100644
--- a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java
+++ b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/AnthropicApi.java
@@ -1226,8 +1226,11 @@ public record ContentBlockStartEvent(
 
 		@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "type",
 				visible = true)
-		@JsonSubTypes({ @JsonSubTypes.Type(value = ContentBlockToolUse.class, name = "tool_use"),
-				@JsonSubTypes.Type(value = ContentBlockText.class, name = "text") })
+		@JsonSubTypes({
+				@JsonSubTypes.Type(value = ContentBlockToolUse.class, name = "tool_use"),
+				@JsonSubTypes.Type(value = ContentBlockText.class, name = "text"),
+				@JsonSubTypes.Type(value = ContentBlockThinking.class, name = "thinking")
+		})
 		public interface ContentBlockBody {
 			String type();
 		}
@@ -1257,6 +1260,19 @@ public record ContentBlockText(
 			@JsonProperty("type") String type,
 			@JsonProperty("text") String text) implements ContentBlockBody {
 		}
+
+		/**
+		 * Thinking content block.
+		 * @param type The content block type.
+		 * @param thinking The thinking content.
+		 */
+		@JsonInclude(Include.NON_NULL)
+		public record ContentBlockThinking(
+			@JsonProperty("type") String type,
+			@JsonProperty("thinking") String thinking,
+			@JsonProperty("signature") String signature) implements ContentBlockBody {
+		}
+		
 	}
 	// @formatter:on
 
diff --git a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/StreamHelper.java b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/StreamHelper.java
index ae62eb0748c..1ed4cfb0b8b 100644
--- a/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/StreamHelper.java
+++ b/models/spring-ai-anthropic/src/main/java/org/springframework/ai/anthropic/api/StreamHelper.java
@@ -26,9 +26,12 @@
 import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockDeltaEvent;
 import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockDeltaEvent.ContentBlockDeltaJson;
 import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockDeltaEvent.ContentBlockDeltaText;
+import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockDeltaEvent.ContentBlockDeltaThinking;
+import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockDeltaEvent.ContentBlockDeltaSignature;
 import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockStartEvent;
 import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockStartEvent.ContentBlockText;
 import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockStartEvent.ContentBlockToolUse;
+import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlockStartEvent.ContentBlockThinking;
 import org.springframework.ai.anthropic.api.AnthropicApi.EventType;
 import org.springframework.ai.anthropic.api.AnthropicApi.MessageDeltaEvent;
 import org.springframework.ai.anthropic.api.AnthropicApi.MessageStartEvent;
@@ -36,19 +39,19 @@
 import org.springframework.ai.anthropic.api.AnthropicApi.StreamEvent;
 import org.springframework.ai.anthropic.api.AnthropicApi.ToolUseAggregationEvent;
 import org.springframework.ai.anthropic.api.AnthropicApi.Usage;
-import org.springframework.util.Assert;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
 /**
- * Helper class to support streaming function calling.
+ * Helper class to support streaming function calling and thinking events.
  * <p>
  * It can merge the streamed {@link StreamEvent} chunks in case of function calling
- * message.
+ * message. It passes through other events like text, thinking, and signature deltas.
  *
  * @author Mariusz Bernacki
  * @author Christian Tzolov
  * @author Jihoon Kim
+ * @author Alexandros Pappas
  * @since 1.0.0
  */
 public class StreamHelper {
@@ -61,13 +64,16 @@ public boolean isToolUseStart(StreamEvent event) {
 	}
 
 	public boolean isToolUseFinish(StreamEvent event) {
-
-		if (event == null || event.type() == null || event.type() != EventType.CONTENT_BLOCK_STOP) {
-			return false;
-		}
-		return true;
+		// Tool use streaming sequence ends with a CONTENT_BLOCK_STOP event.
+		// The logic relies on the state machine (isInsideTool flag) managed in
+		// chatCompletionStream to know if this stop event corresponds to a tool use.
+		return event != null && event.type() != null && event.type() == EventType.CONTENT_BLOCK_STOP;
 	}
 
+	/**
+	 * Merge the tool‑use related streaming events into one aggregate event so that the
+	 * upper layers see a single ContentBlock with the full JSON input.
+	 */
 	public StreamEvent mergeToolUseEvents(StreamEvent previousEvent, StreamEvent event) {
 
 		ToolUseAggregationEvent eventAggregator = (ToolUseAggregationEvent) previousEvent;
@@ -76,8 +82,7 @@ public StreamEvent mergeToolUseEvents(StreamEvent previousEvent, StreamEvent eve
 			ContentBlockStartEvent contentBlockStart = (ContentBlockStartEvent) event;
 
 			if (ContentBlock.Type.TOOL_USE.getValue().equals(contentBlockStart.contentBlock().type())) {
-				ContentBlockStartEvent.ContentBlockToolUse cbToolUse = (ContentBlockToolUse) contentBlockStart
-					.contentBlock();
+				ContentBlockToolUse cbToolUse = (ContentBlockToolUse) contentBlockStart.contentBlock();
 
 				return eventAggregator.withIndex(contentBlockStart.index())
 					.withId(cbToolUse.id())
@@ -102,6 +107,14 @@ else if (event.type() == EventType.CONTENT_BLOCK_STOP) {
 		return event;
 	}
 
+	/**
+	 * Converts a raw {@link StreamEvent} potentially containing tool use aggregates or
+	 * other block types (text, thinking) into a {@link ChatCompletionResponse} chunk.
+	 * @param event The incoming StreamEvent.
+	 * @param contentBlockReference Holds the state of the response being built across
+	 * multiple events.
+	 * @return A ChatCompletionResponse representing the processed chunk.
+	 */
 	public ChatCompletionResponse eventToChatCompletionResponse(StreamEvent event,
 			AtomicReference<ChatCompletionResponseBuilder> contentBlockReference) {
 
@@ -135,28 +148,41 @@ else if (event.type().equals(EventType.TOOL_USE_AGGREGATE)) {
 		else if (event.type().equals(EventType.CONTENT_BLOCK_START)) {
 			ContentBlockStartEvent contentBlockStartEvent = (ContentBlockStartEvent) event;
 
-			Assert.isTrue(contentBlockStartEvent.contentBlock().type().equals("text"),
-					"The json content block should have been aggregated. Unsupported content block type: "
-							+ contentBlockStartEvent.contentBlock().type());
-
-			ContentBlockText contentBlockText = (ContentBlockText) contentBlockStartEvent.contentBlock();
-			ContentBlock contentBlock = new ContentBlock(Type.TEXT, null, contentBlockText.text(),
-					contentBlockStartEvent.index());
-			contentBlockReference.get().withType(event.type().name()).withContent(List.of(contentBlock));
+			if (contentBlockStartEvent.contentBlock() instanceof ContentBlockText textBlock) {
+				ContentBlock cb = new ContentBlock(Type.TEXT, null, textBlock.text(), contentBlockStartEvent.index());
+				contentBlockReference.get().withType(event.type().name()).withContent(List.of(cb));
+			}
+			else if (contentBlockStartEvent.contentBlock() instanceof ContentBlockThinking thinkingBlock) {
+				ContentBlock cb = new ContentBlock(Type.THINKING, null, null, contentBlockStartEvent.index(), null,
+						null, null, null, null, null, thinkingBlock.thinking(), null);
+				contentBlockReference.get().withType(event.type().name()).withContent(List.of(cb));
+			}
+			else {
+				throw new IllegalArgumentException(
+						"Unsupported content block type: " + contentBlockStartEvent.contentBlock().type());
+			}
 		}
 		else if (event.type().equals(EventType.CONTENT_BLOCK_DELTA)) {
-
 			ContentBlockDeltaEvent contentBlockDeltaEvent = (ContentBlockDeltaEvent) event;
 
-			Assert.isTrue(contentBlockDeltaEvent.delta().type().equals("text_delta"),
-					"The json content block delta should have been aggregated. Unsupported content block type: "
-							+ contentBlockDeltaEvent.delta().type());
-
-			ContentBlockDeltaText deltaTxt = (ContentBlockDeltaText) contentBlockDeltaEvent.delta();
-
-			var contentBlock = new ContentBlock(Type.TEXT_DELTA, null, deltaTxt.text(), contentBlockDeltaEvent.index());
-
-			contentBlockReference.get().withType(event.type().name()).withContent(List.of(contentBlock));
+			if (contentBlockDeltaEvent.delta() instanceof ContentBlockDeltaText txt) {
+				ContentBlock cb = new ContentBlock(Type.TEXT_DELTA, null, txt.text(), contentBlockDeltaEvent.index());
+				contentBlockReference.get().withType(event.type().name()).withContent(List.of(cb));
+			}
+			else if (contentBlockDeltaEvent.delta() instanceof ContentBlockDeltaThinking thinking) {
+				ContentBlock cb = new ContentBlock(Type.THINKING_DELTA, null, null, contentBlockDeltaEvent.index(),
+						null, null, null, null, null, null, thinking.thinking(), null);
+				contentBlockReference.get().withType(event.type().name()).withContent(List.of(cb));
+			}
+			else if (contentBlockDeltaEvent.delta() instanceof ContentBlockDeltaSignature sig) {
+				ContentBlock cb = new ContentBlock(Type.SIGNATURE_DELTA, null, null, contentBlockDeltaEvent.index(),
+						null, null, null, null, null, sig.signature(), null, null);
+				contentBlockReference.get().withType(event.type().name()).withContent(List.of(cb));
+			}
+			else {
+				throw new IllegalArgumentException(
+						"Unsupported content block delta type: " + contentBlockDeltaEvent.delta().type());
+			}
 		}
 		else if (event.type().equals(EventType.MESSAGE_DELTA)) {
 
@@ -173,21 +199,26 @@ else if (event.type().equals(EventType.MESSAGE_DELTA)) {
 			}
 
 			if (messageDeltaEvent.usage() != null) {
-				var totalUsage = new Usage(contentBlockReference.get().usage.inputTokens(),
+				Usage totalUsage = new Usage(contentBlockReference.get().usage.inputTokens(),
 						messageDeltaEvent.usage().outputTokens());
 				contentBlockReference.get().withUsage(totalUsage);
 			}
 		}
 		else if (event.type().equals(EventType.MESSAGE_STOP)) {
-			// pass through
+			// pass through as‑is
 		}
 		else {
+			// Any other event types that should propagate upwards without content
 			contentBlockReference.get().withType(event.type().name()).withContent(List.of());
 		}
 
 		return contentBlockReference.get().build();
 	}
 
+	/**
+	 * Builder for {@link ChatCompletionResponse}. Used internally by {@link StreamHelper}
+	 * to aggregate stream events.
+	 */
 	public static class ChatCompletionResponseBuilder {
 
 		private String type;
diff --git a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/AnthropicChatModelIT.java b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/AnthropicChatModelIT.java
index a13c6cca869..91315c5f3a5 100644
--- a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/AnthropicChatModelIT.java
+++ b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/AnthropicChatModelIT.java
@@ -288,7 +288,7 @@ void functionCallTest() {
 		assertThat(generation.getOutput().getText()).contains("30", "10", "15");
 		assertThat(response.getMetadata()).isNotNull();
 		assertThat(response.getMetadata().getUsage()).isNotNull();
-		assertThat(response.getMetadata().getUsage().getTotalTokens()).isLessThan(4000).isGreaterThan(1800);
+		assertThat(response.getMetadata().getUsage().getTotalTokens()).isLessThan(4000).isGreaterThan(100);
 	}
 
 	@Test
@@ -415,6 +415,38 @@ else if (message.getMetadata().containsKey("data")) { // redacted thinking
 		}
 	}
 
+	@Test
+	void thinkingWithStreamingTest() {
+		UserMessage userMessage = new UserMessage(
+				"Are there an infinite number of prime numbers such that n mod 4 == 3?");
+
+		var promptOptions = AnthropicChatOptions.builder()
+			.model(AnthropicApi.ChatModel.CLAUDE_3_7_SONNET.getName())
+			.temperature(1.0) // Temperature should be set to 1 when thinking is enabled
+			.maxTokens(8192)
+			.thinking(AnthropicApi.ThinkingType.ENABLED, 2048) // Must be ≥1024 && <
+																// max_tokens
+			.build();
+
+		Flux<ChatResponse> responseFlux = this.streamingChatModel
+			.stream(new Prompt(List.of(userMessage), promptOptions));
+
+		String content = responseFlux.collectList()
+			.block()
+			.stream()
+			.map(ChatResponse::getResults)
+			.flatMap(List::stream)
+			.map(Generation::getOutput)
+			.map(AssistantMessage::getText)
+			.filter(text -> text != null && !text.isBlank())
+			.collect(Collectors.joining());
+
+		logger.info("Response: {}", content);
+
+		assertThat(content).isNotBlank();
+		assertThat(content).contains("prime numbers");
+	}
+
 	@Test
 	void testToolUseContentBlock() {
 		UserMessage userMessage = new UserMessage(
diff --git a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/api/AnthropicApiIT.java b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/api/AnthropicApiIT.java
index 7eb86034c75..6ad8397cffa 100644
--- a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/api/AnthropicApiIT.java
+++ b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/api/AnthropicApiIT.java
@@ -17,17 +17,23 @@
 package org.springframework.ai.anthropic.api;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
 
 import org.springframework.ai.anthropic.api.AnthropicApi.AnthropicMessage;
 import org.springframework.ai.anthropic.api.AnthropicApi.ChatCompletionRequest;
 import org.springframework.ai.anthropic.api.AnthropicApi.ChatCompletionResponse;
 import org.springframework.ai.anthropic.api.AnthropicApi.ContentBlock;
+import org.springframework.ai.anthropic.api.AnthropicApi.EventType;
 import org.springframework.ai.anthropic.api.AnthropicApi.Role;
 import org.springframework.http.ResponseEntity;
+import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -40,6 +46,8 @@
 @EnabledIfEnvironmentVariable(named = "ANTHROPIC_API_KEY", matches = ".+")
 public class AnthropicApiIT {
 
+	private static final Logger logger = LoggerFactory.getLogger(AnthropicApiIT.class);
+
 	AnthropicApi anthropicApi = new AnthropicApi(System.getenv("ANTHROPIC_API_KEY"));
 
 	@Test
@@ -48,17 +56,26 @@ void chatCompletionEntity() {
 		AnthropicMessage chatCompletionMessage = new AnthropicMessage(List.of(new ContentBlock("Tell me a Joke?")),
 				Role.USER);
 		ResponseEntity<ChatCompletionResponse> response = this.anthropicApi
-			.chatCompletionEntity(new ChatCompletionRequest(AnthropicApi.ChatModel.CLAUDE_3_OPUS.getValue(),
-					List.of(chatCompletionMessage), null, 100, 0.8, false));
+			.chatCompletionEntity(ChatCompletionRequest.builder()
+				.model(AnthropicApi.ChatModel.CLAUDE_3_OPUS.getValue())
+				.messages(List.of(chatCompletionMessage))
+				.maxTokens(100)
+				.temperature(0.8)
+				.stream(false)
+				.build());
 
-		System.out.println(response);
+		logger.info("Non-Streaming Response: {}", response.getBody());
 		assertThat(response).isNotNull();
 		assertThat(response.getBody()).isNotNull();
+		assertThat(response.getBody().content()).isNotEmpty();
+		assertThat(response.getBody().content().get(0).text()).isNotBlank();
+		assertThat(response.getBody().stopReason()).isEqualTo("end_turn");
 	}
 
 	@Test
 	void chatCompletionWithThinking() {
-		AnthropicMessage chatCompletionMessage = new AnthropicMessage(List.of(new ContentBlock("Tell me a Joke?")),
+		AnthropicMessage chatCompletionMessage = new AnthropicMessage(
+				List.of(new ContentBlock("Are there an infinite number of prime numbers such that n mod 4 == 3?")),
 				Role.USER);
 
 		ChatCompletionRequest request = ChatCompletionRequest.builder()
@@ -73,20 +90,31 @@ void chatCompletionWithThinking() {
 
 		assertThat(response).isNotNull();
 		assertThat(response.getBody()).isNotNull();
+		assertThat(response.getBody().content()).isNotEmpty();
+
+		boolean foundThinkingBlock = false;
+		boolean foundTextBlock = false;
 
 		List<ContentBlock> content = response.getBody().content();
 		for (ContentBlock block : content) {
 			if (block.type() == ContentBlock.Type.THINKING) {
 				assertThat(block.thinking()).isNotBlank();
 				assertThat(block.signature()).isNotBlank();
+				foundThinkingBlock = true;
 			}
+			// Note: Redacted thinking might occur if budget is exceeded or other reasons.
 			if (block.type() == ContentBlock.Type.REDACTED_THINKING) {
 				assertThat(block.data()).isNotBlank();
 			}
 			if (block.type() == ContentBlock.Type.TEXT) {
 				assertThat(block.text()).isNotBlank();
+				foundTextBlock = true;
 			}
 		}
+
+		assertThat(foundThinkingBlock).isTrue();
+		assertThat(foundTextBlock).isTrue();
+		assertThat(response.getBody().stopReason()).isEqualTo("end_turn");
 	}
 
 	@Test
@@ -95,15 +123,125 @@ void chatCompletionStream() {
 		AnthropicMessage chatCompletionMessage = new AnthropicMessage(List.of(new ContentBlock("Tell me a Joke?")),
 				Role.USER);
 
-		Flux<ChatCompletionResponse> response = this.anthropicApi.chatCompletionStream(new ChatCompletionRequest(
-				AnthropicApi.ChatModel.CLAUDE_3_OPUS.getValue(), List.of(chatCompletionMessage), null, 100, 0.8, true));
+		Flux<ChatCompletionResponse> response = this.anthropicApi.chatCompletionStream(ChatCompletionRequest.builder()
+			.model(AnthropicApi.ChatModel.CLAUDE_3_OPUS.getValue())
+			.messages(List.of(chatCompletionMessage))
+			.maxTokens(100)
+			.temperature(0.8)
+			.stream(true)
+			.build());
 
 		assertThat(response).isNotNull();
 
-		List<ChatCompletionResponse> bla = response.collectList().block();
-		assertThat(bla).isNotNull();
+		List<ChatCompletionResponse> results = response.collectList().block();
+		assertThat(results).isNotNull().isNotEmpty();
+
+		results.forEach(chunk -> logger.info("Streaming Chunk: {}", chunk));
+
+		// Verify the stream contains actual text content deltas
+		String aggregatedText = results.stream()
+			.filter(r -> !CollectionUtils.isEmpty(r.content()))
+			.flatMap(r -> r.content().stream())
+			.filter(cb -> cb.type() == ContentBlock.Type.TEXT_DELTA)
+			.map(ContentBlock::text)
+			.collect(Collectors.joining());
+		assertThat(aggregatedText).isNotBlank();
+
+		// Verify the final state
+		ChatCompletionResponse lastMeaningfulResponse = results.stream()
+			.filter(r -> StringUtils.hasText(r.stopReason()))
+			.reduce((first, second) -> second)
+			.orElse(results.get(results.size() - 1)); // Fallback to very last if no stop
+
+		// StopReason found earlier
+		assertThat(lastMeaningfulResponse.stopReason()).isEqualTo("end_turn");
+		assertThat(lastMeaningfulResponse.usage()).isNotNull();
+		assertThat(lastMeaningfulResponse.usage().outputTokens()).isPositive();
+	}
+
+	@Test
+	void chatCompletionStreamWithThinking() {
+		AnthropicMessage chatCompletionMessage = new AnthropicMessage(
+				List.of(new ContentBlock("Are there an infinite number of prime numbers such that n mod 4 == 3?")),
+				Role.USER);
+
+		ChatCompletionRequest request = ChatCompletionRequest.builder()
+			.model(AnthropicApi.ChatModel.CLAUDE_3_7_SONNET.getValue())
+			.messages(List.of(chatCompletionMessage))
+			.maxTokens(2048)
+			.temperature(1.0)
+			.stream(true)
+			.thinking(new ChatCompletionRequest.ThinkingConfig(AnthropicApi.ThinkingType.ENABLED, 1024))
+			.build();
+
+		Flux<ChatCompletionResponse> responseFlux = this.anthropicApi.chatCompletionStream(request);
+
+		assertThat(responseFlux).isNotNull();
+
+		List<ChatCompletionResponse> results = responseFlux.collectList().block();
+		assertThat(results).isNotNull().isNotEmpty();
+
+		results.forEach(chunk -> logger.info("Streaming Thinking Chunk: {}", chunk));
+
+		// Verify MESSAGE_START event exists
+		assertThat(results.stream().anyMatch(r -> EventType.MESSAGE_START.name().equals(r.type()))).isTrue();
+		assertThat(results.get(0).id()).isNotBlank();
+		assertThat(results.get(0).role()).isEqualTo(Role.ASSISTANT);
 
-		bla.stream().forEach(r -> System.out.println(r));
+		// Verify presence of THINKING_DELTA content
+		boolean foundThinkingDelta = results.stream()
+			.filter(r -> !CollectionUtils.isEmpty(r.content()))
+			.flatMap(r -> r.content().stream())
+			.anyMatch(cb -> cb.type() == ContentBlock.Type.THINKING_DELTA && StringUtils.hasText(cb.thinking()));
+		assertThat(foundThinkingDelta).as("Should find THINKING_DELTA content").isTrue();
+
+		// Verify presence of SIGNATURE_DELTA content
+		boolean foundSignatureDelta = results.stream()
+			.filter(r -> !CollectionUtils.isEmpty(r.content()))
+			.flatMap(r -> r.content().stream())
+			.anyMatch(cb -> cb.type() == ContentBlock.Type.SIGNATURE_DELTA && StringUtils.hasText(cb.signature()));
+		assertThat(foundSignatureDelta).as("Should find SIGNATURE_DELTA content").isTrue();
+
+		// Verify presence of TEXT_DELTA content (the actual answer)
+		boolean foundTextDelta = results.stream()
+			.filter(r -> !CollectionUtils.isEmpty(r.content()))
+			.flatMap(r -> r.content().stream())
+			.anyMatch(cb -> cb.type() == ContentBlock.Type.TEXT_DELTA && StringUtils.hasText(cb.text()));
+		assertThat(foundTextDelta).as("Should find TEXT_DELTA content").isTrue();
+
+		// Combine text deltas to check final answer structure
+		String aggregatedText = results.stream()
+			.filter(r -> !CollectionUtils.isEmpty(r.content()))
+			.flatMap(r -> r.content().stream())
+			.filter(cb -> cb.type() == ContentBlock.Type.TEXT_DELTA)
+			.map(ContentBlock::text)
+			.collect(Collectors.joining());
+		assertThat(aggregatedText).as("Aggregated text response should not be blank").isNotBlank();
+		logger.info("Aggregated Text from Stream: {}", aggregatedText);
+
+		// Verify the final state (stop reason and usage)
+		ChatCompletionResponse finalStateEvent = results.stream()
+			.filter(r -> StringUtils.hasText(r.stopReason()))
+			.reduce((first, second) -> second)
+			.orElse(null);
+
+		assertThat(finalStateEvent).as("Should find an event with stopReason").isNotNull();
+		assertThat(finalStateEvent.stopReason()).isEqualTo("end_turn");
+		assertThat(finalStateEvent.usage()).isNotNull();
+		assertThat(finalStateEvent.usage().outputTokens()).isPositive();
+		assertThat(finalStateEvent.usage().inputTokens()).isPositive();
+
+		// Verify presence of key event types
+		assertThat(results.stream().anyMatch(r -> EventType.CONTENT_BLOCK_START.name().equals(r.type())))
+			.as("Should find CONTENT_BLOCK_START event")
+			.isTrue();
+		assertThat(results.stream().anyMatch(r -> EventType.CONTENT_BLOCK_STOP.name().equals(r.type())))
+			.as("Should find CONTENT_BLOCK_STOP event")
+			.isTrue();
+		assertThat(results.stream()
+			.anyMatch(r -> EventType.MESSAGE_STOP.name().equals(r.type()) || StringUtils.hasText(r.stopReason())))
+			.as("Should find MESSAGE_STOP or MESSAGE_DELTA with stopReason")
+			.isTrue();
 	}
 
 	@Test
@@ -112,8 +250,13 @@ void chatCompletionStreamError() {
 				Role.USER);
 		AnthropicApi api = new AnthropicApi("FAKE_KEY_FOR_ERROR_RESPONSE");
 
-		Flux<ChatCompletionResponse> response = api.chatCompletionStream(new ChatCompletionRequest(
-				AnthropicApi.ChatModel.CLAUDE_3_OPUS.getValue(), List.of(chatCompletionMessage), null, 100, 0.8, true));
+		Flux<ChatCompletionResponse> response = api.chatCompletionStream(ChatCompletionRequest.builder()
+			.model(AnthropicApi.ChatModel.CLAUDE_3_OPUS.getValue())
+			.messages(List.of(chatCompletionMessage))
+			.maxTokens(100)
+			.temperature(0.8)
+			.stream(true)
+			.build());
 
 		assertThat(response).isNotNull();
 
diff --git a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java
index 556cd548c70..2349469543f 100644
--- a/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java
+++ b/models/spring-ai-anthropic/src/test/java/org/springframework/ai/anthropic/client/AnthropicChatClientIT.java
@@ -212,7 +212,7 @@ void functionCallTest() {
 
 		// @formatter:off
 		String response = ChatClient.create(this.chatModel).prompt()
-				.user("What's the weather like in San Francisco, Tokyo, and Paris?  Use Celsius.")
+				.user("What's the weather like in San Francisco (California, USA), Tokyo (Japan), and Paris (France)? Use Celsius.")
 				.tools(FunctionToolCallback.builder("getCurrentWeather", new MockWeatherService())
 					.inputType(MockWeatherService.Request.class)
 					.build())