
AI / LLM

AI/LLM streaming module for Atmosphere. Provides @AiEndpoint, @Prompt, StreamingSession, the AgentRuntime SPI for auto-detected AI framework adapters, and a built-in OpenAiCompatibleClient that works with Gemini, OpenAI, Ollama, and any OpenAI-compatible API.

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-ai</artifactId>
    <version>${project.version}</version>
</dependency>

Atmosphere has two pluggable SPI layers. AsyncSupport adapts web containers (Jetty, Tomcat, Undertow); AgentRuntime adapts AI frameworks (Spring AI, LangChain4j, Google ADK, Embabel, and Koog). Same design pattern, parallel discovery mechanisms:

| Concern | Transport layer | AI layer |
|---|---|---|
| SPI interface | AsyncSupport | AgentRuntime |
| What it adapts | Web containers (Jetty, Tomcat, Undertow) | AI frameworks (Spring AI, LangChain4j, ADK, Embabel, Koog) |
| Discovery | Classpath scanning | ServiceLoader |
| Resolution | Best available container | Highest priority() among isAvailable() |
| Initialization | init(ServletConfig) | configure(LlmSettings) |
| Core method | service(req, res) | execute(AgentExecutionContext, StreamingSession) |
| Fallback | BlockingIOCometSupport | Built-in AgentRuntime (OpenAI-compatible) |

This is the Servlet model for AI agents: write your @Agent once, run it on LangChain4j, Google ADK, Spring AI, or standalone — determined by classpath.

@AiEndpoint(path = "/ai/chat",
            systemPrompt = "You are a helpful assistant",
            conversationMemory = true)
public class MyChatBot {

    @Prompt
    public void onPrompt(String message, StreamingSession session) {
        session.stream(message); // auto-detects Spring AI, LangChain4j, ADK, Embabel, or built-in
    }
}

The @AiEndpoint annotation replaces the boilerplate of @ManagedService + @Ready + @Disconnect + @Message for AI streaming use cases. The @Prompt method runs on a virtual thread.

session.stream(message) auto-detects the best available AgentRuntime implementation via ServiceLoader — drop an adapter JAR on the classpath and it just works.

The AgentRuntime SPI dispatches the entire agent loop — tool calling, memory, RAG, retries — to the AI framework on the classpath. When multiple implementations are available, the one with the highest priority() that reports isAvailable() wins.

public interface AgentRuntime {
    String name();                                 // e.g. "langchain4j", "spring-ai"
    boolean isAvailable();                         // checks classpath dependencies
    int priority();                                // higher wins
    void configure(AiConfig.LlmSettings settings); // called once after resolution
    Set<AiCapability> capabilities();              // feature discovery
    void execute(AgentExecutionContext context, StreamingSession session); // full agent loop
}
| Classpath JAR | Auto-detected AgentRuntime | Priority |
|---|---|---|
| atmosphere-ai (default) | Built-in OpenAiCompatibleClient (Gemini, OpenAI, Ollama) | 0 |
| atmosphere-spring-ai | Spring AI ChatClient | 100 |
| atmosphere-langchain4j | LangChain4j StreamingChatLanguageModel | 100 |
| atmosphere-adk | Google ADK Runner | 100 |
| atmosphere-embabel | Embabel AgentPlatform | 100 |
| atmosphere-koog | JetBrains Koog AIAgent | 100 |
| atmosphere-semantic-kernel | Microsoft Semantic Kernel ChatCompletionService | 100 |

To switch runtimes, change a single Maven dependency — no code changes needed.
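The resolution rule (highest priority() among the candidates reporting isAvailable()) can be sketched with local stand-ins; the RuntimeSpi interface and Candidate record below are illustrative, not Atmosphere's actual SPI types:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class RuntimeResolution {

    // Local stand-in for the AgentRuntime SPI (illustrative only).
    public interface RuntimeSpi {
        String name();
        boolean isAvailable();
        int priority();
    }

    public record Candidate(String name, boolean available, int priority)
            implements RuntimeSpi {
        public boolean isAvailable() { return available; }
    }

    // Highest priority() among available candidates wins. In the real module
    // the candidates come from ServiceLoader.load(AgentRuntime.class).
    public static Optional<RuntimeSpi> resolve(List<RuntimeSpi> candidates) {
        return candidates.stream()
                .filter(RuntimeSpi::isAvailable)
                .max(Comparator.comparingInt(RuntimeSpi::priority));
    }

    public static void main(String[] args) {
        List<RuntimeSpi> found = List.of(
                new Candidate("built-in", true, 0),
                new Candidate("langchain4j", false, 100), // adapter jar missing
                new Candidate("spring-ai", true, 100));
        System.out.println(resolve(found).map(RuntimeSpi::name).orElse("none")); // spring-ai
    }
}
```

Adding an adapter JAR corresponds to one more available candidate at priority 100, which then outranks the built-in runtime at priority 0.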

Enable multi-turn conversations with one annotation attribute:

@AiEndpoint(path = "/ai/chat",
            systemPrompt = "You are a helpful assistant",
            conversationMemory = true,
            maxHistoryMessages = 20)
public class MyChat {

    @Prompt
    public void onPrompt(String message, StreamingSession session) {
        session.stream(message);
    }
}

When conversationMemory = true, the framework:

  1. Captures each user message and the streamed assistant response (via MemoryCapturingSession)
  2. Stores them as conversation turns per AtmosphereResource
  3. Injects the full history into every subsequent AiRequest
  4. Clears the history when the resource disconnects

The default implementation is InMemoryConversationMemory (capped at maxHistoryMessages, default 20). For external storage, implement the AiConversationMemory SPI:

public interface AiConversationMemory {
    List<ChatMessage> getHistory(String conversationId);
    void addMessage(String conversationId, ChatMessage message);
    void clear(String conversationId);
    int maxMessages();
}
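A minimal window-capped implementation of this SPI could look like the following sketch; the ChatMessage record is a local stand-in and the class name WindowedMemory is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WindowedMemory {

    // Local stand-in for Atmosphere's ChatMessage (illustrative).
    public record ChatMessage(String role, String content) { }

    private final Map<String, Deque<ChatMessage>> store = new ConcurrentHashMap<>();
    private final int maxMessages;

    public WindowedMemory(int maxMessages) { this.maxMessages = maxMessages; }

    public List<ChatMessage> getHistory(String conversationId) {
        return new ArrayList<>(store.getOrDefault(conversationId, new ArrayDeque<>()));
    }

    public void addMessage(String conversationId, ChatMessage message) {
        Deque<ChatMessage> history =
                store.computeIfAbsent(conversationId, id -> new ArrayDeque<>());
        history.addLast(message);
        while (history.size() > maxMessages) {
            history.removeFirst(); // evict the oldest turn once over the cap
        }
    }

    public void clear(String conversationId) { store.remove(conversationId); }

    public int maxMessages() { return maxMessages; }

    public static void main(String[] args) {
        var memory = new WindowedMemory(2);
        memory.addMessage("c1", new ChatMessage("user", "hi"));
        memory.addMessage("c1", new ChatMessage("assistant", "hello"));
        memory.addMessage("c1", new ChatMessage("user", "bye"));
        System.out.println(memory.getHistory("c1").size()); // 2
    }
}
```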

@AiTool — Framework-Agnostic Tool Calling


Declare tools with @AiTool and they work with any AI backend — Spring AI, LangChain4j, Google ADK. No framework-specific annotations needed.

public class AssistantTools {

    @AiTool(name = "get_weather",
            description = "Returns a weather report for a city")
    public String getWeather(
            @Param(value = "city", description = "City name to get weather for")
            String city) {
        return weatherService.lookup(city);
    }

    @AiTool(name = "convert_temperature",
            description = "Converts between Celsius and Fahrenheit")
    public String convertTemperature(
            @Param(value = "value", description = "Temperature value") double value,
            @Param(value = "from_unit", description = "'C' or 'F'") String fromUnit) {
        return "C".equalsIgnoreCase(fromUnit)
                ? String.format("%.1f°C = %.1f°F", value, value * 9.0 / 5.0 + 32)
                : String.format("%.1f°F = %.1f°C", value, (value - 32) * 5.0 / 9.0);
    }
}

@AiEndpoint(path = "/ai/chat",
            systemPrompt = "You are a helpful assistant",
            conversationMemory = true,
            tools = AssistantTools.class)
public class MyChat {

    @Prompt
    public void onPrompt(String message, StreamingSession session) {
        session.stream(message); // tools are automatically available to the LLM
    }
}
@AiTool methods
↓ scan at startup
DefaultToolRegistry (global)
↓ selected per-endpoint via tools = {...}
AiRequest.withTools(tools)
↓ bridged to backend-native format
LangChain4jToolBridge / SpringAiToolBridge / AdkToolBridge
↓ LLM decides to call a tool
ToolExecutor.execute(args) → result fed back to LLM
StreamingSession → WebSocket → browser
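The scan-at-startup step can be sketched with plain reflection. The @AiTool annotation, DemoTools class, and registry shape below are local stand-ins for illustration, not Atmosphere's actual types:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class ToolScan {

    // Local stand-in for Atmosphere's @AiTool (illustrative).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface AiTool { String name(); String description(); }

    public static class DemoTools {
        @AiTool(name = "get_weather", description = "Returns a weather report")
        public String getWeather(String city) { return "sunny in " + city; }
    }

    // Startup scan: collect every @AiTool method into a name -> method registry.
    public static Map<String, Method> scan(Class<?> toolClass) {
        Map<String, Method> registry = new LinkedHashMap<>();
        for (Method m : toolClass.getDeclaredMethods()) {
            AiTool tool = m.getAnnotation(AiTool.class);
            if (tool != null) registry.put(tool.name(), m);
        }
        return registry;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Method> registry = scan(DemoTools.class);
        // When the LLM requests a tool call, look the method up and invoke it.
        Object result = registry.get("get_weather").invoke(new DemoTools(), "Paris");
        System.out.println(result); // sunny in Paris
    }
}
```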

The tool bridge layer converts @AiTool to the native format at runtime:

| Backend | Bridge Class | Native Format |
|---|---|---|
| LangChain4j | LangChain4jToolBridge | ToolSpecification |
| Spring AI | SpringAiToolBridge | ToolCallback |
| Google ADK | AdkToolBridge | BaseTool |

| | @AiTool (Atmosphere) | @Tool (LangChain4j) | FunctionCallback (Spring AI) |
|---|---|---|---|
| Portable | Any backend | LangChain4j only | Spring AI only |
| Parameter metadata | @Param annotation | @P annotation | JSON Schema |
| Registration | ToolRegistry (global) | Per-service | Per-ChatClient |

To swap the AI backend, change only the Maven dependency — no tool code changes:

<!-- Use LangChain4j -->
<artifactId>atmosphere-langchain4j</artifactId>
<!-- Or Spring AI -->
<artifactId>atmosphere-spring-ai</artifactId>
<!-- Or Google ADK -->
<artifactId>atmosphere-adk</artifactId>

See the spring-boot-ai-tools sample.

Cross-cutting concerns (RAG, guardrails, logging) go through AiInterceptor, not subclassing:

@AiEndpoint(path = "/ai/chat", interceptors = {RagInterceptor.class, LoggingInterceptor.class})
public class MyChat { ... }

public class RagInterceptor implements AiInterceptor {

    @Override
    public AiRequest preProcess(AiRequest request, AtmosphereResource resource) {
        String context = vectorStore.search(request.message());
        return request.withMessage(context + "\n\n" + request.message());
    }
}

The AI module includes filters and middleware that sit between the @Prompt method and the LLM:

| Class | What it does |
|---|---|
| PiiRedactionFilter | Buffers messages to sentence boundaries, redacts email/phone/SSN/CC |
| ContentSafetyFilter | Pluggable SafetyChecker SPI — block, redact, or pass |
| CostMeteringFilter | Per-session/broadcaster message counting with budget enforcement |
| RoutingLlmClient | Route by content, model, cost, or latency rules |
| FanOutStreamingSession | Concurrent N-model streaming: AllResponses, FirstComplete, FastestStreamingTexts |
| StreamingTextBudgetManager | Per-user/org budgets with graceful degradation |
| AiResponseCacheInspector | Cache control for AI messages in BroadcasterCache |
| AiResponseCacheListener | Aggregate per-session events instead of per-message noise |
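The redaction idea behind PiiRedactionFilter can be sketched with a few regexes. The patterns below are simplified assumptions, and the real filter additionally buffers streamed text to sentence boundaries before matching:

```java
import java.util.Map;
import java.util.regex.Pattern;

public class PiiRedaction {

    // Illustrative patterns only; the real filter's rules may differ.
    private static final Map<Pattern, String> RULES = Map.of(
            Pattern.compile("[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}"), "[EMAIL]",
            Pattern.compile("\\b\\d{3}-\\d{2}-\\d{4}\\b"), "[SSN]",
            Pattern.compile("\\b(?:\\d[ -]?){13,16}\\b"), "[CARD]");

    // Apply every rule in turn, replacing matches with a placeholder token.
    public static String redact(String text) {
        String out = text;
        for (var rule : RULES.entrySet()) {
            out = rule.getKey().matcher(out).replaceAll(rule.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(redact("Mail me at jane@example.com, SSN 123-45-6789."));
        // Mail me at [EMAIL], SSN [SSN].
    }
}
```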

RoutingLlmClient supports cost-based and latency-based routing rules:

var router = RoutingLlmClient.builder(defaultClient, "gemini-2.5-flash")
    .route(RoutingRule.costBased(5.0, List.of(
        new ModelOption(openaiClient, "gpt-4o", 0.01, 200, 10),
        new ModelOption(geminiClient, "gemini-flash", 0.001, 50, 5))))
    .route(RoutingRule.latencyBased(100, List.of(
        new ModelOption(ollamaClient, "llama3.2", 0.0, 30, 3),
        new ModelOption(openaiClient, "gpt-4o-mini", 0.005, 80, 7))))
    .build();
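One possible reading of these rules, sketched with a local ModelOption stand-in. The field meanings (cost per call, p50 latency, quality score) are assumptions for illustration, not the real constructor's contract:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class Routing {

    // Local stand-in; field meanings are assumed, not Atmosphere's actual record.
    public record ModelOption(String model, double costPerCall,
                              int p50LatencyMs, int quality) { }

    // Cost-based: cheapest option whose cost fits the remaining budget.
    public static Optional<ModelOption> costBased(double remainingBudget,
                                                  List<ModelOption> options) {
        return options.stream()
                .filter(o -> o.costPerCall() <= remainingBudget)
                .min(Comparator.comparingDouble(ModelOption::costPerCall));
    }

    // Latency-based: highest quality among options under the latency ceiling.
    public static Optional<ModelOption> latencyBased(int maxLatencyMs,
                                                     List<ModelOption> options) {
        return options.stream()
                .filter(o -> o.p50LatencyMs() <= maxLatencyMs)
                .max(Comparator.comparingInt(ModelOption::quality));
    }

    public static void main(String[] args) {
        var options = List.of(
                new ModelOption("gpt-4o", 0.01, 200, 10),
                new ModelOption("gemini-flash", 0.001, 50, 5));
        System.out.println(costBased(5.0, options).get().model());    // gemini-flash
        System.out.println(latencyBased(100, options).get().model()); // gemini-flash
    }
}
```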

You can bypass @AiEndpoint and use adapters directly:

Spring AI:

var session = StreamingSessions.start(resource);
springAiAdapter.stream(chatClient, prompt, session);

LangChain4j:

var session = StreamingSessions.start(resource);
model.chat(ChatMessage.userMessage(prompt),
new AtmosphereStreamingResponseHandler(session));

Google ADK:

var session = StreamingSessions.start(resource);
adkAdapter.stream(new AdkRequest(runner, userId, sessionId, prompt), session);

Embabel:

val session = StreamingSessions.start(resource)
embabelAdapter.stream(AgentRequest("assistant") { channel ->
    agentPlatform.run(prompt, channel)
}, session)
On the browser side, the atmosphere.js React hook consumes the stream:

import { useStreaming } from 'atmosphere.js/react';

function AiChat() {
  const { fullText, isStreaming, stats, routing, send } = useStreaming({
    request: { url: '/ai/chat', transport: 'websocket' },
  });
  return (
    <div>
      <button onClick={() => send('Explain WebSockets')} disabled={isStreaming}>
        Ask
      </button>
      <p>{fullText}</p>
      {stats && <small>{stats.totalStreamingTexts} streaming texts</small>}
      {routing.model && <small>Model: {routing.model}</small>}
    </div>
  );
}
An LLM can also join a chat room as a virtual member and reply alongside human users:

var client = AiConfig.get().client();
var assistant = new LlmRoomMember("assistant", client, "gpt-5",
        "You are a helpful coding assistant");
Room room = rooms.room("dev-chat");
room.joinVirtual(assistant);
// Now when any user sends a message, the LLM responds in the same room

The client receives JSON messages over WebSocket/SSE:

  • {"type":"streaming-text","content":"Hello"} — a single streaming text
  • {"type":"progress","message":"Thinking..."} — status update
  • {"type":"complete"} — stream finished
  • {"type":"error","message":"..."} — stream failed

Configure the built-in client with environment variables:

| Variable | Description | Default |
|---|---|---|
| LLM_MODE | remote (cloud) or local (Ollama) | remote |
| LLM_MODEL | gemini-2.5-flash, gpt-5, o3-mini, llama3.2, … | gemini-2.5-flash |
| LLM_API_KEY | API key (or GEMINI_API_KEY for Gemini) | |
| LLM_BASE_URL | Override endpoint (auto-detected from model name) | auto |
| Class | Description |
|---|---|
| @AiEndpoint | Marks a class as an AI chat endpoint with a path, system prompt, and interceptors |
| @Prompt | Marks the method that handles user messages |
| @AiTool | Marks a method as an AI-callable tool (framework-agnostic) |
| @Param | Describes a tool parameter's name, description, and required flag |
| AgentRuntime | SPI for AI framework backends (ServiceLoader-discovered) |
| AiRequest | Framework-agnostic request record (message, systemPrompt, model, userId, sessionId, agentId, conversationId, metadata) |
| AiEvent | Sealed interface: 15 structured event types (TextDelta, ToolStart, ToolResult, AgentStep, EntityStart, etc.) |
| AiCapability | Enum for endpoint capability requirements (TEXT_STREAMING, TOOL_CALLING, STRUCTURED_OUTPUT, etc.) |
| AiInterceptor | Pre/post processing hooks for RAG, guardrails, logging |
| AiConversationMemory | SPI for conversation history storage |
| MemoryStrategy | Pluggable memory selection: MessageWindowStrategy, TokenWindowStrategy, SummarizingStrategy |
| StructuredOutputParser | SPI for JSON Schema generation and typed output parsing (built-in: JacksonStructuredOutputParser) |
| StreamingSession | Delivers streaming texts, events, progress updates, and metadata to the client |
| StreamingSessions | Factory for creating StreamingSession instances |
| OpenAiCompatibleClient | Built-in HTTP client for OpenAI-compatible APIs |
| RoutingLlmClient | Routes prompts to different LLM backends based on rules |
| ToolRegistry | Global registry for @AiTool definitions |
| ModelRouter | SPI for intelligent model routing and failover |
| AiGuardrail | SPI for pre/post-LLM safety inspection |
| AiMetrics | SPI for AI observability (streaming texts, latency, cost) |
| ConversationPersistence | SPI for durable conversation storage (Redis, SQLite) |
| RetryPolicy | Exponential backoff with circuit-breaker semantics |

@RequiresApproval pauses tool execution until the client approves. The virtual thread parks cheaply on a CompletableFuture — no carrier thread consumed.

@AiTool(name = "delete_account", description = "Permanently delete a user account")
@RequiresApproval("This will permanently delete the account. Are you sure?")
public String deleteAccount(@Param("accountId") String accountId) {
    return accountService.delete(accountId);
}

When the LLM calls a @RequiresApproval tool, the client receives an approval-required event:

{"event":"approval-required","data":{
  "approvalId":"apr_a1b2c3d4e5f6",
  "toolName":"delete_account",
  "arguments":{"accountId":"user-42"},
  "message":"This will permanently delete the account. Are you sure?",
  "expiresIn":300
}}

The client responds with:

  • /__approval/apr_a1b2c3d4e5f6/approve — tool executes
  • /__approval/apr_a1b2c3d4e5f6/deny — tool returns cancelled

Default timeout: 5 minutes. Configurable via @RequiresApproval(timeoutSeconds = 120).

  1. AiStreamingSession.wrapApprovalGates() wraps @RequiresApproval tools with ApprovalGateExecutor
  2. When the LLM calls the tool, ApprovalGateExecutor parks the virtual thread on CompletableFuture.get(timeout)
  3. The session emits AiEvent.ApprovalRequired to the client
  4. AiEndpointHandler fast-paths /__approval/ messages to the session’s ApprovalRegistry (before prompt dispatch)
  5. ApprovalRegistry.tryResolve() completes the future, unparking the virtual thread
  6. On transport reconnect, a fallback scan across all active sessions ensures the approval reaches the parked thread
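Steps 2 and 5 reduce to a future that the tool's thread blocks on and the transport side completes. A simplified stand-in (the real ApprovalGateExecutor and ApprovalRegistry carry more state, and the identifiers below are illustrative):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class ApprovalGate {

    // approvalId -> pending decision (true = approve, false = deny).
    private static final Map<String, CompletableFuture<Boolean>> PENDING =
            new ConcurrentHashMap<>();

    // Runs on the tool's thread: park until the client decides or the gate times out.
    public static String executeWithApproval(String approvalId, long timeoutSeconds)
            throws Exception {
        CompletableFuture<Boolean> decision = new CompletableFuture<>();
        PENDING.put(approvalId, decision);
        try {
            // In Atmosphere this is a virtual thread, so blocking here is cheap.
            return decision.get(timeoutSeconds, TimeUnit.SECONDS) ? "executed" : "cancelled";
        } finally {
            PENDING.remove(approvalId);
        }
    }

    // Runs on the transport side when an approve/deny message arrives.
    public static boolean tryResolve(String approvalId, boolean approved) {
        CompletableFuture<Boolean> decision = PENDING.get(approvalId);
        return decision != null && decision.complete(approved);
    }

    public static void main(String[] args) throws Exception {
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            tryResolve("apr_1", true); // client approves after the tool thread parked
        }).start();
        System.out.println(executeWithApproval("apr_1", 5)); // executed
    }
}
```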

When running on Google ADK, Atmosphere also calls toolContext.requestConfirmation() to give ADK native visibility into the approval pause. If ADK resolves a confirmation before Atmosphere (e.g., via its own UI), the ADK denial short-circuits without calling the executor. This creates a two-layer model: Atmosphere-level (cross-runtime) + ADK-native (runtime-specific).

All runtimes with TOOL_CALLING also declare AiCapability.TOOL_APPROVAL (Built-in, Spring AI, LangChain4j, ADK, Koog). SK and Embabel are excluded: SK because its tool bridge is deferred; Embabel because it has no tool-calling path.

The AiCompactionStrategy SPI controls how conversation history is compacted when it exceeds the configured limit. Unlike MemoryStrategy (which selects messages for the next request — read path), compaction permanently reduces stored history (write path).

public interface AiCompactionStrategy {
    List<ChatMessage> compact(List<ChatMessage> messages, int maxMessages);
    String name();
}

SlidingWindowCompaction (default) — drops the oldest non-system messages until under the limit. System messages are always preserved.

SummarizingCompaction — condenses old messages into a single system-role summary, preserving the most recent messages verbatim. The recent window size is configurable (default: 6).

// Default: sliding window
var memory = new InMemoryConversationMemory(20);
// Custom: summarization with 8-message recent window
var memory = new InMemoryConversationMemory(20, new SummarizingCompaction(8));
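The sliding-window rule described above can be sketched as follows; ChatMessage is a local stand-in, and only the keep-system-messages behavior is modeled:

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindow {

    // Local stand-in for Atmosphere's ChatMessage (illustrative).
    public record ChatMessage(String role, String content) { }

    // Drop the oldest non-system messages until the history fits maxMessages;
    // system messages are always preserved.
    public static List<ChatMessage> compact(List<ChatMessage> messages, int maxMessages) {
        List<ChatMessage> out = new ArrayList<>(messages);
        for (int i = 0; out.size() > maxMessages && i < out.size(); ) {
            if ("system".equals(out.get(i).role())) i++; // never evict system
            else out.remove(i);                          // evict oldest non-system
        }
        return out;
    }

    public static void main(String[] args) {
        var history = List.of(
                new ChatMessage("system", "You are helpful"),
                new ChatMessage("user", "q1"),
                new ChatMessage("assistant", "a1"),
                new ChatMessage("user", "q2"));
        // Keeps the system message plus the most recent turn.
        System.out.println(compact(history, 2).size()); // 2
    }
}
```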

AdkCompactionBridge.toAdkConfig() maps Atmosphere compaction settings to ADK’s EventsCompactionConfig for native compaction when using the ADK runtime.

The ArtifactStore SPI provides binary artifact persistence across agent runs. Use cases include agent-generated reports, images, code files, and content shared between coordinated agents.

public interface ArtifactStore {
    Artifact save(Artifact artifact);                             // auto-versions
    Optional<Artifact> load(String namespace, String artifactId); // latest version
    Optional<Artifact> load(String namespace, String artifactId, int version);
    List<Artifact> list(String namespace);                        // latest of each
    boolean delete(String namespace, String artifactId);          // all versions
    void deleteAll(String namespace);
}

public record Artifact(
    String id,                    // unique identifier
    String namespace,             // grouping key (session ID, agent name, user ID)
    String fileName,              // human-readable name ("report.pdf")
    String mimeType,              // MIME type ("application/pdf")
    byte[] data,                  // binary content (defensively copied)
    int version,                  // auto-incremented per save
    Map<String, String> metadata, // arbitrary key-value pairs
    Instant createdAt
) { }

Byte arrays are defensively copied on construction and on access — callers cannot mutate persisted data.

  • InMemoryArtifactStore — default, for development and testing. Data does not survive JVM restart.
  • ADK bridge — AdkArtifactBridge.toAdkService() wraps an ArtifactStore as ADK’s BaseArtifactService.
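The auto-versioning and defensive-copy behavior can be sketched with a trimmed-down, illustrative store (single-threaded use assumed; not Atmosphere's implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class VersionedStore {

    // Trimmed stand-in for Artifact: just id, data, and version.
    public record Artifact(String id, byte[] data, int version) {
        public Artifact {
            data = data.clone(); // defensive copy on construction
        }
        @Override
        public byte[] data() {
            return data.clone(); // defensive copy on access
        }
    }

    private final Map<String, List<Artifact>> byId = new ConcurrentHashMap<>();

    // Each save of the same id appends an auto-incremented version.
    public Artifact save(String id, byte[] data) {
        List<Artifact> versions = byId.computeIfAbsent(id, k -> new ArrayList<>());
        Artifact saved = new Artifact(id, data, versions.size() + 1);
        versions.add(saved);
        return saved;
    }

    // Without a version argument, return the latest version.
    public Optional<Artifact> load(String id) {
        List<Artifact> versions = byId.get(id);
        return (versions == null || versions.isEmpty())
                ? Optional.empty()
                : Optional.of(versions.get(versions.size() - 1));
    }

    public static void main(String[] args) {
        var store = new VersionedStore();
        byte[] data = "v1".getBytes();
        store.save("report.pdf", data);
        data[0] = 'X'; // caller mutation does not reach the stored copy
        store.save("report.pdf", "v2".getBytes());
        System.out.println(store.load("report.pdf").get().version()); // 2
    }
}
```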

AiInterceptor includes an onDisconnect hook called before conversation memory is cleared. This enables fact extraction, summary persistence, and other cleanup that requires access to the conversation history.

public interface AiInterceptor {
    default AiRequest preProcess(AiRequest request, AtmosphereResource resource) { return request; }
    default void postProcess(AiRequest request, AtmosphereResource resource) { }
    default void onDisconnect(String userId, String conversationId, List<ChatMessage> history) { }
}

LongTermMemoryInterceptor.onDisconnect() uses this to extract facts from the full conversation on session close via OnSessionCloseStrategy.

Execution order: preProcess runs FIFO, postProcess runs LIFO, onDisconnect runs FIFO. Exceptions in one interceptor do not prevent others from being called.
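That ordering can be sketched with a minimal chain runner; the Interceptor interface below is a local stand-in for AiInterceptor, reduced to the two request hooks:

```java
import java.util.ArrayList;
import java.util.List;

public class InterceptorOrder {

    // Local stand-in for AiInterceptor (illustrative).
    public interface Interceptor {
        default String preProcess(String request) { return request; }
        default void postProcess(String request, List<String> log) { }
    }

    // preProcess runs in registration order (FIFO), postProcess in reverse (LIFO).
    public static String run(List<Interceptor> chain, String request, List<String> log) {
        for (Interceptor i : chain) {
            request = i.preProcess(request);
        }
        // ... the LLM call would happen here ...
        for (int i = chain.size() - 1; i >= 0; i--) {
            chain.get(i).postProcess(request, log);
        }
        return request;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Interceptor a = new Interceptor() {
            public String preProcess(String r) { return r + " +a"; }
            public void postProcess(String r, List<String> l) { l.add("a"); }
        };
        Interceptor b = new Interceptor() {
            public String preProcess(String r) { return r + " +b"; }
            public void postProcess(String r, List<String> l) { l.add("b"); }
        };
        System.out.println(run(List.of(a, b), "prompt", log)); // prompt +a +b
        System.out.println(log);                               // [b, a]
    }
}
```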

The AiEvent sealed interface provides 15 structured event types emitted via session.emit(). All runtimes map their native events to this common model.

| Event | Description |
|---|---|
| TextDelta | Streaming token |
| TextComplete | Final assembled text |
| ToolStart | Tool invocation begins (name + arguments) |
| ToolResult | Tool executed successfully (name + result) |
| ToolError | Tool execution failed |
| AgentStep | Orchestration step (ADK agent steps, Embabel planning) |
| StructuredField | Structured output field arrival |
| EntityStart / EntityComplete | Structured entity streaming |
| RoutingDecision | Backend routing event |
| Progress | Long-running operation status |
| Handoff | Agent handoff notification |
| ApprovalRequired | Human approval gate |
| Error | Error with recovery hint |
| Complete | Stream completed with usage metadata |
| Source | Atmosphere Event |
|---|---|
| ADK event.functionCalls() | AiEvent.ToolStart |
| ADK event.functionResponses() | AiEvent.ToolResult |
| ADK event.author() (non-partial) | AiEvent.AgentStep |
| ADK event.usageMetadata() | ai.tokens.input/output/total metadata |
| Koog onToolCallStarting | AiEvent.ToolStart |
| Koog onToolCallCompleted | AiEvent.ToolResult |
| Koog onToolCallFailed | AiEvent.ToolError |
| Koog StreamFrame.ReasoningDelta | AiEvent.Progress |
| Embabel MessageOutputChannelEvent | AiEvent.TextDelta |
| Embabel ProgressOutputChannelEvent | AiEvent.AgentStep |

Each runtime declares capabilities via AiCapability. The framework uses these for model routing, tool negotiation, and feature discovery.

Available on all runtimes:

| Capability | Description |
|---|---|
| TEXT_STREAMING | Basic text streaming |
| SYSTEM_PROMPT | System prompt support |
| Capability | Supported runtimes |
|---|---|
| TOOL_CALLING | Built-in, LangChain4j, Spring AI, ADK, Koog |
| STRUCTURED_OUTPUT | All seven runtimes |
| CONVERSATION_MEMORY | 3 of the 7 runtimes |
| TOOL_APPROVAL | Built-in, LangChain4j, Spring AI, ADK, Koog |
| VISION | 4 of the 7 runtimes |
| AUDIO | 3 of the 7 runtimes |
| MULTI_MODAL | 4 of the 7 runtimes |
| PROMPT_CACHING | 3 of the 7 runtimes |
| TOKEN_USAGE | All seven runtimes |

How structured output works: AiPipeline wraps the streaming session with StructuredOutputCapturingSession and augments the system prompt with JSON-schema instructions before the runtime runs. Any runtime that honors SYSTEM_PROMPT therefore gets STRUCTURED_OUTPUT automatically via the pipeline — no per-runtime adapter code required. BuiltInAgentRuntime additionally enables native jsonMode on the OpenAI-compatible client for provider-level JSON enforcement on top of the pipeline wrap. Source: modules/ai/src/main/java/org/atmosphere/ai/pipeline/AiPipeline.java:128-135, modules/ai/src/main/java/org/atmosphere/ai/llm/BuiltInAgentRuntime.java:72-74.

Capability gaps:

  • EmbabelAgentRuntime does not advertise TOOL_CALLING: Embabel agents expose skills via their own @Agent API, not free-form tool calling. Source: modules/embabel/src/main/kotlin/org/atmosphere/embabel/EmbabelAgentRuntime.kt.
| Capability | Supported runtimes |
|---|---|
| AGENT_ORCHESTRATION | 3 of the 7 runtimes |

The AbstractAgentRuntimeContractTest base class in atmosphere-ai-test enforces a minimum contract across all runtime adapters.

public abstract class AbstractAgentRuntimeContractTest {

    protected abstract AgentRuntime createRuntime();
    protected abstract AgentExecutionContext createTextContext();
    protected abstract AgentExecutionContext createToolCallContext();
    protected abstract AgentExecutionContext createErrorContext();

    // Enforced contracts:
    // - runtimeDeclaresMinimumCapabilities (TEXT_STREAMING)
    // - runtimeHasNonBlankName
    // - runtimeIsAvailable
    // - textStreamingCompletesSession (10s timeout)
    // - toolCallExecutesIfSupported
    // - errorContextTriggersSessionError
}

Add atmosphere-ai-test as a test dependency and extend the base class:

<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-ai-test</artifactId>
    <scope>test</scope>
</dependency>

The RecordingSession test double captures all events, text chunks, metadata, and errors for assertion. Currently enforced on: ADK, LangChain4j, Spring AI.