diff --git a/masq/src/communications/broadcast_handler.rs b/masq/src/communications/broadcast_handler.rs index 23f829d0a..ebc2b2d76 100644 --- a/masq/src/communications/broadcast_handler.rs +++ b/masq/src/communications/broadcast_handler.rs @@ -2,15 +2,16 @@ use crate::commands::change_password_command::ChangePasswordCommand; use crate::commands::setup_command::SetupCommand; -use crate::communications::{ - handle_node_is_dead_while_f_f_on_the_way_broadcast, handle_unrecognized_broadcast, +use crate::communications::broadcast_tools::tools::{ + handle_node_is_dead_while_f_f_on_the_way_broadcast, handle_ui_log_broadcast, + handle_unrecognized_broadcast, }; use crate::notifications::crashed_notification::CrashNotifier; use crate::terminal::terminal_interface::TerminalWrapper; use crossbeam_channel::{unbounded, RecvError, Sender}; use masq_lib::messages::{ - FromMessageBody, UiNewPasswordBroadcast, UiNodeCrashedBroadcast, UiSetupBroadcast, - UiUndeliveredFireAndForget, + FromMessageBody, UiLogBroadcast, UiNewPasswordBroadcast, UiNodeCrashedBroadcast, + UiSetupBroadcast, UiUndeliveredFireAndForget, }; use masq_lib::ui_gateway::MessageBody; use masq_lib::utils::ExpectValue; @@ -106,6 +107,8 @@ impl BroadcastHandlerReal { stdout, terminal_interface, ); + } else if let Ok((body, _)) = UiLogBroadcast::fmb(message_body.clone()) { + handle_ui_log_broadcast(body, stdout, terminal_interface) } else { handle_unrecognized_broadcast(message_body, stderr, terminal_interface) } @@ -148,7 +151,9 @@ mod tests { TerminalPassiveMock, TestStreamFactory, }; use crossbeam_channel::{bounded, unbounded, Receiver}; - use masq_lib::messages::{CrashReason, ToMessageBody, UiNodeCrashedBroadcast}; + use masq_lib::messages::{ + CrashReason, SerializableLogLevel, ToMessageBody, UiLogBroadcast, UiNodeCrashedBroadcast, + }; use masq_lib::messages::{UiSetupBroadcast, UiSetupResponseValue, UiSetupResponseValueStatus}; use masq_lib::ui_gateway::MessagePath; use std::sync::Arc; @@ -186,6 +191,34 @@ mod tests { ); } + #[test] + fn broadcast_of_ui_log_was_successful() { + let (factory, handle) = TestStreamFactory::new(); + let subject = BroadcastHandlerReal::new(Some(TerminalWrapper::new(Arc::new( + TerminalPassiveMock::new(), + )))) + .start(Box::new(factory)); + let message = masq_lib::messages::UiLogBroadcast { + msg: "Empty. No Nodes to report to; continuing".to_string(), + log_level: SerializableLogLevel::Info, + } + .tmb(0); + + subject.send(message); + + let stdout = handle.stdout_so_far(); + assert_eq!( + stdout, + "\nInfo: Empty. No Nodes to report to; continuing\n\n", + ); + assert_eq!( + handle.stderr_so_far(), + "".to_string(), + "stderr: '{}'", + stdout + ); + } + #[test] fn broadcast_of_crashed_triggers_correct_handler() { let (factory, handle) = TestStreamFactory::new(); @@ -448,9 +481,21 @@ Cannot handle crash request: Node is not running. ) } + #[test] + fn ui_log_broadcast_handle_has_a_synchronizer_correctly_implemented() { + let ui_log_broadcast = UiLogBroadcast { + msg: "Empty. No Nodes to report to; continuing".to_string(), + log_level: SerializableLogLevel::Info, + }; + + let broadcast_output = "\nInfo: Empty. No Nodes to report to; continuing\n\n"; + + assertion_for_handle_broadcast(handle_ui_log_broadcast, ui_log_broadcast, broadcast_output) + } + fn assertion_for_handle_broadcast( broadcast_handler: F, - broadcast_message_body: U, + broadcast_body: U, broadcast_desired_output: &str, ) where F: FnOnce(U, &mut dyn Write, &TerminalWrapper) + Copy, @@ -470,7 +515,7 @@ Cannot handle crash request: Node is not running. Box::new(stdout_clone), synchronizer, broadcast_handler, - broadcast_message_body.clone(), + broadcast_body.clone(), rx.clone(), ); @@ -492,7 +537,7 @@ Cannot handle crash request: Node is not running. Box::new(stdout_second_clone), synchronizer_clone_idle, broadcast_handler, - broadcast_message_body, + broadcast_body, rx, ); diff --git a/masq/src/communications/broadcast_tools.rs b/masq/src/communications/broadcast_tools.rs new file mode 100644 index 000000000..10609d030 --- /dev/null +++ b/masq/src/communications/broadcast_tools.rs @@ -0,0 +1,46 @@ +// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +pub(in crate::communications) mod tools { + use crate::terminal::terminal_interface::TerminalWrapper; + use masq_lib::messages::{UiLogBroadcast, UiUndeliveredFireAndForget}; + use masq_lib::short_writeln; + use masq_lib::ui_gateway::MessageBody; + use std::io::Write; + + pub fn handle_node_is_dead_while_f_f_on_the_way_broadcast( + body: UiUndeliveredFireAndForget, + stdout: &mut dyn Write, + term_interface: &TerminalWrapper, + ) { + let _lock = term_interface.lock(); + short_writeln!( + stdout, + "\nCannot handle {} request: Node is not running.\n", + body.opcode + ); + stdout.flush().expect("flush failed"); + } + + pub fn handle_unrecognized_broadcast( + message_body: MessageBody, + stderr: &mut dyn Write, + term_interface: &TerminalWrapper, + ) { + let _lock = term_interface.lock(); + short_writeln!( + stderr, + "Discarding unrecognized broadcast with opcode '{}'\n", + message_body.opcode + ) + } + + pub fn handle_ui_log_broadcast( + body: UiLogBroadcast, + stdout: &mut dyn Write, + term_interface: &TerminalWrapper, + ) { + let _lock = term_interface.lock(); + write!(stdout, "\n{:?}: {}\n\n", body.log_level, body.msg).expect("write! failed"); + stdout.flush().expect("flush failed"); + } +} diff --git a/masq/src/communications/mod.rs b/masq/src/communications/mod.rs index 6895fa291..f9de0989a 100644 --- a/masq/src/communications/mod.rs +++ b/masq/src/communications/mod.rs @@ -1,38 +1,6 @@ // Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. pub mod broadcast_handler; +pub mod broadcast_tools; mod client_listener_thread; pub mod connection_manager; pub mod node_conversation; - -use crate::terminal::terminal_interface::TerminalWrapper; -use masq_lib::messages::UiUndeliveredFireAndForget; -use masq_lib::short_writeln; -use masq_lib::ui_gateway::MessageBody; -use std::io::Write; - -fn handle_node_is_dead_while_f_f_on_the_way_broadcast( - body: UiUndeliveredFireAndForget, - stdout: &mut dyn Write, - term_interface: &TerminalWrapper, -) { - let _lock = term_interface.lock(); - short_writeln!( - stdout, - "\nCannot handle {} request: Node is not running.\n", - body.opcode - ); - stdout.flush().expect("flush failed"); -} - -fn handle_unrecognized_broadcast( - message_body: MessageBody, - stderr: &mut dyn Write, - term_interface: &TerminalWrapper, -) { - let _lock = term_interface.lock(); - short_writeln!( - stderr, - "Discarding unrecognized broadcast with opcode '{}'\n", - message_body.opcode - ) -} diff --git a/masq_lib/Cargo.toml b/masq_lib/Cargo.toml index fa82da2b4..5135ef496 100644 --- a/masq_lib/Cargo.toml +++ b/masq_lib/Cargo.toml @@ -28,6 +28,7 @@ websocket = {version = "0.26.2", default-features = false, features = ["sync"]} [features] no_test_share = [] +log_recipient_test = [] [target.'cfg(not(target_os = "windows"))'.dependencies] nix = "0.23.0" diff --git a/masq_lib/src/logger.rs b/masq_lib/src/logger.rs index 29cab70f2..9e3c1fb5a 100644 --- a/masq_lib/src/logger.rs +++ b/masq_lib/src/logger.rs @@ -1,10 +1,50 @@ // Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. +use crate::messages::SerializableLogLevel; +#[cfg(not(feature = "log_recipient_test"))] +use crate::messages::{ToMessageBody, UiLogBroadcast}; +#[cfg(not(feature = "log_recipient_test"))] +use crate::ui_gateway::MessageTarget; +use crate::ui_gateway::NodeToUiMessage; +use actix::Recipient; +use lazy_static::lazy_static; use log::logger; use log::Level; #[allow(unused_imports)] use log::Metadata; #[allow(unused_imports)] use log::Record; +use std::sync::Mutex; + +lazy_static! { + pub static ref LOG_RECIPIENT_OPT: Mutex>> = Mutex::new(None); +} + +const UI_MESSAGE_LOG_LEVEL: Level = Level::Info; + +#[cfg(not(feature = "log_recipient_test"))] +pub fn prepare_log_recipient(recipient: Recipient) { + if LOG_RECIPIENT_OPT + .lock() + .expect("log recipient poisoned") + .replace(recipient) + .is_some() + { + panic!("Log recipient should be initiated only once") + } +} + +#[cfg(feature = "log_recipient_test")] +pub struct Counter(pub usize); + +#[cfg(feature = "log_recipient_test")] +lazy_static! { + pub static ref INITIALIZATION_COUNTER: Mutex = Mutex::new(Counter(0)); +} + +#[cfg(feature = "log_recipient_test")] +pub fn prepare_log_recipient(_recipient: Recipient) { + INITIALIZATION_COUNTER.lock().unwrap().0 += 1; +} #[derive(Clone)] pub struct Logger { @@ -132,13 +172,36 @@ impl Logger { where F: FnOnce() -> String, { - if !self.level_enabled(level) { - return; + match (self.level_enabled(level), level.le(&UI_MESSAGE_LOG_LEVEL)) { + (true, true) => { + let msg = log_function(); + Self::transmit(msg.clone(), level.into()); + self.log(level, msg); + } + (true, false) => self.log(level, log_function()), + (false, true) => Self::transmit(log_function(), level.into()), + _ => {} } - let string = log_function(); - self.log(level, string) } + #[cfg(not(feature = "log_recipient_test"))] + fn transmit(msg: String, log_level: SerializableLogLevel) { + if let Some(recipient) = LOG_RECIPIENT_OPT + .lock() + .expect("log recipient mutex poisoned") + .as_ref() + { + let actix_msg = NodeToUiMessage { + target: MessageTarget::AllClients, + body: UiLogBroadcast { msg, log_level }.tmb(0), + }; + recipient.try_send(actix_msg).expect("UiGateway is dead") + } + } + + #[cfg(feature = "log_recipient_test")] + fn transmit(_msg: String, _log_level: SerializableLogLevel) {} + pub fn log(&self, level: Level, msg: String) { logger().log( &Record::builder() @@ -150,6 +213,17 @@ impl Logger { } } +impl From for SerializableLogLevel { + fn from(native_level: Level) -> Self { + match native_level { + Level::Error => SerializableLogLevel::Error, + Level::Warn => SerializableLogLevel::Warn, + Level::Info => SerializableLogLevel::Info, + _ => panic!("The level you're converting is below log broadcast level."), + } + } +} + #[cfg(feature = "no_test_share")] impl Logger { pub fn level_enabled(&self, level: Level) -> bool { @@ -168,21 +242,400 @@ impl Logger { } } +#[cfg(not(feature = "no_test_share"))] +lazy_static! { + pub static ref TEST_LOG_RECIPIENT_GUARD: Mutex<()> = Mutex::new(()); +} + #[cfg(test)] mod tests { use super::*; + use crate::messages::{ToMessageBody, UiLogBroadcast}; use crate::test_utils::logging::init_test_logging; use crate::test_utils::logging::TestLogHandler; + use crate::ui_gateway::{MessageBody, MessagePath}; + use actix::{Actor, AsyncContext, Context, Handler, Message, System}; use chrono::format::StrftimeItems; use chrono::{DateTime, Local}; - use std::sync::{Arc, Mutex}; + use crossbeam_channel::{unbounded, Sender}; + use std::panic::{catch_unwind, AssertUnwindSafe}; + use std::sync::{Arc, Barrier, Mutex}; use std::thread; use std::thread::ThreadId; - use std::time::SystemTime; + use std::time::{Duration, SystemTime}; + + struct TestUiGateway { + received_messages: Arc>>, + expected_msg_count: usize, + } + + impl TestUiGateway { + fn new(msg_count: usize, recording_arc: &Arc>>) -> Self { + Self { + received_messages: recording_arc.clone(), + expected_msg_count: msg_count, + } + } + } + + impl Actor for TestUiGateway { + type Context = Context; + } + + impl Handler for TestUiGateway { + type Result = (); + + fn handle(&mut self, msg: NodeToUiMessage, _ctx: &mut Self::Context) -> Self::Result { + let mut inner = self.received_messages.lock().unwrap(); + inner.push(msg); + if inner.len() == self.expected_msg_count { + System::current().stop(); + } + } + } + + #[derive(Message)] + struct ScheduleStop { + timeout: Duration, + } + + #[derive(Message)] + struct Stop {} + + impl Handler for TestUiGateway { + type Result = (); + + fn handle(&mut self, msg: ScheduleStop, ctx: &mut Self::Context) -> Self::Result { + ctx.set_mailbox_capacity(0); //this is important + ctx.notify_later(Stop {}, msg.timeout); + } + } + + impl Handler for TestUiGateway { + type Result = (); + + fn handle(&mut self, _msg: Stop, _ctx: &mut Self::Context) -> Self::Result { + System::current().stop() + } + } + + lazy_static! { + static ref SENDER: Mutex>> = Mutex::new(None); + } + + #[test] + fn transmit_log_handles_overloading_by_sending_msgs_from_multiple_threads() { + let _test_guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + let number_of_sent_msgs_in_total = 10000; + let factor = match f64::sqrt(number_of_sent_msgs_in_total as f64) { + x if x.fract() == 0.0 => x as usize, + _ => panic!("we expected a square number"), + }; + let (tx, rx) = unbounded(); + { + SENDER.lock().unwrap().replace(tx); + } + let (template_before, template_after) = { + let before = SystemTime::now(); + overloading_function( + move || { + SENDER + .lock() + .unwrap() + .as_ref() + .unwrap() + .clone() + .send(create_msg()) + .unwrap(); + }, + DoAllAtOnce { factor }, + ); + let mut counter = 0; + loop { + rx.recv().unwrap(); + counter += 1; + if counter == number_of_sent_msgs_in_total { + break; + } + } + let after = SystemTime::now(); + (before, after) + }; + let labour_time_example = template_after.duration_since(template_before).unwrap(); + let recording_arc = Arc::new(Mutex::new(vec![])); + let ui_gateway = TestUiGateway::new(number_of_sent_msgs_in_total, &recording_arc); + let system = System::new("test_system"); + let addr = ui_gateway.start(); + let recipient = addr.clone().recipient(); + { + LOG_RECIPIENT_OPT.lock().unwrap().replace(recipient); + } + addr.try_send(ScheduleStop { + timeout: Duration::from_secs(8), + }) + .unwrap(); + + addr.try_send(DoAllAtOnce { factor }).unwrap(); + + let (actual_start, actual_end) = { + let start = SystemTime::now(); + system.run(); + let end = SystemTime::now(); + (start, end) + }; + let recording = recording_arc.lock().unwrap(); + assert_eq!(recording.len(), number_of_sent_msgs_in_total); + let measured = actual_end.duration_since(actual_start).unwrap(); + let safe_estimation = labour_time_example * 3; + eprintln!("measured {:?}, template {:?}", measured, safe_estimation); + assert!(measured < safe_estimation) //this should pass even on slow machines + } + + #[derive(Message)] + struct DoAllAtOnce { + factor: usize, + } + + impl Handler for TestUiGateway { + type Result = (); + + fn handle(&mut self, msg: DoAllAtOnce, _ctx: &mut Self::Context) -> Self::Result { + overloading_function(send_message_to_recipient, msg) + } + } + + fn overloading_function(closure: C, msg: DoAllAtOnce) + where + C: Fn() + Send + 'static + Clone, + { + let barrier_arc = Arc::new(Barrier::new(msg.factor)); + let mut join_handle_vector = Vec::new(); + (0..msg.factor).for_each(|_| { + let barrier_arc_clone = Arc::clone(&barrier_arc); + let closure_clone = closure.clone(); + join_handle_vector.push(thread::spawn(move || { + barrier_arc_clone.wait(); + (0..msg.factor).for_each(|_| closure_clone()) + })) + }); + } + + fn create_msg() -> NodeToUiMessage { + NodeToUiMessage { + target: MessageTarget::AllClients, + body: MessageBody { + opcode: "whatever".to_string(), + path: MessagePath::FireAndForget, + payload: Ok(String::from("our message")), + }, + } + } + fn send_message_to_recipient() { + let recipient = LOG_RECIPIENT_OPT.lock().unwrap(); + recipient.as_ref().unwrap().try_send(create_msg()).unwrap() + } + + #[test] + fn prepare_log_recipient_works() { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } + let message_container_arc = Arc::new(Mutex::new(vec![])); + let system = System::new("prepare log recipient"); + let ui_gateway = TestUiGateway::new(0, &message_container_arc); + let recipient: Recipient = ui_gateway.start().recipient(); + + prepare_log_recipient(recipient); + + LOG_RECIPIENT_OPT + .lock() + .unwrap() + .as_ref() + .unwrap() + .try_send(create_msg()) + .unwrap(); + System::current().stop(); + system.run(); + let message_container = message_container_arc.lock().unwrap(); + assert_eq!(*message_container, vec![create_msg()]); + } + + #[test] + fn prepare_log_recipient_should_be_called_only_once_panic() { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } + let ui_gateway = TestUiGateway::new(0, &Arc::new(Mutex::new(vec![]))); + let recipient: Recipient = ui_gateway.start().recipient(); + prepare_log_recipient(recipient.clone()); + + let caught_panic = + catch_unwind(AssertUnwindSafe(|| prepare_log_recipient(recipient))).unwrap_err(); + + let panic_message = caught_panic.downcast_ref::<&str>().unwrap(); + assert_eq!( + *panic_message, + "Log recipient should be initiated only once" + ) + } + + #[test] + fn conversion_between_different_level_types_is_enabled() { + assert_eq!( + SerializableLogLevel::from(Level::Error), + SerializableLogLevel::Error + ); + assert_eq!( + SerializableLogLevel::from(Level::Warn), + SerializableLogLevel::Warn + ); + assert_eq!( + SerializableLogLevel::from(Level::Info), + SerializableLogLevel::Info + ); + } + + #[test] + #[should_panic(expected = "The level you're converting is below log broadcast level.")] + fn conversion_between_levels_below_log_broadcast_level_should_panic() { + let level_below_broadcast_level = Level::Debug; + let _serializable_level_below_broadcast_level: SerializableLogLevel = + level_below_broadcast_level.into(); + } + + #[test] + fn transmit_fn_can_handle_no_recipients() { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } + let system = System::new("Trying to transmit with no recipient"); + + Logger::transmit("Some message".to_string(), Level::Warn.into()); + + System::current().stop(); + system.run(); + } + + #[test] + fn generic_log_when_neither_logging_nor_transmitting() { + init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + let logger = make_logger_at_level(Level::Debug); + let system = System::new("Neither Logging, Nor Transmitting"); + let ui_gateway_recording_arc = Arc::new(Mutex::new(vec![])); + let ui_gateway = TestUiGateway::new(0, &ui_gateway_recording_arc); + let recipient = ui_gateway.start().recipient(); + { + LOG_RECIPIENT_OPT.lock().unwrap().replace(recipient); + } + let log_function = move || "This is a trace log.".to_string(); + + logger.trace(log_function); + + System::current().stop(); + system.run(); + let ui_gateway_recording = ui_gateway_recording_arc.lock().unwrap(); + assert_eq!(*ui_gateway_recording, vec![]); + TestLogHandler::new().exists_no_log_containing("This is a trace log."); + } + + #[test] + fn generic_log_when_only_logging() { + init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + let logger = make_logger_at_level(Level::Debug); + let system = System::new("Only Logging, Not Transmitting"); + let ui_gateway_recording_arc = Arc::new(Mutex::new(vec![])); + let ui_gateway = TestUiGateway::new(0, &ui_gateway_recording_arc); + let recipient = ui_gateway.start().recipient(); + { + LOG_RECIPIENT_OPT.lock().unwrap().replace(recipient); + } + let log_function = move || "This is a debug log.".to_string(); + + logger.debug(log_function); + + System::current().stop(); + system.run(); + let ui_gateway_recording = ui_gateway_recording_arc.lock().unwrap(); + assert_eq!(*ui_gateway_recording, vec![]); + TestLogHandler::new().exists_log_containing("This is a debug log."); + } + + #[test] + fn generic_log_when_only_transmitting() { + init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + let logger = make_logger_at_level(Level::Warn); + let system = System::new("transmitting but not logging"); + let ui_gateway_recording_arc = Arc::new(Mutex::new(vec![])); + let ui_gateway = TestUiGateway::new(1, &ui_gateway_recording_arc); + let recipient = ui_gateway.start().recipient(); + { + LOG_RECIPIENT_OPT.lock().unwrap().replace(recipient); + } + let log_function = move || "This is an info log.".to_string(); + + logger.info(log_function); + + system.run(); + let ui_gateway_recording = ui_gateway_recording_arc.lock().unwrap(); + assert_eq!( + *ui_gateway_recording, + vec![NodeToUiMessage { + target: MessageTarget::AllClients, + body: UiLogBroadcast { + msg: "This is an info log.".to_string(), + log_level: SerializableLogLevel::Info + } + .tmb(0) + }] + ); + TestLogHandler::new().exists_no_log_containing("This is an info log."); + } + + #[test] + fn generic_log_when_both_logging_and_transmitting() { + init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + let logger = make_logger_at_level(Level::Debug); + let system = System::new("logging ang transmitting"); + let ui_gateway_recording_arc = Arc::new(Mutex::new(vec![])); + let ui_gateway = TestUiGateway::new(1, &ui_gateway_recording_arc); + let recipient = ui_gateway.start().recipient(); + { + LOG_RECIPIENT_OPT.lock().unwrap().replace(recipient); + } + let log_function = move || "This is a warn log.".to_string(); + + logger.warning(log_function); + + system.run(); //shut down by matching the Actor's expected count of received messages + let ui_gateway_recording = ui_gateway_recording_arc.lock().unwrap(); + assert_eq!( + *ui_gateway_recording, + vec![NodeToUiMessage { + target: MessageTarget::AllClients, + body: UiLogBroadcast { + msg: "This is a warn log.".to_string(), + log_level: SerializableLogLevel::Warn + } + .tmb(0) + }] + ); + TestLogHandler::new().exists_log_containing("WARN: test: This is a warn log."); + } #[test] fn logger_format_is_correct() { init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } let one_logger = Logger::new("logger_format_is_correct_one"); let another_logger = Logger::new("logger_format_is_correct_another"); @@ -244,36 +697,32 @@ mod tests { #[test] fn info_is_not_computed_when_log_level_is_warn() { + init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } let logger = make_logger_at_level(Level::Warn); - let signal = Arc::new(Mutex::new(Some(false))); - let signal_c = signal.clone(); - - let log_function = move || { - let mut locked_signal = signal_c.lock().unwrap(); - locked_signal.replace(true); - "blah".to_string() - }; + let log_function = move || "info 445566".to_string(); logger.info(log_function); - assert_eq!(signal.lock().unwrap().as_ref(), Some(&false)); + TestLogHandler::new().exists_no_log_containing("info 445566") } #[test] fn warning_is_not_computed_when_log_level_is_error() { + init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } let logger = make_logger_at_level(Level::Error); - let signal = Arc::new(Mutex::new(Some(false))); - let signal_c = signal.clone(); - - let log_function = move || { - let mut locked_signal = signal_c.lock().unwrap(); - locked_signal.replace(true); - "blah".to_string() - }; + let log_function = move || "warning 335566".to_string(); logger.warning(log_function); - assert_eq!(signal.lock().unwrap().as_ref(), Some(&false)); + TestLogHandler::new().exists_no_log_containing("warning 335566") } #[test] @@ -312,10 +761,13 @@ mod tests { #[test] fn info_is_computed_when_log_level_is_info() { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } let logger = make_logger_at_level(Level::Info); let signal = Arc::new(Mutex::new(Some(false))); let signal_c = signal.clone(); - let log_function = move || { let mut locked_signal = signal_c.lock().unwrap(); locked_signal.replace(true); @@ -329,6 +781,10 @@ mod tests { #[test] fn warn_is_computed_when_log_level_is_warn() { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } let logger = make_logger_at_level(Level::Warn); let signal = Arc::new(Mutex::new(Some(false))); let signal_c = signal.clone(); @@ -346,10 +802,13 @@ mod tests { #[test] fn error_is_computed_when_log_level_is_error() { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } let logger = make_logger_at_level(Level::Error); let signal = Arc::new(Mutex::new(Some(false))); let signal_c = signal.clone(); - let log_function = move || { let mut locked_signal = signal_c.lock().unwrap(); locked_signal.replace(true); @@ -364,6 +823,10 @@ mod tests { #[test] fn macros_work() { init_test_logging(); + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + { + LOG_RECIPIENT_OPT.lock().unwrap().take(); + } let logger = Logger::new("test"); trace!(logger, "trace! {}", 42); diff --git a/masq_lib/src/messages.rs b/masq_lib/src/messages.rs index 45579de95..16cf67af2 100644 --- a/masq_lib/src/messages.rs +++ b/masq_lib/src/messages.rs @@ -637,6 +637,20 @@ pub struct UiGenerateWalletsResponse { } conversation_message!(UiGenerateWalletsResponse, "generateWallets"); +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub enum SerializableLogLevel { + Error, + Warn, + Info, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct UiLogBroadcast { + pub msg: String, + pub log_level: SerializableLogLevel, +} +fire_and_forget_message!(UiLogBroadcast, "logBroadcast"); + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct UiNewPasswordBroadcast {} fire_and_forget_message!(UiNewPasswordBroadcast, "newPassword"); diff --git a/masq_lib/src/test_utils/mock_websockets_server.rs b/masq_lib/src/test_utils/mock_websockets_server.rs index 42d1fbfa3..57ad6409e 100644 --- a/masq_lib/src/test_utils/mock_websockets_server.rs +++ b/masq_lib/src/test_utils/mock_websockets_server.rs @@ -498,9 +498,10 @@ mod tests { ); let _received_message_number_three: UiConfigurationChangedBroadcast = - connection.receive().unwrap(); + connection.skip_until_received().unwrap(); - let _received_message_number_four: UiNodeCrashedBroadcast = connection.receive().unwrap(); + let _received_message_number_four: UiNodeCrashedBroadcast = + connection.skip_until_received().unwrap(); let received_message_number_five: UiDescriptorResponse = connection .transact_with_context_id(conversation_number_three_request.clone(), 3) @@ -510,7 +511,8 @@ mod tests { Some("ae15fe6".to_string()) ); - let _received_message_number_six: UiNewPasswordBroadcast = connection.receive().unwrap(); + let _received_message_number_six: UiNewPasswordBroadcast = + connection.skip_until_received().unwrap(); let requests = stop_handle.stop(); diff --git a/masq_lib/src/test_utils/ui_connection.rs b/masq_lib/src/test_utils/ui_connection.rs index d417da686..21843200a 100644 --- a/masq_lib/src/test_utils/ui_connection.rs +++ b/masq_lib/src/test_utils/ui_connection.rs @@ -1,8 +1,10 @@ // Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. use crate::messages::{FromMessageBody, ToMessageBody, UiMessageError}; +use crate::test_utils::ui_connection::ReceiveResult::{Correct, MarshalError, TransactionError}; use crate::ui_gateway::MessagePath::Conversation; use crate::ui_gateway::MessageTarget::ClientId; +use crate::ui_gateway::NodeToUiMessage; use crate::ui_traffic_converter::UiTrafficConverter; use crate::utils::localhost; use std::io::Write; @@ -61,10 +63,10 @@ impl UiConnection { self.client.writer_mut() } - fn receive_raw( + fn receive_main_impl( &mut self, context_id: Option, - ) -> Result { + ) -> ReceiveResult { let incoming_msg_json = match self.client.recv_message() { Ok(OwnedMessage::Binary(bytes)) if bytes == b"EMPTY QUEUE" => { panic!("The queue is empty; all messages are gone.") @@ -92,22 +94,32 @@ impl UiConnection { } } - let result: Result<(T, u64), UiMessageError> = T::fmb(incoming_msg.body); + let result: Result<(T, u64), UiMessageError> = T::fmb(incoming_msg.body.clone()); match result { - Ok((payload, _)) => Ok(payload), + Ok((payload, _)) => ReceiveResult::Correct(payload), Err(UiMessageError::PayloadError(message_body)) => { let payload_error = message_body .payload .err() .expect("PayloadError message body contained no payload error"); - Err(payload_error) + ReceiveResult::TransactionError(payload_error) } - Err(e) => panic!("Deserialization problem for {}: {:?}", opcode, e), + Err(e) => ReceiveResult::MarshalError((incoming_msg, e, opcode)), } } - pub fn receive(&mut self) -> Result { - self.receive_raw::(None) + pub fn skip_until_received(&mut self) -> Result { + Self::await_message(self) + } + + fn await_message(&mut self) -> Result { + loop { + match self.receive_main_impl::(None) { + Correct(msg) => break Ok(msg), + TransactionError(e) => break Err(e), + MarshalError(_) => continue, + } + } } pub fn transact( @@ -115,7 +127,7 @@ impl UiConnection { payload: S, ) -> Result { self.send(payload); - self.receive_raw::(None) + Self::standard_result_resolution(self.receive_main_impl::(None)) } pub fn transact_with_context_id( @@ -124,10 +136,28 @@ impl UiConnection { context_id: u64, ) -> Result { self.send_with_context_id(payload, context_id); - self.receive_raw::(Some(context_id)) + Self::standard_result_resolution(self.receive_main_impl::(Some(context_id))) + } + + fn standard_result_resolution( + extended_result: ReceiveResult, + ) -> Result { + match extended_result { + Correct(msg) => Ok(msg), + TransactionError(e) => Err(e), + MarshalError((_, e, opcode)) => { + panic!("Deserialization problem for {}: {:?}", opcode, e) + } + } } pub fn shutdown(self) { self.client.shutdown().unwrap() } } + +pub enum ReceiveResult { + Correct(T), + TransactionError((u64, String)), + MarshalError((NodeToUiMessage, UiMessageError, String)), +} diff --git a/node/ci/build.sh b/node/ci/build.sh index 506487dbb..641e2fd96 100755 --- a/node/ci/build.sh +++ b/node/ci/build.sh @@ -3,5 +3,5 @@ CI_DIR="$( cd "$( dirname "$0" )" && pwd )" pushd "$CI_DIR/.." -cargo build --all --lib --bins --release --verbose --features masq_lib/no_test_share +cargo build --all --lib --bins --release --verbose --features masq_lib/no_test_share masq_lib/log_recipient_test popd diff --git a/node/ci/run_integration_tests.sh b/node/ci/run_integration_tests.sh index 1239bb543..3bddb3474 100755 --- a/node/ci/run_integration_tests.sh +++ b/node/ci/run_integration_tests.sh @@ -9,7 +9,7 @@ export RUSTFLAGS="-D warnings -Anon-snake-case" umask 000 pushd "$CI_DIR/.." -cargo test --release --no-fail-fast -- --nocapture --test-threads=1 _integration +cargo test --release --no-fail-fast --features masq_lib/no_test_share -- --nocapture --test-threads=1 _integration BUILD_RESULT=$? chmod -R 777 target popd diff --git a/node/ci/unit_tests.sh b/node/ci/unit_tests.sh index 663809a0e..5076c6117 100755 --- a/node/ci/unit_tests.sh +++ b/node/ci/unit_tests.sh @@ -5,5 +5,5 @@ CI_DIR="$( cd "$( dirname "$0" )" && pwd )" export RUST_BACKTRACE=full export RUSTFLAGS="-D warnings -Anon-snake-case" pushd "$CI_DIR/.." -cargo test --release --lib --no-fail-fast -- --nocapture --skip _integration +cargo test --release --lib --no-fail-fast --features masq_lib/log_recipient_test -- --nocapture --skip _integration popd diff --git a/node/src/actor_system_factory.rs b/node/src/actor_system_factory.rs index 06c64ded8..06c7a3e32 100644 --- a/node/src/actor_system_factory.rs +++ b/node/src/actor_system_factory.rs @@ -39,7 +39,8 @@ use automap_lib::control_layer::automap_control::{ }; use masq_lib::blockchains::chains::Chain; use masq_lib::crash_point::CrashPoint; -use masq_lib::ui_gateway::NodeFromUiMessage; +use masq_lib::logger::prepare_log_recipient; +use masq_lib::ui_gateway::{NodeFromUiMessage, NodeToUiMessage}; use masq_lib::utils::{exit_process, AutomapProtocol}; use std::net::{IpAddr, Ipv4Addr}; use std::path::Path; @@ -95,6 +96,7 @@ pub trait ActorSystemFactoryTools: Send { } pub struct ActorSystemFactoryToolsReal { + log_recipient_setter: Box, automap_control_factory: Box, } @@ -164,7 +166,7 @@ impl ActorSystemFactoryTools for ActorSystemFactoryToolsReal { hopper: hopper_subs, neighborhood: neighborhood_subs.clone(), accountant: accountant_subs, - ui_gateway: ui_gateway_subs, + ui_gateway: ui_gateway_subs.clone(), blockchain_bridge: blockchain_bridge_subs, configurator: configurator_subs, }; @@ -197,6 +199,9 @@ impl ActorSystemFactoryTools for ActorSystemFactoryToolsReal { }) .expect("Dispatcher is dead"); + self.log_recipient_setter + .prepare_log_recipient(ui_gateway_subs.node_to_ui_message_sub); + self.start_automap( &config, vec![ @@ -237,6 +242,7 @@ impl ActorSystemFactoryTools for ActorSystemFactoryToolsReal { impl ActorSystemFactoryToolsReal { pub fn new() -> Self { Self { + log_recipient_setter: Box::new(LogRecipientSetterReal::new()), automap_control_factory: Box::new(AutomapControlFactoryReal::new()), } } @@ -545,6 +551,24 @@ impl AutomapControlFactory for AutomapControlFactoryNull { } } +trait LogRecipientSetter: Send { + fn prepare_log_recipient(&self, recipient: Recipient); +} + +struct LogRecipientSetterReal {} + +impl LogRecipientSetterReal { + pub fn new() -> Self { + Self {} + } +} + +impl LogRecipientSetter for LogRecipientSetterReal { + fn prepare_log_recipient(&self, recipient: Recipient) { + prepare_log_recipient(recipient); + } +} + #[cfg(test)] mod tests { use super::*; @@ -587,6 +611,8 @@ mod tests { use log::LevelFilter; use masq_lib::constants::DEFAULT_CHAIN; use masq_lib::crash_point::CrashPoint; + #[cfg(feature = "log_recipient_test")] + use masq_lib::logger::INITIALIZATION_COUNTER; use masq_lib::messages::{ToMessageBody, UiCrashRequest, UiDescriptorRequest}; use masq_lib::test_utils::utils::TEST_DEFAULT_CHAIN; use masq_lib::ui_gateway::NodeFromUiMessage; @@ -603,6 +629,18 @@ mod tests { use std::thread; use std::time::Duration; + struct LogRecipientSetterNull {} + + impl LogRecipientSetterNull { + pub fn new() -> Self { + Self {} + } + } + + impl LogRecipientSetter for LogRecipientSetterNull { + fn prepare_log_recipient(&self, _recipient: Recipient) {} + } + #[derive(Default)] struct ActorSystemFactoryToolsMock { prepare_initial_messages_params: Arc< @@ -989,6 +1027,7 @@ mod tests { &Some(alias_cryptde()), ); let mut tools = ActorSystemFactoryToolsReal::new(); + tools.log_recipient_setter = Box::new(LogRecipientSetterNull::new()); tools.automap_control_factory = Box::new( AutomapControlFactoryMock::new().make_result( AutomapControlMock::new() @@ -997,12 +1036,12 @@ mod tests { ), ); let subject = ActorSystemFactoryReal::new(Box::new(tools)); - let system = System::new("test"); + subject.make_and_start_actors(config, Box::new(actor_factory), &persistent_config); + System::current().stop(); system.run(); - thread::sleep(Duration::from_millis(100)); Recording::get::(&recordings.dispatcher, 0); Recording::get::(&recordings.hopper, 0); @@ -1055,6 +1094,7 @@ mod tests { }; let add_mapping_params_arc = Arc::new(Mutex::new(vec![])); let mut subject = ActorSystemFactoryToolsReal::new(); + subject.log_recipient_setter = Box::new(LogRecipientSetterNull::new()); subject.automap_control_factory = Box::new( AutomapControlFactoryMock::new().make_result( AutomapControlMock::new() @@ -1155,6 +1195,30 @@ mod tests { assert_eq!(*add_mapping_params, vec![1234, 2345]); } + #[cfg(feature = "log_recipient_test")] + #[test] + fn prepare_initial_messages_initiates_global_log_recipient() { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); + running_test(); + let actor_factory = ActorFactoryMock::new(); + let mut config = BootstrapperConfig::default(); + config.neighborhood_config = NeighborhoodConfig { + mode: NeighborhoodMode::ConsumeOnly(vec![]), + }; + let subject = ActorSystemFactoryToolsReal::new(); + let state_before = INITIALIZATION_COUNTER.lock().unwrap().0; + + let _ = subject.prepare_initial_messages( + main_cryptde(), + alias_cryptde(), + config, + Box::new(actor_factory), + ); + + let state_after = INITIALIZATION_COUNTER.lock().unwrap().0; + assert_eq!(state_after, state_before + 1) + } + #[test] #[should_panic( expected = "1: IP change to 1.2.3.5 reported from ISP. We can't handle that until GH-499. Going down..." @@ -1172,6 +1236,7 @@ mod tests { }; let make_params_arc = Arc::new(Mutex::new(vec![])); let mut subject = ActorSystemFactoryToolsReal::new(); + subject.log_recipient_setter = Box::new(LogRecipientSetterNull::new()); subject.automap_control_factory = Box::new( AutomapControlFactoryMock::new() .make_params(&make_params_arc) @@ -1231,6 +1296,7 @@ mod tests { }; let system = System::new("MASQNode"); let mut subject = ActorSystemFactoryToolsReal::new(); + subject.log_recipient_setter = Box::new(LogRecipientSetterNull::new()); subject.automap_control_factory = Box::new(AutomapControlFactoryMock::new()); let _ = subject.prepare_initial_messages( @@ -1242,7 +1308,6 @@ mod tests { System::current().stop(); system.run(); - let messages = recordings.proxy_client.lock().unwrap(); assert!(messages.is_empty()); check_bind_message(&recordings.dispatcher, true); @@ -1396,7 +1461,8 @@ mod tests { }, node_descriptor: Default::default(), }; - let subject = ActorSystemFactoryToolsReal::new(); + let mut subject = ActorSystemFactoryToolsReal::new(); + subject.log_recipient_setter = Box::new(LogRecipientSetterNull::new()); let system = System::new("MASQNode"); let _ = subject.prepare_initial_messages( diff --git a/node/src/bootstrapper.rs b/node/src/bootstrapper.rs index 6ddab3bb0..918d2a8c9 100644 --- a/node/src/bootstrapper.rs +++ b/node/src/bootstrapper.rs @@ -714,6 +714,7 @@ mod tests { use log::LevelFilter::Off; use masq_lib::blockchains::chains::Chain; use masq_lib::logger::Logger; + use masq_lib::logger::TEST_LOG_RECIPIENT_GUARD; use masq_lib::test_utils::environment_guard::ClapGuard; use masq_lib::test_utils::fake_stream_holder::FakeStreamHolder; use masq_lib::test_utils::logging::{init_test_logging, TestLog, TestLogHandler}; @@ -1359,6 +1360,7 @@ mod tests { #[test] fn init_as_privileged_stores_dns_servers_and_passes_them_to_actor_system_factory_for_proxy_client_in_init_as_unprivileged( ) { + let _guard = TEST_LOG_RECIPIENT_GUARD.lock().unwrap(); // don't remove; protection for test 'prepare_initial_messages_initiates_global_log_recipient' let _lock = INITIALIZATION.lock(); let _clap_guard = ClapGuard::new(); let data_dir = ensure_node_home_directory_exists( diff --git a/node/tests/ui_gateway_test.rs b/node/tests/ui_gateway_test.rs index 3a6028838..42412b678 100644 --- a/node/tests/ui_gateway_test.rs +++ b/node/tests/ui_gateway_test.rs @@ -2,9 +2,11 @@ pub mod utils; +use masq_lib::messages::SerializableLogLevel::Warn; use masq_lib::messages::{ - UiDescriptorRequest, UiDescriptorResponse, UiFinancialsRequest, UiFinancialsResponse, - UiShutdownRequest, NODE_UI_PROTOCOL, + UiChangePasswordRequest, UiDescriptorRequest, UiDescriptorResponse, UiFinancialsRequest, + UiFinancialsResponse, UiLogBroadcast, UiShutdownRequest, UiWalletAddressesRequest, + NODE_UI_PROTOCOL, }; use masq_lib::test_utils::ui_connection::UiConnection; use masq_lib::utils::find_free_port; @@ -29,7 +31,7 @@ fn dispatcher_message_integration() { let mut shutdown_client = UiConnection::new(port, NODE_UI_PROTOCOL); descriptor_client.send(descriptor_req); - let _: UiDescriptorResponse = descriptor_client.receive().unwrap(); + let _: UiDescriptorResponse = descriptor_client.skip_until_received().unwrap(); shutdown_client.send(shutdown_req); node.wait_for_exit(); @@ -57,10 +59,46 @@ fn request_financial_information_integration() { let mut client = UiConnection::new(port, NODE_UI_PROTOCOL); client.send(financials_request); - let financials_response: UiFinancialsResponse = client.receive().unwrap(); + let financials_response: UiFinancialsResponse = client.skip_until_received().unwrap(); assert_eq!(financials_response.payables.len(), 0); assert_eq!(financials_response.receivables.len(), 0); client.send(UiShutdownRequest {}); node.wait_for_exit(); } + +#[test] +fn log_broadcasts_are_correctly_received_integration() { + fdlimit::raise_fd_limit(); + let port = find_free_port(); + let mut node = utils::MASQNode::start_standard( + "log_broadcasts_are_correctly_received", + Some(CommandConfig::new().pair("--ui-port", &port.to_string())), + true, + true, + false, + true, + ); + node.wait_for_log("UIGateway bound", Some(5000)); + let mut client = UiConnection::new(port, NODE_UI_PROTOCOL); + client.send(UiWalletAddressesRequest { + db_password: "blah".to_string(), + }); + client.send(UiChangePasswordRequest { + old_password_opt: Some("blah".to_string()), + new_password: "blah".to_string(), + }); + + let broadcasts: Vec = (0..2) + .map(|_| client.skip_until_received().unwrap()) + .collect(); + + assert_eq!(broadcasts, + vec![ + UiLogBroadcast { msg: "Failed to obtain wallet addresses: 281474976710669, Wallet pair not yet configured".to_string(), log_level: Warn }, + UiLogBroadcast { msg: "Failed to change password: PasswordError".to_string(), log_level: Warn } + ] + ); + client.send(UiShutdownRequest {}); + node.wait_for_exit(); +}