Skip to content
This repository has been archived by the owner on Apr 3, 2019. It is now read-only.

Commit

Permalink
chore(queues): use mozlog and failure in queues process (#161) r=@vla…
Browse files Browse the repository at this point in the history
…dikoff,@philbooth

Fixes #10
Fixes #99
Connects to #116

This was my first experience messing with the queues part of the service, I thought dealing with these issues first would make the experience smoother. I added the standard logging and error handling with the failure crate just like the rest of the lib.

Since our errors were very much centered around requests and rocket, we have some fields that don't really make sense for the queues process, like the http code, so let me know what you think about that and if the queues process should have a different error format. I also did another minor change to the errors: now the response JSON will return a number for code and errno, instead of a string.

For logging, I implemented an AppErrorFields so that we get better structured logs for our AppErrors. I used slog_scope to make it easier to have a global logger for the queues process, let me know what you think about that... In the README.md for that crate, they have some warnings about it not being the best idea all the time, but anyways, I thought it was neat and worked well for our case.

Finally, when I started working in the queues they were not really working due to parsing errors, you will see that I changed a little bit the SQS Notification struct, that was for parsing to work, also I created a notification-dev queue in the AWS Console, because it didn't exist yet, I think everything is working fine now.
  • Loading branch information
brizental authored and vladikoff committed Aug 9, 2018
1 parent 95acb7c commit 501a591
Show file tree
Hide file tree
Showing 22 changed files with 388 additions and 273 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ sha2 = ">=0.7.1"
slog = ">=2.2.3"
slog-async = ">=2.3.0"
slog-mozlog-json = ">=0.1.0"
slog-scope = ">=4.0.1"
slog-term = ">=2.4.0"
socketlabs = ">=0.1.3"
118 changes: 102 additions & 16 deletions src/app_errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use rocket::{
self,
http::Status,
response::{self, Responder, Response},
Request,
Outcome, Request, State,
};
use rocket_contrib::{Json, JsonValue};
use serde_json::{map::Map, ser::to_string, Value};
Expand Down Expand Up @@ -45,15 +45,18 @@ impl AppError {
let mut fields = Map::new();
fields.insert(
String::from("code"),
Value::String(format!("{}", status.code)),
Value::Number(status.code.to_owned().into()),
);
fields.insert(
String::from("error"),
Value::String(format!("{}", status.reason)),
);
let errno = kind.errno();
if let Some(ref errno) = errno {
fields.insert(String::from("errno"), Value::String(format!("{}", errno)));
fields.insert(
String::from("errno"),
Value::Number(errno.to_owned().into()),
);
fields.insert(String::from("message"), Value::String(format!("{}", self)));
};
let additional_fields = kind.additional_fields();
Expand Down Expand Up @@ -111,10 +114,10 @@ pub enum AppErrorKind {
RocketError(rocket::Error),

/// An error for invalid email params in the /send handler.
#[fail(display = "Error validating email params.")]
#[fail(display = "Error validating email params")]
InvalidEmailParams,
/// An error for missing email params in the /send handler.
#[fail(display = "Missing email params.")]
#[fail(display = "Missing email params")]
MissingEmailParams(String),

/// An error for invalid provider names.
Expand All @@ -128,29 +131,67 @@ pub enum AppErrorKind {
EmailParsingError(String),

/// An error for when a bounce violation happens.
#[fail(display = "Email account sent complaint.")]
#[fail(display = "Email account sent complaint")]
BounceComplaintError {
address: String,
bounce: Option<BounceRecord>,
},
#[fail(display = "Email account soft bounced.")]
#[fail(display = "Email account soft bounced")]
BounceSoftError {
address: String,
bounce: Option<BounceRecord>,
},
#[fail(display = "Email account hard bounced.")]
#[fail(display = "Email account hard bounced")]
BounceHardError {
address: String,
bounce: Option<BounceRecord>,
},

/// An error for when an error happens on a request to the db.
#[fail(display = "{:?}", _0)]
#[fail(display = "{}", _0)]
DbError(String),

/// An error for when an error happens on the queues process.
#[fail(display = "{}", _0)]
QueueError(String),
/// An error for when we get an invalid notification type in the queues
/// process.
#[fail(display = "Invalid notification type")]
InvalidNotificationType,
/// An error for when we get notification without a payload in the queues
/// process.
#[fail(display = "Missing payload in {} notification", _0)]
MissingNotificationPayload(String),

/// An error for when we get SQS messages with missing fields.
#[fail(display = "Missing SQS message {} field", field)]
MissingSqsMessageFields { queue: String, field: String },
/// An error for when the SQS message body does not match MD5 hash.
#[fail(display = "Message body does not match MD5 hash")]
SqsMessageHashMismatch {
queue: String,
hash: String,
body: String,
},
/// An error for when we can't parse the SQS message.
#[fail(display = "SQS message parsing error")]
SqsMessageParsingError {
queue: String,
message: String,
body: String,
},

/// An error for when we get and invalid duration string.
#[fail(display = "invalid duration: {}", _0)]
DurationError(String),

/// An error for when we get erros in the message_data module.
#[fail(display = "{}", _0)]
MessageDataError(String),

/// An error for when we try to access functionality that is not
/// implemented.
#[fail(display = "Feature not implemented.")]
#[fail(display = "Feature not implemented")]
NotImplemented,
}

Expand All @@ -171,7 +212,7 @@ impl AppErrorKind {
}
}

pub fn errno(&self) -> Option<i32> {
pub fn errno(&self) -> Option<u16> {
match self {
AppErrorKind::RocketError(_) => Some(100),

Expand All @@ -188,7 +229,18 @@ impl AppErrorKind {

AppErrorKind::DbError(_) => Some(109),

AppErrorKind::NotImplemented => Some(110),
AppErrorKind::QueueError(_) => Some(110),
AppErrorKind::InvalidNotificationType => Some(111),
AppErrorKind::MissingNotificationPayload(_) => Some(112),

AppErrorKind::MissingSqsMessageFields { .. } => Some(113),
AppErrorKind::SqsMessageHashMismatch { .. } => Some(114),
AppErrorKind::SqsMessageParsingError { .. } => Some(115),

AppErrorKind::DurationError(_) => Some(116),
AppErrorKind::MessageDataError(_) => Some(117),

AppErrorKind::NotImplemented => Some(118),

AppErrorKind::BadRequest
| AppErrorKind::NotFound
Expand Down Expand Up @@ -229,6 +281,35 @@ impl AppErrorKind {
);
}
}

AppErrorKind::MissingSqsMessageFields {
ref queue,
ref field,
} => {
fields.insert(String::from("queue"), Value::String(queue.to_owned()));
fields.insert(String::from("field"), Value::String(field.to_owned()));
}

AppErrorKind::SqsMessageHashMismatch {
ref queue,
ref hash,
ref body,
} => {
fields.insert(String::from("queue"), Value::String(queue.to_owned()));
fields.insert(String::from("hash"), Value::String(hash.to_owned()));
fields.insert(String::from("body"), Value::String(body.to_owned()));
}

AppErrorKind::SqsMessageParsingError {
ref queue,
ref message,
ref body,
} => {
fields.insert(String::from("queue"), Value::String(queue.to_owned()));
fields.insert(String::from("message"), Value::String(message.to_owned()));
fields.insert(String::from("body"), Value::String(body.to_owned()));
}

_ => (),
}
fields
Expand All @@ -250,12 +331,17 @@ impl From<Context<AppErrorKind>> for AppError {
/// Generate HTTP error responses for AppErrors
impl<'r> Responder<'r> for AppError {
fn respond_to(self, request: &Request) -> response::Result<'r> {
match request.guard::<State<MozlogLogger>>() {
Outcome::Success(logger) => {
let log = MozlogLogger::with_app_error(&logger, &self)
.map_err(|_| Status::InternalServerError)?;
slog_error!(log, "{}", "Request errored");
}
_ => panic!("Internal error: No managed MozlogLogger"),
}

let status = self.kind().http_status();
let json = Json(self.json());

let log = MozlogLogger::with_request(request).map_err(|_| Status::InternalServerError)?;
slog_error!(log, "{}", json.to_string());

let mut builder = Response::build_from(json.respond_to(request)?);
builder.status(status).ok()
}
Expand Down
2 changes: 1 addition & 1 deletion src/auth_db/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ fn get_bounces_invalid_address() {
let db = DbClient::new(&settings);
match db.get_bounces("") {
Ok(_) => assert!(false, "DbClient::get_bounces should have failed"),
Err(error) => assert_eq!(format!("{}", error), "\"400 Bad Request\""),
Err(error) => assert_eq!(format!("{}", error), "400 Bad Request"),
}
}

Expand Down
33 changes: 26 additions & 7 deletions src/bin/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,23 @@ extern crate fxa_email_service;
extern crate lazy_static;

use futures::future::{self, Future, Loop};
#[macro_use(
slog_b,
slog_error,
slog_info,
slog_kv,
slog_log,
slog_record,
slog_record_static
)]
extern crate slog;
#[macro_use]
extern crate slog_scope;

use fxa_email_service::{
queues::{QueueError, QueueIds, Queues, Sqs},
app_errors::AppError,
logging::MozlogLogger,
queues::{QueueIds, Queues, Sqs},
settings::Settings,
};

Expand All @@ -39,20 +53,25 @@ lazy_static! {
};
}

type LoopResult = Box<Future<Item = Loop<usize, usize>, Error = QueueError>>;
type LoopResult = Box<Future<Item = Loop<usize, usize>, Error = AppError>>;

fn main() {
let logger = MozlogLogger::new(&SETTINGS).expect("MozlogLogger::init error");
let _guard = slog_scope::set_global_logger(logger.0);
let process_queues: &Fn(usize) -> LoopResult = &|previous_count: usize| {
let future = QUEUES
.process()
.or_else(|error: QueueError| {
println!("{:?}", error);
.or_else(move |error: AppError| {
let logger = MozlogLogger(slog_scope::logger());
let log = MozlogLogger::with_app_error(&logger, &error)
.expect("MozlogLogger::with_app_error error");
slog_error!(log, "{}", "Error processing queue");
future::ok(0)
}).and_then(move |count: usize| {
let total_count = count + previous_count;
println!(
"Processed {} messages, total message count is now {}",
count, total_count
info!(
"Succesfully processed queue message";
"processed_messages" => count, "total_messages" => total_count
);
Ok(Loop::Continue(total_count))
});
Expand Down
4 changes: 2 additions & 2 deletions src/bin/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ fn main() {
.attach(rocket::fairing::AdHoc::on_request(|request, _| {
let log =
MozlogLogger::with_request(request).expect("MozlogLogger::with_request error");
slog_info!(log, "{}", "Request started.");
slog_info!(log, "{}", "Request started");
}))
.attach(rocket::fairing::AdHoc::on_response(|request, response| {
let log =
MozlogLogger::with_request(request).expect("MozlogLogger::with_request error");
if response.status().code == 200 {
slog_info!(log, "{}", "Request finished succesfully.";
slog_info!(log, "{}", "Request finished succesfully";
"status_code" => response.status().code, "status_msg" => response.status().reason);
}
}))
Expand Down
Loading

0 comments on commit 501a591

Please sign in to comment.