Skip to content

Commit 4346c0f

Browse files
authored
http: fix handling of HTTP upgrades with bodies
Previously, we ignored all indicators of the body (content-length or transfer-encoding headers) and treated any information following the headers as part of the upgraded stream. We now fully process the requests bodies separately instead, allowing us to automatically handle correct parsing of the body like any other HTTP request. This is a breaking change if you are currently accepting HTTP Upgrade requests with request bodies successfully, or if you use socket-specific fields & methods on the upgraded stream argument. In the former case, before now you will have received the request body and then the upgraded data on the same stream without any distinction or HTTP parsing applied. Now, you will need to separately read the request body from the request (the 1st argument) and the upgraded data from the upgrade stream (the 2nd argument). If you're not interested in request bodies, you can continue to just read from the upgrade stream directly. In the latter case, if you want to access the raw socket, you should do so via request.socket, instead of expecting the 2nd argument to be a socket. PR-URL: #60016 Reviewed-By: Matteo Collina <[email protected]> Reviewed-By: Colin Ihrig <[email protected]> Reviewed-By: Luigi Pinca <[email protected]> Reviewed-By: Ethan Arrowood <[email protected]> Reviewed-By: Rafael Gonzaga <[email protected]>
1 parent f9a83ff commit 4346c0f

7 files changed

+543
-38
lines changed

doc/api/http.md

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ added: v0.1.94
745745
-->
746746

747747
* `response` {http.IncomingMessage}
748-
* `socket` {stream.Duplex}
748+
* `stream` {stream.Duplex}
749749
* `head` {Buffer}
750750

