Skip to content

Commit

Permalink
Fix send on closed channel panic in streamPrediction (#80)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattt authored Sep 19, 2024
1 parent 1c2ae68 commit eb1270a
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (r *Client) StreamPrediction(ctx context.Context, prediction *Prediction) (

func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, lastEvent *SSEEvent, sseChan chan SSEEvent, errChan chan error) {
g, ctx := errgroup.WithContext(ctx)
done := make(chan struct{})

url := prediction.URLs["stream"]
if url == "" {
Expand Down Expand Up @@ -161,8 +162,6 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
return
}

done := make(chan struct{})

reader := bufio.NewReader(resp.Body)
var buf bytes.Buffer
lineChan := make(chan []byte)
Expand Down Expand Up @@ -211,12 +210,22 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l

event := SSEEvent{Type: SSETypeDefault}
if err := event.decode(b); err != nil {
errChan <- err
select {
case errChan <- err:
default:
}
close(done)
return
}

sseChan <- event
select {
case sseChan <- event:
case <-done:
return
case <-ctx.Done():
return
}

if event.Type == SSETypeDone {
close(done)
return
Expand All @@ -239,15 +248,11 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
return
}

if errors.Is(err, context.Canceled) {
// Context was canceled, simply return
return
}

select {
case errChan <- err:
default:
// errChan is full or closed
if !errors.Is(err, context.Canceled) {
select {
case errChan <- err:
default:
}
}
}
}()
Expand Down

0 comments on commit eb1270a

Please sign in to comment.