diff --git a/go.mod b/go.mod index 3a4d6c9..89c13fa 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/mark3labs/mcp-go v0.43.1 github.com/spf13/cobra v1.10.1 github.com/spf13/viper v1.21.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -34,5 +35,4 @@ require ( golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.28.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/tools/client.go b/pkg/tools/client.go index 4297149..3ad05cf 100644 --- a/pkg/tools/client.go +++ b/pkg/tools/client.go @@ -367,3 +367,144 @@ func createRequest(ctx context.Context, reqUrl *url.URL, keys *ContextKeys, opts applyAuthHeader(req, keys) return req, nil } + +func ListConfs(ctx context.Context, client Client) ([]*ConfSummary, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + confsURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/confs", client.APIURL(), keys.OrgID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, confsURL, keys, func(v url.Values) { + v.Set("empty_contents", "true") + }) + if err != nil { + return nil, fmt.Errorf("failed to create confs request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to list confs, status code %d: %s", resp.StatusCode, string(bodyBytes)) + } + + var out []*ConfSummary + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode confs response: %v", err) + } + return out, nil +} + +func GetIngestionEndpoints(ctx context.Context, client Client) (*IngestionEndpointsResponse, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + endpointsURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/ingestion_endpoints", client.APIURL(), keys.OrgID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, endpointsURL, keys) + if err != nil { + return nil, fmt.Errorf("failed to create ingestion_endpoints request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to get ingestion endpoints, status code %d: %s", resp.StatusCode, string(bodyBytes)) + } + + var out IngestionEndpointsResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode ingestion_endpoints response: %v", err) + } + return &out, nil +} + +func GetIngestionToken(ctx context.Context, client Client, confID, nodeName string) (*IngestionTokenResponse, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + tokenURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/ingestion_token", client.APIURL(), keys.OrgID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, tokenURL, keys, func(v url.Values) { + v.Set("conf_id", confID) + v.Set("node_name", nodeName) + }) + if err != nil { + return nil, fmt.Errorf("failed to create ingestion_token request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to get ingestion token, status code %d: %s", resp.StatusCode, string(bodyBytes)) + } + + var out IngestionTokenResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode ingestion_token response: %v", err) + } + return &out, nil +} + +func GetConf(ctx context.Context, client Client, confID string) (*ConfSummary, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + confURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/confs/%s", client.APIURL(), keys.OrgID, confID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, confURL, keys) + if err != nil { + return nil, fmt.Errorf("failed to create get-conf request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to get conf %s, status code %d: %s", confID, resp.StatusCode, string(bodyBytes)) + } + + var out ConfSummary + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode conf response: %v", err) + } + return &out, nil +} diff --git a/pkg/tools/core.go b/pkg/tools/core.go index e95b34e..dd23ac1 100644 --- a/pkg/tools/core.go +++ b/pkg/tools/core.go @@ -43,9 +43,10 @@ const ( type FleetType string const ( - EdgeFleetType FleetType = "Edge" - CloudFleetType FleetType = "Cloud" - GatewayFleetType FleetType = "Gateway" + EdgeFleetType FleetType = "Edge" + CloudFleetType FleetType = "Cloud" + GatewayFleetType FleetType = "Gateway" + IngestionPipelineFleetType FleetType = "IngestionPipeline" ) // FleetStatus represents the status of a fleet @@ -69,3 +70,35 @@ type PipelineSummary struct { FleetType FleetType `json:"fleet_type,omitempty"` Status FleetStatus `json:"status,omitempty"` } + +// IngestionEndpointsResponse mirrors the backend response from +// GET /v1/orgs/{org_id}/ingestion_endpoints +type IngestionEndpointsResponse struct { + HTTPS *HTTPSIngestionEndpoints `json:"https,omitempty"` +} + +type HTTPSIngestionEndpoints struct { + BaseURL string `json:"base_url"` + PathForDataType map[string]string `json:"path_for_data_type"` + SampleData map[string]string `json:"sample_data"` + TestCommands map[string]string `json:"test_commands"` +} + +// IngestionTokenResponse mirrors the backend response from +// GET /v1/orgs/{org_id}/ingestion_token +type IngestionTokenResponse struct { + RawToken string `json:"raw_token"` + TokenID string `json:"token_id"` + OrgID string `json:"org_id"` + ConfID string `json:"conf_id"` + NodeName string `json:"node_name"` +} + +// ConfSummary mirrors the fields of backend/core.Conf that this package uses. +// The backend endpoint returns more fields; we only decode what we need. +type ConfSummary struct { + ID string `json:"id"` + Tag string `json:"tag"` + FleetType FleetType `json:"fleet_type"` + Content string `json:"content,omitempty"` +} diff --git a/pkg/tools/ingestion.go b/pkg/tools/ingestion.go new file mode 100644 index 0000000..c6d6f93 --- /dev/null +++ b/pkg/tools/ingestion.go @@ -0,0 +1,222 @@ +package tools + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strings" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" + "gopkg.in/yaml.v3" +) + +const ( + ingestionPipelineTag = "AI-Connector-Telemetry-Pipeline" + httpIngestionInputType = "http_ingestion_input" +) + +type ingestionEndpointOut struct { + Protocol string `json:"protocol"` + DataType string `json:"data_type"` + URL string `json:"url"` + SampleData string `json:"sample_data,omitempty"` + TestCommand string `json:"test_command,omitempty"` +} + +type ingestionPipelineOut struct { + ConfID string `json:"conf_id"` + Name string `json:"name"` + NodeName string `json:"node_name"` + Default bool `json:"default"` + Endpoints []ingestionEndpointOut `json:"endpoints"` + Error string `json:"error,omitempty"` +} + +type getIngestionEndpointOut struct { + Pipelines []ingestionPipelineOut `json:"pipelines"` +} + +// GetIngestionEndpointTool returns Edge Delta ingestion URLs for every +// IngestionPipeline conf the caller can see. The pipeline tagged +// "AI-Connector-Telemetry-Pipeline" — auto-provisioned for every org — is +// flagged default=true so the LLM can prefer it when no other signal +// exists. Tokens are fetched server-side; the caller never provides one. +func GetIngestionEndpointTool(client Client) (tool mcp.Tool, handler server.ToolHandlerFunc) { + return mcp.NewTool("get_ingestion_endpoint", + mcp.WithTitleAnnotation("Get Ingestion Endpoint"), + mcp.WithDescription(`Returns Edge Delta ingestion endpoints for every ingestion pipeline in the org (logs, metrics, traces, events) with the stream token embedded as a ?token=... query parameter. Each pipeline includes default=true if it is the auto-provisioned default ("AI-Connector-Telemetry-Pipeline"); prefer the default unless the user has named a specific pipeline. Each endpoint carries a "protocol" field ("http" today; "grpc" may be added later). POST raw payload bodies to HTTP URLs to send telemetry to Edge Delta.`), + mcp.WithReadOnlyHintAnnotation(true), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithDestructiveHintAnnotation(false), + mcp.WithOpenWorldHintAnnotation(false), + ), + func(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) { + confs, err := ListConfs(ctx, client) + if err != nil { + return nil, fmt.Errorf("failed to list configurations: %w", err) + } + + ingestionConfs := filterIngestionConfs(confs) + if len(ingestionConfs) == 0 { + return mcp.NewToolResultError(fmt.Sprintf( + "no ingestion pipelines (fleet_type=%q) visible to this caller. /confs returned %d entries: %s", + IngestionPipelineFleetType, len(confs), summarizeConfs(confs), + )), nil + } + + endpoints, err := GetIngestionEndpoints(ctx, client) + if err != nil { + return nil, fmt.Errorf("failed to get ingestion endpoints: %w", err) + } + if endpoints.HTTPS == nil { + return mcp.NewToolResultError("backend did not return HTTPS ingestion endpoints"), nil + } + base, err := url.Parse(endpoints.HTTPS.BaseURL) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("invalid base_url %q: %v", endpoints.HTTPS.BaseURL, err)), nil + } + + var out getIngestionEndpointOut + for _, c := range ingestionConfs { + out.Pipelines = append(out.Pipelines, resolvePipeline(ctx, client, c, base, endpoints.HTTPS)...) + } + + b, err := json.Marshal(out) + if err != nil { + return nil, fmt.Errorf("failed to marshal response: %w", err) + } + return mcp.NewToolResultText(string(b)), nil + } +} + +// resolvePipeline expands a single ingestion-pipeline conf into one entry per +// HTTP ingest node it declares. If the conf detail can't be fetched or the +// YAML doesn't expose any HTTP ingest node, a single placeholder entry with +// an Error field is returned so the LLM has visibility into the failure. +func resolvePipeline( + ctx context.Context, + client Client, + c *ConfSummary, + base *url.URL, + httpsCfg *HTTPSIngestionEndpoints, +) []ingestionPipelineOut { + isDefault := c.Tag == ingestionPipelineTag + + detail, err := GetConf(ctx, client, c.ID) + if err != nil { + return []ingestionPipelineOut{{ + ConfID: c.ID, + Name: c.Tag, + Default: isDefault, + Error: fmt.Sprintf("failed to fetch pipeline content: %v", err), + }} + } + + nodeNames := findHTTPIngestNodeNames(detail.Content) + if len(nodeNames) == 0 { + return []ingestionPipelineOut{{ + ConfID: c.ID, + Name: c.Tag, + Default: isDefault, + Error: fmt.Sprintf("no %q nodes in this pipeline", httpIngestionInputType), + }} + } + + out := make([]ingestionPipelineOut, 0, len(nodeNames)) + for _, nodeName := range nodeNames { + entry := ingestionPipelineOut{ + ConfID: c.ID, + Name: c.Tag, + NodeName: nodeName, + Default: isDefault, + } + tokenResp, err := GetIngestionToken(ctx, client, c.ID, nodeName) + if err != nil { + entry.Error = fmt.Sprintf("failed to fetch token: %v", err) + out = append(out, entry) + continue + } + if tokenResp.RawToken == "" { + entry.Error = "backend returned empty ingestion token" + out = append(out, entry) + continue + } + entry.Endpoints = buildEndpoints(base, httpsCfg, tokenResp.RawToken) + out = append(out, entry) + } + return out +} + +func buildEndpoints(base *url.URL, cfg *HTTPSIngestionEndpoints, rawToken string) []ingestionEndpointOut { + endpoints := make([]ingestionEndpointOut, 0, len(cfg.PathForDataType)) + for dataType, path := range cfg.PathForDataType { + u := *base + u.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/") + q := u.Query() + q.Set("token", rawToken) + u.RawQuery = q.Encode() + + item := ingestionEndpointOut{Protocol: "http", DataType: dataType, URL: u.String()} + if cfg.SampleData != nil { + item.SampleData = cfg.SampleData[dataType] + } + if cmd, ok := cfg.TestCommands[dataType]; ok { + item.TestCommand = strings.ReplaceAll(cmd, "{TOKEN}", rawToken) + } + endpoints = append(endpoints, item) + } + return endpoints +} + +func filterIngestionConfs(confs []*ConfSummary) []*ConfSummary { + out := make([]*ConfSummary, 0, len(confs)) + for _, c := range confs { + if c == nil || c.FleetType != IngestionPipelineFleetType { + continue + } + out = append(out, c) + } + return out +} + +// pipelineYAML matches just the bits of a pipeline config we care about. +type pipelineYAML struct { + Nodes []struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + } `yaml:"nodes"` +} + +func findHTTPIngestNodeNames(content string) []string { + if content == "" { + return nil + } + var cfg pipelineYAML + if err := yaml.Unmarshal([]byte(content), &cfg); err != nil { + return nil + } + var names []string + for _, n := range cfg.Nodes { + if n.Type == httpIngestionInputType && n.Name != "" { + names = append(names, n.Name) + } + } + return names +} + +// summarizeConfs returns a short debug string of fleet_type/tag pairs for each conf. +func summarizeConfs(confs []*ConfSummary) string { + if len(confs) == 0 { + return "[]" + } + parts := make([]string, 0, len(confs)) + for _, c := range confs { + if c == nil { + continue + } + parts = append(parts, fmt.Sprintf("{id=%s fleet_type=%q tag=%q}", c.ID, c.FleetType, c.Tag)) + } + return "[" + strings.Join(parts, ", ") + "]" +} diff --git a/server/server.go b/server/server.go index 18d7f41..b918988 100644 --- a/server/server.go +++ b/server/server.go @@ -60,6 +60,9 @@ func AddCustomTools(s *server.MCPServer, client tools.Client) { s.AddTool(tools.DeployPipelineTool(client)) s.AddTool(tools.AddPipelineSourceTool(client)) + // Ingestion tools + s.AddTool(tools.GetIngestionEndpointTool(client)) + // Facet tools s.AddTool(tools.FacetsTool, tools.FacetsToolHandler(client)) s.AddTool(tools.FacetOptionsTool, tools.FacetOptionsToolHandler(client))