Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.d/24074_arrow_null_handling.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
The Arrow encoder now supports configurable null handling through the `allow_nullable_fields`
option. When enabled, every field in the Arrow schema (including nested fields) is treated as
nullable, so missing or incompatible values are encoded as null instead of causing an encoding
error. This improves compatibility with downstream systems that have specific requirements for
null handling.

authors: benjamin-awd
211 changes: 209 additions & 2 deletions lib/codecs/src/encoding/format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ pub struct ArrowStreamSerializerConfig {
#[serde(skip)]
#[configurable(derived)]
pub schema: Option<Arc<arrow::datatypes::Schema>>,

/// Allow null values for non-nullable fields in the schema.
///
/// When enabled, missing or incompatible values will be encoded as null even for fields
/// marked as non-nullable in the Arrow schema. This is useful when working with downstream
/// systems that can handle null values through defaults, computed columns, or other mechanisms.
///
/// When disabled (default), missing values for non-nullable fields will cause encoding errors,
/// ensuring all required data is present before sending to the sink.
#[serde(default)]
#[configurable(metadata(docs::examples = true))]
pub allow_nullable_fields: bool,
}

impl std::fmt::Debug for ArrowStreamSerializerConfig {
Expand All @@ -45,6 +57,7 @@ impl std::fmt::Debug for ArrowStreamSerializerConfig {
.as_ref()
.map(|s| format!("{} fields", s.fields().len())),
)
.field("allow_nullable_fields", &self.allow_nullable_fields)
.finish()
}
}
Expand All @@ -54,6 +67,7 @@ impl ArrowStreamSerializerConfig {
pub fn new(schema: Arc<arrow::datatypes::Schema>) -> Self {
    // Null handling starts out strict; callers opt in by setting the flag afterwards.
    let allow_nullable_fields = false;

    Self {
        schema: Some(schema),
        allow_nullable_fields,
    }
}

Expand All @@ -77,12 +91,25 @@ pub struct ArrowStreamSerializer {
impl ArrowStreamSerializer {
/// Create a new ArrowStreamSerializer with the given configuration
pub fn new(config: ArrowStreamSerializerConfig) -> Result<Self, vector_common::Error> {
let schema = config.schema.ok_or_else(|| {
let mut schema = config.schema.ok_or_else(|| {
vector_common::Error::from(
"Arrow serializer requires a schema. Pass a schema or fetch from provider before creating serializer."
)
})?;

// If allow_nullable_fields is enabled, transform the schema once here
// instead of on every batch encoding
if config.allow_nullable_fields {
schema = Arc::new(Schema::new_with_metadata(
schema
.fields()
.iter()
.map(|f| Arc::new(make_field_nullable(f)))
.collect::<Vec<_>>(),
schema.metadata().clone(),
));
}

Ok(Self { schema })
}
}
Expand Down Expand Up @@ -177,13 +204,32 @@ pub fn encode_events_to_arrow_ipc_stream(
let ipc_err = |source| ArrowEncodingError::IpcWrite { source };

let mut buffer = BytesMut::new().writer();
let mut writer = StreamWriter::try_new(&mut buffer, &schema_ref).map_err(ipc_err)?;
let mut writer =
StreamWriter::try_new(&mut buffer, record_batch.schema_ref()).map_err(ipc_err)?;
writer.write(&record_batch).map_err(ipc_err)?;
writer.finish().map_err(ipc_err)?;

Ok(buffer.into_inner().freeze())
}

/// Recursively makes a Field and all its nested fields nullable.
///
/// Returns a copy of `field` flagged as nullable. For `List`, `LargeList`,
/// `FixedSizeList`, `Struct` and `Map` data types the child fields are rewritten the
/// same way; every other data type is cloned unchanged.
fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field {
    let new_data_type = match field.data_type() {
        DataType::List(inner_field) => {
            DataType::List(Arc::new(make_field_nullable(inner_field)))
        }
        DataType::LargeList(inner_field) => {
            DataType::LargeList(Arc::new(make_field_nullable(inner_field)))
        }
        DataType::FixedSizeList(inner_field, size) => {
            DataType::FixedSizeList(Arc::new(make_field_nullable(inner_field)), *size)
        }
        DataType::Struct(fields) => {
            DataType::Struct(fields.iter().map(|f| make_field_nullable(f)).collect())
        }
        // NOTE(review): the Arrow format describes a map's `entries` struct and its key
        // field as non-nullable — confirm that the array builders used downstream accept
        // the nullable `entries`/`key` fields produced here.
        DataType::Map(inner_field, sorted) => {
            DataType::Map(Arc::new(make_field_nullable(inner_field)), *sorted)
        }
        // Every other data type is cloned as-is, so any child fields it carries keep
        // their original nullability.
        other => other.clone(),
    };

    field.clone().with_data_type(new_data_type).with_nullable(true)
}

/// Builds an Arrow RecordBatch from events
fn build_record_batch(
schema: Arc<Schema>,
Expand Down Expand Up @@ -1442,4 +1488,165 @@ mod tests {
assert!(!id_array.is_null(1));
assert!(!id_array.is_null(2));
}

#[test]
fn test_config_allow_nullable_fields_overrides_schema() {
    use tokio_util::codec::Encoder;

    // Schema declares a single non-nullable Int64 column.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "strict_field",
        DataType::Int64,
        false,
    )]));

    // Two events: the first carries the column, the second omits it entirely.
    let mut populated = LogEvent::default();
    populated.insert("strict_field", 42);
    let empty = LogEvent::default();
    let events = vec![Event::Log(populated), Event::Log(empty)];

    let mut config = ArrowStreamSerializerConfig::new(Arc::clone(&schema));
    config.allow_nullable_fields = true;
    let mut serializer =
        ArrowStreamSerializer::new(config).expect("Failed to create serializer");

    let mut encoded = BytesMut::new();
    serializer
        .encode(events, &mut encoded)
        .expect("Encoding should succeed when allow_nullable_fields is true");

    // Decode the IPC stream again and inspect the first batch.
    let mut reader =
        StreamReader::try_new(Cursor::new(encoded), None).expect("Failed to create reader");
    let batch = reader.next().unwrap().expect("Failed to read batch");

    assert_eq!(batch.num_rows(), 2);

    assert!(
        batch.schema().field(0).is_nullable(),
        "The output schema field should have been transformed to nullable=true"
    );

    let values = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();

    assert_eq!(values.value(0), 42);
    assert!(!values.is_null(0));
    assert!(
        values.is_null(1),
        "The missing value should be encoded as null"
    );
}

#[test]
fn test_make_field_nullable_with_nested_types() {
    // Checks that make_field_nullable descends through Struct -> List -> Struct.
    //
    // Shape under test, with every level declared non-nullable:
    // root: struct { inner_list: [{ nested_field: Int64 }] }
    let leaf = Field::new("nested_field", DataType::Int64, false);
    let item_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![leaf]));
    let item = Field::new("item", item_struct, false);
    let inner_list = Field::new("inner_list", DataType::List(Arc::new(item)), false);
    let root_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![inner_list]));
    let original_field = Field::new("root", root_struct, false);

    let nullable_field = make_field_nullable(&original_field);

    // Level 0: the root field itself.
    assert!(
        nullable_field.is_nullable(),
        "Root field should be nullable"
    );

    // Level 1: the list-typed member of the root struct.
    let DataType::Struct(root_fields) = nullable_field.data_type() else {
        panic!("Expected Struct type for root field");
    };
    let inner_list_field = &root_fields[0];
    assert!(
        inner_list_field.is_nullable(),
        "inner_list field should be nullable"
    );

    // Level 2: the list's element field.
    let DataType::List(list_item_field) = inner_list_field.data_type() else {
        panic!("Expected List type for inner_list");
    };
    assert!(
        list_item_field.is_nullable(),
        "List item field should be nullable"
    );

    // Level 3: the member of the struct stored in each list element.
    let DataType::Struct(inner_struct_fields) = list_item_field.data_type() else {
        panic!("Expected Struct type for list items");
    };
    let nested_field = &inner_struct_fields[0];
    assert!(
        nested_field.is_nullable(),
        "nested_field should be nullable"
    );
}

