Skip to content

Commit 1c6d3ac

Browse files
authored
feat: include latest probe in lineage response (#180)
- Return node attributes in Lineage response for GetGraph call. Node attributes include latest probe for asset nodes in the Graph. This enables showing live lineage. - Use context for gRPC dial and compass service handler in server. - Add support for fetching probes with filters from asset repository. Supported filters: - AssetURNs - zero or more asset URNs. - MaxRows - Number of rows for each Asset URN ordered by latest first. - OlderThan,NewerThan - Data range. - Fix some lint errors
1 parent 899286e commit 1c6d3ac

17 files changed

+1078
-71
lines changed

Diff for: cli/utils.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7-
"io/ioutil"
7+
"os"
88
"path/filepath"
99
"strings"
1010

@@ -13,7 +13,7 @@ import (
1313
)
1414

1515
func parseFile(filePath string, v protoreflect.ProtoMessage) error {
16-
b, err := ioutil.ReadFile(filePath)
16+
b, err := os.ReadFile(filePath)
1717
if err != nil {
1818
return err
1919
}

Diff for: core/asset/asset.go

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type Repository interface {
2222
DeleteByURN(ctx context.Context, urn string) error
2323
AddProbe(ctx context.Context, assetURN string, probe *Probe) error
2424
GetProbes(ctx context.Context, assetURN string) ([]Probe, error)
25+
GetProbesWithFilter(ctx context.Context, flt ProbesFilter) (map[string][]Probe, error)
2526
}
2627

2728
// Asset is a model that wraps arbitrary data with Compass' context

Diff for: core/asset/lineage.go

+13
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ type LineageRepository interface {
3333

3434
type LineageGraph []LineageEdge
3535

36+
type Lineage struct {
37+
Edges []LineageEdge `json:"edges"`
38+
NodeAttrs map[string]NodeAttributes `json:"node_attrs"`
39+
}
40+
3641
type LineageEdge struct {
3742
// Source represents source's node ID
3843
Source string `json:"source"`
@@ -43,3 +48,11 @@ type LineageEdge struct {
4348
// Prop is a map containing extra information about the edge
4449
Prop map[string]interface{} `json:"prop"`
4550
}
51+
52+
type NodeAttributes struct {
53+
Probes ProbesInfo `json:"probes"`
54+
}
55+
56+
type ProbesInfo struct {
57+
Latest Probe `json:"latest"`
58+
}

Diff for: core/asset/mocks/asset_repository.go

+82-32
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: core/asset/probes_filter.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package asset
2+
3+
import "time"
4+
5+
type ProbesFilter struct {
6+
AssetURNs []string
7+
MaxRows int
8+
NewerThan time.Time
9+
OlderThan time.Time
10+
}

Diff for: core/asset/service.go

+39-2
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,30 @@ func (s *Service) AddProbe(ctx context.Context, assetURN string, probe *Probe) e
108108
return s.assetRepository.AddProbe(ctx, assetURN, probe)
109109
}
110110

111-
func (s *Service) GetLineage(ctx context.Context, urn string, query LineageQuery) (LineageGraph, error) {
112-
return s.lineageRepository.GetGraph(ctx, urn, query)
111+
func (s *Service) GetLineage(ctx context.Context, urn string, query LineageQuery) (Lineage, error) {
112+
edges, err := s.lineageRepository.GetGraph(ctx, urn, query)
113+
if err != nil {
114+
return Lineage{}, fmt.Errorf("get lineage: get graph edges: %w", err)
115+
}
116+
117+
urns := newUniqueStrings(len(edges))
118+
urns.add(urn)
119+
for _, edge := range edges {
120+
urns.add(edge.Source, edge.Target)
121+
}
122+
123+
assetProbes, err := s.assetRepository.GetProbesWithFilter(ctx, ProbesFilter{
124+
AssetURNs: urns.list(),
125+
MaxRows: 1,
126+
})
127+
if err != nil {
128+
return Lineage{}, fmt.Errorf("get lineage: get latest probes: %w", err)
129+
}
130+
131+
return Lineage{
132+
Edges: edges,
133+
NodeAttrs: buildNodeAttrs(assetProbes),
134+
}, nil
113135
}
114136

115137
func (s *Service) GetTypes(ctx context.Context, flt Filter) (map[Type]int, error) {
@@ -131,3 +153,18 @@ func isValidUUID(u string) bool {
131153
_, err := uuid.Parse(u)
132154
return err == nil
133155
}
156+
157+
func buildNodeAttrs(assetProbes map[string][]Probe) map[string]NodeAttributes {
158+
nodeAttrs := make(map[string]NodeAttributes, len(assetProbes))
159+
for urn, probes := range assetProbes {
160+
if len(probes) == 0 {
161+
continue
162+
}
163+
164+
nodeAttrs[urn] = NodeAttributes{
165+
Probes: ProbesInfo{Latest: probes[0]},
166+
}
167+
}
168+
169+
return nodeAttrs
170+
}

0 commit comments

Comments
 (0)