From 70aa04fbb7fadfbf6d94575c4422a41d4401437c Mon Sep 17 00:00:00 2001 From: chupaty Date: Tue, 3 Dec 2024 13:56:04 +1000 Subject: [PATCH 01/10] Added Schema::independent_canonical_form Added independent_canonical_from, which populates names available in the schemata, so that the given schema can be used without the schemata --- avro/src/schema.rs | 98 ++++++++++++++---- avro/tests/schema.rs | 236 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 313 insertions(+), 21 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index 93dbe18..b3005f1 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -25,6 +25,7 @@ use crate::{ validate_schema_name, }, AvroResult, + Schema::Ref, }; use digest::Digest; use log::{debug, error, warn}; @@ -1041,9 +1042,22 @@ impl Schema { pub fn canonical_form(&self) -> String { let json = serde_json::to_value(self) .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}")); - parsing_canonical_form(&json) + let mut defined_names = HashSet::new(); + parsing_canonical_form(&json,&mut defined_names) } + /// Returns the [Parsing Canonical Form] of `self` that is self contained (not dependent on + /// any definitions in `schemata`) + /// + /// [Parsing Canonical Form]: + /// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas + pub fn independent_canonical_form(&self,schemata: &Vec) -> String { + let mut d = self.clone(); + d.denormalize(schemata); + d.canonical_form() + } + + /// Generate [fingerprint] of Schema's [Parsing Canonical Form]. /// /// [Parsing Canonical Form]: @@ -1246,6 +1260,39 @@ impl Schema { attributes, }) } + + fn denormalize(&mut self,schemata: &Vec) { + match self { + Ref{name} => { + let repl = schemata.iter().find(|s| { + if let Some(n) = s.name() { + if *n == *name { + return true; + } + } + false + }); + if let Some(r) = repl { *self = r.clone(); } + }, + Schema::Record(r) => { + for rr in &mut r.fields { + rr.schema.denormalize(schemata); + } + }, + Schema::Array(a) => { + a.items.denormalize(schemata); + }, + Schema::Map(m) => { + m.types.denormalize(schemata); + }, + Schema::Union(u) => { + for uu in &mut u.schemas { + uu.denormalize(schemata); + } + }, + _ => (), + } + } } impl Parser { @@ -2245,19 +2292,35 @@ impl Serialize for RecordField { /// Parses a **valid** avro schema into the Parsing Canonical Form. /// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas -fn parsing_canonical_form(schema: &Value) -> String { +fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet) -> String { match schema { - Value::Object(map) => pcf_map(map), + Value::Object(map) => pcf_map(map, defined_names), Value::String(s) => pcf_string(s), - Value::Array(v) => pcf_array(v), + Value::Array(v) => pcf_array(v, defined_names), json => panic!("got invalid JSON value for canonical form of schema: {json}"), } } -fn pcf_map(schema: &Map) -> String { +fn pcf_map(schema: &Map,defined_names: &mut HashSet) -> String { // Look for the namespace variant up front. let ns = schema.get("namespace").and_then(|v| v.as_str()); let typ = schema.get("type").and_then(|v| v.as_str()); + let raw_name = schema.get("name").and_then(|v| v.as_str()); + let name = if is_named_type(typ) { + Some(format!("{}{}", ns.map_or("".to_string(), |n| { format!("{n}.") }), raw_name.unwrap_or_default())) + } else { + None + }; + + //if this is already a defined type, early return + if let Some(ref n) = name { + if defined_names.get(n).is_some() { + return pcf_string(n); + } else { + defined_names.insert(n.clone()); + } + } + let mut fields = Vec::new(); for (k, v) in schema { // Reduce primitive types to their simple form. ([PRIMITIVE] rule) @@ -2280,17 +2343,10 @@ fn pcf_map(schema: &Map) -> String { // Fully qualify the name, if it isn't already ([FULLNAMES] rule). if k == "name" { - // Invariant: Only valid schemas. Must be a string. - let name = v.as_str().unwrap(); - let n = match ns { - Some(namespace) if is_named_type(typ) && !name.contains('.') => { - Cow::Owned(format!("{namespace}.{name}")) - } - _ => Cow::Borrowed(name), - }; - - fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&n)))); - continue; + if let Some(ref n) = name { + fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&n)))); + continue; + } } // Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule). @@ -2306,7 +2362,7 @@ fn pcf_map(schema: &Map) -> String { // For anything else, recursively process the result. fields.push(( k, - format!("{}:{}", pcf_string(k), parsing_canonical_form(v)), + format!("{}:{}", pcf_string(k), parsing_canonical_form(v,defined_names)), )); } @@ -2327,10 +2383,10 @@ fn is_named_type(typ: Option<&str>) -> bool { ) } -fn pcf_array(arr: &[Value]) -> String { +fn pcf_array(arr: &[Value],defined_names: &mut HashSet) -> String { let inter = arr .iter() - .map(parsing_canonical_form) + .map(|a|parsing_canonical_form(a,defined_names)) .collect::>() .join(","); format!("[{inter}]") @@ -3424,7 +3480,7 @@ mod tests { assert_eq!(schema, expected); let canonical_form = &schema.canonical_form(); - let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}}]}"#; + let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#; assert_eq!(canonical_form, &expected); Ok(()) @@ -3508,7 +3564,7 @@ mod tests { assert_eq!(schema, expected); let canonical_form = &schema.canonical_form(); - let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":{"name":"fixed","type":"fixed","size":456}}]}"#; + let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#; assert_eq!(canonical_form, &expected); Ok(()) diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index 13cf6af..06e25cd 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2017,3 +2017,239 @@ fn test_avro_3851_read_default_value_for_enum() -> TestResult { Ok(()) } + +#[test] +fn test_independent_canonical_form_primitives() -> TestResult { + init(); + let record_primitive = r#"{ + "name": "Rec", + "namespace": "ns", + "type": "record", + "fields": [ + {"name": "v", "type": "int"} + ] + }"#; + + let enum_primitive = r#"{ + "name": "En", + "type": "enum", + "symbols": [ "bar0", "bar1" ] + }"#; + + let fixed_primitive = r#"{ + "name": "Fix", + "type": "fixed", + "size": 4 + }"#; + + let record_with_dependencies = r#"{ + "name": "RecWithDeps", + "type": "record", + "fields": [ + {"name": "v1", "type": "ns.Rec"}, + {"name": "v2", "type": "En"}, + {"name": "v3", "type": "Fix"}, + {"name": "v4", "type": "ns.Rec"}, + {"name": "v5", "type": "En"}, + {"name": "v6", "type": "Fix"} + ] + }"#; + + let record_with_no_dependencies = r#"{ + "name": "RecWithDeps", + "type": "record", + "fields": [ + { + "name": "v1", "type": { + "name": "Rec", + "namespace": "ns", + "type": "record", + "fields": [ + {"name": "v", "type": "int"} + ] + } + }, + { + "name": "v2", "type": { + "name": "En", + "type": "enum", + "symbols": [ "bar0", "bar1" ] + } + }, + {"name": "v3", "type": + { + "name": "Fix", + "type": "fixed", + "size": 4 + } + }, + {"name": "v4", "type": "ns.Rec"}, + {"name": "v5", "type": "En"}, + {"name": "v6", "type": "Fix"} + ] + }"#; + + + let independent_schema = Schema::parse_str(record_with_no_dependencies)?; + let schema_strs = [ + fixed_primitive, + enum_primitive, + record_primitive, + record_with_dependencies]; + + for schema_str_perm in permutations(&schema_strs) { + let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); + let schemata = Schema::parse_list(&schema_str_perm)?; + assert_eq!(schemata.len(), 4); + let test_schema = schemata + .iter() + .find(|a|a.name().unwrap().to_string() == "RecWithDeps".to_string()).unwrap(); + + assert_eq!( + independent_schema.independent_canonical_form(&schemata), + independent_schema.canonical_form() + ); + + assert_eq!( + independent_schema.canonical_form(), + test_schema.independent_canonical_form(&schemata)); + } + Ok(()) +} + +#[test] +fn test_independent_canonical_form_usages() -> TestResult { + init(); + let record_primitive = r#"{ + "name": "Rec", + "namespace": "ns", + "type": "record", + "fields": [ + {"name": "v", "type": "int"} + ] + }"#; + + let record_usage = r#"{ + "name": "RecUsage", + "type": "record", + "fields": [ + {"name": "v1", "type": "ns.Rec"}, + {"name": "v2", "type": "ns.Rec"} + ] + }"#; + let record_usage_independent = r#"{ + "name": "RecUsage", + "type": "record", + "fields": [ + {"name": "v1", "type": { + "name": "ns.Rec", "type": "record","fields": [{"name": "v", "type": "int"}]} + }, + {"name": "v2", "type": "ns.Rec"} + ] + }"#; + + let array_usage = r#"{ + "name": "ArrayUsage", + "type": "record", + "fields": [ + {"name": "field_one", "type": {"type": "array", "items": "ns.Rec"}}, + {"name": "field_two", "type": {"type": "array", "items": "ns.Rec"}} + ] + }"#; + let array_usage_independent = r#"{ + "name": "ArrayUsage", + "type": "record", + "fields": [ + {"name": "field_one", "type": {"type": "array", "items": { + "name": "ns.Rec", "type": "record","fields": [{"name": "v", "type": "int"}]} + }}, + {"name": "field_two", "type": {"type": "array", "items": "ns.Rec"}} + ] + }"#; + + let union_usage = r#"{ + "name": "UnionUsage", + "type": "record", + "fields": [ + {"name": "field_one", "type": ["null", "ns.Rec"]}, + {"name": "field_two", "type": ["null", "ns.Rec"]} + ] + }"#; + let union_usage_independent = r#"{ + "name": "UnionUsage", + "type": "record", + "fields": [ + {"name": "field_one", "type": ["null", { + "name": "ns.Rec", "type": "record","fields": [{"name": "v", "type": "int"}]} + ]}, + {"name": "field_two", "type": ["null", "ns.Rec"]} + ] + }"#; + + let map_usage = r#"{ + "name": "MapUsage", + "type": "record", + "fields": [ + {"name": "field_one", "type": {"type": "map", "values": "ns.Rec"}}, + {"name": "field_two", "type": {"type": "map", "values": "ns.Rec"}} + ] + }"#; + let map_usage_independent = r#"{ + "name": "MapUsage", + "type": "record", + "fields": [ + {"name": "field_one", "type": {"type": "map", "values": { + "name": "ns.Rec", "type": "record","fields": [{"name": "v", "type": "int"}]} + }}, + {"name": "field_two", "type": {"type": "map", "values": "ns.Rec"}} + ] + }"#; + + let schema_strs = [ + record_primitive, + record_usage, + array_usage, + map_usage, + union_usage]; + + for schema_str_perm in permutations(&schema_strs) { + let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); + let schemata = Schema::parse_list(&schema_str_perm)?; + for s in &schemata { + match s.name().unwrap().to_string().as_str() { + "RecUsage" => { + assert_eq!( + s.independent_canonical_form(&schemata), + Schema::parse_str(record_usage_independent)?.canonical_form() + ); + }, + "ArrayUsage" => { + assert_eq!( + s.independent_canonical_form(&schemata), + Schema::parse_str(array_usage_independent)?.canonical_form() + ); + }, + "UnionUsage" => { + assert_eq!( + s.independent_canonical_form(&schemata), + Schema::parse_str(union_usage_independent)?.canonical_form() + ); + }, + "MapUsage" => { + assert_eq!( + s.independent_canonical_form(&schemata), + Schema::parse_str(map_usage_independent)?.canonical_form() + ); + }, + "ns.Rec" => { + assert_eq!( + s.independent_canonical_form(&schemata), + s.canonical_form() + ); + }, + _ => assert!(false), + } + } + } + Ok(()) +} \ No newline at end of file From a3661b765565b88d12eae3c20595d47c715e1175 Mon Sep 17 00:00:00 2001 From: chupaty Date: Tue, 3 Dec 2024 14:14:23 +1000 Subject: [PATCH 02/10] rust fmt --- avro/src/schema.rs | 41 +++++++++++++++++++++++++---------------- avro/tests/schema.rs | 30 +++++++++++++++--------------- 2 files changed, 40 insertions(+), 31 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index b3005f1..193fe42 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -1043,7 +1043,7 @@ impl Schema { let json = serde_json::to_value(self) .unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}")); let mut defined_names = HashSet::new(); - parsing_canonical_form(&json,&mut defined_names) + parsing_canonical_form(&json, &mut defined_names) } /// Returns the [Parsing Canonical Form] of `self` that is self contained (not dependent on @@ -1051,13 +1051,12 @@ impl Schema { /// /// [Parsing Canonical Form]: /// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas - pub fn independent_canonical_form(&self,schemata: &Vec) -> String { + pub fn independent_canonical_form(&self, schemata: &Vec) -> String { let mut d = self.clone(); d.denormalize(schemata); d.canonical_form() } - /// Generate [fingerprint] of Schema's [Parsing Canonical Form]. /// /// [Parsing Canonical Form]: @@ -1261,9 +1260,9 @@ impl Schema { }) } - fn denormalize(&mut self,schemata: &Vec) { + fn denormalize(&mut self, schemata: &Vec) { match self { - Ref{name} => { + Ref { name } => { let repl = schemata.iter().find(|s| { if let Some(n) = s.name() { if *n == *name { @@ -1272,24 +1271,26 @@ impl Schema { } false }); - if let Some(r) = repl { *self = r.clone(); } - }, + if let Some(r) = repl { + *self = r.clone(); + } + } Schema::Record(r) => { for rr in &mut r.fields { rr.schema.denormalize(schemata); } - }, + } Schema::Array(a) => { a.items.denormalize(schemata); - }, + } Schema::Map(m) => { m.types.denormalize(schemata); - }, + } Schema::Union(u) => { for uu in &mut u.schemas { uu.denormalize(schemata); } - }, + } _ => (), } } @@ -2301,13 +2302,17 @@ fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet) - } } -fn pcf_map(schema: &Map,defined_names: &mut HashSet) -> String { +fn pcf_map(schema: &Map, defined_names: &mut HashSet) -> String { // Look for the namespace variant up front. let ns = schema.get("namespace").and_then(|v| v.as_str()); let typ = schema.get("type").and_then(|v| v.as_str()); let raw_name = schema.get("name").and_then(|v| v.as_str()); let name = if is_named_type(typ) { - Some(format!("{}{}", ns.map_or("".to_string(), |n| { format!("{n}.") }), raw_name.unwrap_or_default())) + Some(format!( + "{}{}", + ns.map_or("".to_string(), |n| { format!("{n}.") }), + raw_name.unwrap_or_default() + )) } else { None }; @@ -2362,7 +2367,11 @@ fn pcf_map(schema: &Map,defined_names: &mut HashSet) -> S // For anything else, recursively process the result. fields.push(( k, - format!("{}:{}", pcf_string(k), parsing_canonical_form(v,defined_names)), + format!( + "{}:{}", + pcf_string(k), + parsing_canonical_form(v, defined_names) + ), )); } @@ -2383,10 +2392,10 @@ fn is_named_type(typ: Option<&str>) -> bool { ) } -fn pcf_array(arr: &[Value],defined_names: &mut HashSet) -> String { +fn pcf_array(arr: &[Value], defined_names: &mut HashSet) -> String { let inter = arr .iter() - .map(|a|parsing_canonical_form(a,defined_names)) + .map(|a| parsing_canonical_form(a, defined_names)) .collect::>() .join(","); format!("[{inter}]") diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index 06e25cd..2c40775 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2089,13 +2089,13 @@ fn test_independent_canonical_form_primitives() -> TestResult { ] }"#; - let independent_schema = Schema::parse_str(record_with_no_dependencies)?; let schema_strs = [ fixed_primitive, enum_primitive, record_primitive, - record_with_dependencies]; + record_with_dependencies, + ]; for schema_str_perm in permutations(&schema_strs) { let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); @@ -2103,7 +2103,8 @@ fn test_independent_canonical_form_primitives() -> TestResult { assert_eq!(schemata.len(), 4); let test_schema = schemata .iter() - .find(|a|a.name().unwrap().to_string() == "RecWithDeps".to_string()).unwrap(); + .find(|a| a.name().unwrap().to_string() == "RecWithDeps".to_string()) + .unwrap(); assert_eq!( independent_schema.independent_canonical_form(&schemata), @@ -2112,7 +2113,8 @@ fn test_independent_canonical_form_primitives() -> TestResult { assert_eq!( independent_schema.canonical_form(), - test_schema.independent_canonical_form(&schemata)); + test_schema.independent_canonical_form(&schemata) + ); } Ok(()) } @@ -2210,7 +2212,8 @@ fn test_independent_canonical_form_usages() -> TestResult { record_usage, array_usage, map_usage, - union_usage]; + union_usage, + ]; for schema_str_perm in permutations(&schema_strs) { let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); @@ -2222,34 +2225,31 @@ fn test_independent_canonical_form_usages() -> TestResult { s.independent_canonical_form(&schemata), Schema::parse_str(record_usage_independent)?.canonical_form() ); - }, + } "ArrayUsage" => { assert_eq!( s.independent_canonical_form(&schemata), Schema::parse_str(array_usage_independent)?.canonical_form() ); - }, + } "UnionUsage" => { assert_eq!( s.independent_canonical_form(&schemata), Schema::parse_str(union_usage_independent)?.canonical_form() ); - }, + } "MapUsage" => { assert_eq!( s.independent_canonical_form(&schemata), Schema::parse_str(map_usage_independent)?.canonical_form() ); - }, + } "ns.Rec" => { - assert_eq!( - s.independent_canonical_form(&schemata), - s.canonical_form() - ); - }, + assert_eq!(s.independent_canonical_form(&schemata), s.canonical_form()); + } _ => assert!(false), } } } Ok(()) -} \ No newline at end of file +} From c70e6c1093d18a61b28bbb55eed639e713f69f57 Mon Sep 17 00:00:00 2001 From: chupaty Date: Tue, 3 Dec 2024 14:23:57 +1000 Subject: [PATCH 03/10] clippy fix --- avro/src/schema.rs | 2 +- avro/tests/schema.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index 193fe42..c4fb4a5 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -2349,7 +2349,7 @@ fn pcf_map(schema: &Map, defined_names: &mut HashSet) -> // Fully qualify the name, if it isn't already ([FULLNAMES] rule). if k == "name" { if let Some(ref n) = name { - fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&n)))); + fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(n)))); continue; } } diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index 2c40775..2b6249d 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2103,7 +2103,7 @@ fn test_independent_canonical_form_primitives() -> TestResult { assert_eq!(schemata.len(), 4); let test_schema = schemata .iter() - .find(|a| a.name().unwrap().to_string() == "RecWithDeps".to_string()) + .find(|a| a.name().unwrap().to_string() == *"RecWithDeps") .unwrap(); assert_eq!( @@ -2247,7 +2247,7 @@ fn test_independent_canonical_form_usages() -> TestResult { "ns.Rec" => { assert_eq!(s.independent_canonical_form(&schemata), s.canonical_form()); } - _ => assert!(false), + _ => panic!(), } } } From 1607434b5927d8c4b33036fbb7947dac975f32b6 Mon Sep 17 00:00:00 2001 From: chupaty Date: Tue, 3 Dec 2024 14:55:25 +1000 Subject: [PATCH 04/10] unused imports --- avro/src/schema.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index c4fb4a5..aae85d0 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -35,7 +35,7 @@ use serde::{ }; use serde_json::{Map, Value}; use std::{ - borrow::{Borrow, Cow}, + borrow::Borrow, collections::{BTreeMap, HashMap, HashSet}, fmt, fmt::Debug, @@ -43,6 +43,8 @@ use std::{ io::Read, str::FromStr, }; +#[allow(unused_imports)] +use std::borrow::Cow; use strum_macros::{Display, EnumDiscriminants, EnumString}; /// Represents an Avro schema fingerprint From a36eecd30409be04958693c825b85943740cb580 Mon Sep 17 00:00:00 2001 From: chupaty Date: Wed, 4 Dec 2024 09:10:06 +1000 Subject: [PATCH 05/10] Fix for nested record usage --- avro/src/schema.rs | 11 +++---- avro/tests/schema.rs | 69 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 7 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index aae85d0..20340b5 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -1266,15 +1266,12 @@ impl Schema { match self { Ref { name } => { let repl = schemata.iter().find(|s| { - if let Some(n) = s.name() { - if *n == *name { - return true; - } - } - false + s.name().map(|n| *n == *name).unwrap_or(false) }); if let Some(r) = repl { - *self = r.clone(); + let mut denorm = r.clone(); + denorm.denormalize(schemata); + *self = denorm; } } Schema::Record(r) => { diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index 2b6249d..ca0c838 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2253,3 +2253,72 @@ fn test_independent_canonical_form_usages() -> TestResult { } Ok(()) } + +#[test] +fn test_independent_canonical_form_deep_recursion() -> TestResult { + init(); + let record_primitive = r#"{ + "name": "Rec", + "namespace": "ns", + "type": "record", + "fields": [ + {"name": "v", "type": "int"} + ] + }"#; + + let record_usage = r#"{ + "name": "RecUsage", + "type": "record", + "fields": [ + {"name": "v1", "type": "ns.Rec"}, + {"name": "v2", "type": "ns.Rec"} + ] + }"#; + + let record_usage_usage = r#"{ + "name": "RecUsageUsage", + "type": "record", + "fields": [ + {"name": "r1", "type": "RecUsage"}, + {"name": "r2", "type": "RecUsage"} + ] + }"#; + + let record_usage_usage_independent = r#"{ + "name": "RecUsageUsage", + "type": "record", + "fields": [ + {"name": "r1", "type": { + "name": "RecUsage", + "type": "record", + "fields": [ + { + "name": "v1", "type": { + "name": "ns.Rec", "type": "record","fields": [{"name": "v", "type": "int"}] + } + }, + {"name": "v2", "type": "ns.Rec"} + ] + }}, + {"name": "r2", "type": "RecUsage"} + ] + + }"#; + + + let schema_strs = [ + record_primitive, + record_usage, + record_usage_usage, + ]; + + for schema_str_perm in permutations(&schema_strs) { + let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); + let schemata = Schema::parse_list(&schema_str_perm)?; + let ruu = schemata.iter().find(|s|s.name().unwrap().to_string().as_str() == "RecUsageUsage").unwrap(); + assert_eq!( + ruu.independent_canonical_form(&schemata), + Schema::parse_str(record_usage_usage_independent)?.canonical_form()); + } + Ok(()) +} From 874843d34221a598e1714c1bb8caae77f25521f7 Mon Sep 17 00:00:00 2001 From: chupaty Date: Wed, 4 Dec 2024 09:15:56 +1000 Subject: [PATCH 06/10] cargo fmt --- avro/src/schema.rs | 9 ++++----- avro/tests/schema.rs | 15 +++++++-------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index 20340b5..1a8bec3 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -43,8 +43,6 @@ use std::{ io::Read, str::FromStr, }; -#[allow(unused_imports)] -use std::borrow::Cow; use strum_macros::{Display, EnumDiscriminants, EnumString}; /// Represents an Avro schema fingerprint @@ -1265,9 +1263,9 @@ impl Schema { fn denormalize(&mut self, schemata: &Vec) { match self { Ref { name } => { - let repl = schemata.iter().find(|s| { - s.name().map(|n| *n == *name).unwrap_or(false) - }); + let repl = schemata + .iter() + .find(|s| s.name().map(|n| *n == *name).unwrap_or(false)); if let Some(r) = repl { let mut denorm = r.clone(); denorm.denormalize(schemata); @@ -2440,6 +2438,7 @@ pub trait AvroSchema { #[cfg(feature = "derive")] pub mod derive { use super::*; + use std::borrow::Cow; /// Trait for types that serve as fully defined components inside an Avro data model. Derive /// implementation available through `derive` feature. This is what is implemented by diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index ca0c838..5729e36 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2305,20 +2305,19 @@ fn test_independent_canonical_form_deep_recursion() -> TestResult { }"#; - - let schema_strs = [ - record_primitive, - record_usage, - record_usage_usage, - ]; + let schema_strs = [record_primitive, record_usage, record_usage_usage]; for schema_str_perm in permutations(&schema_strs) { let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); let schemata = Schema::parse_list(&schema_str_perm)?; - let ruu = schemata.iter().find(|s|s.name().unwrap().to_string().as_str() == "RecUsageUsage").unwrap(); + let ruu = schemata + .iter() + .find(|s| s.name().unwrap().to_string().as_str() == "RecUsageUsage") + .unwrap(); assert_eq!( ruu.independent_canonical_form(&schemata), - Schema::parse_str(record_usage_usage_independent)?.canonical_form()); + Schema::parse_str(record_usage_usage_independent)?.canonical_form() + ); } Ok(()) } From 4703cd427335ab72c976134cb25e63c05ec84378 Mon Sep 17 00:00:00 2001 From: chupaty Date: Tue, 10 Dec 2024 09:05:06 +1000 Subject: [PATCH 07/10] Allow independent_canonical_form() to fail if ref is not found --- avro/src/schema.rs | 21 +++++++++++--------- avro/tests/schema.rs | 47 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index 1a8bec3..e1f54e3 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -1051,10 +1051,10 @@ impl Schema { /// /// [Parsing Canonical Form]: /// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas - pub fn independent_canonical_form(&self, schemata: &Vec) -> String { + pub fn independent_canonical_form(&self, schemata: &Vec) -> Result { let mut d = self.clone(); - d.denormalize(schemata); - d.canonical_form() + d.denormalize(schemata)?; + Ok(d.canonical_form()) } /// Generate [fingerprint] of Schema's [Parsing Canonical Form]. @@ -1260,7 +1260,7 @@ impl Schema { }) } - fn denormalize(&mut self, schemata: &Vec) { + fn denormalize(&mut self, schemata: &Vec) -> AvroResult<()> { match self { Ref { name } => { let repl = schemata @@ -1268,28 +1268,31 @@ impl Schema { .find(|s| s.name().map(|n| *n == *name).unwrap_or(false)); if let Some(r) = repl { let mut denorm = r.clone(); - denorm.denormalize(schemata); + denorm.denormalize(schemata)?; *self = denorm; + } else { + return Err(Error::SchemaResolutionError(name.clone())); } } Schema::Record(r) => { for rr in &mut r.fields { - rr.schema.denormalize(schemata); + let _ = rr.schema.denormalize(schemata)?; } } Schema::Array(a) => { - a.items.denormalize(schemata); + let _ = a.items.denormalize(schemata)?; } Schema::Map(m) => { - m.types.denormalize(schemata); + let _ = m.types.denormalize(schemata)?; } Schema::Union(u) => { for uu in &mut u.schemas { - uu.denormalize(schemata); + let _ = uu.denormalize(schemata)?; } } _ => (), } + Ok(()) } } diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index 5729e36..69f8f01 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2107,13 +2107,13 @@ fn test_independent_canonical_form_primitives() -> TestResult { .unwrap(); assert_eq!( - independent_schema.independent_canonical_form(&schemata), + independent_schema.independent_canonical_form(&schemata)?, independent_schema.canonical_form() ); assert_eq!( independent_schema.canonical_form(), - test_schema.independent_canonical_form(&schemata) + test_schema.independent_canonical_form(&schemata)? ); } Ok(()) @@ -2222,30 +2222,30 @@ fn test_independent_canonical_form_usages() -> TestResult { match s.name().unwrap().to_string().as_str() { "RecUsage" => { assert_eq!( - s.independent_canonical_form(&schemata), + s.independent_canonical_form(&schemata)?, Schema::parse_str(record_usage_independent)?.canonical_form() ); } "ArrayUsage" => { assert_eq!( - s.independent_canonical_form(&schemata), + s.independent_canonical_form(&schemata)?, Schema::parse_str(array_usage_independent)?.canonical_form() ); } "UnionUsage" => { assert_eq!( - s.independent_canonical_form(&schemata), + s.independent_canonical_form(&schemata)?, Schema::parse_str(union_usage_independent)?.canonical_form() ); } "MapUsage" => { assert_eq!( - s.independent_canonical_form(&schemata), + s.independent_canonical_form(&schemata)?, Schema::parse_str(map_usage_independent)?.canonical_form() ); } "ns.Rec" => { - assert_eq!(s.independent_canonical_form(&schemata), s.canonical_form()); + assert_eq!(s.independent_canonical_form(&schemata)?, s.canonical_form()); } _ => panic!(), } @@ -2315,9 +2315,40 @@ fn test_independent_canonical_form_deep_recursion() -> TestResult { .find(|s| s.name().unwrap().to_string().as_str() == "RecUsageUsage") .unwrap(); assert_eq!( - ruu.independent_canonical_form(&schemata), + ruu.independent_canonical_form(&schemata)?, Schema::parse_str(record_usage_usage_independent)?.canonical_form() ); } Ok(()) } + +#[test] +fn test_independent_canonical_form_missing_ref() -> TestResult { + init(); + let record_primitive = r#"{ + "name": "Rec", + "namespace": "ns", + "type": "record", + "fields": [ + {"name": "v", "type": "int"} + ] + }"#; + + let record_usage = r#"{ + "name": "RecUsage", + "type": "record", + "fields": [ + {"name": "v1", "type": "ns.Rec"} + ] + }"#; + + let schema_strs = [record_primitive, record_usage]; + let schemata = Schema::parse_list(&schema_strs)?; + assert!( + matches!( + schemata[1].independent_canonical_form(&vec![]).err().unwrap(), //NOTE - we're passing in an empty schemata + Error::SchemaResolutionError(..) + ) + ); + Ok(()) +} From 1e6ab58dc17d612655e50f3a5adb8230cc87bd96 Mon Sep 17 00:00:00 2001 From: chupaty Date: Tue, 10 Dec 2024 09:06:33 +1000 Subject: [PATCH 08/10] cargo fmt --- avro/tests/schema.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index 69f8f01..a0e860a 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2344,11 +2344,12 @@ fn test_independent_canonical_form_missing_ref() -> TestResult { let schema_strs = [record_primitive, record_usage]; let schemata = Schema::parse_list(&schema_strs)?; - assert!( - matches!( - schemata[1].independent_canonical_form(&vec![]).err().unwrap(), //NOTE - we're passing in an empty schemata - Error::SchemaResolutionError(..) - ) - ); + assert!(matches!( + schemata[1] + .independent_canonical_form(&vec![]) + .err() + .unwrap(), //NOTE - we're passing in an empty schemata + Error::SchemaResolutionError(..) + )); Ok(()) } From 585091211386e092e6a4976dbf54786f241a9ba9 Mon Sep 17 00:00:00 2001 From: chupaty Date: Tue, 10 Dec 2024 09:13:32 +1000 Subject: [PATCH 09/10] clippy cleanup --- avro/src/schema.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index e1f54e3..707cf74 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -1276,18 +1276,18 @@ impl Schema { } Schema::Record(r) => { for rr in &mut r.fields { - let _ = rr.schema.denormalize(schemata)?; + rr.schema.denormalize(schemata)?; } } Schema::Array(a) => { - let _ = a.items.denormalize(schemata)?; + a.items.denormalize(schemata)?; } Schema::Map(m) => { - let _ = m.types.denormalize(schemata)?; + m.types.denormalize(schemata)?; } Schema::Union(u) => { for uu in &mut u.schemas { - let _ = uu.denormalize(schemata)?; + uu.denormalize(schemata)?; } } _ => (), From aa3cf21b8323bbb7f57a132d65865512f1211c17 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Mon, 20 Jan 2025 10:06:01 +0200 Subject: [PATCH 10/10] Minor cleanup and better naming Signed-off-by: Martin Tzvetanov Grigorov --- avro/src/schema.rs | 39 +++++++++++++++++++-------------------- avro/tests/schema.rs | 36 ++++++++++++++++++------------------ 2 files changed, 37 insertions(+), 38 deletions(-) diff --git a/avro/src/schema.rs b/avro/src/schema.rs index 707cf74..1737665 100644 --- a/avro/src/schema.rs +++ b/avro/src/schema.rs @@ -25,7 +25,6 @@ use crate::{ validate_schema_name, }, AvroResult, - Schema::Ref, }; use digest::Digest; use log::{debug, error, warn}; @@ -1052,9 +1051,9 @@ impl Schema { /// [Parsing Canonical Form]: /// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas pub fn independent_canonical_form(&self, schemata: &Vec) -> Result { - let mut d = self.clone(); - d.denormalize(schemata)?; - Ok(d.canonical_form()) + let mut this = self.clone(); + this.denormalize(schemata)?; + Ok(this.canonical_form()) } /// Generate [fingerprint] of Schema's [Parsing Canonical Form]. @@ -1262,32 +1261,32 @@ impl Schema { fn denormalize(&mut self, schemata: &Vec) -> AvroResult<()> { match self { - Ref { name } => { - let repl = schemata + Schema::Ref { name } => { + let replacement_schema = schemata .iter() .find(|s| s.name().map(|n| *n == *name).unwrap_or(false)); - if let Some(r) = repl { - let mut denorm = r.clone(); + if let Some(schema) = replacement_schema { + let mut denorm = schema.clone(); denorm.denormalize(schemata)?; *self = denorm; } else { return Err(Error::SchemaResolutionError(name.clone())); } } - Schema::Record(r) => { - for rr in &mut r.fields { - rr.schema.denormalize(schemata)?; + Schema::Record(record_schema) => { + for field in &mut record_schema.fields { + field.schema.denormalize(schemata)?; } } - Schema::Array(a) => { - a.items.denormalize(schemata)?; + Schema::Array(array_schema) => { + array_schema.items.denormalize(schemata)?; } - Schema::Map(m) => { - m.types.denormalize(schemata)?; + Schema::Map(map_schema) => { + map_schema.types.denormalize(schemata)?; } - Schema::Union(u) => { - for uu in &mut u.schemas { - uu.denormalize(schemata)?; + Schema::Union(union_schema) => { + for schema in &mut union_schema.schemas { + schema.denormalize(schemata)?; } } _ => (), @@ -2319,7 +2318,7 @@ fn pcf_map(schema: &Map, defined_names: &mut HashSet) -> //if this is already a defined type, early return if let Some(ref n) = name { - if defined_names.get(n).is_some() { + if defined_names.contains(n) { return pcf_string(n); } else { defined_names.insert(n.clone()); @@ -2349,7 +2348,7 @@ fn pcf_map(schema: &Map, defined_names: &mut HashSet) -> // Fully qualify the name, if it isn't already ([FULLNAMES] rule). if k == "name" { if let Some(ref n) = name { - fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(n)))); + fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n)))); continue; } } diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs index a0e860a..0c90615 100644 --- a/avro/tests/schema.rs +++ b/avro/tests/schema.rs @@ -2019,7 +2019,7 @@ fn test_avro_3851_read_default_value_for_enum() -> TestResult { } #[test] -fn test_independent_canonical_form_primitives() -> TestResult { +fn avro_rs_66_test_independent_canonical_form_primitives() -> TestResult { init(); let record_primitive = r#"{ "name": "Rec", @@ -2100,7 +2100,7 @@ fn test_independent_canonical_form_primitives() -> TestResult { for schema_str_perm in permutations(&schema_strs) { let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); let schemata = Schema::parse_list(&schema_str_perm)?; - assert_eq!(schemata.len(), 4); + assert_eq!(schemata.len(), schema_strs.len()); let test_schema = schemata .iter() .find(|a| a.name().unwrap().to_string() == *"RecWithDeps") @@ -2120,7 +2120,7 @@ fn test_independent_canonical_form_primitives() -> TestResult { } #[test] -fn test_independent_canonical_form_usages() -> TestResult { +fn avro_rs_66_test_independent_canonical_form_usages() -> TestResult { init(); let record_primitive = r#"{ "name": "Rec", @@ -2218,36 +2218,39 @@ fn test_independent_canonical_form_usages() -> TestResult { for schema_str_perm in permutations(&schema_strs) { let schema_str_perm: Vec<&str> = schema_str_perm.iter().map(|s| **s).collect(); let schemata = Schema::parse_list(&schema_str_perm)?; - for s in &schemata { - match s.name().unwrap().to_string().as_str() { + for schema in &schemata { + match schema.name().unwrap().to_string().as_str() { "RecUsage" => { assert_eq!( - s.independent_canonical_form(&schemata)?, + schema.independent_canonical_form(&schemata)?, Schema::parse_str(record_usage_independent)?.canonical_form() ); } "ArrayUsage" => { assert_eq!( - s.independent_canonical_form(&schemata)?, + schema.independent_canonical_form(&schemata)?, Schema::parse_str(array_usage_independent)?.canonical_form() ); } "UnionUsage" => { assert_eq!( - s.independent_canonical_form(&schemata)?, + schema.independent_canonical_form(&schemata)?, Schema::parse_str(union_usage_independent)?.canonical_form() ); } "MapUsage" => { assert_eq!( - s.independent_canonical_form(&schemata)?, + schema.independent_canonical_form(&schemata)?, Schema::parse_str(map_usage_independent)?.canonical_form() ); } "ns.Rec" => { - assert_eq!(s.independent_canonical_form(&schemata)?, s.canonical_form()); + assert_eq!( + schema.independent_canonical_form(&schemata)?, + schema.canonical_form() + ); } - _ => panic!(), + other => unreachable!("Unknown schema name: {}", other), } } } @@ -2255,7 +2258,7 @@ fn test_independent_canonical_form_usages() -> TestResult { } #[test] -fn test_independent_canonical_form_deep_recursion() -> TestResult { +fn avro_rs_66_test_independent_canonical_form_deep_recursion() -> TestResult { init(); let record_primitive = r#"{ "name": "Rec", @@ -2323,7 +2326,7 @@ fn test_independent_canonical_form_deep_recursion() -> TestResult { } #[test] -fn test_independent_canonical_form_missing_ref() -> TestResult { +fn avro_rs_66_test_independent_canonical_form_missing_ref() -> TestResult { init(); let record_primitive = r#"{ "name": "Rec", @@ -2345,11 +2348,8 @@ fn test_independent_canonical_form_missing_ref() -> TestResult { let schema_strs = [record_primitive, record_usage]; let schemata = Schema::parse_list(&schema_strs)?; assert!(matches!( - schemata[1] - .independent_canonical_form(&vec![]) - .err() - .unwrap(), //NOTE - we're passing in an empty schemata - Error::SchemaResolutionError(..) + schemata[1].independent_canonical_form(&Vec::with_capacity(0)), //NOTE - we're passing in an empty schemata + Err(Error::SchemaResolutionError(..)) )); Ok(()) }