Skip to content

Commit cece664

Browse files
authored
(GH Issue #27521 Workaround) Module api refactoring gh pr issue (#313)
* refactor: replace pulsar module with a trait * refactor: module metadata * feat: move module loops inside module manager * feat(modules_api): add support for an extra function working the state * fix: start module command * Refactored PulsarModule trait * Comment PulsarModule traits * Rename ExtraState to Extension * apply review suggestions * Apply review * fix: daemon refactoring * chore: fix clippy * chore: fmt * fix(module_manager): make tokio select biased
1 parent fc90778 commit cece664

File tree

25 files changed

+1283
-1102
lines changed

25 files changed

+1283
-1102
lines changed

Cargo.lock

-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/engine-api/src/error.rs

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ impl From<PulsarDaemonError> for EngineApiError {
5959
fn from(error: PulsarDaemonError) -> Self {
6060
match &error {
6161
PulsarDaemonError::ModuleNotFound(_) => Self::BadRequest(error.to_string()),
62+
PulsarDaemonError::StartError(_) => Self::BadRequest(error.to_string()),
6263
PulsarDaemonError::StopError(_) => Self::BadRequest(error.to_string()),
6364
PulsarDaemonError::ConfigurationUpdateError(_) => {
6465
log::error!("Unexpected Error {}", error.to_string());

crates/modules/desktop-notifier/src/lib.rs

+26-35
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,44 @@
11
use std::{
22
os::unix::process::CommandExt,
33
process::{Command, Stdio},
4-
sync::Arc,
54
};
65

7-
use anyhow::{Context, Result};
6+
use anyhow::Context;
87
use pulsar_core::{
98
event::Threat,
10-
pdk::{
11-
CleanExit, ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, PulsarModule,
12-
ShutdownSignal, Version,
13-
},
9+
pdk::{ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, SimplePulsarModule},
1410
};
1511

16-
const MODULE_NAME: &str = "desktop-notifier";
12+
pub struct DesktopNotifierModule;
1713

18-
pub fn module() -> PulsarModule {
19-
PulsarModule::new(
20-
MODULE_NAME,
21-
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
22-
false,
23-
desktop_nitifier_task,
24-
)
25-
}
14+
impl SimplePulsarModule for DesktopNotifierModule {
15+
type Config = Config;
16+
type State = ();
2617

27-
async fn desktop_nitifier_task(
28-
ctx: ModuleContext,
29-
mut shutdown: ShutdownSignal,
30-
) -> Result<CleanExit, ModuleError> {
31-
let mut receiver = ctx.get_receiver();
32-
let mut rx_config = ctx.get_config();
33-
let mut config = rx_config.read()?;
18+
const MODULE_NAME: &'static str = "desktop-notifier";
19+
const DEFAULT_ENABLED: bool = false;
3420

35-
loop {
36-
tokio::select! {
37-
r = shutdown.recv() => return r,
38-
_ = rx_config.changed() => {
39-
config = rx_config.read()?;
40-
continue;
41-
}
42-
msg = receiver.recv() => {
43-
handle_event(&config, msg?).await;
44-
}
45-
}
21+
async fn init_state(
22+
&self,
23+
_config: &Self::Config,
24+
_ctx: &ModuleContext,
25+
) -> Result<Self::State, ModuleError> {
26+
Ok(())
27+
}
28+
29+
async fn on_event(
30+
event: &Event,
31+
config: &Self::Config,
32+
_state: &mut Self::State,
33+
_ctx: &ModuleContext,
34+
) -> Result<(), ModuleError> {
35+
handle_event(config, event).await;
36+
Ok(())
4637
}
4738
}
4839

4940
/// Check if the given event is a threat which should be notified to the user
50-
async fn handle_event(config: &Config, event: Arc<Event>) {
41+
async fn handle_event(config: &Config, event: &Event) {
5142
if let Some(Threat {
5243
source,
5344
description,
@@ -99,7 +90,7 @@ async fn notify_send(config: &Config, args: Vec<String>) {
9990
}
10091

10192
#[derive(Clone)]
102-
struct Config {
93+
pub struct Config {
10394
user_id: u32,
10495
display: String,
10596
notify_send_executable: String,

crates/modules/file-system-monitor/src/lib.rs

+36-40
Original file line numberDiff line numberDiff line change
@@ -83,51 +83,48 @@ pub mod pulsar {
8383
use pulsar_core::{
8484
event::FileFlags,
8585
pdk::{
86-
CleanExit, ConfigError, Event, IntoPayload, ModuleConfig, ModuleContext, ModuleError,
87-
ModuleSender, Payload, PulsarModule, ShutdownSignal, Version,
86+
ConfigError, Event, IntoPayload, ModuleConfig, ModuleContext, ModuleError, Payload,
87+
SimplePulsarModule,
8888
},
8989
};
9090
use tokio::{fs::File, io::AsyncReadExt};
9191

92-
pub fn module() -> PulsarModule {
93-
PulsarModule::new(
94-
MODULE_NAME,
95-
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
96-
true,
97-
fs_monitor_task,
98-
)
99-
}
92+
pub struct FileSystemMonitorModule;
10093

101-
async fn fs_monitor_task(
102-
ctx: ModuleContext,
103-
mut shutdown: ShutdownSignal,
104-
) -> Result<CleanExit, ModuleError> {
105-
let _program = program(ctx.get_bpf_context(), ctx.get_sender()).await?;
106-
let mut receiver = ctx.get_receiver();
107-
let mut rx_config = ctx.get_config();
108-
let mut config: Config = rx_config.read()?;
109-
let sender = ctx.get_sender();
110-
loop {
111-
// enable receiver only if the elf checker is enabled
112-
let receiver_recv = async {
113-
if config.elf_check_enabled {
114-
receiver.recv().await
115-
} else {
116-
std::future::pending().await
117-
}
118-
};
119-
tokio::select! {
120-
Ok(msg) = receiver_recv => {
121-
check_elf(&sender, &config, msg.as_ref()).await;
122-
}
123-
_ = rx_config.changed() => {
124-
config = rx_config.read()?;
125-
}
126-
r = shutdown.recv() => return r,
94+
impl SimplePulsarModule for FileSystemMonitorModule {
95+
type Config = Config;
96+
type State = FileSystemMonitorState;
97+
98+
const MODULE_NAME: &'static str = MODULE_NAME;
99+
const DEFAULT_ENABLED: bool = true;
100+
101+
async fn init_state(
102+
&self,
103+
_config: &Self::Config,
104+
ctx: &ModuleContext,
105+
) -> Result<Self::State, ModuleError> {
106+
Ok(Self::State {
107+
_ebpf_program: program(ctx.get_bpf_context(), ctx.clone()).await?,
108+
})
109+
}
110+
111+
async fn on_event(
112+
event: &Event,
113+
config: &Self::Config,
114+
_state: &mut Self::State,
115+
ctx: &ModuleContext,
116+
) -> Result<(), ModuleError> {
117+
if config.elf_check_enabled {
118+
check_elf(ctx, &config.elf_check_whitelist, event).await;
127119
}
120+
Ok(())
128121
}
129122
}
130123

124+
pub struct FileSystemMonitorState {
125+
_ebpf_program: Program,
126+
}
127+
131128
impl IntoPayload for FsEvent {
132129
type Error = IndexError;
133130

@@ -197,15 +194,14 @@ pub mod pulsar {
197194
}
198195

199196
/// Check if an opened file is an ELF
200-
async fn check_elf(sender: &ModuleSender, config: &Config, event: &Event) {
197+
async fn check_elf(ctx: &ModuleContext, elf_check_whitelist: &[String], event: &Event) {
201198
if let Payload::FileOpened { filename, flags } = event.payload() {
202199
let now = Instant::now();
203-
let should_check = !config
204-
.elf_check_whitelist
200+
let should_check = !elf_check_whitelist
205201
.iter()
206202
.any(|path| filename.starts_with(path));
207203
if should_check && is_elf(filename).await {
208-
sender.send_derived(
204+
ctx.send_derived(
209205
event,
210206
Payload::ElfOpened {
211207
filename: filename.to_string(),

crates/modules/logger/src/lib.rs

+68-58
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
use pulsar_core::pdk::{
2-
CleanExit, ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, PulsarModule,
3-
ShutdownSignal, Version,
4-
};
51
use std::{
62
borrow::Cow,
73
cell::OnceCell,
@@ -14,62 +10,76 @@ use std::{
1410
},
1511
str::FromStr,
1612
};
13+
14+
use pulsar_core::pdk::{
15+
ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, SimplePulsarModule,
16+
};
1717
use thiserror::Error;
1818

1919
const UNIX_SOCK_PATHS: [&str; 3] = ["/dev/log", "/var/run/syslog", "/var/run/log"];
20-
const MODULE_NAME: &str = "logger";
2120
const PRIORITY: u8 = 25; // facility * 8 + severity. facility: daemon (3); severity: alert (1)
2221

23-
pub fn module() -> PulsarModule {
24-
PulsarModule::new(
25-
MODULE_NAME,
26-
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
27-
true,
28-
logger_task,
29-
)
30-
}
22+
pub struct LoggerModule;
23+
24+
impl SimplePulsarModule for LoggerModule {
25+
type Config = Config;
26+
type State = LoggerState;
27+
28+
const MODULE_NAME: &'static str = "threat-logger";
29+
const DEFAULT_ENABLED: bool = true;
30+
31+
async fn init_state(
32+
&self,
33+
config: &Self::Config,
34+
ctx: &ModuleContext,
35+
) -> Result<Self::State, ModuleError> {
36+
let logger = match Logger::from_config(config) {
37+
Ok(logr) => logr,
38+
Err(logr) => {
39+
ctx.raise_warning("Failed to connect to syslog".into())
40+
.await;
41+
logr
42+
}
43+
};
3144

32-
async fn logger_task(
33-
ctx: ModuleContext,
34-
mut shutdown: ShutdownSignal,
35-
) -> Result<CleanExit, ModuleError> {
36-
let mut receiver = ctx.get_receiver();
37-
let mut rx_config = ctx.get_config();
38-
let sender = ctx.get_sender();
39-
40-
let mut logger = match Logger::from_config(rx_config.read()?) {
41-
Ok(logr) => logr,
42-
Err(logr) => {
43-
sender
44-
.raise_warning("Failed to connect to syslog".into())
45-
.await;
46-
logr
47-
}
48-
};
49-
50-
loop {
51-
tokio::select! {
52-
r = shutdown.recv() => return r,
53-
_ = rx_config.changed() => {
54-
logger = match Logger::from_config(rx_config.read()?) {
55-
Ok(logr) => logr,
56-
Err(logr) => {
57-
sender.raise_warning("Failed to connect to syslog".into()).await;
58-
logr
59-
}
60-
}
45+
Ok(LoggerState { logger })
46+
}
47+
48+
async fn on_config_change(
49+
new_config: &Self::Config,
50+
state: &mut Self::State,
51+
ctx: &ModuleContext,
52+
) -> Result<(), ModuleError> {
53+
state.logger = match Logger::from_config(new_config) {
54+
Ok(logr) => logr,
55+
Err(logr) => {
56+
ctx.raise_warning("Failed to connect to syslog".into())
57+
.await;
58+
logr
6159
}
62-
msg = receiver.recv() => {
63-
let msg = msg?;
64-
if let Err(e) = logger.process(&msg) {
65-
sender.raise_warning(format!("Writing to logs failed: {e}")).await;
66-
logger = Logger { syslog: None, ..logger };
67-
}
68-
},
60+
};
61+
Ok(())
62+
}
63+
64+
async fn on_event(
65+
event: &Event,
66+
_config: &Self::Config,
67+
state: &mut Self::State,
68+
ctx: &ModuleContext,
69+
) -> Result<(), ModuleError> {
70+
if let Err(e) = state.logger.process(event) {
71+
ctx.raise_warning(format!("Writing to logs failed: {e}, syslog disabled"))
72+
.await;
73+
state.logger.syslog = None;
6974
}
75+
Ok(())
7076
}
7177
}
7278

79+
pub struct LoggerState {
80+
logger: Logger,
81+
}
82+
7383
#[derive(Clone, Debug)]
7484
enum OutputFormat {
7585
Plaintext,
@@ -92,7 +102,7 @@ impl FromStr for OutputFormat {
92102
}
93103

94104
#[derive(Clone)]
95-
struct Config {
105+
pub struct Config {
96106
console: bool,
97107
// file: bool, //TODO:
98108
syslog: bool,
@@ -128,12 +138,12 @@ enum LoggerError {
128138
}
129139

130140
impl Logger {
131-
fn from_config(rx_config: Config) -> Result<Self, Self> {
141+
fn from_config(config: &Config) -> Result<Self, Self> {
132142
let Config {
133143
console,
134144
syslog,
135145
output_format,
136-
} = rx_config;
146+
} = config;
137147

138148
let connected_to_journal = io::stderr()
139149
.as_fd()
@@ -146,7 +156,7 @@ impl Logger {
146156
})
147157
.unwrap_or(false);
148158

149-
let opt_sock = (syslog && !connected_to_journal)
159+
let opt_sock = (*syslog && !connected_to_journal)
150160
.then(|| {
151161
let sock = UnixDatagram::unbound().ok()?;
152162
UNIX_SOCK_PATHS
@@ -156,17 +166,17 @@ impl Logger {
156166
})
157167
.flatten();
158168

159-
if syslog && opt_sock.is_none() {
169+
if *syslog && opt_sock.is_none() {
160170
Err(Self {
161-
console,
171+
console: *console,
162172
syslog: opt_sock,
163-
output_format,
173+
output_format: output_format.clone(),
164174
})
165175
} else {
166176
Ok(Self {
167-
console,
177+
console: *console,
168178
syslog: opt_sock,
169-
output_format,
179+
output_format: output_format.clone(),
170180
})
171181
}
172182
}

0 commit comments

Comments
 (0)