#[test]
fn test_make_field_nullable_with_map_type() {
    // Checks that make_field_nullable rewrites a Map field, its `entries` struct field,
    // and the key/value fields inside that struct.
    //
    // Shape under test, with every level declared non-nullable:
    // my_map: Map(entries: struct { key: Utf8, value: Int64 })
    let entries_struct = DataType::Struct(arrow::datatypes::Fields::from(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("value", DataType::Int64, false),
    ]));
    let entries = Field::new("entries", entries_struct, false);
    let original_field = Field::new("my_map", DataType::Map(Arc::new(entries), false), false);

    let nullable_field = make_field_nullable(&original_field);

    // The map field itself.
    assert!(
        nullable_field.is_nullable(),
        "Root map field should be nullable"
    );

    // The `entries` field wrapped by the map.
    let DataType::Map(entries_field, _sorted) = nullable_field.data_type() else {
        panic!("Expected Map type for my_map field");
    };
    assert!(
        entries_field.is_nullable(),
        "Map entries field should be nullable"
    );

    // The key and value fields of the entries struct.
    let DataType::Struct(struct_fields) = entries_field.data_type() else {
        panic!("Expected Struct type for map entries");
    };
    let (key_field, value_field) = (&struct_fields[0], &struct_fields[1]);
    assert!(key_field.is_nullable(), "Map key field should be nullable");
    assert!(
        value_field.is_nullable(),
        "Map value field should be nullable"
    );
}
}
Loading