-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttp.rs
96 lines (86 loc) · 2.84 KB
/
http.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::runtime::{self, reactor};
use mio::Interest;
use std::{
future::Future,
io::{ErrorKind, Read, Write},
pin::Pin,
task::{Context, Poll}
};
fn get_req(path: &str) -> String {
format!(
"GET {path} HTTP/1.1\r\n\
Host: localhost\r\n\
Connection: close\r\n\
\r\n"
)
}
pub struct Http;
impl Http {
pub fn get(path: &str) -> impl Future<Output = String> {
HttpGetFuture::new(path.to_string())
}
}
struct HttpGetFuture {
stream: Option<mio::net::TcpStream>,
buffer: Vec<u8>,
path: String,
id: usize,
}
impl HttpGetFuture {
fn new(path: String) -> Self {
let id = reactor().next_id();
Self {
stream: None,
buffer: vec![],
path: path.to_string(),
id,
}
}
fn write_request(&mut self) {
let stream = std::net::TcpStream::connect("127.0.0.1:8080").unwrap();
stream.set_nonblocking(true).unwrap();
let mut stream = mio::net::TcpStream::from_std(stream);
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
self.stream = Some(stream);
}
}
impl Future for HttpGetFuture {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let id = self.id;
if self.stream.is_none() {
println!("FIRST POLL - START OPERATION");
self.write_request();
let stream = (&mut self).stream.as_mut().unwrap();
runtime::reactor().register(stream, Interest::READABLE, id);
runtime::reactor().set_waker(cx, self.id);
}
let mut buff = vec![0u8; 4096];
loop {
match self.stream.as_mut().unwrap().read(&mut buff) {
Ok(0) => {
let s = String::from_utf8_lossy(&self.buffer).to_string();
runtime::reactor()
.deregister(self.stream.as_mut().unwrap(), id);
break Poll::Ready(s);
}
Ok(n) => {
self.buffer.extend(&buff[0..n]);
continue;
}
Err(e) if e.kind() == ErrorKind::WouldBlock => {
// https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll
// The `Waker` from the most recent call is expected to be scheduled to wake up.
// Meaning every time a `WouldBlock` error is received, the most recent `Waker`
// must be stored.
runtime::reactor().set_waker(cx, self.id);
break Poll::Pending;
}
Err(e) if e.kind() == ErrorKind::Interrupted => {
continue;
}
Err(e) => panic!("{e:?}"),
}
}
}
}