feat(telegram): stream LLM responses via sendMessageDraft#1101
alexhoshina merged 23 commits into sipeed:main from feat/telegram-streaming
Conversation
Implements real-time token streaming to Telegram using the sendMessageDraft API (telego v1.6.0). Instead of showing only a "Thinking..." placeholder until the full response arrives, users now see partial LLM output appear in the chat as it's generated.

The streaming pipeline threads through all layers:

- StreamingProvider interface (providers/types.go): opt-in ChatStream() method that receives an onChunk callback with accumulated text
- OpenAI-compatible SSE streaming (openai_compat/provider.go): parses SSE events with stream:true, handles text deltas and tool call assembly
- Anthropic native streaming (anthropic/provider.go): uses SDK's NewStreaming() for direct Anthropic API connections
- HTTPProvider delegation (http_provider.go): delegates ChatStream to the underlying openai_compat provider
- StreamingCapable + Streamer interfaces (channels/interfaces.go): opt-in channel capability like TypingCapable/PlaceholderCapable
- Telegram streamer (telegram/telegram.go): BeginStream returns a telegramStreamer that throttles sendMessageDraft calls (3s/200 chars) with graceful degradation on API errors
- StreamDelegate bridge (bus/bus.go): decouples agent loop from channel manager without tight imports
- Manager integration (manager.go): implements StreamDelegate, tracks streamActive state, coordinates with placeholder editing
- Agent loop (loop.go): uses ChatStream when both provider and channel support streaming, cancels stream on tool calls, skips PublishOutbound when Finalize already delivered the message

Graceful degradation:

- Bots without forum/topics mode: first sendMessageDraft error sets failed=true, subsequent Updates become no-ops, Finalize still delivers via SendMessage. The user sees normal non-streaming behavior.
- Non-streaming providers: type assertion fails, falls back to Chat()
- Config opt-out: streaming.enabled (default true) in telegram config

Closes sipeed#1098
…ponse When streaming was active, the "Thinking..." placeholder message stayed in the chat because preSend only deleted the tracking entry without removing the actual Telegram message. Now preSend deletes the placeholder via the new MessageDeleter interface when streamActive is set.
- Delete unused Anthropic ChatStream/parseStream (-131 lines) — factory creates HTTPProvider for all OpenAI-compat providers including OpenRouter
- Simplify runLLMIteration from 4 to 3 return values (remove unused streamed bool)
- Replace managerStreamer struct with finalizeHookStreamer using embedding (Update/Cancel promoted, only Finalize overridden)
Heartbeat messages set SendResponse=false but the streaming path was unconditionally acquiring a streamer, causing HEARTBEAT_OK to leak to Telegram via streamer.Finalize().
Force-pushed from f8c1257 to 2d5f320
…ng config Skip streamer acquisition for heartbeat (NoHistory=true), preventing HEARTBEAT_OK from leaking to Telegram via streamer.Finalize(). Add streaming.enabled to Telegram defaults and example config.
Force-pushed from 72c15be to 55e2466
…coclaw into feat/telegram-streaming

# Conflicts:
#	pkg/agent/loop.go
#	pkg/providers/types.go
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix gci import ordering in telegram and anthropic provider, and break long function signature in openai_compat provider to satisfy golines.
ready for merge guys @yinwm @alexhoshina @lxowalle
yinwm
left a comment
Code Review: Telegram Streaming Implementation
This PR implements real-time LLM response streaming to Telegram using sendMessageDraft API. The overall architecture is well-designed with clean separation of concerns.
🔴 Critical Issues
1. Duplicate Streamer Interface Definition - Potential Type Mismatch
pkg/channels/interfaces.go and pkg/bus/bus.go both define Streamer interface:
// channels/interfaces.go
type Streamer interface {
Update(ctx context.Context, content string) error
Finalize(ctx context.Context, content string) error
Cancel(ctx context.Context)
}
// bus/bus.go
type Streamer interface {
Update(ctx context.Context, content string) error
Finalize(ctx context.Context, content string) error
Cancel(ctx context.Context)
}

