Skip to content

Commit a19efda

Browse files
committed
Merge pull-request #562
2 parents 5f93df7 + 5efd866 commit a19efda

File tree

1 file changed

+36
-29
lines changed

1 file changed

+36
-29
lines changed

src/qos_core/src/io/stream.rs

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ const MAX_RETRY: usize = 25;
2525
const BACKOFF_MILLISECONDS: u64 = 10;
2626
const BACKLOG: usize = 128;
2727

28-
const MEGABYTE: usize = 1024 * 1024;
28+
const MIB: usize = 1024 * 1024;
2929

30-
/// Maximum payload size for a single recv / send call. We're being generous with 128MB.
30+
/// Maximum payload size for a single recv / send call. We're being generous with 128MiB.
3131
/// The goal here is to avoid server crashes if the payload size exceeds the available system memory.
32-
pub const MAX_PAYLOAD_SIZE: usize = 128 * MEGABYTE;
32+
pub const MAX_PAYLOAD_SIZE: usize = 128 * MIB;
33+
34+
/// Even though we allow for big payloads we start by allocating a small buffer first. Then allocate more as needed.
35+
pub const INITIAL_RECV_BUF_SIZE: usize = 2 * MIB;
3336

3437
/// Socket address.
3538
#[derive(Clone, Debug, PartialEq, Eq)]
@@ -234,33 +237,37 @@ impl Stream {
234237
return Err(IOError::OversizedPayload(length));
235238
}
236239

237-
// Read the buffer
238-
let mut buf = vec![0; length];
239-
{
240-
let mut received_bytes = 0;
241-
while received_bytes < length {
242-
received_bytes += match recv(
243-
self.fd,
244-
&mut buf[received_bytes..length],
245-
MsgFlags::empty(),
246-
) {
247-
Ok(0) => {
248-
return Err(IOError::RecvConnectionClosed);
249-
}
250-
Ok(size) => size,
251-
Err(nix::Error::EINTR) => {
252-
return Err(IOError::RecvInterrupted);
253-
}
254-
Err(nix::Error::EAGAIN) => {
255-
return Err(IOError::RecvTimeout);
256-
}
257-
Err(err) => {
258-
return Err(IOError::NixError(err));
259-
}
260-
};
240+
// Allocate conservatively to avoid clients setting 128MB as their declared length and keeping the connection open.
241+
// We'd only need a few of these to run out of memory. This "as needed" allocation ensures clients have skin in the game.
242+
let initial_recv_buf_len =
243+
core::cmp::min(length, INITIAL_RECV_BUF_SIZE);
244+
let mut recv_buf = vec![0u8; initial_recv_buf_len];
245+
246+
let mut received_bytes = 0;
247+
while received_bytes < length {
248+
// If the receive buffer is full, double it.
249+
if received_bytes == recv_buf.len() {
250+
// Using `saturating_mul` here out of paranoia; it's cheap enough to saturate instead of overflow.
251+
// We either double the recv buffer capacity, or set it to `length` if doubling would exceed it.
252+
let new_len =
253+
core::cmp::min(recv_buf.len().saturating_mul(2), length);
254+
recv_buf.resize(new_len, 0);
261255
}
256+
257+
received_bytes += match recv(
258+
self.fd,
259+
&mut recv_buf[received_bytes..],
260+
MsgFlags::empty(),
261+
) {
262+
Ok(0) => return Err(IOError::RecvConnectionClosed),
263+
Ok(size) => size,
264+
Err(nix::Error::EINTR) => return Err(IOError::RecvInterrupted),
265+
Err(nix::Error::EAGAIN) => return Err(IOError::RecvTimeout),
266+
Err(err) => return Err(IOError::NixError(err)),
267+
};
262268
}
263-
Ok(buf)
269+
270+
Ok(recv_buf)
264271
}
265272
}
266273

@@ -533,7 +540,7 @@ mod test {
533540
}
534541
});
535542

536-
// Sending a request that is strictly less than the max size should work
543+
// Sending a request that is exactly the max size should work
537544
// (the response will be exactly max size)
538545
let client = Stream::connect(&addr, timeval()).unwrap();
539546
let req = vec![1u8; MAX_PAYLOAD_SIZE];

0 commit comments

Comments
 (0)