66 "io"
77 "strings"
88 "sync"
9+
10+ "k8s.io/apimachinery/pkg/util/sets"
911)
1012
1113// parallelByFileTestQueue runs tests in parallel unless they have
@@ -16,7 +18,180 @@ type parallelByFileTestQueue struct {
1618 commandContext * commandContext
1719}
1820
19- type TestFunc func (ctx context.Context , test * testCase )
21+ // getTestConflictGroup returns the conflict group for a test.
22+ // Conflicts are only checked within the same conflict group.
23+ // Conflict group is a concept designed to support future functionality
24+ // like mode defined in Isolation. As of now, all tests belong to the
25+ // default group and behave like the "exec" mode.
26+ func getTestConflictGroup (test * testCase ) string {
27+ return "default"
28+ }
29+
30+ // TestScheduler defines the interface for test scheduling
31+ // Different implementations can provide various scheduling strategies
32+ type TestScheduler interface {
33+ // GetNextTestToRun blocks until a test is available, then returns it.
34+ // Returns nil when all tests have been distributed (queue is empty) or context is cancelled.
35+ // When a test is returned, it is atomically removed from queue and marked as running.
36+ // This method can be safely called from multiple goroutines concurrently.
37+ GetNextTestToRun (ctx context.Context ) * testCase
38+
39+ // MarkTestComplete marks a test as complete, cleaning up its conflicts and taints.
40+ // This may unblock other tests that were waiting.
41+ // This method can be safely called from multiple goroutines concurrently.
42+ MarkTestComplete (test * testCase )
43+ }
44+
45+ // testScheduler manages test scheduling based on conflicts, taints, and tolerations
46+ // It maintains an ordered queue of tests and provides thread-safe scheduling operations
47+ type testScheduler struct {
48+ mu sync.Mutex
49+ cond * sync.Cond // condition variable to signal when tests complete
50+ tests []* testCase // ordered queue of tests to execute
51+ runningConflicts map [string ]sets.Set [string ] // tracks which conflicts are running per group: group -> set of conflicts
52+ activeTaints map [string ]int // tracks how many tests are currently applying each taint
53+ }
54+
55+ // newTestScheduler creates a test scheduler. Potentially this can order the
56+ // tests in any order and schedule tests based on resulted order.
57+ func newTestScheduler (tests []* testCase ) TestScheduler {
58+ ts := & testScheduler {
59+ tests : tests ,
60+ runningConflicts : make (map [string ]sets.Set [string ]),
61+ activeTaints : make (map [string ]int ),
62+ }
63+ ts .cond = sync .NewCond (& ts .mu )
64+ return ts
65+ }
66+
67+ // GetNextTestToRun blocks until a test is available to run, or returns nil if all tests have been distributed
68+ // or the context is cancelled. It continuously scans the queue and waits for state changes when no tests are runnable.
69+ // When a test is returned, it is atomically removed from queue and marked as running.
70+ func (ts * testScheduler ) GetNextTestToRun (ctx context.Context ) * testCase {
71+ ts .mu .Lock ()
72+ defer ts .mu .Unlock ()
73+
74+ for {
75+ // Check if context is cancelled
76+ if ctx .Err () != nil {
77+ return nil
78+ }
79+
80+ // Check if all tests have been distributed
81+ if len (ts .tests ) == 0 {
82+ return nil
83+ }
84+
85+ // Scan from beginning to find first runnable test
86+ for i , test := range ts .tests {
87+ conflictGroup := getTestConflictGroup (test )
88+
89+ // Ensure the conflict group set exists
90+ if ts .runningConflicts [conflictGroup ] == nil {
91+ ts .runningConflicts [conflictGroup ] = sets .New [string ]()
92+ }
93+
94+ // Check if any of the test's conflicts are currently running within its group
95+ hasConflict := false
96+ if test .spec != nil {
97+ for _ , conflict := range test .spec .Resources .Isolation .Conflict {
98+ if ts .runningConflicts [conflictGroup ].Has (conflict ) {
99+ hasConflict = true
100+ break
101+ }
102+ }
103+ }
104+
105+ // Check if test can tolerate all currently active taints
106+ canTolerate := ts .canTolerateTaints (test )
107+
108+ if ! hasConflict && canTolerate {
109+ // Found a runnable test - ATOMICALLY:
110+ // 1. Mark conflicts as running
111+ if test .spec != nil {
112+ for _ , conflict := range test .spec .Resources .Isolation .Conflict {
113+ ts .runningConflicts [conflictGroup ].Insert (conflict )
114+ }
115+
116+ // 2. Activate taints
117+ for _ , taint := range test .spec .Resources .Isolation .Taint {
118+ ts .activeTaints [taint ]++
119+ }
120+ }
121+
122+ // 3. Remove test from queue
123+ ts .tests = append (ts .tests [:i ], ts .tests [i + 1 :]... )
124+
125+ // 4. Return the test (now safe to run)
126+ return test
127+ }
128+ }
129+
130+ // No runnable test found, but tests still exist in queue - wait for state change
131+ ts .cond .Wait ()
132+ }
133+ }
134+
135+ // canTolerateTaints checks if a test can tolerate all currently active taints
136+ func (ts * testScheduler ) canTolerateTaints (test * testCase ) bool {
137+ // If test has no spec, it has no toleration requirements (can run with any taints)
138+ if test .spec == nil {
139+ return len (ts .activeTaints ) == 0 // Can only run if no taints are active
140+ }
141+
142+ // Check if test tolerates all active taints
143+ for taint , count := range ts .activeTaints {
144+ // Skip taints with zero count (should be cleaned up but being defensive)
145+ if count <= 0 {
146+ continue
147+ }
148+
149+ tolerated := false
150+ for _ , toleration := range test .spec .Resources .Isolation .Toleration {
151+ if toleration == taint {
152+ tolerated = true
153+ break
154+ }
155+ }
156+ if ! tolerated {
157+ return false // Test cannot tolerate this active taint
158+ }
159+ }
160+ return true
161+ }
162+
163+ // MarkTestComplete marks all conflicts and taints of a test as no longer running/active
164+ // and signals waiting workers that blocked tests may now be runnable
165+ // This should be called after a test completes execution
166+ func (ts * testScheduler ) MarkTestComplete (test * testCase ) {
167+ ts .mu .Lock ()
168+ defer ts .mu .Unlock ()
169+
170+ // If test has no spec, there's nothing to clean up
171+ if test .spec != nil {
172+ // Get the conflict group for this test
173+ conflictGroup := getTestConflictGroup (test )
174+
175+ // Clean up conflicts within this group
176+ if groupConflicts , exists := ts .runningConflicts [conflictGroup ]; exists {
177+ for _ , conflict := range test .spec .Resources .Isolation .Conflict {
178+ groupConflicts .Delete (conflict )
179+ }
180+ }
181+
182+ // Clean up taints with reference counting
183+ for _ , taint := range test .spec .Resources .Isolation .Taint {
184+ ts .activeTaints [taint ]--
185+ if ts .activeTaints [taint ] <= 0 {
186+ delete (ts .activeTaints , taint )
187+ }
188+ }
189+ }
190+
191+ // Signal waiting workers that the state has changed
192+ // Some blocked tests might now be runnable
193+ ts .cond .Broadcast ()
194+ }
20195
21196func newParallelTestQueue (commandContext * commandContext ) * parallelByFileTestQueue {
22197 return & parallelByFileTestQueue {
@@ -54,35 +229,24 @@ func abortOnFailure(parentContext context.Context) (testAbortFunc, context.Conte
54229 }, testCtx
55230}
56231
57- // queueAllTests writes all the tests to the channel and closes it when all are finished
58- // even with buffering, this can take a while since we don't infinitely buffer.
59- func queueAllTests (remainingParallelTests chan * testCase , tests []* testCase ) {
60- for i := range tests {
61- curr := tests [i ]
62- remainingParallelTests <- curr
63- }
64-
65- close (remainingParallelTests )
66- }
67-
68- // runTestsUntilChannelEmpty reads from the channel to consume tests, run them, and return when the channel is closed.
69- func runTestsUntilChannelEmpty (ctx context.Context , remainingParallelTests chan * testCase , testSuiteRunner testSuiteRunner ) {
232+ // runTestsUntilDone continuously gets tests from the scheduler, runs them, and marks them complete.
233+ // GetNextTestToRun() blocks internally when no tests are runnable and returns nil when all tests are distributed
234+ // or context is cancelled. Returns when there are no more tests to take from the queue or context is cancelled.
235+ func runTestsUntilDone (ctx context.Context , scheduler TestScheduler , testSuiteRunner testSuiteRunner ) {
70236 for {
71- select {
72- // if the context is finished, simply return
73- case <- ctx .Done ():
74- return
237+ // Get next test - this blocks until a test is available, queue is empty, or context is cancelled
238+ test := scheduler .GetNextTestToRun (ctx )
75239
76- case test , ok := <- remainingParallelTests :
77- if ! ok { // channel closed, then we're done
78- return
79- }
80- // if the context is finished, simply return
81- if ctx .Err () != nil {
82- return
83- }
84- testSuiteRunner .RunOneTest (ctx , test )
240+ if test == nil {
241+ // No more tests to take from queue or context cancelled
242+ return
85243 }
244+
245+ // Run the test
246+ testSuiteRunner .RunOneTest (ctx , test )
247+
248+ // Mark test as complete (clean up conflicts/taints and signal waiting workers)
249+ scheduler .MarkTestComplete (test )
86250 }
87251}
88252
@@ -105,21 +269,30 @@ func execute(ctx context.Context, testSuiteRunner testSuiteRunner, tests []*test
105269 return
106270 }
107271
272+ // Split tests into two categories: serial and parallel (including isolated)
108273 serial , parallel := splitTests (tests , isSerialTest )
109274
110- remainingParallelTests := make (chan * testCase , 100 )
111- go queueAllTests (remainingParallelTests , parallel )
275+ if len (parallel ) > 0 {
276+ // Create test scheduler with all parallel tests
277+ // TestScheduler encapsulates the queue and scheduling logic
278+ var scheduler TestScheduler = newTestScheduler (parallel )
279+
280+ var wg sync.WaitGroup
281+
282+ // Run all non-serial tests with conflict-aware workers
283+ // Each worker polls the scheduler for the next runnable test in order
284+ for i := 0 ; i < parallelism ; i ++ {
285+ wg .Add (1 )
286+ go func (ctx context.Context ) {
287+ defer wg .Done ()
288+ runTestsUntilDone (ctx , scheduler , testSuiteRunner )
289+ }(ctx )
290+ }
112291
113- var wg sync.WaitGroup
114- for i := 0 ; i < parallelism ; i ++ {
115- wg .Add (1 )
116- go func (ctx context.Context ) {
117- defer wg .Done ()
118- runTestsUntilChannelEmpty (ctx , remainingParallelTests , testSuiteRunner )
119- }(ctx )
292+ wg .Wait ()
120293 }
121- wg .Wait ()
122294
295+ // Run serial tests sequentially at the end
123296 for _ , test := range serial {
124297 if ctx .Err () != nil {
125298 return
0 commit comments