diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader.go b/ingest/ledgerbackend/buffered_meta_pipe_reader.go index f042f7aecc..c8e2e14e77 100644 --- a/ingest/ledgerbackend/buffered_meta_pipe_reader.go +++ b/ingest/ledgerbackend/buffered_meta_pipe_reader.go @@ -2,11 +2,11 @@ package ledgerbackend import ( "bufio" + "fmt" "io" "time" "github.com/pkg/errors" - xdr3 "github.com/stellar/go-xdr/xdr3" "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" @@ -36,6 +36,9 @@ const ( // available. When catching up and small buffer this can increase the overall // time because ledgers are not available. ledgerReadAheadBufferSize = 20 + // maxLedgerMetaFrameSize is the maximum allowed size for a single + // LedgerCloseMeta XDR frame read from the captive core pipe. + maxLedgerMetaFrameSize = 256 * 1024 * 1024 // 256 MB ) type metaResult struct { @@ -54,19 +57,18 @@ type metaResult struct { // while previous ledger are being processed. // - Limits memory usage in case of large ledgers are closed by the network. // -// Internally, it keeps two buffers: bufio.Reader with binary ledger data and -// buffered channel with unmarshaled xdr.LedgerCloseMeta objects ready for -// processing. The first buffer removes overhead time connected to reading from -// a file. The second buffer allows unmarshaling binary data into XDR objects -// (which can be a bottleneck) while clients are processing previous ledgers. +// It reads each framed XDR record into a reusable byte buffer first, then +// decodes from that buffer. This ensures the XDR decoder always sees a +// bounded input (bytes.NewReader implements Len()), so all InputLen() guards +// in the generated DecodeFrom methods are active. // -// Finally, when a large ledger (larger than binary buffer) is closed it waits -// until xdr.LedgerCloseMeta objects channel is empty. This prevents memory -// exhaustion when network closes a series a large ledgers. 
+// When a large ledger (larger than the binary buffer) is closed it waits +// until the xdr.LedgerCloseMeta channel is empty. This prevents memory +// exhaustion when the network closes a series of large ledgers. type bufferedLedgerMetaReader struct { - r *bufio.Reader - c chan metaResult - decoder *xdr3.Decoder + r *bufio.Reader + c chan metaResult + frameBuf []byte // reusable frame buffer } // newBufferedLedgerMetaReader creates a new meta reader that will shutdown @@ -74,9 +76,8 @@ type bufferedLedgerMetaReader struct { func newBufferedLedgerMetaReader(reader io.Reader) *bufferedLedgerMetaReader { r := bufio.NewReaderSize(reader, metaPipeBufferSize) return &bufferedLedgerMetaReader{ - c: make(chan metaResult, ledgerReadAheadBufferSize), - r: r, - decoder: xdr3.NewDecoder(r), + c: make(chan metaResult, ledgerReadAheadBufferSize), + r: r, } } @@ -86,19 +87,33 @@ func newBufferedLedgerMetaReader(reader io.Reader) *bufferedLedgerMetaReader { // - The next ledger available in the buffer exceeds the meta pipe buffer size. // In such case the method will block until LedgerCloseMeta buffer is empty. func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { - frameLength, err := xdr.ReadFrameLength(b.decoder) + frameLength, err := xdr.ReadFrameLength(b.r) if err != nil { return nil, errors.Wrap(err, "error reading frame length") } + if frameLength > maxLedgerMetaFrameSize { + return nil, fmt.Errorf("LedgerCloseMeta frame too large: %d bytes (max %d)", frameLength, maxLedgerMetaFrameSize) + } + for frameLength > metaPipeBufferSize && len(b.c) > 0 { // Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage. <-time.After(time.Second) } + // Grow/reuse the frame buffer. 
+ if uint32(cap(b.frameBuf)) < frameLength { + b.frameBuf = make([]byte, frameLength) + } else { + b.frameBuf = b.frameBuf[:frameLength] + } + + if _, err = io.ReadFull(b.r, b.frameBuf); err != nil { + return nil, errors.Wrap(err, "reading LedgerCloseMeta frame body") + } + var xlcm xdr.LedgerCloseMeta - _, err = xlcm.DecodeFrom(b.decoder, xdr3.DecodeDefaultMaxDepth) - if err != nil { + if err = xdr.SafeUnmarshal(b.frameBuf, &xlcm); err != nil { return nil, errors.Wrap(err, "unmarshaling framed LedgerCloseMeta") } return &xlcm, nil diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader_test.go b/ingest/ledgerbackend/buffered_meta_pipe_reader_test.go new file mode 100644 index 0000000000..0934f8c97b --- /dev/null +++ b/ingest/ledgerbackend/buffered_meta_pipe_reader_test.go @@ -0,0 +1,69 @@ +package ledgerbackend + +import ( + "bytes" + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/stellar/go-stellar-sdk/xdr" +) + +func createTestLedgerCloseMeta(seq uint32) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V: int32(0), + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(seq), + }, + }, + }, + } +} + +func TestReadLedgerMetaFromPipe(t *testing.T) { + lcm := createTestLedgerCloseMeta(1234) + + var buf bytes.Buffer + require.NoError(t, xdr.MarshalFramed(&buf, lcm)) + + reader := newBufferedLedgerMetaReader(&buf) + result, err := reader.readLedgerMetaFromPipe() + require.NoError(t, err) + assert.Equal(t, uint32(1234), uint32(result.LedgerHeaderHistoryEntry().Header.LedgerSeq)) +} + +func TestReadLedgerMetaFromPipeFrameTooLarge(t *testing.T) { + var buf bytes.Buffer + // Write a frame header with length exceeding maxLedgerMetaFrameSize. + // The high bit marks the last fragment per RFC 5531 record marking. 
+ frameHeader := uint32(0x80000000) | (maxLedgerMetaFrameSize + 1) + require.NoError(t, binary.Write(&buf, binary.BigEndian, frameHeader)) + + reader := newBufferedLedgerMetaReader(&buf) + _, err := reader.readLedgerMetaFromPipe() + require.Error(t, err) + assert.Contains(t, err.Error(), "frame too large") +} + +func TestReadLedgerMetaFromPipeMultipleFrames(t *testing.T) { + lcm1 := createTestLedgerCloseMeta(100) + lcm2 := createTestLedgerCloseMeta(200) + + var buf bytes.Buffer + require.NoError(t, xdr.MarshalFramed(&buf, lcm1)) + require.NoError(t, xdr.MarshalFramed(&buf, lcm2)) + + reader := newBufferedLedgerMetaReader(&buf) + + result1, err := reader.readLedgerMetaFromPipe() + require.NoError(t, err) + assert.Equal(t, uint32(100), uint32(result1.LedgerHeaderHistoryEntry().Header.LedgerSeq)) + + result2, err := reader.readLedgerMetaFromPipe() + require.NoError(t, err) + assert.Equal(t, uint32(200), uint32(result2.LedgerHeaderHistoryEntry().Header.LedgerSeq)) +} diff --git a/support/compressxdr/compress_xdr.go b/support/compressxdr/compress_xdr.go index 696dbf0800..f383487e77 100644 --- a/support/compressxdr/compress_xdr.go +++ b/support/compressxdr/compress_xdr.go @@ -47,6 +47,13 @@ func (d XDRDecoder) ReadFrom(r io.Reader) (int64, error) { } defer zr.Close() - n, err := xdr.Unmarshal(zr, d.XdrPayload) - return int64(n), err + data, err := io.ReadAll(zr) + if err != nil { + return int64(len(data)), err // io.ReaderFrom: report the bytes read before the error + } + + if err = xdr.SafeUnmarshal(data, d.XdrPayload); err != nil { + return int64(len(data)), err // the payload was fully read even though decoding failed + } + return int64(len(data)), nil } diff --git a/xdr/main.go b/xdr/main.go index e0ea11bde1..59ffd884f0 100644 --- a/xdr/main.go +++ b/xdr/main.go @@ -8,13 +8,12 @@ import ( "encoding/base64" "encoding/binary" "encoding/hex" + "errors" "fmt" "io" "strings" xdr "github.com/stellar/go-xdr/xdr3" - - "github.com/stellar/go-stellar-sdk/support/errors" ) // CommitHash is the commit hash that was used to generate the xdr in this folder. 
@@ -270,6 +269,15 @@ func (e *EncodingBuffer) MarshalHex(encodable EncoderTo) (string, error) { return string(b), nil } +// XDR record marking constants from RFC 5531 (section 11). +// Each record fragment is preceded by a 4-byte header: the high bit +// indicates the last (or only) fragment, and the lower 31 bits +// contain the fragment length in bytes. +const ( + xdrFrameLastFragment = 0x80000000 + xdrFrameLengthMask = 0x7fffffff +) + func MarshalFramed(w io.Writer, v interface{}) error { var tmp bytes.Buffer n, err := Marshal(&tmp, v) @@ -277,14 +285,14 @@ func MarshalFramed(w io.Writer, v interface{}) error { return err } un := uint32(n) - if un > 0x7fffffff { + if un > xdrFrameLengthMask { return fmt.Errorf("Overlong write: %d bytes", n) } - un = un | 0x80000000 + un = un | xdrFrameLastFragment err = binary.Write(w, binary.BigEndian, &un) if err != nil { - return errors.Wrap(err, "error in binary.Write") + return fmt.Errorf("error in binary.Write: %w", err) } k, err := tmp.WriteTo(w) if int64(n) != k { @@ -293,20 +301,20 @@ func MarshalFramed(w io.Writer, v interface{}) error { return err } -// ReadFrameLength returns a length of a framed XDR object. -func ReadFrameLength(d *xdr.Decoder) (uint32, error) { - frameLen, n, e := d.DecodeUint() - if e != nil { - return 0, errors.Wrap(e, "unmarshaling XDR frame header") - } - if n != 4 { - return 0, errors.New("bad length of XDR frame header") +// ReadFrameLength reads and returns the payload length from a framed XDR +// stream. 
+func ReadFrameLength(r io.Reader) (uint32, error) { + var frameHeader uint32 + if err := binary.Read(r, binary.BigEndian, &frameHeader); err != nil { + if errors.Is(err, io.EOF) { + return 0, io.EOF + } + return 0, fmt.Errorf("reading XDR frame header: %w", err) } - if (frameLen & 0x80000000) != 0x80000000 { + if (frameHeader & xdrFrameLastFragment) != xdrFrameLastFragment { return 0, errors.New("malformed XDR frame header") } - frameLen &= 0x7fffffff - return frameLen, nil + return frameHeader & xdrFrameLengthMask, nil } type countWriter struct { diff --git a/xdr/xdrstream.go b/xdr/xdrstream.go index 141f0fd292..c5acc46ad2 100644 --- a/xdr/xdrstream.go +++ b/xdr/xdrstream.go @@ -9,21 +9,18 @@ import ( "bytes" "compress/gzip" "crypto/sha256" - "encoding/binary" - stderrors "errors" + "errors" "fmt" "hash" "io" "io/ioutil" "github.com/klauspost/compress/zstd" - - "github.com/stellar/go-stellar-sdk/support/errors" ) const DefaultMaxXDRStreamRecordSize = 64 * 1024 * 1024 // 64 MB -var ErrRecordTooLarge = stderrors.New("xdr record too large") +var ErrRecordTooLarge = errors.New("xdr record too large") type Stream struct { buf bytes.Buffer @@ -177,17 +174,15 @@ func (x *Stream) closeReaders() error { } func (x *Stream) ReadOne(in DecoderFrom) error { - var nbytes uint32 - err := binary.Read(x.reader, binary.BigEndian, &nbytes) + nbytes, err := ReadFrameLength(x.reader) if err != nil { x.reader.Close() - if err == io.EOF { + if errors.Is(err, io.EOF) { // Do not wrap io.EOF - return err + return io.EOF } - return errors.Wrap(err, "binary.Read error") + return fmt.Errorf("reading frame length: %w", err) } - nbytes &= 0x7fffffff x.buf.Reset() if nbytes == 0 { x.reader.Close() diff --git a/xdr/xdrstream_test.go b/xdr/xdrstream_test.go index 9b4f0c146e..6f60c0d70d 100644 --- a/xdr/xdrstream_test.go +++ b/xdr/xdrstream_test.go @@ -235,7 +235,7 @@ func TestValidateHashDrainsUnreadBytes(t *testing.T) { func TestReadOneRecordTooLarge(t *testing.T) { // Write a 4-byte header 
claiming 256 MB, with no actual payload. var header [4]byte - binary.BigEndian.PutUint32(header[:], 256*1024*1024) + binary.BigEndian.PutUint32(header[:], 256*1024*1024|xdrFrameLastFragment) stream := NewStream(io.NopCloser(bytes.NewReader(header[:]))) var entry BucketEntry @@ -247,7 +247,7 @@ func TestReadOneRecordTooLarge(t *testing.T) { func TestReadOneCustomMaxRecordSize(t *testing.T) { // Write a 4-byte header claiming 2048 bytes. var header [4]byte - binary.BigEndian.PutUint32(header[:], 2048) + binary.BigEndian.PutUint32(header[:], 2048|xdrFrameLastFragment) stream := NewStream(io.NopCloser(bytes.NewReader(header[:]))) stream.SetMaxRecordSize(1024) @@ -260,7 +260,7 @@ func TestReadOneCustomMaxRecordSize(t *testing.T) { func TestReadOneMaxBoundary(t *testing.T) { // Header at exactly DefaultMaxXDRStreamRecordSize + 1 -> rejected. var header [4]byte - binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize+1) + binary.BigEndian.PutUint32(header[:], (DefaultMaxXDRStreamRecordSize+1)|xdrFrameLastFragment) stream := NewStream(io.NopCloser(bytes.NewReader(header[:]))) var entry BucketEntry @@ -269,7 +269,7 @@ func TestReadOneMaxBoundary(t *testing.T) { assert.True(t, errors.Is(err, ErrRecordTooLarge)) // Header at exactly DefaultMaxXDRStreamRecordSize -> accepted (will fail reading data, not size check). - binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize) + binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize|xdrFrameLastFragment) stream = NewStream(io.NopCloser(bytes.NewReader(header[:]))) err = stream.ReadOne(&entry) @@ -281,7 +281,7 @@ func TestReadOneMaxBoundary(t *testing.T) { func TestSetMaxRecordSizeZero(t *testing.T) { // SetMaxRecordSize(0) should use the default. 
var header [4]byte - binary.BigEndian.PutUint32(header[:], DefaultMaxXDRStreamRecordSize+1) + binary.BigEndian.PutUint32(header[:], (DefaultMaxXDRStreamRecordSize+1)|xdrFrameLastFragment) stream := NewStream(io.NopCloser(bytes.NewReader(header[:]))) stream.SetMaxRecordSize(0) // should reset to default