Skip to content

Commit

Permalink
turn off streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
warycat committed Apr 24, 2021
1 parent b4612e6 commit 5a7591f
Show file tree
Hide file tree
Showing 13 changed files with 109 additions and 61 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions infra/create_instance.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
TAG=v0.3.0
VM_NAME=rustgym-25
TAG=v0.3.2
VM_NAME=rustgym-27
SERVER_NAME=rustgym.com
WORK_DIR=/root
[email protected]
Expand Down Expand Up @@ -52,7 +52,6 @@ curl -LJO $RUSTGYM_DOWNLOAD/$TAG/rustgym-ingest
chmod u+x rustgym-ingest
./rustgym-ingest >> ingest.log &>> ingest.error.log &
chmod u+x rustgym-server
mkdir stream
git clone https://github.com/ua-parser/uap-core.git
TAG=$TAG ./rustgym-server >> server.log &>> server.error.log &
Expand Down
6 changes: 6 additions & 0 deletions msg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,9 @@ pub struct QueryResult {
pub href: String,
pub from: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, new)]
pub struct MsgBin {
pub uuid: Uuid,
pub bytes: Vec<u8>,
}
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,4 @@ sonic-channel = "0.4.0"
tokio = { version = "0.2", features = ["full"] }
uaparser = "0.4.0"
notify = "4.0.16"
bincode = "1.3.3"
36 changes: 15 additions & 21 deletions server/src/agents/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,15 @@ use crate::agents::envelope::Envelope;
use crate::agents::search::SearchAgent;
use crate::agents::websocket::SocketClient;
use actix::prelude::*;
use anyhow::Result;
use log::{error, info};
use notify::op;
use notify::{raw_watcher, RawEvent, RecursiveMode, Watcher};
use rustgym_consts::*;
use rustgym_msg::ClientInfo;
use rustgym_msg::*;
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::rc::Rc;
use std::sync::mpsc;
use std::thread::sleep;
use std::time::Duration;
use uuid::Uuid;

Expand Down Expand Up @@ -87,6 +80,12 @@ impl RegistryAgent {
}
}
}

fn boardcast_chunk(&self, chunk: Chunk) {
for socket in self.all_sockets.values() {
socket.do_send(chunk.clone());
}
}
}

