Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions ingest/ledgerbackend/buffered_meta_pipe_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

import (
"bufio"
"fmt"
"io"
"time"

"github.com/pkg/errors"
xdr3 "github.com/stellar/go-xdr/xdr3"

"github.com/stellar/go-stellar-sdk/support/log"
"github.com/stellar/go-stellar-sdk/xdr"
Expand Down Expand Up @@ -36,6 +36,9 @@
// available. When catching up and small buffer this can increase the overall
// time because ledgers are not available.
ledgerReadAheadBufferSize = 20
// maxLedgerMetaFrameSize is the maximum allowed size for a single
// LedgerCloseMeta XDR frame read from the captive core pipe.
maxLedgerMetaFrameSize = 256 * 1024 * 1024 // 256 MB
)

type metaResult struct {
Expand All @@ -54,29 +57,27 @@
// while previous ledger are being processed.
// - Limits memory usage in case of large ledgers are closed by the network.
//
// Internally, it keeps two buffers: bufio.Reader with binary ledger data and
// buffered channel with unmarshaled xdr.LedgerCloseMeta objects ready for
// processing. The first buffer removes overhead time connected to reading from
// a file. The second buffer allows unmarshaling binary data into XDR objects
// (which can be a bottleneck) while clients are processing previous ledgers.
// It reads each framed XDR record into a reusable byte buffer first, then
// decodes from that buffer. This ensures the XDR decoder always sees a
// bounded input (bytes.NewReader implements Len()), so all InputLen() guards
// in the generated DecodeFrom methods are active.
//
// Finally, when a large ledger (larger than binary buffer) is closed it waits
// until xdr.LedgerCloseMeta objects channel is empty. This prevents memory
// exhaustion when network closes a series a large ledgers.
// When a large ledger (larger than the binary buffer) is closed it waits
// until the xdr.LedgerCloseMeta channel is empty. This prevents memory
// exhaustion when the network closes a series of large ledgers.
type bufferedLedgerMetaReader struct {
r *bufio.Reader
c chan metaResult
decoder *xdr3.Decoder
r *bufio.Reader
c chan metaResult
frameBuf []byte // reusable frame buffer
}

// newBufferedLedgerMetaReader creates a new meta reader that will shutdown
// when stellar-core terminates.
func newBufferedLedgerMetaReader(reader io.Reader) *bufferedLedgerMetaReader {
r := bufio.NewReaderSize(reader, metaPipeBufferSize)
return &bufferedLedgerMetaReader{
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: r,
decoder: xdr3.NewDecoder(r),
c: make(chan metaResult, ledgerReadAheadBufferSize),
r: r,
}
}

Expand All @@ -86,19 +87,33 @@
// - The next ledger available in the buffer exceeds the meta pipe buffer size.
// In such case the method will block until LedgerCloseMeta buffer is empty.
func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) {
frameLength, err := xdr.ReadFrameLength(b.decoder)
frameLength, err := xdr.ReadFrameLength(b.r)
if err != nil {
return nil, errors.Wrap(err, "error reading frame length")
}

if frameLength > maxLedgerMetaFrameSize {
return nil, fmt.Errorf("LedgerCloseMeta frame too large: %d bytes (max %d)", frameLength, maxLedgerMetaFrameSize)
}

for frameLength > metaPipeBufferSize && len(b.c) > 0 {
// Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage.
<-time.After(time.Second)
}

// Grow/reuse the frame buffer.
if uint32(cap(b.frameBuf)) < frameLength {

Check failure on line 105 in ingest/ledgerbackend/buffered_meta_pipe_reader.go

View workflow job for this annotation

GitHub Actions / golangci

G115: integer overflow conversion int -> uint32 (gosec)
b.frameBuf = make([]byte, frameLength)
} else {
b.frameBuf = b.frameBuf[:frameLength]
}

if _, err = io.ReadFull(b.r, b.frameBuf); err != nil {
return nil, errors.Wrap(err, "reading LedgerCloseMeta frame body")
}

var xlcm xdr.LedgerCloseMeta
_, err = xlcm.DecodeFrom(b.decoder, xdr3.DecodeDefaultMaxDepth)
if err != nil {
if err = xdr.SafeUnmarshal(b.frameBuf, &xlcm); err != nil {
return nil, errors.Wrap(err, "unmarshaling framed LedgerCloseMeta")
}
return &xlcm, nil
Expand Down
69 changes: 69 additions & 0 deletions ingest/ledgerbackend/buffered_meta_pipe_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package ledgerbackend

import (
"bytes"
"encoding/binary"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stellar/go-stellar-sdk/xdr"
)

func createTestLedgerCloseMeta(seq uint32) xdr.LedgerCloseMeta {
return xdr.LedgerCloseMeta{
V: int32(0),
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(seq),
},
},
},
}
}

