diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 5323251b07e..2c8a59399de 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -15,65 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-//! Provides `async` API for reading parquet files as
+//! [`ParquetRecordBatchStreamBuilder`]: `async` API for reading Parquet files as
//! [`RecordBatch`]es
//!
-//! ```
-//! # #[tokio::main(flavor="current_thread")]
-//! # async fn main() {
-//! #
-//! # use arrow_array::RecordBatch;
-//! # use arrow::util::pretty::pretty_format_batches;
-//! # use futures::TryStreamExt;
-//! # use tokio::fs::File;
-//! #
-//! # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
-//! #
-//! # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
-//! # let formatted = pretty_format_batches(batches).unwrap().to_string();
-//! # let actual_lines: Vec<_> = formatted.trim().lines().collect();
-//! # assert_eq!(
-//! # &actual_lines, expected_lines,
-//! # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
-//! # expected_lines, actual_lines
-//! # );
-//! # }
-//! #
-//! let testdata = arrow::util::test_util::parquet_test_data();
-//! let path = format!("{}/alltypes_plain.parquet", testdata);
-//! let file = File::open(path).await.unwrap();
+//! This can be used to decode a Parquet file in streaming fashion (without
+//! downloading the whole file at once) from a remote source, such as an object store.
//!
-//! let builder = ParquetRecordBatchStreamBuilder::new(file)
-//! .await
-//! .unwrap()
-//! .with_batch_size(3);
-//!
-//! let file_metadata = builder.metadata().file_metadata();
-//! let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
-//!
-//! let stream = builder.with_projection(mask).build().unwrap();
-//! let results = stream.try_collect::<Vec<_>>().await.unwrap();
-//! assert_eq!(results.len(), 3);
-//!
-//! assert_batches_eq(
-//! &results,
-//! &[
-//! "+----------+-------------+-----------+",
-//! "| bool_col | tinyint_col | float_col |",
-//! "+----------+-------------+-----------+",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "| true | 0 | 0.0 |",
-//! "| false | 1 | 1.1 |",
-//! "+----------+-------------+-----------+",
-//! ],
-//! );
-//! # }
-//! ```
+//! See example on [`ParquetRecordBatchStreamBuilder::new`]
use std::collections::VecDeque;
use std::fmt::Formatter;
@@ -249,53 +197,153 @@ impl ArrowReaderMetadata {
/// breaking the pre-existing ParquetRecordBatchStreamBuilder API
pub struct AsyncReader<T>(T);
-/// A builder used to construct a [`ParquetRecordBatchStream`] for `async` reading of a parquet file
+/// A builder for reading parquet files from an `async` source as [`ParquetRecordBatchStream`]
///
-/// In particular, this handles reading the parquet file metadata, allowing consumers
+/// This builder handles reading the parquet file metadata, allowing consumers
/// to use this information to select what specific columns, row groups, etc...
/// they wish to be read by the resulting stream
///
+/// See examples on [`ParquetRecordBatchStreamBuilder::new`]
+///
/// See [`ArrowReaderBuilder`] for additional member functions
pub type ParquetRecordBatchStreamBuilder<T> = ArrowReaderBuilder<AsyncReader<T>>;
impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] for reading from the
+ /// specified source.
///
/// # Example
+ /// ```
+ /// # #[tokio::main(flavor="current_thread")]
+ /// # async fn main() {
+ /// #
+ /// # use arrow_array::RecordBatch;
+ /// # use arrow::util::pretty::pretty_format_batches;
+ /// # use futures::TryStreamExt;
+ /// #
+ /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+ /// #
+ /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
+ /// # let formatted = pretty_format_batches(batches).unwrap().to_string();
+ /// # let actual_lines: Vec<_> = formatted.trim().lines().collect();
+ /// # assert_eq!(
+ /// # &actual_lines, expected_lines,
+ /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
+ /// # expected_lines, actual_lines
+ /// # );
+ /// # }
+ /// #
+ /// # let testdata = arrow::util::test_util::parquet_test_data();
+ /// # let path = format!("{}/alltypes_plain.parquet", testdata);
+ /// // Use tokio::fs::File to read data using async I/O. This can be replaced with
+ /// // another async I/O reader, such as a reader from an object store.
+ /// let file = tokio::fs::File::open(path).await.unwrap();
+ ///
+ /// // Configure options for reading from the async source
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap();
+ /// // Creating the builder (above) has already read the parquet file's metadata; building
+ /// // returns a stream that can be used to incrementally read the data in batches
+ /// let stream = builder.build().unwrap();
+ /// // In this example, we collect the stream into a Vec
+ /// // but real applications would likely process the batches as they are read
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // Demonstrate the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |",
+ /// "+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+",
+ /// "| 4 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30332f30312f3039 | 30 | 2009-03-01T00:00:00 |",
+ /// "| 5 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30332f30312f3039 | 31 | 2009-03-01T00:01:00 |",
+ /// "| 6 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30342f30312f3039 | 30 | 2009-04-01T00:00:00 |",
+ /// "| 7 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30342f30312f3039 | 31 | 2009-04-01T00:01:00 |",
+ /// "| 2 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30322f30312f3039 | 30 | 2009-02-01T00:00:00 |",
+ /// "| 3 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30322f30312f3039 | 31 | 2009-02-01T00:01:00 |",
+ /// "| 0 | true | 0 | 0 | 0 | 0 | 0.0 | 0.0 | 30312f30312f3039 | 30 | 2009-01-01T00:00:00 |",
+ /// "| 1 | false | 1 | 1 | 1 | 10 | 1.1 | 10.1 | 30312f30312f3039 | 31 | 2009-01-01T00:01:00 |",
"+----+----------+-------------+--------------+---------+------------+-----------+------------+------------------+------------+---------------------+", + /// ], + /// ); + /// # } + /// ``` + /// + /// # Example configuring options and reading metadata + /// + /// There are many options that control the behavior of the reader, such as + /// `with_batch_size`, `with_projection`, `with_filter`, etc... /// /// ``` - /// # use std::fs::metadata; - /// # use std::sync::Arc; - /// # use bytes::Bytes; - /// # use arrow_array::{Int32Array, RecordBatch}; - /// # use arrow_schema::{DataType, Field, Schema}; - /// # use parquet::arrow::arrow_reader::ArrowReaderMetadata; - /// # use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder}; - /// # use tempfile::tempfile; - /// # use futures::StreamExt; /// # #[tokio::main(flavor="current_thread")] /// # async fn main() { /// # - /// # let mut file = tempfile().unwrap(); - /// # let schema = Arc::new(Schema::new(vec![Field::new("i32", DataType::Int32, false)])); - /// # let mut writer = ArrowWriter::try_new(&mut file, schema.clone(), None).unwrap(); - /// # let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap(); - /// # writer.write(&batch).unwrap(); - /// # writer.close().unwrap(); - /// // Open async file containing parquet data - /// let mut file = tokio::fs::File::from_std(file); - /// // construct the reader - /// let mut reader = ParquetRecordBatchStreamBuilder::new(file) - /// .await.unwrap().build().unwrap(); - /// // Read batche - /// let batch: RecordBatch = reader.next().await.unwrap().unwrap(); + /// # use arrow_array::RecordBatch; + /// # use arrow::util::pretty::pretty_format_batches; + /// # use futures::TryStreamExt; + /// # + /// # use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; + /// # + /// # fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) { + /// # let formatted = pretty_format_batches(batches).unwrap().to_string(); + /// # let actual_lines: Vec<_> = formatted.trim().lines().collect(); + /// # assert_eq!( + /// # &actual_lines, expected_lines, + /// # "\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n", + /// # expected_lines, actual_lines + /// # ); + /// # } + /// # + /// # let testdata = arrow::util::test_util::parquet_test_data(); + /// # let path = format!("{}/alltypes_plain.parquet", testdata); + /// // As before, use tokio::fs::File to read data using an async I/O. + /// let file = tokio::fs::File::open(path).await.unwrap(); + /// + /// // Configure options for reading from the async source, in this case we set the batch size + /// // to 3 which produces 3 rows at a time. 
+ /// let builder = ParquetRecordBatchStreamBuilder::new(file)
+ /// .await
+ /// .unwrap()
+ /// .with_batch_size(3);
+ ///
+ /// // We can also use the builder to inspect the file's metadata, such as
+ /// // the schema, before actually reading the data
+ /// let file_metadata = builder.metadata().file_metadata();
+ /// // Specify that we only want to read columns 1, 2, and 6 (bool_col, tinyint_col, float_col)
+ /// let mask = ProjectionMask::roots(file_metadata.schema_descr(), [1, 2, 6]);
+ ///
+ /// let stream = builder.with_projection(mask).build().unwrap();
+ /// let results = stream.try_collect::<Vec<_>>().await.unwrap();
+ /// // Check the results are as expected
+ /// assert_batches_eq(
+ /// &results,
+ /// &[
+ /// "+----------+-------------+-----------+",
+ /// "| bool_col | tinyint_col | float_col |",
+ /// "+----------+-------------+-----------+",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "| true | 0 | 0.0 |",
+ /// "| false | 1 | 1.1 |",
+ /// "+----------+-------------+-----------+",
+ /// ],
+ /// );
+ ///
+ /// // The results have 8 rows in total, and since we set the batch size to 3, we expect
+ /// // 3 batches: two with 3 rows each and a final batch with 2 rows.
+ /// assert_eq!(results.len(), 3);
/// # }
/// ```
pub async fn new(input: T) -> Result<Self> {
Self::new_with_options(input, Default::default()).await
}
- /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided parquet file
+ /// Create a new [`ParquetRecordBatchStreamBuilder`] with the provided async source
/// and [`ArrowReaderOptions`]
pub async fn new_with_options(mut input: T, options: ArrowReaderOptions) -> Result<Self> {
let metadata = ArrowReaderMetadata::load_async(&mut input, options).await?;
@@ -352,6 +400,7 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}
/// Read bloom filter for a column in a row group
+ ///
/// Returns `None` if the column does not have a bloom filter
///
/// We should call this function after other forms of pruning, such as projection and predicate pushdown.
@@ -415,6 +464,8 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
}
/// Build a new [`ParquetRecordBatchStream`]
+ ///
+ /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
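The new documentation collects the whole stream into a `Vec` for brevity, while noting that real applications would likely process the batches as they are read. The following is a minimal sketch (not part of the diff above) of that incremental pattern; the local path `data.parquet` is hypothetical, and the same loop works with any `AsyncFileReader` implementation, such as one backed by an object store.

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // Hypothetical local file; any `AsyncFileReader` (for example an
    // object-store backed reader) could be used in its place.
    let file = tokio::fs::File::open("data.parquet").await.unwrap();

    // Creating the builder reads the file metadata; `build` returns the stream
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(1024)
        .build()?;

    // Process each RecordBatch as it is decoded, rather than collecting
    // the whole file into memory first
    while let Some(batch) = stream.try_next().await? {
        println!("decoded a batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```

Because each `RecordBatch` is handled as soon as it is decoded, memory use is bounded by the configured batch size rather than by the size of the file.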