Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/stretchr/testify v1.11.1
golang.org/x/crypto v0.48.0
golang.org/x/net v0.51.0
golang.org/x/sync v0.19.0
gorm.io/driver/mysql v1.6.0
gorm.io/driver/postgres v1.6.0
gorm.io/gorm v1.31.1
Expand Down Expand Up @@ -118,7 +119,6 @@ require (
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/arch v0.22.0 // indirect
golang.org/x/oauth2 v0.34.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/term v0.40.0 // indirect
golang.org/x/text v0.34.0 // indirect
Expand Down
63 changes: 50 additions & 13 deletions pkg/handlers/search_handler.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package handlers

import (
"context"
"fmt"
"log"
"net/http"
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -14,6 +17,7 @@ import (
"github.com/zxh326/kite/pkg/handlers/resources"
"github.com/zxh326/kite/pkg/middleware"
"github.com/zxh326/kite/pkg/utils"
"golang.org/x/sync/errgroup"
)

type SearchHandler struct {
Expand Down Expand Up @@ -54,21 +58,55 @@ func (h *SearchHandler) createCacheKey(clusterName, query string, limit int) str
func (h *SearchHandler) Search(c *gin.Context, query string, limit int) ([]common.SearchResult, error) {
query = normalizeSearchQuery(query)
limit = normalizeSearchLimit(limit)
var allResults []common.SearchResult

// Search in different resource types
// Determine which resource types to search
searchFuncs := resources.SearchFuncs
guessSearchResources, q := utils.GuessSearchResources(query)

// Collect the search functions to execute
type searchEntry struct {
name string
fn func(*gin.Context, string, int64) ([]common.SearchResult, error)
}
var entries []searchEntry
for name, searchFunc := range searchFuncs {
if guessSearchResources == "all" || name == guessSearchResources {
results, err := searchFunc(c, q, int64(limit))
if err != nil {
continue
}
allResults = append(allResults, results...)
entries = append(entries, searchEntry{name: name, fn: searchFunc})
}
}

// Execute searches in parallel using errgroup
resultSlices := make([][]common.SearchResult, len(entries))
var hadFailure atomic.Bool // set on panic OR error — prevents caching incomplete results
g, _ := errgroup.WithContext(context.Background())

for i, entry := range entries {
g.Go(func() (err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("search: resource %q panicked: %v", entry.name, r)
hadFailure.Store(true)
}
}()
results, searchErr := entry.fn(c, q, int64(limit))
if searchErr != nil {
log.Printf("search: resource %q failed: %v", entry.name, searchErr)
hadFailure.Store(true)
return nil
}
resultSlices[i] = results
return nil
})
}

_ = g.Wait() // all goroutines return nil, error is always nil

// Merge results from all resource types
var allResults []common.SearchResult
for _, slice := range resultSlices {
allResults = append(allResults, slice...)
}

queryLower := strings.ToLower(q)
sortResults(allResults, queryLower)

Expand All @@ -77,7 +115,11 @@ func (h *SearchHandler) Search(c *gin.Context, query string, limit int) ([]commo
allResults = allResults[:limit]
}

h.cache.Add(h.createCacheKey(getSearchClusterName(c), query, limit), allResults)
// Only cache results when no failure (panic or error) occurred — avoids
// caching incomplete results that would be served as valid 200 OK for the TTL.
if !hadFailure.Load() {
h.cache.Add(h.createCacheKey(getSearchClusterName(c), query, limit), allResults)
}
return allResults, nil
}

Expand All @@ -104,11 +146,6 @@ func (h *SearchHandler) GlobalSearch(c *gin.Context) {
Results: cachedResults,
Total: len(cachedResults),
}
copiedCtx := c.Copy()
go func() {
// Perform search in the background to update cache
_, _ = h.Search(copiedCtx, query, limit)
}()
c.JSON(http.StatusOK, response)
return
}
Expand Down
224 changes: 221 additions & 3 deletions pkg/handlers/search_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"

"github.com/gin-gonic/gin"
"github.com/zxh326/kite/pkg/common"
Expand Down Expand Up @@ -109,7 +111,7 @@ func TestGlobalSearchCacheKeyIncludesClusterAndLimit(t *testing.T) {

handler := NewSearchHandler()

ctx := newSearchContext(t, "cluster-a", "/search")
ctx := newSearchContext(t, "cluster-a")
if _, err := handler.Search(ctx, "po target", 1); err != nil {
t.Fatalf("Search returned error: %v", err)
}
Expand All @@ -128,13 +130,13 @@ func TestGlobalSearchCacheKeyIncludesClusterAndLimit(t *testing.T) {
}
}