Problem: These are two different types in Go. manager.go returns channels.Streamer, but bus.GetStreamer declares its return type as bus.Streamer. This could cause subtle bugs or compilation issues if the interfaces drift apart.
Recommendation: Keep only one definition. Define Streamer in bus/bus.go and have the channels package import and use bus.Streamer, or use a type alias.
2. Scanner Buffer Size Limitation in SSE Parsing
pkg/providers/openai_compat/provider.go:247:
scanner := bufio.NewScanner(reader)
for scanner.Scan() {

bufio.Scanner has a default max token size of 64KB. If an LLM response contains a single line (without newlines) exceeding 64KB, it will trigger bufio.ErrTooLong.
Recommendation: Increase buffer size:
scanner := bufio.NewScanner(reader)
buf := make([]byte, 0, 1024*1024) // 1MB initial capacity
scanner.Buffer(buf, 10*1024*1024) // max 10MB

🟡 Medium Issues
3. DraftID Potential Collision Risk
pkg/channels/telegram/telegram.go:815:
draftID: rand.Intn(1<<31-1) + 1, // non-zero random draft ID

math/rand is not cryptographically secure, and without proper seeding it could produce predictable or colliding values.
Recommendation: Use crypto/rand:
import "crypto/rand"
import "encoding/binary"
func randomDraftID() int {
var b [4]byte
rand.Read(b[:])
return int(binary.BigEndian.Uint32(b[:])) | 1 // ensure non-zero
}

4. Missing Context Cancellation Check in Stream Parsing
In parseStreamResponse, if the context is cancelled mid-stream, the function continues processing until the stream ends naturally.
Recommendation: Add context check in the parsing loop:
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// process chunk
}

5. Silent Failure in Finalize
When Update fails (sets failed=true), Finalize is still called and sends the message. However, if Finalize also fails after the fallback, the user might not see any message at all.
Recommendation: Consider retry mechanism or logging to persistent storage for recovery.
📋 Suggestions
6. Add Observability/Metrics
Streaming success rate, fallback rate, and latency are important for operations.
Recommendation: Add Prometheus metrics or structured logging.
7. Configuration Could Be More Flexible
The throttle constants are hardcoded:
const (
streamThrottleInterval = 3 * time.Second
streamMinGrowth = 200
)

Recommendation: Consider making these configurable for different deployment scenarios.
✅ Well Done
- Graceful Degradation: The failed flag pattern provides smooth fallback
- Interface Design: StreamingCapable as an optional interface maintains backward compatibility
- Throttling Strategy: Prevents Telegram API rate limiting
- Tool Call Handling: Correctly cancels stream when tool calls are detected
- Heartbeat Fix: Properly guards internal messages from triggering streaming
Summary
| Severity | Count |
|---|---|
| 🔴 Critical | 2 |
| 🟡 Medium | 3 |
| 🟢 Suggestion | 2 |
Recommendation: Address at least the two critical issues before merging.
🤖 Generated with Claude Code
- Deduplicate Streamer interface: alias channels.Streamer to bus.Streamer to prevent type drift across packages
- Increase SSE scanner buffer to 10MB max to handle large single-line responses that exceed bufio.Scanner's 64KB default
- Switch draftID generation from math/rand to crypto/rand for collision-resistant random IDs
- Add context cancellation check in SSE parsing loop so cancelled streams stop processing immediately
- Log Finalize failures with chat_id and content length for debugging silent message delivery failures
Force-pushed from 22bba53 to fa3c00a
Thanks for the thorough review @yinwm! Pushed fixes for all actionable items:

Critical:
Medium:

Skipped #6 (observability/metrics) and #7 (configurable throttle) for now — can follow up in separate PRs if needed.

EDIT: you know what, let me do the throttle config too 😁
Move hardcoded streamThrottleInterval (3s) and streamMinGrowth (200) into StreamingConfig so they can be tuned per deployment via config or environment variables.
Resolve conflicts in agent/loop.go (streaming + candidate routing) and channels/interfaces.go (bus + commands imports).
forgot to write you.. ready to merge! 🙌
Resolve conflicts in pkg/agent/loop.go (keep both reasoning fallback and stream finalize) and pkg/providers/openai_compat/provider.go (keep ChatStream + HTML error helpers + io.Reader parseResponse). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
guys, just a friendly reminder that this is ready to merge @yinwm @alexhoshina
I've briefly reviewed the changes related to the channel, and they should be fine. The other parts might need to be reviewed by @yinwm.
hey bro, just a reminder @yinwm

