Skip to content

Commit 3592539

Browse files
committed
feat: extract TraversalResumerPathState interface and allow it to be shared across traversals
1 parent 926d766 commit 3592539

File tree

4 files changed

+72
-33
lines changed

4 files changed

+72
-33
lines changed

v2/options.go

+16
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/multiformats/go-multicodec"
99

1010
"github.com/ipld/go-car/v2/internal/carv1"
11+
resumetraversal "github.com/ipld/go-car/v2/traversal"
1112
)
1213

1314
// DefaultMaxIndexCidSize specifies the maximum size in bytes accepted as a section CID by CARv2 index.
@@ -62,6 +63,7 @@ type Options struct {
6263
DataPayloadSize uint64
6364
SkipOffset uint64
6465
TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser
66+
TraversalResumerPathState resumetraversal.TraversalResumerPathState
6567

6668
MaxAllowedHeaderSize uint64
6769
MaxAllowedSectionSize uint64
@@ -182,3 +184,17 @@ func MaxAllowedSectionSize(max uint64) Option {
182184
o.MaxAllowedSectionSize = max
183185
}
184186
}
187+
188+
// WithTraversalResumerPathState provides a custom TraversalResumerPathState
189+
// that can be reused between selective CAR creations where traversals may need
190+
// to be resumed at arbitrary points within the DAG.
191+
//
192+
// A TraversalResumerPathState shared across multiple traversals using the same
193+
// selector and DAG will yield the same state. This allows us to resume at
194+
// arbitrary points within the DAG and load the minimal additional blocks
195+
// required to resume the traversal at that point.
196+
func WithTraversalResumerPathState(pathState resumetraversal.TraversalResumerPathState) Option {
197+
return func(o *Options) {
198+
o.TraversalResumerPathState = pathState
199+
}
200+
}

v2/selective.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,11 @@ func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Opt
321321
}
322322

323323
ls.TrustedStorage = true
324-
resumer, err := resumetraversal.WithTraversingLinksystem(&progress)
324+
pathState := opts.TraversalResumerPathState
325+
if pathState == nil {
326+
pathState = resumetraversal.NewTraversalResumerPathState()
327+
}
328+
resumer, err := resumetraversal.WithTraversingLinksystem(&progress, pathState)
325329
if err != nil {
326330
return err
327331
}

v2/traversal/resumption.go

+48-29
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,41 @@ import (
1717
"github.com/ipld/go-ipld-prime/traversal"
1818
)
1919

20+
// TraverseResumer allows resuming a progress from a previously encountered path in the selector.
21+
type TraverseResumer interface {
22+
RewindToPath(from datamodel.Path) error
23+
RewindToOffset(offset uint64) error
24+
Position() uint64
25+
}
26+
27+
// TraversalResumerPathState tracks a traversal state for the purpose of
28+
// building a CAR. For each block in the CAR it tracks the path to that block,
29+
// the Link of the block and where in the CAR the block is located.
30+
//
31+
// A TraversalResumerPathState shared across multiple traversals using the same
32+
// selector and DAG will yield the same state. This allows us to resume at
33+
// arbitrary points within the DAG and load the minimal additional blocks
34+
// required to resume the traversal at that point.
35+
type TraversalResumerPathState interface {
36+
AddPath(path []datamodel.PathSegment, link datamodel.Link, atOffset uint64)
37+
GetLinks(root datamodel.Path) []datamodel.Link
38+
GetOffsetAfter(root datamodel.Path) (uint64, error)
39+
}
40+
2041
type pathNode struct {
2142
link datamodel.Link
2243
offset uint64
2344
children map[datamodel.PathSegment]*pathNode
2445
}
2546