func newSearchContext(t *testing.T, clusterName, target string) *gin.Context {
func newSearchContext(t *testing.T, clusterName string) *gin.Context {
t.Helper()

gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
ctx, _ := gin.CreateTestContext(rec)
ctx.Request = httptest.NewRequest(http.MethodGet, target, nil)
ctx.Request = httptest.NewRequest(http.MethodGet, "/search", nil)
if clusterName != "" {
ctx.Set(middleware.ClusterNameKey, clusterName)
}
Expand Down Expand Up @@ -163,3 +165,219 @@ func performGlobalSearch(t *testing.T, handler *SearchHandler, clusterName, targ
}
return resp
}

// TestSearchParallelExecution verifies that multiple resource searches run concurrently.
func TestSearchParallelExecution(t *testing.T) {
	savedFuncs := resources.SearchFuncs

	// Concurrency instrumentation: inFlight counts searches currently running,
	// peak records the highest simultaneous count ever observed.
	var inFlight atomic.Int32
	var peak atomic.Int32

	makeSlow := func(out []common.SearchResult) func(*gin.Context, string, int64) ([]common.SearchResult, error) {
		return func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) {
			n := inFlight.Add(1)
			// Publish the new maximum via a CAS retry loop.
			for prev := peak.Load(); n > prev; prev = peak.Load() {
				if peak.CompareAndSwap(prev, n) {
					break
				}
			}
			time.Sleep(50 * time.Millisecond)
			inFlight.Add(-1)
			return out, nil
		}
	}

	resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){
		"pods":        makeSlow([]common.SearchResult{{Name: "nginx", ResourceType: "pods"}}),
		"services":    makeSlow([]common.SearchResult{{Name: "nginx-svc", ResourceType: "services"}}),
		"deployments": makeSlow([]common.SearchResult{{Name: "nginx-deploy", ResourceType: "deployments"}}),
	}
	t.Cleanup(func() { resources.SearchFuncs = savedFuncs })

	handler := NewSearchHandler()
	ctx := newSearchContext(t, "test-cluster")

	began := time.Now()
	results, err := handler.Search(ctx, "nginx", 50)
	took := time.Since(began)

	if err != nil {
		t.Fatalf("Search returned error: %v", err)
	}

	// Three funcs sleeping 50ms each would need >= 150ms if run one after
	// another; parallel execution finishes in roughly a single sleep interval.
	if took >= 140*time.Millisecond {
		t.Errorf("Search took %v, expected < 140ms for parallel execution", took)
	}

	if peak.Load() < 2 {
		t.Errorf("maxConcurrent = %d, want >= 2 (proves parallelism)", peak.Load())
	}

	if len(results) != 3 {
		t.Fatalf("expected 3 results, got %d", len(results))
	}
}

// TestSearchPartialFailure ensures that one failing resource type doesn't break others
// and that partial results due to errors are NOT cached.
func TestSearchPartialFailure(t *testing.T) {
	savedFuncs := resources.SearchFuncs
	var invocations atomic.Int32
	resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){
		"pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
			invocations.Add(1)
			return []common.SearchResult{{Name: "ok-pod", ResourceType: "pods"}}, nil
		},
		"services": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
			invocations.Add(1)
			return nil, fmt.Errorf("simulated API server error")
		},
		"deployments": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
			invocations.Add(1)
			return []common.SearchResult{{Name: "ok-deploy", ResourceType: "deployments"}}, nil
		},
	}
	t.Cleanup(func() { resources.SearchFuncs = savedFuncs })

	handler := NewSearchHandler()

	// Should have results from pods + deployments (services failed gracefully)
	first, err := handler.Search(newSearchContext(t, "test-cluster"), "ok", 50)
	if err != nil {
		t.Fatalf("Search returned error: %v", err)
	}
	if len(first) != 2 {
		t.Fatalf("expected 2 results (failed resource skipped), got %d: %+v", len(first), first)
	}

	baseline := invocations.Load()

	// Second call: should NOT be served from cache because one resource errored.
	second, err := handler.Search(newSearchContext(t, "test-cluster"), "ok", 50)
	if err != nil {
		t.Fatalf("second Search returned error: %v", err)
	}
	if len(second) != 2 {
		t.Fatalf("expected 2 results on retry, got %d", len(second))
	}

	if invocations.Load() == baseline {
		t.Fatal("second call was served from cache — error results should NOT be cached")
	}
}