751751
Emitted each time a server responds to a request with an upgrade. If this
@@ -768,13 +768,13 @@ const server = http.createServer((req, res) => {
768768
res.writeHead(200, { 'Content-Type': 'text/plain' });
769769
res.end('okay');
770770
});
771-
server.on('upgrade', (req, socket, head) => {
772-
socket.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
771+
server.on('upgrade', (req, stream, head) => {
772+
stream.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
773773
'Upgrade: WebSocket\r\n' +
774774
'Connection: Upgrade\r\n' +
775775
'\r\n');
776776

777-
socket.pipe(socket); // echo back
777+
stream.pipe(stream); // echo back
778778
});
779779

780780
// Now that server is running
@@ -793,9 +793,9 @@ server.listen(1337, '127.0.0.1', () => {
793793
const req = http.request(options);
794794
req.end();
795795

796-
req.on('upgrade', (res, socket, upgradeHead) => {
796+
req.on('upgrade', (res, stream, upgradeHead) => {
797797
console.log('got upgraded!');
798-
socket.end();
798+
stream.end();
799799
process.exit(0);
800800
});
801801
});
@@ -809,13 +809,13 @@ const server = http.createServer((req, res) => {
809809
res.writeHead(200, { 'Content-Type': 'text/plain' });
810810
res.end('okay');
811811
});
812-
server.on('upgrade', (req, socket, head) => {
813-
socket.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
812+
server.on('upgrade', (req, stream, head) => {
813+
stream.write('HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
814814
'Upgrade: WebSocket\r\n' +
815815
'Connection: Upgrade\r\n' +
816816
'\r\n');
817817

818-
socket.pipe(socket); // echo back
818+
stream.pipe(stream); // echo back
819819
});
820820

821821
// Now that server is running
@@ -834,9 +834,9 @@ server.listen(1337, '127.0.0.1', () => {
834834
const req = http.request(options);
835835
req.end();
836836

837-
req.on('upgrade', (res, socket, upgradeHead) => {
837+
req.on('upgrade', (res, stream, upgradeHead) => {
838838
console.log('got upgraded!');
839-
socket.end();
839+
stream.end();
840840
process.exit(0);
841841
});
842842
});
@@ -1674,6 +1674,14 @@ per connection (in the case of HTTP Keep-Alive connections).
16741674
<!-- YAML
16751675
added: v0.1.94
16761676
changes:
1677+
- version: REPLACEME
1678+
pr-url: https://github.com/nodejs/node/pull/60016
1679+
description: Request bodies are no longer exposed raw (unparsed) on the
1680+
socket argument. Instead, if a body is received, the stream
1681+
argument will be a duplex that emits socket content only
1682+
after the request body, while the parsed request body data
1683+
will be emitted from the request, just as in normal server
1684+
`'request'` events.
16771685
- version:
16781686
- v24.9.0
16791687
- v22.21.0
@@ -1689,31 +1697,37 @@ changes:
16891697

16901698
* `request` {http.IncomingMessage} Arguments for the HTTP request, as it is in
16911699
the [`'request'`][] event
1692-
* `socket` {stream.Duplex} Network socket between the server and client
1700+
* `stream` {stream.Duplex} The upgraded stream between the server and client
16931701
* `head` {Buffer} The first packet of the upgraded stream (may be empty)
16941702

16951703
Emitted each time a client's HTTP upgrade request is accepted. By default
16961704
all HTTP upgrade requests are ignored (i.e. only regular `'request'` events
16971705
are emitted, sticking with the normal HTTP request/response flow) unless you
16981706
listen to this event, in which case they are all accepted (i.e. the `'upgrade'`
16991707
event is emitted instead, and future communication must handled directly
1700-
through the raw socket). You can control this more precisely by using the
1708+
through the raw stream). You can control this more precisely by using the
17011709
server `shouldUpgradeCallback` option.
17021710

17031711
Listening to this event is optional and clients cannot insist on a protocol
17041712
change.
17051713

1706-
After this event is emitted, the request's socket will not have a `'data'`
1707-
event listener, meaning it will need to be bound in order to handle data
1708-
sent to the server on that socket.
1709-
17101714
If an upgrade is accepted by `shouldUpgradeCallback` but no event handler
1711-
is registered then the socket is destroyed, resulting in an immediate
1715+
is registered then the socket will be destroyed, resulting in an immediate
17121716
connection closure for the client.
17131717

1714-
This event is guaranteed to be passed an instance of the {net.Socket} class,
1715-
a subclass of {stream.Duplex}, unless the user specifies a socket
1716-
type other than {net.Socket}.
1718+
In the uncommon case that the incoming request has a body, this body will be
1719+
parsed as normal, separate to the upgrade stream, and the raw stream data will
1720+
only begin after it has completed. To ensure that reading from the stream isn't
1721+
blocked by waiting for the request body to be read, any reads on the stream
1722+
will start the request body flowing automatically. If you want to read the
1723+
request body, ensure that you do so (i.e. you attach `'data'` listeners)
1724+
before starting to read from the upgraded stream.
1725+
1726+
The stream argument will typically be the {net.Socket} instance used by the
1727+
request, but in some cases (such as with a request body) it may be a duplex
1728+
stream. If required, you can access the raw connection underlying the request
1729+
via [`request.socket`][], which is guaranteed to be an instance of {net.Socket}
1730+
unless the user specified another socket type.
17171731

17181732
### `server.close([callback])`
17191733

lib/_http_server.js

Lines changed: 120 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const {
3434
SymbolFor,
3535
} = primordials;
3636

37+
const { Duplex } = require('stream');
3738
const net = require('net');
3839
const EE = require('events');
3940
const assert = require('internal/assert');
@@ -43,6 +44,7 @@ const {
4344
continueExpression,
4445
chunkExpression,
4546
kIncomingMessage,
47+
kSocket,
4648
HTTPParser,
4749
isLenient,
4850
_checkInvalidHeaderChar: checkInvalidHeaderChar,
@@ -106,6 +108,7 @@ const onResponseFinishChannel = dc.channel('http.server.response.finish');
106108

107109
const kServerResponse = Symbol('ServerResponse');
108110
const kServerResponseStatistics = Symbol('ServerResponseStatistics');
111+
const kUpgradeStream = Symbol('UpgradeStream');
109112

110113
const kOptimizeEmptyRequests = Symbol('OptimizeEmptyRequestsOption');
111114

@@ -953,6 +956,77 @@ function socketOnError(e) {
953956
}
954957
}
955958

959+
class UpgradeStream extends Duplex {
960+
constructor(socket, req) {
961+
super({
962+
allowHalfOpen: socket.allowHalfOpen,
963+
});
964+
965+
this[kSocket] = socket;
966+
this[kIncomingMessage] = req;
967+
968+
// Proxy error, end & closure events immediately.
969+
socket.on('error', (err) => this.destroy(err));
970+
971+
socket.on('close', () => this.destroy());
972+
this.on('close', () => socket.destroy());
973+
974+
socket.on('end', () => {
975+
this.push(null);
976+
977+
// Match the socket behaviour, where 'end' will fire despite no 'data'
978+
// listeners if a socket with no pending data ends:
979+
if (this.readableLength === 0) {
980+
this.resume();
981+
}
982+
});
983+
984+
// Other events (most notably, reading) all only
985+
// activate after requestBodyCompleted is called.
986+
}
987+
988+
requestBodyCompleted(upgradeHead) {
989+
this[kIncomingMessage] = null;
990+
991+
// When the request body is completed, we begin streaming all the
992+
// post-body data for the upgraded protocol:
993+
if (upgradeHead?.length > 0) {
994+
if (!this.push(upgradeHead)) {
995+
this[kSocket].pause();
996+
}
997+
}
998+
999+
this[kSocket].on('data', (data) => {
1000+
if (!this.push(data)) {
1001+
this[kSocket].pause();
1002+
}
1003+
});
1004+
}
1005+
1006+
_read(size) {
1007+
// Reading the upgrade stream starts the request stream flowing. It's
1008+
// important that this happens, even if there are no listeners, or it
1009+
// would be impossible to read this without explicitly reading all the
1010+
// request body first, which is backward incompatible & awkward.
1011+
this[kIncomingMessage]?.resume();
1012+
1013+
this[kSocket].resume();
1014+
}
1015+
1016+
_final(callback) {
1017+
this[kSocket].end(callback);
1018+
}
1019+
1020+
_write(chunk, encoding, callback) {
1021+
this[kSocket].write(chunk, encoding, callback);
1022+
}
1023+
1024+
_destroy(err, callback) {
1025+
this[kSocket].destroy(err);
1026+
callback(err);
1027+
}
1028+
}
1029+
9561030
function onParserExecuteCommon(server, socket, parser, state, ret, d) {
9571031
if (ret instanceof Error) {
9581032
prepareError(ret, parser, d);
@@ -962,28 +1036,56 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
9621036
// Upgrade or CONNECT
9631037
const req = parser.incoming;
9641038
debug('SERVER upgrade or connect', req.method);
1039+
const eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
1040+
1041+
let upgradeStream;
1042+
if (req.complete) {
1043+
d ||= parser.getCurrentBuffer();
1044+
1045+
socket.removeListener('data', state.onData);
1046+
socket.removeListener('end', state.onEnd);
1047+
socket.removeListener('close', state.onClose);
1048+
socket.removeListener('drain', state.onDrain);
1049+
socket.removeListener('error', socketOnError);
1050+
socket.removeListener('timeout', socketOnTimeout);
1051+
1052+
unconsume(parser, socket);
1053+
parser.finish();
1054+
freeParser(parser, req, socket);
1055+
parser = null;
1056+
1057+
// If the request is complete (no body, or all body read upfront) then
1058+
// we just emit the socket directly as the upgrade stream.
1059+
upgradeStream = socket;
1060+
} else {
1061+
// If the body hasn't been fully parsed yet, we emit immediately but
1062+
// we add a wrapper around the socket to not expose incoming data
1063+
// until the request body has finished.
1064+
1065+
if (socket[kUpgradeStream]) {
1066+
// We've already emitted the incomplete upgrade - nothing do to
1067+
// until actual body parsing completion.
1068+
return;
1069+
}
9651070

966-
d ||= parser.getCurrentBuffer();
1071+
d ||= Buffer.alloc(0);
9671072

968-
socket.removeListener('data', state.onData);
969-
socket.removeListener('end', state.onEnd);
970-
socket.removeListener('close', state.onClose);
971-
socket.removeListener('drain', state.onDrain);
972-
socket.removeListener('error', socketOnError);
973-
socket.removeListener('timeout', socketOnTimeout);
974-
unconsume(parser, socket);
975-
parser.finish();
976-
freeParser(parser, req, socket);
977-
parser = null;
1073+
upgradeStream = new UpgradeStream(socket, req);
1074+
socket[kUpgradeStream] = upgradeStream;
1075+
}
9781076

979-
const eventName = req.method === 'CONNECT' ? 'connect' : 'upgrade';
9801077
if (server.listenerCount(eventName) > 0) {
9811078
debug('SERVER have listener for %s', eventName);
982-
const bodyHead = d.slice(ret, d.length);
9831079

984-
socket.readableFlowing = null;
1080+
const bodyHead = d.slice(ret, d.length);
9851081

986-
server.emit(eventName, req, socket, bodyHead);
1082+
if (req.complete && socket[kUpgradeStream]) {
1083+
// Previously emitted, now completed - just activate the stream
1084+
socket[kUpgradeStream].requestBodyCompleted(bodyHead);
1085+
} else {
1086+
socket.readableFlowing = null;
1087+
server.emit(eventName, req, upgradeStream, bodyHead);
1088+
}
9871089
} else {
9881090
// Got upgrade or CONNECT method, but have no handler.
9891091
socket.destroy();
@@ -1089,8 +1191,9 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
10891191
if (req.upgrade) {
10901192
req.upgrade = req.method === 'CONNECT' ||
10911193
!!server.shouldUpgradeCallback(req);
1092-
if (req.upgrade)
1093-
return 2;
1194+
if (req.upgrade) {
1195+
return 0;
1196+
}
10941197
}
10951198

10961199
state.incoming.push(req);

0 commit comments

Comments
 (0)