Skip to content

Commit

Permalink
Add a "from" datum reader that can take reader schemata (#106)
Browse files Browse the repository at this point in the history
* Add a "from" datum reader that can take reader schemata

* Minor cleanp

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

* Better reuse by delegating to the same method with schemata

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>

---------

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
Co-authored-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
  • Loading branch information
rayokota and martin-g authored Jan 20, 2025
1 parent 4411f75 commit c6a3366
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 10 deletions.
4 changes: 2 additions & 2 deletions avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,8 @@ pub use decimal::Decimal;
pub use duration::{Days, Duration, Millis, Months};
pub use error::Error;
pub use reader::{
from_avro_datum, from_avro_datum_schemata, read_marker, GenericSingleObjectReader, Reader,
SpecificSingleObjectReader,
from_avro_datum, from_avro_datum_reader_schemata, from_avro_datum_schemata, read_marker,
GenericSingleObjectReader, Reader, SpecificSingleObjectReader,
};
pub use schema::{AvroSchema, Schema};
pub use ser::to_value;
Expand Down
34 changes: 31 additions & 3 deletions avro/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,14 +461,42 @@ pub fn from_avro_datum<R: Read>(
/// In case a reader `Schema` is provided, schema resolution will also be performed.
pub fn from_avro_datum_schemata<R: Read>(
writer_schema: &Schema,
schemata: Vec<&Schema>,
writer_schemata: Vec<&Schema>,
reader: &mut R,
reader_schema: Option<&Schema>,
) -> AvroResult<Value> {
let rs = ResolvedSchema::try_from(schemata)?;
from_avro_datum_reader_schemata(
writer_schema,
writer_schemata,
reader,
reader_schema,
Vec::with_capacity(0),
)
}

/// Decode a `Value` encoded in Avro format given the provided `Schema` and anything implementing `io::Read`
/// to read from.
/// If the writer schema is incomplete, i.e. contains `Schema::Ref`s then it will use the provided
/// schemata to resolve any dependencies.
///
/// In case a reader `Schema` is provided, schema resolution will also be performed.
pub fn from_avro_datum_reader_schemata<R: Read>(
writer_schema: &Schema,
writer_schemata: Vec<&Schema>,
reader: &mut R,
reader_schema: Option<&Schema>,
reader_schemata: Vec<&Schema>,
) -> AvroResult<Value> {
let rs = ResolvedSchema::try_from(writer_schemata)?;
let value = decode_internal(writer_schema, rs.get_names(), &None, reader)?;
match reader_schema {
Some(schema) => value.resolve(schema),
Some(schema) => {
if reader_schemata.is_empty() {
value.resolve(schema)
} else {
value.resolve_schemata(schema, reader_schemata)
}
}
None => Ok(value),
}
}
Expand Down
10 changes: 6 additions & 4 deletions avro/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,9 +628,7 @@ impl Value {
/// in the Avro specification for the full set of rules of schema
/// resolution.
pub fn resolve(self, schema: &Schema) -> AvroResult<Self> {
let enclosing_namespace = schema.namespace();
let rs = ResolvedSchema::try_from(schema)?;
self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
self.resolve_schemata(schema, Vec::with_capacity(0))
}

/// Attempt to perform schema resolution on the value, with the given
Expand All @@ -641,7 +639,11 @@ impl Value {
/// resolution.
pub fn resolve_schemata(self, schema: &Schema, schemata: Vec<&Schema>) -> AvroResult<Self> {
let enclosing_namespace = schema.namespace();
let rs = ResolvedSchema::try_from(schemata)?;
let rs = if schemata.is_empty() {
ResolvedSchema::try_from(schema)?
} else {
ResolvedSchema::try_from(schemata)?
};
self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
}

Expand Down
33 changes: 32 additions & 1 deletion avro/tests/to_from_avro_datum_schemata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
// under the License.

use apache_avro::{
from_avro_datum_schemata, to_avro_datum_schemata, types::Value, Codec, Reader, Schema, Writer,
from_avro_datum_reader_schemata, from_avro_datum_schemata, to_avro_datum_schemata,
types::Value, Codec, Reader, Schema, Writer,
};
use apache_avro_test_helper::{init, TestResult};

Expand Down Expand Up @@ -60,6 +61,36 @@ fn test_avro_3683_multiple_schemata_to_from_avro_datum() -> TestResult {
Ok(())
}

#[test]
fn avro_rs_106_test_multiple_schemata_to_from_avro_datum_with_resolution() -> TestResult {
init();

let record: Value = Value::Record(vec![(
String::from("field_b"),
Value::Record(vec![(String::from("field_a"), Value::Float(1.0))]),
)]);

let schemata: Vec<Schema> = Schema::parse_list(&[SCHEMA_A_STR, SCHEMA_B_STR])?;
let schemata: Vec<&Schema> = schemata.iter().collect();

// this is the Schema we want to use for write/read
let schema_b = schemata[1];
let expected: Vec<u8> = vec![0, 0, 128, 63];
let actual = to_avro_datum_schemata(schema_b, schemata.clone(), record.clone())?;
assert_eq!(actual, expected);

let value = from_avro_datum_reader_schemata(
schema_b,
schemata.clone(),
&mut actual.as_slice(),
Some(schema_b),
schemata,
)?;
assert_eq!(value, record);

Ok(())
}

#[test]
fn test_avro_3683_multiple_schemata_writer_reader() -> TestResult {
init();
Expand Down

0 comments on commit c6a3366

Please sign in to comment.