Skip to content
This repository has been archived by the owner on Apr 3, 2019. It is now read-only.

chore(queues): use mozlog and failure in queues process #161

Merged
merged 9 commits into from
Aug 9, 2018
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ sha2 = ">=0.7.1"
slog = ">=2.2.3"
slog-async = ">=2.3.0"
slog-mozlog-json = ">=0.1.0"
slog-scope = ">=4.0.1"
slog-term = ">=2.4.0"
socketlabs = ">=0.1.3"
118 changes: 102 additions & 16 deletions src/app_errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rocket::{
self,
http::Status,
response::{self, Responder, Response},
Request,
Outcome, Request, State,
};
use rocket_contrib::{Json, JsonValue};
use serde_json::{map::Map, ser::to_string, Value};
Expand Down Expand Up @@ -45,15 +45,18 @@ impl AppError {
let mut fields = Map::new();
fields.insert(
String::from("code"),
Value::String(format!("{}", status.code)),
Value::Number(status.code.to_owned().into()),
);
fields.insert(
String::from("error"),
Value::String(format!("{}", status.reason)),
);
let errno = kind.errno();
if let Some(ref errno) = errno {
fields.insert(String::from("errno"), Value::String(format!("{}", errno)));
fields.insert(
String::from("errno"),
Value::Number(errno.to_owned().into()),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

);
fields.insert(String::from("message"), Value::String(format!("{}", self)));
};
let additional_fields = kind.additional_fields();
Expand Down Expand Up @@ -111,10 +114,10 @@ pub enum AppErrorKind {
RocketError(rocket::Error),

/// An error for invalid email params in the /send handler.
#[fail(display = "Error validating email params.")]
#[fail(display = "Error validating email params")]
InvalidEmailParams,
/// An error for missing email params in the /send handler.
#[fail(display = "Missing email params.")]
#[fail(display = "Missing email params")]
MissingEmailParams(String),

/// An error for invalid provider names.
Expand All @@ -128,29 +131,67 @@ pub enum AppErrorKind {
EmailParsingError(String),

/// An error for when a bounce violation happens.
#[fail(display = "Email account sent complaint.")]
#[fail(display = "Email account sent complaint")]
BounceComplaintError {
address: String,
bounce: Option<BounceRecord>,
},
#[fail(display = "Email account soft bounced.")]
#[fail(display = "Email account soft bounced")]
BounceSoftError {
address: String,
bounce: Option<BounceRecord>,
},
#[fail(display = "Email account hard bounced.")]
#[fail(display = "Email account hard bounced")]
BounceHardError {
address: String,
bounce: Option<BounceRecord>,
},

/// An error for when an error happens on a request to the db.
#[fail(display = "{:?}", _0)]
#[fail(display = "{}", _0)]
DbError(String),

/// An error for when an error happens on the queues process.
#[fail(display = "{}", _0)]
QueueError(String),
/// An error for when we get an invalid notification type in the queues
/// process.
#[fail(display = "Invalid notification type")]
InvalidNotificationType,
/// An error for when we get notification without a payload in the queues
/// process.
#[fail(display = "Missing payload in {} notification", _0)]
MissingNotificationPayload(String),

/// An error for when we get SQS messages with missing fields.
#[fail(display = "Missing SQS message {} field", field)]
MissingSqsMessageFields { queue: String, field: String },
/// An error for when the SQS message body does not match MD5 hash.
#[fail(display = "Message body does not match MD5 hash")]
SqsMessageHashMismatch {
queue: String,
hash: String,
body: String,
},
/// An error for when we can't parse the SQS message.
#[fail(display = "SQS message parsing error")]
SqsMessageParsingError {
queue: String,
message: String,
body: String,
},

/// An error for when we get and invalid duration string.
#[fail(display = "invalid duration: {}", _0)]
DurationError(String),

/// An error for when we get erros in the message_data module.
#[fail(display = "{}", _0)]
MessageDataError(String),

/// An error for when we try to access functionality that is not
/// implemented.
#[fail(display = "Feature not implemented.")]
#[fail(display = "Feature not implemented")]
NotImplemented,
}

Expand All @@ -171,7 +212,7 @@ impl AppErrorKind {
}
}

pub fn errno(&self) -> Option<i32> {
pub fn errno(&self) -> Option<u16> {
match self {
AppErrorKind::RocketError(_) => Some(100),

Expand All @@ -188,7 +229,18 @@ impl AppErrorKind {

AppErrorKind::DbError(_) => Some(109),

AppErrorKind::NotImplemented => Some(110),
AppErrorKind::QueueError(_) => Some(110),
AppErrorKind::InvalidNotificationType => Some(111),
AppErrorKind::MissingNotificationPayload(_) => Some(112),

AppErrorKind::MissingSqsMessageFields { .. } => Some(113),
AppErrorKind::SqsMessageHashMismatch { .. } => Some(114),
AppErrorKind::SqsMessageParsingError { .. } => Some(115),

AppErrorKind::DurationError(_) => Some(116),
AppErrorKind::MessageDataError(_) => Some(117),

AppErrorKind::NotImplemented => Some(118),

AppErrorKind::BadRequest
| AppErrorKind::NotFound
Expand Down Expand Up @@ -229,6 +281,35 @@ impl AppErrorKind {
);
}
}

AppErrorKind::MissingSqsMessageFields {
ref queue,
ref field,
} => {
fields.insert(String::from("queue"), Value::String(queue.to_owned()));
fields.insert(String::from("field"), Value::String(field.to_owned()));
}

AppErrorKind::SqsMessageHashMismatch {
ref queue,
ref hash,
ref body,
} => {
fields.insert(String::from("queue"), Value::String(queue.to_owned()));
fields.insert(String::from("hash"), Value::String(hash.to_owned()));
fields.insert(String::from("body"), Value::String(body.to_owned()));
}

AppErrorKind::SqsMessageParsingError {
ref queue,
ref message,
ref body,
} => {
fields.insert(String::from("queue"), Value::String(queue.to_owned()));
fields.insert(String::from("message"), Value::String(message.to_owned()));
fields.insert(String::from("body"), Value::String(body.to_owned()));
}

_ => (),
}
fields
Expand All @@ -250,12 +331,17 @@ impl From<Context<AppErrorKind>> for AppError {
/// Generate HTTP error responses for AppErrors
impl<'r> Responder<'r> for AppError {
fn respond_to(self, request: &Request) -> response::Result<'r> {
match request.guard::<State<MozlogLogger>>() {
Outcome::Success(logger) => {
let log = MozlogLogger::with_app_error(&logger, &self)
.map_err(|_| Status::InternalServerError)?;
slog_error!(log, "{}", "Request errored");
}
_ => panic!("Internal error: No managed MozlogLogger"),
}

let status = self.kind().http_status();
let json = Json(self.json());

let log = MozlogLogger::with_request(request).map_err(|_| Status::InternalServerError)?;
slog_error!(log, "{}", json.to_string());

let mut builder = Response::build_from(json.respond_to(request)?);
builder.status(status).ok()
}
Expand Down
2 changes: 1 addition & 1 deletion src/auth_db/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fn get_bounces_invalid_address() {
let db = DbClient::new(&settings);
match db.get_bounces("") {
Ok(_) => assert!(false, "DbClient::get_bounces should have failed"),
Err(error) => assert_eq!(format!("{}", error), "\"400 Bad Request\""),
Err(error) => assert_eq!(format!("{}", error), "400 Bad Request"),
}
}

Expand Down
33 changes: 26 additions & 7 deletions src/bin/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,23 @@ extern crate fxa_email_service;
extern crate lazy_static;

use futures::future::{self, Future, Loop};
#[macro_use(
slog_b,
slog_error,
slog_info,
slog_kv,
slog_log,
slog_record,
slog_record_static
)]
extern crate slog;
#[macro_use]
extern crate slog_scope;

use fxa_email_service::{
queues::{QueueError, QueueIds, Queues, Sqs},
app_errors::AppError,
logging::MozlogLogger,
queues::{QueueIds, Queues, Sqs},
settings::Settings,
};

Expand All @@ -39,20 +53,25 @@ lazy_static! {
};
}

