Skip to content

Commit

Permalink
Added Schema::independent_canonical_form (#66)
Browse files Browse the repository at this point in the history
* Added Schema::independent_canonical_form

Added independent_canonical_form, which populates names available in the schemata, so that the given schema can be used without the schemata

* rust fmt

* clippy fix

* unused imports

* Fix for nested record usage

* cargo fmt

* Allow independent_canonical_form() to fail if ref is not found

* cargo fmt

* clippy cleanup

* Minor cleanup and better naming

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

---------

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
Co-authored-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
  • Loading branch information
chupaty and martin-g authored Jan 20, 2025
1 parent 7528d0b commit 17bd39c
Show file tree
Hide file tree
Showing 2 changed files with 423 additions and 22 deletions.
109 changes: 87 additions & 22 deletions avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use serde::{
};
use serde_json::{Map, Value};
use std::{
borrow::{Borrow, Cow},
borrow::Borrow,
collections::{BTreeMap, HashMap, HashSet},
fmt,
fmt::Debug,
Expand Down Expand Up @@ -1041,7 +1041,19 @@ impl Schema {
pub fn canonical_form(&self) -> String {
// The canonical form is derived from the JSON rendering of the schema, so serialize first.
// A serialization failure is treated as unrecoverable and panics.
let json = serde_json::to_value(self)
.unwrap_or_else(|e| panic!("Cannot parse Schema from JSON: {e}"));
parsing_canonical_form(&json)
// Starts empty for every call: `pcf_map` records each named type's full name here the first
// time it writes the definition and emits only the name on later occurrences.
let mut defined_names = HashSet::new();
parsing_canonical_form(&json, &mut defined_names)
}

/// Produces the [Parsing Canonical Form] of this schema with its references into `schemata`
/// resolved, so that the result is self-contained and can be used without `schemata`.
///
/// `self` is not modified: the references are resolved on a clone, and the canonical form is
/// computed from that clone.
///
/// # Errors
///
/// Returns the error reported while resolving the references, e.g. when a referenced name
/// has no matching schema in `schemata`.
///
/// [Parsing Canonical Form]:
/// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas
pub fn independent_canonical_form(&self, schemata: &Vec<Schema>) -> Result<String, Error> {
    let mut standalone = self.clone();
    standalone
        .denormalize(schemata)
        .map(|()| standalone.canonical_form())
}

/// Generate [fingerprint] of Schema's [Parsing Canonical Form].
Expand Down Expand Up @@ -1246,6 +1258,41 @@ impl Schema {
attributes,
})
}

fn denormalize(&mut self, schemata: &Vec<Schema>) -> AvroResult<()> {
match self {
Schema::Ref { name } => {
let replacement_schema = schemata
.iter()
.find(|s| s.name().map(|n| *n == *name).unwrap_or(false));
if let Some(schema) = replacement_schema {
let mut denorm = schema.clone();
denorm.denormalize(schemata)?;
*self = denorm;
} else {
return Err(Error::SchemaResolutionError(name.clone()));
}
}
Schema::Record(record_schema) => {
for field in &mut record_schema.fields {
field.schema.denormalize(schemata)?;
}
}
Schema::Array(array_schema) => {
array_schema.items.denormalize(schemata)?;
}
Schema::Map(map_schema) => {
map_schema.types.denormalize(schemata)?;
}
Schema::Union(union_schema) => {
for schema in &mut union_schema.schemas {
schema.denormalize(schemata)?;
}
}
_ => (),
}
Ok(())
}
}

