Skip to content

Commit

Permalink
arrow logger
Browse files Browse the repository at this point in the history
  • Loading branch information
thorfour committed Oct 3, 2024
1 parent ba43b51 commit 7bad9c2
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 64 deletions.
113 changes: 62 additions & 51 deletions reporter/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,71 +118,46 @@ func (o *OfflineReporter) saveDataToTempFile(ctx context.Context) (string, strin
id := uuidv7.New()
fname := filepath.Join(o.dir, id.String()+".ipc")
ftmp := fname + ".tmp"

f, err := os.Create(ftmp)
arrowLogger, err := NewArrowLogger(ftmp)
if err != nil {
return "", "", err
}
defer f.Close()

accountant := &accountingWriter{w: f}
n, err := o.writeSamples(ctx, accountant)
n, err := o.writeSamples(ctx, arrowLogger)
if err != nil {
return "", "", err
}
if n == 0 {
return "", "", nil
}

// Write the size of the file at the end
size := make([]byte, 8)
binary.LittleEndian.PutUint64(size, uint64(accountant.n))
if _, err := f.Write(size); err != nil {
return "", "", err
}

accountant = &accountingWriter{w: f}
if err := o.writeLocations(ctx, accountant); err != nil {
if err := o.writeLocations(ctx, arrowLogger); err != nil {
return "", "", err
}

size = make([]byte, 8)
binary.LittleEndian.PutUint64(size, uint64(accountant.n))
if _, err := f.Write(size); err != nil {
if err := arrowLogger.Close(); err != nil {
return "", "", err
}

o.stacktraceIDs = make(map[[16]byte]struct{})
return fname, ftmp, err
}

// writeSamples builds the sample record via o.pr.buildSampleRecord and writes
// it out. It returns the record's row count, or 0 with a nil error when the
// record has no rows.
//
// NOTE(review): this span carries two signature lines and two write paths (an
// ipc.NewFileWriter path and a log.Write path) back to back — it reads like
// diff output whose +/- markers were stripped; confirm which lines are live.
func (o *OfflineReporter) writeSamples(ctx context.Context, f io.WriteSeeker) (int64, error) {
func (o *OfflineReporter) writeSamples(ctx context.Context, log *ArrowLogger) (int64, error) {
record := o.pr.buildSampleRecord(ctx)
// NOTE(review): this early return happens before the deferred Release below
// is registered, so an empty record is never released here — confirm whether
// buildSampleRecord hands back a record the caller owns.
if record.NumRows() == 0 {
return 0, nil
}
defer record.Release()

// Write path that creates an Arrow IPC file writer directly on f.
w, err := ipc.NewFileWriter(f,
ipc.WithSchema(record.Schema()),
ipc.WithAllocator(o.pr.mem),
)
if err != nil {
return 0, err
}

if err := w.Write(record); err != nil {
return 0, err
}

if err := w.Close(); err != nil {
// Write path that delegates to ArrowLogger.Write.
if err := log.Write(o.pr.mem, record); err != nil {
return 0, err
}

return record.NumRows(), nil
}

func (o *OfflineReporter) writeLocations(ctx context.Context, f io.WriteSeeker) error {
func (o *OfflineReporter) writeLocations(ctx context.Context, log *ArrowLogger) error {
lw := NewLocationsWriter(o.pr.mem)
stacktraceIDBuilder := array.NewBuilder(o.pr.mem, arrow.BinaryTypes.Binary)
for k, _ := range o.stacktraceIDs {
Expand All @@ -194,20 +169,7 @@ func (o *OfflineReporter) writeLocations(ctx context.Context, f io.WriteSeeker)
}

rec := lw.NewRecord(stacktraceIDBuilder.NewArray())

w, err := ipc.NewFileWriter(f,
ipc.WithSchema(rec.Schema()),
ipc.WithAllocator(o.pr.mem),
)
if err != nil {
return err
}

if err := w.Write(rec); err != nil {
return err
}

if err := w.Close(); err != nil {
if err := log.Write(o.pr.mem, rec); err != nil {
return err
}

Expand Down Expand Up @@ -293,14 +255,63 @@ func (o *OfflineReporter) GetMetrics() reporter.Metrics {
panic("not implemented") // TODO: Implement
}

type ArrowLogger struct {
// ArrowLogReader holds the state used by Next and initNextReader to step
// through the records of a log file opened with OpenArrowLog.
type ArrowLogReader struct {
// lastReader, once true, makes initNextReader return io.EOF.
lastReader bool
// currentReader is the IPC file reader currently being consumed; Next treats
// a nil value as "no reader initialised yet".
currentReader *ipc.FileReader
// currentRecord is compared against currentReader.NumRecords() in Next to
// decide when the next reader must be initialised.
currentRecord int
// f is the file handle opened by OpenArrowLog.
f *os.File
}

func OpenArrowLog(fname string) (*ArrowLogger, error) {
// ArrowLogger writes Arrow records to a file created by NewArrowLogger. Each
// call to Write emits one Arrow IPC file payload followed by an 8-byte
// little-endian size trailer.
type ArrowLogger struct {
// accountingWriter wraps f and is the writer handed to the IPC file writer;
// its n field is reset at the start of every Write and emitted as the trailer.
accountingWriter *accountingWriter
// f is the destination file; the size trailer is written to it directly and
// Close closes it.
f *os.File
}

// NewArrowLogger creates the file at name (via os.Create) and returns an
// ArrowLogger whose accounting writer wraps that file.
//
// The error from os.Create is returned unchanged, with a nil logger.
func NewArrowLogger(name string) (*ArrowLogger, error) {
	file, createErr := os.Create(name)
	if createErr != nil {
		return nil, createErr
	}

	logger := new(ArrowLogger)
	logger.f = file
	logger.accountingWriter = &accountingWriter{w: file}
	return logger, nil
}

// Close closes the underlying file and reports any error from doing so.
func (a *ArrowLogger) Close() error {
	closeErr := a.f.Close()
	return closeErr
}

// Write appends rec to the log as one self-contained Arrow IPC file segment,
// followed by an 8-byte little-endian trailer holding the accounting writer's
// counter (n) for that segment.
//
// mem is the allocator handed to the IPC writer. Any error from creating the
// writer, writing the record, closing the writer, or writing the trailer is
// returned unchanged.
func (a *ArrowLogger) Write(mem memory.Allocator, rec arrow.Record) error {
	// Start the per-segment counter from zero so the trailer describes only
	// what this call wrote.
	a.accountingWriter.n = 0

	fw, err := ipc.NewFileWriter(
		a.accountingWriter,
		ipc.WithSchema(rec.Schema()),
		ipc.WithAllocator(mem),
	)
	if err != nil {
		return err
	}

	if err = fw.Write(rec); err != nil {
		return err
	}
	if err = fw.Close(); err != nil {
		return err
	}

	// The trailer goes straight to the file, bypassing the accounting writer.
	var trailer [8]byte
	binary.LittleEndian.PutUint64(trailer[:], uint64(a.accountingWriter.n))
	_, err = a.f.Write(trailer[:])
	return err
}

func OpenArrowLog(fname string) (*ArrowLogReader, error) {
f, err := os.Open(fname)
if err != nil {
return nil, err
Expand All @@ -312,12 +323,12 @@ func OpenArrowLog(fname string) (*ArrowLogger, error) {
return nil, err
}

return &ArrowLogger{
return &ArrowLogReader{
f: f,
}, nil
}

func (a *ArrowLogger) initNextReader() error {
func (a *ArrowLogReader) initNextReader() error {
if a.lastReader {
return io.EOF
}
Expand Down Expand Up @@ -353,7 +364,7 @@ func (a *ArrowLogger) initNextReader() error {
return nil
}

func (a *ArrowLogger) Next() (arrow.Record, error) {
func (a *ArrowLogReader) Next() (arrow.Record, error) {
if a.currentReader == nil || a.currentRecord == a.currentReader.NumRecords() {
err := a.initNextReader()
if err != nil {
Expand Down
54 changes: 41 additions & 13 deletions reporter/offline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,57 @@ import (
"os"
"testing"

"github.com/apache/arrow/go/v16/arrow/ipc"
"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/array"
"github.com/apache/arrow/go/v16/arrow/memory"
"github.com/stretchr/testify/require"
)

func Test_Offline_ReadFile(t *testing.T) {
f, err := os.Open("/tmp/offline-data/019251da-d851-7cee-a4dd-385102cb1d9a.ipc")
alloc := memory.NewGoAllocator()
schema := arrow.NewSchema([]arrow.Field{
{
Name: "name",
Type: &arrow.StringType{},
Nullable: true,
},
}, nil)

bld := array.NewRecordBuilder(alloc, schema)
t.Cleanup(bld.Release)

bld.Field(0).(*array.StringBuilder).Append("hello")
bld.Field(0).(*array.StringBuilder).Append("world")

rec := bld.NewRecord()
t.Cleanup(rec.Release)

f, err := os.CreateTemp("", "test-*.arrow")
require.NoError(t, err)

printFile(t, f)
printFile(t, f)
}
// Write a few records to the file
log, err := NewArrowLogger(f.Name())
require.NoError(t, err)
for i := 0; i < 10; i++ {
require.NoError(t, log.Write(alloc, rec))
}
require.NoError(t, log.Close())

func printFile(t *testing.T, f ipc.ReadAtSeeker) {
t.Helper()
reader, err := ipc.NewFileReader(f)
// Read the Arrow log
logreader, err := OpenArrowLog(f.Name())
require.NoError(t, err)

// Validate the number of records the log has
count := 0
for {
batch, err := reader.Read()
if errors.Is(err, io.EOF) {
break
_, err := logreader.Next()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
}
require.NoError(t, err)
t.Log(batch)
count++
}
require.Equal(t, 10, count)
}

0 comments on commit 7bad9c2

Please sign in to comment.