Skip to content

Commit 98df6af

Browse files
authored
Merge pull request #16 from runreveal/alan/await-fixes
await: fix issues with await when in 'job-tree' mode
2 parents 22e93e7 + 1479867 commit 98df6af

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

await/await.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,23 +104,26 @@ func (r *runner) Run(ctx context.Context) error {
104104

105105
errc := make(chan error, len(r.funcs))
106106
// this cancel func cancels all subroutines
107-
ctx, cancel := context.WithCancelCause(ctx)
107+
subctx, cancel := context.WithCancelCause(ctx)
108108

109109
var waitCount int32
110110

111111
for i, f := range r.funcs {
112112
atomic.AddInt32(&waitCount, 1)
113113
go func(fn func(context.Context) error, idx int) {
114-
err := fn(ctx)
114+
err := fn(subctx)
115115
if err != nil && !errors.Is(err, context.Canceled) {
116116
if r.funcNames[idx] != "" {
117117
slog.Info(fmt.Sprintf("subroutine %s error: %+v", r.funcNames[idx], err))
118118
} else {
119119
slog.Info(fmt.Sprintf("subroutine error: %+v", err))
120120
}
121121
}
122-
errc <- err
122+
// Order matters for the following two statements.
123+
// We must decrement before writing to the channel so that waitCount is
124+
// accurate when we read the remaining waitCount below after reading errC.
123125
atomic.AddInt32(&waitCount, -1)
126+
errc <- err
124127
}(f, i)
125128
}
126129

@@ -140,19 +143,19 @@ loop:
140143
select {
141144
case sig := <-sigc:
142145
slog.Error("stopping on signal", "signal", sig)
143-
case <-ctx.Done():
144-
err = ctx.Err()
146+
case <-subctx.Done():
147+
err = subctx.Err()
145148
if !errors.Is(err, context.Canceled) {
146149
slog.Error("error on context done", "err", err)
147150
}
148151
case err = <-errc:
149152
if err != nil {
150153
slog.Warn("await: stopping on error returned", "err", err)
151154
} else {
152-
if !r.proceedOnNil {
153-
slog.Info("await: stopping because a subroutine finished")
154-
} else {
155+
if r.proceedOnNil && atomic.LoadInt32(&waitCount) > 0 {
155156
goto loop
157+
} else {
158+
slog.Debug("await: stopping on subroutine(s) complete")
156159
}
157160
}
158161
}

0 commit comments

Comments
 (0)