47+
// NewTraversalResumerPathState creates a new TraversalResumerPathState.
48+
//
49+
// Note that the TraversalResumerPathState returned by this factory is not
50+
// thread-safe.
51+
func NewTraversalResumerPathState() TraversalResumerPathState {
52+
return newPath(nil, 0)
53+
}
54+
2655
func newPath(link datamodel.Link, at uint64) *pathNode {
2756
return &pathNode{
2857
link: link,
@@ -31,15 +60,15 @@ func newPath(link datamodel.Link, at uint64) *pathNode {
3160
}
3261
}
3362

34-
func (pn pathNode) addPath(p []datamodel.PathSegment, link datamodel.Link, at uint64) {
63+
func (pn pathNode) AddPath(p []datamodel.PathSegment, link datamodel.Link, atOffset uint64) {
3564
if len(p) == 0 {
3665
return
3766
}
3867
if _, ok := pn.children[p[0]]; !ok {
39-
child := newPath(link, at)
68+
child := newPath(link, atOffset)
4069
pn.children[p[0]] = child
4170
}
42-
pn.children[p[0]].addPath(p[1:], link, at)
71+
pn.children[p[0]].AddPath(p[1:], link, atOffset)
4372
}
4473

4574
func (pn pathNode) allLinks() []datamodel.Link {
@@ -57,7 +86,7 @@ func (pn pathNode) allLinks() []datamodel.Link {
5786
}
5887

5988
// GetLinks returns the links recorded in the tree rooted at 'root'
60-
func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link {
89+
func (pn pathNode) GetLinks(root datamodel.Path) []datamodel.Link {
6190
segs := root.Segments()
6291
switch len(segs) {
6392
case 0:
@@ -80,12 +109,12 @@ func (pn pathNode) getLinks(root datamodel.Path) []datamodel.Link {
80109
// base case 2: not registered sub-path.
81110
return []datamodel.Link{}
82111
}
83-
return pn.children[next].getLinks(datamodel.NewPathNocopy(segs[1:]))
112+
return pn.children[next].GetLinks(datamodel.NewPathNocopy(segs[1:]))
84113
}
85114

86115
var errInvalid = fmt.Errorf("invalid path")
87116

88-
func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) {
117+
func (pn pathNode) GetOffsetAfter(root datamodel.Path) (uint64, error) {
89118
// we look for offset of next sibling.
90119
// if no next sibling recurse up the path segments until we find a next sibling.
91120
segs := root.Segments()
@@ -100,7 +129,7 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) {
100129
closest := chld.offset
101130
// try recursive path
102131
if len(segs) > 1 {
103-
co, err := chld.offsetAfter(datamodel.NewPathNocopy(segs[1:]))
132+
co, err := chld.GetOffsetAfter(datamodel.NewPathNocopy(segs[1:]))
104133
if err == nil {
105134
return co, err
106135
}
@@ -121,35 +150,28 @@ func (pn pathNode) offsetAfter(root datamodel.Path) (uint64, error) {
121150
return 0, errInvalid
122151
}
123152

124-
// TraverseResumer allows resuming a progress from a previously encountered path in the selector.
125-
type TraverseResumer interface {
126-
RewindToPath(from datamodel.Path) error
127-
RewindToOffset(offset uint64) error
128-
Position() uint64
129-
}
130-
131153
type traversalState struct {
132154
wrappedLinksystem *linking.LinkSystem
133155
lsCounter *loader.Counter
134-
blockNumber int
135-
pathOrder map[int]datamodel.Path
136-
pathTree *pathNode
156+
pathTree TraversalResumerPathState
137157
rewindPathTarget *datamodel.Path
138158
rewindOffsetTarget uint64
139159
pendingBlockStart uint64 // on rewinds, we store where the counter was in order to know the length of the last read block.
140160
progress *traversal.Progress
141161
}
142162

163+
var _ TraverseResumer = (*traversalState)(nil)
164+
143165
func (ts *traversalState) RewindToPath(from datamodel.Path) error {
144166
if ts.progress == nil {
145167
return nil
146168
}
147169
// reset progress and traverse until target.
148170
ts.progress.SeenLinks = make(map[datamodel.Link]struct{})
149-
ts.blockNumber = 0
150171
ts.pendingBlockStart = ts.lsCounter.Size()
151172
ts.lsCounter.TotalRead = 0
152173
ts.rewindPathTarget = &from
174+
ts.rewindOffsetTarget = 0
153175
return nil
154176
}
155177

@@ -163,10 +185,10 @@ func (ts *traversalState) RewindToOffset(offset uint64) error {
163185
}
164186
// reset progress and traverse until target.
165187
ts.progress.SeenLinks = make(map[datamodel.Link]struct{})
166-
ts.blockNumber = 0
167188
ts.pendingBlockStart = ts.lsCounter.Size()
168189
ts.lsCounter.TotalRead = 0
169190
ts.rewindOffsetTarget = offset
191+
ts.rewindPathTarget = nil
170192
return nil
171193
}
172194

@@ -177,9 +199,7 @@ func (ts *traversalState) Position() uint64 {
177199
func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
178200
// when not in replay mode, we track metadata
179201
if ts.rewindPathTarget == nil && ts.rewindOffsetTarget == 0 {
180-
ts.pathOrder[ts.blockNumber] = lc.LinkPath
181-
ts.pathTree.addPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size())
182-
ts.blockNumber++
202+
ts.pathTree.AddPath(lc.LinkPath.Segments(), l, ts.lsCounter.Size())
183203
return ts.wrappedLinksystem.StorageReadOpener(lc, l)
184204
}
185205

@@ -205,12 +225,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read
205225
break
206226
}
207227
if targetSegments[i].String() != s.String() {
208-
links := ts.pathTree.getLinks(datamodel.NewPathNocopy(seg[0 : i+1]))
228+
links := ts.pathTree.GetLinks(datamodel.NewPathNocopy(seg[0 : i+1]))
209229
for _, l := range links {
210230
ts.progress.SeenLinks[l] = struct{}{}
211231
}
212232
var err error
213-
ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(datamodel.NewPathNocopy(seg[0 : i+1]))
233+
ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(datamodel.NewPathNocopy(seg[0 : i+1]))
214234
if err == errInvalid {
215235
ts.lsCounter.TotalRead = ts.pendingBlockStart
216236
} else if err != nil {
@@ -222,12 +242,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read
222242
}
223243
}
224244
if ts.rewindOffsetTarget != 0 {
225-
links := ts.pathTree.getLinks(lc.LinkPath)
245+
links := ts.pathTree.GetLinks(lc.LinkPath)
226246
for _, l := range links {
227247
ts.progress.SeenLinks[l] = struct{}{}
228248
}
229249
var err error
230-
ts.lsCounter.TotalRead, err = ts.pathTree.offsetAfter(lc.LinkPath)
250+
ts.lsCounter.TotalRead, err = ts.pathTree.GetOffsetAfter(lc.LinkPath)
231251
if err == errInvalid {
232252
ts.lsCounter.TotalRead = ts.pendingBlockStart
233253
} else if err != nil {
@@ -243,13 +263,12 @@ func (ts *traversalState) traverse(lc linking.LinkContext, l ipld.Link) (io.Read
243263
// WithTraversingLinksystem extends a progress for traversal such that it can
244264
// subsequently resume and perform subsets of the walk efficiently from
245265
// an arbitrary position within the selector traversal.
246-
func WithTraversingLinksystem(p *traversal.Progress) (TraverseResumer, error) {
266+
func WithTraversingLinksystem(p *traversal.Progress, pathState TraversalResumerPathState) (TraverseResumer, error) {
247267
wls, ctr := loader.CountingLinkSystem(p.Cfg.LinkSystem)
248268
ts := &traversalState{
249269
wrappedLinksystem: &wls,
250270
lsCounter: ctr.(*loader.Counter),
251-
pathOrder: make(map[int]datamodel.Path),
252-
pathTree: newPath(nil, 0),
271+
pathTree: pathState,
253272
progress: p,
254273
}
255274
p.Cfg.LinkSystem.StorageReadOpener = ts.traverse

v2/traversal/resumption_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestWalkResumeByPath(t *testing.T) {
8989
LinkTargetNodePrototypeChooser: basicnode.Chooser,
9090
},
9191
}
92-
resumer, err := cartraversal.WithTraversingLinksystem(&p)
92+
resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState())
9393
if err != nil {
9494
t.Fatal(err)
9595
}
@@ -154,7 +154,7 @@ func TestWalkResumeByPathPartialWalk(t *testing.T) {
154154
LinkTargetNodePrototypeChooser: basicnode.Chooser,
155155
},
156156
}
157-
resumer, err := cartraversal.WithTraversingLinksystem(&p)
157+
resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState())
158158
if err != nil {
159159
t.Fatal(err)
160160
}
@@ -195,7 +195,7 @@ func TestWalkResumeByOffset(t *testing.T) {
195195
LinkTargetNodePrototypeChooser: basicnode.Chooser,
196196
},
197197
}
198-
resumer, err := cartraversal.WithTraversingLinksystem(&p)
198+
resumer, err := cartraversal.WithTraversingLinksystem(&p, cartraversal.NewTraversalResumerPathState())
199199
if err != nil {
200200
t.Fatal(err)
201201
}

0 commit comments

Comments
 (0)