Implements logging using OpenTelemetry as per #67 #103

Open · wants to merge 1 commit into base: develop

299 changes: 290 additions & 9 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
@@ -10,7 +10,7 @@ axum = "0.8.1"
chrono = { version = "0.4.38", features = ["clock"] }
serde = { version = "1.0.188", features = ["derive"] }
sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio"] }
tokio = { version = "1.28.2", features = ["default", "macros", "rt-multi-thread"] } # For async tests
tokio = { version = "1.28.2", features = ["default", "macros", "rt-multi-thread", "signal"] } # For async tests
hmac = "0.12.1"
sha2 = "0.10.8"
hex = "0.4.3"
@@ -24,3 +24,10 @@ tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time", "fmt", "std"] }
dotenv = "0.15.0"
time = { version = "0.3.37", features = ["formatting"] }
opentelemetry = "0.29.1"
opentelemetry_sdk = "0.29.0"
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
tracing-core = "0.1.33"
tracing-opentelemetry = "0.30.0"
opentelemetry-otlp = { version = "0.29.0", features = ["grpc-tonic"] }
opentelemetry-stdout = "0.29.0"
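
Note on the new OTLP dependencies: with the grpc-tonic feature, the exporters built in src/main.rs should pick up the standard OTEL_EXPORTER_OTLP_* environment variables and otherwise fall back to the crate's default endpoint (http://localhost:4317). A minimal sketch, not part of this diff, of wiring an explicit collector address instead — the address shown is only a placeholder:

use opentelemetry_otlp::{SpanExporter, WithExportConfig};

// Build the OTLP span exporter against an explicit collector address rather than
// relying on OTEL_EXPORTER_OTLP_ENDPOINT / the default localhost:4317.
fn build_span_exporter() -> SpanExporter {
    SpanExporter::builder()
        .with_tonic()
        .with_endpoint("http://otel-collector:4317") // placeholder address, not set by this PR
        .build()
        .expect("failed to build OTLP span exporter")
}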
5 changes: 5 additions & 0 deletions src/daily_task/mod.rs
@@ -7,6 +7,7 @@ use tracing::{debug, error, info};

use crate::models::member::Member;

#[tracing::instrument]
pub async fn run_daily_task_at_midnight(pool: Arc<PgPool>) {
loop {
let now = chrono::Utc::now().with_timezone(&Kolkata);
@@ -37,6 +38,7 @@ pub async fn run_daily_task_at_midnight(pool: Arc<PgPool>) {
/// This function does a number of things, including:
/// * Insert new attendance records everyday for [`presense`](https://www.github.com/amfoss/presense) to update them later in the day.
/// * Update the AttendanceSummary table
#[tracing::instrument]
async fn execute_daily_task(pool: Arc<PgPool>) {
// Members is queried outside of each function to avoid repetition
let members = sqlx::query_as::<_, Member>("SELECT * FROM Member")
@@ -50,6 +52,7 @@ async fn execute_daily_task(pool: Arc<PgPool>) {
};
}

#[tracing::instrument]
async fn update_attendance(members: Vec<Member>, pool: &PgPool) {
#[allow(deprecated)]
let today = chrono::Utc::now()
@@ -92,6 +95,7 @@ async fn update_attendance(members: Vec<Member>, pool: &PgPool) {
}
}

#[tracing::instrument]
async fn update_attendance_summary(member_id: i32, pool: &PgPool) {
debug!("Updating summary for member #{}", member_id);
#[allow(deprecated)]
@@ -129,6 +133,7 @@ async fn update_attendance_summary(member_id: i32, pool: &PgPool) {
}
}

#[tracing::instrument]
async fn update_days_attended(member_id: i32, today: NaiveDate, pool: &PgPool) {
// Convert year and month into i32 cause SQLx cannot encode u32 into database types
let month: i32 = (today.month0() + 1) as i32;
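For reviewers unfamiliar with #[tracing::instrument]: it wraps each call in a span and records the function's arguments via their Debug impls, which is presumably why this PR also derives Debug on Member, MarkAttendanceInput, and the query/mutation structs. A minimal sketch of the skip/fields knobs used throughout the diff — Job is a hypothetical type, not from this crate:

use tracing::instrument;

#[derive(Debug)]
struct Job { id: i32 }

// skip() omits parameters that are noisy or lack a Debug impl; fields() records
// explicit span fields computed from the arguments instead.
#[instrument(skip(payload), fields(payload_len = payload.len()))]
fn process(job: Job, payload: &[u8]) {
    tracing::debug!("processing job #{}", job.id);
}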
3 changes: 2 additions & 1 deletion src/graphql/mutations/attendance_mutations.rs
@@ -11,11 +11,12 @@ use crate::models::attendance::{Attendance, MarkAttendanceInput};

type HmacSha256 = Hmac<Sha256>;

#[derive(Default)]
#[derive(Default, Debug)]
pub struct AttendanceMutations;

#[Object]
impl AttendanceMutations {
#[tracing::instrument(skip(ctx))]
#[graphql(name = "markAttendance")]
async fn mark_attendance(
&self,
5 changes: 4 additions & 1 deletion src/graphql/queries/attendance_queries.rs
@@ -5,14 +5,17 @@ use async_graphql::{Context, Object, Result};
use chrono::NaiveDate;
use sqlx::PgPool;

#[derive(Default)]
#[derive(Default, Debug)]
pub struct AttendanceQueries;

#[Object]
impl AttendanceQueries {
#[tracing::instrument(skip(ctx))]
async fn attendance(&self, ctx: &Context<'_>, member_id: i32) -> Result<Vec<Attendance>> {
let pool = ctx.data::<Arc<PgPool>>().expect("Pool must be in context.");

tracing::info!("Fetching attendance for member ID: {}", member_id);

Ok(
sqlx::query_as::<_, Attendance>("SELECT * FROM Attendance WHERE member_id = $1")
.bind(member_id)
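Minor note on the new info! line above: with an OTLP pipeline it can be useful to record member_id as a structured field so it is exported as an attribute rather than only appearing inside the message text. A sketch of that alternative form — same tracing crate, no new dependencies:

use tracing::info;

// member_id becomes a structured field on the event, in addition to the message.
fn log_fetch(member_id: i32) {
    info!(member_id, "Fetching attendance");
}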
6 changes: 5 additions & 1 deletion src/graphql/queries/member_queries.rs
@@ -9,11 +9,12 @@ use crate::models::{
status_update_streak::StatusUpdateStreakInfo,
};

#[derive(Default)]
#[derive(Default, Debug)]
pub struct MemberQueries;

#[Object]
impl MemberQueries {
#[tracing::instrument(skip(ctx))]
pub async fn members(
&self,
ctx: &Context<'_>,
@@ -45,9 +46,12 @@

#[ComplexObject]
impl Member {
#[tracing::instrument(skip(ctx))]
async fn attendance(&self, ctx: &Context<'_>) -> Vec<AttendanceInfo> {
let pool = ctx.data::<Arc<PgPool>>().expect("Pool must be in context.");

tracing::info!("Fetching attendance for member ID: {}", self.member_id);

sqlx::query_as::<_, AttendanceInfo>(
"SELECT date, is_present, time_in, time_out FROM Attendance WHERE member_id = $1",
)
115 changes: 112 additions & 3 deletions src/main.rs
@@ -7,6 +7,18 @@ use tower_http::cors::CorsLayer;
use tracing::info;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
use opentelemetry_sdk::{
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
Resource,
};
use opentelemetry_semantic_conventions::{
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
SCHEMA_URL,
};
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};

use daily_task::run_daily_task_at_midnight;
use graphql::{Mutation, Query};
use routes::setup_router;
@@ -37,10 +49,27 @@ impl Config {
}
}

struct OtelGuard {
tracer_provider: SdkTracerProvider,
meter_provider: SdkMeterProvider,
}

impl Drop for OtelGuard {
fn drop(&mut self) {
if let Err(err) = self.tracer_provider.shutdown() {
eprintln!("{err:?}");
}
if let Err(err) = self.meter_provider.shutdown() {
eprintln!("{err:?}");
}
}
}

#[tokio::main]
#[tracing::instrument]
async fn main() {
let config = Config::from_env();
setup_tracing(&config.env);
let guard = setup_tracing(&config.env);

let pool = setup_database(&config.database_url).await;
let schema = build_graphql_schema(pool.clone(), config.secret_key);
@@ -56,10 +85,81 @@
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.port))
.await
.unwrap();
axum::serve(listener, router).await.unwrap();

axum::serve(listener, router)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();

drop(guard);
}

#[tracing::instrument]
async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");

tracing::info!("Shutdown signal received. Flushing telemetry...");
}

fn setup_tracing(env: &str) {
fn resource() -> Resource {
Resource::builder()
.with_attributes(vec![
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
])
.with_schema_url(Vec::new(), SCHEMA_URL)
.build()
}

fn init_meter_provider() -> SdkMeterProvider {
let exporter = opentelemetry_otlp::MetricExporter::builder()
.with_tonic()
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
.build()
.unwrap();

let reader = PeriodicReader::builder(exporter)
.with_interval(std::time::Duration::from_secs(30))
.build();

let stdout_reader =
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();

let meter_provider = MeterProviderBuilder::default()
.with_resource(resource())
.with_reader(reader)
.with_reader(stdout_reader)
.build();

global::set_meter_provider(meter_provider.clone());

meter_provider
}

fn init_tracer_provider() -> SdkTracerProvider {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.build()
.unwrap();

SdkTracerProvider::builder()
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(
1.0,
))))
.with_id_generator(RandomIdGenerator::default())
.with_resource(resource())
.with_batch_exporter(exporter)
.build()
}

fn setup_tracing(env: &str) -> OtelGuard {
let tracer_provider = init_tracer_provider();
let meter_provider = init_meter_provider();
let tracer = tracer_provider.tracer("tracing-otel-subscriber");

let kolkata_offset = UtcOffset::from_hms(5, 30, 0).expect("Hardcoded offset must be correct");
let timer = fmt::time::OffsetTime::new(
kolkata_offset,
@@ -75,6 +175,8 @@ fn setup_tracing(env: &str) {
.with_ansi(false) // ANSI encodings are unreadable in the raw file.
.with_writer(std::fs::File::create("root.log").unwrap()),
)
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.with(EnvFilter::new("info"))
.init();
info!("Running in production mode.")
@@ -93,10 +195,17 @@ fn setup_tracing(env: &str) {
.with_ansi(false)
.with_writer(std::fs::File::create("root.log").unwrap()),
)
.with(MetricsLayer::new(meter_provider.clone()))
.with(OpenTelemetryLayer::new(tracer))
.with(EnvFilter::new("trace"))
.init();
info!("Running in development mode.");
}

OtelGuard {
tracer_provider,
meter_provider,
}
}

async fn setup_database(database_url: &str) -> Arc<PgPool> {
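One observation on the tracer configuration in setup_tracing: TraceIdRatioBased(1.0) inside ParentBased samples every root trace. If trace volume ever becomes a concern in production, that ratio is the single knob to turn down — a sketch with an illustrative 10% ratio, not a value this PR sets:

use opentelemetry_sdk::trace::Sampler;

// ParentBased keeps the parent's sampling decision; new root traces are sampled at 10%.
fn reduced_sampler() -> Sampler {
    Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(0.1)))
}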
2 changes: 1 addition & 1 deletion src/models/attendance.rs
@@ -39,7 +39,7 @@ pub struct AttendanceSummaryInfo {
pub days_attended: i32,
}

#[derive(InputObject)]
#[derive(InputObject, Debug)]
pub struct MarkAttendanceInput {
pub member_id: i32,
pub date: NaiveDate,
2 changes: 2 additions & 0 deletions src/models/member.rs
@@ -4,6 +4,7 @@ use sqlx::FromRow;

#[derive(Enum, Copy, Clone, Eq, PartialEq, sqlx::Type)]
#[sqlx(type_name = "sex_type")]
#[derive(Debug)]
pub enum Sex {
M,
F,
@@ -12,6 +13,7 @@ pub enum Sex {

#[derive(SimpleObject, FromRow)]
#[graphql(complex)]
#[derive(Debug)]
pub struct Member {
pub member_id: i32,
pub roll_no: String,
1 change: 1 addition & 0 deletions src/routes.rs
@@ -9,6 +9,7 @@ use tower_http::cors::CorsLayer;

use crate::graphql::{Mutation, Query};

#[tracing::instrument(skip(schema, cors))]
pub fn setup_router(
schema: Schema<Query, Mutation, EmptySubscription>,
cors: CorsLayer,