Skip to content

Commit a1f60e8

Browse files
committed
157-Make-module-apis-more-ergonomic
1 parent ee0180e commit a1f60e8

File tree

26 files changed

+1085
-959
lines changed

26 files changed

+1085
-959
lines changed

Cargo.lock

+481-399
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ semver = { version = "1.0.4", features = ["serde"] }
3838
rust-ini = "0.17.0"
3939
comfy-table = "5.0.1"
4040
futures-util = "0.3.25"
41+
async-trait = "0.1.73"
4142

4243
[features]
4344
default = ["full"]

crates/modules/desktop-notifier/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ bpf-common = { path = "../../bpf-common" }
1212
tokio = { version = "1", features = ["full"] }
1313
log = "0.4"
1414
anyhow = "1.0.68"
15+
async-trait = "0.1.73"
+47-53
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,70 @@
11
use std::{
22
os::unix::process::CommandExt,
33
process::{Command, Stdio},
4-
sync::Arc,
54
};
65

76
use anyhow::{Context, Result};
7+
use async_trait::async_trait;
88
use pulsar_core::{
99
event::Threat,
1010
pdk::{
11-
CleanExit, ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, PulsarModule,
12-
ShutdownSignal, Version,
11+
ConfigError, Event, Module, ModuleConfig, ModuleContext, ModuleError, PulsarModule, Version,
1312
},
1413
};
1514

1615
const MODULE_NAME: &str = "desktop-notifier";
1716

