From 4852d8e3dcf065b1b3ce57b4ef9031d17ee8da8d Mon Sep 17 00:00:00 2001 From: Alex Zhang Date: Sun, 23 Oct 2022 10:47:12 +0800 Subject: [PATCH] improve the spawn feature 1. boomer receive the spawn message from locust, then calculate the concurrency difference among current step and previous step, then increase reqeust goroutines or decrease request goroutines which is depended on the sign of difference value 2. adjust the unit test with new spawn codes --- runner.go | 55 ++++++++++++++++++++++++++++++++++++-------------- runner_test.go | 29 +++++++++++++++++++++++--- 2 files changed, 66 insertions(+), 18 deletions(-) diff --git a/runner.go b/runner.go index 28ccf20..36b855e 100644 --- a/runner.go +++ b/runner.go @@ -121,7 +121,10 @@ func (r *runner) outputOnStop() { } func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) { - log.Println("Spawning", spawnCount, "clients immediately") + for i := 0; i > spawnCount; i-- { + r.stopChan <- true + atomic.AddInt32(&r.numClients, -1) + } for i := 1; i <= spawnCount; i++ { select { @@ -158,6 +161,7 @@ func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc if spawnCompleteFunc != nil { spawnCompleteFunc() + log.Printf("Spawned %d clients completely, the spawn count is: %d\n", r.numClients, spawnCount) } } @@ -201,23 +205,37 @@ func (r *runner) getTask() *Task { return nil } -func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { - Events.Publish(EVENT_SPAWN, spawnCount, spawnRate) - +func (r *runner) initSpawning() { r.stopChan = make(chan bool) - r.numClients = 0 + // prevent receiving spawn message from master when boomer is handling stopping, + // that might happen when the numClients is too large and might take a while for stopping all request goroutines + for { + if atomic.LoadInt32(&r.numClients) == 0 { + break + } else { + time.Sleep(1 * time.Millisecond) + } + } + r.userClassesCountFromMaster = make(map[string]int64) +} +func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { + Events.Publish(EVENT_SPAWN, spawnCount, spawnRate) go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc) } func (r *runner) stop() { + // stop previous goroutines without blocking + // those goroutines will exit when r.safeRun returns + numClients := int(atomic.LoadInt32(&r.numClients)) + for i := 0; i < numClients; i++ { + r.stopChan <- true + atomic.AddInt32(&r.numClients, -1) + } + // publish the boomer stop event // user's code can subscribe to this event and do thins like cleaning up Events.Publish(EVENT_STOP) - - // stop previous goroutines without blocking - // those goroutines will exit when r.safeRun returns - close(r.stopChan) } type localRunner struct { @@ -351,8 +369,14 @@ func (r *slaveRunner) sumUsersAmount(msg *genericMessage) int { userClassesCountMap := userClassesCount.(map[interface{}]interface{}) // Save the original field and send it back to master in spawnComplete message. - r.userClassesCountFromMaster = make(map[string]int64) - amount := 0 + tmpClassesCountFromMaster := make(map[string]int64) + oldAmount := 0 + newAmount := 0 + + for _, num := range r.userClassesCountFromMaster { + oldAmount = oldAmount + int(num) + } + for class, num := range userClassesCountMap { c, ok := class.(string) n, ok2 := castToInt64(num) @@ -360,10 +384,11 @@ func (r *slaveRunner) sumUsersAmount(msg *genericMessage) int { log.Printf("user_classes_count in spawn message can't be casted to map[string]int64, current type is map[%T]%T, ignored!\n", class, num) continue } - r.userClassesCountFromMaster[c] = n - amount = amount + int(n) + tmpClassesCountFromMaster[c] = n + newAmount = newAmount + int(n) } - return amount + r.userClassesCountFromMaster = tmpClassesCountFromMaster + return newAmount - oldAmount } // TODO: Since locust 2.0, spawn rate and user count are both handled by master. @@ -440,6 +465,7 @@ func (r *slaveRunner) onMessage(msgInterface message) { case "spawn": r.state = stateSpawning r.stats.clearStatsChan <- true + r.initSpawning() r.onSpawnMessage(genericMsg) case "quit": Events.Publish(EVENT_QUIT) @@ -452,7 +478,6 @@ func (r *slaveRunner) onMessage(msgInterface message) { switch msgType { case "spawn": r.state = stateSpawning - r.stop() r.onSpawnMessage(genericMsg) case "stop": r.stop() diff --git a/runner_test.go b/runner_test.go index 0e0649d..d1e6af0 100644 --- a/runner_test.go +++ b/runner_test.go @@ -218,7 +218,7 @@ func TestSpawnAndStop(t *testing.T) { runner.state = stateSpawning runner.client = newClient("localhost", 5557, runner.nodeID) defer runner.shutdown() - + runner.initSpawning() runner.startSpawning(10, float64(10), runner.spawnComplete) // wait for spawning goroutines time.Sleep(2 * time.Second) @@ -234,6 +234,7 @@ func TestSpawnAndStop(t *testing.T) { msg = <-runner.client.sendChannel() m = msg.(*genericMessage) assert.Equal(t, "quit", m.Type) + assert.Equal(t, runner.numClients, int32(0)) } func TestStop(t *testing.T) { @@ -244,8 +245,7 @@ func TestStop(t *testing.T) { } runner := newSlaveRunner("localhost", 5557, []*Task{taskA}, nil) - runner.stopChan = make(chan bool) - + runner.initSpawning() stopped := false handler := func() { stopped = true @@ -414,6 +414,7 @@ func TestOnMessage(t *testing.T) { // stop all the workers runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) assert.Equal(t, stateInit, runner.state) + assert.Equal(t, runner.numClients, int32(0)) msg = <-runner.client.sendChannel() m = msg.(*genericMessage) @@ -444,9 +445,31 @@ func TestOnMessage(t *testing.T) { m = msg.(*genericMessage) assert.Equal(t, "spawning_complete", m.Type) + // decrease goroutines while running + runner.onMessage(newGenericMessage("spawn", map[string]interface{}{ + "user_classes_count": map[interface{}]interface{}{ + "Dummy": int64(3), + "Dummy2": int64(2), + }, + }, runner.nodeID)) + + msg = <-runner.client.sendChannel() + m = msg.(*genericMessage) + assert.Equal(t, "spawning", m.Type) + + // spawn complete and running + time.Sleep(2 * time.Second) + assert.Equal(t, stateRunning, runner.state) + assert.Equal(t, int32(5), runner.numClients) + + msg = <-runner.client.sendChannel() + m = msg.(*genericMessage) + assert.Equal(t, "spawning_complete", m.Type) + // stop all the workers runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) assert.Equal(t, stateInit, runner.state) + assert.Equal(t, runner.numClients, int32(0)) msg = <-runner.client.sendChannel() m = msg.(*genericMessage)