Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ruby/red-arrow-format/lib/arrow-format/array.rb
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,21 @@ def to_a
end

class DictionaryArray < Array
attr_reader :indices_buffer
attr_reader :dictionary
def initialize(type, size, validity_buffer, indices_buffer, dictionary)
super(type, size, validity_buffer)
@indices_buffer = indices_buffer
@dictionary = dictionary
end

def each_buffer
return to_enum(__method__) unless block_given?

yield(@validity_buffer)
yield(@indices_buffer)
end

def to_a
values = []
@dictionary.each do |dictionary_chunk|
Expand Down
2 changes: 1 addition & 1 deletion ruby/red-arrow-format/lib/arrow-format/bitmap.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def initialize(buffer, n_values)
end

def [](i)
(@validity_buffer.get_value(:U8, i / 8) & (1 << (i % 8))) > 0
(@buffer.get_value(:U8, i / 8) & (1 << (i % 8))) > 0
end

def each
Expand Down
14 changes: 2 additions & 12 deletions ruby/red-arrow-format/lib/arrow-format/field.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,8 @@ def to_flatbuffers
fb_field = FB::Field::Data.new
fb_field.name = @name
fb_field.nullable = @nullable
if @type.is_a?(DictionaryType)
fb_field.type = @type.value_type.to_flatbuffers
dictionary_encoding = FB::DictionaryEncoding::Data.new
dictionary_encoding.id = @dictionary_id
int = FB::Int::Data.new
int.bit_width = @type.index_type.bit_width
int.signed = @type.index_type.signed?
dictionary_encoding.index_type = int
dictionary_encoding.ordered = @type.ordered?
dictionary_encoding.dictionary_kind =
FB::DictionaryKind::DENSE_ARRAY
fb_field.dictionary = dictionary
if @type.respond_to?(:build_fb_field)
@type.build_fb_field(fb_field, self)
else
fb_field.type = @type.to_flatbuffers
end
Expand Down
2 changes: 1 addition & 1 deletion ruby/red-arrow-format/lib/arrow-format/file-writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def build_footer
fb_footer = FB::Footer::Data.new
fb_footer.version = FB::MetadataVersion::V5
fb_footer.schema = @fb_schema
# fb_footer.dictionaries = ... # TODO
fb_footer.dictionaries = @fb_dictionary_blocks
fb_footer.record_batches = @fb_record_batch_blocks
# fb_footer.custom_metadata = ... # TODO
FB::Footer.serialize(fb_footer)
Expand Down
79 changes: 57 additions & 22 deletions ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,38 +29,26 @@ class StreamingWriter
def initialize(output)
@output = output
@offset = 0
@fb_dictionary_blocks = []
@fb_record_batch_blocks = []
@written_dictionary_offsets = {}
end

def start(schema)
write_message(build_metadata(schema.to_flatbuffers))
# TODO: Write dictionaries
end

def write_record_batch(record_batch)
body_length = 0
record_batch.all_buffers_enumerator.each do |buffer|
body_length += aligned_buffer_size(buffer) if buffer
record_batch.schema.fields.each_with_index do |field, i|
next if field.dictionary_id.nil?
dictionary_array = record_batch.columns[i]
write_dictionary(field.dictionary_id, dictionary_array)
end
metadata = build_metadata(record_batch.to_flatbuffers, body_length)
fb_block = FB::Block::Data.new
fb_block.offset = @offset
fb_block.meta_data_length =
CONTINUATION.bytesize +
MessagePullReader::METADATA_LENGTH_SIZE +
metadata.bytesize
fb_block.body_length = body_length
@fb_record_batch_blocks << fb_block
write_message(metadata) do
record_batch.all_buffers_enumerator.each do |buffer|
write_buffer(buffer) if buffer
end
end
end

# TODO
# def write_dictionary_delta(id, dictionary)
# end
write_record_batch_based_message(record_batch,
record_batch.to_flatbuffers,
@fb_record_batch_blocks)
end

def finish
write_data(EOS)
Expand Down Expand Up @@ -100,6 +88,53 @@ def build_metadata(header, body_length=0)
metadata
end

def write_record_batch_based_message(record_batch, fb_header, fb_blocks)
body_length = 0
record_batch.all_buffers_enumerator.each do |buffer|
body_length += aligned_buffer_size(buffer) if buffer
end
metadata = build_metadata(fb_header, body_length)
fb_block = FB::Block::Data.new
fb_block.offset = @offset
fb_block.meta_data_length =
CONTINUATION.bytesize +
MessagePullReader::METADATA_LENGTH_SIZE +
metadata.bytesize
fb_block.body_length = body_length
fb_blocks << fb_block
write_message(metadata) do
record_batch.all_buffers_enumerator.each do |buffer|
write_buffer(buffer) if buffer
end
end
end

def write_dictionary(id, dictionary_array)
value_type = dictionary_array.type.value_type
dictionary = dictionary_array.dictionary

offset = @written_dictionary_offsets[id]
if offset.nil?
is_delta = false
else
is_delta = true
raise NotImplementedError,
"Delta dictionary message isn't implemented yet"
end

schema = Schema.new([Field.new("dummy", value_type, true, nil)])
size = dictionary.size
record_batch = RecordBatch.new(schema, size, [dictionary])
fb_dictionary_batch = FB::DictionaryBatch::Data.new
fb_dictionary_batch.id = id
fb_dictionary_batch.data = record_batch.to_flatbuffers
fb_dictionary_batch.delta = is_delta
write_record_batch_based_message(record_batch,
fb_dictionary_batch,
@fb_dictionary_blocks)
@written_dictionary_offsets[id] = dictionary_array.dictionary.size
end

