-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathdiagnostics.go
119 lines (100 loc) · 3.01 KB
/
diagnostics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package postgresbackend
import (
"context"
"database/sql"
"time"
"github.com/cschleiden/go-workflows/core"
"github.com/cschleiden/go-workflows/diag"
)
var _ diag.Backend = (*postgresBackend)(nil)
func (mb *postgresBackend) GetWorkflowInstances(ctx context.Context, afterInstanceID, afterExecutionID string, count int) ([]*diag.WorkflowInstanceRef, error) {
var err error
tx, err := mb.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()
var rows *sql.Rows
if afterInstanceID != "" {
rows, err = tx.QueryContext(
ctx,
SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
FROM instances i
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
ORDER BY i.created_at DESC, i.instance_id DESC
LIMIT ?`),
afterInstanceID,
afterExecutionID,
count,
)
} else {
rows, err = tx.QueryContext(
ctx,
SQLReplacer(`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
FROM instances i
ORDER BY i.created_at DESC, i.instance_id DESC
LIMIT ?`),
count,
)
}
if err != nil {
return nil, err
}
defer rows.Close()
var instances []*diag.WorkflowInstanceRef
for rows.Next() {
var id, executionID string
var createdAt time.Time
var completedAt *time.Time
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
if err != nil {
return nil, err
}
var state core.WorkflowInstanceState
if completedAt != nil {
state = core.WorkflowInstanceStateFinished
}
instances = append(instances, &diag.WorkflowInstanceRef{
Instance: core.NewWorkflowInstance(id, executionID),
CreatedAt: createdAt,
CompletedAt: completedAt,
State: state,
})
}
return instances, nil
}
func (mb *postgresBackend) GetWorkflowInstance(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceRef, error) {
tx, err := mb.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()
res := tx.QueryRowContext(
ctx,
SQLReplacer("SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ? AND execution_id = ?"), instance.InstanceID, instance.ExecutionID)
var id, executionID string
var createdAt time.Time
var completedAt *time.Time
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, err
}
var state core.WorkflowInstanceState
if completedAt != nil {
state = core.WorkflowInstanceStateFinished
}
return &diag.WorkflowInstanceRef{
Instance: core.NewWorkflowInstance(id, executionID),
CreatedAt: createdAt,
CompletedAt: completedAt,
State: state,
}, nil
}
func (mb *postgresBackend) GetWorkflowTree(ctx context.Context, instance *core.WorkflowInstance) (*diag.WorkflowInstanceTree, error) {
itb := diag.NewInstanceTreeBuilder(mb)
return itb.BuildWorkflowInstanceTree(ctx, instance)
}