Skip to content

Commit

Permalink
fix: fix accumulated clippy warnings (txpipe#700)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Sep 9, 2023
1 parent 57b90cc commit aba91d0
Show file tree
Hide file tree
Showing 17 changed files with 44 additions and 43 deletions.
4 changes: 2 additions & 2 deletions src/filters/deno/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ pub struct Worker {
runtime: WrappedRuntime,
}

const SYNC_CALL_SNIPPET: &'static str = r#"Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));"#;
const SYNC_CALL_SNIPPET: &str = r#"Deno[Deno.internal].core.ops.op_put_record(mapEvent(Deno[Deno.internal].core.ops.op_pop_record()));"#;

const ASYNC_CALL_SNIPPET: &'static str = r#"mapEvent(Deno[Deno.internal].core.ops.op_pop_record()).then(x => Deno[Deno.internal].core.ops.op_put_record(x));"#;
const ASYNC_CALL_SNIPPET: &str = r#"mapEvent(Deno[Deno.internal].core.ops.op_pop_record()).then(x => Deno[Deno.internal].core.ops.op_put_record(x));"#;

impl Worker {
async fn map_record(
Expand Down
2 changes: 1 addition & 1 deletion src/filters/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
10 changes: 3 additions & 7 deletions src/filters/legacy_v1/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl EventWriter<'_> {
record.output_count = outputs.len();
record.total_output = outputs.iter().map(|o| o.amount).sum();

let inputs: Vec<_> = tx.inputs().iter().map(|x| TxInputRecord::from(x)).collect();
let inputs: Vec<_> = tx.inputs().iter().map(TxInputRecord::from).collect();

record.input_count = inputs.len();

Expand All @@ -207,11 +207,7 @@ impl EventWriter<'_> {

record.mint_count = mints.len();

let collateral_inputs: Vec<_> = tx
.collateral()
.iter()
.map(|x| TxInputRecord::from(x))
.collect();
let collateral_inputs: Vec<_> = tx.collateral().iter().map(TxInputRecord::from).collect();

record.collateral_input_count = collateral_inputs.len();

Expand Down Expand Up @@ -280,7 +276,7 @@ impl EventWriter<'_> {
record.plutus_data = tx
.plutus_data()
.iter()
.map(|x| PlutusDatumRecord::from(x))
.map(PlutusDatumRecord::from)
.collect::<Vec<_>>()
.into();

Expand Down
4 changes: 2 additions & 2 deletions src/filters/legacy_v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand All @@ -48,7 +48,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
&mut buffer,
);

writer.crawl_cbor(&cbor)?;
writer.crawl_cbor(cbor)?;
}
ChainEvent::Reset(point) => {
let mut writer = EventWriter::new(
Expand Down
2 changes: 1 addition & 1 deletion src/filters/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/parse_cbor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/split_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/filters/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Worker::default()
Self
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/sinks/file_rotate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ pub struct Stage {

#[derive(Debug, Deserialize, Clone)]
pub enum Format {
JSONL,
#[serde(rename = "JSONL")]
Jsonl,
}

#[derive(Default, Debug, Deserialize)]
Expand Down
12 changes: 10 additions & 2 deletions src/sinks/gcp_cloudfunction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ struct Claims {
pub iat: u64,
pub exp: u64,
}

impl Claims {
pub fn new(audience: &String, credentials: &Credentials) -> Self {
pub fn new(audience: &str, credentials: &Credentials) -> Self {
let iat = jsonwebtoken::get_current_timestamp();
let exp = iat + 60;

Self {
iss: credentials.client_email.clone(),
aud: credentials.token_uri.clone(),
target_audience: audience.clone(),
target_audience: audience.to_owned(),
iat,
exp,
}
Expand All @@ -47,6 +49,7 @@ struct Credentials {
pub token_uri: String,
pub private_key: String,
}

impl TryFrom<serde_json::Value> for Credentials {
type Error = Error;

Expand Down Expand Up @@ -77,6 +80,7 @@ pub struct GCPAuth {
audience: String,
token: Option<String>,
}

impl GCPAuth {
pub fn try_new(audience: String) -> Result<Self, Error> {
let client = reqwest::ClientBuilder::new()
Expand Down Expand Up @@ -267,21 +271,25 @@ impl From<std::env::VarError> for Error {
Error::Config(value.to_string())
}
}

impl From<std::io::Error> for Error {
fn from(value: std::io::Error) -> Self {
Error::Config(value.to_string())
}
}

impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Error::Config(value.to_string())
}
}

impl From<reqwest::Error> for Error {
fn from(value: reqwest::Error) -> Self {
Error::Custom(value.to_string())
}
}

impl From<jsonwebtoken::errors::Error> for Error {
fn from(value: jsonwebtoken::errors::Error) -> Self {
Error::Custom(value.to_string())
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/rabbitmq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl gasket::framework::Worker<Stage> for Worker {
self.channel
.basic_publish(
&stage.config.exchange,
&stage.config.routing_key.clone().unwrap_or(String::new()),
&stage.config.routing_key.clone().unwrap_or_default(),
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
Expand Down
2 changes: 0 additions & 2 deletions src/sinks/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ impl gasket::framework::Worker<Stage> for Worker {
#[derive(Stage)]
#[stage(name = "sink-stdout", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
config: Config,
cursor: Cursor,

pub input: MapperInputPort,
Expand All @@ -66,7 +65,6 @@ pub struct Config;
impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Stage, Error> {
let stage = Stage {
config: self,
cursor: ctx.cursor.clone(),
ops_count: Default::default(),
latest_block: Default::default(),
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/terminal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ impl gasket::framework::Worker<Stage> for Worker {

let line = match unit {
ChainEvent::Apply(_, record) => {
LogLine::new_apply(&record, width, &stage.config.adahandle_policy)
LogLine::new_apply(record, width, &stage.config.adahandle_policy)
}
ChainEvent::Undo(_, record) => {
LogLine::new_undo(&record, width, &stage.config.adahandle_policy)
LogLine::new_undo(record, width, &stage.config.adahandle_policy)
}
ChainEvent::Reset(point) => LogLine::new_reset(point.clone()),
};
Expand Down
4 changes: 2 additions & 2 deletions src/sources/n2c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Worker {
) -> Result<(), WorkerError> {
match next {
NextResponse::RollForward(cbor, tip) => {
let block = MultiEraBlock::decode(&cbor).or_panic()?;
let block = MultiEraBlock::decode(cbor).or_panic()?;
let slot = block.slot();
let hash = block.hash();

Expand All @@ -114,7 +114,7 @@ impl Worker {

stage
.output
.send(ChainEvent::reset(point.clone()).into())
.send(ChainEvent::reset(point.clone()))
.await
.or_panic()?;

Expand Down
2 changes: 1 addition & 1 deletion src/sources/n2n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Worker {

stage
.output
.send(ChainEvent::reset(point.clone()).into())
.send(ChainEvent::reset(point.clone()))
.await
.or_panic()?;

Expand Down
28 changes: 13 additions & 15 deletions src/sources/utxorpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ impl Worker {
}

async fn next_dump_history(&mut self) -> Result<WorkSchedule<Vec<Action>>, WorkerError> {
let mut dump_history_request = DumpHistoryRequest::default();
dump_history_request.start_token = self.block_ref.clone();
dump_history_request.max_items = self.max_items_per_page;
let dump_history_request = DumpHistoryRequest {
start_token: self.block_ref.clone(),
max_items: self.max_items_per_page,
..Default::default()
};

let result = self
.client
Expand All @@ -156,11 +158,11 @@ impl Worker {
self.block_ref = result.next_token;

if !result.block.is_empty() {
let actions: Vec<Action> = result.block.into_iter().map(|b| Action::Apply(b)).collect();
let actions: Vec<Action> = result.block.into_iter().map(Action::Apply).collect();
return Ok(WorkSchedule::Unit(actions));
}

return Ok(WorkSchedule::Idle);
Ok(WorkSchedule::Idle)
}
}

Expand All @@ -185,14 +187,10 @@ impl gasket::framework::Worker<Stage> for Worker {
};
}

let block_ref = if let Some((slot, hash)) = point {
let mut block_ref = BlockRef::default();
block_ref.index = slot;
block_ref.hash = hash.into();
Some(block_ref)
} else {
None
};
let block_ref = point.map(|(slot, hash)| BlockRef {
index: slot,
hash: hash.into(),
});

let max_items_per_page = stage.config.max_items_per_page.unwrap_or(20);

Expand All @@ -206,10 +204,10 @@ impl gasket::framework::Worker<Stage> for Worker {

async fn schedule(&mut self, _: &mut Stage) -> Result<WorkSchedule<Vec<Action>>, WorkerError> {
if self.block_ref.is_some() {
return Ok(self.next_dump_history().await?);
return self.next_dump_history().await;
}

Ok(self.next_stream().await?)
self.next_stream().await
}

async fn execute(&mut self, unit: &Vec<Action>, stage: &mut Stage) -> Result<(), WorkerError> {
Expand Down

0 comments on commit aba91d0

Please sign in to comment.