func TestReadLedgerMetaFromPipe(t *testing.T) {
lcm := createTestLedgerCloseMeta(1234)

var buf bytes.Buffer
require.NoError(t, xdr.MarshalFramed(&buf, lcm))

reader := newBufferedLedgerMetaReader(&buf)
result, err := reader.readLedgerMetaFromPipe()
require.NoError(t, err)
assert.Equal(t, uint32(1234), uint32(result.LedgerHeaderHistoryEntry().Header.LedgerSeq))
}

func TestReadLedgerMetaFromPipeFrameTooLarge(t *testing.T) {
var buf bytes.Buffer
// Write a frame header with length exceeding maxLedgerMetaFrameSize.
// The high bit marks the last fragment per RFC 5531 record marking.
frameHeader := uint32(0x80000000) | (maxLedgerMetaFrameSize + 1)
require.NoError(t, binary.Write(&buf, binary.BigEndian, frameHeader))

reader := newBufferedLedgerMetaReader(&buf)
_, err := reader.readLedgerMetaFromPipe()
require.Error(t, err)
assert.Contains(t, err.Error(), "frame too large")
}

func TestReadLedgerMetaFromPipeMultipleFrames(t *testing.T) {
lcm1 := createTestLedgerCloseMeta(100)
lcm2 := createTestLedgerCloseMeta(200)

var buf bytes.Buffer
require.NoError(t, xdr.MarshalFramed(&buf, lcm1))
require.NoError(t, xdr.MarshalFramed(&buf, lcm2))

reader := newBufferedLedgerMetaReader(&buf)

result1, err := reader.readLedgerMetaFromPipe()
require.NoError(t, err)
assert.Equal(t, uint32(100), uint32(result1.LedgerHeaderHistoryEntry().Header.LedgerSeq))

result2, err := reader.readLedgerMetaFromPipe()
require.NoError(t, err)
assert.Equal(t, uint32(200), uint32(result2.LedgerHeaderHistoryEntry().Header.LedgerSeq))
}
11 changes: 9 additions & 2 deletions support/compressxdr/compress_xdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func (d XDRDecoder) ReadFrom(r io.Reader) (int64, error) {
}
defer zr.Close()

n, err := xdr.Unmarshal(zr, d.XdrPayload)
return int64(n), err
data, err := io.ReadAll(zr)
if err != nil {
return 0, err
}

if err = xdr.SafeUnmarshal(data, d.XdrPayload); err != nil {
return 0, err
}
return int64(len(data)), nil
}
40 changes: 24 additions & 16 deletions xdr/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ import (
"encoding/base64"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"strings"

xdr "github.com/stellar/go-xdr/xdr3"

"github.com/stellar/go-stellar-sdk/support/errors"
)

// CommitHash is the commit hash that was used to generate the xdr in this folder.
Expand Down Expand Up @@ -270,21 +269,30 @@ func (e *EncodingBuffer) MarshalHex(encodable EncoderTo) (string, error) {
return string(b), nil
}

// XDR record marking constants from RFC 5531 (section 11).
// Each record fragment is preceded by a 4-byte header: the high bit
// indicates the last (or only) fragment, and the lower 31 bits
// contain the fragment length in bytes.
const (
xdrFrameLastFragment = 0x80000000
xdrFrameLengthMask = 0x7fffffff
)

