diff --git a/runner.go b/runner.go index 28ccf20..36b855e 100644 --- a/runner.go +++ b/runner.go @@ -121,7 +121,10 @@ func (r *runner) outputOnStop() { } func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) { - log.Println("Spawning", spawnCount, "clients immediately") + for i := 0; i > spawnCount; i-- { + r.stopChan <- true + atomic.AddInt32(&r.numClients, -1) + } for i := 1; i <= spawnCount; i++ { select { @@ -158,6 +161,7 @@ func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc if spawnCompleteFunc != nil { spawnCompleteFunc() + log.Printf("Spawned %d clients completely, the spawn count is: %d\n", r.numClients, spawnCount) } } @@ -201,23 +205,37 @@ func (r *runner) getTask() *Task { return nil } -func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { - Events.Publish(EVENT_SPAWN, spawnCount, spawnRate) - +func (r *runner) initSpawning() { r.stopChan = make(chan bool) - r.numClients = 0 + // prevent receiving spawn message from master when boomer is handling stopping, + // that might happen when the numClients is too large and might take a while for stopping all request goroutines + for { + if atomic.LoadInt32(&r.numClients) == 0 { + break + } else { + time.Sleep(1 * time.Millisecond) + } + } + r.userClassesCountFromMaster = make(map[string]int64) +} +func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { + Events.Publish(EVENT_SPAWN, spawnCount, spawnRate) go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc) } func (r *runner) stop() { + // stop previous goroutines without blocking + // those goroutines will exit when r.safeRun returns + numClients := int(atomic.LoadInt32(&r.numClients)) + for i := 0; i < numClients; i++ { + r.stopChan <- true + atomic.AddInt32(&r.numClients, -1) + } + // publish the boomer stop event // user's code can subscribe to this event and do thins like cleaning up Events.Publish(EVENT_STOP) - - // stop previous goroutines without blocking - // those goroutines will exit when r.safeRun returns - close(r.stopChan) } type localRunner struct { @@ -351,8 +369,14 @@ func (r *slaveRunner) sumUsersAmount(msg *genericMessage) int { userClassesCountMap := userClassesCount.(map[interface{}]interface{}) // Save the original field and send it back to master in spawnComplete message. - r.userClassesCountFromMaster = make(map[string]int64) - amount := 0 + tmpClassesCountFromMaster := make(map[string]int64) + oldAmount := 0 + newAmount := 0 + + for _, num := range r.userClassesCountFromMaster { + oldAmount = oldAmount + int(num) + } + for class, num := range userClassesCountMap { c, ok := class.(string) n, ok2 := castToInt64(num) @@ -360,10 +384,11 @@ func (r *slaveRunner) sumUsersAmount(msg *genericMessage) int { log.Printf("user_classes_count in spawn message can't be casted to map[string]int64, current type is map[%T]%T, ignored!\n", class, num) continue } - r.userClassesCountFromMaster[c] = n - amount = amount + int(n) + tmpClassesCountFromMaster[c] = n + newAmount = newAmount + int(n) } - return amount + r.userClassesCountFromMaster = tmpClassesCountFromMaster + return newAmount - oldAmount } // TODO: Since locust 2.0, spawn rate and user count are both handled by master. @@ -440,6 +465,7 @@ func (r *slaveRunner) onMessage(msgInterface message) { case "spawn": r.state = stateSpawning r.stats.clearStatsChan <- true + r.initSpawning() r.onSpawnMessage(genericMsg) case "quit": Events.Publish(EVENT_QUIT) @@ -452,7 +478,6 @@ func (r *slaveRunner) onMessage(msgInterface message) { switch msgType { case "spawn": r.state = stateSpawning - r.stop() r.onSpawnMessage(genericMsg) case "stop": r.stop() diff --git a/runner_test.go b/runner_test.go index 0e0649d..d1e6af0 100644 --- a/runner_test.go +++ b/runner_test.go @@ -218,7 +218,7 @@ func TestSpawnAndStop(t *testing.T) { runner.state = stateSpawning runner.client = newClient("localhost", 5557, runner.nodeID) defer runner.shutdown() - + runner.initSpawning() runner.startSpawning(10, float64(10), runner.spawnComplete) // wait for spawning goroutines time.Sleep(2 * time.Second) @@ -234,6 +234,7 @@ func TestSpawnAndStop(t *testing.T) { msg = <-runner.client.sendChannel() m = msg.(*genericMessage) assert.Equal(t, "quit", m.Type) + assert.Equal(t, runner.numClients, int32(0)) } func TestStop(t *testing.T) { @@ -244,8 +245,7 @@ func TestStop(t *testing.T) { } runner := newSlaveRunner("localhost", 5557, []*Task{taskA}, nil) - runner.stopChan = make(chan bool) - + runner.initSpawning() stopped := false handler := func() { stopped = true @@ -414,6 +414,7 @@ func TestOnMessage(t *testing.T) { // stop all the workers runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) assert.Equal(t, stateInit, runner.state) + assert.Equal(t, runner.numClients, int32(0)) msg = <-runner.client.sendChannel() m = msg.(*genericMessage) @@ -444,9 +445,31 @@ func TestOnMessage(t *testing.T) { m = msg.(*genericMessage) assert.Equal(t, "spawning_complete", m.Type) + // decrease goroutines while running + runner.onMessage(newGenericMessage("spawn", map[string]interface{}{ + "user_classes_count": map[interface{}]interface{}{ + "Dummy": int64(3), + "Dummy2": int64(2), + }, + }, runner.nodeID)) + + msg = <-runner.client.sendChannel() + m = msg.(*genericMessage) + assert.Equal(t, "spawning", m.Type) + + // spawn complete and running + time.Sleep(2 * time.Second) + assert.Equal(t, stateRunning, runner.state) + assert.Equal(t, int32(5), runner.numClients) + + msg = <-runner.client.sendChannel() + m = msg.(*genericMessage) + assert.Equal(t, "spawning_complete", m.Type) + // stop all the workers runner.onMessage(newGenericMessage("stop", nil, runner.nodeID)) assert.Equal(t, stateInit, runner.state) + assert.Equal(t, runner.numClients, int32(0)) msg = <-runner.client.sendChannel() m = msg.(*genericMessage)