Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TCP feature #33

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 64 additions & 29 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::sync::{mpsc, Arc, atomic::{AtomicI32, self}};
use std::{fmt::Debug, net::{TcpListener, TcpStream, ToSocketAddrs}, sync::{atomic::{self, AtomicI32}, mpsc, Arc}};

use log::{warn, info, debug};
use anyhow::{Result, Context};
use serde_json as json;

use crate::{rpcio, bytecode::{self, BytecodeOptions}};
use crate::{bytecode::{self, BytecodeOptions}, rpcio};
use crate::lsp_message::{LspRequest, LspResponse, LspResponseError};

fn process_channel_to_writer(channel_sub: mpsc::Receiver<String>,
Expand Down Expand Up @@ -96,60 +96,57 @@ pub struct AppOptions {
pub bytecode_options: Option<bytecode::BytecodeOptions>,
}

pub fn run_app_forever(client_reader: impl std::io::Read + Send + 'static,
client_writer: impl std::io::Write + Send + 'static,
mut server_cmd: std::process::Command,
options: AppOptions) -> Result<std::process::ExitStatus> {
info!("About to run the lsp server with command {:?}", server_cmd);
// Return the receiver which can be used get notifications about thread termination
fn run_app(client_reader: impl std::io::Read + Send + 'static,
client_writer: impl std::io::Write + Send + 'static,
server_reader: impl std::io::Read + Send + 'static,
server_writer: impl std::io::Write + Send + 'static,
options: AppOptions) -> Result<mpsc::Receiver<()>> {
if let Some(ref bytecode_options) = options.bytecode_options {
info!("Will convert server json to bytecode! bytecode options: {:?}", bytecode_options);
} else {
info!("Bytecode disabled! Will forward server json as-is.")
}

let mut proc = server_cmd
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.spawn()
.with_context(|| {
format!(
"Failed to run the lsp server with command: {:?}",
server_cmd
)
})?;
let (finish_sender, finish_receiver) = mpsc::channel::<()>();

let (c2s_channel_pub, c2s_channel_sub) = mpsc::channel::<String>();
let c2s_channel_counter = Arc::new(AtomicI32::new(0));
let (s2c_channel_pub, s2c_channel_sub) = mpsc::channel::<String>();

{
let finish_sender = finish_sender.clone();
let c2s_channel_counter = c2s_channel_counter.clone();
let proc_stdin = proc.stdin.take().unwrap();
std::thread::spawn(move || {
debug!("Started client->server write thread");
process_channel_to_writer(c2s_channel_sub, Some(c2s_channel_counter), proc_stdin)
process_channel_to_writer(c2s_channel_sub, Some(c2s_channel_counter), server_writer)
.with_context(|| "Client->server write thread failed")
.unwrap();
debug!("Finished client->server write thread");
let _ = finish_sender.send(()); // ignore error
});
}
{
let finish_sender = finish_sender.clone();
std::thread::spawn(move || {
debug!("Started server->client write thread");
process_channel_to_writer(s2c_channel_sub, None, client_writer)
.with_context(|| "Server->client write thread failed")
.unwrap();
debug!("Finished server->client write thread");
let _ = finish_sender.send(()); // ignore error
});
}
std::thread::spawn(move || {
debug!("Started server->client write thread");
process_channel_to_writer(s2c_channel_sub, None, client_writer)
.with_context(|| "Server->client write thread failed")
.unwrap();
debug!("Finished server->client write thread");
});
{
let finish_sender = finish_sender.clone();
let s2c_channel_pub = s2c_channel_pub.clone();
let proc_stdout = proc.stdout.take().unwrap();
std::thread::spawn(move || {
debug!("Started server->client read thread");
process_server_reader(proc_stdout, s2c_channel_pub, options.bytecode_options)
process_server_reader(server_reader, s2c_channel_pub, options.bytecode_options)
.with_context(|| "Server->client read thread failed")
.unwrap();
debug!("Finished server->client read thread");
let _ = finish_sender.send(()); // ignore error
});
}
std::thread::spawn(move || {
Expand All @@ -159,7 +156,45 @@ pub fn run_app_forever(client_reader: impl std::io::Read + Send + 'static,
.with_context(|| "Client->server read thread failed")
.unwrap();
debug!("Finished client->server read thread");
let _ = finish_sender.send(()); // ignore error
});

Ok(finish_receiver)
}

pub fn run_app_stdio(mut server_cmd: std::process::Command,
options: AppOptions) -> Result<std::process::ExitStatus> {
info!("About to run the lsp server with command {:?}", server_cmd);
let mut proc = server_cmd
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.spawn()
.context(format!("Failed to run the lsp server with command: {:?}", server_cmd))?;

run_app(std::io::stdin(), std::io::stdout(),
proc.stdout.take().unwrap(), proc.stdin.take().unwrap(),
options)?;
Ok(proc.wait()?)
}

pub fn run_app_tcp(server_addr: impl ToSocketAddrs + Debug,
listen_addr: impl ToSocketAddrs + Debug,
options: AppOptions) -> Result<()> {
info!("Connecting to server at {:?}", server_addr);
let server_conn = TcpStream::connect(server_addr)?;

info!("Listenting at {:?}", listen_addr);
let client_listener = TcpListener::bind(listen_addr)?;
// NOTE: only accept single client for now. Is it enough?
let (client_conn, _) = client_listener.accept()?;
info!("Client connected, start running");

let finish_receiver = run_app(client_conn.try_clone()?,
client_conn.try_clone()?,
server_conn.try_clone()?,
server_conn.try_clone()?,
options)?;
let _ = finish_receiver.recv(); // wait for finish, ignore error
Ok(())
}
43 changes: 29 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ struct Cli {
#[arg(last = true)]
server_cmd: Vec<String>,

#[arg(long,
help = "[experimental] Use tcp mode instead of stdio mode. \
In this case, the server_cmd should contain exactly two arguments that are interpreted as server address and listening address respestively. \
E.g. `emacs_lsp_booster --tcp 127.0.0.1:1234 127.0.0.1:2345` would connect to server at port 1234 and listen at port 2345.")]
tcp: bool,

#[arg(short = 'n', long,
help = "Disable bytecode generation. Simply forward server json as-is. Useful for debugging or benchmarking.")]
disable_bytecode: bool,
Expand Down Expand Up @@ -62,26 +68,35 @@ fn main() -> Result<()> {
std::process::exit(1);
}));

