File tree Expand file tree Collapse file tree 2 files changed +22
-18
lines changed Expand file tree Collapse file tree 2 files changed +22
-18
lines changed Original file line number Diff line number Diff line change 3
3
const { Readable } = require ( 'node:stream' ) ;
4
4
5
5
class QueueStream extends Readable {
6
- constructor ( concurrent ) {
6
+ constructor ( concurrency ) {
7
7
super ( { objectMode : true } ) ;
8
- this . concurrent = concurrent ;
8
+ this . concurrency = concurrency ;
9
9
this . count = 0 ;
10
- this . queue = [ ] ;
10
+ this . waiting = [ ] ;
11
11
}
12
12
13
- static channels ( concurrent ) {
14
- return new QueueStream ( concurrent ) ;
13
+ static channels ( concurrency ) {
14
+ return new QueueStream ( concurrency ) ;
15
15
}
16
16
17
17
add ( task ) {
18
- this . queue . push ( task ) ;
18
+ this . waiting . push ( task ) ;
19
19
}
20
20
21
21
_read ( ) {
22
- while ( this . count < this . concurrent && this . queue . length > 0 ) {
23
- const task = this . queue . shift ( ) ;
22
+ const emptyChannels = this . concurrency - this . count ;
23
+ let launchCount = Math . min ( emptyChannels , this . waiting . length ) ;
24
+ while ( launchCount -- > 0 ) {
24
25
this . count ++ ;
26
+ const task = this . waiting . shift ( ) ;
25
27
this . onProcess ( task , ( err , res ) => {
26
28
if ( err ) this . emit ( 'error' , err ) ;
27
29
this . push ( { err, res } ) ;
28
30
this . count -- ;
29
31
} ) ;
30
32
}
31
- if ( this . queue . length === 0 && this . count === 0 ) {
33
+ if ( this . waiting . length === 0 && this . count === 0 ) {
32
34
this . push ( null ) ;
33
35
}
34
36
}
Original file line number Diff line number Diff line change 3
3
const { Readable, Writable, Transform, pipeline } = require ( 'node:stream' ) ;
4
4
5
5
class QueueStream extends Readable {
6
- constructor ( concurrent ) {
6
+ constructor ( concurrency ) {
7
7
super ( { objectMode : true } ) ;
8
- this . concurrent = concurrent ;
8
+ this . concurrency = concurrency ;
9
9
this . count = 0 ;
10
- this . queue = [ ] ;
10
+ this . waiting = [ ] ;
11
11
}
12
12
13
- static channels ( concurrent ) {
14
- return new QueueStream ( concurrent ) ;
13
+ static channels ( concurrency ) {
14
+ return new QueueStream ( concurrency ) ;
15
15
}
16
16
17
17
add ( task ) {
18
- this . queue . push ( task ) ;
18
+ this . waiting . push ( task ) ;
19
19
}
20
20
21
21
_read ( ) {
22
- while ( this . count < this . concurrent && this . queue . length > 0 ) {
23
- const task = this . queue . shift ( ) ;
22
+ const emptyChannels = this . concurrency - this . count ;
23
+ let launchCount = Math . min ( emptyChannels , this . waiting . length ) ;
24
+ while ( launchCount -- > 0 ) {
25
+ const task = this . waiting . shift ( ) ;
24
26
this . count ++ ;
25
27
this . onProcess ( task , ( err , res ) => {
26
28
if ( err ) this . emit ( 'error' , err ) ;
27
29
this . push ( { err, res } ) ;
28
30
this . count -- ;
29
31
} ) ;
30
32
}
31
- if ( this . queue . length === 0 && this . count === 0 ) {
33
+ if ( this . waiting . length === 0 && this . count === 0 ) {
32
34
this . push ( null ) ;
33
35
}
34
36
}
You can’t perform that action at this time.
0 commit comments