diff --git a/ruby/red-arrow-format/lib/arrow-format/array.rb b/ruby/red-arrow-format/lib/arrow-format/array.rb index 4728d7ca708a..73e87cf721c4 100644 --- a/ruby/red-arrow-format/lib/arrow-format/array.rb +++ b/ruby/red-arrow-format/lib/arrow-format/array.rb @@ -508,12 +508,21 @@ def to_a end class DictionaryArray < Array + attr_reader :indices_buffer + attr_reader :dictionary def initialize(type, size, validity_buffer, indices_buffer, dictionary) super(type, size, validity_buffer) @indices_buffer = indices_buffer @dictionary = dictionary end + def each_buffer + return to_enum(__method__) unless block_given? + + yield(@validity_buffer) + yield(@indices_buffer) + end + def to_a values = [] @dictionary.each do |dictionary_chunk| diff --git a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb index 0cd517a37fb7..88a1ab2ff435 100644 --- a/ruby/red-arrow-format/lib/arrow-format/bitmap.rb +++ b/ruby/red-arrow-format/lib/arrow-format/bitmap.rb @@ -24,7 +24,7 @@ def initialize(buffer, n_values) end def [](i) - (@validity_buffer.get_value(:U8, i / 8) & (1 << (i % 8))) > 0 + (@buffer.get_value(:U8, i / 8) & (1 << (i % 8))) > 0 end def each diff --git a/ruby/red-arrow-format/lib/arrow-format/field.rb b/ruby/red-arrow-format/lib/arrow-format/field.rb index 3642c867c8b5..7736bbf5e7e7 100644 --- a/ruby/red-arrow-format/lib/arrow-format/field.rb +++ b/ruby/red-arrow-format/lib/arrow-format/field.rb @@ -34,18 +34,8 @@ def to_flatbuffers fb_field = FB::Field::Data.new fb_field.name = @name fb_field.nullable = @nullable - if @type.is_a?(DictionaryType) - fb_field.type = @type.value_type.to_flatbuffers - dictionary_encoding = FB::DictionaryEncoding::Data.new - dictionary_encoding.id = @dictionary_id - int = FB::Int::Data.new - int.bit_width = @type.index_type.bit_width - int.signed = @type.index_type.signed? - dictionary_encoding.index_type = int - dictionary_encoding.ordered = @type.ordered? - dictionary_encoding.dictionary_kind = - FB::DictionaryKind::DENSE_ARRAY - fb_field.dictionary = dictionary + if @type.respond_to?(:build_fb_field) + @type.build_fb_field(fb_field, self) else fb_field.type = @type.to_flatbuffers end diff --git a/ruby/red-arrow-format/lib/arrow-format/file-writer.rb b/ruby/red-arrow-format/lib/arrow-format/file-writer.rb index 8509be59b6de..27b6b55bbf9a 100644 --- a/ruby/red-arrow-format/lib/arrow-format/file-writer.rb +++ b/ruby/red-arrow-format/lib/arrow-format/file-writer.rb @@ -41,7 +41,7 @@ def build_footer fb_footer = FB::Footer::Data.new fb_footer.version = FB::MetadataVersion::V5 fb_footer.schema = @fb_schema - # fb_footer.dictionaries = ... # TODO + fb_footer.dictionaries = @fb_dictionary_blocks fb_footer.record_batches = @fb_record_batch_blocks # fb_footer.custom_metadata = ... # TODO FB::Footer.serialize(fb_footer) diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb index 313c1b38ad99..2f8f90b70622 100644 --- a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb +++ b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb @@ -29,38 +29,26 @@ class StreamingWriter def initialize(output) @output = output @offset = 0 + @fb_dictionary_blocks = [] @fb_record_batch_blocks = [] + @written_dictionary_offsets = {} end def start(schema) write_message(build_metadata(schema.to_flatbuffers)) - # TODO: Write dictionaries end def write_record_batch(record_batch) - body_length = 0 - record_batch.all_buffers_enumerator.each do |buffer| - body_length += aligned_buffer_size(buffer) if buffer + record_batch.schema.fields.each_with_index do |field, i| + next if field.dictionary_id.nil? + dictionary_array = record_batch.columns[i] + write_dictionary(field.dictionary_id, dictionary_array) end - metadata = build_metadata(record_batch.to_flatbuffers, body_length) - fb_block = FB::Block::Data.new - fb_block.offset = @offset - fb_block.meta_data_length = - CONTINUATION.bytesize + - MessagePullReader::METADATA_LENGTH_SIZE + - metadata.bytesize - fb_block.body_length = body_length - @fb_record_batch_blocks << fb_block - write_message(metadata) do - record_batch.all_buffers_enumerator.each do |buffer| - write_buffer(buffer) if buffer - end - end - end - # TODO - # def write_dictionary_delta(id, dictionary) - # end + write_record_batch_based_message(record_batch, + record_batch.to_flatbuffers, + @fb_record_batch_blocks) + end def finish write_data(EOS) @@ -100,6 +88,53 @@ def build_metadata(header, body_length=0) metadata end + def write_record_batch_based_message(record_batch, fb_header, fb_blocks) + body_length = 0 + record_batch.all_buffers_enumerator.each do |buffer| + body_length += aligned_buffer_size(buffer) if buffer + end + metadata = build_metadata(fb_header, body_length) + fb_block = FB::Block::Data.new + fb_block.offset = @offset + fb_block.meta_data_length = + CONTINUATION.bytesize + + MessagePullReader::METADATA_LENGTH_SIZE + + metadata.bytesize + fb_block.body_length = body_length + fb_blocks << fb_block + write_message(metadata) do + record_batch.all_buffers_enumerator.each do |buffer| + write_buffer(buffer) if buffer + end + end + end + + def write_dictionary(id, dictionary_array) + value_type = dictionary_array.type.value_type + dictionary = dictionary_array.dictionary + + offset = @written_dictionary_offsets[id] + if offset.nil? + is_delta = false + else + is_delta = true + raise NotImplementedError, + "Delta dictionary message isn't implemented yet" + end + + schema = Schema.new([Field.new("dummy", value_type, true, nil)]) + size = dictionary.size + record_batch = RecordBatch.new(schema, size, [dictionary]) + fb_dictionary_batch = FB::DictionaryBatch::Data.new + fb_dictionary_batch.id = id + fb_dictionary_batch.data = record_batch.to_flatbuffers + fb_dictionary_batch.delta = is_delta + write_record_batch_based_message(record_batch, + fb_dictionary_batch, + @fb_dictionary_blocks) + @written_dictionary_offsets[id] = dictionary_array.dictionary.size + end + def write_message(metadata) write_data(CONTINUATION) metadata_size = metadata.bytesize diff --git a/ruby/red-arrow-format/lib/arrow-format/type.rb b/ruby/red-arrow-format/lib/arrow-format/type.rb index 808117740e11..4ea41a25388a 100644 --- a/ruby/red-arrow-format/lib/arrow-format/type.rb +++ b/ruby/red-arrow-format/lib/arrow-format/type.rb @@ -873,5 +873,19 @@ def build_array(size, validity_buffer, indices_buffer, dictionary) indices_buffer, dictionary) end + + def build_fb_field(fb_field, field) + fb_dictionary_encoding = FB::DictionaryEncoding::Data.new + fb_dictionary_encoding.id = field.dictionary_id + fb_int = FB::Int::Data.new + fb_int.bit_width = @index_type.bit_width + fb_int.signed = @index_type.signed? + fb_dictionary_encoding.index_type = fb_int + fb_dictionary_encoding.ordered = @ordered + fb_dictionary_encoding.dictionary_kind = + FB::DictionaryKind::DENSE_ARRAY + fb_field.type = @value_type.to_flatbuffers + fb_field.dictionary = fb_dictionary_encoding + end end end diff --git a/ruby/red-arrow-format/test/test-writer.rb b/ruby/red-arrow-format/test/test-writer.rb index 183a5f29ddca..3e4b5bedba3a 100644 --- a/ruby/red-arrow-format/test/test-writer.rb +++ b/ruby/red-arrow-format/test/test-writer.rb @@ -106,16 +106,30 @@ def convert_type(red_arrow_type) convert_field(field) end ArrowFormat::SparseUnionType.new(fields, red_arrow_type.type_codes) + when Arrow::DictionaryDataType + index_type = convert_type(red_arrow_type.index_data_type) + type = convert_type(red_arrow_type.value_data_type) + ArrowFormat::DictionaryType.new(index_type, + type, + red_arrow_type.ordered?) else raise "Unsupported type: #{red_arrow_type.inspect}" end end def convert_field(red_arrow_field) + type = convert_type(red_arrow_field.data_type) + if type.is_a?(ArrowFormat::DictionaryType) + @dictionary_id ||= 0 + dictionary_id = @dictionary_id + @dictionary_id += 1 + else + dictionary_id = nil + end ArrowFormat::Field.new(red_arrow_field.name, - convert_type(red_arrow_field.data_type), + type, red_arrow_field.nullable?, - nil) + dictionary_id) end def convert_buffer(buffer) @@ -171,11 +185,33 @@ def convert_array(red_arrow_array) type.build_array(red_arrow_array.size, types_buffer, children) + when ArrowFormat::DictionaryType + validity_buffer = convert_buffer(red_arrow_array.null_bitmap) + indices_buffer = convert_buffer(red_arrow_array.indices.data_buffer) + dictionary = convert_array(red_arrow_array.dictionary) + type.build_array(red_arrow_array.size, + validity_buffer, + indices_buffer, + dictionary) else raise "Unsupported array #{red_arrow_array.inspect}" end end + def write(writer) + red_arrow_array = build_array + array = convert_array(red_arrow_array) + red_arrow_field = Arrow::Field.new("value", + red_arrow_array.value_data_type, + true) + fields = [convert_field(red_arrow_field)] + schema = ArrowFormat::Schema.new(fields) + record_batch = ArrowFormat::RecordBatch.new(schema, array.size, [array]) + writer.start(schema) + writer.write_record_batch(record_batch) + writer.finish + end + class << self def included(base) base.class_eval do @@ -939,6 +975,19 @@ def test_write @values) end end + + sub_test_case("Dictionary") do + def build_array + values = ["a", "b", "c", nil, "a"] + string_array = Arrow::StringArray.new(values) + string_array.dictionary_encode + end + + def test_write + assert_equal(["a", "b", "c", nil, "a"], + @values) + end + end end end end @@ -952,19 +1001,7 @@ def setup path = File.join(tmp_dir, "data.arrow") File.open(path, "wb") do |output| writer = ArrowFormat::FileWriter.new(output) - red_arrow_array = build_array - array = convert_array(red_arrow_array) - fields = [ - ArrowFormat::Field.new("value", - array.type, - true, - nil), - ] - schema = ArrowFormat::Schema.new(fields) - record_batch = ArrowFormat::RecordBatch.new(schema, array.size, [array]) - writer.start(schema) - writer.write_record_batch(record_batch) - writer.finish + write(writer) end data = File.open(path, "rb", &:read).freeze table = Arrow::Table.load(Arrow::Buffer.new(data), format: :arrow) @@ -982,19 +1019,7 @@ def setup path = File.join(tmp_dir, "data.arrows") File.open(path, "wb") do |output| writer = ArrowFormat::StreamingWriter.new(output) - red_arrow_array = build_array - array = convert_array(red_arrow_array) - fields = [ - ArrowFormat::Field.new("value", - array.type, - true, - nil), - ] - schema = ArrowFormat::Schema.new(fields) - record_batch = ArrowFormat::RecordBatch.new(schema, array.size, [array]) - writer.start(schema) - writer.write_record_batch(record_batch) - writer.finish + write(writer) end data = File.open(path, "rb", &:read).freeze table = Arrow::Table.load(Arrow::Buffer.new(data), format: :arrows)