// In windows, Command::new cannot find .cmd files, so use `which` to do that
// https://github.com/rust-lang/rust/issues/37519
let server_cmd_prog = if cfg!(windows) {
which::which(&cli.server_cmd[0])?
} else {
std::path::PathBuf::from(&cli.server_cmd[0])
};
trace!("Using server prog: {:?}", server_cmd_prog);
let mut cmd = std::process::Command::new(&server_cmd_prog);
cmd.args(&cli.server_cmd[1..]);

let exit_status = app::run_app_forever(std::io::stdin(), std::io::stdout(), cmd, app::AppOptions {
let app_options = app::AppOptions {
bytecode_options: if !cli.disable_bytecode {
Some(bytecode::BytecodeOptions {
object_type: cli.json_object_type,
null_value: cli.json_null_value,
false_value: cli.json_false_value,
}) } else { None },
})?;
std::process::exit(exit_status.code().unwrap_or(1))
};

if cli.tcp {
if cli.server_cmd.len() != 2 {
bail!("Need exactly two arguments as address for tcp mode");
}
app::run_app_tcp(&cli.server_cmd[0], &cli.server_cmd[1], app_options)
} else {
// In windows, Command::new cannot find .cmd files, so use `which` to do that
// https://github.com/rust-lang/rust/issues/37519
let server_cmd_prog = if cfg!(windows) {
which::which(&cli.server_cmd[0])?
} else {
std::path::PathBuf::from(&cli.server_cmd[0])
};
trace!("Using server prog: {:?}", server_cmd_prog);
let mut cmd = std::process::Command::new(&server_cmd_prog);
cmd.args(&cli.server_cmd[1..]);

let exit_status = app::run_app_stdio(cmd, app_options)?;
std::process::exit(exit_status.code().unwrap_or(1))
}
}

#[test]
Expand Down
Loading