diff --git a/motionplan/armplanning/node.go b/motionplan/armplanning/node.go
index c14b6bf82b9..69a06b4d200 100644
--- a/motionplan/armplanning/node.go
+++ b/motionplan/armplanning/node.go
@@ -118,6 +118,7 @@ type solutionSolvingState struct {
 	processCalls int
 	failures     *IkConstraintError
+	solutionScoreMu sync.Mutex
 	solutions    []*node
 	startTime    time.Time
 	bestScore    float64
@@ -218,6 +219,10 @@ func (sss *solutionSolvingState) nonchainMinimize(seed, step referenceframe.Fram
 // return bool is if we should stop because we're done.
 func (sss *solutionSolvingState) process(ctx context.Context, stepSolution *ik.Solution,
 ) bool {
+	if ctx.Err() != nil {
+		return true
+	}
+
 	ctx, span := trace.StartSpan(ctx, "process")
 	defer span.End()
 	sss.processCalls++
@@ -254,6 +259,8 @@ func (sss *solutionSolvingState) process(ctx context.Context, stepSolution *ik.S
 		return false
 	}
 
+	sss.solutionScoreMu.Lock()
+	defer sss.solutionScoreMu.Unlock()
 	for _, oldSol := range sss.solutions {
 		similarity := &motionplan.SegmentFS{
 			StartConfiguration: oldSol.inputs,
@@ -275,7 +282,16 @@ func (sss *solutionSolvingState) process(ctx context.Context, stepSolution *ik.S
 	if myNode.cost < sss.goodCost || // this checks the absolute score of the plan
 		// if we've got something sane, and it's really good, let's check
 		myNode.cost < (sss.bestScore*defaultOptimalityMultiple) {
+		if ctx.Err() != nil {
+			// Check here before the expensive `checkPath` call. We put this check inside the lock
+			// to avoid playing games with the deferred unlock.
+			return true
+		}
+
+		sss.solutionScoreMu.Unlock()
 		whyNot := sss.psc.checkPath(ctx, sss.psc.start, step)
+		sss.solutionScoreMu.Lock()
+
 		sss.psc.pc.logger.Debugf("got score %0.4f and goodCost: %0.2f - result: %v", myNode.cost, sss.goodCost, whyNot)
 		if whyNot == nil {
 			myNode.checkPath = true
@@ -372,20 +388,30 @@ func getSolutions(ctx context.Context, psc *planSegmentContext) ([]*node, error)
 		}
 	})
 
-solutionLoop:
-	for {
-		select {
-		case <-ctx.Done():
-			// We've been canceled. So have our workers. Can just return.
-			return nil, ctx.Err()
-		case stepSolution, ok := <-solutionGen:
-			if !ok || solvingState.process(ctx, stepSolution) {
-				// No longer using the generated solutions. Cancel the workers.
-				cancel()
-				break solutionLoop
+	processCtx, processForkJoinSpan := trace.StartSpan(ctx, "processForkJoin")
+	wg := sync.WaitGroup{}
+	for processCPUs := 0; processCPUs < 8; processCPUs++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			for {
+				select {
+				case <-processCtx.Done():
+					// We've been canceled. So have our workers. Can just return.
+					return
+				case stepSolution, ok := <-solutionGen:
+					if !ok || solvingState.process(processCtx, stepSolution) {
+						// No longer using the generated solutions. Cancel the workers.
+						cancel()
+						return
+					}
+				}
 			}
-		}
+		}()
 	}
+	wg.Wait()
+	processForkJoinSpan.End()
 
 	solveErrorLock.Lock()
 	defer solveErrorLock.Unlock()
@@ -393,6 +419,8 @@ solutionLoop:
 		return nil, fmt.Errorf("solver had an error: %w", solveError)
 	}
 
+	solvingState.solutionScoreMu.Lock()
+	defer solvingState.solutionScoreMu.Unlock()
 	if len(solvingState.solutions) == 0 {
 		// We have failed to produce a usable IK solution. Let the user know if zero IK solutions
 		// were produced, or if non-zero solutions were produced, which constraints were violated.
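
The structural change in getSolutions above is a fork/join: eight goroutines now drain solutionGen concurrently, any one of them can cancel the producer and its siblings, and the parent waits on a sync.WaitGroup before reading the accumulated solutions (re-taking solutionScoreMu, since process also holds it while mutating shared state). Below is a minimal, self-contained sketch of that pattern under stated assumptions; drainSolutions, the integer channel, and the process callback are illustrative stand-ins, not part of the motionplan package.

package main

import (
	"context"
	"fmt"
	"sync"
)

// drainSolutions starts several workers that consume from solutions until the
// context is cancelled, the channel is closed, or process reports completion.
// In the real code, shared results are guarded by a mutex while workers run.
func drainSolutions(ctx context.Context, cancel context.CancelFunc, solutions <-chan int, process func(int) bool) {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case s, ok := <-solutions:
					if !ok || process(s) {
						// The producer closed the channel, or this worker decided
						// we are done; cancelling stops the producer and siblings.
						cancel()
						return
					}
				}
			}
		}()
	}
	// Join: the parent only inspects accumulated results after every worker exits.
	wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	gen := make(chan int)
	// Producer: stands in for the IK solver goroutines; it stops when the shared
	// context is cancelled and closes the channel on exit.
	go func() {
		defer close(gen)
		for i := 0; ; i++ {
			select {
			case <-ctx.Done():
				return
			case gen <- i:
			}
		}
	}()

	drainSolutions(ctx, cancel, gen, func(s int) bool { return s >= 50 })
	fmt.Println("all workers joined")
}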