func MarshalFramed(w io.Writer, v interface{}) error {
var tmp bytes.Buffer
n, err := Marshal(&tmp, v)
if err != nil {
return err
}
un := uint32(n)
if un > 0x7fffffff {
if un > xdrFrameLengthMask {
return fmt.Errorf("Overlong write: %d bytes", n)
}

un = un | 0x80000000
un = un | xdrFrameLastFragment
err = binary.Write(w, binary.BigEndian, &un)
if err != nil {
return errors.Wrap(err, "error in binary.Write")
return fmt.Errorf("error in binary.Write: %w", err)
}
k, err := tmp.WriteTo(w)
if int64(n) != k {
Expand All @@ -293,20 +301,20 @@ func MarshalFramed(w io.Writer, v interface{}) error {
return err
}

// ReadFrameLength returns a length of a framed XDR object.
func ReadFrameLength(d *xdr.Decoder) (uint32, error) {
frameLen, n, e := d.DecodeUint()
if e != nil {
return 0, errors.Wrap(e, "unmarshaling XDR frame header")
}
if n != 4 {
return 0, errors.New("bad length of XDR frame header")
// ReadFrameLength reads and returns the payload length from a framed XDR
// stream.
func ReadFrameLength(r io.Reader) (uint32, error) {
var frameHeader uint32
if err := binary.Read(r, binary.BigEndian, &frameHeader); err != nil {
if errors.Is(err, io.EOF) {
return 0, io.EOF
}
return 0, fmt.Errorf("reading XDR frame header: %w", err)
}
if (frameLen & 0x80000000) != 0x80000000 {
if (frameHeader & xdrFrameLastFragment) != xdrFrameLastFragment {
return 0, errors.New("malformed XDR frame header")
}
frameLen &= 0x7fffffff
return frameLen, nil
return frameHeader & xdrFrameLengthMask, nil
}

type countWriter struct {
Expand Down
17 changes: 6 additions & 11 deletions xdr/xdrstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,18 @@ import (
"bytes"
"compress/gzip"
"crypto/sha256"
"encoding/binary"
stderrors "errors"
"errors"
"fmt"
"hash"
"io"
"io/ioutil"

"github.com/klauspost/compress/zstd"

"github.com/stellar/go-stellar-sdk/support/errors"
)

const DefaultMaxXDRStreamRecordSize = 64 * 1024 * 1024 // 64 MB

var ErrRecordTooLarge = stderrors.New("xdr record too large")
var ErrRecordTooLarge = errors.New("xdr record too large")

type Stream struct {
buf bytes.Buffer
Expand Down Expand Up @@ -177,17 +174,15 @@ func (x *Stream) closeReaders() error {
}

func (x *Stream) ReadOne(in DecoderFrom) error {
var nbytes uint32
err := binary.Read(x.reader, binary.BigEndian, &nbytes)
nbytes, err := ReadFrameLength(x.reader)
if err != nil {
x.reader.Close()
if err == io.EOF {
if errors.Is(err, io.EOF) {
// Do not wrap io.EOF
return err
return io.EOF
}
return errors.Wrap(err, "binary.Read error")
return fmt.Errorf("reading frame length: %w", err)
}
nbytes &= 0x7fffffff
x.buf.Reset()
if nbytes == 0 {
x.reader.Close()
Expand Down
10 changes: 5 additions & 5 deletions xdr/xdrstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestValidateHashDrainsUnreadBytes(t *testing.T) {
func TestReadOneRecordTooLarge(t *testing.T) {
// Write a 4-byte header claiming 256 MB, with no actual payload.
var header [4]byte
binary.BigEndian.PutUint32(header[:], 256*1024*1024)
binary.BigEndian.PutUint32(header[:], 256*1024*1024|xdrFrameLastFragment)
stream := NewStream(io.NopCloser(bytes.NewReader(header[:])))

var entry BucketEntry
Expand All @@ -247,7 +247,7 @@ func TestReadOneRecordTooLarge(t *testing.T) {
func TestReadOneCustomMaxRecordSize(t *testing.T) {
// Write a 4-byte header claiming 2048 bytes.
var header [4]byte
binary.BigEndian.PutUint32(header[:], 2048)
binary.BigEndian.PutUint32(header[:], 2048|xdrFrameLastFragment)
stream := NewStream(io.NopCloser(bytes.NewReader(header[:])))
stream.SetMaxRecordSize(1024)

Expand All @@ -260,7 +260,7 @@ func TestReadOneCustomMaxRecordSize(t *testing.T) {
func TestReadOneMaxBoundary(t *testing.T) {
// Header at exactly DefaultMaxXDRStreamRecordSize + 1 -> rejected.
var header [4]byte
binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize+1)
binary.BigEndian.PutUint32(header[:], (DefaultMaxXDRStreamRecordSize+1)|xdrFrameLastFragment)
stream := NewStream(io.NopCloser(bytes.NewReader(header[:])))

var entry BucketEntry
Expand All @@ -269,7 +269,7 @@ func TestReadOneMaxBoundary(t *testing.T) {
assert.True(t, errors.Is(err, ErrRecordTooLarge))

// Header at exactly DefaultMaxXDRStreamRecordSize -> accepted (will fail reading data, not size check).
binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize)
binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize|xdrFrameLastFragment)
stream = NewStream(io.NopCloser(bytes.NewReader(header[:])))

err = stream.ReadOne(&entry)
Expand All @@ -281,7 +281,7 @@ func TestReadOneMaxBoundary(t *testing.T) {
func TestSetMaxRecordSizeZero(t *testing.T) {
// SetMaxRecordSize(0) should use the default.
var header [4]byte
binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize+1)
binary.BigEndian.PutUint32(header[:], (DefaultMaxXDRStreamRecordSize+1)|xdrFrameLastFragment)
stream := NewStream(io.NopCloser(bytes.NewReader(header[:])))
stream.SetMaxRecordSize(0) // should reset to default

Expand Down
Loading