diff --git a/cmd/destinations/gcppubsub/main.go b/cmd/destinations/gcppubsub/main.go new file mode 100644 index 00000000..f103fb79 --- /dev/null +++ b/cmd/destinations/gcppubsub/main.go @@ -0,0 +1,300 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "syscall" + "time" + + "cloud.google.com/go/pubsub" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +const ( + // Default configuration for local emulator + DEFAULT_PROJECT_ID = "test-project" + DEFAULT_TOPIC = "test-topic" + DEFAULT_SUBSCRIPTION = "test-subscription" + DEFAULT_ENDPOINT = "localhost:8085" // Default emulator endpoint + + // To use real GCP, set these environment variables: + // GCP_PROJECT_ID - Your GCP project ID + // GCP_TOPIC - Your Pub/Sub topic name + // GCP_SUBSCRIPTION - Your subscription name + // GCP_CREDENTIALS - Path to service account JSON file + // GCP_ENDPOINT - Leave empty for production or set for custom endpoint +) + +func main() { + // Check for command arguments + if len(os.Args) > 1 { + switch os.Args[1] { + case "help", "-h", "--help": + printHelp() + return + case "clean", "cleanup": + if err := cleanup(); err != nil { + log.Fatalf("Cleanup error: %v", err) + } + return + } + } + + if err := run(); err != nil { + log.Fatalf("Error: %v", err) + } +} + +func printHelp() { + fmt.Println(`GCP Pub/Sub Test Consumer + +This program connects to GCP Pub/Sub and listens for messages on a subscription. +It supports both the local emulator and real GCP environments. + +USAGE: + go run cmd/destinations/gcppubsub/main.go [command] + +COMMANDS: + help Show this help message + clean Delete the topic and subscription (cleanup) + +DEFAULT CONFIGURATION (Emulator): + - Project ID: test-project + - Topic: test-topic + - Subscription: test-subscription + - Endpoint: localhost:8085 + +TO USE WITH LOCAL EMULATOR: + # Make sure the emulator is running, then: + go run cmd/destinations/gcppubsub/main.go + +TO USE WITH REAL GCP: + export GCP_PROJECT_ID="your-project-id" + export GCP_TOPIC="your-topic" + export GCP_SUBSCRIPTION="your-subscription" + export GCP_CREDENTIALS="/path/to/service-account.json" + export GCP_ENDPOINT="" # Leave empty for production + go run cmd/destinations/gcppubsub/main.go + +ENVIRONMENT VARIABLES: + GCP_PROJECT_ID - GCP project ID (default: test-project) + GCP_TOPIC - Pub/Sub topic name (default: test-topic) + GCP_SUBSCRIPTION - Subscription name (default: test-subscription) + GCP_CREDENTIALS - Path to service account JSON file (default: none, uses emulator) + GCP_ENDPOINT - Custom endpoint (default: localhost:8085) + +NOTES: + - The program will create the topic and subscription if they don't exist + - Messages are automatically acknowledged after processing + - Use CTRL+C to gracefully shut down`) +} + +func run() error { + ctx := context.Background() + + // Get configuration from environment or use defaults + projectID := getEnvOrDefault("GCP_PROJECT_ID", DEFAULT_PROJECT_ID) + topicName := getEnvOrDefault("GCP_TOPIC", DEFAULT_TOPIC) + subscriptionName := getEnvOrDefault("GCP_SUBSCRIPTION", DEFAULT_SUBSCRIPTION) + endpoint := getEnvOrDefault("GCP_ENDPOINT", DEFAULT_ENDPOINT) + credentialsPath := os.Getenv("GCP_CREDENTIALS") + + log.Printf("Configuration:") + log.Printf(" Project ID: %s", projectID) + log.Printf(" Topic: %s", topicName) + log.Printf(" Subscription: %s", subscriptionName) + log.Printf(" Endpoint: %s", endpoint) + if credentialsPath != "" { + log.Printf(" Credentials: %s", credentialsPath) + } else { + log.Printf(" Credentials: Using emulator (no auth)") + } + + // Create client options + var opts []option.ClientOption + if endpoint != "" { + // Using emulator or custom endpoint + log.Printf("Connecting to emulator/custom endpoint: %s", endpoint) + opts = append(opts, + option.WithEndpoint(endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + ) + } else if credentialsPath != "" { + // Using real GCP with service account + log.Printf("Using service account credentials from: %s", credentialsPath) + opts = append(opts, option.WithCredentialsFile(credentialsPath)) + } + + // Create client + client, err := pubsub.NewClient(ctx, projectID, opts...) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + defer client.Close() + + // Get or create topic + topic := client.Topic(topicName) + exists, err := topic.Exists(ctx) + if err != nil { + return fmt.Errorf("failed to check topic existence: %w", err) + } + if !exists { + log.Printf("Topic %s doesn't exist, creating...", topicName) + topic, err = client.CreateTopic(ctx, topicName) + if err != nil { + return fmt.Errorf("failed to create topic: %w", err) + } + log.Printf("Created topic: %s", topicName) + } else { + log.Printf("Using existing topic: %s", topicName) + } + + // Get or create subscription + sub := client.Subscription(subscriptionName) + exists, err = sub.Exists(ctx) + if err != nil { + return fmt.Errorf("failed to check subscription existence: %w", err) + } + if !exists { + log.Printf("Subscription %s doesn't exist, creating...", subscriptionName) + sub, err = client.CreateSubscription(ctx, subscriptionName, pubsub.SubscriptionConfig{ + Topic: topic, + AckDeadline: 10 * time.Second, + }) + if err != nil { + return fmt.Errorf("failed to create subscription: %w", err) + } + log.Printf("Created subscription: %s", subscriptionName) + } else { + log.Printf("Using existing subscription: %s", subscriptionName) + } + + // Set up signal handling + termChan := make(chan os.Signal, 1) + signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) + + // Start receiving messages + log.Printf("[*] Waiting for messages. To exit press CTRL+C") + + // Create a cancellable context for receiving + receiveCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // Handle messages + err = sub.Receive(receiveCtx, func(ctx context.Context, msg *pubsub.Message) { + log.Printf("[x] Received message ID: %s", msg.ID) + log.Printf(" Data: %s", string(msg.Data)) + + // Pretty print attributes + if len(msg.Attributes) > 0 { + log.Printf(" Attributes:") + for key, value := range msg.Attributes { + log.Printf(" %s: %s", key, value) + } + } + + // Try to parse as JSON for prettier output + var jsonData interface{} + if err := json.Unmarshal(msg.Data, &jsonData); err == nil { + prettyJSON, _ := json.MarshalIndent(jsonData, " ", " ") + log.Printf(" JSON Data:\n %s", string(prettyJSON)) + } + + // Acknowledge the message + msg.Ack() + }) + + if err != nil { + return fmt.Errorf("receive error: %w", err) + } + + // Wait for termination signal + <-termChan + log.Println("Shutting down...") + cancel() + + return nil +} + +func getEnvOrDefault(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +func cleanup() error { + ctx := context.Background() + + // Get configuration from environment or use defaults + projectID := getEnvOrDefault("GCP_PROJECT_ID", DEFAULT_PROJECT_ID) + topicName := getEnvOrDefault("GCP_TOPIC", DEFAULT_TOPIC) + subscriptionName := getEnvOrDefault("GCP_SUBSCRIPTION", DEFAULT_SUBSCRIPTION) + endpoint := getEnvOrDefault("GCP_ENDPOINT", DEFAULT_ENDPOINT) + credentialsPath := os.Getenv("GCP_CREDENTIALS") + + log.Printf("Cleanup Configuration:") + log.Printf(" Project ID: %s", projectID) + log.Printf(" Topic: %s", topicName) + log.Printf(" Subscription: %s", subscriptionName) + log.Printf(" Endpoint: %s", endpoint) + + // Create client options + var opts []option.ClientOption + if endpoint != "" { + opts = append(opts, + option.WithEndpoint(endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + ) + } else if credentialsPath != "" { + opts = append(opts, option.WithCredentialsFile(credentialsPath)) + } + + // Create client + client, err := pubsub.NewClient(ctx, projectID, opts...) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + defer client.Close() + + // Delete subscription first + sub := client.Subscription(subscriptionName) + exists, err := sub.Exists(ctx) + if err != nil { + log.Printf("Warning: Failed to check subscription existence: %v", err) + } else if exists { + if err := sub.Delete(ctx); err != nil { + log.Printf("Warning: Failed to delete subscription %s: %v", subscriptionName, err) + } else { + log.Printf("Deleted subscription: %s", subscriptionName) + } + } else { + log.Printf("Subscription %s doesn't exist", subscriptionName) + } + + // Delete topic + topic := client.Topic(topicName) + exists, err = topic.Exists(ctx) + if err != nil { + log.Printf("Warning: Failed to check topic existence: %v", err) + } else if exists { + if err := topic.Delete(ctx); err != nil { + log.Printf("Warning: Failed to delete topic %s: %v", topicName, err) + } else { + log.Printf("Deleted topic: %s", topicName) + } + } else { + log.Printf("Topic %s doesn't exist", topicName) + } + + log.Println("Cleanup completed") + return nil +} diff --git a/cmd/e2e/api_test.go b/cmd/e2e/api_test.go index ba98107c..1b75016e 100644 --- a/cmd/e2e/api_test.go +++ b/cmd/e2e/api_test.go @@ -1004,8 +1004,8 @@ func (suite *basicSuite) TestDestinationTypesAPI() { "body": map[string]interface{}{ "type": "array", "items": providerSchema, - "minItems": 7, - "maxItems": 7, + "minItems": 8, + "maxItems": 8, "uniqueItems": true, }, }, diff --git a/contributing/destinations/gcp_pubsub/configuration.md b/contributing/destinations/gcp_pubsub/configuration.md new file mode 100644 index 00000000..12427787 --- /dev/null +++ b/contributing/destinations/gcp_pubsub/configuration.md @@ -0,0 +1,118 @@ +# GCP Pub/Sub Destination Configuration + +This document outlines implementation decisions and potential configuration options for the GCP Pub/Sub destination. + +## Current Implementation Decisions + +### Message Structure +- **Body**: Event data is JSON-marshaled +- **Attributes**: Outpost metadata is passed as Pub/Sub attributes (string key-value pairs) + +### Configuration +Currently minimal - only supports: +```golang +type Config struct { + ProjectID string + Topic string + Endpoint string // For emulator support +} +``` + +### Authentication +- Only supports service account JSON credentials +- No support for Application Default Credentials (ADC) +- No support for Workload Identity + +### Publishing Behavior +- **Synchronous**: Each publish waits for acknowledgment (no fire-and-forget) +- **No batching**: Each event published individually +- **Topic validation**: Commented out for performance (let it fail on publish instead) + +## Potential Configuration Options + +### Message Configuration +GCP Pub/Sub supports these message fields we're NOT using: + +```golang +type Message struct { + OrderingKey string // For message ordering within same key + // Attributes map[string]string - We use this for metadata +} +``` + +**Ordering Key**: Could add `ordering_key_template` config similar to Kinesis partition key: +- Enable FIFO delivery for messages with same ordering key +- Requires enabling message ordering on the topic +- Trade-off: Can reduce throughput + +### Publisher Configuration +The Go SDK supports these publisher settings we could expose: + +```golang +topic.PublishSettings = pubsub.PublishSettings{ + // Batching + DelayThreshold: 100 * time.Millisecond, // Max time to wait before sending batch + CountThreshold: 100, // Max messages in batch + ByteThreshold: 1e6, // Max bytes in batch + + // Concurrency + NumGoroutines: 10, // Parallel publish goroutines + + // Flow control + FlowControlSettings: pubsub.FlowControlSettings{ + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 1e9, + LimitExceededBehavior: pubsub.FlowControlBlock, // or FlowControlIgnore + }, + + // Timeout + Timeout: 60 * time.Second, +} +``` + +Potential configs: +- `enable_batching`: Toggle batch publishing +- `batch_size`: Max messages per batch +- `batch_delay_ms`: Max delay before sending partial batch +- `timeout_seconds`: Publish timeout +- `max_concurrent_publishes`: Parallelism limit + +### Topic Configuration +- `enable_message_ordering`: Requires ordering_key support +- `retention_duration`: How long Pub/Sub retains unacknowledged messages +- `message_storage_policy`: Region restrictions for data residency + +### Connection Options +- `enable_compression`: gRPC compression +- `keepalive_time_seconds`: Connection keepalive interval +- `max_connection_idle_seconds`: When to close idle connections + +## Design Questions + +1. **Should we support ordering keys?** + - Pro: Enables FIFO delivery for related events + - Con: Reduces throughput, requires topic configuration + +2. **Should we enable batching by default?** + - Pro: Better throughput, lower costs + - Con: Adds latency, complicates error handling + +3. **Should we validate topic existence?** + - Currently disabled for performance + - Could make it configurable: `skip_topic_validation` + +4. **Should we support Application Default Credentials?** + - Would work better in GKE/Cloud Run environments + - Less configuration needed for GCP-hosted Outpost + +5. **Should we expose publisher timeout?** + - Current: Uses default (60s) + - Could add `publish_timeout_seconds` + +## Not Implemented (Intentionally) + +1. **Push subscriptions**: Outpost is a publisher, not a subscriber +2. **Topic management**: No auto-creation of topics +3. **Schema validation**: Could support Pub/Sub schemas in future +4. **Dead letter topics**: Publisher-side concern only +5. **Encryption keys**: Uses Google-managed encryption \ No newline at end of file diff --git a/contributing/destinations/gcp_pubsub/test-destination.md b/contributing/destinations/gcp_pubsub/test-destination.md new file mode 100644 index 00000000..3c64dd03 --- /dev/null +++ b/contributing/destinations/gcp_pubsub/test-destination.md @@ -0,0 +1,36 @@ +```sh +# 0. (optional) Create a GCP project (or use existing one) +gcloud projects create outpost-pubsub-test --name="Outpost PubSub Test" +gcloud config set project outpost-pubsub-test +gcloud services enable pubsub.googleapis.com + +# 1. Create a topic +gcloud pubsub topics create outpost-destination-test + +# 2. Create a subscription (to verify messages are received) +gcloud pubsub subscriptions create outpost-destination-test-sub --topic=outpost-destination-test + +# 3. Create a service account for Outpost +gcloud iam service-accounts create outpost-pubsub-destination-sa \ + --display-name="Outpost PubSub Service Account" + +# 4. Grant publisher role to service account +gcloud projects add-iam-policy-binding outpost-pubsub-test \ + --member="serviceAccount:outpost-pubsub-destination-sa@outpost-pubsub-test.iam.gserviceaccount.com" \ + --role="roles/pubsub.publisher" + +# 5. Create and download service account key +gcloud iam service-accounts keys create ./gcppubsub-credentials.json \ + --iam-account=outpost-pubsub-destination-sa@outpost-pubsub-test.iam.gserviceaccount.com + +# 6. Get the formatted service account JSON content +cat ./gcppubsub-credentials.json | jq -c '.' + +For the Outpost destination config: +- project_id: "outpost-pubsub-test" +- topic: "outpost-destination-test" +- service_account_json: (paste the JSON content from step 6) + +To verify messages: +gcloud pubsub subscriptions pull outpost-destination-test-sub --auto-ack +``` diff --git a/internal/destregistry/metadata/providers/aws_s3/metadata.json b/internal/destregistry/metadata/providers/aws_s3/metadata.json index 653d6d5c..79e0a2c1 100644 --- a/internal/destregistry/metadata/providers/aws_s3/metadata.json +++ b/internal/destregistry/metadata/providers/aws_s3/metadata.json @@ -62,5 +62,5 @@ "sensitive": true } ], - "icon": "" + "icon": "Icon-Architecture/16/Arch_Amazon-Simple-Storage-Service_16" } \ No newline at end of file diff --git a/internal/destregistry/metadata/providers/gcp_pubsub/instructions.md b/internal/destregistry/metadata/providers/gcp_pubsub/instructions.md new file mode 100644 index 00000000..5526c5a2 --- /dev/null +++ b/internal/destregistry/metadata/providers/gcp_pubsub/instructions.md @@ -0,0 +1,335 @@ +# GCP Pub/Sub Destination + +This guide provides comprehensive instructions for setting up a GCP Pub/Sub destination using the gcloud CLI. + +## Prerequisites + +- **gcloud CLI**: Install the [Google Cloud CLI](https://cloud.google.com/sdk/docs/install) +- **GCP Account**: A Google Cloud Platform account +- **Permissions**: You need sufficient permissions to create projects (if creating new), topics, service accounts, and assign IAM roles + +## Authentication + +Before proceeding with the setup, you must authenticate the gcloud CLI with your Google account. + +### Authenticate with User Credentials + +```bash +# Login to your Google account +gcloud auth login + +# This will open a browser window for authentication +# Follow the prompts to complete the login process +``` + +### Set Application Default Credentials (Recommended) + +For running applications that use GCP services, also set up Application Default Credentials: + +```bash +# Set application default credentials +gcloud auth application-default login + +# This ensures your local development environment can authenticate +# with GCP services using your user account +``` + +### Verify Authentication + +```bash +# Check currently authenticated account +gcloud auth list + +# Verify active configuration +gcloud config list +``` + +**Note**: The `gcloud auth login` command authenticates your user account for running gcloud CLI commands. The `gcloud auth application-default login` command sets up credentials that applications (including Outpost during local testing) can use to authenticate with GCP services. + +## Setup Instructions + +### 1. Create a GCP Project (Optional) + +If you don't have an existing GCP project, you can create one using the CLI: + +```bash +# Set your desired project ID (must be globally unique) +export PROJECT_ID="outpost-test-$(date +%s)" + +# Create a new project +gcloud projects create $PROJECT_ID --name="Outpost Project" + +# List your projects to verify +# This can take a few moments to propagate +gcloud projects list + +# Link a billing account (required for Pub/Sub) +# First, list available billing accounts +gcloud billing accounts list + +# Set your billing account ID from the previous command +export BILLING_ACCOUNT_ID="your-billing-account-id" + +# Link billing account to project (replace BILLING_ACCOUNT_ID) +gcloud billing projects link $PROJECT_ID \ + --billing-account=$BILLING_ACCOUNT_ID +``` + +**Note**: If you already have a project, skip to step 2. + +### 2. Set Your GCP Project + +Set the project ID as a variable and configure gcloud to use it: + +```bash +# Set your GCP project ID if you haven't already +export PROJECT_ID="your-project-id" + +# Configure gcloud to use the project +gcloud config set project $PROJECT_ID + +# Set the quota project for Application Default Credentials +# This prevents quota warnings when using the project +gcloud auth application-default set-quota-project $PROJECT_ID +``` + +### 3. Enable the Pub/Sub API + +Enable the Pub/Sub API for your project: + +```bash +gcloud services enable pubsub.googleapis.com +``` + +### 4. Create a Pub/Sub Topic + +Create a new Pub/Sub topic where events will be published: + +```bash +# Set your topic name +export TOPIC_NAME="outpost-events" + +# Create the topic +gcloud pubsub topics create $TOPIC_NAME + +# Verify the topic was created +gcloud pubsub topics list +``` + +### 5. Create a Service Account + +Create a dedicated service account for Outpost to use when publishing to Pub/Sub: + +```bash +# Set service account name +export SERVICE_ACCOUNT_NAME="outpost-pubsub-publisher" + +# Create the service account +gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME \ + --display-name="Outpost Pub/Sub Publisher" \ + --description="Service account for Outpost to publish messages to Pub/Sub" + +# Verify the service account was created +gcloud iam service-accounts list +``` + +### 6. Grant Pub/Sub Publisher Permissions + +Assign the Pub/Sub Publisher role to the service account: + +```bash +# Grant Pub/Sub Publisher role at the project level +gcloud projects add-iam-policy-binding $PROJECT_ID \ + --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \ + --role="roles/pubsub.publisher" + +# Alternatively, grant permissions only for the specific topic (more restrictive) +gcloud pubsub topics add-iam-policy-binding $TOPIC_NAME \ + --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \ + --role="roles/pubsub.publisher" +``` + +### 7. Create and Download Service Account Keys + +Generate a JSON key file for the service account: + +```bash +# Create and download the service account key +gcloud iam service-accounts keys create ~/outpost-pubsub-key.json \ + --iam-account="${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" + +# Verify the key was created +ls -la ~/outpost-pubsub-key.json +``` + +**Important**: Store this key file securely. It provides authentication credentials for your service account. + +## Configuration + +When configuring your Outpost destination, you'll need: + +1. **Project ID**: Your GCP project ID (e.g., `my-gcp-project`) +2. **Topic Name**: The name of your Pub/Sub topic (e.g., `outpost-events`) +3. **Service Account Credentials**: The contents of the JSON key file or the path to it + +### Example Configuration + +```json +{ + "type": "gcp_pubsub", + "config": { + "project_id": "your-project-id", + "topic_name": "outpost-events", + "credentials_json": "contents-of-key-file" + } +} +``` + +## Testing the Integration + +### Create a Test Subscription + +Create a subscription to verify messages are being published: + +```bash +# Create a test subscription +export SUBSCRIPTION_NAME="outpost-events-test" + +gcloud pubsub subscriptions create $SUBSCRIPTION_NAME \ + --topic=$TOPIC_NAME \ + --ack-deadline=60 + +# Pull messages from the subscription to test +gcloud pubsub subscriptions pull $SUBSCRIPTION_NAME \ + --auto-ack \ + --limit=10 +``` + +### Publish a Test Message + +Test publishing directly to verify permissions: + +```bash +# Publish a test message +gcloud pubsub topics publish $TOPIC_NAME \ + --message="Test message from Outpost setup" + +# Pull the message to verify +gcloud pubsub subscriptions pull $SUBSCRIPTION_NAME \ + --auto-ack \ + --limit=1 +``` + +### Verify Service Account Permissions + +Check the IAM policy bindings: + +```bash +# View project-level IAM policy for the service account +gcloud projects get-iam-policy $PROJECT_ID \ + --flatten="bindings[].members" \ + --filter="bindings.members:serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" + +# View topic-level IAM policy +gcloud pubsub topics get-iam-policy $TOPIC_NAME +``` + +## Troubleshooting + +### Permission Denied Errors + +If you encounter permission errors: + +1. Verify the service account has the correct role: + ```bash + gcloud projects get-iam-policy $PROJECT_ID \ + --flatten="bindings[].members" \ + --filter="bindings.members:serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" + ``` + +2. Ensure the Pub/Sub API is enabled: + ```bash + gcloud services list --enabled | grep pubsub + ``` + +3. Check that the credentials file path is correct: + ```bash + cat $GOOGLE_APPLICATION_CREDENTIALS + ``` + +### Topic Not Found Errors + +Verify the topic exists and is in the correct project: + +```bash +gcloud pubsub topics describe $TOPIC_NAME +``` + +### Authentication Issues + +Validate your service account key: + +```bash +# Test authentication with the service account +gcloud auth activate-service-account \ + --key-file=$GOOGLE_APPLICATION_CREDENTIALS + +# List topics to verify access +gcloud pubsub topics list +``` + +### Message Delivery Issues + +1. Check topic configuration: + ```bash + gcloud pubsub topics describe $TOPIC_NAME + ``` + +2. Monitor message metrics: + ```bash + gcloud pubsub topics list-subscriptions $TOPIC_NAME + ``` + +3. Review Cloud Logging for errors: + ```bash + gcloud logging read "resource.type=pubsub_topic AND resource.labels.topic_id=$TOPIC_NAME" \ + --limit=50 \ + --format=json + ``` + +## Cleanup (Optional) + +To remove the resources created during setup: + +```bash +# Delete the subscription (if created for testing) +gcloud pubsub subscriptions delete $SUBSCRIPTION_NAME + +# Delete the service account key +gcloud iam service-accounts keys list \ + --iam-account="${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" +# Note the KEY_ID and delete it +gcloud iam service-accounts keys delete KEY_ID \ + --iam-account="${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" + +# Remove IAM policy binding +gcloud projects remove-iam-policy-binding $PROJECT_ID \ + --member="serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" \ + --role="roles/pubsub.publisher" + +# Delete the service account +gcloud iam service-accounts delete \ + "${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com" + +# Delete the topic +gcloud pubsub topics delete $TOPIC_NAME +``` + +## Additional Resources + +- [GCP Pub/Sub Documentation](https://cloud.google.com/pubsub/docs) +- [gcloud pubsub Command Reference](https://cloud.google.com/sdk/gcloud/reference/pubsub) +- [Service Account Best Practices](https://cloud.google.com/iam/docs/best-practices-service-accounts) +- [Pub/Sub Authentication Guide](https://cloud.google.com/pubsub/docs/authentication) +- [IAM Roles for Pub/Sub](https://cloud.google.com/pubsub/docs/access-control) +- [Pub/Sub Monitoring](https://cloud.google.com/pubsub/docs/monitoring) \ No newline at end of file diff --git a/internal/destregistry/metadata/providers/gcp_pubsub/metadata.json b/internal/destregistry/metadata/providers/gcp_pubsub/metadata.json new file mode 100644 index 00000000..be412aaf --- /dev/null +++ b/internal/destregistry/metadata/providers/gcp_pubsub/metadata.json @@ -0,0 +1,40 @@ +{ + "type": "gcp_pubsub", + "config_fields": [ + { + "key": "project_id", + "type": "text", + "label": "Project ID", + "description": "The GCP project ID", + "required": true + }, + { + "key": "topic", + "type": "text", + "label": "Topic", + "description": "The Pub/Sub topic", + "required": true + }, + { + "key": "endpoint", + "type": "text", + "label": "Endpoint", + "description": "Custom endpoint URL (e.g., localhost:8085 for emulator)", + "required": false + } + ], + "credential_fields": [ + { + "key": "service_account_json", + "type": "text", + "label": "Service Account JSON", + "description": "Service account key JSON", + "required": true, + "sensitive": true + } + ], + "label": "GCP Pub/Sub", + "link": "https://cloud.google.com/pubsub", + "description": "Send events to Google Cloud Pub/Sub topics using service account authentication.", + "icon": "Icon_24px_Pub-Sub_Color" +} \ No newline at end of file diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index 433a5e90..9552bf5b 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -6,6 +6,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry/providers/destawss3" "github.com/hookdeck/outpost/internal/destregistry/providers/destawssqs" "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" @@ -101,6 +102,12 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("aws_s3", awsS3) + gcpPubSub, err := destgcppubsub.New(loader, basePublisherOpts) + if err != nil { + return err + } + registry.RegisterProvider("gcp_pubsub", gcpPubSub) + azureServiceBus, err := destazureservicebus.New(loader, basePublisherOpts) if err != nil { return err diff --git a/internal/destregistry/providers/destgcppubsub/destgcppubsub.go b/internal/destregistry/providers/destgcppubsub/destgcppubsub.go new file mode 100644 index 00000000..5dd4c81d --- /dev/null +++ b/internal/destregistry/providers/destgcppubsub/destgcppubsub.go @@ -0,0 +1,204 @@ +package destgcppubsub + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "cloud.google.com/go/pubsub" + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/models" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +type GCPPubSubDestination struct { + *destregistry.BaseProvider +} + +type GCPPubSubDestinationConfig struct { + ProjectID string + Topic string + Endpoint string // For emulator support +} + +type GCPPubSubDestinationCredentials struct { + ServiceAccountJSON string +} + +var _ destregistry.Provider = (*GCPPubSubDestination)(nil) + +func New(loader metadata.MetadataLoader, basePublisherOpts []destregistry.BasePublisherOption) (*GCPPubSubDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "gcp_pubsub", basePublisherOpts...) + if err != nil { + return nil, err + } + + return &GCPPubSubDestination{ + BaseProvider: base, + }, nil +} + +func (d *GCPPubSubDestination) Validate(ctx context.Context, destination *models.Destination) error { + _, _, err := d.resolveMetadata(ctx, destination) + if err != nil { + return err + } + return nil +} + +func (d *GCPPubSubDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + cfg, creds, err := d.resolveMetadata(ctx, destination) + if err != nil { + return nil, err + } + + // Create Pub/Sub client options + var opts []option.ClientOption + + // Check for emulator endpoint (for testing) + if cfg.Endpoint != "" { + opts = append(opts, + option.WithEndpoint(cfg.Endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + ) + } else if creds.ServiceAccountJSON != "" { + // Use service account credentials for production + opts = append(opts, option.WithCredentialsJSON([]byte(creds.ServiceAccountJSON))) + } + + // Create the client + client, err := pubsub.NewClient(ctx, cfg.ProjectID, opts...) + if err != nil { + return nil, fmt.Errorf("failed to create Pub/Sub client: %w", err) + } + + // Get the topic + topic := client.Topic(cfg.Topic) + + return &GCPPubSubPublisher{ + BasePublisher: d.BaseProvider.NewPublisher(), + client: client, + topic: topic, + projectID: cfg.ProjectID, + }, nil +} + +func (d *GCPPubSubDestination) resolveMetadata(ctx context.Context, destination *models.Destination) (*GCPPubSubDestinationConfig, *GCPPubSubDestinationCredentials, error) { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + return &GCPPubSubDestinationConfig{ + ProjectID: destination.Config["project_id"], + Topic: destination.Config["topic"], + Endpoint: destination.Config["endpoint"], // For testing + }, &GCPPubSubDestinationCredentials{ + ServiceAccountJSON: destination.Credentials["service_account_json"], + }, nil +} + +func (d *GCPPubSubDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + projectID := destination.Config["project_id"] + topic := destination.Config["topic"] + + return destregistry.DestinationTarget{ + Target: fmt.Sprintf("%s/%s", projectID, topic), + TargetURL: fmt.Sprintf("https://console.cloud.google.com/cloudpubsub/topic/detail/%s?project=%s", topic, projectID), + } +} + +func (d *GCPPubSubDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { + // No preprocessing needed for GCP Pub/Sub + return nil +} + +type GCPPubSubPublisher struct { + *destregistry.BasePublisher + + client *pubsub.Client + topic *pubsub.Topic + projectID string +} + +func (pub *GCPPubSubPublisher) Format(ctx context.Context, event *models.Event) (*pubsub.Message, error) { + // Marshal event data to JSON + dataBytes, err := json.Marshal(event.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal event data: %w", err) + } + + // Create metadata + metadata := pub.BasePublisher.MakeMetadata(event, time.Now()) + + // Convert metadata to Pub/Sub attributes (must be strings) + attributes := make(map[string]string) + for k, v := range metadata { + attributes[k] = v + } + + return &pubsub.Message{ + Data: dataBytes, + Attributes: attributes, + }, nil +} + +func (pub *GCPPubSubPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + if err := pub.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer pub.BasePublisher.FinishPublish() + + // Format the message + msg, err := pub.Format(ctx, event) + if err != nil { + return nil, err + } + + // Publish the message + result := pub.topic.Publish(ctx, msg) + + // Wait for the publish to complete + messageID, err := result.Get(ctx) + if err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "gcp_pubsub", map[string]interface{}{ + "error": "publish_failed", + "project": pub.projectID, + "topic": pub.topic.ID(), + "message": err.Error(), + }) + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{ + "message_id": messageID, + "topic": pub.topic.ID(), + "project": pub.projectID, + }, + }, nil +} + +func (pub *GCPPubSubPublisher) Close() error { + pub.BasePublisher.StartClose() + + if pub.topic != nil { + pub.topic.Stop() + } + if pub.client != nil { + return pub.client.Close() + } + + return nil +} diff --git a/internal/destregistry/providers/destgcppubsub/destgcppubsub_publish_test.go b/internal/destregistry/providers/destgcppubsub/destgcppubsub_publish_test.go new file mode 100644 index 00000000..465b4e73 --- /dev/null +++ b/internal/destregistry/providers/destgcppubsub/destgcppubsub_publish_test.go @@ -0,0 +1,194 @@ +package destgcppubsub_test + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "cloud.google.com/go/pubsub" + "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" + testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/mqs" + "github.com/hookdeck/outpost/internal/util/testinfra" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// GCPPubSubConsumer implements testsuite.MessageConsumer +type GCPPubSubConsumer struct { + client *pubsub.Client + subscription *pubsub.Subscription + messages chan testsuite.Message + done chan struct{} + ctx context.Context + cancel context.CancelFunc +} + +func NewGCPPubSubConsumer(projectID, subscriptionID, endpoint string) (*GCPPubSubConsumer, error) { + ctx, cancel := context.WithCancel(context.Background()) + + // Create client with emulator settings + client, err := pubsub.NewClient(ctx, projectID, + option.WithEndpoint(endpoint), + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + ) + if err != nil { + cancel() + return nil, err + } + + subscription := client.Subscription(subscriptionID) + + consumer := &GCPPubSubConsumer{ + client: client, + subscription: subscription, + messages: make(chan testsuite.Message, 100), + done: make(chan struct{}), + ctx: ctx, + cancel: cancel, + } + + // Start consuming messages + go consumer.consume() + + return consumer, nil +} + +func (c *GCPPubSubConsumer) consume() { + err := c.subscription.Receive(c.ctx, func(ctx context.Context, msg *pubsub.Message) { + // Convert attributes to metadata + metadata := make(map[string]string) + for k, v := range msg.Attributes { + metadata[k] = v + } + + // Send to channel + select { + case c.messages <- testsuite.Message{ + Data: msg.Data, + Metadata: metadata, + Raw: msg, + }: + case <-c.done: + return + } + + // Acknowledge the message + msg.Ack() + }) + + if err != nil && err != context.Canceled { + fmt.Printf("Error in GCP Pub/Sub consume loop: %v\n", err) + } +} + +func (c *GCPPubSubConsumer) Consume() <-chan testsuite.Message { + return c.messages +} + +func (c *GCPPubSubConsumer) Close() error { + close(c.done) + c.cancel() + if c.client != nil { + return c.client.Close() + } + return nil +} + +// GCPPubSubAsserter implements testsuite.MessageAsserter +type GCPPubSubAsserter struct{} + +func (a *GCPPubSubAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) { + // Assert the raw message is a Pub/Sub message + pubsubMsg, ok := msg.Raw.(*pubsub.Message) + assert.True(t, ok, "raw message should be *pubsub.Message") + + // Verify event data + expectedData, _ := json.Marshal(event.Data) + assert.JSONEq(t, string(expectedData), string(msg.Data), "event data should match") + + // Verify system metadata + metadata := msg.Metadata + assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") + assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") + assert.Equal(t, event.Topic, metadata["topic"], "topic should match") + + // Verify custom metadata + for k, v := range event.Metadata { + assert.Equal(t, v, metadata[k], "metadata key %s should match expected value", k) + } + + // Verify Pub/Sub specific attributes + assert.NotNil(t, pubsubMsg.Attributes, "attributes should be set") +} + +// GCPPubSubPublishSuite uses the shared test suite +type GCPPubSubPublishSuite struct { + testsuite.PublisherSuite + consumer *GCPPubSubConsumer + config mqs.QueueConfig +} + +func (s *GCPPubSubPublishSuite) SetupSuite() { + t := s.T() + t.Cleanup(testinfra.Start(t)) + + // Set up GCP Pub/Sub test infrastructure + mqConfig := testinfra.NewMQGCPConfig(t, nil) + s.config = mqConfig + + // Get emulator endpoint + endpoint := testinfra.EnsureGCP() + + provider, err := destgcppubsub.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + dest := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("gcp_pubsub"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "project_id": mqConfig.GCPPubSub.ProjectID, + "topic": mqConfig.GCPPubSub.TopicID, + "endpoint": endpoint, // For emulator + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "service_account_json": `{"type":"service_account","project_id":"test-project"}`, // Dummy JSON for emulator + }), + ) + + // Create consumer + consumer, err := NewGCPPubSubConsumer( + mqConfig.GCPPubSub.ProjectID, + mqConfig.GCPPubSub.SubscriptionID, + endpoint, + ) + require.NoError(t, err) + s.consumer = consumer + + s.InitSuite(testsuite.Config{ + Provider: provider, + Dest: &dest, + Consumer: consumer, + Asserter: &GCPPubSubAsserter{}, + }) +} + +func (s *GCPPubSubPublishSuite) TearDownSuite() { + if s.consumer != nil { + s.consumer.Close() + } +} + +func TestGCPPubSubPublishIntegration(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + suite.Run(t, new(GCPPubSubPublishSuite)) +} diff --git a/internal/destregistry/providers/destgcppubsub/destgcppubsub_test.go b/internal/destregistry/providers/destgcppubsub/destgcppubsub_test.go new file mode 100644 index 00000000..8df308a9 --- /dev/null +++ b/internal/destregistry/providers/destgcppubsub/destgcppubsub_test.go @@ -0,0 +1,213 @@ +package destgcppubsub_test + +import ( + "context" + "strings" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destgcppubsub" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestComputeTarget(t *testing.T) { + provider, err := destgcppubsub.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + tests := []struct { + name string + config map[string]string + expectedTarget string + expectedURL string + }{ + { + name: "with valid project and topic", + config: map[string]string{ + "project_id": "my-project", + "topic": "my-topic", + }, + expectedTarget: "my-project/my-topic", + expectedURL: "https://console.cloud.google.com/cloudpubsub/topic/detail/my-topic?project=my-project", + }, + { + name: "with different project and topic", + config: map[string]string{ + "project_id": "test-project-123", + "topic": "events-topic", + }, + expectedTarget: "test-project-123/events-topic", + expectedURL: "https://console.cloud.google.com/cloudpubsub/topic/detail/events-topic?project=test-project-123", + }, + { + name: "with missing project_id", + config: map[string]string{ + "topic": "my-topic", + }, + expectedTarget: "/my-topic", + expectedURL: "https://console.cloud.google.com/cloudpubsub/topic/detail/my-topic?project=", + }, + { + name: "with missing topic", + config: map[string]string{ + "project_id": "my-project", + }, + expectedTarget: "my-project/", + expectedURL: "https://console.cloud.google.com/cloudpubsub/topic/detail/?project=my-project", + }, + { + name: "with missing both", + config: map[string]string{}, + expectedTarget: "/", + expectedURL: "https://console.cloud.google.com/cloudpubsub/topic/detail/?project=", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("gcp_pubsub"), + testutil.DestinationFactory.WithConfig(tt.config), + ) + result := provider.ComputeTarget(&destination) + assert.Equal(t, tt.expectedTarget, result.Target) + assert.Equal(t, tt.expectedURL, result.TargetURL) + }) + } +} + +func TestValidate(t *testing.T) { + provider, err := destgcppubsub.New(testutil.Registry.MetadataLoader(), nil) + require.NoError(t, err) + + tests := []struct { + name string + config map[string]string + credentials map[string]string + wantErr bool + errContains string + }{ + { + name: "valid destination", + config: map[string]string{ + "project_id": "my-project", + "topic": "my-topic", + }, + credentials: map[string]string{ + "service_account_json": `{"type":"service_account","project_id":"my-project"}`, + }, + wantErr: false, + }, + { + name: "valid destination with endpoint (emulator)", + config: map[string]string{ + "project_id": "my-project", + "topic": "my-topic", + "endpoint": "http://localhost:8085", + }, + credentials: map[string]string{ + "service_account_json": `{"type":"service_account"}`, + }, + wantErr: false, + }, + { + name: "missing project_id", + config: map[string]string{ + "topic": "my-topic", + }, + credentials: map[string]string{ + "service_account_json": `{"type":"service_account"}`, + }, + wantErr: true, + errContains: "config.project_id", + }, + { + name: "missing topic", + config: map[string]string{ + "project_id": "my-project", + }, + credentials: map[string]string{ + "service_account_json": `{"type":"service_account"}`, + }, + wantErr: true, + errContains: "config.topic", + }, + { + name: "missing service_account_json", + config: map[string]string{ + "project_id": "my-project", + "topic": "my-topic", + }, + credentials: map[string]string{}, + wantErr: true, + errContains: "credentials.service_account_json", + }, + { + name: "empty service_account_json", + config: map[string]string{ + "project_id": "my-project", + "topic": "my-topic", + }, + credentials: map[string]string{ + "service_account_json": "", + }, + wantErr: true, + errContains: "credentials.service_account_json", + }, + { + name: "invalid JSON in service_account_json", + config: map[string]string{ + "project_id": "my-project", + "topic": "my-topic", + }, + credentials: map[string]string{ + "service_account_json": "not-valid-json", + }, + wantErr: false, // We don't validate JSON structure anymore, Google SDK will handle it + }, + { + name: "valid with all optional fields", + config: map[string]string{ + "project_id": "my-project", + "topic": "my-topic", + "endpoint": "https://pubsub.googleapis.com", + }, + credentials: map[string]string{ + "service_account_json": `{"type":"service_account","project_id":"my-project"}`, + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("gcp_pubsub"), + testutil.DestinationFactory.WithConfig(tt.config), + testutil.DestinationFactory.WithCredentials(tt.credentials), + ) + ctx := context.Background() + err := provider.Validate(ctx, &destination) + if tt.wantErr { + require.Error(t, err) + if tt.errContains != "" { + var validationErr *destregistry.ErrDestinationValidation + require.ErrorAs(t, err, &validationErr) + require.NotEmpty(t, validationErr.Errors) + // Check that at least one error contains the expected field + found := false + for _, e := range validationErr.Errors { + if strings.Contains(e.Field, tt.errContains) { + found = true + break + } + } + assert.True(t, found, "Expected error field containing %q, but got %+v", tt.errContains, validationErr.Errors) + } + } else { + require.NoError(t, err) + } + }) + } +}