diff --git a/go.mod b/go.mod index c393c8b7..c5bc154e 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/stretchr/testify v1.11.1 golang.org/x/crypto v0.48.0 golang.org/x/net v0.51.0 + golang.org/x/sync v0.19.0 gorm.io/driver/mysql v1.6.0 gorm.io/driver/postgres v1.6.0 gorm.io/gorm v1.31.1 @@ -118,7 +119,6 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/arch v0.22.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect - golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/term v0.40.0 // indirect golang.org/x/text v0.34.0 // indirect diff --git a/pkg/handlers/search_handler.go b/pkg/handlers/search_handler.go index 9d6f076c..78d60504 100644 --- a/pkg/handlers/search_handler.go +++ b/pkg/handlers/search_handler.go @@ -1,11 +1,14 @@ package handlers import ( + "context" "fmt" + "log" "net/http" "sort" "strconv" "strings" + "sync/atomic" "time" "github.com/gin-gonic/gin" @@ -14,6 +17,7 @@ import ( "github.com/zxh326/kite/pkg/handlers/resources" "github.com/zxh326/kite/pkg/middleware" "github.com/zxh326/kite/pkg/utils" + "golang.org/x/sync/errgroup" ) type SearchHandler struct { @@ -54,21 +58,55 @@ func (h *SearchHandler) createCacheKey(clusterName, query string, limit int) str func (h *SearchHandler) Search(c *gin.Context, query string, limit int) ([]common.SearchResult, error) { query = normalizeSearchQuery(query) limit = normalizeSearchLimit(limit) - var allResults []common.SearchResult - // Search in different resource types + // Determine which resource types to search searchFuncs := resources.SearchFuncs guessSearchResources, q := utils.GuessSearchResources(query) + + // Collect the search functions to execute + type searchEntry struct { + name string + fn func(*gin.Context, string, int64) ([]common.SearchResult, error) + } + var entries []searchEntry for name, searchFunc := range searchFuncs { if guessSearchResources == "all" || name == guessSearchResources { - results, err := searchFunc(c, q, int64(limit)) - if err != nil { - continue - } - allResults = append(allResults, results...) + entries = append(entries, searchEntry{name: name, fn: searchFunc}) } } + // Execute searches in parallel using errgroup + resultSlices := make([][]common.SearchResult, len(entries)) + var hadFailure atomic.Bool // set on panic OR error — prevents caching incomplete results + g, _ := errgroup.WithContext(context.Background()) + + for i, entry := range entries { + g.Go(func() (err error) { + defer func() { + if r := recover(); r != nil { + log.Printf("search: resource %q panicked: %v", entry.name, r) + hadFailure.Store(true) + } + }() + results, searchErr := entry.fn(c, q, int64(limit)) + if searchErr != nil { + log.Printf("search: resource %q failed: %v", entry.name, searchErr) + hadFailure.Store(true) + return nil + } + resultSlices[i] = results + return nil + }) + } + + _ = g.Wait() // all goroutines return nil, error is always nil + + // Merge results from all resource types + var allResults []common.SearchResult + for _, slice := range resultSlices { + allResults = append(allResults, slice...) + } + queryLower := strings.ToLower(q) sortResults(allResults, queryLower) @@ -77,7 +115,11 @@ func (h *SearchHandler) Search(c *gin.Context, query string, limit int) ([]commo allResults = allResults[:limit] } - h.cache.Add(h.createCacheKey(getSearchClusterName(c), query, limit), allResults) + // Only cache results when no failure (panic or error) occurred — avoids + // caching incomplete results that would be served as valid 200 OK for the TTL. + if !hadFailure.Load() { + h.cache.Add(h.createCacheKey(getSearchClusterName(c), query, limit), allResults) + } return allResults, nil } @@ -104,11 +146,6 @@ func (h *SearchHandler) GlobalSearch(c *gin.Context) { Results: cachedResults, Total: len(cachedResults), } - copiedCtx := c.Copy() - go func() { - // Perform search in the background to update cache - _, _ = h.Search(copiedCtx, query, limit) - }() c.JSON(http.StatusOK, response) return } diff --git a/pkg/handlers/search_handler_test.go b/pkg/handlers/search_handler_test.go index 74e78f36..6c0910bf 100644 --- a/pkg/handlers/search_handler_test.go +++ b/pkg/handlers/search_handler_test.go @@ -5,7 +5,9 @@ import ( "fmt" "net/http" "net/http/httptest" + "sync/atomic" "testing" + "time" "github.com/gin-gonic/gin" "github.com/zxh326/kite/pkg/common" @@ -109,7 +111,7 @@ func TestGlobalSearchCacheKeyIncludesClusterAndLimit(t *testing.T) { handler := NewSearchHandler() - ctx := newSearchContext(t, "cluster-a", "/search") + ctx := newSearchContext(t, "cluster-a") if _, err := handler.Search(ctx, "po target", 1); err != nil { t.Fatalf("Search returned error: %v", err) } @@ -128,13 +130,13 @@ func TestGlobalSearchCacheKeyIncludesClusterAndLimit(t *testing.T) { } } -func newSearchContext(t *testing.T, clusterName, target string) *gin.Context { +func newSearchContext(t *testing.T, clusterName string) *gin.Context { t.Helper() gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() ctx, _ := gin.CreateTestContext(rec) - ctx.Request = httptest.NewRequest(http.MethodGet, target, nil) + ctx.Request = httptest.NewRequest(http.MethodGet, "/search", nil) if clusterName != "" { ctx.Set(middleware.ClusterNameKey, clusterName) } @@ -163,3 +165,219 @@ func performGlobalSearch(t *testing.T, handler *SearchHandler, clusterName, targ } return resp } + +// TestSearchParallelExecution verifies that multiple resource searches run concurrently. +func TestSearchParallelExecution(t *testing.T) { + oldSearchFuncs := resources.SearchFuncs + + // Track concurrent execution: each func sleeps and records max concurrency. + var running atomic.Int32 + var maxConcurrent atomic.Int32 + + slowSearch := func(results []common.SearchResult) func(*gin.Context, string, int64) ([]common.SearchResult, error) { + return func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { + cur := running.Add(1) + // Update max concurrency seen + for { + old := maxConcurrent.Load() + if cur <= old || maxConcurrent.CompareAndSwap(old, cur) { + break + } + } + time.Sleep(50 * time.Millisecond) + running.Add(-1) + return results, nil + } + } + + resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){ + "pods": slowSearch([]common.SearchResult{{Name: "nginx", ResourceType: "pods"}}), + "services": slowSearch([]common.SearchResult{{Name: "nginx-svc", ResourceType: "services"}}), + "deployments": slowSearch([]common.SearchResult{{Name: "nginx-deploy", ResourceType: "deployments"}}), + } + t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs }) + + handler := NewSearchHandler() + ctx := newSearchContext(t, "test-cluster") + + start := time.Now() + results, err := handler.Search(ctx, "nginx", 50) + elapsed := time.Since(start) + + if err != nil { + t.Fatalf("Search returned error: %v", err) + } + + // With 3 funcs sleeping 50ms each, sequential would take >= 150ms. + // Parallel should complete in ~50-80ms. Allow generous margin. + if elapsed >= 140*time.Millisecond { + t.Errorf("Search took %v, expected < 140ms for parallel execution", elapsed) + } + + if maxConcurrent.Load() < 2 { + t.Errorf("maxConcurrent = %d, want >= 2 (proves parallelism)", maxConcurrent.Load()) + } + + if len(results) != 3 { + t.Fatalf("expected 3 results, got %d", len(results)) + } +} + +// TestSearchPartialFailure ensures that one failing resource type doesn't break others +// and that partial results due to errors are NOT cached. +func TestSearchPartialFailure(t *testing.T) { + oldSearchFuncs := resources.SearchFuncs + var callCount atomic.Int32 + resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){ + "pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs + callCount.Add(1) + return []common.SearchResult{{Name: "ok-pod", ResourceType: "pods"}}, nil + }, + "services": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs + callCount.Add(1) + return nil, fmt.Errorf("simulated API server error") + }, + "deployments": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs + callCount.Add(1) + return []common.SearchResult{{Name: "ok-deploy", ResourceType: "deployments"}}, nil + }, + } + t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs }) + + handler := NewSearchHandler() + ctx := newSearchContext(t, "test-cluster") + + results, err := handler.Search(ctx, "ok", 50) + if err != nil { + t.Fatalf("Search returned error: %v", err) + } + + // Should have results from pods + deployments (services failed gracefully) + if len(results) != 2 { + t.Fatalf("expected 2 results (failed resource skipped), got %d: %+v", len(results), results) + } + + callsBefore := callCount.Load() + + // Second call: should NOT be served from cache because one resource errored. + ctx2 := newSearchContext(t, "test-cluster") + results2, err := handler.Search(ctx2, "ok", 50) + if err != nil { + t.Fatalf("second Search returned error: %v", err) + } + if len(results2) != 2 { + t.Fatalf("expected 2 results on retry, got %d", len(results2)) + } + + callsAfter := callCount.Load() + if callsAfter == callsBefore { + t.Fatal("second call was served from cache — error results should NOT be cached") + } +} + +// TestGlobalSearchCacheDoesNotTriggerBackgroundRefresh validates Solution E: +// a cache hit should NOT invoke Search again (no background goroutine). +func TestGlobalSearchCacheDoesNotTriggerBackgroundRefresh(t *testing.T) { + var searchCallCount atomic.Int32 + + oldSearchFuncs := resources.SearchFuncs + resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){ + "pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs + searchCallCount.Add(1) + return []common.SearchResult{{Name: "nginx", ResourceType: "pods"}}, nil + }, + } + t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs }) + + handler := NewSearchHandler() + + // First call: populates the cache + resp := performGlobalSearch(t, handler, "test-cluster", "/search?q=nginx&limit=50") + if resp.Total != 1 { + t.Fatalf("first call: expected 1 result, got %d", resp.Total) + } + + callsAfterFirst := searchCallCount.Load() + + // Second call: should serve from cache WITHOUT launching background search + resp = performGlobalSearch(t, handler, "test-cluster", "/search?q=nginx&limit=50") + if resp.Total != 1 { + t.Fatalf("second call: expected 1 result, got %d", resp.Total) + } + + // Give any hypothetical background goroutine time to execute + time.Sleep(100 * time.Millisecond) + + callsAfterSecond := searchCallCount.Load() + if callsAfterSecond != callsAfterFirst { + t.Fatalf("cache hit triggered %d extra Search calls (background refresh not removed)", + callsAfterSecond-callsAfterFirst) + } +} + +// TestSearchPanicDoesNotCacheResults verifies that when a search function panics, +// partial results are still returned but NOT cached (avoids serving stale incomplete data). +func TestSearchPanicDoesNotCacheResults(t *testing.T) { + oldSearchFuncs := resources.SearchFuncs + var callCount atomic.Int32 + + resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){ + "pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs + callCount.Add(1) + return []common.SearchResult{{Name: "ok-pod", ResourceType: "pods"}}, nil + }, + "services": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { + callCount.Add(1) + panic("simulated nil-pointer in service search") + }, + } + t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs }) + + handler := NewSearchHandler() + + // First call: one func panics → partial results returned, cache NOT written + ctx1 := newSearchContext(t, "test-cluster") + results, err := handler.Search(ctx1, "ok", 50) + if err != nil { + t.Fatalf("Search returned error: %v", err) + } + if len(results) != 1 || results[0].Name != "ok-pod" { + t.Fatalf("expected partial result [ok-pod], got %+v", results) + } + + callsBefore := callCount.Load() + + // Second call: should NOT be served from cache (cache was skipped due to panic). + // Both search funcs must be invoked again. + ctx2 := newSearchContext(t, "test-cluster") + results2, err := handler.Search(ctx2, "ok", 50) + if err != nil { + t.Fatalf("second Search returned error: %v", err) + } + if len(results2) != 1 { + t.Fatalf("expected 1 result on retry, got %d", len(results2)) + } + + callsAfter := callCount.Load() + if callsAfter == callsBefore { + t.Fatal("second call was served from cache — panic results should NOT be cached") + } +} + +// TestSearchEmptyResourceFuncs verifies Search handles zero searchable types gracefully. +func TestSearchEmptyResourceFuncs(t *testing.T) { + oldSearchFuncs := resources.SearchFuncs + resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){} + t.Cleanup(func() { resources.SearchFuncs = oldSearchFuncs }) + + handler := NewSearchHandler() + ctx := newSearchContext(t, "test-cluster") + + results, err := handler.Search(ctx, "anything", 50) + if err != nil { + t.Fatalf("Search returned error: %v", err) + } + if len(results) != 0 { + t.Fatalf("expected 0 results with no search funcs, got %d", len(results)) + } +}