def write_message(metadata)
write_data(CONTINUATION)
metadata_size = metadata.bytesize
Expand Down
14 changes: 14 additions & 0 deletions ruby/red-arrow-format/lib/arrow-format/type.rb
Original file line number Diff line number Diff line change
Expand Up @@ -873,5 +873,19 @@ def build_array(size, validity_buffer, indices_buffer, dictionary)
indices_buffer,
dictionary)
end

def build_fb_field(fb_field, field)
fb_dictionary_encoding = FB::DictionaryEncoding::Data.new
fb_dictionary_encoding.id = field.dictionary_id
fb_int = FB::Int::Data.new
fb_int.bit_width = @index_type.bit_width
fb_int.signed = @index_type.signed?
fb_dictionary_encoding.index_type = fb_int
fb_dictionary_encoding.ordered = @ordered
fb_dictionary_encoding.dictionary_kind =
FB::DictionaryKind::DENSE_ARRAY
fb_field.type = @value_type.to_flatbuffers
fb_field.dictionary = fb_dictionary_encoding
end
end
end
81 changes: 53 additions & 28 deletions ruby/red-arrow-format/test/test-writer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,16 +106,30 @@ def convert_type(red_arrow_type)
convert_field(field)
end
ArrowFormat::SparseUnionType.new(fields, red_arrow_type.type_codes)
when Arrow::DictionaryDataType
index_type = convert_type(red_arrow_type.index_data_type)
type = convert_type(red_arrow_type.value_data_type)
ArrowFormat::DictionaryType.new(index_type,
type,
red_arrow_type.ordered?)
else
raise "Unsupported type: #{red_arrow_type.inspect}"
end
end

def convert_field(red_arrow_field)
type = convert_type(red_arrow_field.data_type)
if type.is_a?(ArrowFormat::DictionaryType)
@dictionary_id ||= 0
dictionary_id = @dictionary_id
@dictionary_id += 1
else
dictionary_id = nil
end
ArrowFormat::Field.new(red_arrow_field.name,
convert_type(red_arrow_field.data_type),
type,
red_arrow_field.nullable?,
nil)
dictionary_id)
end

def convert_buffer(buffer)
Expand Down Expand Up @@ -171,11 +185,33 @@ def convert_array(red_arrow_array)
type.build_array(red_arrow_array.size,
types_buffer,
children)
when ArrowFormat::DictionaryType
validity_buffer = convert_buffer(red_arrow_array.null_bitmap)
indices_buffer = convert_buffer(red_arrow_array.indices.data_buffer)
dictionary = convert_array(red_arrow_array.dictionary)
type.build_array(red_arrow_array.size,
validity_buffer,
indices_buffer,
dictionary)
else
raise "Unsupported array #{red_arrow_array.inspect}"
end
end

def write(writer)
red_arrow_array = build_array
array = convert_array(red_arrow_array)
red_arrow_field = Arrow::Field.new("value",
red_arrow_array.value_data_type,
true)
fields = [convert_field(red_arrow_field)]
schema = ArrowFormat::Schema.new(fields)
record_batch = ArrowFormat::RecordBatch.new(schema, array.size, [array])
writer.start(schema)
writer.write_record_batch(record_batch)
writer.finish
end

class << self
def included(base)
base.class_eval do
Expand Down Expand Up @@ -939,6 +975,19 @@ def test_write
@values)
end
end

sub_test_case("Dictionary") do
def build_array
values = ["a", "b", "c", nil, "a"]
string_array = Arrow::StringArray.new(values)
string_array.dictionary_encode
end

def test_write
assert_equal(["a", "b", "c", nil, "a"],
@values)
end
end
end
end
end
Expand All @@ -952,19 +1001,7 @@ def setup
path = File.join(tmp_dir, "data.arrow")
File.open(path, "wb") do |output|
writer = ArrowFormat::FileWriter.new(output)
red_arrow_array = build_array
array = convert_array(red_arrow_array)
fields = [
ArrowFormat::Field.new("value",
array.type,
true,
nil),
]
schema = ArrowFormat::Schema.new(fields)
record_batch = ArrowFormat::RecordBatch.new(schema, array.size, [array])
writer.start(schema)
writer.write_record_batch(record_batch)
writer.finish
write(writer)
end
data = File.open(path, "rb", &:read).freeze
table = Arrow::Table.load(Arrow::Buffer.new(data), format: :arrow)
Expand All @@ -982,19 +1019,7 @@ def setup
path = File.join(tmp_dir, "data.arrows")
File.open(path, "wb") do |output|
writer = ArrowFormat::StreamingWriter.new(output)
red_arrow_array = build_array
array = convert_array(red_arrow_array)
fields = [
ArrowFormat::Field.new("value",
array.type,
true,
nil),
]
schema = ArrowFormat::Schema.new(fields)
record_batch = ArrowFormat::RecordBatch.new(schema, array.size, [array])
writer.start(schema)
writer.write_record_batch(record_batch)
writer.finish
write(writer)
end
data = File.open(path, "rb", &:read).freeze
table = Arrow::Table.load(Arrow::Buffer.new(data), format: :arrows)
Expand Down
Loading