Skip to content

Commit

Permalink
fix stream folder
Browse files Browse the repository at this point in the history
  • Loading branch information
warycat committed Apr 23, 2021
1 parent 33204d9 commit b4612e6
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 21 deletions.
103 changes: 100 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion consts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ pub const SEARCH_PLACEHOLDER: &str = "Search Rust Solutions";

pub const TIME_SLICE: i32 = 100;
pub const MIME_TYPE: &str = "video/x-matroska;codecs=avc1,opus";
pub const STREAM_DIR: &str = "/Users/larry/Documents/rustgym/stream";
pub const STREAM_DIR: &str = "./stream";
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
sonic-channel = "0.4.0"
tokio = { version = "0.2", features = ["full"] }
uaparser = "0.4.0"
notify = "4.0.16"
52 changes: 39 additions & 13 deletions server/src/agents/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,23 @@ use crate::agents::envelope::Envelope;
use crate::agents::search::SearchAgent;
use crate::agents::websocket::SocketClient;
use actix::prelude::*;
use anyhow::Result;
use log::{error, info};
use notify::op;
use notify::{raw_watcher, RawEvent, RecursiveMode, Watcher};
use rustgym_consts::*;
use rustgym_msg::ClientInfo;
use rustgym_msg::*;
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::rc::Rc;
use std::sync::mpsc;
use std::thread::sleep;
use std::time::Duration;
use uuid::Uuid;

#[derive(Clone)]
Expand Down Expand Up @@ -88,7 +96,7 @@ impl Actor for RegistryAgent {
impl Handler<Envelope> for RegistryAgent {
type Result = ();

fn handle(&mut self, envelope: Envelope, _ctx: &mut Context<Self>) {
fn handle(&mut self, envelope: Envelope, ctx: &mut Context<Self>) {
let Envelope {
client_addr,
client_info,
Expand All @@ -107,8 +115,14 @@ impl Handler<Envelope> for RegistryAgent {
}
MsgIn::StreamStart(mime_type) => {
if mime_type == MIME_TYPE {
info!("{}", client_uuid);
let playlist = format!("{}/{}.m3u8", STREAM_DIR, client_uuid);
let hls_dir = format!("{}/{}", STREAM_DIR, client_uuid);
Command::new("mkdir")
.arg(&hls_dir)
.output()
.expect("hls dir");
let playlist_path_str =
format!("{}/{}/playlist.m3u8", STREAM_DIR, client_uuid);
let playlist_path = Path::new(&playlist_path_str);
let ffmpeg = Command::new("ffmpeg")
.args(&[
"-i",
Expand All @@ -121,19 +135,21 @@ impl Handler<Envelope> for RegistryAgent {
"0.1",
"-hls_flags",
"delete_segments",
&playlist,
&playlist_path_str,
])
.stdin(Stdio::piped())
.spawn()
.expect("ffmpeg");
self.all_streams
.insert(client_uuid, Rc::new(RefCell::new(ffmpeg)));
if let Some(client_info) = self.all_clients.get_mut(&client_uuid) {
client_info.streaming = true;
}
let all_clients = self.all_clients();
let msg_out = MsgOut::AllClients(all_clients);
self.update_all_clients(msg_out);
ctx.run_later(Duration::from_secs(3), move |act, _| {
if let Some(client_info) = act.all_clients.get_mut(&client_uuid) {
client_info.streaming = true;
}
let all_clients = act.all_clients();
let msg_out = MsgOut::AllClients(all_clients);
act.update_all_clients(msg_out);
});
}
}
_ => {
Expand Down Expand Up @@ -174,13 +190,23 @@ impl Handler<Chunk> for RegistryAgent {
let client_uuid = client_info.client_uuid;
if let Some(ffmpeg) = self.all_streams.get(&client_uuid) {
match ffmpeg.borrow().stdin.as_ref().unwrap().write_all(&bytes) {
Ok(_) => {
info!("{}", bytes.len())
}
Ok(_) => {}
Err(e) => {
error!("{}", e);
}
}
}
}
}

fn wait_until_file_created(file_path: &Path) -> bool {
for i in 0..10 {
info!("{} {:?} {}", i, file_path, file_path.is_file());
if file_path.exists() {
return true;
} else {
sleep(Duration::from_secs(1));
}
}
false
}
1 change: 0 additions & 1 deletion server/src/agents/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for SocketClient {
}
}
Ok(ws::Message::Binary(bytes)) => {
info!("{} {:?}", self.client_info.client_uuid, bytes.len());
let client_info = self.client_info.clone();
let chuck = Chunk::new(client_info, bytes);
self.registry_addr.do_send(chuck);
Expand Down
6 changes: 5 additions & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use db::*;
use log::info;
use rustgym_consts::*;
use std::env;
use std::process::Command;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
Expand All @@ -37,7 +38,10 @@ async fn main() -> std::io::Result<()> {
let search_addr = SearchAgent::new(pool.clone()).start();
let registry_addr = RegistryAgent::new(search_addr).start();
let uap_addr = UapAgent::new().start();

Command::new("mkdir")
.arg(STREAM_DIR)
.output()
.expect("Create Stream Dir");
info!("RUST GYM Server {}", tag);
HttpServer::new(move || {
App::new()
Expand Down
1 change: 0 additions & 1 deletion wasm/src/media.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ pub fn media_recorder(
let mut bytes = vec![0; size as usize];
arr.copy_to(&mut bytes);
wcc.borrow().send_bytes(&bytes).expect("send bytes");
log!(bytes.len());
}) as Box<dyn FnMut(Event)>);
file_reader.set_onload(Some(onload.as_ref().unchecked_ref()));
onload.forget();
Expand Down
2 changes: 1 addition & 1 deletion wasm/src/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub fn view(model: &Model) -> Node<Message> {
C!["video"],
source![
attrs!{
At::Src => format!("/stream/{}.m3u8", client.client_uuid.to_string()),
At::Src => format!("/stream/{}/playlist.m3u8", client.client_uuid.to_string()),
At::Type => "application/x-mpegURL"
}
],
Expand Down

0 comments on commit b4612e6

Please sign in to comment.