Skip to content

Commit

Permalink
💄 Run cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
Shark committed Mar 9, 2022
1 parent 137c1a5 commit 10ef6b8
Show file tree
Hide file tree
Showing 16 changed files with 291 additions and 198 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ jobs:
- name: cargo fmt
uses: actions-rs/cargo@v1
with:
command: fmt --check
command: fmt
args: --check
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
repos:
- repo: https://github.com/doublify/pre-commit-rust
rev: eeee35a89e69d5772bdee97db1a6a898467b686e
hooks:
- id: fmt
- id: cargo-check
- id: clippy
6 changes: 5 additions & 1 deletion src/app/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ pub struct Config {
pub bind_port: u16,

/// Comma-separated list of insecure OCI registry hosts
#[clap(long = "insecure-oci-registries", env = "INSECURE_OCI_REGISTRIES", use_value_delimiter = true)]
#[clap(
long = "insecure-oci-registries",
env = "INSECURE_OCI_REGISTRIES",
use_value_delimiter = true
)]
pub insecure_oci_registries: Vec<String>,

#[clap(long = "fs-cache-dir", env = "FS_CACHE_DIR")]
Expand Down
6 changes: 3 additions & 3 deletions src/app/dependencies.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::app::config::Config;
use crate::app::wasm::{self, cache, cache::ModuleCache};
use clap::Parser;
use std::path::PathBuf;
use std::sync::Arc;
use clap::Parser;
use wasmtime::Engine;
use crate::app::config::Config;
use crate::app::wasm::{self, cache, cache::ModuleCache};

