Skip to content

Commit

Permalink
WIP raft integration
Browse files Browse the repository at this point in the history
  • Loading branch information
eaneto committed Apr 25, 2024
1 parent acba534 commit e6ee61c
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ bytes = { version = "1", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3.3"
rand = "0.8.5"
clokwerk = "0.4.0"
43 changes: 41 additions & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration};

use bytes::{BufMut, BytesMut};
use clap::Parser;
use clokwerk::{AsyncScheduler, TimeUnits};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
Expand All @@ -26,6 +27,12 @@ struct Args {
/// Server's unique id
#[arg(short, long, default_value_t = 1)]
id: u64,
/// List of node ids
#[arg(long)]
node_ids: Vec<u64>,
/// List of node addresses
#[arg(long)]
node_addresses: Vec<String>,
}

#[tokio::main]
Expand All @@ -44,12 +51,44 @@ async fn main() {
let log_segments = Arc::new(RwLock::new(HashMap::new()));
load_logs(logs.clone(), log_segments.clone()).await;

let server = Arc::new(Mutex::new(raft::Server::new(args.id)));
let mut nodes = HashMap::new();
for i in 0..(args.node_ids.len()) {
let node_id = args.node_ids.get(i).unwrap();
let node_address = args.node_addresses.get(i).unwrap();
let node = raft::Node {
id: *node_id,
address: node_address.clone(),
};
nodes.insert(*node_id, node);
}

println!("{:?}", nodes);
let server = Arc::new(Mutex::new(raft::Server::new(args.id, nodes)));
// If a new node is up it immediately tries to become the leader,
// if there's already a leader in the cluster it will receive
// other node's response and become a follower.
server.lock().await.start_election().await;

// TODO: Health job
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
//server.lock().await.
}
});

// TODO: Election timeout
// TODO: Election job
let election_timeout = server.lock().await.election_timeout();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(election_timeout as u64)).await;
// TODO Check if election is needed.
// If heartbeats not received, then:
trace!("Start new election");
}
});

handle_connections(server, listener, logs, log_segments).await;
}

Expand Down
5 changes: 2 additions & 3 deletions src/log.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::HashMap, env, ffi::OsStr, os::unix::fs::OpenOptionsExt,
os::unix::prelude::OsStrExt, path::Path, sync::Arc, time::SystemTime,
collections::HashMap, env, ffi::OsStr, os::unix::prelude::OsStrExt, path::Path, sync::Arc,
time::SystemTime,
};

use bytes::BytesMut;
Expand Down Expand Up @@ -392,7 +392,6 @@ impl CommitLogReceiver {
let entry_in_storage_format = self.build_entry_in_storage_format(&entry, id);
let result = OpenOptions::new()
.append(true)
.custom_flags(libc::O_DIRECT)
.open(&segment_filename)
.await;
let mut log_file = match result {
Expand Down
69 changes: 53 additions & 16 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::{
collections::{HashMap, HashSet},
};

use tracing::{error, trace};

use bytes::BytesMut;
use rand::Rng;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -57,14 +59,14 @@ pub struct Server {
nodes: HashMap<u64, Node>,
}

#[derive(Clone)]
struct Node {
id: u64,
address: String,
#[derive(Clone, Debug)]
pub struct Node {
pub id: u64,
pub address: String,
}

impl Server {
pub fn new(id: u64) -> Server {
pub fn new(id: u64, nodes: HashMap<u64, Node>) -> Server {
Server {
id,
current_term: 0,
Expand All @@ -77,11 +79,13 @@ impl Server {
votes_received: HashSet::new(),
sent_length: HashMap::new(),
acked_length: HashMap::new(),
nodes: HashMap::new(),
nodes,
}
}

// Leader election
pub fn election_timeout(&self) -> u16 {
self.election_timeout
}

pub async fn start_election(&mut self) {
self.current_term += 1;
Expand All @@ -97,32 +101,65 @@ impl Server {
last_term,
};

// TODO: Send VoteRequest to other nodes
let nodes = self.nodes.clone();
for (_, node) in nodes.into_iter() {
let response = self.send_vote_request(&vote_request, &node).await;
self.process_vote_response(response).await;
// TODO Send requests in parallel.
// TODO Treat error
if let Ok(response) = self.send_vote_request(&vote_request, &node).await {
self.process_vote_response(response).await;
}
}
}

async fn send_vote_request(&self, vote_request: &VoteRequest, node: &Node) -> VoteResponse {
async fn send_vote_request(
&self,
vote_request: &VoteRequest,
node: &Node,
) -> Result<VoteResponse, &str> {
let command_byte = 4_u8.to_be_bytes();
let encoded_request = bincode::serialize(vote_request).unwrap();
let mut buf = Vec::new();
buf.extend(command_byte);
buf.extend(encoded_request.len().to_be_bytes());
buf.extend(encoded_request);

let mut stream = TcpStream::connect(&node.address).await.unwrap();
stream.write_all(&buf).await.unwrap();
// TODO: Retry
let mut stream = match TcpStream::connect(&node.address).await {
Ok(stream) => stream,
Err(_) => {
error!("Can't connect to node at {}", &node.address);
return Err("Can't connect to node");
}
};

// What should be done in case of failure?
match stream.write_all(&buf).await {
Ok(()) => {
trace!("Successfully sent request to node {}", &node.id)
}
Err(_) => {
error!("Unable to send request to node {}", &node.id);
return Err("Unable to send request to node");
}
};
let mut buf = [0; 1024];
stream.read(&mut buf).await.unwrap();
match stream.read(&mut buf).await {
Ok(_) => (),
Err(_) => {
error!("Can't read response from client {}", &node.id);
return Err("Can't read response from client");
}
};
if buf[0] == 0 {
let length = buf.get(1..9).unwrap();
let length = usize::from_be_bytes(length.try_into().unwrap());
return bincode::deserialize(buf.get(9..(9 + length)).unwrap()).unwrap();
let encoded_response = match buf.get(9..(9 + length)) {
Some(response) => response,
None => return Err("Incomplete response, unable to parse vote response"),
};
Ok(bincode::deserialize(encoded_response).unwrap())
} else {
panic!("Error");
Err("Response is not successful")
}
}

Expand Down

0 comments on commit e6ee61c

Please sign in to comment.