@@ -58,7 +58,13 @@ func ImagePullPrivileged(ctx context.Context, dockerapi docker.APIClient, imageN
58
58
return err
59
59
}
60
60
} else {
61
- if err := printPull (ctx , responseBody , logger ); err != nil {
61
+ ctx , cancel := context .WithCancel (ctx )
62
+ defer cancel () // Used to stop the go routine if printPull returns error early.
63
+
64
+ msgCh := make (chan Message , 4096 )
65
+ go decode (ctx , responseBody , msgCh )
66
+ err := printPull (ctx , msgCh , logger )
67
+ if err != nil {
62
68
return err
63
69
}
64
70
}
@@ -156,7 +162,52 @@ type PullProgress struct {
156
162
Vtx * client.VertexStatus
157
163
}
158
164
159
- func printPull (_ context.Context , rc io.Reader , l progress.SubLogger ) error {
165
+ type Message struct {
166
+ msg * jsonmessage.JSONMessage
167
+ err error
168
+ }
169
+
170
+ // decode reads the body of the response from Docker and decodes it into JSON messages as fast
171
+ // as it can. It does not block on the channel and prefers to drop messages if the channel is full
172
+ // to prevent Docker from blocking on the pull.
173
+ func decode (ctx context.Context , r io.Reader , msgCh chan <- Message ) {
174
+ defer close (msgCh )
175
+
176
+ dec := json .NewDecoder (r )
177
+ for {
178
+ select {
179
+ case <- ctx .Done ():
180
+ select {
181
+ case msgCh <- Message {err : ctx .Err ()}:
182
+ default :
183
+ }
184
+ return
185
+ default :
186
+ }
187
+
188
+ var msg jsonmessage.JSONMessage
189
+ if err := dec .Decode (& msg ); err != nil {
190
+ if err == io .EOF {
191
+ return
192
+ }
193
+
194
+ select {
195
+ case msgCh <- Message {err : err }:
196
+ default :
197
+ }
198
+ }
199
+
200
+ // If we block here it is possible for Docker to block on the pull.
201
+ select {
202
+ case msgCh <- Message {msg : & msg }:
203
+ default :
204
+ }
205
+ }
206
+ }
207
+
208
+ // printPull will convert the messages to useful on screen content.
209
+ // we want to read as fast as possible as docker will block if the body buffer becomes too full.
210
+ func printPull (ctx context.Context , msgCh <- chan Message , l progress.SubLogger ) error {
160
211
started := map [string ]PullProgress {}
161
212
162
213
defer func () {
@@ -170,26 +221,29 @@ func printPull(_ context.Context, rc io.Reader, l progress.SubLogger) error {
170
221
}
171
222
}()
172
223
173
- dec := json .NewDecoder (rc )
174
-
175
224
var (
176
- parsedError error
177
- jm jsonmessage. JSONMessage
225
+ msg Message
226
+ ok bool
178
227
)
179
228
180
229
for {
181
- if err := dec .Decode (& jm ); err != nil {
182
- if parsedError != nil {
183
- return parsedError
230
+ select {
231
+ case <- ctx .Done ():
232
+ return ctx .Err ()
233
+ case msg , ok = <- msgCh :
234
+ if ! ok {
235
+ return nil
184
236
}
185
- if err == io .EOF {
186
- break
187
- }
188
- return err
189
237
}
190
238
239
+ if msg .err != nil {
240
+ return msg .err
241
+ }
242
+
243
+ jm := msg .msg
244
+
191
245
if jm .Error != nil {
192
- parsedError = jm .Error
246
+ return jm .Error
193
247
}
194
248
195
249
if jm .ID == "" {
@@ -270,5 +324,4 @@ func printPull(_ context.Context, rc io.Reader, l progress.SubLogger) error {
270
324
271
325
l .SetStatus (st .Vtx )
272
326
}
273
- return nil
274
327
}
0 commit comments