Skip to content

Commit

Permalink
Fixed file writing/reading
Browse files Browse the repository at this point in the history
Previously this code used the Arrow IPC File format, which contains
hardcoded offsets into the file. This becomes problematic when you try to
write multiple of these to a single file, because the IPC file reader can't
handle a section reader (many of the offsets will be out of bounds of the
section). Switching to the IPC Stream format fixes that and allows us to
concatenate multiple Arrow records into a single physical file and read
them back.
  • Loading branch information
thorfour committed Oct 4, 2024
1 parent 68a9790 commit e3e658a
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 34 deletions.
50 changes: 20 additions & 30 deletions reporter/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,6 @@ func (o *OfflineReporter) saveDataToFile(ctx context.Context) error {
return os.Rename(ftmp, fname)
}

// accountingWriter decorates an io.WriteSeeker, adding up the number of bytes
// that pass through Write so callers can learn how much was emitted.
type accountingWriter struct {
	// n is the running total of bytes reported as written by w.
	n int
	// w is the underlying destination for every write and seek.
	w io.WriteSeeker
}

// Write forwards p to the wrapped writer and adds the number of bytes it
// reports as written to the running total, even when the write also returns
// an error. The wrapped writer's results are returned unchanged.
func (a *accountingWriter) Write(p []byte) (n int, err error) {
	written, writeErr := a.w.Write(p)
	a.n += written
	return written, writeErr
}

// Seek delegates directly to the wrapped seeker; the byte total is not
// affected by seeking.
func (a *accountingWriter) Seek(offset int64, whence int) (int64, error) {
	pos, seekErr := a.w.Seek(offset, whence)
	return pos, seekErr
}

func (o *OfflineReporter) saveDataToTempFile(ctx context.Context) (string, string, error) {
id := uuidv7.New()
fname := filepath.Join(o.dir, id.String()+".ipc")
Expand Down Expand Up @@ -256,8 +241,7 @@ func (o *OfflineReporter) GetMetrics() reporter.Metrics {
}

type ArrowLogReader struct {
currentReader *ipc.FileReader
currentRecord int
currentReader *ipc.Reader
f *os.File
offset int64
}
Expand Down Expand Up @@ -285,13 +269,10 @@ func (a *ArrowLogger) Close() error {

func (a *ArrowLogger) Write(mem memory.Allocator, rec arrow.Record) error {
a.accountingWriter.n = 0
w, err := ipc.NewFileWriter(a.accountingWriter,
w := ipc.NewWriter(a.accountingWriter,
ipc.WithSchema(rec.Schema()),
ipc.WithAllocator(mem),
)
if err != nil {
return err
}

if err := w.Write(rec); err != nil {
return err
Expand Down Expand Up @@ -353,28 +334,37 @@ func (a *ArrowLogReader) initNextReader() error {

// Start a new reader
section := io.NewSectionReader(a.f, a.offset, int64(recordSize))
a.currentReader, err = ipc.NewFileReader(section, ipc.WithAllocator(memory.NewGoAllocator()))
a.currentReader, err = ipc.NewReader(section)
if err != nil {
return err
}
a.currentRecord = 0

a.currentReader.Next()
return nil
}

func (a *ArrowLogReader) Next() (arrow.Record, error) {
if a.currentReader == nil || a.currentRecord == a.currentReader.NumRecords() {
if a.currentReader == nil || !a.currentReader.Next() {
err := a.initNextReader()
if err != nil {
return nil, err
}
}

rec, err := a.currentReader.Record(a.currentRecord)
if err != nil {
return nil, err
}
a.currentRecord++
return a.currentReader.Record(), nil
}

// accountingWriter wraps an io.WriteSeeker and keeps a running count of the
// bytes written through it.
type accountingWriter struct {
// n is the number of bytes written so far; ArrowLogger.Write resets it to
// zero before writing each record.
n int
// w is the underlying writer/seeker that receives all writes and seeks.
w io.WriteSeeker
}

return rec, nil
// Write passes p to the wrapped writer and adds the number of bytes it
// reports as written to the running total, including on a failed or partial
// write. The wrapped writer's results are returned unchanged.
func (a *accountingWriter) Write(p []byte) (n int, err error) {
	written, writeErr := a.w.Write(p)
	a.n += written
	return written, writeErr
}

// Seek delegates to the wrapped seeker and returns its result as-is; the
// byte count is left untouched.
func (a *accountingWriter) Seek(offset int64, whence int) (int64, error) {
	pos, seekErr := a.w.Seek(offset, whence)
	return pos, seekErr
}
9 changes: 5 additions & 4 deletions reporter/offline_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package reporter
package frostdb

import (
"errors"
Expand Down Expand Up @@ -37,7 +37,8 @@ func Test_Offline_ReadFile(t *testing.T) {
// Write a few records to the file
log, err := NewArrowLogger(f.Name())
require.NoError(t, err)
for i := 0; i < 10; i++ {
n := 10
for i := 0; i < n; i++ {
require.NoError(t, log.Write(alloc, rec))
}
require.NoError(t, log.Close())
Expand All @@ -49,7 +50,7 @@ func Test_Offline_ReadFile(t *testing.T) {
// Validate the number of records the log has
count := 0
for {
_, err := logreader.Next()
rec, err := logreader.Next()
if err != nil {
if errors.Is(err, io.EOF) {
break
Expand All @@ -58,5 +59,5 @@ func Test_Offline_ReadFile(t *testing.T) {
}
count++
}
require.Equal(t, 10, count)
require.Equal(t, n, count)
}

0 comments on commit e3e658a

Please sign in to comment.