pub trait DependencyProvider {
fn get_config(&self) -> &Config;
Expand Down
8 changes: 4 additions & 4 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod model;
pub mod web;
pub mod wasm;
pub mod config;
pub mod tracing;
pub mod dependencies;
pub mod model;
pub mod tracing;
pub mod wasm;
pub mod web;
6 changes: 3 additions & 3 deletions src/app/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use serde::{Serialize, Deserialize};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

#[derive(Deserialize, Debug)]
#[allow(dead_code)]
Expand Down Expand Up @@ -75,7 +75,7 @@ pub struct WasmPluginConfig {
#[allow(dead_code)]
pub enum ModuleSource {
#[serde(rename = "oci")]
OCI(String)
OCI(String),
}

#[derive(Deserialize, Debug)]
Expand Down
46 changes: 27 additions & 19 deletions src/app/tracing.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,54 @@
use anyhow::anyhow;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{filter::{EnvFilter, LevelFilter}, prelude::*};
use tracing_subscriber::filter::dynamic_filter_fn;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
prelude::*,
};

pub fn setup(
debug: bool,
enable_telemetry: bool,
) -> anyhow::Result<()> {
let filter = EnvFilter::default()
.add_directive(LevelFilter::INFO.into());
pub fn setup(debug: bool, enable_telemetry: bool) -> anyhow::Result<()> {
let filter = EnvFilter::default().add_directive(LevelFilter::INFO.into());

let filter = match debug {
true => filter
.add_directive("tower_http=debug".parse().expect("parse directive"))
.add_directive("wasm_workflow_executor=debug".parse().expect("parse directive")),
.add_directive(
"wasm_workflow_executor=debug"
.parse()
.expect("parse directive"),
),
false => filter,
};

let telemetry : Option<OpenTelemetryLayer<_, _>> = match enable_telemetry {
let telemetry: Option<OpenTelemetryLayer<_, _>> = match enable_telemetry {
true => Some(telemetry()?),
false => None,
};

tracing_subscriber::registry()
.with(telemetry)
.with(tracing_subscriber::fmt::layer().compact().with_filter(dynamic_filter_fn(move |metadata, ctx| {
filter.enabled(metadata, ctx.clone())
})))
.with(
tracing_subscriber::fmt::layer()
.compact()
.with_filter(dynamic_filter_fn(move |metadata, ctx| {
filter.enabled(metadata, ctx.clone())
})),
)
.init();

Ok(())
}

fn telemetry() -> anyhow::Result<tracing_opentelemetry::OpenTelemetryLayer<
tracing_subscriber::Registry,
opentelemetry::sdk::trace::Tracer,
>> {
fn telemetry() -> anyhow::Result<
tracing_opentelemetry::OpenTelemetryLayer<
tracing_subscriber::Registry,
opentelemetry::sdk::trace::Tracer,
>,
> {
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("wasm-workflow-executor")
.install_simple().map_err(
|err| anyhow!(err).context("opentelemetry_jaeger setup failed")
)?;
.install_simple()
.map_err(|err| anyhow!(err).context("opentelemetry_jaeger setup failed"))?;

// Create a tracing layer with the configured tracer
Ok(tracing_opentelemetry::layer().with_tracer(tracer))
Expand Down
77 changes: 52 additions & 25 deletions src/app/wasm/cache.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::{anyhow, Result};
use itertools::Itertools;
use std::fs;
use std::fs::File;
use std::io::ErrorKind;
use std::os::unix::fs::MetadataExt;
use std::path::PathBuf;
use std::time::SystemTime;
use anyhow::{Result, anyhow};
use itertools::Itertools;

pub trait ModuleCache {
fn get(&self, image: &str) -> Result<Option<Vec<u8>>>;
Expand All @@ -14,23 +14,24 @@ pub trait ModuleCache {
}

pub fn new_fs_cache(base_dir: PathBuf) -> FSCache {
FSCache {
base_dir,
}
FSCache { base_dir }
}

#[derive(Debug)]
pub struct FSCache {
base_dir: PathBuf
base_dir: PathBuf,
}

impl FSCache {
fn canonical_name(image: &str) -> String {
image.chars().map(|c| match c {
'/' => '-',
':' => '-',
_ => c,
}).collect()
image
.chars()
.map(|c| match c {
'/' => '-',
':' => '-',
_ => c,
})
.collect()
}
}

Expand All @@ -46,10 +47,10 @@ impl ModuleCache for FSCache {
Err(err) => {
if err.kind() == ErrorKind::NotFound {
tracing::debug!("Cache miss for {:?}", path);
return Ok(None)
return Ok(None);
}
return Err(err.into());
},
}
};
tracing::debug!("Cache hit for {:?}", path);
let buf = zstd::stream::decode_all(f)?;
Expand All @@ -71,17 +72,24 @@ impl ModuleCache for FSCache {
let paths = fs::read_dir(&self.base_dir)?;
// paths.collect::<Result<Vec<_>, _>>()?
// .iter():
let files = paths
let files = paths
.collect::<Result<Vec<_>, _>>()?
.iter()
.map(|path| {
let metadata = match path.metadata() {
Ok(meta) => meta,
Err(err) => return Err(anyhow!(err).context(format!("Reading file \"{:?}\" failed", path)))
Err(err) => {
return Err(
anyhow!(err).context(format!("Reading file \"{:?}\" failed", path))
)
}
};
let modified = match metadata.modified() {
Ok(modified) => modified,
Err(err) => return Err(anyhow!(err).context(format!("Reading mtime of file \"{:?}\" failed", path)))
Err(err) => {
return Err(anyhow!(err)
.context(format!("Reading mtime of file \"{:?}\" failed", path)))
}
};
Ok(CachedFile {
path: path.path(),
Expand All @@ -93,24 +101,43 @@ impl ModuleCache for FSCache {

let total_size: u64 = files.iter().map(|f| f.size_mib).sum();
if total_size < max_size_mib {
tracing::debug!("Not purging cache, total cached files: {}MiB, max size: {}MiB", total_size, max_size_mib);
return Ok(())
tracing::debug!(
"Not purging cache, total cached files: {}MiB, max size: {}MiB",
total_size,
max_size_mib
);
return Ok(());
}

let mut deleted_mib: u64 = 0;
for (i, file) in files.
into_iter()
.sorted_by(|a,b| a.modified_at.cmp(&b.modified_at))
.enumerate() {
for (i, file) in files
.into_iter()
.sorted_by(|a, b| a.modified_at.cmp(&b.modified_at))
.enumerate()
{
match fs::remove_file(&file.path) {
Ok(_) => (),
Err(err) => return Err(anyhow!(err).context(format!("Deleting file \"{:?}\" failed", &file.path)))
Err(err) => {
return Err(
anyhow!(err).context(format!("Deleting file \"{:?}\" failed", &file.path))
)
}
}
deleted_mib += file.size_mib;
if total_size - deleted_mib <= max_size_mib {
tracing::info!("Cached purged, deleted {}MiB in {} files, now using {}MiB out of {}MiB", deleted_mib, i, total_size - deleted_mib, max_size_mib);
tracing::info!(
"Cached purged, deleted {}MiB in {} files, now using {}MiB out of {}MiB",
deleted_mib,
i,
total_size - deleted_mib,
max_size_mib
);
} else {
tracing::debug!("Deleted file #{} ({:?}), continuing to delete", i, &file.path)
tracing::debug!(
"Deleted file #{} ({:?}), continuing to delete",
i,
&file.path
)
}
}

Expand Down
25 changes: 13 additions & 12 deletions src/app/wasm/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,28 @@ pub async fn fetch_oci_image(name: &str, allowed_insecure: Vec<String>) -> anyho
// TODO add auth support
let img = oci_distribution::Reference::from_str(name)?;
let auth = oci_distribution::secrets::RegistryAuth::Anonymous;
let protocol =
oci_distribution::client::ClientProtocol::HttpsExcept(allowed_insecure.to_vec());
let protocol = oci_distribution::client::ClientProtocol::HttpsExcept(allowed_insecure.to_vec());
let config = oci_distribution::client::ClientConfig {
protocol,
..Default::default()
};
let mut oci_client = oci_distribution::Client::new(config);
// TODO add pull timeout
let img_data = oci_client.pull(
&img,
&auth,
vec![
"application/vnd.module.wasm.content.layer.v1+wasm",
"application/vnd.wasm.content.layer.v1+wasm",
"application/vnd.oci.image.layer.v1.tar"
]).await?;
let img_data = oci_client
.pull(
&img,
&auth,
vec![
"application/vnd.module.wasm.content.layer.v1+wasm",
"application/vnd.wasm.content.layer.v1+wasm",
"application/vnd.oci.image.layer.v1.tar",
],
)
.await?;
let content = img_data
.layers
.iter()
.map(|l| l.data.clone())
.flatten()
.flat_map(|l| l.data.clone())
.collect::<Vec<_>>();

Ok(content)
Expand Down
Loading

0 comments on commit 10ef6b8

Please sign in to comment.