Skip to content

Commit 7232fdc

Browse files
committed
Implements logging using OpenTelemetry
1 parent 9aa73b5 commit 7232fdc

File tree

10 files changed

+436
-17
lines changed

10 files changed

+436
-17
lines changed

Cargo.lock

Lines changed: 290 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ axum = "0.8.1"
1010
chrono = { version = "0.4.38", features = ["clock"] }
1111
serde = { version = "1.0.188", features = ["derive"] }
1212
sqlx = { version = "0.8.3", features = ["chrono", "postgres", "runtime-tokio"] }
13-
tokio = { version = "1.28.2", features = ["default", "macros", "rt-multi-thread"] } # For async tests
13+
tokio = { version = "1.28.2", features = ["default", "macros", "rt-multi-thread", "signal"] } # For async tests
1414
hmac = "0.12.1"
1515
sha2 = "0.10.8"
1616
hex = "0.4.3"
@@ -24,3 +24,10 @@ tracing = "0.1.41"
2424
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "time", "fmt", "std"] }
2525
dotenv = "0.15.0"
2626
time = { version = "0.3.37", features = ["formatting"] }
27+
opentelemetry = "0.29.1"
28+
opentelemetry_sdk = "0.29.0"
29+
opentelemetry-semantic-conventions = { version = "0.29.0", features = ["semconv_experimental"] }
30+
tracing-core = "0.1.33"
31+
tracing-opentelemetry = "0.30.0"
32+
opentelemetry-otlp = { version = "0.29.0", features = ["grpc-tonic"] }
33+
opentelemetry-stdout = "0.29.0"

src/daily_task/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use tracing::{debug, error, info};
77

88
use crate::models::member::Member;
99

