Skip to content

Commit 59daa00

Browse files
author
Devdutt Shenoi
committed
update to tokio 1.0
1 parent acdce39 commit 59daa00

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+854
-570
lines changed

Cargo.lock

+410-138
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+12-17
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,16 @@ members = [
88
]
99

1010
[workspace.dependencies]
11-
anyhow = "1.0.31"
12-
env_logger = "0.7.1"
13-
futures = "0.3.5"
14-
log = "0.4.8"
15-
rand = "0.7.3"
16-
serde = "1.0.114"
17-
serde_json = "1.0.55"
18-
tokio = { version = "0.2.21", features = [
19-
"blocking",
20-
"time",
21-
"io-util",
22-
"macros",
23-
] }
24-
tokio-rustls = { version = "0.14.0", features = ["dangerous_configuration"] }
25-
tokio-util = { version = "0.3.1", features = ["compat"] }
26-
webpki = "0.21.3"
27-
webpki-roots = "0.20.0"
11+
anyhow = "1"
12+
env_logger = "0.11.5"
13+
futures = "0.3.30"
14+
log = "0.4.22"
15+
rand = "0.8.5"
16+
serde = "1"
17+
serde_json = "1"
18+
tokio = { version = "1.40.0", features = ["time", "io-util", "macros"] }
19+
tokio-rustls = { version = "0.24.1" }
20+
tokio-util = { version = "0.7.12", features = ["compat"] }
21+
webpki = "0.22.4"
22+
webpki-roots = "0.26.6"
2823
lazy_static = "1.4.0"

tunshell-client/Cargo.toml

+3-5
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,14 @@ async-tungstenite = { version = "0.8.0", features = ["tokio-runtime"] } #no-wasm
4343
crossterm = { version = "0.23.2" }
4444
libc = "0.2.71"
4545
tokio = { workspace = true, features = [
46-
"rt-threaded",
47-
"dns",
46+
"rt-multi-thread",
4847
"io-std",
49-
"tcp",
50-
"udp",
48+
"io-util",
49+
"net",
5150
"sync",
5251
"process",
5352
"signal",
5453
"fs",
55-
"uds",
5654
] } #no-wasm
5755

5856
[target.'cfg(target_arch = "wasm32")'.dependencies]

tunshell-client/src/client.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ use anyhow::{Error, Result};
77
use futures::{future, stream::StreamExt, FutureExt};
88
use log::*;
99
use std::sync::{Arc, Mutex};
10-
use tokio_util::compat::*;
1110
use tunshell_shared::*;
1211

13-
pub type ClientMessageStream = MessageStream<ClientMessage, ServerMessage, Compat<ServerStream>>;
12+
pub type ClientMessageStream = MessageStream<ClientMessage, ServerMessage, ServerStream>;
1413