Perhaps the CI issues need to be resolved, as all three checks have failed
These two functions called undefined parseChatID. Use parseTelegramChatID with _ for the unused threadID instead of adding a wrapper function. Fixes all three CI checks. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from 8acb6a1 to 4669e5a
Updated — removed the wrapper function, now just uses parseTelegramChatID directly.
yinwm
left a comment
Review: Ready to Merge ✅
This PR is well-designed with a solid streaming pipeline implementation and robust graceful degradation.
Before merging, please address:
- 🗑️ Remove the pico-echo-server binary file that was accidentally committed
- 🔀 Resolve any conflicts with the main branch
Once these are addressed, this is good to merge. The remaining suggestions I had (tool call index handling, unit tests) are incremental improvements that can be addressed in follow-up PRs.
… binary Resolves merge conflicts in config, agent loop, bus, defaults, and openai_compat provider. Updates streaming code to use refactored common package helpers (ReadAndParseResponse, HandleErrorResponse, AsInt, AsFloat). Removes accidentally committed pico-echo-server binary. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Resolved the two items from the latest review:

All checks pass locally:
ready for merge
}

func (s *finalizeHookStreamer) Finalize(ctx context.Context, content string) error {
	s.onFinalize()
Setting streamActive to true too early may prevent message sending from falling back to the regular path after Streamer.Finalize fails.
Keep both streamActive map from streaming feature and channelHashes map + initChannels signature change from main. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move onFinalize hook to run after Streamer.Finalize succeeds, so that if Finalize fails the streamActive flag stays false and the regular placeholder fallback path remains available. Addresses review feedback from @alexhoshina. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
Implements real-time LLM response streaming to Telegram using the sendMessageDraft API, replacing the current "Thinking..." placeholder pattern with live token-by-token output as it's generated.

Closes #1098
What changed
New interfaces and capabilities
- StreamingProvider (providers/types.go): opt-in ChatStream() method with onChunk callback receiving accumulated text
- StreamingCapable + Streamer (channels/interfaces.go): opt-in channel capability (like TypingCapable/PlaceholderCapable)
- StreamDelegate (bus/bus.go): decouples agent loop from channel manager

Provider streaming (works with any OpenAI-compatible API)
- openai_compat/provider.go: SSE streaming parser (stream: true) — handles text deltas, tool call assembly, and usage tracking. Uses a separate HTTP client without timeout for long streams (context cancellation provides safety).
- http_provider.go: delegates ChatStream to openai_compat
- anthropic/provider.go: native SDK streaming via Messages.NewStreaming() for direct Anthropic API connections

Telegram streaming
- telegram/telegram.go: BeginStream() returns a telegramStreamer that calls sendMessageDraft with throttling (3s / 200 chars minimum) to stay within Telegram rate limits
- A sendMessageDraft error (e.g., no forum/topics mode) sets failed=true — subsequent Update() calls become no-ops while Finalize() still delivers via SendMessage
- streaming.enabled (default: true) in telegram channel config

Agent loop + manager integration
- agent/loop.go: when both provider and channel support streaming, uses ChatStream with a streaming callback. Cancels stream on tool calls. Skips PublishOutbound when Finalize already delivered.
- channels/manager.go: implements StreamDelegate, tracks streamActive state per channel+chatID, coordinates with placeholder editing in preSend
- channels/base.go: always shows typing + placeholder on inbound (streaming coordination happens on the output side via the streamActive map)

Fallback behavior
- sendMessageDraft fails → streamer degrades silently → Finalize sends via SendMessage
- Non-streaming provider: falls back to Chat() → normal placeholder flow
- streaming.enabled: false or BeginStream returns an error → GetStreamer returns nil → normal placeholder flow

Test plan
- streamed=true in logs with OpenRouter provider
- go build ./pkg/... and go vet ./pkg/... pass