// TestGlobalSearchCacheDoesNotTriggerBackgroundRefresh validates Solution E:
// a cache hit should NOT invoke Search again (no background goroutine).
func TestGlobalSearchCacheDoesNotTriggerBackgroundRefresh(t *testing.T) {
	var invocations atomic.Int32

	savedFuncs := resources.SearchFuncs
	resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){
		"pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
			invocations.Add(1)
			return []common.SearchResult{{Name: "nginx", ResourceType: "pods"}}, nil
		},
	}
	t.Cleanup(func() { resources.SearchFuncs = savedFuncs })

	handler := NewSearchHandler()

	// Initial request warms the cache.
	resp := performGlobalSearch(t, handler, "test-cluster", "/search?q=nginx&limit=50")
	if resp.Total != 1 {
		t.Fatalf("first call: expected 1 result, got %d", resp.Total)
	}

	baseline := invocations.Load()

	// Repeat request must be answered from cache with no extra Search work.
	resp = performGlobalSearch(t, handler, "test-cluster", "/search?q=nginx&limit=50")
	if resp.Total != 1 {
		t.Fatalf("second call: expected 1 result, got %d", resp.Total)
	}

	// Give any hypothetical background goroutine time to execute
	time.Sleep(100 * time.Millisecond)

	if after := invocations.Load(); after != baseline {
		t.Fatalf("cache hit triggered %d extra Search calls (background refresh not removed)",
			after-baseline)
	}
}

// TestSearchPanicDoesNotCacheResults verifies that when a search function panics,
// partial results are still returned but NOT cached (avoids serving stale incomplete data).
func TestSearchPanicDoesNotCacheResults(t *testing.T) {
	savedFuncs := resources.SearchFuncs
	var invocations atomic.Int32

	resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){
		"pods": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) { //nolint:unparam // signature required by SearchFuncs
			invocations.Add(1)
			return []common.SearchResult{{Name: "ok-pod", ResourceType: "pods"}}, nil
		},
		"services": func(_ *gin.Context, _ string, _ int64) ([]common.SearchResult, error) {
			invocations.Add(1)
			panic("simulated nil-pointer in service search")
		},
	}
	t.Cleanup(func() { resources.SearchFuncs = savedFuncs })

	handler := NewSearchHandler()

	// First call: one func panics → partial results returned, cache NOT written
	first, err := handler.Search(newSearchContext(t, "test-cluster"), "ok", 50)
	if err != nil {
		t.Fatalf("Search returned error: %v", err)
	}
	if len(first) != 1 || first[0].Name != "ok-pod" {
		t.Fatalf("expected partial result [ok-pod], got %+v", first)
	}

	baseline := invocations.Load()

	// Second call: should NOT be served from cache (cache was skipped due to panic).
	// Both search funcs must be invoked again.
	second, err := handler.Search(newSearchContext(t, "test-cluster"), "ok", 50)
	if err != nil {
		t.Fatalf("second Search returned error: %v", err)
	}
	if len(second) != 1 {
		t.Fatalf("expected 1 result on retry, got %d", len(second))
	}

	if invocations.Load() == baseline {
		t.Fatal("second call was served from cache — panic results should NOT be cached")
	}
}

// TestSearchEmptyResourceFuncs verifies Search handles zero searchable types gracefully.
func TestSearchEmptyResourceFuncs(t *testing.T) {
	savedFuncs := resources.SearchFuncs
	resources.SearchFuncs = map[string]func(*gin.Context, string, int64) ([]common.SearchResult, error){}
	t.Cleanup(func() { resources.SearchFuncs = savedFuncs })

	handler := NewSearchHandler()
	ctx := newSearchContext(t, "test-cluster")

	got, err := handler.Search(ctx, "anything", 50)
	if err != nil {
		t.Fatalf("Search returned error: %v", err)
	}
	if len(got) != 0 {
		t.Fatalf("expected 0 results with no search funcs, got %d", len(got))
	}
}
Loading