Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion agent/claudecode/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,19 @@ func (cs *claudeSession) Alive() bool {
return cs.alive.Load()
}

// ForceKill immediately terminates the underlying Claude Code process
// without waiting for graceful shutdown. This is used when the agent
// is stuck in a thinking loop or unresponsive state.
func (cs *claudeSession) ForceKill() error {
cs.cancel()
if cs.cmd != nil && cs.cmd.Process != nil {
slog.Debug("claudeSession: force killing process", "pid", cs.cmd.Process.Pid)
_ = cs.cmd.Process.Kill()
}
<-cs.done
return nil
}

func (cs *claudeSession) Close() error {
cs.cancel()

Expand Down Expand Up @@ -589,4 +602,3 @@ func filterEnv(env []string, key string) []string {
}
return out
}

1 change: 0 additions & 1 deletion agent/cursor/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func normalizeMode(raw string) string {
}
}


func (a *Agent) Name() string { return "cursor" }
func (a *Agent) CLIBinaryName() string { return "agent" }
func (a *Agent) CLIDisplayName() string { return "Cursor Agent" }
Expand Down
14 changes: 7 additions & 7 deletions agent/gemini/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ func TestHandleMessage_MixedDeltaAndNonDelta(t *testing.T) {
"content": "Let me look at the files.",
})
gs.handleEvent(map[string]any{
"type": "tool_use",
"tool_name": "shell",
"tool_id": "t1",
"type": "tool_use",
"tool_name": "shell",
"tool_id": "t1",
"parameters": map[string]any{"command": "ls"},
})
gs.handleEvent(map[string]any{
Expand Down Expand Up @@ -445,10 +445,10 @@ func TestSessionMessage_TextContent(t *testing.T) {

func TestComputeLineDiff(t *testing.T) {
tests := []struct {
name string
old string
new_ string
want string
name string
old string
new_ string
want string
}{
{
"single line fully different",
Expand Down
12 changes: 6 additions & 6 deletions agent/pi/pi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,15 +522,15 @@ func TestHandleMessageUpdate_ThinkingAccumulation(t *testing.T) {

// Multiple thinking deltas should be accumulated.
s.handleEvent(map[string]any{
"type": "message_update",
"type": "message_update",
"assistantMessageEvent": map[string]any{"type": "thinking_delta", "delta": "Let me "},
})
s.handleEvent(map[string]any{
"type": "message_update",
"type": "message_update",
"assistantMessageEvent": map[string]any{"type": "thinking_delta", "delta": "think about "},
})
s.handleEvent(map[string]any{
"type": "message_update",
"type": "message_update",
"assistantMessageEvent": map[string]any{"type": "thinking_delta", "delta": "this."},
})

Expand All @@ -542,7 +542,7 @@ func TestHandleMessageUpdate_ThinkingAccumulation(t *testing.T) {

// thinking_end triggers the accumulated event.
s.handleEvent(map[string]any{
"type": "message_update",
"type": "message_update",
"assistantMessageEvent": map[string]any{"type": "thinking_end"},
})

Expand All @@ -564,7 +564,7 @@ func TestHandleMessageUpdate_ThinkingEndEmpty(t *testing.T) {

// thinking_end with no prior deltas should not emit.
s.handleEvent(map[string]any{
"type": "message_update",
"type": "message_update",
"assistantMessageEvent": map[string]any{"type": "thinking_end"},
})

Expand All @@ -580,7 +580,7 @@ func TestHandleMessageUpdate_ThinkingDeltaEmpty(t *testing.T) {

// Empty deltas should not grow the buffer.
s.handleEvent(map[string]any{
"type": "message_update",
"type": "message_update",
"assistantMessageEvent": map[string]any{"type": "thinking_delta", "delta": ""},
})

Expand Down
2 changes: 1 addition & 1 deletion core/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"fmt"
"log/slog"
"os"
"reflect"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
Expand Down
2 changes: 1 addition & 1 deletion core/dir_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,4 @@ func (dh *DirHistory) saveLocked() {
if err := AtomicWriteFile(dh.storePath, data, 0644); err != nil {
slog.Error("dir_history: failed to write", "path", dh.storePath, "error", err)
}
}
}
10 changes: 9 additions & 1 deletion core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5291,7 +5291,15 @@ func (e *Engine) stopInteractiveSession(sessionKey string, quietPlatform Platfor
quietMode := state.quiet
agentSession := state.agentSession
state.mu.Unlock()

// Force kill the agent session if it supports immediate termination
// This is faster than graceful Close() when agent is stuck in a loop
if state.agentSession != nil {
if ft, ok := state.agentSession.(ForceTerminator); ok {
slog.Debug("cmdStop: force killing agent session", "session", sessionKey)
_ = ft.ForceKill()
e.cleanupInteractiveState(sessionKey)
}
}
state.markStopped()
if quietMode {
if quietPlatform == nil {
Expand Down
2 changes: 1 addition & 1 deletion core/i18n_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestI18n_AllKeysHaveEnglish(t *testing.T) {

func TestDetectLanguage(t *testing.T) {
tests := []struct {
text string
text string
wantLang Language
}{
// Japanese Hiragana
Expand Down
8 changes: 8 additions & 0 deletions core/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,11 @@ type CommandRegistrar interface {
type ChannelNameResolver interface {
ResolveChannelName(channelID string) (string, error)
}

// ForceTerminator is an optional interface for agent sessions that support
// immediate process termination without waiting for graceful shutdown.
// This is useful when the agent is stuck in a loop or unresponsive state.
// Close() should still be called after ForceKill() for cleanup.
type ForceTerminator interface {
ForceKill() error
}
2 changes: 1 addition & 1 deletion core/management_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package core

import (
"context"
"bytes"
"context"
"encoding/json"
"errors"
"io"
Expand Down
6 changes: 3 additions & 3 deletions core/outgoing_ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ func (c OutgoingRateLimitCfg) effectiveBurst() int {
// the rate budget allows the send.
type OutgoingRateLimiter struct {
mu sync.Mutex
buckets map[string]*tokenBucket // key = platform name
buckets map[string]*tokenBucket // key = platform name
defaults OutgoingRateLimitCfg
overrides map[string]OutgoingRateLimitCfg // per-platform overrides
overrides map[string]OutgoingRateLimitCfg // per-platform overrides
}

type tokenBucket struct {
tokens float64
maxTokens float64
refillRate float64 // tokens per second
refillRate float64 // tokens per second
lastRefill time.Time
}

Expand Down
18 changes: 9 additions & 9 deletions core/progress_compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ type suppressTestPlatform struct {
style string
}

func (s *suppressTestPlatform) Name() string { return "test" }
func (s *suppressTestPlatform) Start(MessageHandler) error { return nil }
func (s *suppressTestPlatform) Name() string { return "test" }
func (s *suppressTestPlatform) Start(MessageHandler) error { return nil }
func (s *suppressTestPlatform) Reply(context.Context, any, string) error { return nil }
func (s *suppressTestPlatform) Send(context.Context, any, string) error { return nil }
func (s *suppressTestPlatform) Stop() error { return nil }
func (s *suppressTestPlatform) ProgressStyle() string { return s.style }
func (s *suppressTestPlatform) Send(context.Context, any, string) error { return nil }
func (s *suppressTestPlatform) Stop() error { return nil }
func (s *suppressTestPlatform) ProgressStyle() string { return s.style }

func TestSuppressStandaloneToolResultEvent(t *testing.T) {
if SuppressStandaloneToolResultEvent(&stubPlatformNoProgress{}) {
Expand All @@ -35,11 +35,11 @@ func TestSuppressStandaloneToolResultEvent(t *testing.T) {
// stubPlatformNoProgress is a minimal Platform without ProgressStyleProvider.
type stubPlatformNoProgress struct{}

func (stubPlatformNoProgress) Name() string { return "plain" }
func (stubPlatformNoProgress) Start(MessageHandler) error { return nil }
func (stubPlatformNoProgress) Name() string { return "plain" }
func (stubPlatformNoProgress) Start(MessageHandler) error { return nil }
func (stubPlatformNoProgress) Reply(context.Context, any, string) error { return nil }
func (stubPlatformNoProgress) Send(context.Context, any, string) error { return nil }
func (stubPlatformNoProgress) Stop() error { return nil }
func (stubPlatformNoProgress) Send(context.Context, any, string) error { return nil }
func (stubPlatformNoProgress) Stop() error { return nil }

func TestBuildAndParseProgressCardPayload(t *testing.T) {
payload := BuildProgressCardPayload([]string{" step1 ", "", "step2"}, true)
Expand Down
12 changes: 6 additions & 6 deletions core/speech.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ func ConvertMP3ToOGG(ctx context.Context, mp3Data []byte) ([]byte, error) {
args := []string{
"-i", "pipe:0",
"-c:a", "libopus",
"-ar", "16000", // 16kHz sample rate for voice
"-ac", "1", // mono
"-b:a", "32k", // 32 kbps bitrate (voice quality)
"-ar", "16000", // 16kHz sample rate for voice
"-ac", "1", // mono
"-b:a", "32k", // 32 kbps bitrate (voice quality)
"-application", "voip", // optimize for voice
"-f", "ogg",
"-y",
Expand Down Expand Up @@ -306,9 +306,9 @@ func ConvertMP3ToAMR(ctx context.Context, mp3Data []byte) ([]byte, error) {
args := []string{
"-i", "pipe:0",
"-c:a", "amr_nb",
"-ar", "8000", // 8kHz sample rate (AMR-NB standard)
"-ac", "1", // mono
"-b:a", "12.2k", // 12.2 kbps bitrate (AMR-NB max)
"-ar", "8000", // 8kHz sample rate (AMR-NB standard)
"-ac", "1", // mono
"-b:a", "12.2k", // 12.2 kbps bitrate (AMR-NB max)
"-f", "amr",
"-y",
"pipe:1",
Expand Down
6 changes: 3 additions & 3 deletions core/tts.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,9 @@ func (m *MiniMaxTTS) Synthesize(ctx context.Context, text string, opts TTSSynthe
}

reqBody := map[string]any{
"model": m.Model,
"text": text,
"stream": true,
"model": m.Model,
"text": text,
"stream": true,
"voice_setting": map[string]any{
"voice_id": voice,
"speed": speed,
Expand Down
16 changes: 8 additions & 8 deletions core/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ type WebhookServer struct {

// WebhookRequest is the JSON body for POST /hook.
type WebhookRequest struct {
Event string `json:"event,omitempty"` // event name for logging (e.g. "git:commit")
Project string `json:"project,omitempty"` // target project; optional if single project
SessionKey string `json:"session_key"` // target session key (required)
Prompt string `json:"prompt,omitempty"` // agent prompt (mutually exclusive with exec)
Exec string `json:"exec,omitempty"` // shell command (mutually exclusive with prompt)
WorkDir string `json:"work_dir,omitempty"` // working dir for exec
Silent bool `json:"silent,omitempty"` // suppress notification
Payload any `json:"payload,omitempty"` // arbitrary extra data; appended to prompt context
Event string `json:"event,omitempty"` // event name for logging (e.g. "git:commit")
Project string `json:"project,omitempty"` // target project; optional if single project
SessionKey string `json:"session_key"` // target session key (required)
Prompt string `json:"prompt,omitempty"` // agent prompt (mutually exclusive with exec)
Exec string `json:"exec,omitempty"` // shell command (mutually exclusive with prompt)
WorkDir string `json:"work_dir,omitempty"` // working dir for exec
Silent bool `json:"silent,omitempty"` // suppress notification
Payload any `json:"payload,omitempty"` // arbitrary extra data; appended to prompt context
}

func NewWebhookServer(port int, token, path string) *WebhookServer {
Expand Down
34 changes: 17 additions & 17 deletions platform/dingtalk/dingtalk.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ func init() {
}

type replyContext struct {
sessionWebhook string
conversationId string
senderStaffId string
sessionWebhook string
conversationId string
senderStaffId string
}

type downloadResponse struct {
Expand All @@ -38,7 +38,7 @@ type Platform struct {
clientID string
clientSecret string
robotCode string
agentID int64 // Agent ID for work notifications API (numeric)
agentID int64 // Agent ID for work notifications API (numeric)
allowFrom string
shareSessionInChannel bool
streamClient *dingtalkClient.StreamClient
Expand Down Expand Up @@ -183,9 +183,9 @@ func (p *Platform) onMessage(data *chatbot.BotCallbackDataModel) {
Content: data.Text.Content,
MessageID: data.MsgId,
ReplyCtx: replyContext{
sessionWebhook: data.SessionWebhook,
conversationId: data.ConversationId,
senderStaffId: data.SenderStaffId,
sessionWebhook: data.SessionWebhook,
conversationId: data.ConversationId,
senderStaffId: data.SenderStaffId,
},
}

Expand Down Expand Up @@ -224,11 +224,11 @@ func (p *Platform) handleAudioMessage(data *chatbot.BotCallbackDataModel, sessio
Content: recognition,
MessageID: data.MsgId,
ReplyCtx: replyContext{
sessionWebhook: data.SessionWebhook,
conversationId: data.ConversationId,
senderStaffId: data.SenderStaffId,
sessionWebhook: data.SessionWebhook,
conversationId: data.ConversationId,
senderStaffId: data.SenderStaffId,
},
FromVoice: true,
FromVoice: true,
}
p.handler(p, msg)
}
Expand All @@ -246,11 +246,11 @@ func (p *Platform) handleAudioMessage(data *chatbot.BotCallbackDataModel, sessio
Content: recognition, // Use recognition as text content
MessageID: data.MsgId,
ReplyCtx: replyContext{
sessionWebhook: data.SessionWebhook,
conversationId: data.ConversationId,
senderStaffId: data.SenderStaffId,
sessionWebhook: data.SessionWebhook,
conversationId: data.ConversationId,
senderStaffId: data.SenderStaffId,
},
FromVoice: true,
FromVoice: true,
Audio: &core.AudioAttachment{
MimeType: mimeType,
Data: audioBytes,
Expand Down Expand Up @@ -658,8 +658,8 @@ func (p *Platform) compressAudioWithFFmpeg(ctx context.Context, audio []byte, fo
args := []string{
"-i", "pipe:0",
"-ar", "16000", // 16kHz sample rate for voice
"-ac", "1", // mono
"-b:a", "64k", // 64 kbps bitrate (voice quality)
"-ac", "1", // mono
"-b:a", "64k", // 64 kbps bitrate (voice quality)
"-f", "mp3",
"-y",
"pipe:1",
Expand Down
2 changes: 1 addition & 1 deletion platform/dingtalk/dingtalk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestGetAccessToken_ConcurrentAccess(t *testing.T) {
func TestGetAccessToken_MutexExists(t *testing.T) {
// Verify that the tokenMu mutex field exists and works
p := &Platform{
clientID: "test_client",
clientID: "test_client",
clientSecret: "test_secret",
}

Expand Down
Loading
Loading