@@ -281,35 +281,68 @@ func (p *pipe) _background() {
281
281
atomic .StoreInt32 (& p .state , 4 )
282
282
}
283
283
284
+ type flush struct {
285
+ works time.Duration
286
+ waits time.Duration
287
+ bytes int
288
+ }
289
+
290
+ const flushesKeep = 16
291
+ const flushesMask = flushesKeep - 1
292
+
284
293
func (p * pipe ) _backgroundWrite () (err error ) {
285
294
var (
286
295
ones = make ([]cmds.Completed , 1 )
287
296
multi []cmds.Completed
288
297
ch chan RedisResult
289
298
290
- flushDelay = p .maxFlushDelay
291
- flushStart = time.Time {}
299
+ flushes [flushesKeep ]flush
300
+ flushId = 0
301
+ bytes = 1 // avoid zero
302
+ works = time .Duration (0 )
303
+ waits = time .Duration (0 )
304
+
305
+ userDelay = p .maxFlushDelay
292
306
)
293
307
308
+ var ts1 = time .Now ()
294
309
for atomic .LoadInt32 (& p .state ) < 3 {
295
310
if ones [0 ], multi , ch = p .queue .NextWriteCmd (); ch == nil {
296
- if flushDelay != 0 {
297
- flushStart = time .Now ()
298
- }
299
- if p .w .Buffered () == 0 {
311
+ buf := p .w .Buffered ()
312
+ if buf == 0 {
300
313
err = p .Error ()
314
+ } else if userDelay == 0 {
315
+ err = p .w .Flush ()
301
316
} else {
317
+ ts2 := time .Now ()
302
318
err = p .w .Flush ()
319
+ dur , gap := time .Since (ts2 ), ts2 .Sub (ts1 )
320
+ bytes = bytes + buf - flushes [flushId ].bytes
321
+ works = works + dur - flushes [flushId ].works
322
+ waits = waits + gap - flushes [flushId ].waits
323
+ flushes [flushId ] = flush {bytes : buf , works : dur , waits : gap }
324
+ flushId = (flushId + 1 ) & flushesMask
325
+ ts1 = ts2
303
326
}
327
+
304
328
if err == nil {
305
329
if atomic .LoadInt32 (& p .state ) == 1 {
306
330
ones [0 ], multi , ch = p .queue .WaitForWrite ()
307
331
} else {
308
332
runtime .Gosched ()
309
333
continue
310
334
}
311
- if flushDelay != 0 && atomic .LoadInt32 (& p .waits ) > 1 { // do not delay for sequential usage
312
- time .Sleep (flushDelay - time .Since (flushStart )) // ref: https://github.com/rueian/rueidis/issues/156
335
+ if userDelay != 0 && atomic .LoadInt32 (& p .waits ) > 1 { // do not delay for sequential usage
336
+ byteWork := works / time .Duration (bytes )
337
+ byteWait := waits / time .Duration (bytes )
338
+ byteCost := byteWait + byteWork
339
+ capDelay := userDelay
340
+ if byteCost != 0 {
341
+ if delay := byteWait * (works / flushesKeep ) / byteCost ; delay < capDelay {
342
+ capDelay = delay
343
+ }
344
+ }
345
+ time .Sleep (capDelay ) // ref: https://github.com/rueian/rueidis/issues/156
313
346
}
314
347
}
315
348
}
0 commit comments