10+
11+
#[tracing::instrument]
1012
pub async fn run_daily_task_at_midnight(pool: Arc<PgPool>) {
1113
loop {
1214
let now = chrono::Utc::now().with_timezone(&Kolkata);
@@ -37,6 +39,7 @@ pub async fn run_daily_task_at_midnight(pool: Arc<PgPool>) {
3739
/// This function does a number of things, including:
3840
/// * Insert new attendance records everyday for [`presense`](https://www.github.com/amfoss/presense) to update them later in the day.
3941
/// * Update the AttendanceSummary table
42+
#[tracing::instrument]
4043
async fn execute_daily_task(pool: Arc<PgPool>) {
4144
// Members is queried outside of each function to avoid repetition
4245
let members = sqlx::query_as::<_, Member>("SELECT * FROM Member")
@@ -50,6 +53,8 @@ async fn execute_daily_task(pool: Arc<PgPool>) {
5053
};
5154
}
5255

56+
57+
#[tracing::instrument]
5358
async fn update_attendance(members: Vec<Member>, pool: &PgPool) {
5459
#[allow(deprecated)]
5560
let today = chrono::Utc::now()
@@ -92,6 +97,7 @@ async fn update_attendance(members: Vec<Member>, pool: &PgPool) {
9297
}
9398
}
9499

100+
#[tracing::instrument]
95101
async fn update_attendance_summary(member_id: i32, pool: &PgPool) {
96102
debug!("Updating summary for member #{}", member_id);
97103
#[allow(deprecated)]
@@ -129,6 +135,7 @@ async fn update_attendance_summary(member_id: i32, pool: &PgPool) {
129135
}
130136
}
131137

138+
#[tracing::instrument]
132139
async fn update_days_attended(member_id: i32, today: NaiveDate, pool: &PgPool) {
133140
// Convert year and month into i32 cause SQLx cannot encode u32 into database types
134141
let month: i32 = (today.month0() + 1) as i32;

src/graphql/mutations/attendance_mutations.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ use crate::models::attendance::{Attendance, MarkAttendanceInput};
1111

1212
type HmacSha256 = Hmac<Sha256>;
1313

14-
#[derive(Default)]
14+
#[derive(Default, Debug)]
1515
pub struct AttendanceMutations;
1616

1717
#[Object]
1818
impl AttendanceMutations {
19+
#[tracing::instrument(skip(ctx))]
1920
#[graphql(name = "markAttendance")]
2021
async fn mark_attendance(
2122
&self,

src/graphql/queries/attendance_queries.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ use async_graphql::{Context, Object, Result};
55
use chrono::NaiveDate;
66
use sqlx::PgPool;
77

8-
#[derive(Default)]
8+
#[derive(Default, Debug)]
99
pub struct AttendanceQueries;
1010

1111
#[Object]
1212
impl AttendanceQueries {
13+
#[tracing::instrument(skip(ctx))]
1314
async fn attendance(&self, ctx: &Context<'_>, member_id: i32) -> Result<Vec<Attendance>> {
1415
let pool = ctx.data::<Arc<PgPool>>().expect("Pool must be in context.");
1516

17+
tracing::info!("Fetching attendance for member ID: {}", member_id);
18+
1619
Ok(
1720
sqlx::query_as::<_, Attendance>("SELECT * FROM Attendance WHERE member_id = $1")
1821
.bind(member_id)

src/graphql/queries/member_queries.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ use crate::models::{
99
status_update_streak::StatusUpdateStreakInfo,
1010
};
1111

12-
#[derive(Default)]
12+
#[derive(Default, Debug)]
1313
pub struct MemberQueries;
1414

1515
#[Object]
1616
impl MemberQueries {
17+
#[tracing::instrument(skip(ctx))]
1718
pub async fn members(
1819
&self,
1920
ctx: &Context<'_>,
@@ -45,8 +46,11 @@ impl MemberQueries {
4546

4647
#[ComplexObject]
4748
impl Member {
49+
#[tracing::instrument(skip(ctx))]
4850
async fn attendance(&self, ctx: &Context<'_>) -> Vec<AttendanceInfo> {
4951
let pool = ctx.data::<Arc<PgPool>>().expect("Pool must be in context.");
52+
53+
tracing::info!("Fetching attendance for member ID: {}", self.member_id);
5054

5155
sqlx::query_as::<_, AttendanceInfo>(
5256
"SELECT date, is_present, time_in, time_out FROM Attendance WHERE member_id = $1",

src/main.rs

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,19 @@ use tower_http::cors::CorsLayer;
77
use tracing::info;
88
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
99

10+
use opentelemetry::{global, trace::TracerProvider as _, KeyValue};
11+
use opentelemetry_sdk::{
12+
metrics::{MeterProviderBuilder, PeriodicReader, SdkMeterProvider},
13+
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
14+
Resource,
15+
};
16+
use opentelemetry_semantic_conventions::{
17+
attribute::{DEPLOYMENT_ENVIRONMENT_NAME, SERVICE_NAME, SERVICE_VERSION},
18+
SCHEMA_URL,
19+
};
20+
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
21+
22+
1023
use daily_task::run_daily_task_at_midnight;
1124
use graphql::{Mutation, Query};
1225
use routes::setup_router;
@@ -37,10 +50,27 @@ impl Config {
3750
}
3851
}
3952

53+
struct OtelGuard {
54+
tracer_provider: SdkTracerProvider,
55+
meter_provider: SdkMeterProvider,
56+
}
57+
58+
impl Drop for OtelGuard {
59+
fn drop(&mut self) {
60+
if let Err(err) = self.tracer_provider.shutdown() {
61+
eprintln!("{err:?}");
62+
}
63+
if let Err(err) = self.meter_provider.shutdown() {
64+
eprintln!("{err:?}");
65+
}
66+
}
67+
}
68+
4069
#[tokio::main]
70+
#[tracing::instrument]
4171
async fn main() {
4272
let config = Config::from_env();
43-
setup_tracing(&config.env);
73+
let guard = setup_tracing(&config.env);
4474

4575
let pool = setup_database(&config.database_url).await;
4676
let schema = build_graphql_schema(pool.clone(), config.secret_key);
@@ -56,10 +86,84 @@ async fn main() {
5686
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", config.port))
5787
.await
5888
.unwrap();
59-
axum::serve(listener, router).await.unwrap();
89+
90+
axum::serve(listener, router)
91+
.with_graceful_shutdown(shutdown_signal())
92+
.await
93+
.unwrap();
94+
95+
drop(guard);
96+
97+
}
98+
99+
#[tracing::instrument]
100+
async fn shutdown_signal() {
101+
// Wait for Ctrl-C
102+
tokio::signal::ctrl_c()
103+
.await
104+
.expect("failed to install Ctrl+C handler");
105+
106+
tracing::info!("Shutdown signal received. Flushing telemetry...");
107+
108+
// Flush traces and metrics
109+
// guard.tracer_provider.shutdown().unwrap();
110+
}
111+
112+
fn resource() -> Resource {
113+
Resource::builder()
114+
.with_attributes(vec![
115+
KeyValue::new(SERVICE_NAME, env!("CARGO_PKG_NAME")),
116+
KeyValue::new(SERVICE_VERSION, env!("CARGO_PKG_VERSION")),
117+
KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
118+
])
119+
.with_schema_url(Vec::new(), SCHEMA_URL)
120+
.build()
121+
}
122+
123+
fn init_meter_provider() -> SdkMeterProvider {
124+
let exporter = opentelemetry_otlp::MetricExporter::builder()
125+
.with_tonic()
126+
.with_temporality(opentelemetry_sdk::metrics::Temporality::default())
127+
.build()
128+
.unwrap();
129+
130+
let reader = PeriodicReader::builder(exporter)
131+
.with_interval(std::time::Duration::from_secs(30))
132+
.build();
133+
134+
let stdout_reader =
135+
PeriodicReader::builder(opentelemetry_stdout::MetricExporter::default()).build();
136+
137+
let meter_provider = MeterProviderBuilder::default()
138+
.with_resource(resource())
139+
.with_reader(reader)
140+
.with_reader(stdout_reader)
141+
.build();
142+
143+
global::set_meter_provider(meter_provider.clone());
144+
145+
meter_provider
146+
}
147+
148+
fn init_tracer_provider() -> SdkTracerProvider {
149+
let exporter = opentelemetry_otlp::SpanExporter::builder()
150+
.with_tonic()
151+
.build()
152+
.unwrap();
153+
154+
SdkTracerProvider::builder()
155+
.with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased(1.0))))
156+
.with_id_generator(RandomIdGenerator::default())
157+
.with_resource(resource())
158+
.with_batch_exporter(exporter)
159+
.build()
60160
}
61161

62-
fn setup_tracing(env: &str) {
162+
fn setup_tracing(env: &str) -> OtelGuard {
163+
let tracer_provider = init_tracer_provider();
164+
let meter_provider = init_meter_provider();
165+
let tracer = tracer_provider.tracer("tracing-otel-subscriber");
166+
63167
let kolkata_offset = UtcOffset::from_hms(5, 30, 0).expect("Hardcoded offset must be correct");
64168
let timer = fmt::time::OffsetTime::new(
65169
kolkata_offset,
@@ -75,6 +179,8 @@ fn setup_tracing(env: &str) {
75179
.with_ansi(false) // ANSI encodings are unreadable in the raw file.
76180
.with_writer(std::fs::File::create("root.log").unwrap()),
77181
)
182+
.with(MetricsLayer::new(meter_provider.clone()))
183+
.with(OpenTelemetryLayer::new(tracer))
78184
.with(EnvFilter::new("info"))
79185
.init();
80186
info!("Running in production mode.")
@@ -93,10 +199,17 @@ fn setup_tracing(env: &str) {
93199
.with_ansi(false)
94200
.with_writer(std::fs::File::create("root.log").unwrap()),
95201
)
202+
.with(MetricsLayer::new(meter_provider.clone()))
203+
.with(OpenTelemetryLayer::new(tracer))
96204
.with(EnvFilter::new("trace"))
97205
.init();
98206
info!("Running in development mode.");
99207
}
208+
209+
OtelGuard {
210+
tracer_provider,
211+
meter_provider,
212+
}
100213
}
101214

102215
async fn setup_database(database_url: &str) -> Arc<PgPool> {

src/models/attendance.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ pub struct AttendanceSummaryInfo {
3939
pub days_attended: i32,
4040
}
4141

42-
#[derive(InputObject)]
42+
#[derive(InputObject, Debug)]
4343
pub struct MarkAttendanceInput {
4444
pub member_id: i32,
4545
pub date: NaiveDate,

src/models/member.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use sqlx::FromRow;
44

55
#[derive(Enum, Copy, Clone, Eq, PartialEq, sqlx::Type)]
66
#[sqlx(type_name = "sex_type")]
7+
#[derive(Debug)]
78
pub enum Sex {
89
M,
910
F,
@@ -12,6 +13,7 @@ pub enum Sex {
1213

1314
#[derive(SimpleObject, FromRow)]
1415
#[graphql(complex)]
16+
#[derive(Debug)]
1517
pub struct Member {
1618
pub member_id: i32,
1719
pub roll_no: String,

src/routes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tower_http::cors::CorsLayer;
99

1010
use crate::graphql::{Mutation, Query};
1111

12+
#[tracing::instrument(skip(schema, cors))]
1213
pub fn setup_router(
1314
schema: Schema<Query, Mutation, EmptySubscription>,
1415
cors: CorsLayer,

0 commit comments

Comments
 (0)