1514
pub struct Client {
1615
config: Config,
@@ -33,7 +32,7 @@ impl Client {
3332
self.println("Connecting to relay server...").await;
3433
let relay_socket = ServerStream::connect(&self.config).await?;
3534

36-
let mut message_stream = ClientMessageStream::new(relay_socket.compat());
35+
let mut message_stream = ClientMessageStream::new(relay_socket);
3736

3837
self.send_key(&mut message_stream).await?;
3938

@@ -164,7 +163,7 @@ impl Client {
164163
assert!(peer_info.session_nonce.len() > 10);
165164

166165
let stream = AesStream::new(
167-
stream.compat(),
166+
stream,
168167
peer_info.session_nonce.as_bytes(),
169168
self.config.encryption_key().as_bytes(),
170169
)

tunshell-client/src/p2p/tcp.rs

+20-15
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@ use anyhow::{Error, Result};
44
use async_trait::async_trait;
55
use futures::future::pending;
66
use futures::TryFutureExt;
7+
use futures::{AsyncRead, AsyncWrite};
78
use log::*;
89
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
910
use std::pin::Pin;
10-
use std::{time::Duration, task::{Context, Poll}};
11-
use tokio::io::{AsyncRead, AsyncWrite};
11+
use std::task::{Context, Poll};
1212
use tokio::net::{TcpListener, TcpStream};
13+
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
1314
use tunshell_shared::PeerJoinedPayload;
1415

1516
pub struct TcpConnection {
1617
peer_info: PeerJoinedPayload,
1718
listener: Option<TcpListener>,
18-
socket: Option<TcpStream>,
19+
socket: Option<Compat<TcpStream>>,
1920
}
2021

2122
impl AsyncRead for TcpConnection {
@@ -24,7 +25,9 @@ impl AsyncRead for TcpConnection {
2425
cx: &mut Context<'_>,
2526
buff: &mut [u8],
2627
) -> Poll<std::result::Result<usize, std::io::Error>> {
27-
Pin::new(&mut self.socket.as_mut().unwrap()).poll_read(cx, buff)
28+
Pin::new(&mut self.socket.as_mut().unwrap())
29+
.poll_read(cx, buff)
30+
.map(|c| c.map(|_| 0))
2831
}
2932
}
3033

@@ -44,11 +47,11 @@ impl AsyncWrite for TcpConnection {
4447
Pin::new(&mut self.socket.as_mut().unwrap()).poll_flush(cx)
4548
}
4649

47-
fn poll_shutdown(
50+
fn poll_close(
4851
mut self: Pin<&mut Self>,
4952
cx: &mut Context<'_>,
5053
) -> Poll<std::result::Result<(), std::io::Error>> {
51-
Pin::new(&mut self.socket.as_mut().unwrap()).poll_shutdown(cx)
54+
Pin::new(&mut self.socket.as_mut().unwrap()).poll_close(cx)
5255
}
5356
}
5457

@@ -106,8 +109,8 @@ impl P2PConnection for TcpConnection {
106109
let connected_ip = self.peer_info.peer_ip_address.parse::<IpAddr>().unwrap();
107110

108111
if peer_addr.ip() == connected_ip {
109-
socket.set_keepalive(Some(Duration::from_secs(30)))?;
110-
self.socket.replace(socket);
112+
// socket.set_keepalive(Some(Duration::from_secs(30)))?;
113+
self.socket.replace(socket.compat());
111114
return Ok(());
112115
} else {
113116
error!("received connection from unknown ip address: {}", peer_addr);
@@ -121,15 +124,15 @@ impl P2PConnection for TcpConnection {
121124
#[cfg(test)]
122125
mod tests {
123126
use super::*;
127+
use futures::{AsyncReadExt, AsyncWriteExt, FutureExt};
124128
use std::time::Duration;
125-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
126-
use tokio::{runtime::Runtime, time::delay_for};
127-
use futures::FutureExt;
129+
use tokio::{runtime::Runtime, time::sleep};
130+
use tokio_util::compat::TokioAsyncReadCompatExt;
128131

129132
#[test]
130133
fn test_connect_via_connect() {
131134
Runtime::new().unwrap().block_on(async {
132-
let mut listener = TcpListener::bind("0.0.0.0:22335".to_owned())
135+
let listener = TcpListener::bind("0.0.0.0:22335".to_owned())
133136
.await
134137
.expect("failed listen for connection");
135138

@@ -145,10 +148,11 @@ mod tests {
145148
.await
146149
.expect("failed to connect");
147150

148-
let (mut socket, _) = listener
151+
let (socket, _) = listener
149152
.accept()
150153
.await
151154
.expect("failed to accept connection");
155+
let mut socket = socket.compat();
152156

153157
connection1
154158
.write("hello".as_bytes())
@@ -189,12 +193,13 @@ mod tests {
189193

190194
let port = connection1.bind().await.expect("failed to bind");
191195

192-
let socket = delay_for(Duration::from_millis(100))
196+
let socket = sleep(Duration::from_millis(100))
193197
.then(|_| TcpStream::connect(format!("127.0.0.1:{}", port)))
194198
.or_else(|err| futures::future::err(Error::new(err)));
195199

196-
let (_, mut socket) = futures::try_join!(connection1.connect(22444, false), socket)
200+
let (_, socket) = futures::try_join!(connection1.connect(22444, false), socket)
197201
.expect("failed to connect");
202+
let mut socket = socket.compat();
198203

199204
socket.write("hello".as_bytes()).await.unwrap();
200205

tunshell-client/src/p2p/udp/congestion.rs

+17-15
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::{UdpConnectionVars, UdpPacket, MAX_PACKET_SIZE, SequenceNumber, UdpPacketType};
1+
use super::{SequenceNumber, UdpConnectionVars, UdpPacket, UdpPacketType, MAX_PACKET_SIZE};
22
use log::*;
33
use std::cmp;
44
use std::sync::{Arc, Mutex};
@@ -18,7 +18,7 @@ impl UdpConnectionVars {
1818
self.peer_ack_number,
1919
self.peer_window
2020
);
21-
21+
2222
self.window_wakers
2323
.push((cx.waker().clone(), packet.end_sequence_number()));
2424
return Poll::Pending;
@@ -31,7 +31,7 @@ impl UdpConnectionVars {
3131
sequence_number < self.max_send_sequence_number()
3232
}
3333

34-
pub(super) fn max_send_sequence_number(&self ) -> SequenceNumber {
34+
pub(super) fn max_send_sequence_number(&self) -> SequenceNumber {
3535
self.peer_ack_number + SequenceNumber(cmp::min(self.peer_window, self.transit_window))
3636
}
3737

@@ -40,7 +40,8 @@ impl UdpConnectionVars {
4040
}
4141

4242
pub(super) fn wait_until_decongested(&mut self, waker: Waker) {
43-
self.window_wakers.push((waker, self.sequence_number + SequenceNumber(1)));
43+
self.window_wakers
44+
.push((waker, self.sequence_number + SequenceNumber(1)));
4445
}
4546

4647
pub(super) fn increase_transit_window_after_send(&mut self) {
@@ -71,7 +72,8 @@ impl UdpConnectionVars {
7172
let max_sequence_number = self.max_send_sequence_number();
7273

7374
// Determine the number of packets we can now send form those are waiting.
74-
let (ready_wakers, pending_wakers) = self.window_wakers
75+
let (ready_wakers, pending_wakers) = self
76+
.window_wakers
7577
.drain(..)
7678
.partition(|(_, end_sequence_number)| end_sequence_number < &max_sequence_number);
7779

@@ -218,10 +220,10 @@ mod tests {
218220
let con = Arc::from(Mutex::from(con));
219221

220222
let packet = UdpPacket::data(SequenceNumber(0), SequenceNumber(0), 0, &[]);
221-
223+
222224
let packet = tokio::select! {
223225
packet = wait_until_can_send(Arc::clone(&con), packet) => packet,
224-
_ = tokio::time::delay_for(Duration::from_millis(1)) => panic!("should return from future immediately if there is sufficient window")
226+
_ = tokio::time::sleep(Duration::from_millis(1)) => panic!("should return from future immediately if there is sufficient window")
225227
};
226228

227229
let con = con.lock().unwrap();
@@ -237,7 +239,7 @@ mod tests {
237239
if std::env::var("CI").is_ok() {
238240
return;
239241
}
240-
242+
241243
Runtime::new().unwrap().block_on(async {
242244
let mut con = UdpConnectionVars::new(UdpConnectionConfig::default());
243245

@@ -247,14 +249,14 @@ mod tests {
247249
let con = Arc::from(Mutex::from(con));
248250

249251
let packet = UdpPacket::data(SequenceNumber(0), SequenceNumber(0), 0, &[]);
250-
252+
251253
let wait_for_send = tokio::spawn(wait_until_can_send(Arc::clone(&con), packet.clone()));
252254

253-
tokio::time::delay_for(Duration::from_millis(100)).await;
255+
tokio::time::sleep(Duration::from_millis(100)).await;
254256

255257
{
256258
let con = con.lock().unwrap();
257-
259+
258260
assert_eq!(con.window_wakers.len(), 1);
259261
assert_eq!(con.window_wakers[0].1, packet.end_sequence_number());
260262
}
@@ -263,18 +265,18 @@ mod tests {
263265
{
264266
let mut con = con.lock().unwrap();
265267
con.update_peer_window(MAX_PACKET_SIZE as u32);
266-
268+
267269
assert_eq!(con.window_wakers.len(), 0);
268270
}
269271

270-
// Task should now complete
272+
// Task should now complete
271273
let packet = tokio::select! {
272274
packet = wait_for_send => packet.unwrap(),
273-
_ = tokio::time::delay_for(Duration::from_millis(1)) => panic!("should return from future immediately if there is sufficient window")
275+
_ = tokio::time::sleep(Duration::from_millis(1)) => panic!("should return from future immediately if there is sufficient window")
274276
};
275277

276278
let con = con.lock().unwrap();
277-
279+
278280
assert_eq!(packet.sequence_number, SequenceNumber(0));
279281
assert_eq!(con.window_wakers.len(), 0);
280282
});

tunshell-client/src/p2p/udp/connection.rs

+13-13
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ use super::{
33
UdpConnectionState, UdpConnectionVars,
44
};
55
use anyhow::{Error, Result};
6+
use futures::{AsyncRead, AsyncWrite};
67
use log::*;
78
use std::io;
89
use std::mem;
910
use std::net::IpAddr;
1011
use std::pin::Pin;
1112
use std::sync::{Arc, Mutex};
1213
use std::task::{Context, Poll};
13-
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
1414
use tokio::net::UdpSocket;
1515
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
1616

@@ -189,15 +189,15 @@ impl UdpConnection {
189189
Ok(())
190190
}
191191

192-
/// Closes the connection
193-
#[allow(dead_code)]
194-
pub async fn close(&mut self) -> Result<()> {
195-
if !self.is_connected() {
196-
return Err(Error::msg("Connection must be in CONNECTED state"));
197-
}
192+
// /// Closes the connection
193+
// #[allow(dead_code)]
194+
// pub async fn close(&mut self) -> Result<()> {
195+
// if !self.is_connected() {
196+
// return Err(Error::msg("Connection must be in CONNECTED state"));
197+
// }
198198

199-
self.shutdown().await.map_err(Error::from)
200-
}
199+
// self.close().await.map_err(Error::from)
200+
// }
201201
}
202202

203203
impl AsyncRead for UdpConnection {
@@ -281,7 +281,7 @@ impl AsyncWrite for UdpConnection {
281281
Poll::Ready(Ok(()))
282282
}
283283

284-
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
284+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
285285
if let State::Disconnecting(running) = &self.state {
286286
if !self.is_connected() {
287287
self.state = State::Disconnected;
@@ -321,12 +321,12 @@ impl AsyncWrite for UdpConnection {
321321
#[cfg(test)]
322322
mod tests {
323323
use super::*;
324+
use futures::{AsyncReadExt, AsyncWriteExt};
324325
use lazy_static::lazy_static;
325326
use std::net::SocketAddr;
326327
use std::time::Duration;
327-
use tokio::io::{AsyncReadExt, AsyncWriteExt};
328328
use tokio::runtime::Runtime;
329-
use tokio::time::delay_for;
329+
use tokio::time::sleep;
330330

331331
lazy_static! {
332332
static ref UDP_PORT_NUMBER: Mutex<u16> = Mutex::from(27660);
@@ -467,7 +467,7 @@ mod tests {
467467
assert_eq!(con2.is_disconnected(), true);
468468

469469
// Wait for close packet to be sent and process
470-
delay_for(Duration::from_millis(500)).await;
470+
sleep(Duration::from_millis(500)).await;
471471

472472
assert_eq!(con1.is_new(), false);
473473
assert_eq!(con1.is_connected(), false);

0 commit comments

Comments
 (0)