diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 5ec21cecdbe..447d2711acc 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -235,6 +235,9 @@ pub enum Error { #[error("One union type {0:?} must match the `default`'s value type {1:?}")] GetDefaultUnion(SchemaKind, ValueKind), + #[error("`default`'s value type of field {0:?} in {1:?} must be {2:?}")] + GetDefaultRecordField(String, String, String), + #[error("JSON value {0} claims to be u64 but cannot be converted")] GetU64FromJson(serde_json::Number), diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 9cae97cff43..139a6259740 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -637,7 +637,7 @@ impl RecordField { field: &Map, position: usize, parser: &mut Parser, - enclosing_namespace: &Namespace, + enclosing_record: &Name, ) -> AvroResult { let name = field.name().ok_or(Error::GetNameFieldFromRecord)?; @@ -646,9 +646,16 @@ impl RecordField { } // TODO: "type" = "" - let schema = parser.parse_complex(field, enclosing_namespace)?; + let schema = parser.parse_complex(field, &enclosing_record.namespace)?; let default = field.get("default").cloned(); + Self::resolve_default_value( + &schema, + &name, + &enclosing_record.fullname(None), + &parser.parsed_schemas, + &default, + )?; let aliases = field.get("aliases").and_then(|aliases| { aliases.as_array().map(|aliases| { @@ -678,6 +685,56 @@ impl RecordField { }) } + fn resolve_default_value( + field_schema: &Schema, + field_name: &str, + record_name: &str, + names: &Names, + default: &Option, + ) -> AvroResult<()> { + if let Some(value) = default { + let avro_value = types::Value::from(value.clone()); + match field_schema { + Schema::Union(union_schema) => { + let schemas = &union_schema.schemas; + let resolved = schemas.iter().any(|schema| { + avro_value + .to_owned() + .resolve_internal(schema, names, &schema.namespace(), &None) + .is_ok() + }); + + if !resolved { + let schema: Option<&Schema> = schemas.get(0); + return match schema { + Some(first_schema) => Err(Error::GetDefaultUnion( + SchemaKind::from(first_schema), + types::ValueKind::from(avro_value), + )), + None => Err(Error::EmptyUnion), + }; + } + } + _ => { + let resolved = avro_value + .to_owned() + .resolve_internal(field_schema, names, &field_schema.namespace(), &None) + .is_ok(); + + if !resolved { + return Err(Error::GetDefaultRecordField( + field_name.to_string(), + record_name.to_string(), + field_schema.canonical_form(), + )); + } + } + }; + } + + Ok(()) + } + fn get_field_custom_attributes(field: &Map) -> BTreeMap { let mut custom_attributes: BTreeMap = BTreeMap::new(); for (key, value) in field { @@ -1066,7 +1123,7 @@ impl Parser { match *value { Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace), Value::Object(ref data) => self.parse_complex(data, enclosing_namespace), - Value::Array(ref data) => self.parse_union(data, enclosing_namespace, None), + Value::Array(ref data) => self.parse_union(data, enclosing_namespace), _ => Err(Error::ParseSchemaFromValidJson), } } @@ -1355,10 +1412,7 @@ impl Parser { other => self.parse_known_schema(other, enclosing_namespace), }, Some(Value::Object(data)) => self.parse_complex(data, enclosing_namespace), - Some(Value::Array(variants)) => { - let default = complex.get("default"); - self.parse_union(variants, enclosing_namespace, default) - } + Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace), Some(unknown) => Err(Error::GetComplexType(unknown.clone())), None => Err(Error::GetComplexTypeField), } @@ -1453,7 +1507,7 @@ impl Parser { .filter_map(|field| field.as_object()) .enumerate() .map(|(position, field)| { - RecordField::parse(field, position, self, &fully_qualified_name.namespace) + RecordField::parse(field, position, self, &fully_qualified_name) }) .collect::>() })?; @@ -1553,6 +1607,19 @@ impl Parser { } } + if let Some(ref value) = default { + let resolved = types::Value::from(value.clone()) + .to_owned() + .resolve_enum(&symbols, &Some(value.to_string()), &None) + .is_ok(); + if !resolved { + return Err(Error::GetEnumDefault { + symbol: value.to_string(), + symbols, + }); + } + } + let schema = Schema::Enum(EnumSchema { name: fully_qualified_name.clone(), aliases: aliases.clone(), @@ -1601,40 +1668,11 @@ impl Parser { &mut self, items: &[Value], enclosing_namespace: &Namespace, - default: Option<&Value>, ) -> AvroResult { items .iter() .map(|v| self.parse(v, enclosing_namespace)) .collect::, _>>() - .and_then(|schemas| { - if let Some(default_value) = default.cloned() { - let avro_value = types::Value::from(default_value); - let resolved = schemas.iter().any(|schema| { - avro_value - .to_owned() - .resolve_internal( - schema, - &self.parsed_schemas, - &schema.namespace(), - &None, - ) - .is_ok() - }); - - if !resolved { - let schema: Option<&Schema> = schemas.get(0); - return match schema { - Some(first_schema) => Err(Error::GetDefaultUnion( - SchemaKind::from(first_schema), - types::ValueKind::from(avro_value), - )), - None => Err(Error::EmptyUnion), - }; - } - } - Ok(schemas) - }) .and_then(|schemas| Ok(Schema::Union(UnionSchema::new(schemas)?))) } @@ -5469,4 +5507,312 @@ mod tests { Ok(()) } + + #[test] + fn test_avro_3851_validate_default_value_of_simple_record_field() -> TestResult { + let schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int", + "default": "invalid" + } + ] + } + "#; + let expected = Error::GetDefaultRecordField( + "f1".to_string(), + "ns.record1".to_string(), + r#""int""#.to_string(), + ) + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } + + #[test] + fn test_avro_3851_validate_default_value_of_nested_record_field() -> TestResult { + let schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": { + "name": "record2", + "type": "record", + "fields": [ + { + "name": "f1_1", + "type": "int" + } + ] + }, + "default": "invalid" + } + ] + } + "#; + let expected = Error::GetDefaultRecordField( + "f1".to_string(), + "ns.record1".to_string(), + r#"{"name":"ns.record2","type":"record","fields":[{"name":"f1_1","type":"int"}]}"# + .to_string(), + ) + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } + + #[test] + fn test_avro_3851_validate_default_value_of_enum_record_field() -> TestResult { + let schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": { + "name": "enum1", + "type": "enum", + "symbols": ["a", "b", "c"] + }, + "default": "invalid" + } + ] + } + "#; + let expected = Error::GetDefaultRecordField( + "f1".to_string(), + "ns.record1".to_string(), + r#"{"name":"ns.enum1","type":"enum","symbols":["a","b","c"]}"#.to_string(), + ) + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } + + #[test] + fn test_avro_3851_validate_default_value_of_fixed_record_field() -> TestResult { + let schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": { + "name": "fixed1", + "type": "fixed", + "size": 3 + }, + "default": 100 + } + ] + } + "#; + let expected = Error::GetDefaultRecordField( + "f1".to_string(), + "ns.record1".to_string(), + r#"{"name":"ns.fixed1","type":"fixed","size":3}"#.to_string(), + ) + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } + + #[test] + fn test_avro_3851_validate_default_value_of_array_record_field() -> TestResult { + let schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "array", + "items": "int", + "default": "invalid" + } + ] + } + "#; + let expected = Error::GetDefaultRecordField( + "f1".to_string(), + "ns.record1".to_string(), + r#"{"type":"array","items":"int"}"#.to_string(), + ) + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } + + #[test] + fn test_avro_3851_validate_default_value_of_map_record_field() -> TestResult { + let schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "map", + "values": "string", + "default": "invalid" + } + ] + } + "#; + let expected = Error::GetDefaultRecordField( + "f1".to_string(), + "ns.record1".to_string(), + r#"{"type":"map","values":"string"}"#.to_string(), + ) + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } + + #[test] + fn test_avro_3851_validate_default_value_of_ref_record_field() -> TestResult { + let schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": { + "name": "record2", + "type": "record", + "fields": [ + { + "name": "f1_1", + "type": "int" + } + ] + } + }, { + "name": "f2", + "type": "ns.record2", + "default": { "f1_1": true } + } + ] + } + "#; + let expected = Error::GetDefaultRecordField( + "f2".to_string(), + "ns.record1".to_string(), + r#""ns.record2""#.to_string(), + ) + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } + + #[test] + fn test_avro_3851_validate_default_value_of_enum() -> TestResult { + let schema_str = r#" + { + "name": "enum1", + "namespace": "ns", + "type": "enum", + "symbols": ["a", "b", "c"], + "default": 100 + } + "#; + let expected = Error::EnumDefaultWrongType(100.into()).to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + let schema_str = r#" + { + "name": "enum1", + "namespace": "ns", + "type": "enum", + "symbols": ["a", "b", "c"], + "default": "d" + } + "#; + let expected = Error::GetEnumDefault { + symbol: "d".to_string(), + symbols: vec!["a".to_string(), "b".to_string(), "c".to_string()], + } + .to_string(); + let result = Schema::parse_str(schema_str); + assert!(result.is_err()); + let err = result + .map_err(|e| e.to_string()) + .err() + .unwrap_or_else(|| "unexpected".to_string()); + assert_eq!(expected, err); + + Ok(()) + } } diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index e9009d03b1e..11bf6356419 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -872,7 +872,7 @@ impl Value { } } - fn resolve_enum( + pub(crate) fn resolve_enum( self, symbols: &[String], enum_default: &Option, diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs index f0d11e954c0..1d2cb8b4d98 100644 --- a/lang/rust/avro/tests/schema.rs +++ b/lang/rust/avro/tests/schema.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::io::{Cursor, Read}; +use std::{ + collections::HashMap, + io::{Cursor, Read}, +}; use apache_avro::{ from_avro_datum, from_value, @@ -2225,3 +2228,391 @@ fn test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace Ok(()) } + +fn write_schema_for_default_value_test() -> apache_avro::AvroResult> { + let writer_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int" + } + ] + } + "#; + let writer_schema = Schema::parse_str(writer_schema_str)?; + let mut writer = Writer::new(&writer_schema, Vec::new()); + let mut record = Record::new(writer.schema()) + .ok_or("Expected Some(Record), but got None") + .unwrap(); + record.put("f1", 10); + writer.append(record)?; + + writer.into_inner() +} + +#[test] +fn test_avro_3851_read_default_value_for_simple_record_field() -> TestResult { + let reader_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int" + }, { + "name": "f2", + "type": "int", + "default": 20 + } + ] + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = write_schema_for_default_value_test()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let expected = Value::Record(vec![ + ("f1".to_string(), Value::Int(10)), + ("f2".to_string(), Value::Int(20)), + ]); + + assert_eq!(expected, result[0]); + + Ok(()) +} + +#[test] +fn test_avro_3851_read_default_value_for_nested_record_field() -> TestResult { + let reader_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int" + }, { + "name": "f2", + "type": { + "name": "record2", + "type": "record", + "fields": [ + { + "name": "f1_1", + "type": "int" + } + ] + }, + "default": { + "f1_1": 100 + } + } + ] + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = write_schema_for_default_value_test()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let expected = Value::Record(vec![ + ("f1".to_string(), Value::Int(10)), + ( + "f2".to_string(), + Value::Record(vec![("f1_1".to_string(), 100.into())]), + ), + ]); + + assert_eq!(expected, result[0]); + + Ok(()) +} + +#[test] +fn test_avro_3851_read_default_value_for_enum_record_field() -> TestResult { + let reader_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int" + }, { + "name": "f2", + "type": { + "name": "enum1", + "type": "enum", + "symbols": ["a", "b", "c"] + }, + "default": "a" + } + ] + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = write_schema_for_default_value_test()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let expected = Value::Record(vec![ + ("f1".to_string(), Value::Int(10)), + ("f2".to_string(), Value::Enum(0, "a".to_string())), + ]); + + assert_eq!(expected, result[0]); + + Ok(()) +} + +#[test] +fn test_avro_3851_read_default_value_for_fixed_record_field() -> TestResult { + let reader_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int" + }, { + "name": "f2", + "type": { + "name": "fixed1", + "type": "fixed", + "size": 3 + }, + "default": "abc" + } + ] + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = write_schema_for_default_value_test()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let expected = Value::Record(vec![ + ("f1".to_string(), Value::Int(10)), + ("f2".to_string(), Value::Fixed(3, vec![b'a', b'b', b'c'])), + ]); + + assert_eq!(expected, result[0]); + + Ok(()) +} + +#[test] +fn test_avro_3851_read_default_value_for_array_record_field() -> TestResult { + let reader_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int" + }, { + "name": "f2", + "type": "array", + "items": "int", + "default": [1, 2, 3] + } + ] + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = write_schema_for_default_value_test()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let expected = Value::Record(vec![ + ("f1".to_string(), Value::Int(10)), + ( + "f2".to_string(), + Value::Array(vec![1.into(), 2.into(), 3.into()]), + ), + ]); + + assert_eq!(expected, result[0]); + + Ok(()) +} + +#[test] +fn test_avro_3851_read_default_value_for_map_record_field() -> TestResult { + let reader_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": "int" + }, { + "name": "f2", + "type": "map", + "values": "string", + "default": { "a": "A", "b": "B", "c": "C" } + } + ] + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = write_schema_for_default_value_test()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let map = HashMap::from_iter([ + ("a".to_string(), "A".into()), + ("b".to_string(), "B".into()), + ("c".to_string(), "C".into()), + ]); + let expected = Value::Record(vec![ + ("f1".to_string(), Value::Int(10)), + ("f2".to_string(), Value::Map(map)), + ]); + + assert_eq!(expected, result[0]); + + Ok(()) +} + +#[test] +fn test_avro_3851_read_default_value_for_ref_record_field() -> TestResult { + let writer_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": { + "name": "record2", + "type": "record", + "fields": [ + { + "name": "f1_1", + "type": "int" + } + ] + } + } + ] + } + "#; + let writer_schema = Schema::parse_str(writer_schema_str)?; + let mut writer = Writer::new(&writer_schema, Vec::new()); + let mut record = Record::new(writer.schema()).ok_or("Expected Some(Record), but got None")?; + record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())])); + writer.append(record)?; + + let reader_schema_str = r#" + { + "name": "record1", + "namespace": "ns", + "type": "record", + "fields": [ + { + "name": "f1", + "type": { + "name": "record2", + "type": "record", + "fields": [ + { + "name": "f1_1", + "type": "int" + } + ] + } + }, { + "name": "f2", + "type": "ns.record2", + "default": { "f1_1": 100 } + } + ] + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = writer.into_inner()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let expected = Value::Record(vec![ + ( + "f1".to_string(), + Value::Record(vec![("f1_1".to_string(), 10.into())]), + ), + ( + "f2".to_string(), + Value::Record(vec![("f1_1".to_string(), 100.into())]), + ), + ]); + + assert_eq!(expected, result[0]); + + Ok(()) +} + +#[test] +fn test_avro_3851_read_default_value_for_enum() -> TestResult { + let writer_schema_str = r#" + { + "name": "enum1", + "namespace": "ns", + "type": "enum", + "symbols": ["a", "b", "c"] + } + "#; + let writer_schema = Schema::parse_str(writer_schema_str)?; + let mut writer = Writer::new(&writer_schema, Vec::new()); + writer.append("c")?; + + let reader_schema_str = r#" + { + "name": "enum1", + "namespace": "ns", + "type": "enum", + "symbols": ["a", "b"], + "default": "a" + } + "#; + let reader_schema = Schema::parse_str(reader_schema_str)?; + let input = writer.into_inner()?; + let reader = Reader::with_schema(&reader_schema, &input[..])?; + let result = reader.collect::, _>>()?; + + assert_eq!(1, result.len()); + + let expected = Value::Enum(0, "a".to_string()); + assert_eq!(expected, result[0]); + + Ok(()) +}