-
Notifications
You must be signed in to change notification settings - Fork 1
/
parquet.go
80 lines (73 loc) · 2 KB
/
parquet.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package arrow3
import (
"context"
"io"
"github.com/apache/arrow/go/v17/arrow"
"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/apache/arrow/go/v17/parquet"
"github.com/apache/arrow/go/v17/parquet/file"
"github.com/apache/arrow/go/v17/parquet/pqarrow"
"github.com/apache/arrow/go/v17/parquet/schema"
)
// Parquet returns the parquet schema held in msg's parquet field.
func (msg *message) Parquet() *schema.Schema {
	ps := msg.parquet
	return ps
}
// Read reads the specified columns from the parquet source r and returns them
// as a single record covering every row of the file. The returned record must
// be released by the caller.
//
// columns is passed unchanged to pqarrow's GetRecordReader as the column
// indices to read. Every row group is read. The batch size is set to the
// file's total row count so that one Read yields the full columns.
func (msg *message) Read(ctx context.Context, r parquet.ReaderAtSeeker, columns []int) (arrow.Record, error) {
	f, err := file.NewParquetReader(r)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	pr, err := pqarrow.NewFileReader(f, pqarrow.ArrowReadProperties{
		// One batch sized to the whole file: we read full columns.
		BatchSize: f.NumRows(),
	}, memory.DefaultAllocator)
	if err != nil {
		return nil, err
	}

	// A nil row-group list selects every row group. Restricting it to
	// []int{0} would silently truncate files that have more than one row
	// group, even though BatchSize above is sized for the whole file.
	rd, err := pr.GetRecordReader(ctx, columns, nil)
	if err != nil {
		return nil, err
	}
	defer rd.Release()

	rec, err := rd.Read()
	if err != nil {
		return nil, err
	}
	// rec belongs to rd. Retain it so the caller's reference survives the
	// deferred rd.Release().
	rec.Retain()
	return rec, nil
}
// WriteParquet builds a record with msg.NewRecord and writes it to w as a
// parquet file via WriteParquetRecords. The record is released before
// returning, whether or not the write succeeds.
func (msg *message) WriteParquet(w io.Writer) error {
	rec := msg.NewRecord()
	defer rec.Release()
	if err := msg.WriteParquetRecords(w, rec); err != nil {
		return err
	}
	return nil
}
// WriteParquetRecords writes records to w as one parquet row group, one column
// at a time. Column i of every record is combined into a single chunked
// column. The result is equivalent to concatenating the records and writing
// them as one record. Every record is expected to have the same columns as
// records[0] (and as msg's schema, which the writer is created with).
//
// When records is empty, no row group is created and the writer is simply
// closed.
func (msg *message) WriteParquetRecords(w io.Writer, records ...arrow.Record) error {
	f, err := pqarrow.NewFileWriter(msg.schema, w,
		parquet.NewWriterProperties(msg.props...),
		pqarrow.NewArrowWriterProperties(),
	)
	if err != nil {
		return err
	}
	if len(records) == 0 {
		// Nothing to write. Indexing records[0] below would panic.
		return f.Close()
	}

	f.NewRowGroup()
	numCols := int(records[0].NumCols())
	chunks := make([]arrow.Array, len(records))
	for i := 0; i < numCols; i++ {
		for j, rec := range records {
			chunks[j] = rec.Column(i)
		}
		col := arrow.NewChunked(chunks[0].DataType(), chunks)
		err = f.WriteColumnChunked(col, 0, int64(col.Len()))
		col.Release()
		if err != nil {
			// Best-effort cleanup: the write error is the one worth reporting.
			_ = f.Close()
			return err
		}
	}
	return f.Close()
}