Skip to content

Commit 2ef4478

Browse files
Events - Timeouts -Task Cleanup
1 parent 2b5cdd3 commit 2ef4478

File tree

16 files changed

+444
-333
lines changed

16 files changed

+444
-333
lines changed

pkg/common/task/worker.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,15 @@ func NewWorker() *Worker {
109109
}
110110

111111
// cleanupLoop periodically removes tasks that are in a final state for a grace period
112+
// or any task that has been around for too long
112113
func (worker *Worker) cleanupLoop(ctx context.Context) {
113114
const (
114115
cleanupInterval = 30 * time.Second
115116
finalTaskTTL = 2 * time.Minute
117+
// maxTaskAge removes any task entry after this age, regardless of state.
118+
// Keep greater than the largest server-side task envelope (RegisterTimeout ~75m)
119+
// to avoid pruning legitimate long-running tasks from the worker registry.
120+
maxTaskAge = 2 * time.Hour
116121
)
117122

118123
ticker := time.NewTicker(cleanupInterval)
@@ -129,11 +134,17 @@ func (worker *Worker) cleanupLoop(ctx context.Context) {
129134
kept := worker.tasks[:0]
130135
for _, t := range worker.tasks {
131136
st := t.Status()
132-
if st != nil && st.SubStatus != nil && st.SubStatus.IsFinal() {
133-
if now.Sub(st.CreatedAt) >= finalTaskTTL {
134-
// drop this finalized task
137+
if st != nil {
138+
// Remove any task older than 30 minutes, regardless of state
139+
if now.Sub(st.CreatedAt) >= maxTaskAge {
135140
continue
136141
}
142+
// Also remove final tasks after 2 minutes
143+
if st.SubStatus != nil && st.SubStatus.IsFinal() {
144+
if now.Sub(st.CreatedAt) >= finalTaskTTL {
145+
continue
146+
}
147+
}
137148
}
138149
kept = append(kept, t)
139150
}

sdk/README.md

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,12 @@ if err != nil {
221221
// taskID can be used to track the download progress
222222
```
223223

224+
Behavior notes:
225+
- The SDK determines the final filename from on-chain cascade metadata and creates the path: `outputDir/<actionID>/<filename>`.
226+
- Events will include both the supernode endpoint (`event.KeySupernode`) and its Cosmos address (`event.KeySupernodeAddress`).
227+
- On success, both `SDKOutputPathReceived` and `SDKDownloadSuccessful` include `event.KeyOutputPath`.
228+
- Failures include reason-coded suffixes in the message (e.g., `| reason=timeout`) and set `event.KeyMessage` accordingly.
229+
224230
**Parameters:**
225231
- `ctx context.Context`: Context for the operation
226232
- `actionID string`: ID of the action to download
@@ -381,7 +387,7 @@ The SDK provides an event system to monitor task progress through event subscrip
381387
- `SDKRegistrationAttempt`: Attempting to register with a supernode
382388
- `SDKRegistrationFailure`: Registration with supernode failed
383389
- `SDKRegistrationSuccessful`: Successfully registered with supernode
384-
- `SDKTaskTxHashReceived`: Transaction hash received from supernode
390+
- `SDKTaskTxHashReceived`: Transaction hash received from supernode (includes endpoint in `KeySupernode` and Cosmos address in `KeySupernodeAddress`)
385391
- `SDKTaskCompleted`: Task completed successfully
386392
- `SDKTaskFailed`: Task failed with error
387393
- `SDKConnectionEstablished`: Connection to supernode established
@@ -392,9 +398,9 @@ The SDK provides an event system to monitor task progress through event subscrip
392398
- `SDKProcessingFailed`: Processing failed (reason=stream_recv|missing_final_response)
393399
- `SDKProcessingTimeout`: Processing exceeded time budget and was cancelled
394400
- `SDKDownloadAttempt`: Attempting to download from supernode
395-
- `SDKDownloadFailure`: Download attempt failed
396-
- `SDKOutputPathReceived`: File download path received
397-
- `SDKDownloadSuccessful`: Download completed successfully
401+
- `SDKDownloadFailure`: Download attempt failed (message may include `| reason=timeout|canceled`; `KeyMessage` mirrors the reason)
402+
- `SDKOutputPathReceived`: File download path received (includes `KeyOutputPath` and supernode identity keys)
403+
- `SDKDownloadSuccessful`: Download completed successfully (includes `KeyOutputPath` and supernode identity keys)
398404

399405
**Supernode Events (forwarded from supernodes):**
400406
- `SupernodeActionRetrieved`: Action retrieved from blockchain
@@ -421,8 +427,8 @@ Events may include additional data accessible through these keys:
421427

422428
- `event.KeyError`: Error message (for failure events)
423429
- `event.KeyCount`: Count of items (e.g., supernodes found)
424-
- `event.KeySupernode`: Supernode endpoint
425-
- `event.KeySupernodeAddress`: Supernode cosmos address
430+
- `event.KeySupernode`: Supernode gRPC endpoint
431+
- `event.KeySupernodeAddress`: Supernode Cosmos address
426432
- `event.KeyIteration`: Attempt iteration number
427433
- `event.KeyTxHash`: Transaction hash
428434
- `event.KeyMessage`: Event message

0 commit comments

Comments
 (0)