Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 56 additions & 13 deletions internal/channels/telegram/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func stripHTML(s string) string {
}

// isRetryableNetworkErr checks if a Telegram API error is a transient network error worth retrying.
// Includes post-connect network stalls and 5xx server errors.
func isRetryableNetworkErr(err error) bool {
if err == nil {
return false
Expand All @@ -48,7 +49,24 @@ func isRetryableNetworkErr(err error) bool {
strings.Contains(s, "connection reset") ||
strings.Contains(s, "broken pipe") ||
strings.Contains(s, "EOF") ||
strings.Contains(s, "lookup") // DNS resolution failure
strings.Contains(s, "lookup") || // DNS resolution failure
strings.Contains(s, "502") || // Bad Gateway
strings.Contains(s, "503") || // Service Unavailable
strings.Contains(s, "504") // Gateway Timeout
}

// isPostConnectNetworkErr check if the error likely occurred AFTER reaching the server
// (timeout, connection reset, EOF) vs before (DNS lookup failure).
func isPostConnectNetworkErr(err error) bool {
if err == nil {
return false
}
s := err.Error()
// Exclude "lookup" (DNS) and "connection refused" as they are safe pre-connect errors.
return (strings.Contains(s, "timeout") ||
strings.Contains(s, "connection reset") ||
strings.Contains(s, "broken pipe") ||
strings.Contains(s, "EOF")) && !strings.Contains(s, "lookup")
}

// retrySend wraps a Telegram send call with retry logic for transient network errors.
Expand Down Expand Up @@ -185,11 +203,33 @@ func (c *Channel) Send(ctx context.Context, msg bus.OutboundMessage) error {
startChunk := 0
if pID, ok := c.placeholders.Load(localKey); ok {
c.placeholders.Delete(localKey)
if err := c.editMessage(ctx, chatID, pID.(int), chunks[0]); err == nil {
startChunk = 1 // first chunk edited into stream message
msgID := pID.(int)

if msgID == -1 {
// SIGNAL from stream: A message transport send likely landed but ID was never retrieved.
// Swallow the first chunk ONLY if there are more chunks to come (minimizes visible duplicate).
// If it is the ONLY chunk, we deliver it anyway to guarantee the user sees the answer.
if len(chunks) > 1 {
slog.Warn("telegram: ghost message detected, skipping first chunk of multi-chunk response", "chat_id", chatID)
startChunk = 1
}
} else {
// Edit failed (message deleted externally, etc.) — delete and send all fresh
_ = c.deleteMessage(ctx, chatID, pID.(int))
err := c.editMessage(ctx, chatID, msgID, chunks[0])
if err == nil {
startChunk = 1 // first chunk edited into stream message
} else if isPostConnectNetworkErr(err) && len(chunks) > 1 {
// Mid-stream timeout/lost connection: the edit likely reached Telegram
// but the response was lost. Swallow and skip chunk 0 ONLY for multi-chunk
// messages where the rest of the answer is still coming.
slog.Warn("telegram: final edit timed out or lost, skipping chunk 0 of multi-chunk response",
"chat_id", chatID, "message_id", msgID, "error", err)
startChunk = 1
} else {
// Edit failed definitely (400 rejection), or a single-chunk edit timed out.
// For single-chunk answers, we delete (best-effort) and send fresh to
// guarantee the user gets the content.
_ = c.deleteMessage(ctx, chatID, msgID)
}
}
}

Expand Down Expand Up @@ -543,19 +583,22 @@ func (c *Channel) sendDocument(ctx context.Context, chatID telego.ChatID, filePa
}

// editMessage edits an existing message's text.
// Uses retrySend since edits are idempotent and may fail on transient network issues.
func (c *Channel) editMessage(ctx context.Context, chatID int64, messageID int, htmlText string) error {
editMsg := tu.EditMessageText(tu.ID(chatID), messageID, htmlText)
editMsg.ParseMode = telego.ModeHTML

_, err := c.bot.EditMessageText(ctx, editMsg)
if err != nil {
// Ignore "message is not modified" errors (idempotent edit)
if messageNotModifiedRe.MatchString(err.Error()) {
return nil
return c.retrySend(ctx, "editMessage", nil, func(ctx context.Context) error {
_, err := c.bot.EditMessageText(ctx, editMsg)
if err != nil {
// Ignore "message is not modified" errors (idempotent edit)
if messageNotModifiedRe.MatchString(err.Error()) {
return nil
}
return err
}
return err
}
return nil
return nil
})
}

// deleteMessage deletes a message from the chat.
Expand Down
15 changes: 14 additions & 1 deletion internal/channels/telegram/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type DraftStream struct {
draftID int // sendMessageDraft draft_id (0 = message transport)
useDraft bool // true = draft transport, false = message transport
draftFailed bool // true = draft API rejected permanently, using message transport
sendMayHaveLanded bool // true = initial sendMessage was attempted and may have landed (even if timed out)
}

// NewDraftStream creates a new streaming preview manager.
Expand Down Expand Up @@ -199,6 +200,7 @@ func (ds *DraftStream) flush(ctx context.Context) error {
if sendThreadID := resolveThreadIDForSend(ds.messageThreadID); sendThreadID > 0 {
params.MessageThreadID = sendThreadID
}
ds.sendMayHaveLanded = true
msg, err := ds.bot.SendMessage(ctx, params)
// TS ref: withTelegramThreadFallback — retry without thread ID when topic is deleted.
if err != nil && params.MessageThreadID != 0 && threadNotFoundRe.MatchString(err.Error()) {
Expand All @@ -207,6 +209,10 @@ func (ds *DraftStream) flush(ctx context.Context) error {
msg, err = ds.bot.SendMessage(ctx, params)
}
if err != nil {
if isPostConnectNetworkErr(err) {
slog.Warn("stream: initial sendMessage timed out or lost. Treating as landed to avoid duplicate.", "error", err)
return nil // treat as successful but with unknown messageID
}
slog.Debug("stream: failed to send initial message", "error", err)
return err
}
Expand Down Expand Up @@ -312,10 +318,17 @@ func (c *Channel) CreateStream(ctx context.Context, chatID string, firstStream b
// Also stops any thinking animation for the chat.
// Implements channels.StreamingChannel.
func (c *Channel) FinalizeStream(ctx context.Context, chatID string, stream channels.ChannelStream) {
if msgID := stream.MessageID(); msgID != 0 {
msgID := stream.MessageID()
if msgID != 0 {
// Hand off the stream message to Send() for final formatted edit.
c.placeholders.Store(chatID, msgID)
slog.Info("stream: ended, handing off to Send()", "chat_id", chatID, "message_id", msgID)
} else if ds, ok := stream.(*DraftStream); ok && ds.sendMayHaveLanded && !ds.UsedDraftTransport() {
// The message transport was used but no ID was retrieved (timeout).
// We MUST store a -1 placeholder to signal to Send() that a message
// likely landed and it should NOT send a duplicate, even if it cannot edit.
c.placeholders.Store(chatID, -1)
slog.Warn("stream: initial send landed but ID unknown. Suppressing fallback message to avoid duplicate.", "chat_id", chatID)
}

// Stop thinking animation
Expand Down
Loading