From 077401c591e542f9df0deb5a7bb1eac8d5ba0ca5 Mon Sep 17 00:00:00 2001 From: Larry Fantasy Date: Thu, 22 Apr 2021 22:35:25 -0700 Subject: [PATCH] hls --- Cargo.lock | 60 ++++++--- Makefile.toml | 4 +- consts/src/lib.rs | 3 +- infra/config.cfg | 2 +- msg/src/lib.rs | 22 +++- server/Cargo.toml | 2 +- server/src/agents/chunk.rs | 3 - server/src/agents/envelope.rs | 43 ++++++- server/src/agents/mod.rs | 1 - server/src/agents/package.rs | 26 ---- server/src/agents/registry.rs | 220 ++++++++++++++++++--------------- server/src/agents/search.rs | 116 +++++++++-------- server/src/agents/websocket.rs | 51 ++++---- wasm/Cargo.toml | 1 + wasm/src/init.rs | 4 +- wasm/src/media.rs | 8 +- wasm/src/message.rs | 5 +- wasm/src/model.rs | 3 +- wasm/src/update.rs | 31 ++--- wasm/src/view.rs | 21 +++- 20 files changed, 362 insertions(+), 264 deletions(-) delete mode 100644 server/src/agents/package.rs diff --git a/Cargo.lock b/Cargo.lock index 4c6c0264..01d2b132 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,7 +19,7 @@ dependencies = [ "parking_lot", "pin-project 0.4.27", "smallvec", - "tokio 0.2.23", + "tokio", "tokio-util", "trust-dns-proto", "trust-dns-resolver", @@ -37,7 +37,7 @@ dependencies = [ "futures-sink", "log", "pin-project 0.4.27", - "tokio 0.2.23", + "tokio", "tokio-util", ] @@ -162,7 +162,7 @@ dependencies = [ "futures-channel", "futures-util", "smallvec", - "tokio 0.2.23", + "tokio", ] [[package]] @@ -1344,7 +1344,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio 0.2.23", + "tokio", "tokio-util", "tracing", "tracing-futures", @@ -1493,7 +1493,7 @@ dependencies = [ "itoa", "pin-project 1.0.1", "socket2", - "tokio 0.2.23", + "tokio", "tower-service", "tracing", "want", @@ -1508,7 +1508,7 @@ dependencies = [ "bytes", "hyper", "native-tls", - "tokio 0.2.23", + "tokio", "tokio-tls", ] @@ -1781,12 +1781,24 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow", + "miow 0.2.1", "net2", "slab", "winapi 0.2.8", ] +[[package]] +name = "mio-named-pipes" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" +dependencies = [ + "log", + "mio", + "miow 0.3.7", + "winapi 0.3.9", +] + [[package]] name = "mio-uds" version = "0.6.8" @@ -1810,6 +1822,15 @@ dependencies = [ "ws2_32-sys", ] +[[package]] +name = "miow" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9f1c5b025cda876f66ef43a113f91ebc9f4ccef34843000e0adf6ebbab84e21" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "native-tls" version = "0.2.6" @@ -2371,7 +2392,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded 0.6.1", - "tokio 0.2.23", + "tokio", "tokio-tls", "url", "wasm-bindgen", @@ -2539,7 +2560,7 @@ dependencies = [ "serde", "serde_json", "sonic-channel", - "tokio 1.5.0", + "tokio", "uaparser", "uuid", ] @@ -3104,22 +3125,25 @@ dependencies = [ "libc", "memchr", "mio", + "mio-named-pipes", "mio-uds", "num_cpus", "pin-project-lite 0.1.11", "signal-hook-registry", "slab", + "tokio-macros", "winapi 0.3.9", ] [[package]] -name = "tokio" -version = "1.5.0" +name = "tokio-macros" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83f0c8e7c0addab50b663055baf787d0af7f413a46e6e7fb9559a4e4db7137a5" +checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" dependencies = [ - "autocfg", - "pin-project-lite 0.2.4", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -3129,7 +3153,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" dependencies = [ "native-tls", - "tokio 0.2.23", + "tokio", ] [[package]] @@ -3144,7 +3168,7 @@ dependencies = [ "futures-sink", "log", "pin-project-lite 0.1.11", - "tokio 0.2.23", + "tokio", ] [[package]] @@ -3209,7 +3233,7 @@ dependencies = [ "rand 0.7.3", "smallvec", "thiserror", - "tokio 0.2.23", + "tokio", "url", ] @@ -3229,7 +3253,7 @@ dependencies = [ "resolv-conf", "smallvec", "thiserror", - "tokio 0.2.23", + "tokio", "trust-dns-proto", ] diff --git a/Makefile.toml b/Makefile.toml index 84c1e9f5..3e13f1a5 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -31,10 +31,10 @@ script = "tar -czvf static.tar.gz static" [tasks.watch-wasm] run_task = { name = ["wasm-pack"] } -watch = { postpone = true, watch = ["./wasm/", "./msg/"] } +watch = { postpone = true, watch = ["./wasm/", "./msg/", "./consts"] } [tasks.watch-server] -script = "cargo watch -w server -w msg -x 'run --bin rustgym-server'" +script = "cargo watch -w server -w msg -w consts -x 'run --bin rustgym-server'" dependencies = ["wasm-pack"] [tasks.sonic] diff --git a/consts/src/lib.rs b/consts/src/lib.rs index 8e9189f0..1a1b07fa 100644 --- a/consts/src/lib.rs +++ b/consts/src/lib.rs @@ -24,4 +24,5 @@ pub const SONIC_BUCKET: &str = "bucket"; pub const SEARCH_PLACEHOLDER: &str = "Search Rust Solutions"; pub const TIME_SLICE: i32 = 100; -pub const MIME_TYPE: &str = "video/webm;codecs=vp8,opus"; +pub const MIME_TYPE: &str = "video/x-matroska;codecs=avc1,opus"; +pub const STREAM_DIR: &str = "/Users/larry/Documents/rustgym/stream"; diff --git a/infra/config.cfg b/infra/config.cfg index 22e4c606..515e855b 100644 --- a/infra/config.cfg +++ b/infra/config.cfg @@ -6,7 +6,7 @@ [server] -log_level = "debug" +log_level = "error" [channel] diff --git a/msg/src/lib.rs b/msg/src/lib.rs index 69e83962..e5f1e8b2 100644 --- a/msg/src/lib.rs +++ b/msg/src/lib.rs @@ -7,16 +7,29 @@ use uuid::Uuid; #[derive(Serialize, Deserialize, Debug, Clone)] pub enum Msg { + In(MsgIn), + Out(MsgOut), +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum MsgIn { + Ping, + Pong, + SearchText(String), + QueryText(String), + StreamStart(String), +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum MsgOut { Ping, Pong, RegistorClient(ClientInfo), UnRegistorClient(ClientInfo), - SessionClients(HashSet), - SearchText(String), + SessionClients(Vec), + AllClients(Vec), SearchSuggestions(Vec), - QueryText(String), QueryResults(Vec), - StreamStart(String), } #[derive(Serialize, Deserialize, Debug, Clone, Hash, Eq, PartialEq, new)] @@ -25,6 +38,7 @@ pub struct ClientInfo { pub client_uuid: Uuid, pub name: String, pub chrome: bool, + pub streaming: bool, } #[derive(Serialize, Deserialize, Debug, Clone, new)] diff --git a/server/Cargo.toml b/server/Cargo.toml index e1ac76bd..76e172f7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -38,5 +38,5 @@ serde = "1.0.118" serde_json = "1.0.61" uuid = { version = "0.8", features = ["serde", "v4"] } sonic-channel = "0.4.0" -tokio = { version = "1.5.0", features = ["sync"] } +tokio = { version = "0.2", features = ["full"] } uaparser = "0.4.0" diff --git a/server/src/agents/chunk.rs b/server/src/agents/chunk.rs index 5d4672a7..343e4d1a 100644 --- a/server/src/agents/chunk.rs +++ b/server/src/agents/chunk.rs @@ -1,13 +1,10 @@ -use crate::agents::websocket::SocketClient; use actix::prelude::*; use actix_web::web::Bytes; use rustgym_msg::ClientInfo; -use rustgym_msg::Msg; #[derive(Message, Clone, new)] #[rtype(result = "()")] pub struct Chunk { - pub client_addr: Addr, pub client_info: ClientInfo, pub bytes: Bytes, } diff --git a/server/src/agents/envelope.rs b/server/src/agents/envelope.rs index 58dffb39..a1cc5093 100644 --- a/server/src/agents/envelope.rs +++ b/server/src/agents/envelope.rs @@ -1,9 +1,46 @@ +use crate::agents::websocket::SocketClient; +use actix::prelude::*; use rustgym_msg::ClientInfo; -use rustgym_msg::Msg; -use serde::{Deserialize, Serialize}; +use rustgym_msg::*; +use std::fmt; -#[derive(Debug, Deserialize, Serialize, Clone, new)] +#[derive(Message, Debug, Clone, new)] +#[rtype(result = "()")] pub struct Envelope { + pub client_addr: Addr, pub client_info: ClientInfo, pub msg: Msg, } + +impl Envelope { + pub fn from_msg_in( + client_addr: Addr, + client_info: ClientInfo, + msg_in: MsgIn, + ) -> Self { + let msg = Msg::In(msg_in); + Envelope { + client_addr, + client_info, + msg, + } + } + pub fn from_msg_out( + client_addr: Addr, + client_info: ClientInfo, + msg_out: MsgOut, + ) -> Self { + let msg = Msg::Out(msg_out); + Envelope { + client_addr, + client_info, + msg, + } + } +} + +impl fmt::Debug for SocketClient { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "") + } +} diff --git a/server/src/agents/mod.rs b/server/src/agents/mod.rs index 7e1d829b..e80f3e40 100644 --- a/server/src/agents/mod.rs +++ b/server/src/agents/mod.rs @@ -1,6 +1,5 @@ pub mod chunk; pub mod envelope; -pub mod package; pub mod registry; pub mod search; pub mod uap; diff --git a/server/src/agents/package.rs b/server/src/agents/package.rs deleted file mode 100644 index b28e9d2b..00000000 --- a/server/src/agents/package.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::agents::envelope::Envelope; -use crate::agents::websocket::SocketClient; -use actix::prelude::*; -use rustgym_msg::ClientInfo; -use rustgym_msg::Msg; - -#[derive(Message, Clone, new)] -#[rtype(result = "()")] -pub struct Package { - pub client_addr: Addr, - pub envelope: Envelope, -} - -impl Package { - pub fn from_message( - client_addr: Addr, - client_info: ClientInfo, - msg: Msg, - ) -> Self { - let envelope = Envelope::new(client_info, msg); - Package { - client_addr, - envelope, - } - } -} diff --git a/server/src/agents/registry.rs b/server/src/agents/registry.rs index e80a7fd0..d8da97fc 100644 --- a/server/src/agents/registry.rs +++ b/server/src/agents/registry.rs @@ -1,62 +1,82 @@ use crate::agents::chunk::Chunk; use crate::agents::envelope::Envelope; -use crate::agents::package::Package; use crate::agents::search::SearchAgent; use crate::agents::websocket::SocketClient; use actix::prelude::*; use log::{error, info}; use rustgym_consts::*; use rustgym_msg::ClientInfo; -use rustgym_msg::Msg; +use rustgym_msg::*; use std::cell::RefCell; use std::collections::HashMap; -use std::collections::HashSet; -use std::fs::File; -use std::fs::OpenOptions; -use std::io::prelude::*; -use std::process::Command; +use std::io::Write; +use std::process::{Child, Command, Stdio}; use std::rc::Rc; -use std::sync::Arc; -use std::sync::Mutex; use uuid::Uuid; #[derive(Clone)] pub struct RegistryAgent { search_addr: Addr, - all_session_clients: HashMap>, - all_clients: HashMap>, - all_streams: HashMap>>, + all_clients: HashMap, + all_sockets: HashMap>, + all_streams: HashMap>>, } impl RegistryAgent { pub fn new(search_addr: Addr) -> Self { - let all_session_clients = HashMap::new(); - let all_clients = HashMap::new(); let all_streams = HashMap::new(); + let all_clients = HashMap::new(); + let all_sockets = HashMap::new(); RegistryAgent { search_addr, - all_session_clients, + all_sockets, all_clients, all_streams, } } - fn update_session_clients(&self, session_uuid: Uuid, msg: Msg) { - if let Some(session_clients) = self.all_session_clients.get(&session_uuid) { - for client_info in session_clients.iter() { - if let Some(client_addr) = self.all_clients.get(&client_info.client_uuid) { - let msg = msg.clone(); + fn clients_with_session(&self, session_uuid: Uuid) -> Vec { + let mut res: Vec = vec![]; + for client_info in self.all_clients.values() { + if client_info.session_uuid == session_uuid { + res.push(client_info.clone()); + } + } + res + } + + fn all_clients(&self) -> Vec { + self.all_clients.values().cloned().collect() + } + + fn update_clients_with_session(&self, session_uuid: Uuid, msg_out: MsgOut) { + for client_info in self.all_clients.values() { + if client_info.session_uuid == session_uuid { + if let Some(client_addr) = self.all_sockets.get(&client_info.client_uuid) { + let msg_out = msg_out.clone(); let client_addr_clone = client_addr.clone(); - let envelope = Envelope::new(client_info.clone(), msg); - let package = Package::new(client_addr_clone, envelope); - client_addr.do_send(package); + let envelope = + Envelope::from_msg_out(client_addr_clone, client_info.clone(), msg_out); + client_addr.do_send(envelope); } else { error!("{} recipient not found", client_info.client_uuid); } } - } else { - error!("{} not found", session_uuid); + } + } + + fn update_all_clients(&self, msg_out: MsgOut) { + for client_info in self.all_clients.values() { + if let Some(client_addr) = self.all_sockets.get(&client_info.client_uuid) { + let msg_out = msg_out.clone(); + let client_addr_clone = client_addr.clone(); + let envelope = + Envelope::from_msg_out(client_addr_clone, client_info.clone(), msg_out); + client_addr.do_send(envelope); + } else { + error!("{} recipient not found", client_info.client_uuid); + } } } } @@ -65,78 +85,83 @@ impl Actor for RegistryAgent { type Context = Context; } -impl Handler for RegistryAgent { +impl Handler for RegistryAgent { type Result = (); - fn handle(&mut self, package: Package, _ctx: &mut Context) { - let Package { + fn handle(&mut self, envelope: Envelope, _ctx: &mut Context) { + let Envelope { client_addr, - envelope, - } = package.clone(); - let Envelope { client_info, msg } = envelope; - use Msg::*; + client_info, + msg, + } = envelope.clone(); + let session_uuid = client_info.session_uuid; + let client_uuid = client_info.client_uuid; + match msg { - Ping => {} - Pong => {} - SessionClients(_) => {} - SearchSuggestions(_) => {} - QueryResults(_) => {} - RegistorClient(_) => { - let session_uuid = client_info.session_uuid; - let client_uuid = client_info.client_uuid; - self.all_session_clients - .entry(client_info.session_uuid) - .or_default() - .insert(client_info); - self.all_clients.entry(client_uuid).or_insert(client_addr); - let session_clients = self - .all_session_clients - .get(&session_uuid) - .expect("session clients"); - let msg = Msg::SessionClients(session_clients.clone()); - self.update_session_clients(session_uuid, msg); - } - UnRegistorClient(_) => { - self.all_session_clients - .entry(client_info.session_uuid) - .or_default() - .remove(&client_info); - self.all_clients.remove(&client_info.client_uuid); - let session_clients = self - .all_session_clients - .get(&client_info.session_uuid) - .expect("session clients"); - let msg = Msg::SessionClients(session_clients.clone()); - let client_uuid = client_info.client_uuid; - self.all_streams.remove(&client_uuid); - self.update_session_clients(client_info.session_uuid, msg); - } - SearchText(_) => { - self.search_addr.do_send(package); - } - QueryText(_) => { - self.search_addr.do_send(package); - } - StreamStart(mime_type) => { - let client_uuid = client_info.client_uuid; - if mime_type == MIME_TYPE { - let file_name = format!("stream/{}.{}", client_uuid, "webm"); - let file = OpenOptions::new() - .write(true) - .create(true) - .open(file_name) - .expect("file"); - self.all_streams - .entry(client_uuid) - .or_insert(Rc::new(RefCell::new(file))); + Msg::In(msg_in) => match msg_in { + MsgIn::SearchText(_) => { + self.search_addr.do_send(envelope); } - - // let ffplay = Command::new("ffplay") - // .args(&["-f", vals[0], "-i", "0"]) - // .output() - // .expect("ffplay"); - // println!("{} {:?}", mime_type, ffplay); - } + MsgIn::QueryText(_) => { + self.search_addr.do_send(envelope); + } + MsgIn::StreamStart(mime_type) => { + if mime_type == MIME_TYPE { + info!("{}", client_uuid); + let playlist = format!("{}/{}.m3u8", STREAM_DIR, client_uuid); + let ffmpeg = Command::new("ffmpeg") + .args(&[ + "-i", + "-", + "-f", + "hls", + "-c:v", + "copy", + "-hls_time", + "0.1", + "-hls_flags", + "delete_segments", + &playlist, + ]) + .stdin(Stdio::piped()) + .spawn() + .expect("ffmpeg"); + self.all_streams + .insert(client_uuid, Rc::new(RefCell::new(ffmpeg))); + if let Some(client_info) = self.all_clients.get_mut(&client_uuid) { + client_info.streaming = true; + } + let all_clients = self.all_clients(); + let msg_out = MsgOut::AllClients(all_clients); + self.update_all_clients(msg_out); + } + } + _ => { + error!("{:?}", msg_in); + } + }, + Msg::Out(msg_out) => match msg_out { + MsgOut::RegistorClient(_) => { + self.all_sockets.entry(client_uuid).or_insert(client_addr); + self.all_clients.entry(client_uuid).or_insert(client_info); + let all_clients = self.all_clients(); + let msg_out = MsgOut::AllClients(all_clients); + self.update_all_clients(msg_out); + } + MsgOut::UnRegistorClient(_) => { + if let Some(ffmpeg) = self.all_streams.get(&client_uuid) { + ffmpeg.borrow_mut().kill().expect("command wasn't running"); + } + self.all_sockets.remove(&client_uuid); + self.all_clients.remove(&client_uuid); + let all_clients = self.all_clients(); + let msg_out = MsgOut::AllClients(all_clients); + self.update_all_clients(msg_out); + } + _ => { + error!("{:?}", msg_out); + } + }, } } } @@ -145,13 +170,10 @@ impl Handler for RegistryAgent { type Result = (); fn handle(&mut self, chunk: Chunk, _ctx: &mut Context) { - let Chunk { - client_addr, - client_info, - bytes, - } = chunk.clone(); - if let Some(file) = self.all_streams.get(&client_info.client_uuid) { - match file.borrow_mut().write_all(&bytes) { + let Chunk { client_info, bytes } = chunk.clone(); + let client_uuid = client_info.client_uuid; + if let Some(ffmpeg) = self.all_streams.get(&client_uuid) { + match ffmpeg.borrow().stdin.as_ref().unwrap().write_all(&bytes) { Ok(_) => { info!("{}", bytes.len()) } diff --git a/server/src/agents/search.rs b/server/src/agents/search.rs index 417da4a6..014df71d 100644 --- a/server/src/agents/search.rs +++ b/server/src/agents/search.rs @@ -1,13 +1,13 @@ use crate::agents::envelope::Envelope; -use crate::agents::package::Package; use crate::db::*; use actix::prelude::*; use anyhow::Result; use diesel::prelude::*; +use log::error; use log::info; use rustgym_consts::*; -use rustgym_msg::Msg; use rustgym_msg::QueryResult; +use rustgym_msg::*; use rustgym_schema::AdventOfCodeDescription; use rustgym_schema::GoogleProblem; use rustgym_schema::LeetcodeQuestion; @@ -36,45 +36,59 @@ impl Actor for SearchAgent { type Context = Context; } -impl Handler for SearchAgent { +impl Handler for SearchAgent { type Result = (); - fn handle(&mut self, package: Package, _ctx: &mut Context) { - let Package { + fn handle(&mut self, envelope: Envelope, _ctx: &mut Context) { + let Envelope { client_addr, - envelope, - } = package; - let Envelope { client_info, msg } = envelope; + client_info, + msg, + } = envelope; - use Msg::*; match msg { - Ping => {} - Pong => {} - SessionClients(_) => {} - SearchSuggestions(_) => {} - RegistorClient(_) => {} - UnRegistorClient(_) => {} - QueryResults(_) => {} - StreamStart(_) => {} - SearchText(text) => { - let text: String = cleanup(text); - let search_words: Vec = - text.split_whitespace().map(|s| s.to_string()).collect(); - if let Some(last) = search_words.last() { - info!("{}", last); - let mut suggestions = vec![]; - + Msg::In(msg_in) => match msg_in { + MsgIn::SearchText(text) => { + let text: String = cleanup(text); + let search_words: Vec = + text.split_whitespace().map(|s| s.to_string()).collect(); + if let Some(last) = search_words.last() { + info!("{}", last); + let mut suggestions = vec![]; + match self + .search_channel + .suggest(SONIC_COLLECTION, SONIC_BUCKET, last) + { + Ok(words) => { + info!("{:?}", words); + for word in words { + let mut words = search_words.to_vec(); + words.pop(); + words.push(word); + let suggestion: String = words.join(" "); + suggestions.push(suggestion); + } + } + Err(err) => { + info!("reconnect {:?}", err); + self.reconnect(); + } + } + let msg_out = MsgOut::SearchSuggestions(suggestions); + let envelope = + Envelope::from_msg_out(client_addr.clone(), client_info, msg_out); + client_addr.do_send(envelope); + } + } + MsgIn::QueryText(text) => { + let text: String = cleanup(text); + let mut query_results: Vec = vec![]; match self .search_channel - .suggest(SONIC_COLLECTION, SONIC_BUCKET, last) + .query(SONIC_COLLECTION, SONIC_BUCKET, &text) { - Ok(words) => { - info!("{:?}", words); - for word in words { - let mut words = search_words.to_vec(); - words.pop(); - words.push(word); - let suggestion: String = words.join(" "); - suggestions.push(suggestion); + Ok(objects) => { + if let Ok(results) = get_results(objects, &self.pool) { + query_results = results; } } Err(err) => { @@ -82,33 +96,17 @@ impl Handler for SearchAgent { self.reconnect(); } } - let msg = Msg::SearchSuggestions(suggestions); - let envelope = Envelope::new(client_info, msg); - let package = Package::new(client_addr.clone(), envelope); - client_addr.do_send(package); + let msg_out = MsgOut::QueryResults(query_results); + let envelope = + Envelope::from_msg_out(client_addr.clone(), client_info, msg_out); + client_addr.do_send(envelope); } - } - QueryText(text) => { - let text: String = cleanup(text); - let mut query_results: Vec = vec![]; - match self - .search_channel - .query(SONIC_COLLECTION, SONIC_BUCKET, &text) - { - Ok(objects) => { - if let Ok(results) = get_results(objects, &self.pool) { - query_results = results; - } - } - Err(err) => { - info!("reconnect {:?}", err); - self.reconnect(); - } + _ => { + error!("{:?}", msg_in); } - let msg = Msg::QueryResults(query_results); - let envelope = Envelope::new(client_info, msg); - let package = Package::new(client_addr.clone(), envelope); - client_addr.do_send(package); + }, + Msg::Out(msg_out) => { + error!("{:?}", msg_out); } } } diff --git a/server/src/agents/websocket.rs b/server/src/agents/websocket.rs index e121b86d..2472daa7 100644 --- a/server/src/agents/websocket.rs +++ b/server/src/agents/websocket.rs @@ -1,5 +1,5 @@ use crate::agents::chunk::Chunk; -use crate::agents::package::Package; +use crate::agents::envelope::Envelope; use crate::agents::registry::RegistryAgent; use crate::agents::uap::{UapAgent, UserAgentRequest}; use crate::session_data::update_session; @@ -11,15 +11,15 @@ use actix_web::HttpRequest; use actix_web::HttpResponse; use actix_web_actors::ws; use log::debug; +use log::error; use log::info; use rustgym_consts::*; use rustgym_msg::ClientInfo; -use rustgym_msg::Msg; +use rustgym_msg::*; use std::cell::RefCell; use std::fs::File; use std::rc::Rc; use std::time::Instant; -use uaparser::UserAgent; use uuid::Uuid; pub async fn ws_index( @@ -38,12 +38,14 @@ pub async fn ws_index( let session_data = update_session(session)?; let session_uuid = session_data.uuid; let name = session_data.name; + let streaming = false; let client_uuid = Uuid::new_v4(); let client_info = ClientInfo { client_uuid, session_uuid, name, chrome, + streaming, }; let socket_client = SocketClient::new(client_info, registry_addr.get_ref().clone()); ws::start(socket_client, &req, stream) @@ -85,34 +87,39 @@ impl Actor for SocketClient { type Context = ws::WebsocketContext; fn started(&mut self, ctx: &mut Self::Context) { - let msg = Msg::RegistorClient(self.client_info.clone()); - let json = serde_json::to_string(&msg).expect("json"); + let msg_out = MsgOut::RegistorClient(self.client_info.clone()); + let json = serde_json::to_string(&msg_out).expect("json"); ctx.text(json); let address = ctx.address(); - let client_info = self.client_info.clone(); - let package = Package::from_message(address, client_info, msg); - self.registry_addr.do_send(package); + let envelop = Envelope::from_msg_out(address, self.client_info.clone(), msg_out); + self.registry_addr.do_send(envelop); // ctx.add_message_stream(); self.hb(ctx); } fn stopping(&mut self, ctx: &mut Self::Context) -> actix::Running { - let msg = Msg::UnRegistorClient(self.client_info.clone()); + let msg_out = MsgOut::UnRegistorClient(self.client_info.clone()); let address = ctx.address(); - let client_info = self.client_info.clone(); - let package = Package::from_message(address, client_info, msg); - self.registry_addr.do_send(package); + let envelop = Envelope::from_msg_out(address, self.client_info.clone(), msg_out); + self.registry_addr.do_send(envelop); self.hb(ctx); actix::Running::Stop } } -impl Handler for SocketClient { +impl Handler for SocketClient { type Result = (); - fn handle(&mut self, package: Package, ctx: &mut Self::Context) { - let json = serde_json::to_string(&package.envelope.msg).expect("json"); - ctx.text(json); + fn handle(&mut self, envelope: Envelope, ctx: &mut Self::Context) { + match envelope.msg { + Msg::In(msg_in) => { + error!("{:?}", msg_in); + } + Msg::Out(msg_out) => { + let json = serde_json::to_string(&msg_out).expect("json"); + ctx.text(json); + } + } } } @@ -128,12 +135,11 @@ impl StreamHandler> for SocketClient { self.hb = Instant::now(); } Ok(ws::Message::Text(text)) => { - if let Ok(msg) = serde_json::from_str::(&text) { - info!("{:?}", msg); + if let Ok(msg_in) = serde_json::from_str::(&text) { + info!("{:?}", msg_in); let address = ctx.address(); - let client_info = self.client_info.clone(); - let package = Package::from_message(address, client_info, msg); - self.registry_addr.do_send(package); + let envelope = Envelope::from_msg_in(address, self.client_info.clone(), msg_in); + self.registry_addr.do_send(envelope); } else { info!("{:?}", text); ctx.text(text) @@ -141,9 +147,8 @@ impl StreamHandler> for SocketClient { } Ok(ws::Message::Binary(bytes)) => { info!("{} {:?}", self.client_info.client_uuid, bytes.len()); - let address = ctx.address(); let client_info = self.client_info.clone(); - let chuck = Chunk::new(address, client_info, bytes); + let chuck = Chunk::new(client_info, bytes); self.registry_addr.do_send(chuck); } Ok(ws::Message::Close(reason)) => { diff --git a/wasm/Cargo.toml b/wasm/Cargo.toml index 9f6fe01b..39aa29f3 100644 --- a/wasm/Cargo.toml +++ b/wasm/Cargo.toml @@ -18,6 +18,7 @@ wasm-bindgen = "0.2.63" js-sys = "0.3.46" console_error_panic_hook = { version = "0.1.6", optional = true } web-sys = { version = "0.3.46", features = [ + "MediaTrackSupportedConstraints", "MediaRecorderOptions", "HtmlVideoElement", "BlobEvent", diff --git a/wasm/src/init.rs b/wasm/src/init.rs index ed38c119..3bcf8e84 100644 --- a/wasm/src/init.rs +++ b/wasm/src/init.rs @@ -9,7 +9,7 @@ fn web_socket(orders: &mut impl Orders) -> WebSocket { WebSocket::builder(wsurl().expect("url"), orders) .on_open(|| Message::WebSocketOpened) .on_message( - |msg: WebSocketMessage| match msg.json::() { + |msg: WebSocketMessage| match msg.json::() { Ok(msg) => Message::WebSocketMsg(msg), Err(err) => Message::WebSocketError(err), }, @@ -29,6 +29,7 @@ pub fn init(_url: seed::Url, orders: &mut impl Orders) -> Model { let search_suggestions = vec![]; let query_results = vec![]; let media_stream = None; + let all_clients = vec![]; Model { search_text, search_suggestions, @@ -36,5 +37,6 @@ pub fn init(_url: seed::Url, orders: &mut impl Orders) -> Model { web_socket, web_socket_errors, media_stream, + all_clients, } } diff --git a/wasm/src/media.rs b/wasm/src/media.rs index 504ecdef..03962043 100644 --- a/wasm/src/media.rs +++ b/wasm/src/media.rs @@ -6,12 +6,16 @@ use seed::{prelude::*, *}; use wasm_bindgen_futures::JsFuture; use web_sys::{ Blob, BlobEvent, Event, EventTarget, FileReader, HtmlVideoElement, MediaDevices, MediaRecorder, - MediaRecorderOptions, MediaSource, MediaStream, MediaStreamConstraints, Navigator, Url, + MediaRecorderOptions, MediaSource, MediaStream, MediaStreamConstraints, + MediaTrackSupportedConstraints, Navigator, Url, }; pub async fn get_media_stream() -> Result { let navigator: Navigator = window().navigator(); let media_devices: MediaDevices = navigator.media_devices()?; + let supported_constraints: MediaTrackSupportedConstraints = + media_devices.get_supported_constraints(); + log!(supported_constraints); let mut constraints = MediaStreamConstraints::new(); constraints.audio(&JsValue::from_bool(true)); constraints.video(&JsValue::from_bool(true)); @@ -46,7 +50,7 @@ pub fn media_recorder( let mime_type = media_recorder.mime_type(); log!(mime_type); wc.borrow() - .send_json(&rustgym_msg::Msg::StreamStart(mime_type)) + .send_json(&rustgym_msg::MsgIn::StreamStart(mime_type)) .expect("strart stream"); }) as Box); media_recorder.set_onstart(Some(onstart_cb.as_ref().unchecked_ref())); diff --git a/wasm/src/message.rs b/wasm/src/message.rs index 35a02c51..7f534a48 100644 --- a/wasm/src/message.rs +++ b/wasm/src/message.rs @@ -1,4 +1,4 @@ -use rustgym_msg::Msg; +use rustgym_msg::*; use seed::prelude::*; use web_sys::MediaStream; @@ -7,10 +7,11 @@ pub enum Message { SearchTextChanged(String), QueryText(String), KeyDown(web_sys::KeyboardEvent), - WebSocketMsg(Msg), + WebSocketMsg(MsgOut), WebSocketClosed(CloseEvent), WebSocketError(WebSocketError), MediaStreamReady(MediaStream), + AllClients(Vec), WebSocketOpened, WebSocketFailed, } diff --git a/wasm/src/model.rs b/wasm/src/model.rs index 9661e7d7..2024eaf6 100644 --- a/wasm/src/model.rs +++ b/wasm/src/model.rs @@ -1,4 +1,4 @@ -use rustgym_msg::QueryResult; +use rustgym_msg::*; use seed::prelude::*; use std::cell::RefCell; use std::rc::Rc; @@ -11,4 +11,5 @@ pub struct Model { pub web_socket: Rc>, pub web_socket_errors: Vec, pub media_stream: Option, + pub all_clients: Vec, } diff --git a/wasm/src/update.rs b/wasm/src/update.rs index 7d1d7bc6..32d163b4 100644 --- a/wasm/src/update.rs +++ b/wasm/src/update.rs @@ -1,8 +1,7 @@ use crate::media::*; use crate::message::Message; use crate::model::Model; -use rustgym_msg::ClientInfo; -use rustgym_msg::Msg; +use rustgym_msg::*; use seed::{prelude::*, *}; use web_sys::MediaRecorder; @@ -17,7 +16,7 @@ pub fn update(msg: Message, model: &mut Model, orders: &mut impl Orders if let Err(err) = model .web_socket .borrow() - .send_json(&rustgym_msg::Msg::SearchText(search_text)) + .send_json(&rustgym_msg::MsgIn::SearchText(search_text)) { log!("error"); orders.send_msg(WebSocketError(err)); @@ -29,7 +28,7 @@ pub fn update(msg: Message, model: &mut Model, orders: &mut impl Orders if let Err(err) = model .web_socket .borrow() - .send_json(&rustgym_msg::Msg::QueryText(search_text)) + .send_json(&rustgym_msg::MsgIn::QueryText(search_text)) { log!("error"); orders.send_msg(WebSocketError(err)); @@ -45,9 +44,7 @@ pub fn update(msg: Message, model: &mut Model, orders: &mut impl Orders WebSocketMsg(msg) => { log!(msg); match msg { - Msg::Ping => {} - Msg::Pong => {} - Msg::RegistorClient(client_info) => { + MsgOut::RegistorClient(client_info) => { if client_info.chrome { orders.perform_cmd(async { let media_stream = get_media_stream().await.expect("media stream"); @@ -55,18 +52,21 @@ pub fn update(msg: Message, model: &mut Model, orders: &mut impl Orders }); } } - Msg::UnRegistorClient(_) => {} - Msg::SessionClients(_) => {} - Msg::SearchText(_) => {} - Msg::QueryText(_) => {} - Msg::StreamStart(_) => {} - Msg::SearchSuggestions(suggestions) => { + MsgOut::UnRegistorClient(_) => {} + MsgOut::SessionClients(_) => {} + MsgOut::SearchSuggestions(suggestions) => { model.search_suggestions = suggestions; } - Msg::QueryResults(mut query_results) => { + MsgOut::QueryResults(mut query_results) => { query_results.reverse(); model.query_results = query_results; } + MsgOut::AllClients(all_clients) => { + orders.send_msg(Message::AllClients(all_clients)); + } + _ => { + log!("error", msg); + } } } WebSocketError(err) => { @@ -87,5 +87,8 @@ pub fn update(msg: Message, model: &mut Model, orders: &mut impl Orders media_recorder(&media_stream, model).expect("media recorder"); model.media_stream = Some(media_stream); } + AllClients(all_clients) => { + model.all_clients = all_clients; + } } } diff --git a/wasm/src/view.rs b/wasm/src/view.rs index 8831049b..4ddb5632 100644 --- a/wasm/src/view.rs +++ b/wasm/src/view.rs @@ -73,8 +73,23 @@ pub fn view(model: &Model) -> Node { ) ], ], - video![ - C!["video"] - ], + model.all_clients.iter().filter(|client| client.streaming).map(|client| + div![ + div![client.client_uuid.to_string()], + video![ + C!["video"], + source![ + attrs!{ + At::Src => format!("/stream/{}.m3u8", client.client_uuid.to_string()), + At::Type => "application/x-mpegURL" + } + ], + attrs!{ + At::AutoPlay => true, + At::Controls => true, + } + ] + ] + ) ] }