Skip to content

Commit 8c99e85

Browse files
committed
prevent "NewPartial" and "UndoPartial" steps from getting to user through cursor in dataOutput, fix partial blocks output in v4
1 parent ee2adf0 commit 8c99e85

3 files changed

Lines changed: 33 additions & 7 deletions

File tree

pipeline/pipeline.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,25 +919,51 @@ func (p *Pipeline) cleanUpModuleExecutors(ctx context.Context, logger *zap.Logge
919919
return nil
920920
}
921921

922+
// normalizedOpaqueCursor returns a opaque cursor string without some
923+
// esoteric steps like 'NewPartial' and 'UndoPartial' which may not be supported by some clients
924+
func normalizedOpaqueCursor(cursor bstream.Cursor) string {
925+
switch cursor.Step {
926+
case bstream.StepNewPartial:
927+
cursor.Step = bstream.StepNew
928+
case bstream.StepUndoPartial:
929+
cursor.Step = bstream.StepUndo
930+
}
931+
return cursor.ToOpaque()
932+
}
933+
922934
func returnPartialDataOutput(
923935
clock *pbsubstreams.Clock,
924936
cursor *bstream.Cursor,
925937
mapModuleOutput *pbsubstreamsrpc.MapModuleOutput,
926938
respFunc substreams.ResponseFunc,
927939
partialIdx uint32,
928940
lastPartial bool,
941+
supportBuffering bool,
929942
) error {
930943

931944
out := &pbsubstreamsrpc.BlockScopedData{
932945
Clock: clock,
933-
Cursor: cursor.ToOpaque(),
946+
Cursor: normalizedOpaqueCursor(*cursor),
934947
FinalBlockHeight: cursor.LIB.Num(),
935948
Output: mapModuleOutput,
936949
IsPartial: true,
937950
PartialIndex: &partialIdx,
938951
IsLastPartial: &lastPartial,
939952
}
940953

954+
if supportBuffering { //v4 support
955+
bsd := &pbsubstreamsrpcv4.BlockScopedDatas{
956+
Items: []*pbsubstreamsrpc.BlockScopedData{
957+
out,
958+
},
959+
}
960+
961+
if err := respFunc(substreams.NewBlockScopedDatasResponse(bsd)); err != nil {
962+
return fmt.Errorf("calling return func: %w", err)
963+
}
964+
return nil
965+
}
966+
941967
if err := respFunc(substreams.NewBlockScopedDataResponse(out)); err != nil {
942968
return fmt.Errorf("calling response func: %w", err)
943969
}
@@ -966,7 +992,7 @@ func returnModuleDataOutputs(
966992
Output: mapModuleOutput,
967993
DebugMapOutputs: extraMapModuleOutputs,
968994
DebugStoreOutputs: extraStoreModuleOutputs,
969-
Cursor: cursor.ToOpaque(),
995+
Cursor: normalizedOpaqueCursor(*cursor),
970996
FinalBlockHeight: cursor.LIB.Num(),
971997
}
972998

pipeline/process_block.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ func (p *Pipeline) handleUndo(clock *pbsubstreams.Clock, cursor *bstream.Cursor,
272272
Message: &pbsubstreamsrpc.Response_BlockUndoSignal{
273273
BlockUndoSignal: &pbsubstreamsrpc.BlockUndoSignal{
274274
LastValidBlock: targetClock,
275-
LastValidCursor: targetCursor.ToOpaque(),
275+
LastValidCursor: normalizedOpaqueCursor(*targetCursor),
276276
},
277277
},
278278
})
@@ -399,7 +399,7 @@ func (p *Pipeline) handleStepPartial(ctx context.Context, clock *pbsubstreams.Cl
399399

400400
mapModuleOutput := normalizeModuleOutput(p.mapModuleOutput, reqDetails.OutputModule)
401401

402-
if err = returnPartialDataOutput(clock, cursor, mapModuleOutput, p.respFunc, uint32(idx), isLast); err != nil {
402+
if err = returnPartialDataOutput(clock, cursor, mapModuleOutput, p.respFunc, uint32(idx), isLast, p.supportBuffering); err != nil {
403403
return fmt.Errorf("failed to return module data output: %w", err)
404404
}
405405

pipeline/resolve.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func resolveStartBlockNum(ctx context.Context, req *pbsubstreamsrpc.Request, res
222222
Number: cursor.Block.Num(),
223223
Id: cursor.Block.ID(),
224224
},
225-
LastValidCursor: cursor.ToOpaque(),
225+
LastValidCursor: normalizedOpaqueCursor(*cursor),
226226
}
227227
}
228228

@@ -255,7 +255,7 @@ func resolveStartBlockNum(ctx context.Context, req *pbsubstreamsrpc.Request, res
255255

256256
undoSignal = &pbsubstreamsrpc.BlockUndoSignal{
257257
LastValidBlock: blockRefToPB(reorgJunctionBlock),
258-
LastValidCursor: resolvedCursor.ToOpaque(),
258+
LastValidCursor: normalizedOpaqueCursor(*resolvedCursor),
259259
}
260260
}
261261

@@ -267,7 +267,7 @@ func resolveStartBlockNum(ctx context.Context, req *pbsubstreamsrpc.Request, res
267267
resolvedStartBlockNum = resolvedCursor.Block.Num()
268268
}
269269

270-
return resolvedStartBlockNum, resolvedCursor.ToOpaque(), undoSignal, nil
270+
return resolvedStartBlockNum, normalizedOpaqueCursor(*resolvedCursor), undoSignal, nil
271271
}
272272

273273
type CursorResolver func(context.Context, *bstream.Cursor) (reorgJunctionBlock, currentHead bstream.BlockRef, err error)

0 commit comments

Comments
 (0)