Skip to content

Commit

Permalink
ArrowLog reader
Browse files Browse the repository at this point in the history
  • Loading branch information
thorfour committed Oct 3, 2024
1 parent 43f751e commit ba43b51
Showing 1 changed file with 77 additions and 0 deletions.
77 changes: 77 additions & 0 deletions reporter/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,80 @@ func (o *OfflineReporter) ReportMetrics(timestamp uint32, ids []uint32, values [
func (o *OfflineReporter) GetMetrics() reporter.Metrics {
panic("not implemented") // TODO: Implement
}

type ArrowLogger struct {
lastReader bool
currentReader *ipc.FileReader
currentRecord int
f *os.File
}

func OpenArrowLog(fname string) (*ArrowLogger, error) {
f, err := os.Open(fname)
if err != nil {
return nil, err
}

// Seek to the last 8 bytes
_, err = f.Seek(-8, io.SeekEnd)
if err != nil {
return nil, err
}

return &ArrowLogger{
f: f,
}, nil
}

func (a *ArrowLogger) initNextReader() error {
if a.lastReader {
return io.EOF
}

// Read the last 8 bytes
size := make([]byte, 8)
_, err := a.f.Read(size)
if err != nil {
return err
}

// Read the size of the record
recordSize := binary.LittleEndian.Uint64(size)

// Seek to the start of the record
offset, err := a.f.Seek(-int64(recordSize+8), io.SeekCurrent)
if err != nil {
return err
}

// We've reached the last record
if offset == 0 {
a.lastReader = true
}

// Start a new reader
a.currentReader, err = ipc.NewFileReader(a.f, ipc.WithAllocator(memory.NewGoAllocator()))
if err != nil {
return err
}
a.currentRecord = 0

return nil
}

func (a *ArrowLogger) Next() (arrow.Record, error) {
if a.currentReader == nil || a.currentRecord == a.currentReader.NumRecords() {
err := a.initNextReader()
if err != nil {
return nil, err
}
}

rec, err := a.currentReader.Record(a.currentRecord)
if err != nil {
return nil, err
}
a.currentRecord++

return rec, nil
}

0 comments on commit ba43b51

Please sign in to comment.