diff --git a/cmd/gateway_channels_setup.go b/cmd/gateway_channels_setup.go index 2c0edffd5..cc1beb550 100644 --- a/cmd/gateway_channels_setup.go +++ b/cmd/gateway_channels_setup.go @@ -13,6 +13,7 @@ import ( "github.com/nextlevelbuilder/goclaw/internal/channels" "github.com/nextlevelbuilder/goclaw/internal/channels/discord" "github.com/nextlevelbuilder/goclaw/internal/channels/feishu" + "github.com/nextlevelbuilder/goclaw/internal/channels/googlechat" slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack" "github.com/nextlevelbuilder/goclaw/internal/channels/telegram" "github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp" @@ -97,6 +98,16 @@ func registerConfigChannels(cfg *config.Config, channelMgr *channels.Manager, ms slog.Info("feishu/lark channel enabled (config)") } } + + if cfg.Channels.GoogleChat.Enabled && cfg.Channels.GoogleChat.ServiceAccountFile != "" && instanceLoader == nil { + gc, err := googlechat.New(cfg.Channels.GoogleChat, msgBus, nil) + if err != nil { + slog.Error("failed to initialize google chat channel", "error", err) + } else { + channelMgr.RegisterChannel(channels.TypeGoogleChat, gc) + slog.Info("google chat channel enabled (config)") + } + } } // wireChannelRPCMethods registers WS RPC methods for channels, instances, agent links, and teams. diff --git a/internal/channels/channel.go b/internal/channels/channel.go index 5d4d26e8f..d13e838b3 100644 --- a/internal/channels/channel.go +++ b/internal/channels/channel.go @@ -61,6 +61,7 @@ const ( TypeWhatsApp = "whatsapp" TypeZaloOA = "zalo_oa" TypeZaloPersonal = "zalo_personal" + TypeGoogleChat = "google_chat" ) // Channel defines the interface that all channel implementations must satisfy. diff --git a/internal/channels/googlechat/auth.go b/internal/channels/googlechat/auth.go new file mode 100644 index 000000000..a3bc50d00 --- /dev/null +++ b/internal/channels/googlechat/auth.go @@ -0,0 +1,170 @@ +package googlechat + +import ( + "context" + "crypto" + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "crypto/x509" + "encoding/base64" + "encoding/json" + "encoding/pem" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + "sync" + "time" +) + +type ServiceAccountAuth struct { + email string + privateKey *rsa.PrivateKey + scopes []string + token string + expiresAt time.Time + mu sync.Mutex + tokenEndpoint string + httpClient *http.Client +} + +type serviceAccountFile struct { + Type string `json:"type"` + ClientEmail string `json:"client_email"` + PrivateKey string `json:"private_key"` + TokenURI string `json:"token_uri"` +} + +func NewServiceAccountAuth(saFilePath string, scopes []string) (*ServiceAccountAuth, error) { + data, err := os.ReadFile(saFilePath) + if err != nil { + return nil, fmt.Errorf("read service account file: %w", err) + } + + var sa serviceAccountFile + if err := json.Unmarshal(data, &sa); err != nil { + return nil, fmt.Errorf("parse service account file: %w", err) + } + if sa.ClientEmail == "" { + return nil, fmt.Errorf("service account file missing client_email") + } + if sa.PrivateKey == "" { + return nil, fmt.Errorf("service account file missing private_key") + } + + block, _ := pem.Decode([]byte(sa.PrivateKey)) + if block == nil { + return nil, fmt.Errorf("failed to decode PEM block from private_key") + } + + key, err := x509.ParsePKCS8PrivateKey(block.Bytes) + if err != nil { + rsaKey, err2 := x509.ParsePKCS1PrivateKey(block.Bytes) + if err2 != nil { + return nil, fmt.Errorf("parse private key: %w (pkcs1: %w)", err, err2) + } + key = rsaKey + } + + rsaKey, ok := key.(*rsa.PrivateKey) + if !ok { + return nil, fmt.Errorf("private key is not RSA") + } + + ep := sa.TokenURI + if ep == "" { + ep = tokenEndpoint + } + + return &ServiceAccountAuth{ + email: sa.ClientEmail, + privateKey: rsaKey, + scopes: scopes, + tokenEndpoint: ep, + httpClient: &http.Client{Timeout: 10 * time.Second}, + }, nil +} + +func (a *ServiceAccountAuth) Token(ctx context.Context) (string, error) { + a.mu.Lock() + defer a.mu.Unlock() + + if a.token != "" && time.Now().Add(60*time.Second).Before(a.expiresAt) { + return a.token, nil + } + + now := time.Now() + claims := map[string]any{ + "iss": a.email, + "scope": strings.Join(a.scopes, " "), + "aud": tokenEndpoint, + "iat": now.Unix(), + "exp": now.Add(time.Hour).Unix(), + } + + signedJWT, err := signJWT(a.privateKey, claims) + if err != nil { + return "", fmt.Errorf("sign JWT: %w", err) + } + + form := url.Values{ + "grant_type": {"urn:ietf:params:oauth:grant-type:jwt-bearer"}, + "assertion": {signedJWT}, + } + + req, err := http.NewRequestWithContext(ctx, "POST", a.tokenEndpoint, strings.NewReader(form.Encode())) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + + resp, err := a.httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("token exchange request: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("token exchange failed (%d): %s", resp.StatusCode, string(body)) + } + + var tokenResp struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + TokenType string `json:"token_type"` + } + if err := json.Unmarshal(body, &tokenResp); err != nil { + return "", fmt.Errorf("parse token response: %w", err) + } + + a.token = tokenResp.AccessToken + a.expiresAt = now.Add(time.Duration(tokenResp.ExpiresIn) * time.Second) + + return a.token, nil +} + +func signJWT(key *rsa.PrivateKey, claims map[string]any) (string, error) { + header := base64URLEncode([]byte(`{"alg":"RS256","typ":"JWT"}`)) + payload, err := json.Marshal(claims) + if err != nil { + return "", err + } + payloadEnc := base64URLEncode(payload) + signingInput := header + "." + payloadEnc + + hash := sha256.Sum256([]byte(signingInput)) + sig, err := rsa.SignPKCS1v15(rand.Reader, key, crypto.SHA256, hash[:]) + if err != nil { + return "", err + } + + return signingInput + "." + base64URLEncode(sig), nil +} + +func base64URLEncode(data []byte) string { + return base64.RawURLEncoding.EncodeToString(data) +} diff --git a/internal/channels/googlechat/auth_test.go b/internal/channels/googlechat/auth_test.go new file mode 100644 index 000000000..5eb555f65 --- /dev/null +++ b/internal/channels/googlechat/auth_test.go @@ -0,0 +1,145 @@ +package googlechat + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "encoding/json" + "encoding/pem" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" +) + +func testServiceAccountJSON(t *testing.T, dir string) (string, *rsa.PrivateKey) { + t.Helper() + key, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + t.Fatal(err) + } + pkcs8, err := x509.MarshalPKCS8PrivateKey(key) + if err != nil { + t.Fatal(err) + } + pemBlock := pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8}) + + sa := map[string]string{ + "type": "service_account", + "client_email": "test@test.iam.gserviceaccount.com", + "private_key": string(pemBlock), + "token_uri": "https://oauth2.googleapis.com/token", + } + data, _ := json.Marshal(sa) + path := filepath.Join(dir, "sa.json") + if err := os.WriteFile(path, data, 0600); err != nil { + t.Fatal(err) + } + return path, key +} + +func TestNewServiceAccountAuth_ValidFile(t *testing.T) { + dir := t.TempDir() + path, _ := testServiceAccountJSON(t, dir) + auth, err := NewServiceAccountAuth(path, []string{scopeChat}) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if auth.email != "test@test.iam.gserviceaccount.com" { + t.Errorf("email = %q, want test@test.iam.gserviceaccount.com", auth.email) + } +} + +func TestNewServiceAccountAuth_InvalidFile(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "bad.json") + os.WriteFile(path, []byte("{bad json"), 0600) + _, err := NewServiceAccountAuth(path, []string{scopeChat}) + if err == nil { + t.Fatal("expected error for invalid JSON") + } +} + +func TestNewServiceAccountAuth_MissingFile(t *testing.T) { + _, err := NewServiceAccountAuth("/nonexistent/sa.json", []string{scopeChat}) + if err == nil { + t.Fatal("expected error for missing file") + } +} + +func TestServiceAccountAuth_Token_CachesWithinTTL(t *testing.T) { + dir := t.TempDir() + path, _ := testServiceAccountJSON(t, dir) + callCount := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + json.NewEncoder(w).Encode(map[string]any{ + "access_token": "tok-123", + "expires_in": 3600, + "token_type": "Bearer", + }) + })) + defer ts.Close() + + auth, err := NewServiceAccountAuth(path, []string{scopeChat}) + if err != nil { + t.Fatal(err) + } + auth.tokenEndpoint = ts.URL + + ctx := context.Background() + tok1, _ := auth.Token(ctx) + tok2, _ := auth.Token(ctx) + if tok1 != tok2 { + t.Errorf("tokens differ") + } + if callCount != 1 { + t.Errorf("callCount = %d, want 1", callCount) + } +} + +func TestServiceAccountAuth_Token_RefreshesExpired(t *testing.T) { + dir := t.TempDir() + path, _ := testServiceAccountJSON(t, dir) + callCount := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount++ + json.NewEncoder(w).Encode(map[string]any{ + "access_token": "tok", + "expires_in": 1, + "token_type": "Bearer", + }) + })) + defer ts.Close() + + auth, err := NewServiceAccountAuth(path, []string{scopeChat}) + if err != nil { + t.Fatal(err) + } + auth.tokenEndpoint = ts.URL + auth.Token(context.Background()) + auth.expiresAt = time.Now().Add(-1 * time.Minute) + auth.Token(context.Background()) + if callCount != 2 { + t.Errorf("callCount = %d, want 2", callCount) + } +} + +func TestServiceAccountAuth_Token_RefreshFailure(t *testing.T) { + dir := t.TempDir() + path, _ := testServiceAccountJSON(t, dir) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(500) + })) + defer ts.Close() + + auth, _ := NewServiceAccountAuth(path, []string{scopeChat}) + auth.tokenEndpoint = ts.URL + _, err := auth.Token(context.Background()) + if err == nil { + t.Fatal("expected error on 500") + } +} diff --git a/internal/channels/googlechat/constants.go b/internal/channels/googlechat/constants.go new file mode 100644 index 000000000..9eec18a14 --- /dev/null +++ b/internal/channels/googlechat/constants.go @@ -0,0 +1,27 @@ +package googlechat + +import "time" + +const ( + typeGoogleChat = "google_chat" + chatAPIBase = "https://chat.googleapis.com/v1" + pubsubAPIBase = "https://pubsub.googleapis.com/v1" + driveUploadBase = "https://www.googleapis.com/upload/drive/v3" + driveAPIBase = "https://www.googleapis.com/drive/v3" + tokenEndpoint = "https://oauth2.googleapis.com/token" + googleChatMaxMessageBytes = 3900 + longFormThresholdDefault = 6000 + dedupTTL = 5 * time.Minute + defaultPullInterval = 1 * time.Second + defaultPullMaxMessages = 10 + defaultMediaMaxMB = 20 + defaultFileRetentionDays = 7 + shutdownDrainTimeout = 5 * time.Second + scopeChat = "https://www.googleapis.com/auth/chat.bot" + scopePubSub = "https://www.googleapis.com/auth/pubsub" + scopeDrive = "https://www.googleapis.com/auth/drive.file" + retrySendMaxAttempts = 5 + retrySendBaseDelay = 1 * time.Second + retrySendMaxDelay = 30 * time.Second + defaultStreamThrottle = 1500 * time.Millisecond +) diff --git a/internal/channels/googlechat/factory.go b/internal/channels/googlechat/factory.go new file mode 100644 index 000000000..5afbff7da --- /dev/null +++ b/internal/channels/googlechat/factory.go @@ -0,0 +1,118 @@ +package googlechat + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + + "github.com/google/uuid" + "github.com/nextlevelbuilder/goclaw/internal/bus" + "github.com/nextlevelbuilder/goclaw/internal/channels" + "github.com/nextlevelbuilder/goclaw/internal/config" + "github.com/nextlevelbuilder/goclaw/internal/store" +) + +// googleChatCreds maps the credentials JSON from the channel_instances table. +type googleChatCreds struct { + ServiceAccountJSON json.RawMessage `json:"service_account_json"` // embedded SA key JSON +} + +// googleChatInstanceConfig maps the non-secret config JSONB from the channel_instances table. +type googleChatInstanceConfig struct { + Mode string `json:"mode,omitempty"` + ProjectID string `json:"project_id,omitempty"` + SubscriptionID string `json:"subscription_id,omitempty"` + PullIntervalMs int `json:"pull_interval_ms,omitempty"` + BotUser string `json:"bot_user,omitempty"` + DMPolicy string `json:"dm_policy,omitempty"` + GroupPolicy string `json:"group_policy,omitempty"` + AllowFrom []string `json:"allow_from,omitempty"` + LongFormThreshold int `json:"long_form_threshold,omitempty"` + LongFormFormat string `json:"long_form_format,omitempty"` + MediaMaxMB int `json:"media_max_mb,omitempty"` + DrivePermission string `json:"drive_permission,omitempty"` + BlockReply *bool `json:"block_reply,omitempty"` +} + +// FactoryWithPendingStore returns a ChannelFactory that includes the pending message store. +func FactoryWithPendingStore(pendingStore store.PendingMessageStore) channels.ChannelFactory { + return func(name string, creds json.RawMessage, cfg json.RawMessage, + msgBus *bus.MessageBus, pairingSvc store.PairingStore) (channels.Channel, error) { + return buildChannel(name, creds, cfg, msgBus, pendingStore) + } +} + +// Factory creates a Google Chat channel from DB instance data (no pending store). +func Factory(name string, creds json.RawMessage, cfg json.RawMessage, + msgBus *bus.MessageBus, _ store.PairingStore) (channels.Channel, error) { + return buildChannel(name, creds, cfg, msgBus, nil) +} + +func buildChannel(name string, creds json.RawMessage, cfg json.RawMessage, + msgBus *bus.MessageBus, pendingStore store.PendingMessageStore) (channels.Channel, error) { + + var c googleChatCreds + if len(creds) > 0 { + if err := json.Unmarshal(creds, &c); err != nil { + return nil, fmt.Errorf("decode googlechat credentials: %w", err) + } + } + + var ic googleChatInstanceConfig + if len(cfg) > 0 { + if err := json.Unmarshal(cfg, &ic); err != nil { + return nil, fmt.Errorf("decode googlechat config: %w", err) + } + } + + if len(c.ServiceAccountJSON) == 0 { + return nil, fmt.Errorf("googlechat: service_account_json is required in credentials") + } + + // Write SA JSON to a temp file for NewServiceAccountAuth (it reads from file path). + saFile, err := writeTempSAFile(c.ServiceAccountJSON) + if err != nil { + return nil, fmt.Errorf("googlechat: write SA temp file: %w", err) + } + + gcCfg := config.GoogleChatConfig{ + Enabled: true, + ServiceAccountFile: saFile, + Mode: ic.Mode, + ProjectID: ic.ProjectID, + SubscriptionID: ic.SubscriptionID, + PullIntervalMs: ic.PullIntervalMs, + BotUser: ic.BotUser, + DMPolicy: ic.DMPolicy, + GroupPolicy: ic.GroupPolicy, + AllowFrom: ic.AllowFrom, + LongFormThreshold: ic.LongFormThreshold, + LongFormFormat: ic.LongFormFormat, + MediaMaxMB: ic.MediaMaxMB, + DrivePermission: ic.DrivePermission, + BlockReply: ic.BlockReply, + } + + // DB instances default to "allowlist" for groups. + if gcCfg.GroupPolicy == "" { + gcCfg.GroupPolicy = "allowlist" + } + + ch, err := New(gcCfg, msgBus, pendingStore) + if err != nil { + return nil, err + } + + ch.SetName(name) + return ch, nil +} + +// writeTempSAFile writes the SA JSON to a temp file and returns the path. +func writeTempSAFile(saJSON json.RawMessage) (string, error) { + tmpPath := filepath.Join(os.TempDir(), "goclaw-sa-"+uuid.New().String()+".json") + if err := os.WriteFile(tmpPath, saJSON, 0600); err != nil { + return "", err + } + return tmpPath, nil +} diff --git a/internal/channels/googlechat/format.go b/internal/channels/googlechat/format.go new file mode 100644 index 000000000..5bef6bf95 --- /dev/null +++ b/internal/channels/googlechat/format.go @@ -0,0 +1,498 @@ +package googlechat + +import ( + "fmt" + "regexp" + "strings" + "unicode/utf8" +) + +const ( + codePlaceholder = "\x00" + boldMarker = "\x01" // temp marker for bold * to avoid italic conversion +) + +var ( + reCodeBlock = regexp.MustCompile("(?s)(```[\\s\\S]*?```)") + reCodeInline = regexp.MustCompile("(`[^`]+`)") + reBoldItalic = regexp.MustCompile(`\*\*\*(.+?)\*\*\*`) + reBold = regexp.MustCompile(`\*\*(.+?)\*\*`) + reStrike = regexp.MustCompile(`~~(.+?)~~`) + reLink = regexp.MustCompile(`\[([^\]]+)\]\(([^)]+)\)`) + reTable = regexp.MustCompile(`(?m)^\|.+\|$\n^\|[-| :]+\|$`) + reLongCodeBlock = regexp.MustCompile("(?s)```[\\w]*\\n(.{500,}?)\\n```") + reHeading = regexp.MustCompile(`(?m)^(#{1,6})\s+(.+)$`) +) + +// markdownToGoogleChat converts markdown to Google Chat plain-text markup. +// Used for simple text messages (non-CardV2). +func markdownToGoogleChat(text string) string { + if text == "" { + return "" + } + + var codeBlocks []string + protected := reCodeBlock.ReplaceAllStringFunc(text, func(match string) string { + codeBlocks = append(codeBlocks, match) + return codePlaceholder + }) + var inlineCodes []string + protected = reCodeInline.ReplaceAllStringFunc(protected, func(match string) string { + inlineCodes = append(inlineCodes, match) + return codePlaceholder + }) + + // Headers: ## text → *text* (bold in Google Chat) + protected = reHeading.ReplaceAllString(protected, boldMarker+"${2}"+boldMarker) + // Bold+italic: ***text*** → *_text_* (use boldMarker to protect from italic converter) + protected = reBoldItalic.ReplaceAllString(protected, boldMarker+"_${1}_"+boldMarker) + // Bold: **text** → *text* (use boldMarker to protect from italic converter) + protected = reBold.ReplaceAllString(protected, boldMarker+"${1}"+boldMarker) + // Italic: *text* → _text_ (only matches unprotected single *) + protected = convertItalic(protected) + // Restore boldMarker → * + protected = strings.ReplaceAll(protected, boldMarker, "*") + protected = reStrike.ReplaceAllString(protected, "~${1}~") + protected = reLink.ReplaceAllString(protected, "<${2}|${1}>") + + codeIdx := 0 + inlineIdx := 0 + var result strings.Builder + for _, r := range protected { + if string(r) == codePlaceholder { + if codeIdx < len(codeBlocks) { + result.WriteString(codeBlocks[codeIdx]) + codeIdx++ + } else if inlineIdx < len(inlineCodes) { + result.WriteString(inlineCodes[inlineIdx]) + inlineIdx++ + } + } else { + result.WriteRune(r) + } + } + + return result.String() +} + +// markdownToGoogleChatHTML converts markdown to HTML subset supported by CardV2 textParagraph. +// Supports: , , , ,
, +func markdownToGoogleChatHTML(text string) string { + if text == "" { + return "" + } + + var codeBlocks []string + protected := reCodeBlock.ReplaceAllStringFunc(text, func(match string) string { + codeBlocks = append(codeBlocks, match) + return codePlaceholder + }) + var inlineCodes []string + protected = reCodeInline.ReplaceAllStringFunc(protected, func(match string) string { + inlineCodes = append(inlineCodes, match) + return codePlaceholder + }) + + // Headers → bold HTML + protected = reHeading.ReplaceAllString(protected, "${2}") + // Bold+italic + protected = reBoldItalic.ReplaceAllString(protected, "${1}") + // Bold + protected = reBold.ReplaceAllString(protected, "${1}") + // Italic (single *) + protected = convertItalicHTML(protected) + // Strikethrough + protected = reStrike.ReplaceAllString(protected, "${1}") + // Links + protected = reLink.ReplaceAllString(protected, `
${1}`) + + codeIdx := 0 + inlineIdx := 0 + var result strings.Builder + for _, r := range protected { + if string(r) == codePlaceholder { + if codeIdx < len(codeBlocks) { + block := codeBlocks[codeIdx] + block = strings.TrimPrefix(block, "```") + if idx := strings.IndexByte(block, '\n'); idx >= 0 { + block = block[idx+1:] + } + block = strings.TrimSuffix(block, "```") + block = strings.TrimSpace(block) + result.WriteString("
" + block + "
") + codeIdx++ + } else if inlineIdx < len(inlineCodes) { + code := inlineCodes[inlineIdx] + code = strings.Trim(code, "`") + result.WriteString("" + code + "") + inlineIdx++ + } + } else { + result.WriteRune(r) + } + } + + return result.String() +} + +// convertItalicHTML converts single *text* to text. +func convertItalicHTML(s string) string { + var result strings.Builder + runes := []rune(s) + i := 0 + for i < len(runes) { + if runes[i] == '*' { + prevStar := i > 0 && runes[i-1] == '*' + nextStar := i+1 < len(runes) && runes[i+1] == '*' + if !prevStar && !nextStar { + end := -1 + for j := i + 1; j < len(runes); j++ { + if runes[j] == '*' { + nextJ := j+1 < len(runes) && runes[j+1] == '*' + prevJ := j > 0 && runes[j-1] == '*' + if !nextJ && !prevJ { + end = j + break + } + } + } + if end > 0 { + result.WriteString("") + result.WriteString(string(runes[i+1 : end])) + result.WriteString("") + i = end + 1 + continue + } + } + } + result.WriteRune(runes[i]) + i++ + } + return result.String() +} + +func convertItalic(s string) string { + var result strings.Builder + runes := []rune(s) + i := 0 + for i < len(runes) { + if runes[i] == '*' { + prevStar := i > 0 && runes[i-1] == '*' + nextStar := i+1 < len(runes) && runes[i+1] == '*' + if !prevStar && !nextStar { + end := -1 + for j := i + 1; j < len(runes); j++ { + if runes[j] == '*' { + nextJ := j+1 < len(runes) && runes[j+1] == '*' + prevJ := j > 0 && runes[j-1] == '*' + if !nextJ && !prevJ { + end = j + break + } + } + } + if end > 0 { + result.WriteRune('_') + result.WriteString(string(runes[i+1 : end])) + result.WriteRune('_') + i = end + 1 + continue + } + } + } + result.WriteRune(runes[i]) + i++ + } + return result.String() +} + +func detectStructuredContent(text string) bool { + return reTable.MatchString(text) || reLongCodeBlock.MatchString(text) +} + +// splitTableCells splits "| a | b | c |" into ["a", "b", "c"]. +func splitTableCells(row string) []string { + row = strings.TrimSpace(row) + row = strings.Trim(row, "|") + parts := strings.Split(row, "|") + var cells []string + for _, p := range parts { + cells = append(cells, strings.TrimSpace(p)) + } + return cells +} + +// parseTableRows extracts header and data rows from collected table lines. +func parseTableRows(lines []string) (headers []string, rows [][]string) { + if len(lines) == 0 { + return nil, nil + } + for _, line := range lines { + cells := splitTableCells(line) + if len(cells) == 0 { + continue + } + if headers == nil { + headers = cells + } else { + rows = append(rows, cells) + } + } + return +} + +// buildTableWidget creates a CardV2 widget for a markdown table. +// 2-column tables use decoratedText (key-value); wider tables use aligned
.
+func buildTableWidget(tableLines []string) map[string]any {
+	headers, rows := parseTableRows(tableLines)
+	if len(headers) == 0 {
+		return nil
+	}
+
+	// 2-column tables → decoratedText widgets stacked vertically
+	if len(headers) == 2 {
+		var widgets []map[string]any
+		for _, row := range rows {
+			if len(row) < 2 {
+				continue
+			}
+			widgets = append(widgets, map[string]any{
+				"decoratedText": map[string]any{
+					"topLabel": markdownToGoogleChatHTML(row[0]),
+					"text":     markdownToGoogleChatHTML(row[1]),
+				},
+			})
+		}
+		if len(widgets) > 0 {
+			return map[string]any{"__widgets": widgets}
+		}
+	}
+
+	// Wider tables → aligned monospace
+	allRows := [][]string{headers}
+	allRows = append(allRows, rows...)
+
+	colWidths := make([]int, len(headers))
+	for _, row := range allRows {
+		for j, cell := range row {
+			if j < len(colWidths) && len([]rune(cell)) > colWidths[j] {
+				colWidths[j] = len([]rune(cell))
+			}
+		}
+	}
+
+	var sb strings.Builder
+	for i, row := range allRows {
+		for j, cell := range row {
+			if j > 0 {
+				sb.WriteString(" | ")
+			}
+			if j < len(colWidths) {
+				padding := colWidths[j] - len([]rune(cell))
+				sb.WriteString(cell)
+				if padding > 0 {
+					sb.WriteString(strings.Repeat(" ", padding))
+				}
+			} else {
+				sb.WriteString(cell)
+			}
+		}
+		sb.WriteString("\n")
+		if i == 0 {
+			for j := range colWidths {
+				if j > 0 {
+					sb.WriteString("-+-")
+				}
+				sb.WriteString(strings.Repeat("-", colWidths[j]))
+			}
+			sb.WriteString("\n")
+		}
+	}
+
+	return map[string]any{
+		"textParagraph": map[string]string{
+			"text": fmt.Sprintf("
%s
", strings.TrimRight(sb.String(), "\n")), + }, + } +} + +func chunkByBytes(text string, maxBytes int) []string { + if text == "" { + return nil + } + if len([]byte(text)) <= maxBytes { + return []string{text} + } + + var chunks []string + paragraphs := strings.Split(text, "\n\n") + if len(paragraphs) > 1 { + var current strings.Builder + for i, p := range paragraphs { + sep := "" + if i > 0 { + sep = "\n\n" + } + candidate := current.String() + sep + p + if len([]byte(candidate)) > maxBytes && current.Len() > 0 { + chunks = append(chunks, current.String()) + current.Reset() + current.WriteString(p) + } else { + if current.Len() > 0 { + current.WriteString(sep) + } + current.WriteString(p) + } + } + if current.Len() > 0 { + remaining := current.String() + if len([]byte(remaining)) > maxBytes { + chunks = append(chunks, chunkByLines(remaining, maxBytes)...) + } else { + chunks = append(chunks, remaining) + } + } + return chunks + } + + return chunkByLines(text, maxBytes) +} + +func chunkByLines(text string, maxBytes int) []string { + lines := strings.Split(text, "\n") + if len(lines) <= 1 { + return chunkByWords(text, maxBytes) + } + + var chunks []string + var current strings.Builder + for i, line := range lines { + sep := "" + if i > 0 { + sep = "\n" + } + candidate := current.String() + sep + line + if len([]byte(candidate)) > maxBytes && current.Len() > 0 { + chunks = append(chunks, current.String()) + current.Reset() + if len([]byte(line)) > maxBytes { + chunks = append(chunks, chunkByWords(line, maxBytes)...) + } else { + current.WriteString(line) + } + } else { + if current.Len() > 0 { + current.WriteString(sep) + } + current.WriteString(line) + } + } + if current.Len() > 0 { + chunks = append(chunks, current.String()) + } + return chunks +} + +func chunkByWords(text string, maxBytes int) []string { + words := strings.Fields(text) + if len(words) == 0 { + return []string{text} + } + + var chunks []string + var current strings.Builder + for _, word := range words { + sep := "" + if current.Len() > 0 { + sep = " " + } + candidate := current.String() + sep + word + if len([]byte(candidate)) > maxBytes && current.Len() > 0 { + chunks = append(chunks, current.String()) + current.Reset() + if len([]byte(word)) > maxBytes { + chunks = append(chunks, splitAtUTF8Boundary(word, maxBytes)...) + } else { + current.WriteString(word) + } + } else { + if current.Len() > 0 { + current.WriteString(sep) + } + current.WriteString(word) + } + } + if current.Len() > 0 { + chunks = append(chunks, current.String()) + } + return chunks +} + +func splitAtUTF8Boundary(word string, maxBytes int) []string { + var chunks []string + b := []byte(word) + for len(b) > 0 { + end := maxBytes + if end > len(b) { + end = len(b) + } + for end > 0 && !utf8.Valid(b[:end]) { + end-- + } + if end == 0 { + end = 1 + } + chunks = append(chunks, string(b[:end])) + b = b[end:] + } + return chunks +} + +func extractSummary(content string) string { + lines := strings.Split(content, "\n") + if len(lines) == 0 { + return content + } + + var heading string + var bullets []string + var textLines []string + + for _, line := range lines { + trimmed := strings.TrimSpace(line) + if strings.HasPrefix(trimmed, "# ") && heading == "" { + heading = strings.TrimPrefix(trimmed, "# ") + continue + } + if strings.HasPrefix(trimmed, "## ") && heading != "" { + break + } + if strings.HasPrefix(trimmed, "- ") || strings.HasPrefix(trimmed, "* ") { + if len(bullets) < 3 { + bullets = append(bullets, trimmed) + } + continue + } + if trimmed != "" && heading != "" && len(bullets) == 0 { + textLines = append(textLines, trimmed) + } + } + + var parts []string + if heading != "" { + parts = append(parts, heading) + } + if len(textLines) > 0 && len(bullets) == 0 { + parts = append(parts, strings.Join(textLines, "\n")) + } + if len(bullets) > 0 { + parts = append(parts, strings.Join(bullets, "\n")) + } + + result := strings.Join(parts, "\n\n") + if result == "" { + return content + } + return result +} diff --git a/internal/channels/googlechat/format_test.go b/internal/channels/googlechat/format_test.go new file mode 100644 index 000000000..a74970341 --- /dev/null +++ b/internal/channels/googlechat/format_test.go @@ -0,0 +1,118 @@ +package googlechat + +import ( + "strings" + "testing" +) + +func TestMarkdownToGoogleChat(t *testing.T) { + tests := []struct { + name, input, want string + }{ + {"bold", "**hello**", "*hello*"}, + {"italic", "*hello*", "_hello_"}, + {"strikethrough", "~~deleted~~", "~deleted~"}, + {"code inline", "`code`", "`code`"}, + {"code block", "```go\nfunc(){}\n```", "```go\nfunc(){}\n```"}, + {"mixed", "**bold** and *italic*", "*bold* and _italic_"}, + {"link", "[text](https://example.com)", ""}, + {"nested bold+italic", "***both***", "*_both_*"}, + {"empty", "", ""}, + {"plain text", "plain text", "plain text"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := markdownToGoogleChat(tt.input) + if got != tt.want { + t.Errorf("markdownToGoogleChat(%q) = %q, want %q", tt.input, got, tt.want) + } + }) + } +} + +func TestDetectStructuredContent(t *testing.T) { + tests := []struct { + name string + input string + want bool + }{ + {"has table", "| col1 | col2 |\n|---|---|\n| a | b |", true}, + {"has long code block", "```\n" + string(make([]byte, 600)) + "\n```", true}, + {"short code block", "```\nshort\n```", false}, + {"plain text", "Hello world", false}, + {"inline code only", "`code` in text", false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := detectStructuredContent(tt.input); got != tt.want { + t.Errorf("detectStructuredContent() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestChunkByBytes(t *testing.T) { + tests := []struct { + name string + input string + maxBytes int + wantCount int + }{ + {"under limit", "hello", googleChatMaxMessageBytes, 1}, + {"empty", "", googleChatMaxMessageBytes, 0}, + {"over limit paragraph split", "para one\n\npara two\n\npara three", 20, 2}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + chunks := chunkByBytes(tt.input, tt.maxBytes) + if len(chunks) != tt.wantCount { + t.Errorf("chunkByBytes() returned %d chunks, want %d", len(chunks), tt.wantCount) + } + for i, c := range chunks { + if len([]byte(c)) > tt.maxBytes { + t.Errorf("chunk[%d] = %d bytes, exceeds max %d", i, len([]byte(c)), tt.maxBytes) + } + } + }) + } +} + +func TestChunkByBytes_Unicode(t *testing.T) { + vn := "Đây là một đoạn văn bản tiếng Việt dài để kiểm tra việc chia chunk theo byte" + chunks := chunkByBytes(vn, 50) + if len(chunks) < 2 { + t.Fatalf("expected multiple chunks, got %d", len(chunks)) + } + for i, c := range chunks { + if len([]byte(c)) > 50 { + t.Errorf("chunk[%d] = %d bytes, exceeds 50", i, len([]byte(c))) + } + if c == "" { + t.Errorf("chunk[%d] is empty", i) + } + } + // Verify all words are preserved across chunks. + reassembled := strings.Join(chunks, " ") + if reassembled != vn { + t.Errorf("reassembled text doesn't match original:\ngot: %q\nwant: %q", reassembled, vn) + } +} + +func TestExtractSummary(t *testing.T) { + tests := []struct { + name, input, want string + }{ + {"heading + bullets", "# Title\n- A\n- B\n- C\n- D\n- E", "Title\n\n- A\n- B\n- C"}, + {"no heading", "- A\n- B\n- C\n- D", "- A\n- B\n- C"}, + {"very short", "Hello", "Hello"}, + {"only heading", "# Title", "Title"}, + {"multiple headings", "# H1\ntext here\n## H2\nmore text", "H1\n\ntext here"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := extractSummary(tt.input); got != tt.want { + t.Errorf("extractSummary(%q) = %q, want %q", tt.input, got, tt.want) + } + }) + } +} diff --git a/internal/channels/googlechat/googlechat.go b/internal/channels/googlechat/googlechat.go new file mode 100644 index 000000000..98aab7299 --- /dev/null +++ b/internal/channels/googlechat/googlechat.go @@ -0,0 +1,250 @@ +package googlechat + +import ( + "context" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/nextlevelbuilder/goclaw/internal/bus" + "github.com/nextlevelbuilder/goclaw/internal/channels" + "github.com/nextlevelbuilder/goclaw/internal/config" + "github.com/nextlevelbuilder/goclaw/internal/store" +) + +// Compile-time interface assertions. +var _ channels.Channel = (*Channel)(nil) +var _ channels.StreamingChannel = (*Channel)(nil) + +// Channel implements channels.Channel, channels.BlockReplyChannel, and +// channels.PendingCompactable for Google Chat via Pub/Sub pull (phase 1). +type Channel struct { + *channels.BaseChannel + + // Auth + auth *ServiceAccountAuth + + // Pub/Sub config + projectID string + subscriptionID string + pullInterval time.Duration + + // Identity + botUser string // bot's own user ID to filter self-messages + + // Policies + dmPolicy string + groupPolicy string + requireMention bool // require @bot mention in groups + + // Outbound + apiBase string // overridable Chat API base (for testing) + longFormThreshold int + longFormFormat string // "md" or "txt" + drivePermission string // "domain" or "anyone" + driveDomain string // domain for "domain" permission + blockReply *bool + + // Media + mediaMaxBytes int64 + fileRetentionDays int + + // HTTP client (shared for all API calls) + httpClient *http.Client + + // State + dedup *dedupCache + threadIDs sync.Map // spaceID:senderID → threadName + placeholders sync.Map // chatID → messageName (placeholder for edit) + groupHistory *channels.PendingHistory + historyLimit int + driveFiles []driveFileRecord + driveFilesMu sync.Mutex + + // Streaming + dmStream bool + groupStream bool + streams sync.Map // chatID → *chatStream + + // Lifecycle + pullCancel context.CancelFunc + pullDone chan struct{} + cleanupCancel context.CancelFunc +} + +// New creates a new Google Chat channel from config. +func New(cfg config.GoogleChatConfig, msgBus *bus.MessageBus, pendingStore store.PendingMessageStore) (*Channel, error) { + auth, err := NewServiceAccountAuth(cfg.ServiceAccountFile, []string{scopeChat, scopePubSub, scopeDrive}) + if err != nil { + return nil, err + } + + pullInterval := defaultPullInterval + if cfg.PullIntervalMs > 0 { + pullInterval = time.Duration(cfg.PullIntervalMs) * time.Millisecond + } + + longFormThreshold := longFormThresholdDefault + if cfg.LongFormThreshold > 0 { + longFormThreshold = cfg.LongFormThreshold + } else if cfg.LongFormThreshold < 0 { + longFormThreshold = 0 // disabled + } + + longFormFormat := "md" + if cfg.LongFormFormat == "txt" { + longFormFormat = "txt" + } + + mediaMaxBytes := int64(defaultMediaMaxMB) * 1024 * 1024 + if cfg.MediaMaxMB > 0 { + mediaMaxBytes = int64(cfg.MediaMaxMB) * 1024 * 1024 + } + + drivePermission := "domain" + if cfg.DrivePermission == "anyone" { + drivePermission = "anyone" + } + driveDomain := "vnpay.vn" + if cfg.DriveDomain != "" { + driveDomain = cfg.DriveDomain + } + + dmPolicy := cfg.DMPolicy + if dmPolicy == "" { + dmPolicy = "open" + } + groupPolicy := cfg.GroupPolicy + if groupPolicy == "" { + groupPolicy = "open" + } + + requireMention := true + if cfg.RequireMention != nil { + requireMention = *cfg.RequireMention + } + + historyLimit := 50 + if cfg.HistoryLimit > 0 { + historyLimit = cfg.HistoryLimit + } + + dmStream := true // default: enable streaming for DMs + if cfg.DMStream != nil { + dmStream = *cfg.DMStream + } + groupStream := false // default: disable streaming for groups + if cfg.GroupStream != nil { + groupStream = *cfg.GroupStream + } + + ch := &Channel{ + BaseChannel: channels.NewBaseChannel(channels.TypeGoogleChat, msgBus, cfg.AllowFrom), + auth: auth, + projectID: cfg.ProjectID, + subscriptionID: cfg.SubscriptionID, + pullInterval: pullInterval, + botUser: cfg.BotUser, + dmPolicy: dmPolicy, + groupPolicy: groupPolicy, + requireMention: requireMention, + apiBase: chatAPIBase, + longFormThreshold: longFormThreshold, + longFormFormat: longFormFormat, + drivePermission: drivePermission, + driveDomain: driveDomain, + blockReply: cfg.BlockReply, + mediaMaxBytes: mediaMaxBytes, + fileRetentionDays: cfg.FileRetentionDays, + httpClient: &http.Client{Timeout: 30 * time.Second}, + dedup: newDedupCache(dedupTTL), + historyLimit: historyLimit, + dmStream: dmStream, + groupStream: groupStream, + groupHistory: channels.MakeHistory("google_chat", pendingStore), + } + + ch.BaseChannel.SetType(typeGoogleChat) + ch.BaseChannel.ValidatePolicy(dmPolicy, groupPolicy) + + return ch, nil +} + +// Start begins the Pub/Sub pull loop and optional Drive cleanup goroutine. +func (c *Channel) Start(ctx context.Context) error { + if c.IsRunning() { + return nil + } + + pullCtx, cancel := context.WithCancel(ctx) + c.pullCancel = cancel + c.pullDone = make(chan struct{}) + + go func() { + defer close(c.pullDone) + c.startPullLoop(pullCtx) + }() + + // Start Drive file cleanup goroutine if retention is configured. + if c.fileRetentionDays > 0 { + cleanupCtx, cleanupCancel := context.WithCancel(ctx) + c.cleanupCancel = cleanupCancel + go c.startDriveCleanupLoop(cleanupCtx) + } + + c.SetRunning(true) + slog.Info("googlechat: channel started", + "name", c.Name(), + "project", c.projectID, + "subscription", c.subscriptionID) + return nil +} + +// Stop gracefully shuts down the channel. +func (c *Channel) Stop(ctx context.Context) error { + if !c.IsRunning() { + return nil + } + + c.SetRunning(false) + + // Drain active streams to cancel flush timers and avoid goroutine leaks. + c.streams.Range(func(key, value any) bool { + cs := value.(*chatStream) + cs.stop(ctx) + c.streams.Delete(key) + return true + }) + + if c.cleanupCancel != nil { + c.cleanupCancel() + } + if c.pullCancel != nil { + c.pullCancel() + } + + // Wait for pull loop to drain (with timeout). + if c.pullDone != nil { + select { + case <-c.pullDone: + case <-time.After(shutdownDrainTimeout): + slog.Warn("googlechat: shutdown drain timeout exceeded") + } + } + + slog.Info("googlechat: channel stopped", "name", c.Name()) + return nil +} + +// SetPendingCompaction implements channels.PendingCompactable. +func (c *Channel) SetPendingCompaction(cfg *channels.CompactionConfig) { + if c.groupHistory != nil { + c.groupHistory.SetCompactionConfig(cfg) + } +} + +// BlockReplyEnabled implements channels.BlockReplyChannel. +func (c *Channel) BlockReplyEnabled() *bool { + return c.blockReply +} diff --git a/internal/channels/googlechat/media.go b/internal/channels/googlechat/media.go new file mode 100644 index 000000000..92ccd98c7 --- /dev/null +++ b/internal/channels/googlechat/media.go @@ -0,0 +1,290 @@ +package googlechat + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "mime/multipart" + "net/http" + "net/textproto" + "os" + "path/filepath" + "strings" + "time" + + "github.com/google/uuid" +) + +// downloadAttachment downloads a Chat API attachment to a temp file. +func (c *Channel) downloadAttachment(ctx context.Context, att chatAttachment) (string, error) { + if att.ResourceName == "" { + return "", fmt.Errorf("attachment has no resourceName") + } + + token, err := c.auth.Token(ctx) + if err != nil { + return "", err + } + + url := fmt.Sprintf("%s/media/%s?alt=media", chatAPIBase, att.ResourceName) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return "", err + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := c.httpClient.Do(req) + if err != nil { + return "", fmt.Errorf("download attachment: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return "", fmt.Errorf("download attachment %d: %s", resp.StatusCode, string(body)) + } + + // Check size limit. + if c.mediaMaxBytes > 0 && resp.ContentLength > c.mediaMaxBytes { + return "", fmt.Errorf("attachment too large: %d bytes (max %d)", resp.ContentLength, c.mediaMaxBytes) + } + + // Determine extension from content type. + ext := extensionFromMIME(att.ContentType) + tmpPath := filepath.Join(os.TempDir(), uuid.New().String()+ext) + + f, err := os.Create(tmpPath) + if err != nil { + return "", err + } + defer f.Close() + + // Limit read to mediaMaxBytes. + reader := io.Reader(resp.Body) + if c.mediaMaxBytes > 0 { + reader = io.LimitReader(resp.Body, c.mediaMaxBytes+1) + } + n, err := io.Copy(f, reader) + if err != nil { + os.Remove(tmpPath) + return "", err + } + if c.mediaMaxBytes > 0 && n > c.mediaMaxBytes { + os.Remove(tmpPath) + return "", fmt.Errorf("attachment exceeded max size during download") + } + + slog.Debug("googlechat: attachment downloaded", "path", tmpPath, "size", n, "type", att.ContentType) + return tmpPath, nil +} + +// driveFileRecord tracks uploaded Drive files for retention cleanup. +type driveFileRecord struct { + FileID string + CreatedAt time.Time +} + +// startDriveCleanupLoop periodically deletes expired Drive files. +func (c *Channel) startDriveCleanupLoop(ctx context.Context) { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + c.cleanupExpiredDriveFiles(ctx) + } + } +} + +// cleanupExpiredDriveFiles deletes Drive files older than fileRetentionDays. +func (c *Channel) cleanupExpiredDriveFiles(ctx context.Context) { + c.driveFilesMu.Lock() + defer c.driveFilesMu.Unlock() + + cutoff := time.Now().AddDate(0, 0, -c.fileRetentionDays) + var remaining []driveFileRecord + for _, f := range c.driveFiles { + if f.CreatedAt.Before(cutoff) { + if err := c.deleteDriveFile(ctx, f.FileID); err != nil { + slog.Warn("googlechat: failed to delete expired drive file", "file_id", f.FileID, "error", err) + remaining = append(remaining, f) // retry next cycle + } else { + slog.Debug("googlechat: deleted expired drive file", "file_id", f.FileID) + } + } else { + remaining = append(remaining, f) + } + } + c.driveFiles = remaining +} + +// deleteDriveFile deletes a file from Google Drive. +func (c *Channel) deleteDriveFile(ctx context.Context, fileID string) error { + token, err := c.auth.Token(ctx) + if err != nil { + return err + } + + url := fmt.Sprintf("%s/files/%s", driveAPIBase, fileID) + req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusNotFound { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("delete drive file %d: %s", resp.StatusCode, string(body)) + } + return nil +} + +// extensionFromMIME returns a file extension for common MIME types. +func extensionFromMIME(mime string) string { + switch { + case strings.HasPrefix(mime, "image/png"): + return ".png" + case strings.HasPrefix(mime, "image/jpeg"): + return ".jpg" + case strings.HasPrefix(mime, "image/gif"): + return ".gif" + case strings.HasPrefix(mime, "image/webp"): + return ".webp" + case strings.HasPrefix(mime, "application/pdf"): + return ".pdf" + case strings.HasPrefix(mime, "text/plain"): + return ".txt" + case strings.HasPrefix(mime, "text/markdown"): + return ".md" + default: + return "" + } +} + +// uploadToDrive uploads a file to Google Drive and returns the file ID and web link. +func (c *Channel) uploadToDrive(ctx context.Context, localPath string, fileName string, mimeType string) (fileID string, webLink string, err error) { + token, err := c.auth.Token(ctx) + if err != nil { + return "", "", err + } + + f, err := os.Open(localPath) + if err != nil { + return "", "", err + } + defer f.Close() + + // Build multipart upload body. + pr, pw := io.Pipe() + writer := multipart.NewWriter(pw) + + go func() { + defer pw.Close() + defer writer.Close() + + // Part 1: metadata + metaHeader := make(textproto.MIMEHeader) + metaHeader.Set("Content-Type", "application/json; charset=UTF-8") + metaPart, _ := writer.CreatePart(metaHeader) + json.NewEncoder(metaPart).Encode(map[string]string{ + "name": fileName, + "mimeType": mimeType, + }) + + // Part 2: file content + fileHeader := make(textproto.MIMEHeader) + fileHeader.Set("Content-Type", mimeType) + filePart, _ := writer.CreatePart(fileHeader) + io.Copy(filePart, f) + }() + + url := driveUploadBase + "/files?uploadType=multipart&fields=id,webViewLink" + req, err := http.NewRequestWithContext(ctx, "POST", url, pr) + if err != nil { + return "", "", err + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "multipart/related; boundary="+writer.Boundary()) + + resp, err := c.httpClient.Do(req) + if err != nil { + return "", "", fmt.Errorf("drive upload: %w", err) + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode != http.StatusOK { + return "", "", fmt.Errorf("drive upload %d: %s", resp.StatusCode, string(body)) + } + + var result struct { + ID string `json:"id"` + WebViewLink string `json:"webViewLink"` + } + if err := json.Unmarshal(body, &result); err != nil { + return "", "", fmt.Errorf("parse drive response: %w", err) + } + + // Set permissions. + if err := c.setDrivePermission(ctx, result.ID); err != nil { + slog.Warn("googlechat: failed to set drive permission", "file_id", result.ID, "error", err) + } + + // Track for retention cleanup. + if c.fileRetentionDays > 0 { + c.driveFilesMu.Lock() + c.driveFiles = append(c.driveFiles, driveFileRecord{FileID: result.ID, CreatedAt: time.Now()}) + c.driveFilesMu.Unlock() + } + + return result.ID, result.WebViewLink, nil +} + +// setDrivePermission sets the sharing permission on a Drive file. +func (c *Channel) setDrivePermission(ctx context.Context, fileID string) error { + token, err := c.auth.Token(ctx) + if err != nil { + return err + } + + var perm map[string]string + switch c.drivePermission { + case "anyone": + perm = map[string]string{"type": "anyone", "role": "reader"} + default: // "domain" + perm = map[string]string{"type": "domain", "role": "reader", "domain": c.driveDomain} + } + + body, _ := json.Marshal(perm) + url := fmt.Sprintf("%s/files/%s/permissions", driveAPIBase, fileID) + + req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(string(body))) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("set permission %d: %s", resp.StatusCode, string(b)) + } + return nil +} diff --git a/internal/channels/googlechat/pubsub.go b/internal/channels/googlechat/pubsub.go new file mode 100644 index 000000000..d45f623c8 --- /dev/null +++ b/internal/channels/googlechat/pubsub.go @@ -0,0 +1,376 @@ +package googlechat + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "strings" + "sync" + "time" +) + +// chatEvent is the parsed representation of a Google Chat event from Pub/Sub. +type chatEvent struct { + Type string // MESSAGE, ADDED_TO_SPACE, REMOVED_FROM_SPACE, etc. + SenderID string // users/{userId} + SenderName string // display name + SpaceID string // spaces/{spaceId} + SpaceType string // DM, SPACE, ROOM + PeerKind string // "direct" or "group" + Text string // message text + MessageName string // spaces/{spaceId}/messages/{messageId} + ThreadName string // spaces/{spaceId}/threads/{threadId} + Attachments []chatAttachment // file attachments +} + +// chatAttachment represents a file attachment in a Google Chat message. +type chatAttachment struct { + Name string // attachment resource name + ContentType string + ResourceName string // for download via media API +} + +// parseEvent parses a base64-encoded Pub/Sub message data into a chatEvent. +func parseEvent(encodedData string) (*chatEvent, error) { + data, err := base64.StdEncoding.DecodeString(encodedData) + if err != nil { + return nil, fmt.Errorf("base64 decode: %w", err) + } + if len(data) == 0 { + return nil, fmt.Errorf("empty event data") + } + + var raw struct { + Type string `json:"type"` + Message struct { + Name string `json:"name"` + Text string `json:"text"` + Sender struct { + Name string `json:"name"` + DisplayName string `json:"displayName"` + } `json:"sender"` + Thread struct { + Name string `json:"name"` + } `json:"thread"` + Attachment []struct { + Name string `json:"name"` + ContentType string `json:"contentType"` + AttachmentDataRef struct { + ResourceName string `json:"resourceName"` + } `json:"attachmentDataRef"` + } `json:"attachment"` + } `json:"message"` + Space struct { + Name string `json:"name"` + Type string `json:"type"` + } `json:"space"` + User struct { + Name string `json:"name"` + } `json:"user"` + } + + if err := json.Unmarshal(data, &raw); err != nil { + return nil, fmt.Errorf("parse chat event: %w", err) + } + + evt := &chatEvent{ + Type: raw.Type, + SpaceID: raw.Space.Name, + SpaceType: raw.Space.Type, + } + + // Determine peer kind + switch raw.Space.Type { + case "DM": + evt.PeerKind = "direct" + default: // SPACE, ROOM + evt.PeerKind = "group" + } + + // Extract sender + if raw.Type == "MESSAGE" { + if raw.Message.Sender.Name == "" { + return nil, fmt.Errorf("MESSAGE event missing sender") + } + evt.SenderID = raw.Message.Sender.Name + evt.SenderName = raw.Message.Sender.DisplayName + evt.Text = raw.Message.Text + evt.MessageName = raw.Message.Name + evt.ThreadName = raw.Message.Thread.Name + + // Parse attachments + for _, att := range raw.Message.Attachment { + evt.Attachments = append(evt.Attachments, chatAttachment{ + Name: att.Name, + ContentType: att.ContentType, + ResourceName: att.AttachmentDataRef.ResourceName, + }) + } + } else if raw.Type == "ADDED_TO_SPACE" || raw.Type == "REMOVED_FROM_SPACE" { + evt.SenderID = raw.User.Name + } + + return evt, nil +} + +// dedupCache is a thread-safe cache for Pub/Sub message deduplication. +type dedupCache struct { + mu sync.Mutex + entries map[string]time.Time + ttl time.Duration +} + +func newDedupCache(ttl time.Duration) *dedupCache { + return &dedupCache{ + entries: make(map[string]time.Time), + ttl: ttl, + } +} + +// seen returns true if the messageID was already processed. +func (d *dedupCache) seen(messageID string) bool { + d.mu.Lock() + defer d.mu.Unlock() + + if t, ok := d.entries[messageID]; ok { + if time.Since(t) < d.ttl { + return true + } + delete(d.entries, messageID) + } + return false +} + +// add marks a messageID as processed. +func (d *dedupCache) add(messageID string) { + d.mu.Lock() + defer d.mu.Unlock() + d.entries[messageID] = time.Now() + + // Periodic cleanup of expired entries (every 100 adds). + if len(d.entries)%100 == 0 { + now := time.Now() + for k, t := range d.entries { + if now.Sub(t) > d.ttl { + delete(d.entries, k) + } + } + } +} + +// pubsubPullResponse is the response from Pub/Sub pull API. +type pubsubPullResponse struct { + ReceivedMessages []struct { + AckID string `json:"ackId"` + Message struct { + Data string `json:"data"` + MessageID string `json:"messageId"` + } `json:"message"` + } `json:"receivedMessages"` +} + +// pullMessages performs a single Pub/Sub pull request and returns received messages. +func pullMessages(ctx context.Context, auth *ServiceAccountAuth, httpClient *http.Client, projectID, subscriptionID string, maxMessages int) (*pubsubPullResponse, error) { + token, err := auth.Token(ctx) + if err != nil { + return nil, fmt.Errorf("get token: %w", err) + } + + url := fmt.Sprintf("%s/projects/%s/subscriptions/%s:pull", pubsubAPIBase, projectID, subscriptionID) + body := fmt.Sprintf(`{"maxMessages":%d}`, maxMessages) + + req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(body)) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("pubsub pull: %w", err) + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + + if resp.StatusCode == http.StatusOK && len(respBody) <= 2 { + // Empty response "{}" — no messages + return &pubsubPullResponse{}, nil + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("pubsub pull %d: %s", resp.StatusCode, string(respBody)) + } + + var result pubsubPullResponse + if err := json.Unmarshal(respBody, &result); err != nil { + return nil, fmt.Errorf("parse pull response: %w", err) + } + return &result, nil +} + +// ackMessages acknowledges received Pub/Sub messages. +func ackMessages(ctx context.Context, auth *ServiceAccountAuth, httpClient *http.Client, projectID, subscriptionID string, ackIDs []string) error { + if len(ackIDs) == 0 { + return nil + } + + token, err := auth.Token(ctx) + if err != nil { + return fmt.Errorf("get token: %w", err) + } + + url := fmt.Sprintf("%s/projects/%s/subscriptions/%s:acknowledge", pubsubAPIBase, projectID, subscriptionID) + + ackBody := struct { + AckIDs []string `json:"ackIds"` + }{AckIDs: ackIDs} + bodyBytes, _ := json.Marshal(ackBody) + + req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(string(bodyBytes))) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("pubsub ack: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("pubsub ack %d: %s", resp.StatusCode, string(body)) + } + return nil +} + +// startPullLoop runs the Pub/Sub pull loop. Blocks until ctx is cancelled. +func (c *Channel) startPullLoop(ctx context.Context) { + ticker := time.NewTicker(c.pullInterval) + defer ticker.Stop() + + slog.Info("googlechat: pubsub pull loop started", + "project", c.projectID, "subscription", c.subscriptionID, + "interval", c.pullInterval) + + for { + select { + case <-ctx.Done(): + slog.Info("googlechat: pubsub pull loop stopped") + return + case <-ticker.C: + c.doPull(ctx) + } + } +} + +// doPull performs a single pull cycle. +func (c *Channel) doPull(ctx context.Context) { + resp, err := pullMessages(ctx, c.auth, c.httpClient, c.projectID, c.subscriptionID, defaultPullMaxMessages) + if err != nil { + if ctx.Err() != nil { + return // context cancelled, normal shutdown + } + slog.Warn("googlechat: pubsub pull failed", "error", err) + return + } + + if len(resp.ReceivedMessages) == 0 { + return + } + + var ackIDs []string + for _, rm := range resp.ReceivedMessages { + ackIDs = append(ackIDs, rm.AckID) + + // Dedup check + if c.dedup.seen(rm.Message.MessageID) { + slog.Debug("googlechat: duplicate pubsub message, skipping", "message_id", rm.Message.MessageID) + continue + } + c.dedup.add(rm.Message.MessageID) + + evt, err := parseEvent(rm.Message.Data) + if err != nil { + slog.Warn("googlechat: malformed event, acking anyway", "error", err, "message_id", rm.Message.MessageID) + continue + } + + c.handleEvent(ctx, evt) + } + + // Ack all messages (including malformed ones to prevent infinite re-delivery). + if err := ackMessages(ctx, c.auth, c.httpClient, c.projectID, c.subscriptionID, ackIDs); err != nil { + slog.Warn("googlechat: ack failed", "error", err) + } +} + +// handleEvent dispatches a parsed chat event. +func (c *Channel) handleEvent(ctx context.Context, evt *chatEvent) { + // Filter bot self-messages. + if c.botUser != "" && evt.SenderID == c.botUser { + return + } + + switch evt.Type { + case "MESSAGE": + c.handleMessage(ctx, evt) + case "ADDED_TO_SPACE": + slog.Info("googlechat: added to space", "space", evt.SpaceID, "by", evt.SenderID) + case "REMOVED_FROM_SPACE": + slog.Info("googlechat: removed from space", "space", evt.SpaceID) + default: + slog.Debug("googlechat: ignoring event", "type", evt.Type, "space", evt.SpaceID) + } +} + +// handleMessage processes an inbound MESSAGE event. +func (c *Channel) handleMessage(ctx context.Context, evt *chatEvent) { + // Skip whitespace-only messages. + text := strings.TrimSpace(evt.Text) + if text == "" && len(evt.Attachments) == 0 { + return + } + + // Check DM/Group policy. + if !c.BaseChannel.CheckPolicy(evt.PeerKind, c.dmPolicy, c.groupPolicy, evt.SenderID) { + slog.Debug("googlechat: message rejected by policy", + "sender", evt.SenderID, "peer_kind", evt.PeerKind) + return + } + + // Store thread name for outbound routing (groups). + if evt.ThreadName != "" && evt.PeerKind == "group" { + threadKey := evt.SpaceID + ":" + evt.SenderID + c.threadIDs.Store(threadKey, evt.ThreadName) + } + + // Download attachments. + var mediaPaths []string + for _, att := range evt.Attachments { + path, err := c.downloadAttachment(ctx, att) + if err != nil { + slog.Warn("googlechat: attachment download failed", "error", err) + continue + } + mediaPaths = append(mediaPaths, path) + } + + metadata := map[string]string{ + "sender_name": evt.SenderName, + "message_name": evt.MessageName, + } + if evt.ThreadName != "" { + metadata["thread_name"] = evt.ThreadName + } + + c.BaseChannel.HandleMessage(evt.SenderID, evt.SpaceID, text, mediaPaths, metadata, evt.PeerKind) +} diff --git a/internal/channels/googlechat/pubsub_test.go b/internal/channels/googlechat/pubsub_test.go new file mode 100644 index 000000000..ea4dfcd0b --- /dev/null +++ b/internal/channels/googlechat/pubsub_test.go @@ -0,0 +1,205 @@ +package googlechat + +import ( + "encoding/base64" + "encoding/json" + "testing" +) + +func TestParseEvent_Message(t *testing.T) { + chatEvent := map[string]any{ + "type": "MESSAGE", + "message": map[string]any{ + "name": "spaces/AAA/messages/BBB", + "text": "hello bot", + "sender": map[string]any{ + "name": "users/12345", + "displayName": "Test User", + }, + "thread": map[string]any{ + "name": "spaces/AAA/threads/CCC", + }, + }, + "space": map[string]any{ + "name": "spaces/AAA", + "type": "DM", + }, + } + data, _ := json.Marshal(chatEvent) + encoded := base64.StdEncoding.EncodeToString(data) + + evt, err := parseEvent(encoded) + if err != nil { + t.Fatal(err) + } + if evt.Type != "MESSAGE" { + t.Errorf("type = %q, want MESSAGE", evt.Type) + } + if evt.SenderID != "users/12345" { + t.Errorf("senderID = %q, want users/12345", evt.SenderID) + } + if evt.SpaceID != "spaces/AAA" { + t.Errorf("spaceID = %q, want spaces/AAA", evt.SpaceID) + } + if evt.Text != "hello bot" { + t.Errorf("text = %q, want 'hello bot'", evt.Text) + } + if evt.PeerKind != "direct" { + t.Errorf("peerKind = %q, want direct", evt.PeerKind) + } + if evt.ThreadName != "spaces/AAA/threads/CCC" { + t.Errorf("threadName = %q, want spaces/AAA/threads/CCC", evt.ThreadName) + } +} + +func TestParseEvent_GroupSpace(t *testing.T) { + chatEvent := map[string]any{ + "type": "MESSAGE", + "message": map[string]any{ + "text": "hey", + "sender": map[string]any{ + "name": "users/999", + }, + }, + "space": map[string]any{ + "name": "spaces/GGG", + "type": "SPACE", + }, + } + data, _ := json.Marshal(chatEvent) + encoded := base64.StdEncoding.EncodeToString(data) + + evt, err := parseEvent(encoded) + if err != nil { + t.Fatal(err) + } + if evt.PeerKind != "group" { + t.Errorf("peerKind = %q, want group", evt.PeerKind) + } +} + +func TestParseEvent_AddedToSpace(t *testing.T) { + chatEvent := map[string]any{ + "type": "ADDED_TO_SPACE", + "space": map[string]any{ + "name": "spaces/AAA", + "type": "DM", + }, + "user": map[string]any{ + "name": "users/12345", + }, + } + data, _ := json.Marshal(chatEvent) + encoded := base64.StdEncoding.EncodeToString(data) + + evt, err := parseEvent(encoded) + if err != nil { + t.Fatal(err) + } + if evt.Type != "ADDED_TO_SPACE" { + t.Errorf("type = %q, want ADDED_TO_SPACE", evt.Type) + } +} + +func TestParseEvent_MalformedJSON(t *testing.T) { + encoded := base64.StdEncoding.EncodeToString([]byte("{bad json")) + _, err := parseEvent(encoded) + if err == nil { + t.Fatal("expected error for malformed JSON") + } +} + +func TestParseEvent_EmptyData(t *testing.T) { + encoded := base64.StdEncoding.EncodeToString([]byte("")) + _, err := parseEvent(encoded) + if err == nil { + t.Fatal("expected error for empty data") + } +} + +func TestParseEvent_MissingSender(t *testing.T) { + chatEvent := map[string]any{ + "type": "MESSAGE", + "message": map[string]any{"text": "hello"}, + "space": map[string]any{"name": "spaces/AAA", "type": "DM"}, + } + data, _ := json.Marshal(chatEvent) + encoded := base64.StdEncoding.EncodeToString(data) + + _, err := parseEvent(encoded) + if err == nil { + t.Fatal("expected error for missing sender") + } +} + +func TestParseEvent_BotSelfFilter(t *testing.T) { + chatEvent := map[string]any{ + "type": "MESSAGE", + "message": map[string]any{ + "text": "bot reply", + "sender": map[string]any{"name": "users/BOT123"}, + }, + "space": map[string]any{"name": "spaces/AAA", "type": "DM"}, + } + data, _ := json.Marshal(chatEvent) + encoded := base64.StdEncoding.EncodeToString(data) + + evt, err := parseEvent(encoded) + if err != nil { + t.Fatal(err) + } + if evt.SenderID != "users/BOT123" { + t.Errorf("senderID = %q", evt.SenderID) + } +} + +func TestParseEvent_WithAttachment(t *testing.T) { + chatEvent := map[string]any{ + "type": "MESSAGE", + "message": map[string]any{ + "text": "", + "sender": map[string]any{"name": "users/12345"}, + "attachment": []any{ + map[string]any{ + "name": "spaces/AAA/messages/BBB/attachments/CCC", + "contentType": "image/png", + "attachmentDataRef": map[string]any{ + "resourceName": "spaces/AAA/attachments/CCC", + }, + }, + }, + }, + "space": map[string]any{"name": "spaces/AAA", "type": "DM"}, + } + data, _ := json.Marshal(chatEvent) + encoded := base64.StdEncoding.EncodeToString(data) + + evt, err := parseEvent(encoded) + if err != nil { + t.Fatal(err) + } + if len(evt.Attachments) != 1 { + t.Fatalf("attachments = %d, want 1", len(evt.Attachments)) + } + if evt.Attachments[0].ResourceName != "spaces/AAA/attachments/CCC" { + t.Errorf("resourceName = %q", evt.Attachments[0].ResourceName) + } +} + +func TestDedupCache(t *testing.T) { + cache := newDedupCache(dedupTTL) + + if cache.seen("msg1") { + t.Error("msg1 should not be seen yet") + } + cache.add("msg1") + if !cache.seen("msg1") { + t.Error("msg1 should be seen after add") + } + if !cache.seen("msg1") { + t.Error("msg1 should still be seen") + } + if cache.seen("msg2") { + t.Error("msg2 should not be seen") + } +} diff --git a/internal/channels/googlechat/send.go b/internal/channels/googlechat/send.go new file mode 100644 index 000000000..ee7a47ee8 --- /dev/null +++ b/internal/channels/googlechat/send.go @@ -0,0 +1,513 @@ +package googlechat + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log/slog" + "math" + "net/http" + "os" + "path/filepath" + "strings" + "time" + + "github.com/google/uuid" + "github.com/nextlevelbuilder/goclaw/internal/bus" +) + +// sendTextMessage sends a plain text message to a Google Chat space. +func sendTextMessage(ctx context.Context, apiBase, token string, httpClient *http.Client, msg bus.OutboundMessage, threadName, replyOption string) error { + _, err := sendTextMessageWithResponse(ctx, apiBase, token, httpClient, msg, threadName, replyOption) + return err +} + +// sendTextMessageWithResponse sends a text message and returns the API response (for thread chaining). +func sendTextMessageWithResponse(ctx context.Context, apiBase, token string, httpClient *http.Client, msg bus.OutboundMessage, threadName, replyOption string) (*chatMessageResponse, error) { + text := strings.TrimSpace(msg.Content) + if text == "" { + return nil, nil + } + + body := map[string]any{ + "text": markdownToGoogleChat(text), + } + if threadName != "" { + body["thread"] = map[string]string{"name": threadName} + } + + return postChatMessage(ctx, apiBase, token, httpClient, msg.ChatID, body, replyOption) +} + +// sendCardMessage sends a Card V2 message. +func sendCardMessage(ctx context.Context, apiBase, token string, httpClient *http.Client, chatID string, card map[string]any, threadName, replyOption string) error { + if threadName != "" { + card["thread"] = map[string]string{"name": threadName} + } + _, err := postChatMessage(ctx, apiBase, token, httpClient, chatID, card, replyOption) + return err +} + +// chatMessageResponse is the response from Chat API message operations. +type chatMessageResponse struct { + Name string `json:"name"` // spaces/{space}/messages/{message} + Thread struct { + Name string `json:"name"` // spaces/{space}/threads/{thread} + } `json:"thread"` +} + +// postChatMessage sends a message to the Chat API with retry logic. +func postChatMessage(ctx context.Context, apiBase, token string, httpClient *http.Client, spaceID string, body map[string]any, replyOption string) (*chatMessageResponse, error) { + bodyBytes, err := json.Marshal(body) + if err != nil { + return nil, err + } + + url := fmt.Sprintf("%s/%s/messages", apiBase, spaceID) + if replyOption != "" { + url += "?messageReplyOption=" + replyOption + } + + var result chatMessageResponse + err = retrySend(ctx, httpClient, func() (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(string(bodyBytes))) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + return httpClient.Do(req) + }, &result) + if err != nil { + return nil, err + } + return &result, nil +} + +// editMessage edits an existing message. +func editMessage(ctx context.Context, apiBase, token string, httpClient *http.Client, messageName string, text string) error { + body := map[string]any{ + "text": text, + } + bodyBytes, _ := json.Marshal(body) + + url := fmt.Sprintf("%s/%s?updateMask=text", apiBase, messageName) + + return retrySend(ctx, httpClient, func() (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, "PATCH", url, strings.NewReader(string(bodyBytes))) + if err != nil { + return nil, err + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + return httpClient.Do(req) + }) +} + +// deleteMessage deletes a message. +func deleteMessage(ctx context.Context, apiBase, token string, httpClient *http.Client, messageName string) error { + url := fmt.Sprintf("%s/%s", apiBase, messageName) + + req, err := http.NewRequestWithContext(ctx, "DELETE", url, nil) + if err != nil { + return err + } + req.Header.Set("Authorization", "Bearer "+token) + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("delete message %d: %s", resp.StatusCode, string(body)) + } + return nil +} + +// retrySend retries an HTTP request with exponential backoff on 429/5xx. +func retrySend(ctx context.Context, httpClient *http.Client, doReq func() (*http.Response, error), result ...any) error { + delay := retrySendBaseDelay + for attempt := 0; attempt < retrySendMaxAttempts; attempt++ { + resp, err := doReq() + if err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + return err + } + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + if len(result) > 0 && result[0] != nil { + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + json.Unmarshal(body, result[0]) + } else { + resp.Body.Close() + } + return nil + } + + body, _ := io.ReadAll(resp.Body) + resp.Body.Close() + + if resp.StatusCode == 429 || resp.StatusCode >= 500 { + if attempt < retrySendMaxAttempts-1 { + slog.Debug("googlechat: retrying send", "status", resp.StatusCode, "attempt", attempt+1, "delay", delay) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + delay = time.Duration(math.Min(float64(delay*2), float64(retrySendMaxDelay))) + continue + } + } + + return fmt.Errorf("chat API %d: %s", resp.StatusCode, string(body)) + } + return fmt.Errorf("chat API: max retries exceeded") +} + +// buildCardMessage creates a Cards V2 message from content with tables/code. +// Handles: # (card title), ## (section header), ### (bold subtitle), +// tables (decoratedText for 2-col,
 for wider), bullets, code blocks, dividers.
+func buildCardMessage(content string) map[string]any {
+	if !detectStructuredContent(content) {
+		return nil
+	}
+
+	var sections []map[string]any
+	lines := strings.Split(content, "\n")
+	var currentWidgets []map[string]any
+	var currentHeader string // section-level header text
+	var inTable bool
+	var tableRows []string
+	var inCodeBlock bool
+	var codeLines []string
+
+	flushWidgets := func() {
+		if len(currentWidgets) == 0 {
+			return
+		}
+		section := map[string]any{"widgets": currentWidgets}
+		if currentHeader != "" {
+			section["header"] = currentHeader
+			currentHeader = ""
+		}
+		sections = append(sections, section)
+		currentWidgets = nil
+	}
+
+	flushTable := func() {
+		if len(tableRows) == 0 {
+			return
+		}
+		tw := buildTableWidget(tableRows)
+		tableRows = nil
+		if tw == nil {
+			return
+		}
+		// buildTableWidget returns {"__widgets": [...]} for 2-col decoratedText
+		if multiWidgets, ok := tw["__widgets"]; ok {
+			currentWidgets = append(currentWidgets, multiWidgets.([]map[string]any)...)
+		} else {
+			currentWidgets = append(currentWidgets, tw)
+		}
+	}
+
+	flushCodeBlock := func() {
+		if len(codeLines) == 0 {
+			return
+		}
+		codeText := strings.Join(codeLines, "\n")
+		currentWidgets = append(currentWidgets, map[string]any{
+			"textParagraph": map[string]string{
+				"text": "
" + codeText + "
", + }, + }) + codeLines = nil + } + + for _, line := range lines { + trimmed := strings.TrimSpace(line) + + // Code block toggle. + if strings.HasPrefix(trimmed, "```") { + if inCodeBlock { + flushCodeBlock() + inCodeBlock = false + } else { + inCodeBlock = true + } + continue + } + if inCodeBlock { + codeLines = append(codeLines, line) + continue + } + + // Table row detection. + if strings.HasPrefix(trimmed, "|") && strings.HasSuffix(trimmed, "|") { + inTable = true + if isSeparatorRow(trimmed) { + continue + } + tableRows = append(tableRows, trimmed) + continue + } + if inTable { + flushTable() + inTable = false + } + + // Horizontal rule → divider. + if trimmed == "---" || trimmed == "***" || trimmed == "___" { + currentWidgets = append(currentWidgets, map[string]any{"divider": map[string]any{}}) + continue + } + + // # heading → card title (handled below), start new section + if strings.HasPrefix(trimmed, "# ") && !strings.HasPrefix(trimmed, "## ") { + flushWidgets() + continue + } + + // ## heading → section header (CardV2 section.header) + if strings.HasPrefix(trimmed, "## ") && !strings.HasPrefix(trimmed, "### ") { + flushWidgets() + currentHeader = strings.TrimPrefix(trimmed, "## ") + continue + } + + // ### heading → bold subtitle widget + if strings.HasPrefix(trimmed, "### ") { + headerText := strings.TrimPrefix(trimmed, "### ") + currentWidgets = append(currentWidgets, map[string]any{ + "decoratedText": map[string]any{ + "text": "" + markdownToGoogleChatHTML(headerText) + "", + }, + }) + continue + } + + // Empty line → skip + if trimmed == "" { + continue + } + + // Regular text (including bullets) → textParagraph with HTML + currentWidgets = append(currentWidgets, map[string]any{ + "textParagraph": map[string]string{ + "text": markdownToGoogleChatHTML(trimmed), + }, + }) + } + + flushCodeBlock() + flushTable() + flushWidgets() + + if len(sections) == 0 { + return nil + } + + title := "Response" + for _, line := range lines { + t := strings.TrimSpace(line) + if strings.HasPrefix(t, "# ") && !strings.HasPrefix(t, "## ") { + title = strings.TrimPrefix(t, "# ") + break + } + } + + return map[string]any{ + "cardsV2": []map[string]any{{ + "card": map[string]any{ + "header": map[string]string{"title": title}, + "sections": sections, + }, + }}, + } +} + +// isSeparatorRow checks if a table row is a separator (e.g. |---|---|). +func isSeparatorRow(row string) bool { + inner := strings.Trim(row, "|") + for _, ch := range inner { + if ch != '-' && ch != ':' && ch != ' ' && ch != '|' { + return false + } + } + return true +} + +// Send implements the Channel interface for outbound messages. +func (c *Channel) Send(ctx context.Context, msg bus.OutboundMessage) error { + content := strings.TrimSpace(msg.Content) + if content == "" && len(msg.Media) == 0 { + return nil + } + + token, err := c.auth.Token(ctx) + if err != nil { + return err + } + + // Determine thread context. + peerKind := msg.Metadata["peer_kind"] + threadName := "" + replyOption := "" + if peerKind == "group" { + if tn, ok := msg.Metadata["thread_name"]; ok { + threadName = tn + } else { + senderID := msg.Metadata["sender_id"] + threadKey := msg.ChatID + ":" + senderID + if v, ok := c.threadIDs.Load(threadKey); ok { + threadName = v.(string) + } + } + replyOption = "REPLY_MESSAGE_FALLBACK_TO_NEW_THREAD" + } + + // Placeholder update (e.g. tool status, LLM retry): edit placeholder but keep it + // alive for the final response. Same pattern as Telegram/Discord/Slack. + if msg.Metadata["placeholder_update"] == "true" { + if pName, ok := c.placeholders.Load(msg.ChatID); ok { + _ = editMessage(ctx, c.apiBase, token, c.httpClient, pName.(string), markdownToGoogleChat(content)) + } + return nil + } + + // Check for placeholder edit (Thinking... → final response). + if placeholderName, ok := c.placeholders.Load(msg.ChatID); ok { + c.placeholders.Delete(msg.ChatID) + pName := placeholderName.(string) + + if len([]byte(content)) <= googleChatMaxMessageBytes && !detectStructuredContent(content) && + (c.longFormThreshold == 0 || len(content) <= c.longFormThreshold) { + if err := editMessage(ctx, c.apiBase, token, c.httpClient, pName, markdownToGoogleChat(content)); err != nil { + slog.Warn("googlechat: placeholder edit failed, sending new", "error", err) + } else { + return nil + } + } + deleteMessage(ctx, c.apiBase, token, c.httpClient, pName) + } + + // Long-form content → file attachment. + if c.longFormThreshold > 0 && len(content) > c.longFormThreshold { + if err := c.sendLongForm(ctx, token, msg, content, threadName, replyOption); err != nil { + slog.Warn("googlechat: long-form send failed, falling back to chunks", "error", err) + } else { + return nil + } + } + + // Card message for structured content. + if card := buildCardMessage(content); card != nil { + return sendCardMessage(ctx, c.apiBase, token, c.httpClient, msg.ChatID, card, threadName, replyOption) + } + + // Chunked plain text. + chunks := chunkByBytes(content, googleChatMaxMessageBytes) + currentThread := threadName + for i, chunk := range chunks { + chunkMsg := msg + chunkMsg.Content = chunk + resp, err := sendTextMessageWithResponse(ctx, c.apiBase, token, c.httpClient, chunkMsg, currentThread, replyOption) + if err != nil { + return fmt.Errorf("send chunk %d/%d: %w", i+1, len(chunks), err) + } + if resp != nil && resp.Thread.Name != "" { + currentThread = resp.Thread.Name + } + } + + return nil +} + +// sendLongForm uploads content as a file and sends a summary message. +func (c *Channel) sendLongForm(ctx context.Context, token string, msg bus.OutboundMessage, content, threadName, replyOption string) error { + summary := extractSummary(content) + + ext := ".md" + if c.longFormFormat == "txt" { + ext = ".txt" + } + tmpPath := filepath.Join(os.TempDir(), uuid.New().String()+ext) + if err := os.WriteFile(tmpPath, []byte(content), 0644); err != nil { + return err + } + defer os.Remove(tmpPath) + + mimeType := "text/markdown" + if c.longFormFormat == "txt" { + mimeType = "text/plain" + } + _, webLink, err := c.uploadToDrive(ctx, tmpPath, "response"+ext, mimeType) + if err != nil { + return err + } + + summaryText := markdownToGoogleChat(summary) + "\n\n📎 " + webLink + body := map[string]any{ + "text": summaryText, + } + if threadName != "" { + body["thread"] = map[string]string{"name": threadName} + } + + _, err = postChatMessage(ctx, c.apiBase, token, c.httpClient, msg.ChatID, body, replyOption) + return err +} + +// sendPlaceholder sends a "Thinking..." placeholder message and stores its name. +func (c *Channel) sendPlaceholder(ctx context.Context, chatID, threadName, replyOption string) { + token, err := c.auth.Token(ctx) + if err != nil { + slog.Warn("googlechat: placeholder auth failed", "error", err) + return + } + + body := map[string]any{ + "text": "🤔 Thinking...", + } + if threadName != "" { + body["thread"] = map[string]string{"name": threadName} + } + + bodyBytes, _ := json.Marshal(body) + url := fmt.Sprintf("%s/%s/messages", c.apiBase, chatID) + if replyOption != "" { + url += "?messageReplyOption=" + replyOption + } + + req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(string(bodyBytes))) + if err != nil { + return + } + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(req) + if err != nil { + return + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + var result struct { + Name string `json:"name"` + } + respBody, _ := io.ReadAll(resp.Body) + if json.Unmarshal(respBody, &result) == nil && result.Name != "" { + c.placeholders.Store(chatID, result.Name) + slog.Debug("googlechat: placeholder sent", "chat_id", chatID, "name", result.Name) + } + } +} diff --git a/internal/channels/googlechat/send_test.go b/internal/channels/googlechat/send_test.go new file mode 100644 index 000000000..097fe160c --- /dev/null +++ b/internal/channels/googlechat/send_test.go @@ -0,0 +1,71 @@ +package googlechat + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/nextlevelbuilder/goclaw/internal/bus" +) + +// mockChatAPI creates an httptest server mimicking Google Chat API. +func mockChatAPI(t *testing.T, handler func(w http.ResponseWriter, r *http.Request)) (*httptest.Server, string) { + t.Helper() + ts := httptest.NewServer(http.HandlerFunc(handler)) + return ts, ts.URL +} + +func TestSendMessage_ShortDM(t *testing.T) { + var sentBody map[string]any + ts, baseURL := mockChatAPI(t, func(w http.ResponseWriter, r *http.Request) { + json.NewDecoder(r.Body).Decode(&sentBody) + json.NewEncoder(w).Encode(map[string]any{ + "name": "spaces/DM1/messages/123", + }) + }) + defer ts.Close() + + msg := bus.OutboundMessage{ + ChatID: "spaces/DM1", + Content: "Hello world", + Metadata: map[string]string{ + "peer_kind": "direct", + }, + } + + err := sendTextMessage(context.Background(), baseURL, "fake-token", &http.Client{}, msg, "", "") + if err != nil { + t.Fatal(err) + } + if sentBody["text"] == nil { + t.Error("expected text in sent body") + } +} + +func TestSendMessage_EmptyContent(t *testing.T) { + msg := bus.OutboundMessage{ + ChatID: "spaces/DM1", + Content: "", + } + + err := sendTextMessage(context.Background(), "http://unused", "fake", &http.Client{}, msg, "", "") + if err != nil { + t.Fatal("empty content should not error") + } +} + +func TestBuildCardMessage_Table(t *testing.T) { + content := "# Results\n\n| Name | Score |\n|---|---|\n| Alice | 95 |\n| Bob | 87 |" + card := buildCardMessage(content) + if card == nil { + t.Fatal("expected card for table content") + } + cardJSON, _ := json.Marshal(card) + s := string(cardJSON) + if !strings.Contains(s, "cardsV2") { + t.Error("card JSON should contain cardsV2") + } +} diff --git a/internal/channels/googlechat/stream.go b/internal/channels/googlechat/stream.go new file mode 100644 index 000000000..68aa89916 --- /dev/null +++ b/internal/channels/googlechat/stream.go @@ -0,0 +1,231 @@ +package googlechat + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/nextlevelbuilder/goclaw/internal/channels" +) + +// chatStream manages streaming preview state for a single Google Chat conversation. +// Ref: Telegram DraftStream in telegram/stream.go — same throttle/dedup pattern, +// adapted for Google Chat PATCH API (updateMask=text). +type chatStream struct { + ch *Channel // back-reference for auth + HTTP client + messageName string // "spaces/xxx/messages/yyy" + lastText string // dedup: skip edit if unchanged + lastEdit time.Time // throttle tracking + throttle time.Duration // default 1500ms + mu sync.Mutex + stopped bool + pending string // buffered text during throttle window + flushTimer *time.Timer // fires after throttle to send pending +} + +// newChatStream creates a streaming state manager for a conversation. +func newChatStream(ch *Channel, messageName string) *chatStream { + return &chatStream{ + ch: ch, + messageName: messageName, + throttle: defaultStreamThrottle, + } +} + +// update sends or buffers new streaming text. Throttled and deduped. +func (cs *chatStream) update(ctx context.Context, fullText string) { + cs.mu.Lock() + defer cs.mu.Unlock() + + if cs.stopped { + return + } + + // Dedup + if fullText == cs.lastText { + return + } + + cs.pending = fullText + + // Throttle: buffer if too soon + if time.Since(cs.lastEdit) < cs.throttle { + cs.resetFlushTimer() + return + } + + cs.cancelFlushTimer() + cs.doFlush(ctx) +} + +// stop finalizes the stream: cancel timer, flush pending, mark stopped. +func (cs *chatStream) stop(ctx context.Context) { + cs.mu.Lock() + defer cs.mu.Unlock() + + cs.stopped = true + cs.cancelFlushTimer() + cs.doFlush(ctx) +} + +// doFlush sends pending text via PATCH edit. Must hold mu lock. +func (cs *chatStream) doFlush(ctx context.Context) { + if cs.pending == "" || cs.pending == cs.lastText { + return + } + + text := cs.pending + formatted := markdownToGoogleChat(text) + + // Truncate to fit Google Chat limit + if len([]byte(formatted)) > googleChatMaxMessageBytes { + formatted = truncateBytes(formatted, googleChatMaxMessageBytes-len([]byte("…"))) + "…" + } + + token, err := cs.ch.auth.Token(ctx) + if err != nil { + slog.Warn("googlechat: stream flush auth failed", "error", err) + return + } + + if err := editMessage(ctx, cs.ch.apiBase, token, cs.ch.httpClient, cs.messageName, formatted); err != nil { + slog.Warn("googlechat: stream edit failed", "error", err, "name", cs.messageName) + return + } + + cs.lastText = text + cs.lastEdit = time.Now() +} + +// resetFlushTimer starts or resets the timer to flush pending text after +// the remaining throttle interval. Must hold mu lock. +func (cs *chatStream) resetFlushTimer() { + if cs.flushTimer != nil { + cs.flushTimer.Stop() + } + remaining := cs.throttle - time.Since(cs.lastEdit) + if remaining <= 0 { + remaining = cs.throttle + } + // Timer callback runs on a separate goroutine after the caller releases mu. + // Uses context.Background() intentionally — the originating request context + // may be cancelled by then; a best-effort flush is acceptable for streaming previews. + cs.flushTimer = time.AfterFunc(remaining, func() { + cs.mu.Lock() + defer cs.mu.Unlock() + if !cs.stopped { + cs.doFlush(context.Background()) + } + }) +} + +// cancelFlushTimer stops any pending flush timer. Must hold mu lock. +func (cs *chatStream) cancelFlushTimer() { + if cs.flushTimer != nil { + cs.flushTimer.Stop() + cs.flushTimer = nil + } +} + +// truncateBytes truncates a string to maxBytes without cutting mid-UTF8. +func truncateBytes(s string, maxBytes int) string { + b := []byte(s) + if len(b) <= maxBytes { + return s + } + if maxBytes <= 0 { + return "" + } + // Don't cut in the middle of a UTF-8 sequence + for maxBytes > 0 && maxBytes < len(b) && b[maxBytes]>>6 == 0b10 { + maxBytes-- + } + return string(b[:maxBytes]) +} + +// --- ChannelStream interface implementation (chatStream) --- + +// Update sends or edits the streaming message with the latest accumulated text. +// Implements channels.ChannelStream. +func (cs *chatStream) Update(ctx context.Context, text string) { + cs.update(ctx, text) +} + +// Stop finalizes the stream with a final flush. +// Implements channels.ChannelStream. +func (cs *chatStream) Stop(ctx context.Context) error { + cs.stop(ctx) + return nil +} + +// MessageID returns 0 — Google Chat uses string messageName, not int IDs. +// FinalizeStream handles the Google Chat-specific placeholder handoff via type assertion. +// Implements channels.ChannelStream. +func (cs *chatStream) MessageID() int { return 0 } + +// MessageName returns the Google Chat message resource name for FinalizeStream handoff. +func (cs *chatStream) MessageName() string { + cs.mu.Lock() + defer cs.mu.Unlock() + return cs.messageName +} + +// --- StreamingChannel interface implementation (Channel) --- + +// StreamEnabled returns whether streaming is enabled for DMs or groups. +func (c *Channel) StreamEnabled(isGroup bool) bool { + if isGroup { + return c.groupStream + } + return c.dmStream +} + +// CreateStream creates a per-run streaming handle for the given chatID. +// Implements channels.StreamingChannel. +// +// Reuses existing placeholder message if available (from sendPlaceholder or +// a previous FinalizeStream), otherwise creates a new "⏳" message. +func (c *Channel) CreateStream(ctx context.Context, chatID string, _ bool) (channels.ChannelStream, error) { + var messageName string + + // Check for existing placeholder (from sendPlaceholder or previous FinalizeStream) + if v, ok := c.placeholders.Load(chatID); ok { + c.placeholders.Delete(chatID) + messageName = v.(string) + slog.Info("googlechat: stream reusing placeholder", "chat_id", chatID, "name", messageName) + } else { + // Create new stream message + token, err := c.auth.Token(ctx) + if err != nil { + return nil, err + } + resp, err := postChatMessage(ctx, c.apiBase, token, c.httpClient, chatID, + map[string]any{"text": "⏳"}, "") + if err != nil { + return nil, fmt.Errorf("googlechat: create stream message: %w", err) + } + messageName = resp.Name + slog.Info("googlechat: stream created new message", "chat_id", chatID, "name", messageName) + } + + cs := newChatStream(c, messageName) + return cs, nil +} + +// FinalizeStream hands the stream's messageName back to the placeholders map so that +// Send() can edit it with the properly formatted final response. +// Implements channels.StreamingChannel. +func (c *Channel) FinalizeStream(_ context.Context, chatID string, stream channels.ChannelStream) { + cs, ok := stream.(*chatStream) + if !ok || cs.MessageName() == "" { + return + } + c.placeholders.Store(chatID, cs.MessageName()) + slog.Info("googlechat: stream ended, handing off to Send()", "chat_id", chatID, "name", cs.MessageName()) +} + +// ReasoningStreamEnabled returns false — Google Chat doesn't support reasoning lanes yet. +// The PATCH-based streaming doesn't support dual-lane preview like Telegram DraftStream. +func (c *Channel) ReasoningStreamEnabled() bool { return false } diff --git a/internal/channels/googlechat/stream_test.go b/internal/channels/googlechat/stream_test.go new file mode 100644 index 000000000..9a5535493 --- /dev/null +++ b/internal/channels/googlechat/stream_test.go @@ -0,0 +1,444 @@ +package googlechat + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/nextlevelbuilder/goclaw/internal/bus" +) + +// requestRecord captures an HTTP request for test assertions. +type requestRecord struct { + Method string + URL string + Body map[string]any +} + +// streamTestEnv holds test infrastructure for stream tests. +type streamTestEnv struct { + ch *Channel + srv *httptest.Server + mu sync.Mutex + recs []requestRecord +} + +// getRecords returns a snapshot of recorded requests (thread-safe). +func (e *streamTestEnv) getRecords() []requestRecord { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]requestRecord, len(e.recs)) + copy(out, e.recs) + return out +} + +// testStreamEnv creates a Channel with mock auth and HTTP server for stream tests. +func testStreamEnv(t *testing.T) *streamTestEnv { + t.Helper() + env := &streamTestEnv{} + + env.srv = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var body map[string]any + json.NewDecoder(r.Body).Decode(&body) + env.mu.Lock() + env.recs = append(env.recs, requestRecord{ + Method: r.Method, + URL: r.URL.String(), + Body: body, + }) + env.mu.Unlock() + + if r.Method == "POST" { + json.NewEncoder(w).Encode(map[string]any{ + "name": "spaces/test/messages/new-1", + "thread": map[string]string{"name": "spaces/test/threads/t1"}, + }) + return + } + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(env.srv.Close) + + env.ch = &Channel{ + auth: &ServiceAccountAuth{ + token: "test-token", + expiresAt: time.Now().Add(1 * time.Hour), + }, + apiBase: env.srv.URL, + httpClient: env.srv.Client(), + dmStream: true, + groupStream: false, + longFormThreshold: longFormThresholdDefault, + } + + return env +} + +func TestChatStream_Dedup(t *testing.T) { + env := testStreamEnv(t) + cs := newChatStream(env.ch, "spaces/test/messages/m1") + + ctx := context.Background() + + // First update should send + cs.update(ctx, "Hello") + // Same text should be deduped + cs.update(ctx, "Hello") + // Wait for any flush timers + time.Sleep(50 * time.Millisecond) + + recs := env.getRecords() + patchCount := 0 + for _, r := range recs { + if r.Method == "PATCH" { + patchCount++ + } + } + if patchCount != 1 { + t.Errorf("expected 1 PATCH (dedup), got %d", patchCount) + } +} + +func TestChatStream_Throttle(t *testing.T) { + env := testStreamEnv(t) + cs := newChatStream(env.ch, "spaces/test/messages/m1") + ctx := context.Background() + + // First update sends immediately + cs.update(ctx, "chunk1") + // Second update within throttle window should be buffered + cs.update(ctx, "chunk2") + // Third update within throttle window should replace pending + cs.update(ctx, "chunk3") + + // Only 1 PATCH should have been sent (chunk1) + time.Sleep(50 * time.Millisecond) // let goroutines settle + recs := env.getRecords() + immediatePatch := 0 + for _, r := range recs { + if r.Method == "PATCH" { + immediatePatch++ + } + } + if immediatePatch != 1 { + t.Errorf("expected 1 immediate PATCH, got %d", immediatePatch) + } + + // Wait for flush timer to fire + time.Sleep(defaultStreamThrottle + 200*time.Millisecond) + + recs = env.getRecords() + totalPatch := 0 + for _, r := range recs { + if r.Method == "PATCH" { + totalPatch++ + } + } + // Should now have 2 PATCHes: immediate (chunk1) + timer flush (chunk3) + if totalPatch != 2 { + t.Errorf("expected 2 total PATCHes after timer, got %d", totalPatch) + } +} + +func TestChatStream_PendingFlush(t *testing.T) { + env := testStreamEnv(t) + cs := newChatStream(env.ch, "spaces/test/messages/m1") + ctx := context.Background() + + // Send first to start throttle window + cs.update(ctx, "first") + // Buffer pending + cs.update(ctx, "pending-text") + // Stop should flush pending + cs.stop(ctx) + + recs := env.getRecords() + patchCount := 0 + var lastBody map[string]any + for _, r := range recs { + if r.Method == "PATCH" { + patchCount++ + lastBody = r.Body + } + } + if patchCount != 2 { + t.Errorf("expected 2 PATCHes (initial + flush), got %d", patchCount) + } + if lastBody != nil { + if text, ok := lastBody["text"].(string); ok { + if !strings.Contains(text, "pending") { + t.Errorf("final flush should contain pending text, got %q", text) + } + } + } +} + +func TestStreamEnabled_Config(t *testing.T) { + ch := &Channel{dmStream: true, groupStream: false} + if !ch.StreamEnabled(false) { + t.Error("DM streaming should be enabled") + } + if ch.StreamEnabled(true) { + t.Error("group streaming should be disabled") + } + + ch2 := &Channel{dmStream: false, groupStream: true} + if ch2.StreamEnabled(false) { + t.Error("DM streaming should be disabled") + } + if !ch2.StreamEnabled(true) { + t.Error("group streaming should be enabled") + } +} + +func TestCreateStream_ReusePlaceholder(t *testing.T) { + env := testStreamEnv(t) + ctx := context.Background() + + // Pre-store a placeholder + env.ch.placeholders.Store("spaces/test", "spaces/test/messages/placeholder-1") + + stream, err := env.ch.CreateStream(ctx, "spaces/test", true) + if err != nil { + t.Fatal(err) + } + + // Should NOT have created a new message (no POST) + recs := env.getRecords() + for _, r := range recs { + if r.Method == "POST" { + t.Error("should reuse placeholder, not POST new message") + } + } + + // Stream should have the placeholder message name + cs := stream.(*chatStream) + if cs.messageName != "spaces/test/messages/placeholder-1" { + t.Errorf("stream should use placeholder message, got %q", cs.messageName) + } + + // Placeholder should be consumed + if _, ok := env.ch.placeholders.Load("spaces/test"); ok { + t.Error("placeholder should be deleted after reuse") + } +} + +func TestCreateStream_CreateNew(t *testing.T) { + env := testStreamEnv(t) + ctx := context.Background() + + // No placeholder stored + stream, err := env.ch.CreateStream(ctx, "spaces/test", true) + if err != nil { + t.Fatal(err) + } + + // Should have created a new message (POST) + recs := env.getRecords() + postCount := 0 + for _, r := range recs { + if r.Method == "POST" { + postCount++ + // Verify "⏳" text + if text, ok := r.Body["text"].(string); ok { + if text != "⏳" { + t.Errorf("new stream message should have ⏳ text, got %q", text) + } + } + } + } + if postCount != 1 { + t.Errorf("expected 1 POST, got %d", postCount) + } + + // Stream should have server-returned name + cs := stream.(*chatStream) + if cs.messageName != "spaces/test/messages/new-1" { + t.Errorf("stream should use server-returned name, got %q", cs.messageName) + } +} + +func TestFinalizeStream_HandoffToPlaceholders(t *testing.T) { + env := testStreamEnv(t) + ctx := context.Background() + + // Simulate active stream + cs := newChatStream(env.ch, "spaces/test/messages/stream-1") + + // Stop and finalize + cs.Stop(ctx) + env.ch.FinalizeStream(ctx, "spaces/test", cs) + + // Message should be handed off to placeholders + pName, ok := env.ch.placeholders.Load("spaces/test") + if !ok { + t.Fatal("placeholder should be stored for Send() pickup") + } + if pName.(string) != "spaces/test/messages/stream-1" { + t.Errorf("placeholder should have stream message name, got %q", pName) + } +} + +func TestToolIteration_Reuse(t *testing.T) { + env := testStreamEnv(t) + ctx := context.Background() + + // Simulate: CreateStream (reuses placeholder) → chunks → Stop + FinalizeStream + env.ch.placeholders.Store("spaces/test", "spaces/test/messages/original") + stream, err := env.ch.CreateStream(ctx, "spaces/test", true) + if err != nil { + t.Fatal(err) + } + + // Stream some text + stream.Update(ctx, "partial response") + + // Tool call: stop and finalize (hands off to placeholders) + stream.Stop(ctx) + env.ch.FinalizeStream(ctx, "spaces/test", stream) + + // Message should be in placeholders + pName, ok := env.ch.placeholders.Load("spaces/test") + if !ok { + t.Fatal("placeholder should exist after tool-phase FinalizeStream") + } + + // Next iteration: CreateStream should reuse from placeholders + stream2, err := env.ch.CreateStream(ctx, "spaces/test", false) + if err != nil { + t.Fatal(err) + } + + cs2 := stream2.(*chatStream) + if cs2.messageName != pName.(string) { + t.Errorf("restarted stream should reuse message %q, got %q", pName, cs2.messageName) + } +} + +func TestSend_InPlaceEdit(t *testing.T) { + env := testStreamEnv(t) + ctx := context.Background() + + // Simulate stream handoff: placeholder exists + env.ch.placeholders.Store("spaces/test", "spaces/test/messages/stream-1") + + msg := bus.OutboundMessage{ + ChatID: "spaces/test", + Content: "Short final response.", + Metadata: map[string]string{"peer_kind": "direct"}, + } + + err := env.ch.Send(ctx, msg) + if err != nil { + t.Fatal(err) + } + + recs := env.getRecords() + patchCount := 0 + postCount := 0 + for _, r := range recs { + switch r.Method { + case "PATCH": + patchCount++ + case "POST": + postCount++ + } + } + + if patchCount != 1 { + t.Errorf("expected 1 PATCH (in-place edit), got %d", patchCount) + } + if postCount != 0 { + t.Errorf("expected 0 POST (no new message), got %d", postCount) + } + + // Placeholder should be consumed + if _, ok := env.ch.placeholders.Load("spaces/test"); ok { + t.Error("placeholder should be consumed after Send") + } +} + +func TestSend_FallbackDelete(t *testing.T) { + env := testStreamEnv(t) + env.ch.longFormThreshold = 50 // low threshold to trigger fallback + ctx := context.Background() + + // Simulate stream handoff + env.ch.placeholders.Store("spaces/test", "spaces/test/messages/stream-1") + + // Content exceeds longFormThreshold (50 chars) + longContent := strings.Repeat("x", 100) + msg := bus.OutboundMessage{ + ChatID: "spaces/test", + Content: longContent, + Metadata: map[string]string{"peer_kind": "direct"}, + } + + err := env.ch.Send(ctx, msg) + if err != nil { + t.Fatal(err) + } + + recs := env.getRecords() + deleteCount := 0 + postCount := 0 + for _, r := range recs { + switch r.Method { + case "DELETE": + deleteCount++ + case "POST": + postCount++ + } + } + + if deleteCount != 1 { + t.Errorf("expected 1 DELETE (stream message), got %d", deleteCount) + } + if postCount == 0 { + t.Error("expected POST for new message after fallback") + } +} + +func TestSend_PlaceholderUpdate(t *testing.T) { + env := testStreamEnv(t) + ctx := context.Background() + + // Pre-store placeholder + env.ch.placeholders.Store("spaces/test", "spaces/test/messages/stream-1") + + msg := bus.OutboundMessage{ + ChatID: "spaces/test", + Content: "Running tool: search_code", + Metadata: map[string]string{ + "peer_kind": "direct", + "placeholder_update": "true", + }, + } + + err := env.ch.Send(ctx, msg) + if err != nil { + t.Fatal(err) + } + + // Should edit placeholder (PATCH) but NOT consume it + recs := env.getRecords() + patchCount := 0 + for _, r := range recs { + if r.Method == "PATCH" { + patchCount++ + } + } + if patchCount != 1 { + t.Errorf("expected 1 PATCH for placeholder update, got %d", patchCount) + } + + // Placeholder should still exist (not consumed) + if _, ok := env.ch.placeholders.Load("spaces/test"); !ok { + t.Error("placeholder should still exist after placeholder_update") + } +} diff --git a/internal/channels/googlechat/webhook.go b/internal/channels/googlechat/webhook.go new file mode 100644 index 000000000..03fb6fe54 --- /dev/null +++ b/internal/channels/googlechat/webhook.go @@ -0,0 +1,7 @@ +package googlechat + +// Phase 2: HTTP webhook handler for Google Chat push events. +// When mode="webhook", the channel will register an HTTP handler via WebhookChannel +// interface instead of using Pub/Sub pull. +// +// Implementation deferred to phase 2. diff --git a/internal/config/config_channels.go b/internal/config/config_channels.go index 206f48120..3d6883d5e 100644 --- a/internal/config/config_channels.go +++ b/internal/config/config_channels.go @@ -20,6 +20,7 @@ type ChannelsConfig struct { Zalo ZaloConfig `json:"zalo"` ZaloPersonal ZaloPersonalConfig `json:"zalo_personal"` Feishu FeishuConfig `json:"feishu"` + GoogleChat GoogleChatConfig `json:"google_chat"` PendingCompaction *PendingCompactionConfig `json:"pending_compaction,omitempty"` // global pending message compaction settings } @@ -189,6 +190,30 @@ type FeishuConfig struct { VoiceAgentID string `json:"voice_agent_id,omitempty"` } +type GoogleChatConfig struct { + Enabled bool `json:"enabled"` + ServiceAccountFile string `json:"serviceAccountFile"` + Mode string `json:"mode"` // "pubsub" (phase 1) | "webhook" (phase 2) + ProjectID string `json:"projectId"` + SubscriptionID string `json:"subscriptionId"` + PullIntervalMs int `json:"pullIntervalMs,omitempty"` + BotUser string `json:"botUser,omitempty"` + DMPolicy string `json:"dm_policy,omitempty"` // "open" (default), "allowlist", "disabled" + GroupPolicy string `json:"group_policy,omitempty"` // "open" (default), "allowlist", "disabled" + RequireMention *bool `json:"require_mention,omitempty"` // require @bot mention in groups (default true) + AllowFrom FlexibleStringSlice `json:"allow_from,omitempty"` + HistoryLimit int `json:"history_limit,omitempty"` // max pending group messages (default 50, 0=disabled) + LongFormThreshold int `json:"longFormThreshold,omitempty"` + LongFormFormat string `json:"longFormFormat,omitempty"` // "md" (default) | "txt" + MediaMaxMB int `json:"mediaMaxMb,omitempty"` + FileRetentionDays int `json:"fileRetentionDays,omitempty"` // auto-delete Drive files (0 = keep forever) + DrivePermission string `json:"drivePermission,omitempty"` // "domain" (default) | "anyone" + DriveDomain string `json:"driveDomain,omitempty"` // domain for "domain" permission (default "vnpay.vn") + DMStream *bool `json:"dm_stream,omitempty"` // enable streaming for DMs (default true) + GroupStream *bool `json:"group_stream,omitempty"` // enable streaming for groups (default false) + BlockReply *bool `json:"block_reply,omitempty"` +} + // ProvidersConfig maps provider name to its config. type ProvidersConfig struct { Anthropic ProviderConfig `json:"anthropic"` diff --git a/ui/web/src/constants/channels.ts b/ui/web/src/constants/channels.ts index ba5042856..e8437edfe 100644 --- a/ui/web/src/constants/channels.ts +++ b/ui/web/src/constants/channels.ts @@ -2,6 +2,7 @@ export const CHANNEL_TYPES = [ { value: "telegram", label: "Telegram" }, { value: "discord", label: "Discord" }, { value: "slack", label: "Slack" }, + { value: "google_chat", label: "Google Chat" }, { value: "feishu", label: "Feishu / Lark" }, { value: "zalo_oa", label: "Zalo OA" }, { value: "zalo_personal", label: "Zalo Personal" }, diff --git a/ui/web/src/pages/channels/channel-schemas.ts b/ui/web/src/pages/channels/channel-schemas.ts index f6d543b99..648a8c955 100644 --- a/ui/web/src/pages/channels/channel-schemas.ts +++ b/ui/web/src/pages/channels/channel-schemas.ts @@ -56,6 +56,9 @@ export const credentialsSchema: Record = { { key: "encrypt_key", label: "Encrypt Key", type: "password", help: "For webhook event decryption", showWhen: { key: "connection_mode", value: "webhook" } }, { key: "verification_token", label: "Verification Token", type: "password", help: "For webhook event verification", showWhen: { key: "connection_mode", value: "webhook" } }, ], + google_chat: [ + { key: "serviceAccountFile", label: "Service Account JSON Path", type: "text", required: true, placeholder: "/app/config/google-sa.json", help: "Path to Google service account key file inside the container" }, + ], zalo_oa: [ { key: "token", label: "OA Access Token", type: "password", required: true }, { key: "webhook_secret", label: "Webhook Secret", type: "password" }, @@ -126,6 +129,27 @@ export const configSchema: Record = { { key: "group_allow_from", label: "Group Allowed Users", type: "tags", help: "Separate allowlist for group senders" }, { key: "block_reply", label: "Block Reply", type: "select", options: blockReplyOptions, defaultValue: "inherit", help: "Deliver intermediate text during tool iterations" }, ], + google_chat: [ + { key: "mode", label: "Connection Mode", type: "select", options: [{ value: "pubsub", label: "Pub/Sub Pull" }, { value: "webhook", label: "Webhook (Phase 2)" }], defaultValue: "pubsub" }, + { key: "project_id", label: "GCP Project ID", type: "text", required: true, placeholder: "my-gcp-project" }, + { key: "subscription_id", label: "Pub/Sub Subscription", type: "text", required: true, placeholder: "my-chat-sub", help: "Pub/Sub subscription for Google Chat events" }, + { key: "pull_interval_ms", label: "Pull Interval (ms)", type: "number", defaultValue: 3000, help: "Pub/Sub pull frequency" }, + { key: "bot_user", label: "Bot User Email", type: "text", placeholder: "bot@project.iam.gserviceaccount.com", help: "Service account email to filter self-messages" }, + { key: "dm_policy", label: "DM Policy", type: "select", options: dmPolicyOptions, defaultValue: "open" }, + { key: "group_policy", label: "Group Policy", type: "select", options: groupPolicyOptions, defaultValue: "open" }, + { key: "require_mention", label: "Require @mention in groups", type: "boolean", defaultValue: true }, + { key: "history_limit", label: "Group History Limit", type: "number", defaultValue: 50, help: "Max pending group messages for context (0 = disabled)" }, + { key: "dm_stream", label: "DM Streaming", type: "boolean", defaultValue: true, help: "Progressively edit message as LLM generates (DMs)" }, + { key: "group_stream", label: "Group Streaming", type: "boolean", defaultValue: false, help: "Progressively edit message as LLM generates (groups)" }, + { key: "long_form_threshold", label: "Long-Form Threshold", type: "number", defaultValue: 3500, help: "Character count above which content is uploaded as a Drive file (0 = disabled)" }, + { key: "long_form_format", label: "Long-Form Format", type: "select", options: [{ value: "md", label: "Markdown (.md)" }, { value: "txt", label: "Plain text (.txt)" }], defaultValue: "md" }, + { key: "media_max_mb", label: "Max Media Size (MB)", type: "number", defaultValue: 20 }, + { key: "file_retention_days", label: "Drive File Retention (days)", type: "number", defaultValue: 7, help: "Auto-delete uploaded Drive files after N days (0 = keep forever)" }, + { key: "drive_permission", label: "Drive Permission", type: "select", options: [{ value: "domain", label: "Domain" }, { value: "anyone", label: "Anyone with link" }], defaultValue: "domain" }, + { key: "drive_domain", label: "Drive Domain", type: "text", placeholder: "example.com", help: "Domain for domain-scoped Drive sharing" }, + { key: "allow_from", label: "Allowed Users", type: "tags", help: "Google Chat user IDs or emails" }, + { key: "block_reply", label: "Block Reply", type: "select", options: blockReplyOptions, defaultValue: "inherit", help: "Deliver intermediate text during tool iterations" }, + ], zalo_oa: [ { key: "dm_policy", label: "DM Policy", type: "select", options: dmPolicyOptions, defaultValue: "pairing" }, { key: "webhook_url", label: "Webhook URL", type: "text", placeholder: "https://..." },