Skip to content

Commit dd784f2

Browse files
add even more tracing
1 parent 7b5233c commit dd784f2

File tree

1 file changed

+113
-85
lines changed

1 file changed

+113
-85
lines changed

packages/core-bridge/src/worker.rs

Lines changed: 113 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use temporal_sdk_core::{
2424
};
2525

2626
use bridge_macros::js_function;
27+
use tracing::Instrument;
2728

2829
use crate::{
2930
client::Client,
@@ -103,12 +104,15 @@ pub fn worker_validate(worker: OpaqueInboundHandle<Worker>) -> BridgeResult<Brid
103104
let worker = worker_ref.core_worker.clone();
104105
let runtime = worker_ref.core_runtime.clone();
105106

106-
runtime.future_to_promise(async move {
107-
worker
108-
.validate()
109-
.await
110-
.map_err(|err| BridgeError::TransportError(err.to_string()))
111-
})
107+
runtime.future_to_promise(
108+
async move {
109+
worker
110+
.validate()
111+
.await
112+
.map_err(|err| BridgeError::TransportError(err.to_string()))
113+
}
114+
.instrument(tracing::info_span!("worker_validate")),
115+
)
112116
}
113117

114118
/// Initiate a single workflow activation poll request.
@@ -121,19 +125,22 @@ pub fn worker_poll_workflow_activation(
121125
let worker = worker_ref.core_worker.clone();
122126
let runtime = worker_ref.core_runtime.clone();
123127

124-
runtime.future_to_promise(async move {
125-
let result = worker.poll_workflow_activation().await;
128+
runtime.future_to_promise(
129+
async move {
130+
let result = worker.poll_workflow_activation().await;
126131

127-
match result {
128-
Ok(task) => Ok(task.encode_to_vec()),
129-
Err(err) => match err {
130-
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
131-
PollError::TonicError(status) => {
132-
Err(BridgeError::TransportError(status.message().to_string()))?
133-
}
134-
},
132+
match result {
133+
Ok(task) => Ok(task.encode_to_vec()),
134+
Err(err) => match err {
135+
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
136+
PollError::TonicError(status) => {
137+
Err(BridgeError::TransportError(status.message().to_string()))?
138+
}
139+
},
140+
}
135141
}
136-
})
142+
.instrument(tracing::info_span!("worker_poll_workflow_activation")),
143+
)
137144
}
138145

139146
/// Submit a workflow activation completion to core.
@@ -154,21 +161,24 @@ pub fn worker_complete_workflow_activation(
154161
let worker = worker_ref.core_worker.clone();
155162
let runtime = worker_ref.core_runtime.clone();
156163

157-
runtime.future_to_promise(async move {
158-
worker
159-
.complete_workflow_activation(workflow_completion)
160-
.await
161-
.map_err(|err| match err {
162-
CompleteWfError::MalformedWorkflowCompletion { reason, run_id } => {
163-
BridgeError::TypeError {
164-
field: None,
165-
message: format!(
166-
"Malformed Workflow Completion: {reason:?} for RunID={run_id}"
167-
),
164+
runtime.future_to_promise(
165+
async move {
166+
worker
167+
.complete_workflow_activation(workflow_completion)
168+
.await
169+
.map_err(|err| match err {
170+
CompleteWfError::MalformedWorkflowCompletion { reason, run_id } => {
171+
BridgeError::TypeError {
172+
field: None,
173+
message: format!(
174+
"Malformed Workflow Completion: {reason:?} for RunID={run_id}"
175+
),
176+
}
168177
}
169-
}
170-
})
171-
})
178+
})
179+
}
180+
.instrument(tracing::info_span!("worker_complete_workflow_activation")),
181+
)
172182
}
173183

174184
/// Initiate a single activity task poll request.
@@ -181,19 +191,22 @@ pub fn worker_poll_activity_task(
181191
let worker = worker_ref.core_worker.clone();
182192
let runtime = worker_ref.core_runtime.clone();
183193

184-
runtime.future_to_promise(async move {
185-
let result = worker.poll_activity_task().await;
194+
runtime.future_to_promise(
195+
async move {
196+
let result = worker.poll_activity_task().await;
186197

187-
match result {
188-
Ok(task) => Ok(task.encode_to_vec()),
189-
Err(err) => match err {
190-
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
191-
PollError::TonicError(status) => {
192-
Err(BridgeError::TransportError(status.message().to_string()))?
193-
}
194-
},
198+
match result {
199+
Ok(task) => Ok(task.encode_to_vec()),
200+
Err(err) => match err {
201+
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
202+
PollError::TonicError(status) => {
203+
Err(BridgeError::TransportError(status.message().to_string()))?
204+
}
205+
},
206+
}
195207
}
196-
})
208+
.instrument(tracing::info_span!("worker_poll_activity_task")),
209+
)
197210
}
198211

199212
/// Submit an activity task completion to core.
@@ -214,20 +227,23 @@ pub fn worker_complete_activity_task(
214227
let worker = worker_ref.core_worker.clone();
215228
let runtime = worker_ref.core_runtime.clone();
216229

217-
runtime.future_to_promise(async move {
218-
worker
219-
.complete_activity_task(activity_completion)
220-
.await
221-
.map_err(|err| match err {
222-
CompleteActivityError::MalformedActivityCompletion {
223-
reason,
224-
completion: _,
225-
} => BridgeError::TypeError {
226-
field: None,
227-
message: format!("Malformed Activity Completion: {reason:?}"),
228-
},
229-
})
230-
})
230+
runtime.future_to_promise(
231+
async move {
232+
worker
233+
.complete_activity_task(activity_completion)
234+
.await
235+
.map_err(|err| match err {
236+
CompleteActivityError::MalformedActivityCompletion {
237+
reason,
238+
completion: _,
239+
} => BridgeError::TypeError {
240+
field: None,
241+
message: format!("Malformed Activity Completion: {reason:?}"),
242+
},
243+
})
244+
}
245+
.instrument(tracing::info_span!("worker_complete_activity_task")),
246+
)
231247
}
232248

233249
/// Submit an activity heartbeat to core.
@@ -260,19 +276,22 @@ pub fn worker_poll_nexus_task(
260276
let worker = worker_ref.core_worker.clone();
261277
let runtime = worker_ref.core_runtime.clone();
262278

263-
runtime.future_to_promise(async move {
264-
let result = worker.poll_nexus_task().await;
279+
runtime.future_to_promise(
280+
async move {
281+
let result = worker.poll_nexus_task().await;
265282

266-
match result {
267-
Ok(task) => Ok(task.encode_to_vec()),
268-
Err(err) => match err {
269-
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
270-
PollError::TonicError(status) => {
271-
Err(BridgeError::TransportError(status.message().to_string()))?
272-
}
273-
},
283+
match result {
284+
Ok(task) => Ok(task.encode_to_vec()),
285+
Err(err) => match err {
286+
PollError::ShutDown => Err(BridgeError::WorkerShutdown)?,
287+
PollError::TonicError(status) => {
288+
Err(BridgeError::TransportError(status.message().to_string()))?
289+
}
290+
},
291+
}
274292
}
275-
})
293+
.instrument(tracing::info_span!("worker_poll_nexus_task")),
294+
)
276295
}
277296

278297
/// Submit an nexus task completion to core.
@@ -291,20 +310,25 @@ pub fn worker_complete_nexus_task(
291310
let worker = worker_ref.core_worker.clone();
292311
let runtime = worker_ref.core_runtime.clone();
293312

294-
runtime.future_to_promise(async move {
295-
worker
296-
.complete_nexus_task(nexus_completion)
297-
.await
298-
.map_err(|err| match err {
299-
CompleteNexusError::NexusNotEnabled {} => {
300-
BridgeError::UnexpectedError(format!("{err}"))
301-
}
302-
CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError {
303-
field: None,
304-
message: format!("Malformed nexus Completion: {reason:?}"),
305-
},
306-
})
307-
})
313+
runtime.future_to_promise(
314+
async move {
315+
worker
316+
.complete_nexus_task(nexus_completion)
317+
.await
318+
.map_err(|err| match err {
319+
CompleteNexusError::NexusNotEnabled {} => {
320+
BridgeError::UnexpectedError(format!("{err}"))
321+
}
322+
CompleteNexusError::MalformedNexusCompletion { reason } => {
323+
BridgeError::TypeError {
324+
field: None,
325+
message: format!("Malformed nexus Completion: {reason:?}"),
326+
}
327+
}
328+
})
329+
}
330+
.instrument(tracing::info_span!("worker_complete_nexus_task")),
331+
)
308332
}
309333

310334
/// Request shutdown of the worker.
@@ -313,6 +337,7 @@ pub fn worker_complete_nexus_task(
313337
/// the loop to ensure graceful shutdown.
314338
#[js_function]
315339
pub fn worker_initiate_shutdown(worker: OpaqueInboundHandle<Worker>) -> BridgeResult<()> {
340+
tracing::info!("Typescript initiate worker shutdown");
316341
let worker_ref = worker.borrow()?;
317342
worker_ref.core_worker.initiate_shutdown();
318343
Ok(())
@@ -337,10 +362,13 @@ pub fn worker_finalize_shutdown(
337362
}
338363
})?;
339364

340-
worker_ref.core_runtime.future_to_promise(async move {
341-
worker.finalize_shutdown().await;
342-
Ok(())
343-
})
365+
worker_ref.core_runtime.future_to_promise(
366+
async move {
367+
worker.finalize_shutdown().await;
368+
Ok(())
369+
}
370+
.instrument(tracing::info_span!("worker_finalize_shutdown")),
371+
)
344372
}
345373

346374
impl MutableFinalize for Worker {

0 commit comments

Comments
 (0)