type LoopResult = Box<Future<Item = Loop<usize, usize>, Error = QueueError>>;
type LoopResult = Box<Future<Item = Loop<usize, usize>, Error = AppError>>;

fn main() {
let logger = MozlogLogger::new(&SETTINGS).expect("MozlogLogger::init error");
let _guard = slog_scope::set_global_logger(logger.0);
let process_queues: &Fn(usize) -> LoopResult = &|previous_count: usize| {
let future = QUEUES
.process()
.or_else(|error: QueueError| {
println!("{:?}", error);
.or_else(move |error: AppError| {
let logger = MozlogLogger(slog_scope::logger());
let log = MozlogLogger::with_app_error(&logger, &error)
.expect("MozlogLogger::with_app_error error");
slog_error!(log, "{}", "Error processing queue");
future::ok(0)
}).and_then(move |count: usize| {
let total_count = count + previous_count;
println!(
"Processed {} messages, total message count is now {}",
count, total_count
info!(
"Succesfully processed queue message";
"processed_messages" => count, "total_messages" => total_count
);
Ok(Loop::Continue(total_count))
});
Expand Down
4 changes: 2 additions & 2 deletions src/bin/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ fn main() {
.attach(rocket::fairing::AdHoc::on_request(|request, _| {
let log =
MozlogLogger::with_request(request).expect("MozlogLogger::with_request error");
slog_info!(log, "{}", "Request started.");
slog_info!(log, "{}", "Request started");
}))
.attach(rocket::fairing::AdHoc::on_response(|request, response| {
let log =
MozlogLogger::with_request(request).expect("MozlogLogger::with_request error");
if response.status().code == 200 {
slog_info!(log, "{}", "Request finished succesfully.";
slog_info!(log, "{}", "Request finished succesfully";
"status_code" => response.status().code, "status_msg" => response.status().reason);
}
}))
Expand Down
Loading