diff --git a/reporter/offline.go b/reporter/offline.go index b7b8717992..7983156864 100644 --- a/reporter/offline.go +++ b/reporter/offline.go @@ -1,8 +1,8 @@ package reporter import ( - "bytes" "context" + "io" "os" "path/filepath" "time" @@ -87,59 +87,58 @@ func (o *OfflineReporter) Stop() { } func (o *OfflineReporter) saveDataToFile(ctx context.Context) error { + fname, ftmp, err := o.saveDataToTempFile(ctx) + if err != nil { + return err + } + if fname == "" { + return nil + } + + return os.Rename(ftmp, fname) +} + +func (o *OfflineReporter) saveDataToTempFile(ctx context.Context) (string, string, error) { id := uuidv7.New() fname := filepath.Join(o.dir, id.String()+".ipc") ftmp := fname + ".tmp" f, err := os.Create(ftmp) if err != nil { - return err + return "", "", err } defer f.Close() - buf := bytes.NewBuffer(nil) - n, err := o.writeSamples(ctx, buf) + n, err := o.writeSamples(ctx, f) if err != nil { - return err + return "", "", err } if n == 0 { - return nil + return "", "", nil } - if _, err := f.Write(buf.Bytes()); err != nil { - return err - } - - buf.Reset() - if err := o.writeLocations(ctx, buf); err != nil { - return err + if err := o.writeLocations(ctx, f); err != nil { + return "", "", err } o.stacktraceIDs = make(map[[16]byte]struct{}) - - _, err = f.Write(buf.Bytes()) - if err != nil { - return err - } - - if err := os.Rename(ftmp, fname); err != nil { - return err - } - - return err + return fname, ftmp, err } -func (o *OfflineReporter) writeSamples(ctx context.Context, buf *bytes.Buffer) (int64, error) { +func (o *OfflineReporter) writeSamples(ctx context.Context, f io.WriteSeeker) (int64, error) { record := o.pr.buildSampleRecord(ctx) if record.NumRows() == 0 { return 0, nil } defer record.Release() - w := ipc.NewWriter(buf, + w, err := ipc.NewFileWriter(f, ipc.WithSchema(record.Schema()), ipc.WithAllocator(o.pr.mem), ) + if err != nil { + return 0, err + } if err := w.Write(record); err != nil { return 0, err @@ -152,7 +151,7 @@ func (o *OfflineReporter) writeSamples(ctx context.Context, buf *bytes.Buffer) ( return record.NumRows(), nil } -func (o *OfflineReporter) writeLocations(ctx context.Context, buf *bytes.Buffer) error { +func (o *OfflineReporter) writeLocations(ctx context.Context, f io.WriteSeeker) error { lw := NewLocationsWriter(o.pr.mem) stacktraceIDBuilder := array.NewBuilder(o.pr.mem, arrow.BinaryTypes.Binary) for k, _ := range o.stacktraceIDs { @@ -165,10 +164,13 @@ func (o *OfflineReporter) writeLocations(ctx context.Context, buf *bytes.Buffer) rec := lw.NewRecord(stacktraceIDBuilder.NewArray()) - w := ipc.NewWriter(buf, + w, err := ipc.NewFileWriter(f, ipc.WithSchema(rec.Schema()), ipc.WithAllocator(o.pr.mem), ) + if err != nil { + return err + } if err := w.Write(rec); err != nil { return err