impl Parser {
Expand Down Expand Up @@ -2245,19 +2292,39 @@ impl Serialize for RecordField {

/// Converts the JSON rendering of a **valid** Avro schema into its Parsing Canonical Form.
///
/// Objects are handled by `pcf_map`, strings by `pcf_string` and arrays by `pcf_array`.
/// `defined_names` is threaded through the object and array cases; `pcf_map` stores the full
/// name of every named type it writes out there, and reduces a later occurrence of the same
/// name to just that name.
///
/// # Panics
///
/// Panics if `schema` is anything other than a JSON object, string or array.
///
/// https://avro.apache.org/docs/current/specification/#parsing-canonical-form-for-schemas
fn parsing_canonical_form(schema: &Value) -> String {
fn parsing_canonical_form(schema: &Value, defined_names: &mut HashSet<String>) -> String {
match schema {
Value::Object(map) => pcf_map(map),
Value::Object(map) => pcf_map(map, defined_names),
Value::String(s) => pcf_string(s),
Value::Array(v) => pcf_array(v),
Value::Array(v) => pcf_array(v, defined_names),
// Any other JSON value (number, bool, null) cannot be a schema at this level.
json => panic!("got invalid JSON value for canonical form of schema: {json}"),
}
}

fn pcf_map(schema: &Map<String, Value>) -> String {
fn pcf_map(schema: &Map<String, Value>, defined_names: &mut HashSet<String>) -> String {
// Look for the namespace variant up front.
let ns = schema.get("namespace").and_then(|v| v.as_str());
let typ = schema.get("type").and_then(|v| v.as_str());
let raw_name = schema.get("name").and_then(|v| v.as_str());
let name = if is_named_type(typ) {
Some(format!(
"{}{}",
ns.map_or("".to_string(), |n| { format!("{n}.") }),
raw_name.unwrap_or_default()
))
} else {
None
};

//if this is already a defined type, early return
if let Some(ref n) = name {
if defined_names.contains(n) {
return pcf_string(n);
} else {
defined_names.insert(n.clone());
}
}

let mut fields = Vec::new();
for (k, v) in schema {
// Reduce primitive types to their simple form. ([PRIMITIVE] rule)
Expand All @@ -2280,17 +2347,10 @@ fn pcf_map(schema: &Map<String, Value>) -> String {

// Fully qualify the name, if it isn't already ([FULLNAMES] rule).
if k == "name" {
// Invariant: Only valid schemas. Must be a string.
let name = v.as_str().unwrap();
let n = match ns {
Some(namespace) if is_named_type(typ) && !name.contains('.') => {
Cow::Owned(format!("{namespace}.{name}"))
}
_ => Cow::Borrowed(name),
};

fields.push((k, format!("{}:{}", pcf_string(k), pcf_string(&n))));
continue;
if let Some(ref n) = name {
fields.push(("name", format!("{}:{}", pcf_string(k), pcf_string(n))));
continue;
}
}

// Strip off quotes surrounding "size" type, if they exist ([INTEGERS] rule).
Expand All @@ -2306,7 +2366,11 @@ fn pcf_map(schema: &Map<String, Value>) -> String {
// For anything else, recursively process the result.
fields.push((
k,
format!("{}:{}", pcf_string(k), parsing_canonical_form(v)),
format!(
"{}:{}",
pcf_string(k),
parsing_canonical_form(v, defined_names)
),
));
}

Expand All @@ -2327,10 +2391,10 @@ fn is_named_type(typ: Option<&str>) -> bool {
)
}

fn pcf_array(arr: &[Value]) -> String {
fn pcf_array(arr: &[Value], defined_names: &mut HashSet<String>) -> String {
let inter = arr
.iter()
.map(parsing_canonical_form)
.map(|a| parsing_canonical_form(a, defined_names))
.collect::<Vec<String>>()
.join(",");
format!("[{inter}]")
Expand Down Expand Up @@ -2376,6 +2440,7 @@ pub trait AvroSchema {
#[cfg(feature = "derive")]
pub mod derive {
use super::*;
use std::borrow::Cow;

/// Trait for types that serve as fully defined components inside an Avro data model. Derive
/// implementation available through `derive` feature. This is what is implemented by
Expand Down Expand Up @@ -3424,7 +3489,7 @@ mod tests {
assert_eq!(schema, expected);

let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}}]}"#;
let expected = r#"{"name":"record","type":"record","fields":[{"name":"enum","type":{"name":"enum","type":"enum","symbols":["one","two","three"]}},{"name":"next","type":"enum"}]}"#;
assert_eq!(canonical_form, &expected);

Ok(())
Expand Down Expand Up @@ -3508,7 +3573,7 @@ mod tests {
assert_eq!(schema, expected);

let canonical_form = &schema.canonical_form();
let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":{"name":"fixed","type":"fixed","size":456}}]}"#;
let expected = r#"{"name":"record","type":"record","fields":[{"name":"fixed","type":{"name":"fixed","type":"fixed","size":456}},{"name":"next","type":"fixed"}]}"#;
assert_eq!(canonical_form, &expected);

Ok(())
Expand Down
Loading

0 comments on commit 17bd39c

Please sign in to comment.