diff --git a/packages/engine.io/lib/socket.ts b/packages/engine.io/lib/socket.ts index c2efe8ebb..d77c0a882 100644 --- a/packages/engine.io/lib/socket.ts +++ b/packages/engine.io/lib/socket.ts @@ -15,7 +15,10 @@ export interface SendOptions { type ReadyState = "opening" | "open" | "closing" | "closed"; -type SendCallback = (transport: Transport) => void; +type SendCallback = ( + transport: Transport, + packets: Packet[] | undefined, +) => void; export class Socket extends EventEmitter { /** @@ -272,13 +275,13 @@ export class Socket extends EventEmitter { * * @private */ - private onDrain() { + private onDrain(packets: Packet[] | undefined) { if (this.sentCallbackFn.length > 0) { debug("executing batch send callback"); const seqFn = this.sentCallbackFn.shift(); if (seqFn) { for (let i = 0; i < seqFn.length; i++) { - seqFn[i](this.transport); + seqFn[i](this.transport, packets); } } } @@ -526,8 +529,8 @@ export class Socket extends EventEmitter { } this.transport.send(wbuf); - this.emit("drain"); - this.server.emit("drain", this); + this.emit("drain", wbuf); + this.server.emit("drain", this, wbuf); } } diff --git a/packages/engine.io/lib/transport.ts b/packages/engine.io/lib/transport.ts index 6d3b9cbf6..7cd546472 100644 --- a/packages/engine.io/lib/transport.ts +++ b/packages/engine.io/lib/transport.ts @@ -4,6 +4,7 @@ import * as parser_v3 from "./parser-v3/index"; import debugModule from "debug"; import type { IncomingMessage, ServerResponse } from "http"; import { Packet, RawData } from "engine.io-parser"; +import type * as Parser from "engine.io-parser"; import type { WebSocket } from "ws"; const debug = debugModule("engine:transport"); diff --git a/packages/engine.io/lib/transports-uws/polling.ts b/packages/engine.io/lib/transports-uws/polling.ts index 7463b3b3d..6d9d7b9f7 100644 --- a/packages/engine.io/lib/transports-uws/polling.ts +++ b/packages/engine.io/lib/transports-uws/polling.ts @@ -3,6 +3,7 @@ import { createGzip, createDeflate } from "zlib"; import * as accepts from "accepts"; import debugModule from "debug"; import { HttpRequest, HttpResponse } from "uWebSockets.js"; +import { Packet } from "engine.io-parser"; import type * as parser_v4 from "engine.io-parser"; import type * as parser_v3 from "../parser-v3/index"; @@ -255,7 +256,7 @@ export class Polling extends Transport { * @param {Object} packet * @private */ - send(packets) { + send(packets: Packet[]) { this.writable = false; if (this.shouldClose) { @@ -269,7 +270,7 @@ export class Polling extends Transport { const compress = packets.some((packet) => { return packet.options && packet.options.compress; }); - this.write(data, { compress }); + this.write(data, { compress, source: packets }); }; if (this.protocol === 3) { @@ -294,7 +295,7 @@ export class Polling extends Transport { debug('writing "%s"', data); this.doWrite(data, options, () => { this.req.cleanup(); - this.emit("drain"); + this.emit("drain", options.source); }); } diff --git a/packages/engine.io/lib/transports-uws/websocket.ts b/packages/engine.io/lib/transports-uws/websocket.ts index aa5e1c939..6fa9d35fe 100644 --- a/packages/engine.io/lib/transports-uws/websocket.ts +++ b/packages/engine.io/lib/transports-uws/websocket.ts @@ -55,7 +55,7 @@ export class WebSocket extends Transport { this.socket.send(data, isBinary, compress); if (isLast) { - this.emit("drain"); + this.emit("drain", packets); this.writable = true; this.emit("ready"); } diff --git a/packages/engine.io/lib/transports/polling.ts b/packages/engine.io/lib/transports/polling.ts index 15e492a19..485776bb3 100644 --- a/packages/engine.io/lib/transports/polling.ts +++ b/packages/engine.io/lib/transports/polling.ts @@ -231,7 +231,7 @@ export class Polling extends Transport { const compress = packets.some((packet) => { return packet.options && packet.options.compress; }); - this.write(data, { compress }); + this.write(data, { compress, source: packets }); }; if (this.protocol === 3) { @@ -256,7 +256,7 @@ export class Polling extends Transport { debug('writing "%s"', data); this.doWrite(data, options, () => { this.req.cleanup(); - this.emit("drain"); + this.emit("drain", options.source); }); } @@ -280,8 +280,8 @@ export class Polling extends Transport { headers["Content-Length"] = "string" === typeof data ? Buffer.byteLength(data) : data.length; this.res.writeHead(200, this.headers(this.req, headers)); + this.res.once("finish", callback); this.res.end(data); - callback(); }; if (!this.httpCompression || !options.compress) { @@ -304,8 +304,8 @@ export class Polling extends Transport { this.compress(data, encoding, (err, data) => { if (err) { this.res.writeHead(500); + this.res.once("finish", callback); this.res.end(); - callback(err); return; } diff --git a/packages/engine.io/lib/transports/websocket.ts b/packages/engine.io/lib/transports/websocket.ts index ebc0553fb..41f1942fa 100644 --- a/packages/engine.io/lib/transports/websocket.ts +++ b/packages/engine.io/lib/transports/websocket.ts @@ -6,6 +6,7 @@ import type { PerMessageDeflateOptions, WebSocket as WsWebSocket } from "ws"; const debug = debugModule("engine:ws"); export class WebSocket extends Transport { + private currentPackets: Packet[] | undefined; perMessageDeflate?: boolean | PerMessageDeflateOptions; private socket: WsWebSocket; @@ -45,6 +46,8 @@ export class WebSocket extends Transport { send(packets: Packet[]) { this.writable = false; + this.currentPackets = packets; + for (let i = 0; i < packets.length; i++) { const packet = packets[i]; const isLast = i + 1 === packets.length; @@ -99,8 +102,9 @@ export class WebSocket extends Transport { if (err) { this.onError("write error", err.stack); } else { - this.emit("drain"); + this.emit("drain", this.currentPackets); this.writable = true; + this.currentPackets = undefined; this.emit("ready"); } }; diff --git a/packages/engine.io/lib/transports/webtransport.ts b/packages/engine.io/lib/transports/webtransport.ts index f675e7c37..2c79ac9e8 100644 --- a/packages/engine.io/lib/transports/webtransport.ts +++ b/packages/engine.io/lib/transports/webtransport.ts @@ -60,7 +60,7 @@ export class WebTransport extends Transport { debug("error while writing: %s", e.message); } - this.emit("drain"); + this.emit("drain", packets); this.writable = true; this.emit("ready"); }