From f56847737ef4cef5c8e0c4d9ee2aa9c5f29c6ec6 Mon Sep 17 00:00:00 2001 From: Fatih Yildiz Date: Sat, 25 Apr 2026 09:05:10 -0700 Subject: [PATCH 1/4] #mcp add a new tool for ingestion endpoint discovery --- pkg/tools/client.go | 107 +++++++++++++++++++++++++++++++ pkg/tools/core.go | 38 ++++++++++- pkg/tools/ingestion.go | 141 +++++++++++++++++++++++++++++++++++++++++ server/server.go | 3 + 4 files changed, 286 insertions(+), 3 deletions(-) create mode 100644 pkg/tools/ingestion.go diff --git a/pkg/tools/client.go b/pkg/tools/client.go index 4297149..8477d44 100644 --- a/pkg/tools/client.go +++ b/pkg/tools/client.go @@ -367,3 +367,110 @@ func createRequest(ctx context.Context, reqUrl *url.URL, keys *ContextKeys, opts applyAuthHeader(req, keys) return req, nil } + +func ListConfs(ctx context.Context, client Client) ([]*ConfSummary, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + confsURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/confs", client.APIURL(), keys.OrgID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, confsURL, keys, func(v url.Values) { + v.Set("empty_contents", "true") + }) + if err != nil { + return nil, fmt.Errorf("failed to create confs request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to list confs, status code %d: %s", resp.StatusCode, string(bodyBytes)) + } + + var out []*ConfSummary + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode confs response: %v", err) + } + return out, nil +} + +func GetIngestionEndpoints(ctx context.Context, client Client) (*IngestionEndpointsResponse, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + endpointsURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/ingestion_endpoints", client.APIURL(), keys.OrgID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, endpointsURL, keys) + if err != nil { + return nil, fmt.Errorf("failed to create ingestion_endpoints request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to get ingestion endpoints, status code %d: %s", resp.StatusCode, string(bodyBytes)) + } + + var out IngestionEndpointsResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode ingestion_endpoints response: %v", err) + } + return &out, nil +} + +func GetIngestionToken(ctx context.Context, client Client, confID, nodeName string) (*IngestionTokenResponse, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + tokenURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/ingestion_token", client.APIURL(), keys.OrgID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, tokenURL, keys, func(v url.Values) { + v.Set("conf_id", confID) + v.Set("node_name", nodeName) + }) + if err != nil { + return nil, fmt.Errorf("failed to create ingestion_token request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to get ingestion token, status code %d: %s", resp.StatusCode, string(bodyBytes)) + } + + var out IngestionTokenResponse + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode ingestion_token response: %v", err) + } + return &out, nil +} diff --git a/pkg/tools/core.go b/pkg/tools/core.go index e95b34e..b8e30db 100644 --- a/pkg/tools/core.go +++ b/pkg/tools/core.go @@ -43,9 +43,10 @@ const ( type FleetType string const ( - EdgeFleetType FleetType = "Edge" - CloudFleetType FleetType = "Cloud" - GatewayFleetType FleetType = "Gateway" + EdgeFleetType FleetType = "Edge" + CloudFleetType FleetType = "Cloud" + GatewayFleetType FleetType = "Gateway" + IngestionPipelineFleetType FleetType = "IngestionPipeline" ) // FleetStatus represents the status of a fleet @@ -69,3 +70,34 @@ type PipelineSummary struct { FleetType FleetType `json:"fleet_type,omitempty"` Status FleetStatus `json:"status,omitempty"` } + +// IngestionEndpointsResponse mirrors the backend response from +// GET /v1/orgs/{org_id}/ingestion_endpoints +type IngestionEndpointsResponse struct { + HTTPS *HTTPSIngestionEndpoints `json:"https,omitempty"` +} + +type HTTPSIngestionEndpoints struct { + BaseURL string `json:"base_url"` + PathForDataType map[string]string `json:"path_for_data_type"` + SampleData map[string]string `json:"sample_data"` + TestCommands map[string]string `json:"test_commands"` +} + +// IngestionTokenResponse mirrors the backend response from +// GET /v1/orgs/{org_id}/ingestion_token +type IngestionTokenResponse struct { + RawToken string `json:"raw_token"` + TokenID string `json:"token_id"` + OrgID string `json:"org_id"` + ConfID string `json:"conf_id"` + NodeName string `json:"node_name"` +} + +// ConfSummary mirrors the fields of backend/core.Conf that this package uses. +// The backend endpoint returns more fields; we only decode what we need. +type ConfSummary struct { + ID string `json:"id"` + Tag string `json:"tag"` + FleetType FleetType `json:"fleet_type"` +} diff --git a/pkg/tools/ingestion.go b/pkg/tools/ingestion.go new file mode 100644 index 0000000..649f84d --- /dev/null +++ b/pkg/tools/ingestion.go @@ -0,0 +1,141 @@ +package tools + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strings" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" +) + +// These mirror backend constants for the default auto-provisioned ingestion +// pipeline every org is given on creation: +// - backend/provision/ingestpipeline/provisioner.go (defaultIngestPipelineName) +// - deploy/modules/knowledge/base_config_ingestion_pipeline.yml (node name) +const ( + ingestionPipelineTag = "AI-Connector-Telemetry-Pipeline" + httpIngestionNodeName = "http_ingestion_input" +) + +type ingestionEndpointOut struct { + Protocol string `json:"protocol"` + DataType string `json:"data_type"` + URL string `json:"url"` + SampleData string `json:"sample_data,omitempty"` + TestCommand string `json:"test_command,omitempty"` +} + +type getIngestionEndpointOut struct { + Endpoints []ingestionEndpointOut `json:"endpoints"` +} + +// GetIngestionEndpointTool returns Edge Delta HTTP ingestion URLs with the +// stream token pre-populated as a `?token=...` query parameter. Takes no args: +// it resolves the org's auto-provisioned ingestion pipeline, its HTTP ingest +// node, and fetches the stream token — all server-side. +func GetIngestionEndpointTool(client Client) (tool mcp.Tool, handler server.ToolHandlerFunc) { + return mcp.NewTool("get_ingestion_endpoint", + mcp.WithTitleAnnotation("Get Ingestion Endpoint"), + mcp.WithDescription(`Returns Edge Delta ingestion endpoints (logs, metrics, traces, events) with the stream token embedded as a ?token=... query parameter. Each endpoint includes a "protocol" field ("http" today; "grpc" may be added in the future) so callers can pick the right transport. Takes no arguments — the org's auto-provisioned ingestion pipeline and its ingest node are resolved server-side. POST raw payload bodies to HTTP URLs to send telemetry to Edge Delta.`), + mcp.WithReadOnlyHintAnnotation(true), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithDestructiveHintAnnotation(false), + mcp.WithOpenWorldHintAnnotation(false), + ), + func(ctx context.Context, _ mcp.CallToolRequest) (*mcp.CallToolResult, error) { + confs, err := ListConfs(ctx, client) + if err != nil { + return nil, fmt.Errorf("failed to list configurations: %w", err) + } + + confID := findDefaultIngestionConfID(confs) + if confID == "" { + return mcp.NewToolResultError(fmt.Sprintf( + "could not find default ingestion pipeline (fleet_type=%q, tag=%q) for this org. /confs returned %d entries: %s", + IngestionPipelineFleetType, ingestionPipelineTag, len(confs), summarizeConfs(confs), + )), nil + } + + endpoints, err := GetIngestionEndpoints(ctx, client) + if err != nil { + return nil, fmt.Errorf("failed to get ingestion endpoints: %w", err) + } + if endpoints.HTTPS == nil { + return mcp.NewToolResultError("backend did not return HTTPS ingestion endpoints"), nil + } + + tokenResp, err := GetIngestionToken(ctx, client, confID, httpIngestionNodeName) + if err != nil { + return nil, fmt.Errorf("failed to get ingestion token: %w", err) + } + if tokenResp.RawToken == "" { + return mcp.NewToolResultError("backend returned empty ingestion token"), nil + } + + base, err := url.Parse(endpoints.HTTPS.BaseURL) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("invalid base_url %q: %v", endpoints.HTTPS.BaseURL, err)), nil + } + + var out getIngestionEndpointOut + for dataType, path := range endpoints.HTTPS.PathForDataType { + u := *base + u.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/") + q := u.Query() + q.Set("token", tokenResp.RawToken) + u.RawQuery = q.Encode() + + item := ingestionEndpointOut{Protocol: "http", DataType: dataType, URL: u.String()} + if endpoints.HTTPS.SampleData != nil { + item.SampleData = endpoints.HTTPS.SampleData[dataType] + } + if cmd, ok := endpoints.HTTPS.TestCommands[dataType]; ok { + item.TestCommand = strings.ReplaceAll(cmd, "{TOKEN}", tokenResp.RawToken) + } + out.Endpoints = append(out.Endpoints, item) + } + + b, err := json.Marshal(out) + if err != nil { + return nil, fmt.Errorf("failed to marshal response: %w", err) + } + return mcp.NewToolResultText(string(b)), nil + } +} + +// summarizeConfs returns a short debug string of fleet_type/tag pairs for each conf. +func summarizeConfs(confs []*ConfSummary) string { + if len(confs) == 0 { + return "[]" + } + parts := make([]string, 0, len(confs)) + for _, c := range confs { + if c == nil { + continue + } + parts = append(parts, fmt.Sprintf("{id=%s fleet_type=%q tag=%q}", c.ID, c.FleetType, c.Tag)) + } + return "[" + strings.Join(parts, ", ") + "]" +} + +// findDefaultIngestionConfID picks the conf ID for the org's auto-provisioned +// ingestion pipeline. Prefers an exact tag match; falls back to the first +// ingestion-pipeline conf if none match the tag (covers older provisions). +func findDefaultIngestionConfID(confs []*ConfSummary) string { + var fallback string + for _, c := range confs { + if c == nil || c.FleetType != IngestionPipelineFleetType { + continue + } + if c.Tag == ingestionPipelineTag { + return c.ID + } + if fallback == "" { + fallback = c.ID + } + } + return fallback +} diff --git a/server/server.go b/server/server.go index 18d7f41..b918988 100644 --- a/server/server.go +++ b/server/server.go @@ -60,6 +60,9 @@ func AddCustomTools(s *server.MCPServer, client tools.Client) { s.AddTool(tools.DeployPipelineTool(client)) s.AddTool(tools.AddPipelineSourceTool(client)) + // Ingestion tools + s.AddTool(tools.GetIngestionEndpointTool(client)) + // Facet tools s.AddTool(tools.FacetsTool, tools.FacetsToolHandler(client)) s.AddTool(tools.FacetOptionsTool, tools.FacetOptionsToolHandler(client)) From 4c94f2eba06858338b9de5e2dd1b11abee635051 Mon Sep 17 00:00:00 2001 From: Fatih Yildiz Date: Mon, 27 Apr 2026 17:35:36 -0700 Subject: [PATCH 2/4] update --- pkg/tools/ingestion.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/tools/ingestion.go b/pkg/tools/ingestion.go index 649f84d..c3e00ae 100644 --- a/pkg/tools/ingestion.go +++ b/pkg/tools/ingestion.go @@ -11,10 +11,6 @@ import ( "github.com/mark3labs/mcp-go/server" ) -// These mirror backend constants for the default auto-provisioned ingestion -// pipeline every org is given on creation: -// - backend/provision/ingestpipeline/provisioner.go (defaultIngestPipelineName) -// - deploy/modules/knowledge/base_config_ingestion_pipeline.yml (node name) const ( ingestionPipelineTag = "AI-Connector-Telemetry-Pipeline" httpIngestionNodeName = "http_ingestion_input" From cc0100cbe6afbaa2a5c8f33aed2c07e8eb46158d Mon Sep 17 00:00:00 2001 From: Fatih Yildiz Date: Tue, 28 Apr 2026 15:41:50 -0700 Subject: [PATCH 3/4] update --- go.mod | 2 +- pkg/tools/client.go | 34 +++++++ pkg/tools/core.go | 1 + pkg/tools/ingestion.go | 195 +++++++++++++++++++++++++++++------------ 4 files changed, 176 insertions(+), 56 deletions(-) diff --git a/go.mod b/go.mod index 3a4d6c9..89c13fa 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/mark3labs/mcp-go v0.43.1 github.com/spf13/cobra v1.10.1 github.com/spf13/viper v1.21.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -34,5 +35,4 @@ require ( golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.28.0 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/tools/client.go b/pkg/tools/client.go index 8477d44..3ad05cf 100644 --- a/pkg/tools/client.go +++ b/pkg/tools/client.go @@ -474,3 +474,37 @@ func GetIngestionToken(ctx context.Context, client Client, confID, nodeName stri } return &out, nil } + +func GetConf(ctx context.Context, client Client, confID string) (*ConfSummary, error) { + keys, err := FetchContextKeys(ctx) + if err != nil { + return nil, err + } + + confURL, err := url.Parse(fmt.Sprintf("%s/v1/orgs/%s/confs/%s", client.APIURL(), keys.OrgID, confID)) + if err != nil { + return nil, err + } + + req, err := createRequest(ctx, confURL, keys) + if err != nil { + return nil, fmt.Errorf("failed to create get-conf request: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + bodyBytes, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("failed to get conf %s, status code %d: %s", confID, resp.StatusCode, string(bodyBytes)) + } + + var out ConfSummary + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("failed to decode conf response: %v", err) + } + return &out, nil +} diff --git a/pkg/tools/core.go b/pkg/tools/core.go index b8e30db..dd23ac1 100644 --- a/pkg/tools/core.go +++ b/pkg/tools/core.go @@ -100,4 +100,5 @@ type ConfSummary struct { ID string `json:"id"` Tag string `json:"tag"` FleetType FleetType `json:"fleet_type"` + Content string `json:"content,omitempty"` } diff --git a/pkg/tools/ingestion.go b/pkg/tools/ingestion.go index c3e00ae..795516f 100644 --- a/pkg/tools/ingestion.go +++ b/pkg/tools/ingestion.go @@ -9,11 +9,12 @@ import ( "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" + "gopkg.in/yaml.v3" ) const ( - ingestionPipelineTag = "AI-Connector-Telemetry-Pipeline" - httpIngestionNodeName = "http_ingestion_input" + ingestionPipelineTag = "AI-Connector-Telemetry-Pipeline" + httpIngestionInputType = "http_ingestion_input" ) type ingestionEndpointOut struct { @@ -24,18 +25,28 @@ type ingestionEndpointOut struct { TestCommand string `json:"test_command,omitempty"` } -type getIngestionEndpointOut struct { +type ingestionPipelineOut struct { + ConfID string `json:"conf_id"` + Name string `json:"name"` + NodeName string `json:"node_name"` + Default bool `json:"default"` Endpoints []ingestionEndpointOut `json:"endpoints"` + Error string `json:"error,omitempty"` +} + +type getIngestionEndpointOut struct { + Pipelines []ingestionPipelineOut `json:"pipelines"` } -// GetIngestionEndpointTool returns Edge Delta HTTP ingestion URLs with the -// stream token pre-populated as a `?token=...` query parameter. Takes no args: -// it resolves the org's auto-provisioned ingestion pipeline, its HTTP ingest -// node, and fetches the stream token — all server-side. +// GetIngestionEndpointTool returns Edge Delta ingestion URLs for every +// IngestionPipeline conf the caller can see. The pipeline tagged +// "AI-Connector-Telemetry-Pipeline" — auto-provisioned for every org — is +// flagged default=true so the LLM can prefer it when no other signal +// exists. Tokens are fetched server-side; the caller never provides one. func GetIngestionEndpointTool(client Client) (tool mcp.Tool, handler server.ToolHandlerFunc) { return mcp.NewTool("get_ingestion_endpoint", mcp.WithTitleAnnotation("Get Ingestion Endpoint"), - mcp.WithDescription(`Returns Edge Delta ingestion endpoints (logs, metrics, traces, events) with the stream token embedded as a ?token=... query parameter. Each endpoint includes a "protocol" field ("http" today; "grpc" may be added in the future) so callers can pick the right transport. Takes no arguments — the org's auto-provisioned ingestion pipeline and its ingest node are resolved server-side. POST raw payload bodies to HTTP URLs to send telemetry to Edge Delta.`), + mcp.WithDescription(`Returns Edge Delta ingestion endpoints for every ingestion pipeline in the org (logs, metrics, traces, events) with the stream token embedded as a ?token=... query parameter. Each pipeline includes default=true if it is the auto-provisioned default ("AI-Connector-Telemetry-Pipeline"); prefer the default unless the user has named a specific pipeline. Each endpoint carries a "protocol" field ("http" today; "grpc" may be added later). POST raw payload bodies to HTTP URLs to send telemetry to Edge Delta.`), mcp.WithReadOnlyHintAnnotation(true), mcp.WithIdempotentHintAnnotation(true), mcp.WithDestructiveHintAnnotation(false), @@ -47,11 +58,11 @@ func GetIngestionEndpointTool(client Client) (tool mcp.Tool, handler server.Tool return nil, fmt.Errorf("failed to list configurations: %w", err) } - confID := findDefaultIngestionConfID(confs) - if confID == "" { + ingestionConfs := filterIngestionConfs(confs) + if len(ingestionConfs) == 0 { return mcp.NewToolResultError(fmt.Sprintf( - "could not find default ingestion pipeline (fleet_type=%q, tag=%q) for this org. /confs returned %d entries: %s", - IngestionPipelineFleetType, ingestionPipelineTag, len(confs), summarizeConfs(confs), + "no ingestion pipelines (fleet_type=%q) visible to this caller. /confs returned %d entries: %s", + IngestionPipelineFleetType, len(confs), summarizeConfs(confs), )), nil } @@ -62,36 +73,14 @@ func GetIngestionEndpointTool(client Client) (tool mcp.Tool, handler server.Tool if endpoints.HTTPS == nil { return mcp.NewToolResultError("backend did not return HTTPS ingestion endpoints"), nil } - - tokenResp, err := GetIngestionToken(ctx, client, confID, httpIngestionNodeName) - if err != nil { - return nil, fmt.Errorf("failed to get ingestion token: %w", err) - } - if tokenResp.RawToken == "" { - return mcp.NewToolResultError("backend returned empty ingestion token"), nil - } - base, err := url.Parse(endpoints.HTTPS.BaseURL) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("invalid base_url %q: %v", endpoints.HTTPS.BaseURL, err)), nil } var out getIngestionEndpointOut - for dataType, path := range endpoints.HTTPS.PathForDataType { - u := *base - u.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/") - q := u.Query() - q.Set("token", tokenResp.RawToken) - u.RawQuery = q.Encode() - - item := ingestionEndpointOut{Protocol: "http", DataType: dataType, URL: u.String()} - if endpoints.HTTPS.SampleData != nil { - item.SampleData = endpoints.HTTPS.SampleData[dataType] - } - if cmd, ok := endpoints.HTTPS.TestCommands[dataType]; ok { - item.TestCommand = strings.ReplaceAll(cmd, "{TOKEN}", tokenResp.RawToken) - } - out.Endpoints = append(out.Endpoints, item) + for _, c := range ingestionConfs { + out.Pipelines = append(out.Pipelines, resolvePipeline(ctx, client, c, base, endpoints.HTTPS)...) } b, err := json.Marshal(out) @@ -102,36 +91,132 @@ func GetIngestionEndpointTool(client Client) (tool mcp.Tool, handler server.Tool } } -// summarizeConfs returns a short debug string of fleet_type/tag pairs for each conf. -func summarizeConfs(confs []*ConfSummary) string { - if len(confs) == 0 { - return "[]" +// resolvePipeline expands a single ingestion-pipeline conf into one entry per +// HTTP ingest node it declares. If the conf detail can't be fetched or the +// YAML doesn't expose any HTTP ingest node, a single placeholder entry with +// an Error field is returned so the LLM has visibility into the failure. +func resolvePipeline( + ctx context.Context, + client Client, + c *ConfSummary, + base *url.URL, + httpsCfg *HTTPSIngestionEndpoints, +) []ingestionPipelineOut { + isDefault := c.Tag == ingestionPipelineTag + + detail, err := GetConf(ctx, client, c.ID) + if err != nil { + return []ingestionPipelineOut{{ + ConfID: c.ID, + Name: c.Tag, + Default: isDefault, + Error: fmt.Sprintf("failed to fetch pipeline content: %v", err), + }} } - parts := make([]string, 0, len(confs)) - for _, c := range confs { - if c == nil { + + nodeNames := findHTTPIngestNodeNames(detail.Content) + if len(nodeNames) == 0 { + return []ingestionPipelineOut{{ + ConfID: c.ID, + Name: c.Tag, + Default: isDefault, + Error: fmt.Sprintf("no %q nodes in this pipeline", httpIngestionInputType), + }} + } + + out := make([]ingestionPipelineOut, 0, len(nodeNames)) + for _, nodeName := range nodeNames { + entry := ingestionPipelineOut{ + ConfID: c.ID, + Name: c.Tag, + NodeName: nodeName, + Default: isDefault, + } + tokenResp, err := GetIngestionToken(ctx, client, c.ID, nodeName) + if err != nil { + entry.Error = fmt.Sprintf("failed to fetch token: %v", err) + out = append(out, entry) continue } - parts = append(parts, fmt.Sprintf("{id=%s fleet_type=%q tag=%q}", c.ID, c.FleetType, c.Tag)) + if tokenResp.RawToken == "" { + entry.Error = "backend returned empty ingestion token" + out = append(out, entry) + continue + } + entry.Endpoints = buildEndpoints(base, httpsCfg, tokenResp.RawToken) + out = append(out, entry) } - return "[" + strings.Join(parts, ", ") + "]" + return out +} + +func buildEndpoints(base *url.URL, cfg *HTTPSIngestionEndpoints, rawToken string) []ingestionEndpointOut { + endpoints := make([]ingestionEndpointOut, 0, len(cfg.PathForDataType)) + for dataType, path := range cfg.PathForDataType { + u := *base + u.Path = strings.TrimRight(base.Path, "/") + "/" + strings.TrimLeft(path, "/") + q := u.Query() + q.Set("token", rawToken) + u.RawQuery = q.Encode() + + item := ingestionEndpointOut{Protocol: "http", DataType: dataType, URL: u.String()} + if cfg.SampleData != nil { + item.SampleData = cfg.SampleData[dataType] + } + if cmd, ok := cfg.TestCommands[dataType]; ok { + item.TestCommand = strings.ReplaceAll(cmd, "{TOKEN}", rawToken) + } + endpoints = append(endpoints, item) + } + return endpoints } -// findDefaultIngestionConfID picks the conf ID for the org's auto-provisioned -// ingestion pipeline. Prefers an exact tag match; falls back to the first -// ingestion-pipeline conf if none match the tag (covers older provisions). -func findDefaultIngestionConfID(confs []*ConfSummary) string { - var fallback string +func filterIngestionConfs(confs []*ConfSummary) []*ConfSummary { + out := make([]*ConfSummary, 0, len(confs)) for _, c := range confs { if c == nil || c.FleetType != IngestionPipelineFleetType { continue } - if c.Tag == ingestionPipelineTag { - return c.ID + out = append(out, c) + } + return out +} + +// pipelineYAML matches just the bits of a pipeline config we care about. +type pipelineYAML struct { + Nodes []struct { + Name string `yaml:"name"` + Type string `yaml:"type"` + } `yaml:"nodes"` +} + +func findHTTPIngestNodeNames(content string) []string { + if content == "" { + return nil + } + var cfg pipelineYAML + if err := yaml.Unmarshal([]byte(content), &cfg); err != nil { + return nil + } + var names []string + for _, n := range cfg.Nodes { + if n.Type == httpIngestionInputType && n.Name != "" { + names = append(names, n.Name) } - if fallback == "" { - fallback = c.ID + } + return names +} + +// summarizeConfs returns a short debug string of fleet_type/tag pairs for each conf. +func summarizeConfs(confs []*ConfSummary) string { + if len(confs) == 0 { + return "[]" + } + parts := make([]string, 0, len(confs)) + for _, c := range confs { + if c == nil { + continue } + parts = append(parts, fmt.Sprintf("{id=%s fleet_type=%q tag=%q}", c.ID, c.FleetType, c.Tag)) } - return fallback + return "[" + strings.Join(parts, ", ") + "]" } From e30c64fab1d2f381c7dfc87d8ddfd5d3e58be6e0 Mon Sep 17 00:00:00 2001 From: Fatih Yildiz Date: Tue, 28 Apr 2026 15:55:39 -0700 Subject: [PATCH 4/4] fix --- pkg/tools/ingestion.go | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/tools/ingestion.go b/pkg/tools/ingestion.go index 795516f..c6d6f93 100644 --- a/pkg/tools/ingestion.go +++ b/pkg/tools/ingestion.go @@ -13,8 +13,8 @@ import ( ) const ( - ingestionPipelineTag = "AI-Connector-Telemetry-Pipeline" - httpIngestionInputType = "http_ingestion_input" + ingestionPipelineTag = "AI-Connector-Telemetry-Pipeline" + httpIngestionInputType = "http_ingestion_input" ) type ingestionEndpointOut struct { @@ -107,30 +107,30 @@ func resolvePipeline( detail, err := GetConf(ctx, client, c.ID) if err != nil { return []ingestionPipelineOut{{ - ConfID: c.ID, - Name: c.Tag, - Default: isDefault, - Error: fmt.Sprintf("failed to fetch pipeline content: %v", err), + ConfID: c.ID, + Name: c.Tag, + Default: isDefault, + Error: fmt.Sprintf("failed to fetch pipeline content: %v", err), }} } nodeNames := findHTTPIngestNodeNames(detail.Content) if len(nodeNames) == 0 { return []ingestionPipelineOut{{ - ConfID: c.ID, - Name: c.Tag, - Default: isDefault, - Error: fmt.Sprintf("no %q nodes in this pipeline", httpIngestionInputType), + ConfID: c.ID, + Name: c.Tag, + Default: isDefault, + Error: fmt.Sprintf("no %q nodes in this pipeline", httpIngestionInputType), }} } out := make([]ingestionPipelineOut, 0, len(nodeNames)) for _, nodeName := range nodeNames { entry := ingestionPipelineOut{ - ConfID: c.ID, - Name: c.Tag, - NodeName: nodeName, - Default: isDefault, + ConfID: c.ID, + Name: c.Tag, + NodeName: nodeName, + Default: isDefault, } tokenResp, err := GetIngestionToken(ctx, client, c.ID, nodeName) if err != nil {