From 1cea6907a24773bdc5d7282fdd90e92b6aef0ab3 Mon Sep 17 00:00:00 2001 From: Marcos Schroh <2828842+marcosschroh@users.noreply.github.com> Date: Tue, 28 Nov 2023 10:02:49 +0100 Subject: [PATCH] =?UTF-8?q?AVRO-3904:=20[RUST]=20return=20a=20Result=20whe?= =?UTF-8?q?n=20checking=20schema=20compatibility=20so=20the=20=E2=80=A6=20?= =?UTF-8?q?(#2587)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * AVRO-3904: [Rust] return a Result when checking schema compatibility so the end users will have feedback in case or errors Co-authored-by: Marcos Schroh --- lang/rust/avro/README.md | 4 +- lang/rust/avro/src/error.rs | 42 ++ lang/rust/avro/src/lib.rs | 4 +- lang/rust/avro/src/schema.rs | 36 ++ lang/rust/avro/src/schema_compatibility.rs | 524 ++++++++++++++------- 5 files changed, 439 insertions(+), 171 deletions(-) diff --git a/lang/rust/avro/README.md b/lang/rust/avro/README.md index 07b18748f7d..a349847fac7 100644 --- a/lang/rust/avro/README.md +++ b/lang/rust/avro/README.md @@ -634,7 +634,7 @@ use apache_avro::{Schema, schema_compatibility::SchemaCompatibility}; let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap(); let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap(); -assert_eq!(true, SchemaCompatibility::can_read(&writers_schema, &readers_schema)); +assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_ok()); ``` 2. Incompatible schemas (a long array schema cannot be read by an int array schema) @@ -647,7 +647,7 @@ use apache_avro::{Schema, schema_compatibility::SchemaCompatibility}; let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap(); let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap(); -assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, &readers_schema)); +assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_err()); ``` diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 8fa14602753..810c5687a03 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -480,6 +480,48 @@ pub enum Error { BadCodecMetadata, } +#[derive(thiserror::Error, Debug)] +pub enum CompatibilityError { + #[error("Schemas are not compatible. Writer schema is {writer_schema_type}, but reader schema is {reader_schema_type}")] + WrongType { + writer_schema_type: String, + reader_schema_type: String, + }, + + #[error("Schemas are not compatible. The {schema_type} should have been {expected_type}")] + TypeExpected { + schema_type: String, + expected_type: String, + }, + + #[error("Schemas are not compatible. Field '{0}' in reader schema does not match the type in the writer schema")] + FieldTypeMismatch(String), + + #[error("Schemas are not compatible. Schemas mismatch")] + SchemaMismatch, + + #[error("Schemas are not compatible. Field '{0}' in reader schema must have a default value")] + MissingDefaultValue(String), + + #[error("Schemas are not compatible. Reader's symbols must contain all writer's symbols")] + MissingSymbols, + + #[error("Schemas are not compatible. All elements in union must match for both schemas")] + MissingUnionElements, + + #[error("Schemas are not compatible. Name and size don't match for fixed")] + FixedMismatch, + + #[error("Schemas are not compatible. The name must be the same for both schemas. Writer's name {writer_name} and reader's name {reader_name}")] + NameMismatch { + writer_name: String, + reader_name: String, + }, + + #[error("Schemas are not compatible. Unknown type for '{0}'. Make sure that the type is a valid one")] + Inconclusive(String), +} + impl serde::ser::Error for Error { fn custom(msg: T) -> Self { Error::SerializeValue(msg.to_string()) diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs index b2d930068b1..cb49555d567 100644 --- a/lang/rust/avro/src/lib.rs +++ b/lang/rust/avro/src/lib.rs @@ -747,7 +747,7 @@ //! //! let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap(); //! let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap(); -//! assert_eq!(true, SchemaCompatibility::can_read(&writers_schema, &readers_schema)); +//! assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_ok()); //! ``` //! //! 2. Incompatible schemas (a long array schema cannot be read by an int array schema) @@ -760,7 +760,7 @@ //! //! let writers_schema = Schema::parse_str(r#"{"type": "array", "items":"long"}"#).unwrap(); //! let readers_schema = Schema::parse_str(r#"{"type": "array", "items":"int"}"#).unwrap(); -//! assert_eq!(false, SchemaCompatibility::can_read(&writers_schema, &readers_schema)); +//! assert!(SchemaCompatibility::can_read(&writers_schema, &readers_schema).is_err()); //! ``` mod codec; diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index f2487e316f4..181e3fcffee 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -206,6 +206,42 @@ impl From<&types::Value> for SchemaKind { } } +// Implement `Display` for `SchemaKind`. +impl fmt::Display for Schema { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Schema::Null => write!(f, "Null"), + Schema::Boolean => write!(f, "Boolean"), + Schema::Int => write!(f, "Int"), + Schema::Long => write!(f, "Long"), + Schema::Float => write!(f, "Float"), + Schema::Double => write!(f, "Double"), + Schema::Bytes => write!(f, "Bytes"), + Schema::String => write!(f, "String"), + Schema::Array(..) => write!(f, "Array"), + Schema::Map(..) => write!(f, "Map"), + Schema::Union(..) => write!(f, "Union"), + Schema::Record(..) => write!(f, "Record"), + Schema::Enum(..) => write!(f, "Enum"), + Schema::Fixed(..) => write!(f, "Fixed"), + Schema::Decimal(..) => write!(f, "Decimal"), + Schema::BigDecimal => write!(f, "BigDecimal"), + Schema::Uuid => write!(f, "Uuid"), + Schema::Date => write!(f, "Date"), + Schema::TimeMillis => write!(f, "TimeMillis"), + Schema::TimeMicros => write!(f, "TimeMicros"), + Schema::TimestampMillis => write!(f, "TimestampMillis"), + Schema::TimestampMicros => write!(f, "TimestampMicros"), + Schema::LocalTimestampMillis => write!(f, "LocalTimestampMillis"), + Schema::LocalTimestampMicros => write!(f, "LocalTimestampMicros"), + Schema::Duration => write!(f, "Duration"), + Schema::Ref { name } => { + write!(f, "{}", name.name) + } + } + } +} + /// Represents names for `record`, `enum` and `fixed` Avro schemas. /// /// Each of these `Schema`s have a `fullname` composed of two parts: diff --git a/lang/rust/avro/src/schema_compatibility.rs b/lang/rust/avro/src/schema_compatibility.rs index 8a0b2a4d793..1f2f29e3451 100644 --- a/lang/rust/avro/src/schema_compatibility.rs +++ b/lang/rust/avro/src/schema_compatibility.rs @@ -16,7 +16,10 @@ // under the License. //! Logic for checking schema compatibility -use crate::schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind}; +use crate::{ + error::CompatibilityError, + schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind}, +}; use std::{ collections::{hash_map::DefaultHasher, HashSet}, hash::Hasher, @@ -37,7 +40,11 @@ impl Checker { } } - pub(crate) fn can_read(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool { + pub(crate) fn can_read( + &mut self, + writers_schema: &Schema, + readers_schema: &Schema, + ) -> Result<(), CompatibilityError> { self.full_match_schemas(writers_schema, readers_schema) } @@ -45,44 +52,52 @@ impl Checker { &mut self, writers_schema: &Schema, readers_schema: &Schema, - ) -> bool { + ) -> Result<(), CompatibilityError> { if self.recursion_in_progress(writers_schema, readers_schema) { - return true; + return Ok(()); } - if !SchemaCompatibility::match_schemas(writers_schema, readers_schema) { - return false; - } + SchemaCompatibility::match_schemas(writers_schema, readers_schema)?; let w_type = SchemaKind::from(writers_schema); let r_type = SchemaKind::from(readers_schema); if w_type != SchemaKind::Union && (r_type.is_primitive() || r_type == SchemaKind::Fixed) { - return true; + return Ok(()); } match r_type { SchemaKind::Record => self.match_record_schemas(writers_schema, readers_schema), SchemaKind::Map => { if let Schema::Map(w_m) = writers_schema { - if let Schema::Map(r_m) = readers_schema { - self.full_match_schemas(w_m, r_m) - } else { - unreachable!("readers_schema should have been Schema::Map") + match readers_schema { + Schema::Map(r_m) => self.full_match_schemas(w_m, r_m), + _ => Err(CompatibilityError::WrongType { + writer_schema_type: format!("{}", writers_schema), + reader_schema_type: format!("{}", readers_schema), + }), } } else { - unreachable!("writers_schema should have been Schema::Map") + Err(CompatibilityError::WrongType { + writer_schema_type: format!("{}", writers_schema), + reader_schema_type: format!("{}", readers_schema), + }) } } SchemaKind::Array => { if let Schema::Array(w_a) = writers_schema { - if let Schema::Array(r_a) = readers_schema { - self.full_match_schemas(w_a, r_a) - } else { - unreachable!("readers_schema should have been Schema::Array") + match readers_schema { + Schema::Array(r_a) => self.full_match_schemas(w_a, r_a), + _ => Err(CompatibilityError::WrongType { + writer_schema_type: format!("{}", writers_schema), + reader_schema_type: format!("{}", readers_schema), + }), } } else { - unreachable!("writers_schema should have been Schema::Array") + Err(CompatibilityError::WrongType { + writer_schema_type: format!("{}", writers_schema), + reader_schema_type: format!("{}", readers_schema), + }) } } SchemaKind::Union => self.match_union_schemas(writers_schema, readers_schema), @@ -96,10 +111,12 @@ impl Checker { symbols: r_symbols, .. }) = readers_schema { - return !w_symbols.iter().any(|e| !r_symbols.contains(e)); + if w_symbols.iter().all(|e| r_symbols.contains(e)) { + return Ok(()); + } } } - false + Err(CompatibilityError::MissingSymbols) } _ => { if w_type == SchemaKind::Union { @@ -109,16 +126,25 @@ impl Checker { } } } - false + Err(CompatibilityError::Inconclusive(String::from( + "writers_schema", + ))) } } } - fn match_record_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool { + fn match_record_schemas( + &mut self, + writers_schema: &Schema, + readers_schema: &Schema, + ) -> Result<(), CompatibilityError> { let w_type = SchemaKind::from(writers_schema); if w_type == SchemaKind::Union { - return false; + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: String::from("record"), + }); } if let Schema::Record(RecordSchema { @@ -133,39 +159,58 @@ impl Checker { { for field in r_fields.iter() { if let Some(pos) = w_lookup.get(&field.name) { - if !self.full_match_schemas(&w_fields[*pos].schema, &field.schema) { - return false; + if self + .full_match_schemas(&w_fields[*pos].schema, &field.schema) + .is_err() + { + return Err(CompatibilityError::FieldTypeMismatch(field.name.clone())); } } else if field.default.is_none() { - return false; + return Err(CompatibilityError::MissingDefaultValue(field.name.clone())); } } } } - true + Ok(()) } - fn match_union_schemas(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool { + fn match_union_schemas( + &mut self, + writers_schema: &Schema, + readers_schema: &Schema, + ) -> Result<(), CompatibilityError> { + // Do not need to check the SchemaKind of reader as this function + // is only called when the readers_schema is Union let w_type = SchemaKind::from(writers_schema); - let r_type = SchemaKind::from(readers_schema); - - assert_eq!(r_type, SchemaKind::Union); if w_type == SchemaKind::Union { if let Schema::Union(u) = writers_schema { - u.schemas + if u.schemas .iter() - .all(|schema| self.full_match_schemas(schema, readers_schema)) - } else { - unreachable!("writers_schema should have been Schema::Union") + .all(|schema| self.full_match_schemas(schema, readers_schema).is_ok()) + { + return Ok(()); + } else { + return Err(CompatibilityError::MissingUnionElements); + } } + // } else { + // return Err(Error::CompatibilityError(String::from( + // "writers_schema should have been Schema::Union", + // ))); + // } } else if let Schema::Union(u) = readers_schema { - u.schemas + // This check is nneded because the writer_schema can be a not union + // but the type can be contain in the union of the reeader schema + // e.g. writer_schema is string and reader_schema is [string, int] + if u.schemas .iter() - .any(|schema| self.full_match_schemas(writers_schema, schema)) - } else { - unreachable!("readers_schema should have been Schema::Union") + .any(|schema| self.full_match_schemas(writers_schema, schema).is_ok()) + { + return Ok(()); + } } + Err(CompatibilityError::SchemaMismatch) } fn recursion_in_progress(&mut self, writers_schema: &Schema, readers_schema: &Schema) -> bool { @@ -187,7 +232,10 @@ impl Checker { impl SchemaCompatibility { /// `can_read` performs a full, recursive check that a datum written using the /// writers_schema can be read using the readers_schema. - pub fn can_read(writers_schema: &Schema, readers_schema: &Schema) -> bool { + pub fn can_read( + writers_schema: &Schema, + readers_schema: &Schema, + ) -> Result<(), CompatibilityError> { let mut c = Checker::new(); c.can_read(writers_schema, readers_schema) } @@ -195,8 +243,8 @@ impl SchemaCompatibility { /// `mutual_read` performs a full, recursive check that a datum written using either /// the writers_schema or the readers_schema can be read using the other schema. pub fn mutual_read(writers_schema: &Schema, readers_schema: &Schema) -> bool { - SchemaCompatibility::can_read(writers_schema, readers_schema) - && SchemaCompatibility::can_read(readers_schema, writers_schema) + SchemaCompatibility::can_read(writers_schema, readers_schema).is_ok() + && SchemaCompatibility::can_read(readers_schema, writers_schema).is_ok() } /// `match_schemas` performs a basic check that a datum written with the @@ -204,29 +252,45 @@ impl SchemaCompatibility { /// matching the types, including schema promotion, and matching the full name for /// named types. Aliases for named types are not supported here, and the rust /// implementation of Avro in general does not include support for aliases (I think). - pub(crate) fn match_schemas(writers_schema: &Schema, readers_schema: &Schema) -> bool { + pub(crate) fn match_schemas( + writers_schema: &Schema, + readers_schema: &Schema, + ) -> Result<(), CompatibilityError> { let w_type = SchemaKind::from(writers_schema); let r_type = SchemaKind::from(readers_schema); if w_type == SchemaKind::Union || r_type == SchemaKind::Union { - return true; + return Ok(()); } if w_type == r_type { if r_type.is_primitive() { - return true; + return Ok(()); } match r_type { SchemaKind::Record => { if let Schema::Record(RecordSchema { name: w_name, .. }) = writers_schema { if let Schema::Record(RecordSchema { name: r_name, .. }) = readers_schema { - return w_name.name == r_name.name; + if w_name.name == r_name.name { + return Ok(()); + } else { + return Err(CompatibilityError::NameMismatch { + writer_name: w_name.name.clone(), + reader_name: r_name.name.clone(), + }); + } } else { - unreachable!("readers_schema should have been Schema::Record") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("record"), + }); } } else { - unreachable!("writers_schema should have been Schema::Record") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: String::from("record"), + }); } } SchemaKind::Fixed => { @@ -246,23 +310,39 @@ impl SchemaCompatibility { attributes: _, }) = readers_schema { - return w_name.name == r_name.name && w_size == r_size; + return (w_name.name == r_name.name && w_size == r_size) + .then_some(()) + .ok_or(CompatibilityError::FixedMismatch); } else { - unreachable!("readers_schema should have been Schema::Fixed") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: String::from("fFixed"), + }); } - } else { - unreachable!("writers_schema should have been Schema::Fixed") } } SchemaKind::Enum => { if let Schema::Enum(EnumSchema { name: w_name, .. }) = writers_schema { if let Schema::Enum(EnumSchema { name: r_name, .. }) = readers_schema { - return w_name.name == r_name.name; + if w_name.name == r_name.name { + return Ok(()); + } else { + return Err(CompatibilityError::NameMismatch { + writer_name: w_name.name.clone(), + reader_name: r_name.name.clone(), + }); + } } else { - unreachable!("readers_schema should have been Schema::Enum") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("enum"), + }); } } else { - unreachable!("writers_schema should have been Schema::Enum") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: String::from("enum"), + }); } } SchemaKind::Map => { @@ -270,10 +350,16 @@ impl SchemaCompatibility { if let Schema::Map(r_m) = readers_schema { return SchemaCompatibility::match_schemas(w_m, r_m); } else { - unreachable!("readers_schema should have been Schema::Map") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("map"), + }); } } else { - unreachable!("writers_schema should have been Schema::Map") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: String::from("map"), + }); } } SchemaKind::Array => { @@ -281,45 +367,91 @@ impl SchemaCompatibility { if let Schema::Array(r_a) = readers_schema { return SchemaCompatibility::match_schemas(w_a, r_a); } else { - unreachable!("readers_schema should have been Schema::Array") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("array"), + }); } } else { - unreachable!("writers_schema should have been Schema::Array") + return Err(CompatibilityError::TypeExpected { + schema_type: String::from("writers_schema"), + expected_type: String::from("array"), + }); } } - _ => (), + _ => { + return Err(CompatibilityError::Inconclusive(String::from( + "readers_schema", + ))) + } }; } - if w_type == SchemaKind::Int - && [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) - { - return true; - } - - if w_type == SchemaKind::Long - && [SchemaKind::Float, SchemaKind::Double] - .iter() - .any(|&t| t == r_type) - { - return true; - } - - if w_type == SchemaKind::Float && r_type == SchemaKind::Double { - return true; - } - - if w_type == SchemaKind::String && r_type == SchemaKind::Bytes { - return true; - } - - if w_type == SchemaKind::Bytes && r_type == SchemaKind::String { - return true; + // Here are the checks for primitive types + match w_type { + SchemaKind::Int => { + if [SchemaKind::Long, SchemaKind::Float, SchemaKind::Double] + .iter() + .any(|&t| t == r_type) + { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("long, float or double"), + }) + } + } + SchemaKind::Long => { + if [SchemaKind::Float, SchemaKind::Double] + .iter() + .any(|&t| t == r_type) + { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("float or double"), + }) + } + } + SchemaKind::Float => { + if [SchemaKind::Float, SchemaKind::Double] + .iter() + .any(|&t| t == r_type) + { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("float or double"), + }) + } + } + SchemaKind::String => { + if r_type == SchemaKind::Bytes { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("bytes"), + }) + } + } + SchemaKind::Bytes => { + if r_type == SchemaKind::String { + Ok(()) + } else { + Err(CompatibilityError::TypeExpected { + schema_type: String::from("readers_schema"), + expected_type: String::from("string"), + }) + } + } + _ => Err(CompatibilityError::Inconclusive(String::from( + "writers_schema", + ))), } - - false } } @@ -473,10 +605,12 @@ mod tests { #[test] fn test_broken() { - assert!(!SchemaCompatibility::can_read( - &int_string_union_schema(), - &int_union_schema() - )) + assert_eq!( + "Schemas are not compatible. All elements in union must match for both schemas", + SchemaCompatibility::can_read(&int_string_union_schema(), &int_union_schema()) + .unwrap_err() + .to_string() + ) } #[test] @@ -526,9 +660,9 @@ mod tests { (nested_record(), nested_optional_record()), ]; - assert!(!incompatible_schemas + assert!(incompatible_schemas .iter() - .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader))); + .any(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_err())); } #[test] @@ -577,7 +711,7 @@ mod tests { assert!(compatible_schemas .iter() - .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader))); + .all(|(reader, writer)| SchemaCompatibility::can_read(writer, reader).is_ok())); } fn writer_schema() -> Schema { @@ -601,14 +735,11 @@ mod tests { ]} "#, )?; - assert!(SchemaCompatibility::can_read( - &writer_schema(), - &reader_schema, - )); - assert!(!SchemaCompatibility::can_read( + assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema,).is_ok()); + assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader schema must have a default value", SchemaCompatibility::can_read( &reader_schema, &writer_schema() - )); + ).unwrap_err().to_string()); Ok(()) } @@ -622,14 +753,11 @@ mod tests { ]} "#, )?; - assert!(SchemaCompatibility::can_read( - &writer_schema(), - &reader_schema - )); - assert!(!SchemaCompatibility::can_read( + assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok()); + assert_eq!("Schemas are not compatible. Field 'oldfield1' in reader schema must have a default value", SchemaCompatibility::can_read( &reader_schema, &writer_schema() - )); + ).unwrap_err().to_string()); Ok(()) } @@ -644,14 +772,8 @@ mod tests { ]} "#, )?; - assert!(SchemaCompatibility::can_read( - &writer_schema(), - &reader_schema - )); - assert!(SchemaCompatibility::can_read( - &reader_schema, - &writer_schema() - )); + assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok()); + assert!(SchemaCompatibility::can_read(&reader_schema, &writer_schema()).is_ok()); Ok(()) } @@ -666,14 +788,11 @@ mod tests { ]} "#, )?; - assert!(SchemaCompatibility::can_read( - &writer_schema(), - &reader_schema - )); - assert!(!SchemaCompatibility::can_read( + assert!(SchemaCompatibility::can_read(&writer_schema(), &reader_schema).is_ok()); + assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader schema must have a default value",SchemaCompatibility::can_read( &reader_schema, &writer_schema() - )); + ).unwrap_err().to_string()); Ok(()) } @@ -688,14 +807,13 @@ mod tests { ]} "#, )?; - assert!(!SchemaCompatibility::can_read( + assert_eq!("Schemas are not compatible. Field 'newfield1' in reader schema must have a default value", SchemaCompatibility::can_read( &writer_schema(), - &reader_schema - )); - assert!(!SchemaCompatibility::can_read( + &reader_schema).unwrap_err().to_string()); + assert_eq!("Schemas are not compatible. Field 'oldfield2' in reader schema must have a default value", SchemaCompatibility::can_read( &reader_schema, &writer_schema() - )); + ).unwrap_err().to_string()); Ok(()) } @@ -705,27 +823,23 @@ mod tests { let valid_reader = string_array_schema(); let invalid_reader = string_map_schema(); - assert!(SchemaCompatibility::can_read( - &string_array_schema(), - &valid_reader - )); - assert!(!SchemaCompatibility::can_read( + assert!(SchemaCompatibility::can_read(&string_array_schema(), &valid_reader).is_ok()); + assert_eq!("Schemas are not compatible. Unknown type for 'writers_schema'. Make sure that the type is a valid one", SchemaCompatibility::can_read( &string_array_schema(), &invalid_reader - )); + ).unwrap_err().to_string()); } #[test] fn test_primitive_writer_schema() { let valid_reader = Schema::String; - assert!(SchemaCompatibility::can_read( - &Schema::String, - &valid_reader - )); - assert!(!SchemaCompatibility::can_read( - &Schema::Int, - &Schema::String - )); + assert!(SchemaCompatibility::can_read(&Schema::String, &valid_reader).is_ok()); + assert_eq!( + "Schemas are not compatible. The readers_schema should have been long, float or double", + SchemaCompatibility::can_read(&Schema::Int, &Schema::String) + .unwrap_err() + .to_string() + ); } #[test] @@ -734,8 +848,13 @@ mod tests { let union_writer = union_schema(vec![Schema::Int, Schema::String]); let union_reader = union_schema(vec![Schema::String]); - assert!(!SchemaCompatibility::can_read(&union_writer, &union_reader)); - assert!(SchemaCompatibility::can_read(&union_reader, &union_writer)); + assert_eq!( + "Schemas are not compatible. All elements in union must match for both schemas", + SchemaCompatibility::can_read(&union_writer, &union_reader) + .unwrap_err() + .to_string() + ); + assert!(SchemaCompatibility::can_read(&union_reader, &union_writer).is_ok()); } #[test] @@ -756,7 +875,10 @@ mod tests { "#, )?; - assert!(!SchemaCompatibility::can_read(&string_schema, &int_schema)); + assert_eq!( + "Schemas are not compatible. Field 'field1' in reader schema does not match the type in the writer schema", + SchemaCompatibility::can_read(&string_schema, &int_schema).unwrap_err().to_string() + ); Ok(()) } @@ -770,8 +892,13 @@ mod tests { )?; let enum_schema2 = Schema::parse_str(r#"{"type":"enum", "name":"MyEnum", "symbols":["A","B","C"]}"#)?; - assert!(!SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)); - assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2)); + assert_eq!( + "Schemas are not compatible. Reader's symbols must contain all writer's symbols", + SchemaCompatibility::can_read(&enum_schema2, &enum_schema1) + .unwrap_err() + .to_string() + ); + assert!(SchemaCompatibility::can_read(&enum_schema1, &enum_schema2).is_ok()); Ok(()) } @@ -843,10 +970,12 @@ mod tests { fn test_union_resolution_no_structure_match() { // short name match, but no structure match let read_schema = union_schema(vec![Schema::Null, point_3d_no_default_schema()]); - assert!(!SchemaCompatibility::can_read( - &point_2d_fullname_schema(), - &read_schema - )); + assert_eq!( + "Schemas are not compatible. Schemas mismatch", + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) + .unwrap_err() + .to_string() + ); } #[test] @@ -858,10 +987,12 @@ mod tests { point_2d_schema(), point_3d_schema(), ]); - assert!(!SchemaCompatibility::can_read( - &point_2d_fullname_schema(), - &read_schema - )); + assert_eq!( + "Schemas are not compatible. Schemas mismatch", + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) + .unwrap_err() + .to_string() + ); } #[test] @@ -873,10 +1004,12 @@ mod tests { point_3d_schema(), point_2d_schema(), ]); - assert!(!SchemaCompatibility::can_read( - &point_2d_fullname_schema(), - &read_schema - )); + assert_eq!( + "Schemas are not compatible. Schemas mismatch", + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) + .unwrap_err() + .to_string() + ); } #[test] @@ -888,10 +1021,12 @@ mod tests { point_3d_match_name_schema(), point_3d_schema(), ]); - assert!(!SchemaCompatibility::can_read( - &point_2d_fullname_schema(), - &read_schema - )); + assert_eq!( + "Schemas are not compatible. Schemas mismatch", + SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema) + .unwrap_err() + .to_string() + ); } #[test] @@ -904,10 +1039,7 @@ mod tests { point_3d_schema(), point_2d_fullname_schema(), ]); - assert!(SchemaCompatibility::can_read( - &point_2d_fullname_schema(), - &read_schema - )); + assert!(SchemaCompatibility::can_read(&point_2d_fullname_schema(), &read_schema).is_ok()); } #[test] @@ -1078,7 +1210,7 @@ mod tests { let schema_v1 = Schema::parse_str(RAW_SCHEMA_V1)?; let schema_v2 = Schema::parse_str(RAW_SCHEMA_V2)?; - assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2)); + assert!(SchemaCompatibility::can_read(&schema_v1, &schema_v2).is_ok()); Ok(()) } @@ -1153,7 +1285,65 @@ mod tests { ]; for (schema_1, schema_2) in schemas { - assert!(SchemaCompatibility::can_read(&schema_1, &schema_2)); + assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok()); + } + + Ok(()) + } + + #[test] + fn test_can_read_compatibility_errors() -> TestResult { + let schemas = [ + ( + Schema::parse_str( + r#"{ + "type": "record", + "name": "StatisticsMap", + "fields": [ + {"name": "average", "type": "int", "default": 0}, + {"name": "success", "type": {"type": "map", "values": "int"}} + ] + }"#)?, + Schema::parse_str( + r#"{ + "type": "record", + "name": "StatisticsMap", + "fields": [ + {"name": "average", "type": "int", "default": 0}, + {"name": "success", "type": ["null", {"type": "map", "values": "int"}], "default": null} + ] + }"#)?, + "Schemas are not compatible. Field 'success' in reader schema does not match the type in the writer schema" + ), + ( + Schema::parse_str( + r#"{ + "type": "record", + "name": "StatisticsArray", + "fields": [ + {"name": "max_values", "type": {"type": "array", "items": "int"}} + ] + }"#)?, + Schema::parse_str( + r#"{ + "type": "record", + "name": "StatisticsArray", + "fields": [ + {"name": "max_values", "type": ["null", {"type": "array", "items": "int"}], "default": null} + ] + }"#)?, + "Schemas are not compatible. Field 'max_values' in reader schema does not match the type in the writer schema" + ) + ]; + + for (schema_1, schema_2, error) in schemas { + assert!(SchemaCompatibility::can_read(&schema_1, &schema_2).is_ok()); + assert_eq!( + error, + SchemaCompatibility::can_read(&schema_2, &schema_1) + .unwrap_err() + .to_string() + ); } Ok(())