18-
pub fn module() -> PulsarModule {
19-
PulsarModule::new(
20-
MODULE_NAME,
21-
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
22-
desktop_nitifier_task,
23-
)
17+
#[derive(Clone)]
18+
pub struct DesktopNotifier {
19+
user_id: u32,
20+
display: String,
21+
notify_send_executable: String,
22+
bus_address: String,
2423
}
2524

26-
async fn desktop_nitifier_task(
27-
ctx: ModuleContext,
28-
mut shutdown: ShutdownSignal,
29-
) -> Result<CleanExit, ModuleError> {
30-
let mut receiver = ctx.get_receiver();
31-
let mut rx_config = ctx.get_config();
32-
let mut config = rx_config.read()?;
25+
impl TryFrom<&ModuleConfig> for DesktopNotifier {
26+
type Error = ConfigError;
3327

34-
loop {
35-
tokio::select! {
36-
r = shutdown.recv() => return r,
37-
_ = rx_config.changed() => {
38-
config = rx_config.read()?;
39-
continue;
40-
}
41-
msg = receiver.recv() => {
42-
handle_event(&config, msg?).await;
43-
}
44-
}
28+
fn try_from(config: &ModuleConfig) -> Result<Self, Self::Error> {
29+
let user_id = config.with_default("user_id", 1000)?;
30+
Ok(Self {
31+
user_id,
32+
display: config.with_default("display", ":0".to_string())?,
33+
notify_send_executable: config
34+
.with_default("notify_send_executable", "notify-send".to_string())?,
35+
bus_address: config
36+
.with_default("bus_address", format!("unix:path=/run/user/{user_id}/bus"))?,
37+
})
38+
}
39+
}
40+
41+
#[async_trait]
42+
impl Module for DesktopNotifier {
43+
fn start() -> PulsarModule {
44+
PulsarModule::new(
45+
MODULE_NAME,
46+
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
47+
|ctx: &ModuleContext| {
48+
let desktop_notifier: DesktopNotifier = ctx.get_config().read()?;
49+
Ok(desktop_notifier)
50+
},
51+
)
52+
}
53+
54+
async fn on_event(&mut self, event: &Event, _ctx: &ModuleContext) -> Result<(), ModuleError> {
55+
handle_event(self, event).await;
56+
Ok(())
57+
}
58+
59+
fn on_change(&mut self, ctx: &ModuleContext) -> Result<(), ModuleError> {
60+
let desktop_notifier: DesktopNotifier = ctx.get_config().read()?;
61+
*self = desktop_notifier;
62+
Ok(())
4563
}
4664
}
4765

4866
/// Check if the given event is a threat which should be notified to the user
49-
async fn handle_event(config: &Config, event: Arc<Event>) {
67+
async fn handle_event(config: &DesktopNotifier, event: &Event) {
5068
if let Some(Threat {
5169
source,
5270
description,
@@ -61,7 +79,7 @@ async fn handle_event(config: &Config, event: Arc<Event>) {
6179
}
6280

6381
/// Send a desktop notification spawning `notify-send` with the provided arguments
64-
async fn notify_send(config: &Config, args: Vec<String>) {
82+
async fn notify_send(config: &DesktopNotifier, args: Vec<String>) {
6583
let mut command = Command::new(&config.notify_send_executable);
6684
command
6785
.args(args)
@@ -96,27 +114,3 @@ async fn notify_send(config: &Config, args: Vec<String>) {
96114
}
97115
});
98116
}
99-
100-
#[derive(Clone)]
101-
struct Config {
102-
user_id: u32,
103-
display: String,
104-
notify_send_executable: String,
105-
bus_address: String,
106-
}
107-
108-
impl TryFrom<&ModuleConfig> for Config {
109-
type Error = ConfigError;
110-
111-
fn try_from(config: &ModuleConfig) -> Result<Self, Self::Error> {
112-
let user_id = config.with_default("user_id", 1000)?;
113-
Ok(Self {
114-
user_id,
115-
display: config.with_default("display", ":0".to_string())?,
116-
notify_send_executable: config
117-
.with_default("notify_send_executable", "notify-send".to_string())?,
118-
bus_address: config
119-
.with_default("bus_address", format!("unix:path=/run/user/{user_id}/bus"))?,
120-
})
121-
}
122-
}

crates/modules/file-system-monitor/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pulsar-core = { path = "../../pulsar-core" }
1515
nix = "0.26.2"
1616
tokio = { version = "1", features = ["full"] }
1717
log = "0.4"
18+
async-trait = "0.1.73"
1819

1920
[build-dependencies]
2021
bpf-builder = { path = "../../bpf-builder" }

crates/modules/file-system-monitor/src/lib.rs

+43-36
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
use async_trait::async_trait;
12
use bpf_common::{
23
ebpf_program, parsing::BufferIndex, program::BpfContext, BpfSender, Program, ProgramBuilder,
34
ProgramError,
45
};
56

7+
use pulsar_core::pdk::{Module, ModuleError};
8+
69
const MODULE_NAME: &str = "file-system-monitor";
710

811
pub async fn program(
@@ -83,50 +86,54 @@ pub mod pulsar {
8386
use pulsar_core::{
8487
event::FileFlags,
8588
pdk::{
86-
CleanExit, ConfigError, Event, IntoPayload, ModuleConfig, ModuleContext, ModuleError,
87-
ModuleSender, Payload, PulsarModule, ShutdownSignal, Version,
89+
ConfigError, Event, IntoPayload, ModuleConfig, ModuleContext, ModuleSender, Payload,
90+
PulsarModule, Version,
8891
},
8992
};
9093
use tokio::{fs::File, io::AsyncReadExt};
9194

92-
pub fn module() -> PulsarModule {
93-
PulsarModule::new(
94-
MODULE_NAME,
95-
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
96-
fs_monitor_task,
97-
)
95+
pub struct FileSystemMonitor {
96+
sender: ModuleSender,
97+
config: Config,
9898
}
9999

100-
async fn fs_monitor_task(
101-
ctx: ModuleContext,
102-
mut shutdown: ShutdownSignal,
103-
) -> Result<CleanExit, ModuleError> {
104-
let _program = program(ctx.get_bpf_context(), ctx.get_sender()).await?;
105-
let mut receiver = ctx.get_receiver();
106-
let mut rx_config = ctx.get_config();
107-
let mut config: Config = rx_config.read()?;
108-
let sender = ctx.get_sender();
109-
loop {
110-
// enable receiver only if the elf checker is enabled
111-
let receiver_recv = async {
112-
if config.elf_check_enabled {
113-
receiver.recv().await
114-
} else {
115-
std::future::pending().await
116-
}
117-
};
118-
tokio::select! {
119-
Ok(msg) = receiver_recv => {
120-
check_elf(&sender, &config, msg.as_ref()).await;
121-
}
122-
_ = rx_config.changed() => {
123-
config = rx_config.read()?;
124-
}
125-
r = shutdown.recv() => return r,
126-
}
100+
impl FileSystemMonitor {
101+
pub fn new(config: Config, sender: ModuleSender) -> Self {
102+
Self { config, sender }
127103
}
128104
}
129105

106+
#[async_trait]
107+
impl Module for FileSystemMonitor {
108+
fn start() -> PulsarModule {
109+
PulsarModule::new(
110+
MODULE_NAME,
111+
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
112+
|ctx: &ModuleContext| {
113+
let config: Config = ctx.get_config().read()?;
114+
Ok(FileSystemMonitor::new(config, ctx.get_sender()))
115+
},
116+
)
117+
}
118+
119+
async fn ebpf_probe(&self, ctx: &ModuleContext) -> Result<Option<Program>, ProgramError> {
120+
let program = program(ctx.get_bpf_context(), ctx.get_sender()).await?;
121+
Ok(Some(program))
122+
}
123+
124+
async fn on_event(
125+
&mut self,
126+
event: &Event,
127+
_ctx: &ModuleContext,
128+
) -> Result<(), ModuleError> {
129+
if self.config.elf_check_enabled {
130+
check_elf(&self.sender, &self.config, event).await;
131+
Ok(())
132+
} else {
133+
Ok(())
134+
}
135+
}
136+
}
130137
impl IntoPayload for FsEvent {
131138
type Error = IndexError;
132139

@@ -181,7 +188,7 @@ pub mod pulsar {
181188
type Error = ConfigError;
182189

183190
fn try_from(config: &ModuleConfig) -> Result<Self, Self::Error> {
184-
Ok(Config {
191+
Ok(Self {
185192
elf_check_enabled: config.with_default("elf_check_enabled", true)?,
186193
elf_check_whitelist: config.get_list_with_default(
187194
"elf_check_whitelist",

crates/modules/logger/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ bpf-common = { path = "../../bpf-common" }
1212
tokio = { version = "1", features = ["full"] }
1313
log = "0.4"
1414
chrono = { version = "0.4.23", features = ["std"], default-features = false }
15+
async-trait = "0.1.73"

crates/modules/logger/src/lib.rs

+29-44
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,17 @@
1+
use async_trait::async_trait;
12
use pulsar_core::pdk::{
2-
CleanExit, ConfigError, Event, ModuleConfig, ModuleContext, ModuleError, PulsarModule,
3-
ShutdownSignal, Version,
3+
ConfigError, Event, Module, ModuleConfig, ModuleContext, ModuleError, PulsarModule, Version,
44
};
55

66
const MODULE_NAME: &str = "logger";
77

8-
pub fn module() -> PulsarModule {
9-
PulsarModule::new(
10-
MODULE_NAME,
11-
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
12-
logger_task,
13-
)
14-
}
15-
16-
async fn logger_task(
17-
ctx: ModuleContext,
18-
mut shutdown: ShutdownSignal,
19-
) -> Result<CleanExit, ModuleError> {
20-
let mut receiver = ctx.get_receiver();
21-
let mut rx_config = ctx.get_config();
22-
let mut logger = Logger::from_config(rx_config.read()?);
23-
24-
loop {
25-
tokio::select! {
26-
r = shutdown.recv() => return r,
27-
_ = rx_config.changed() => {
28-
logger = Logger::from_config(rx_config.read()?);
29-
}
30-
msg = receiver.recv() => {
31-
let msg = msg?;
32-
logger.process(&msg)
33-
},
34-
}
35-
}
36-
}
37-
38-
#[derive(Clone)]
39-
struct Config {
8+
pub struct Logger {
409
console: bool,
4110
// file: bool, //TODO:
4211
// syslog: bool, //TODO:
4312
}
4413

45-
impl TryFrom<&ModuleConfig> for Config {
14+
impl TryFrom<&ModuleConfig> for Logger {
4615
type Error = ConfigError;
4716

4817
fn try_from(config: &ModuleConfig) -> Result<Self, Self::Error> {
@@ -54,23 +23,39 @@ impl TryFrom<&ModuleConfig> for Config {
5423
}
5524
}
5625

57-
struct Logger {
58-
console: bool,
59-
}
60-
6126
impl Logger {
62-
fn from_config(rx_config: Config) -> Self {
63-
let Config { console } = rx_config;
64-
Self { console }
65-
}
66-
6727
fn process(&self, event: &Event) {
6828
if event.header().threat.is_some() && self.console {
6929
terminal::print_event(event);
7030
}
7131
}
7232
}
7333

34+
#[async_trait]
35+
impl Module for Logger {
36+
fn start() -> PulsarModule {
37+
PulsarModule::new(
38+
MODULE_NAME,
39+
Version::parse(env!("CARGO_PKG_VERSION")).unwrap(),
40+
|ctx: &ModuleContext| {
41+
let logger: Logger = ctx.get_config().read()?;
42+
Ok(logger)
43+
},
44+
)
45+
}
46+
47+
async fn on_event(&mut self, event: &Event, _ctx: &ModuleContext) -> Result<(), ModuleError> {
48+
self.process(event);
49+
Ok(())
50+
}
51+
52+
fn on_change(&mut self, ctx: &ModuleContext) -> Result<(), ModuleError> {
53+
let logger: Logger = ctx.get_config().read()?;
54+
*self = logger;
55+
Ok(())
56+
}
57+
}
58+
7459
pub mod terminal {
7560
use chrono::{DateTime, Utc};
7661
use pulsar_core::{event::Threat, pdk::Event};

crates/modules/network-monitor/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ tokio = { version = "1", features = ["full"] }
1616
log = "0.4"
1717
nix = "0.26.2"
1818
dns-parser = "0.8.0"
19+
async-trait = "0.1.73"
1920

2021
[build-dependencies]
2122
bpf-builder = { path = "../../bpf-builder" }

0 commit comments

Comments
 (0)