Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions ruby/red-arrow-format/lib/arrow-format/field.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ class Field
attr_reader :name
attr_reader :type
attr_reader :dictionary_id
def initialize(name, type, nullable, dictionary_id)
attr_reader :metadata
def initialize(name,
type,
nullable: true,
dictionary_id: nil,
metadata: nil)
@name = name
@type = type
@nullable = nullable
@dictionary_id = dictionary_id
@metadata = metadata
end

def nullable?
Expand All @@ -44,7 +50,14 @@ def to_flatbuffers
elsif @type.respond_to?(:children)
fb_field.children = @type.children.collect(&:to_flatbuffers)
end
# fb_field.custom_metadata = @custom_metadata
if @metadata
fb_field.custom_metadata = @metadata.collect do |key, value|
fb_key_value = FB::KeyValue::Data.new
fb_key_value.key = key
fb_key_value.value = value
fb_key_value
end
end
fb_field
end
end
Expand Down
4 changes: 2 additions & 2 deletions ruby/red-arrow-format/lib/arrow-format/file-reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def initialize(input)

validate
@footer = read_footer
@record_batch_blocks = @footer.record_batches
@record_batch_blocks = @footer.record_batches || []
@schema = read_schema(@footer.schema)
@dictionaries = read_dictionaries
end
Expand Down Expand Up @@ -193,7 +193,7 @@ def read_dictionaries
end

value_type = dictionary_fields[id].type.value_type
schema = Schema.new([Field.new("dummy", value_type, true, nil)])
schema = Schema.new([Field.new("dummy", value_type)])
record_batch = read_record_batch(fb_header.data, schema, body)
if fb_header.delta?
dictionaries[id] << record_batch.columns[0]
Expand Down
18 changes: 16 additions & 2 deletions ruby/red-arrow-format/lib/arrow-format/readable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,21 @@
module ArrowFormat
module Readable
private
def read_custom_metadata(fb_custom_metadata)
return nil if fb_custom_metadata.nil?
metadata = {}
fb_custom_metadata.each do |key_value|
metadata[key_value.key] = key_value.value
end
metadata
end

def read_schema(fb_schema)
fields = fb_schema.fields.collect do |fb_field|
read_field(fb_field)
end
Schema.new(fields)
Schema.new(fields,
metadata: read_custom_metadata(fb_schema.custom_metadata))
end

def read_field(fb_field)
Expand Down Expand Up @@ -132,7 +142,11 @@ def read_field(fb_field)
else
dictionary_id = nil
end
Field.new(fb_field.name, type, fb_field.nullable?, dictionary_id)
Field.new(fb_field.name,
type,
nullable: fb_field.nullable?,
dictionary_id: dictionary_id,
metadata: read_custom_metadata(fb_field.custom_metadata))
end

def read_type_int(fb_type)
Expand Down
13 changes: 11 additions & 2 deletions ruby/red-arrow-format/lib/arrow-format/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,24 @@
module ArrowFormat
class Schema
attr_reader :fields
def initialize(fields)
attr_reader :metadata
def initialize(fields, metadata: nil)
@fields = fields
@metadata = metadata
end