impl Actor for RegistryAgent {
Expand Down Expand Up @@ -122,7 +121,6 @@ impl Handler<Envelope> for RegistryAgent {
.expect("hls dir");
let playlist_path_str =
format!("{}/{}/playlist.m3u8", STREAM_DIR, client_uuid);
let playlist_path = Path::new(&playlist_path_str);
let ffmpeg = Command::new("ffmpeg")
.args(&[
"-i",
Expand Down Expand Up @@ -187,9 +185,17 @@ impl Handler<Chunk> for RegistryAgent {

fn handle(&mut self, chunk: Chunk, _ctx: &mut Context<Self>) {
let Chunk { client_info, bytes } = chunk.clone();
let msg_bin: MsgBin = bincode::deserialize(&bytes).unwrap();
let client_uuid = client_info.client_uuid;
self.boardcast_chunk(chunk);
if let Some(ffmpeg) = self.all_streams.get(&client_uuid) {
match ffmpeg.borrow().stdin.as_ref().unwrap().write_all(&bytes) {
match ffmpeg
.borrow()
.stdin
.as_ref()
.unwrap()
.write_all(&msg_bin.bytes)
{
Ok(_) => {}
Err(e) => {
error!("{}", e);
Expand All @@ -198,15 +204,3 @@ impl Handler<Chunk> for RegistryAgent {
}
}
}

fn wait_until_file_created(file_path: &Path) -> bool {
for i in 0..10 {
info!("{} {:?} {}", i, file_path, file_path.is_file());
if file_path.exists() {
return true;
} else {
sleep(Duration::from_secs(1));
}
}
false
}
8 changes: 8 additions & 0 deletions server/src/agents/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,11 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for SocketClient {
}
}
}

impl Handler<Chunk> for SocketClient {
type Result = ();

fn handle(&mut self, chunk: Chunk, ctx: &mut Self::Context) {
ctx.binary(chunk.bytes);
}
}
1 change: 1 addition & 0 deletions wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ serde_json = "1.0.62"
uuid = { version = "0.8", features = ["serde", "v4", "wasm-bindgen"] }
seed = "0.8.0"
anyhow = "1.0.38"
bincode = "1.3.3"

[dev-dependencies]
wasm-bindgen-test = "0.3.13"
32 changes: 25 additions & 7 deletions wasm/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
use crate::message::Message;
use crate::model::Model;
use crate::utils::*;
use seed::prelude::*;
use rustgym_msg::*;
use seed::{prelude::*, *};
use std::cell::RefCell;
use std::rc::Rc;

fn decode_message(message: WebSocketMessage, msg_sender: Rc<dyn Fn(Option<Message>)>) {
if message.contains_text() {
let msg_out = message
.json::<rustgym_msg::MsgOut>()
.expect("Failed to decode WebSocket text message");
msg_sender(Some(Message::WebSocketMsgOut(msg_out)));
} else {
spawn_local(async move {
let bytes: Vec<u8> = message
.bytes()
.await
.expect("WebsocketError on binary data");

let msg_bin: MsgBin = bincode::deserialize(&bytes).unwrap();
msg_sender(Some(Message::WebSocketMsgBin(msg_bin)));
});
}
}

fn web_socket(orders: &mut impl Orders<Message>) -> WebSocket {
let msg_sender = orders.msg_sender();
WebSocket::builder(wsurl().expect("url"), orders)
.on_open(|| Message::WebSocketOpened)
.on_message(
|msg: WebSocketMessage| match msg.json::<rustgym_msg::MsgOut>() {
Ok(msg) => Message::WebSocketMsg(msg),
Err(err) => Message::WebSocketError(err),
},
)
.on_message(|msg: WebSocketMessage| decode_message(msg, msg_sender))
.on_close(Message::WebSocketClosed)
.on_error(|| Message::WebSocketFailed)
.build_and_open()
Expand All @@ -23,6 +39,7 @@ fn web_socket(orders: &mut impl Orders<Message>) -> WebSocket {
pub fn init(_url: seed::Url, orders: &mut impl Orders<Message>) -> Model {
orders.subscribe(Message::UrlChanged);

let client_info = None;
let web_socket = Rc::new(RefCell::new(web_socket(orders)));
let search_text = "".to_string();
let web_socket_errors = vec![];
Expand All @@ -31,6 +48,7 @@ pub fn init(_url: seed::Url, orders: &mut impl Orders<Message>) -> Model {
let media_stream = None;
let all_clients = vec![];
Model {
client_info,
search_text,
search_suggestions,
query_results,
Expand Down
6 changes: 5 additions & 1 deletion wasm/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::model::Model;

use js_sys::{ArrayBuffer, Uint8Array};
use rustgym_consts::*;
use rustgym_msg::*;
use seed::{prelude::*, *};
use wasm_bindgen_futures::JsFuture;
use web_sys::{
Expand Down Expand Up @@ -53,6 +54,7 @@ pub fn media_recorder(
onstart_cb.forget();

let wc = model.web_socket.clone();
let uuid = model.client_info.as_ref().unwrap().client_uuid;
let ondataavailable_cb = Closure::wrap(Box::new(move |event: BlobEvent| {
let blob: Blob = event.data().unwrap();
let wcc = wc.clone();
Expand All @@ -65,7 +67,9 @@ pub fn media_recorder(
let size = buf.byte_length();
let mut bytes = vec![0; size as usize];
arr.copy_to(&mut bytes);
wcc.borrow().send_bytes(&bytes).expect("send bytes");
let msg_bin = MsgBin::new(uuid, bytes);
let encoded: Vec<u8> = bincode::serialize(&msg_bin).unwrap();
wcc.borrow().send_bytes(&encoded).expect("send bytes");
}) as Box<dyn FnMut(Event)>);
file_reader.set_onload(Some(onload.as_ref().unchecked_ref()));
onload.forget();
Expand Down
3 changes: 2 additions & 1 deletion wasm/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ pub enum Message {
SearchTextChanged(String),
QueryText(String),
KeyDown(web_sys::KeyboardEvent),
WebSocketMsg(MsgOut),
WebSocketMsgOut(MsgOut),
WebSocketMsgBin(MsgBin),
WebSocketClosed(CloseEvent),
WebSocketError(WebSocketError),
MediaStreamReady(MediaStream),
Expand Down
1 change: 1 addition & 0 deletions wasm/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::rc::Rc;
use web_sys::MediaStream;

pub struct Model {
pub client_info: Option<ClientInfo>,
pub search_text: String,
pub search_suggestions: Vec<String>,
pub query_results: Vec<QueryResult>,
Expand Down
24 changes: 14 additions & 10 deletions wasm/src/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ pub fn update(msg: Message, model: &mut Model, orders: &mut impl Orders<Message>
orders.send_msg(Message::QueryText(model.search_text.to_string()));
}
}
WebSocketMsg(msg) => {
log!(msg);
match msg {
WebSocketMsgOut(msg_out) => {
log!(msg_out);
match msg_out {
MsgOut::RegistorClient(client_info) => {
if client_info.chrome {
orders.perform_cmd(async {
let media_stream = get_media_stream().await.expect("media stream");
Message::MediaStreamReady(media_stream)
});
}
// if client_info.chrome {
// orders.perform_cmd(async {
// let media_stream = get_media_stream().await.expect("media stream");
// Message::MediaStreamReady(media_stream)
// });
// }
model.client_info = Some(client_info);
}
MsgOut::UnRegistorClient(_) => {}
MsgOut::SessionClients(_) => {}
Expand All @@ -65,10 +66,13 @@ pub fn update(msg: Message, model: &mut Model, orders: &mut impl Orders<Message>
orders.send_msg(Message::AllClients(all_clients));
}
_ => {
log!("error", msg);
log!("error", msg_out);
}
}
}
WebSocketMsgBin(msg_bin) => {
log!(msg_bin.uuid);
}
WebSocketError(err) => {
model.web_socket_errors.push(err);
}
Expand Down
36 changes: 18 additions & 18 deletions wasm/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,23 @@ pub fn view(model: &Model) -> Node<Message> {
)
],
],
model.all_clients.iter().filter(|client| client.streaming).map(|client|
div![
div![client.client_uuid.to_string()],
video![
C!["video"],
source![
attrs!{
At::Src => format!("/stream/{}/playlist.m3u8", client.client_uuid.to_string()),
At::Type => "application/x-mpegURL"
}
],
attrs!{
At::AutoPlay => true,
At::Controls => true,
}
]
]
)
// model.all_clients.iter().filter(|client| client.streaming).map(|client|
// div![
// div![client.client_uuid.to_string()],
// video![
// C!["video"],
// source![
// attrs!{
// At::Src => format!("/stream/{}/playlist.m3u8", client.client_uuid.to_string()),
// At::Type => "application/x-mpegURL"
// }
// ],
// attrs!{
// At::AutoPlay => true,
// At::Controls => true,
// }
// ]
// ]
// ),
]
}

0 comments on commit 5a7591f

Please sign in to comment.