Skip to content

Commit

Permalink
Merge pull request #418 from mkrudele/master
Browse files Browse the repository at this point in the history
Add abort signal support
  • Loading branch information
tomas authored Dec 8, 2023
2 parents f0164f7 + 137fd52 commit e045301
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ With only two real dependencies, Needle supports:
- Automatic XML & JSON parsing
- 301/302/303 redirect following, with fine-grained tuning, and
- Streaming non-UTF-8 charset decoding, via `iconv-lite`
- Aborting any or all Needle requests using `AbortSignal` objects

And yes, Mr. Wayne, it does come in black.

Expand Down Expand Up @@ -317,6 +318,7 @@ For information about options that've changed, there's always [the changelog](ht
- `stream_length`: When sending streams, this lets you manually set the Content-Length header --if the stream's bytecount is known beforehand--, preventing ECONNRESET (socket hang up) errors on some servers that misbehave when receiving payloads of unknown size. Set it to `0` and Needle will get and set the stream's length for you, or leave unset for the default behaviour, which is no Content-Length header for stream payloads.
- `localAddress`: <string>, IP address. Passed to http/https request. Local interface from which the request should be emitted.
- `uri_modifier`: Anonymous function taking request (or redirect location if following redirects) URI as an argument and modifying it given logic. It has to return a valid URI string for successful request.
- `signal` : An `AbortSignal` object that can be used to abort any or all Needle requests.

Response options
----------------
Expand Down
37 changes: 31 additions & 6 deletions lib/needle.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ var defaults = {
follow_max : 0,
stream_length : -1,

// abort signal
signal : null,

// booleans
compressed : false,
decode_response : true,
Expand Down Expand Up @@ -189,7 +192,8 @@ Needle.prototype.setup = function(uri, options) {
http_opts : {
agent: get_option('agent', defaults.agent),
localAddress: get_option('localAddress', undefined),
lookup: get_option('lookup', undefined)
lookup: get_option('lookup', undefined),
signal: get_option('signal', defaults.signal)
}, // passed later to http.request() directly
headers : {},
output : options.output,
Expand All @@ -206,6 +210,9 @@ Needle.prototype.setup = function(uri, options) {
config[key] = check_value('number', key);
})

if (config.http_opts.signal && !(config.http_opts.signal instanceof AbortSignal))
throw new TypeError(typeof config.http_opts.signal + ' received for signal, but expected an AbortSignal');

// populate http_opts with given TLS options
tls_options.split(' ').forEach(function(key) {
if (typeof options[key] != 'undefined') {
Expand Down Expand Up @@ -446,6 +453,7 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data,
self = this,
request_opts = this.get_request_opts(method, uri, config),
protocol = request_opts.protocol == 'https:' ? https : http;
signal = request_opts.signal;

function done(err, resp) {
if (returned++ > 0)
Expand Down Expand Up @@ -481,19 +489,25 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data,
done(err || new Error('Unknown error when making request.'));
}

function abort_handler() {
out.emit('err', new Error('Aborted by signal.'));
request.destroy();
}

function set_timeout(type, milisecs) {
if (timer) clearTimeout(timer);
if (milisecs <= 0) return;

timer = setTimeout(function() {
out.emit('timeout', type);
request.abort();
request.destroy();
// also invoke done() to terminate job on read_timeout
if (type == 'read') done(new Error(type + ' timeout'));

signal && signal.removeEventListener('abort', abort_handler);
}, milisecs);
}


debug('Making request #' + count, request_opts);
request = protocol.request(request_opts, function(resp) {

Expand Down Expand Up @@ -766,7 +780,16 @@ Needle.prototype.send_request = function(count, method, uri, config, post_data,
request.end();
}

out.abort = function() { request.abort() }; // easier access
// Manage the abort signal
if (signal) {
if (signal.aborted === true) {
abort_handler();
} else {
signal.addEventListener('abort', abort_handler, {once: true});
}
}

out.abort = function() { request.destroy() }; // easier access
out.request = request;
return out;
}
Expand Down Expand Up @@ -801,12 +824,14 @@ module.exports.defaults = function(obj) {
var target_key = aliased.options[key] || key;

if (defaults.hasOwnProperty(target_key) && typeof obj[key] != 'undefined') {
if (target_key != 'parse_response' && target_key != 'proxy' && target_key != 'agent') {
// ensure type matches the original, except for proxy/parse_response that can be null/bool or string
if (target_key != 'parse_response' && target_key != 'proxy' && target_key != 'agent' && target_key != 'signal') {
// ensure type matches the original, except for proxy/parse_response that can be null/bool or string, and signal that can be null/AbortSignal
var valid_type = defaults[target_key].constructor.name;

if (obj[key].constructor.name != valid_type)
throw new TypeError('Invalid type for ' + key + ', should be ' + valid_type);
} else if (target_key === 'signal' && obj[key] !== null && !(obj[key] instanceof AbortSignal)) {
throw new TypeError('Invalid type for ' + key + ', should be AbortSignal');
}
defaults[target_key] = obj[key];
} else {
Expand Down
134 changes: 134 additions & 0 deletions test/errors_spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,138 @@ describe('errors', function() {

})

var node_major_ver = process.version.split('.')[0].replace('v', '');
if (node_major_ver >= 16) {
describe('when request is aborted by signal', function() {

var server,
url = 'http://localhost:3333/foo';

before(function() {
server = helpers.server({ port: 3333, wait: 600 });
})

after(function() {
server.close();
})

afterEach(function() {
// reset signal to default
needle.defaults({signal: null});
})

it('works if passing an already aborted signal aborts the request', function(done) {
var abortedSignal = AbortSignal.abort();
var start = new Date();

abortedSignal.aborted.should.equal(true);

needle.get(url, { signal: abortedSignal, response_timeout: 10000 }, function(err, res) {
var timediff = (new Date() - start);

should.not.exist(res);
err.code.should.equal('ABORT_ERR');
timediff.should.be.within(0, 50);

done();
});
})

it('works if request aborts before timing out', function(done) {
var cancel = new AbortController();
var start = new Date();

needle.get(url, { signal: cancel.signal, response_timeout: 500, open_timeout: 500, read_timeout: 500 }, function(err, res) {
var timediff = (new Date() - start);

should.not.exist(res);
if (node_major_ver <= 16)
err.code.should.equal('ECONNRESET');
if (node_major_ver > 16)
err.code.should.equal('ABORT_ERR');
cancel.signal.aborted.should.equal(true);
timediff.should.be.within(200, 250);

done();
});

function abort() {
cancel.abort();
}
setTimeout(abort, 200);
})

it('works if request times out before being aborted', function(done) {
var cancel = new AbortController();
var start = new Date();

needle.get(url, { signal: cancel.signal, response_timeout: 200, open_timeout: 200, read_timeout: 200 }, function(err, res) {
var timediff = (new Date() - start);

should.not.exist(res);
err.code.should.equal('ECONNRESET');
timediff.should.be.within(200, 250);
});

function abort() {
cancel.signal.aborted.should.equal(false);
done();
}
setTimeout(abort, 500);
})

it('works if setting default signal aborts all requests', function(done) {
var cancel = new AbortController();

needle.defaults({signal: cancel.signal});

var start = new Date();
var count = 0;
function cb(err, res) {
var timediff = (new Date() - start);

should.not.exist(res);
if (node_major_ver <= 16)
err.code.should.equal('ECONNRESET');
if (node_major_ver > 16)
err.code.should.equal('ABORT_ERR');
cancel.signal.aborted.should.equal(true);
timediff.should.be.within(200, 250);

if ( count++ === 2 ) done();
}

needle.get(url, { timeout: 300 }, cb);
needle.get(url, { timeout: 350 }, cb);
needle.get(url, { timeout: 400}, cb);

function abort() {
cancel.abort();
}
setTimeout(abort, 200);
})

it('does not work if invalid signal passed', function(done) {
try {
needle.get(url, { signal: 'invalid signal' }, function(err, res) {
done(new Error('A bad option error expected to be thrown'));
});
} catch(e) {
e.should.be.a.TypeError;
done();
}
})

it('does not work if invalid signal set by default', function(done) {
try {
needle.defaults({signal: new Error(), timeout: 1200});
done(new Error('A bad option error expected to be thrown'));
} catch(e) {
e.should.be.a.TypeError;
done();
}
})

})
}
})

0 comments on commit e045301

Please sign in to comment.