def to_flatbuffers
fb_schema = FB::Schema::Data.new
fb_schema.endianness = FB::Endianness::LITTLE
fb_schema.fields = fields.collect(&:to_flatbuffers)
# fb_schema.custom_metadata = @custom_metadata
if @metadata
fb_schema.custom_metadata = @metadata.collect do |key, value|
fb_key_value = FB::KeyValue::Data.new
fb_key_value.key = key
fb_key_value.value = value
fb_key_value
end
end
# fb_schema.features = @features
fb_schema
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def process_dictionary_batch_message(message, body)
end
field = @dictionary_fields[header.id]
value_type = field.type.value_type
schema = Schema.new([Field.new("dummy", value_type, true, nil)])
schema = Schema.new([Field.new("dummy", value_type)])
record_batch = read_record_batch(header.data, schema, body)
if header.delta?
@dictionaries[header.id] << record_batch.columns[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def write_dictionary(id, dictionary_array)
dictionary = dictionary.slice(written_offset - current_base_offset)
end

schema = Schema.new([Field.new("dummy", value_type, true, nil)])
schema = Schema.new([Field.new("dummy", value_type)])
size = dictionary.size
record_batch = RecordBatch.new(schema, size, [dictionary])
fb_dictionary_batch = FB::DictionaryBatch::Data.new
Expand Down
2 changes: 1 addition & 1 deletion ruby/red-arrow-format/red-arrow-format.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Gem::Specification.new do |spec|
spec.files += Dir.glob("lib/**/*.rb")
spec.files += Dir.glob("doc/text/*")

spec.add_runtime_dependency("red-flatbuffers", ">=0.0.5")
spec.add_runtime_dependency("red-flatbuffers", ">=0.0.6")

github_url = "https://github.com/apache/arrow"
spec.metadata = {
Expand Down
69 changes: 63 additions & 6 deletions ruby/red-arrow-format/test/test-reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,81 @@
# under the License.

module ReaderTests
def roundtrip(array)
def roundtrip(data)
Dir.mktmpdir do |tmp_dir|
table = Arrow::Table.new(value: array)
case data
when Arrow::Array
table = Arrow::Table.new(value: data)
when Arrow::RecordBatch
table = data.to_table
else
table = data
end
path = File.join(tmp_dir, "data.#{file_extension}")
table.save(path)
File.open(path, "rb") do |input|
reader = reader_class.new(input)
values = []
reader.each do |record_batch|
values.concat(record_batch.columns[0].to_a)
case data
when Arrow::Array
values = []
reader.each do |record_batch|
values.concat(record_batch.columns[0].to_a)
end
[reader.schema.fields[0].type, values]
else
record_batches = reader.collect do |record_batch|
record_batch.to_h.tap do |hash|
hash.each do |key, value|
hash[key] = value.to_a
end
end
end
[reader.schema, record_batches]
end
[reader.schema.fields[0].type, values]
end
ensure
GC.start
end
end

def test_custom_metadata_field
field =
Arrow::Field.new("value", :boolean)
.with_metadata("key1" => "value1",
"key2" => "value2")
schema = Arrow::Schema.new([field])
values = [true, nil, false]
record_batch = Arrow::RecordBatch.new(schema, {"value" => values})
schema, record_batches = roundtrip(record_batch)
assert_equal([
{
"key1" => "value1",
"key2" => "value2",
},
[{"value" => values}],
],
[schema.fields[0].metadata, record_batches])
end

def test_custom_metadata_schema
field = Arrow::Field.new("value", :boolean)
schema =
Arrow::Schema.new([field])
.with_metadata("key1" => "value1",
"key2" => "value2")
values = [true, nil, false]
record_batch = Arrow::RecordBatch.new(schema, {"value" => values})
schema, record_batches = roundtrip(record_batch)
assert_equal([
{
"key1" => "value1",
"key2" => "value2",
},
[{"value" => values}],
],
[schema.metadata, record_batches])
end

def test_null
type, values = roundtrip(Arrow::NullArray.new(3))
assert_equal(["Null", [nil, nil, nil]],
Expand Down
64 changes: 54 additions & 10 deletions ruby/red-arrow-format/test/test-writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ def convert_field(red_arrow_field)
end
ArrowFormat::Field.new(red_arrow_field.name,
type,
red_arrow_field.nullable?,
dictionary_id)
nullable: red_arrow_field.nullable?,
dictionary_id: dictionary_id,
metadata: red_arrow_field.metadata)
end

def convert_buffer(buffer)
Expand Down Expand Up @@ -230,13 +231,58 @@ def roundtrip(*inputs)
end
# pp(read(path)) # debug
data = File.open(path, "rb", &:read).freeze
table = Arrow::Table.load(Arrow::Buffer.new(data), format: :arrow)
[table.value.data_type, table.value.values]
case file_extension
when "arrow"
format = :arrow_file
else
format = :arrow_streaming
end
table = Arrow::Table.load(Arrow::Buffer.new(data), format: format)
if inputs[0].is_a?(Arrow::Array)
[table.value.data_type, table.value.values]
else
table
end
end
end
end

module WriterTests
def test_custom_metadata_field
field = ArrowFormat::Field.new("value",
ArrowFormat::BooleanType.new,
metadata: {
"key1" => "value1",
"key2" => "value2",
})
schema = ArrowFormat::Schema.new([field])
column = convert_array(Arrow::BooleanArray.new([true, nil, false]))
record_batch = ArrowFormat::RecordBatch.new(schema, 3, [column])
table = roundtrip(record_batch)
assert_equal({
"key1" => "value1",
"key2" => "value2",
},
table.schema.fields[0].metadata)
end

def test_custom_metadata_schema
field = ArrowFormat::Field.new("value", ArrowFormat::BooleanType.new)
schema = ArrowFormat::Schema.new([field],
metadata: {
"key1" => "value1",
"key2" => "value2",
})
column = convert_array(Arrow::BooleanArray.new([true, nil, false]))
record_batch = ArrowFormat::RecordBatch.new(schema, 3, [column])
table = roundtrip(record_batch)
assert_equal({
"key1" => "value1",
"key2" => "value2",
},
table.schema.metadata)
end

def test_null
array = Arrow::NullArray.new(3)
type, values = roundtrip(array)
Expand Down Expand Up @@ -865,12 +911,9 @@ def build_schema(value_type)
type = ArrowFormat::DictionaryType.new(index_type,
value_type,
ordered)
nullable = true
dictionary_id = 1
field = ArrowFormat::Field.new("value",
type,
nullable,
dictionary_id)
dictionary_id: 1)
ArrowFormat::Schema.new([field])
end

Expand Down Expand Up @@ -934,9 +977,10 @@ def build_record_batches(red_arrow_value_type, values1, values2)
end

def roundtrip(value_type, values1, values2)
r = build_record_batches(value_type, values1, values2)
record_batches = build_record_batches(value_type, values1, values2)
GC.start
super(*r)
table = super(*record_batches)
[table.value.data_type, table.value.values]
end

def test_boolean
Expand Down
Loading