Skip to content

Commit

Permalink
use ipc file writer
Browse files Browse the repository at this point in the history
  • Loading branch information
thorfour committed Oct 3, 2024
1 parent 15d8bf6 commit 4afa3cb
Showing 1 changed file with 30 additions and 28 deletions.
58 changes: 30 additions & 28 deletions reporter/offline.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package reporter

import (
"bytes"
"context"
"io"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -87,59 +87,58 @@ func (o *OfflineReporter) Stop() {
}

func (o *OfflineReporter) saveDataToFile(ctx context.Context) error {
fname, ftmp, err := o.saveDataToTempFile(ctx)
if err != nil {
return err
}
if fname == "" {
return nil
}

return os.Rename(ftmp, fname)
}

func (o *OfflineReporter) saveDataToTempFile(ctx context.Context) (string, string, error) {
id := uuidv7.New()
fname := filepath.Join(o.dir, id.String()+".ipc")
ftmp := fname + ".tmp"

f, err := os.Create(ftmp)
if err != nil {
return err
return "", "", err
}
defer f.Close()

buf := bytes.NewBuffer(nil)
n, err := o.writeSamples(ctx, buf)
n, err := o.writeSamples(ctx, f)
if err != nil {
return err
return "", "", err
}
if n == 0 {
return nil
return "", "", nil
}

if _, err := f.Write(buf.Bytes()); err != nil {
return err
}

buf.Reset()
if err := o.writeLocations(ctx, buf); err != nil {
return err
if err := o.writeLocations(ctx, f); err != nil {
return "", "", err
}

o.stacktraceIDs = make(map[[16]byte]struct{})

_, err = f.Write(buf.Bytes())
if err != nil {
return err
}

if err := os.Rename(ftmp, fname); err != nil {
return err
}

return err
return fname, ftmp, err
}

func (o *OfflineReporter) writeSamples(ctx context.Context, buf *bytes.Buffer) (int64, error) {
func (o *OfflineReporter) writeSamples(ctx context.Context, f io.WriteSeeker) (int64, error) {
record := o.pr.buildSampleRecord(ctx)
if record.NumRows() == 0 {
return 0, nil
}
defer record.Release()

w := ipc.NewWriter(buf,
w, err := ipc.NewFileWriter(f,
ipc.WithSchema(record.Schema()),
ipc.WithAllocator(o.pr.mem),
)
if err != nil {
return 0, err
}

if err := w.Write(record); err != nil {
return 0, err
Expand All @@ -152,7 +151,7 @@ func (o *OfflineReporter) writeSamples(ctx context.Context, buf *bytes.Buffer) (
return record.NumRows(), nil
}

func (o *OfflineReporter) writeLocations(ctx context.Context, buf *bytes.Buffer) error {
func (o *OfflineReporter) writeLocations(ctx context.Context, f io.WriteSeeker) error {
lw := NewLocationsWriter(o.pr.mem)
stacktraceIDBuilder := array.NewBuilder(o.pr.mem, arrow.BinaryTypes.Binary)
for k, _ := range o.stacktraceIDs {
Expand All @@ -165,10 +164,13 @@ func (o *OfflineReporter) writeLocations(ctx context.Context, buf *bytes.Buffer)

rec := lw.NewRecord(stacktraceIDBuilder.NewArray())

w := ipc.NewWriter(buf,
w, err := ipc.NewFileWriter(f,
ipc.WithSchema(rec.Schema()),
ipc.WithAllocator(o.pr.mem),
)
if err != nil {
return err
}

if err := w.Write(rec); err != nil {
return err
Expand Down

0 comments on commit 4afa3cb

Please sign in to comment.