Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Commit

Permalink
parquet_derive WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
xrl committed Dec 1, 2018
1 parent a258939 commit 38ec590
Show file tree
Hide file tree
Showing 10 changed files with 428 additions and 147 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
Cargo.lock
/target
target
**/*.rs.bk
11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "parquet"
version = "0.3.0"
version = "0.4.2"
license = "Apache-2.0"
description = "Apache Parquet implementation in Rust"
authors = [
Expand All @@ -31,7 +31,7 @@ repository = "https://github.com/sunchao/parquet-rs"
keywords = ["parquet", "hadoop"]

[dependencies]
parquet-format = "2.4.0"
parquet-format = "2.5.0"
quick-error = "1.2.2"
byteorder = "1"
thrift = "0.0.4"
Expand All @@ -42,7 +42,12 @@ lz4 = "1.23"
zstd = "0.4"
chrono = "0.4"
num-bigint = "0.2"
syn = "0.15.22"
quote = "0.6.10"

[dev-dependencies]
lazy_static = "1"
rand = "0.4"
rand = "0.5"

[workspace]
members = ["parquet_derive", "parquet_derive_test"]
14 changes: 14 additions & 0 deletions parquet_derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "parquet_derive"
version = "0.1.0"
authors = ["Xavier Lange <xrlange@gmail.com>"]
edition = "2018"

[lib]
proc-macro = true

[dependencies]
proc-macro2 = "0.4"
quote = "0.6.10"
syn = { version = "0.15.22", features = ["extra-traits"] }
parquet = { path = ".." }
140 changes: 140 additions & 0 deletions parquet_derive/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#![feature(impl_trait_in_bindings)]
#![recursion_limit="128"]

extern crate proc_macro;
extern crate proc_macro2;
extern crate syn;
#[macro_use]
extern crate quote;

extern crate parquet;

#[allow(unused_imports)]
use syn::{ parse_macro_input, DeriveInput, Fields, Ident, Data, DataStruct, Type };

#[proc_macro_derive(ParquetRecordWriter)]
pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
let input : DeriveInput = parse_macro_input!(input as DeriveInput);
let ident = input.ident;
let fields : Fields = match input.data {
Data::Struct(DataStruct{fields, ..}) => {
fields
},
Data::Enum(_) => unimplemented!(),
Data::Union(_) => unimplemented!(),
};

let attrs = fields.iter().map(|f| {
f.ident.as_ref().unwrap()
});

let attrs_types : Vec<proc_macro2::Ident> = fields.iter().map(move |f| {
match &f.ty {
Type::Slice(_) => unimplemented!("boom: Slice"),
Type::Array(_) => unimplemented!("boom: Array"),
Type::Ptr(_) => unimplemented!("boom: Ptr"),
Type::Reference(a) => {
unimplemented!("boom: Reference {:#?}", a)
},
Type::BareFn(_) => unimplemented!("boom: BareFn"),
Type::Never(_) => unimplemented!("boom: Never"),
Type::Tuple(_) => unimplemented!("boom: Tuple"),
Type::Path(syn::TypePath{ path: syn::Path{ segments /* syn::punctuated::Punctuated{inner,..} */, .. }, ..} ) => {
let entry = segments.iter().next().unwrap();
entry.ident.clone()
// let sample_bool = quote! {
// bool
// };
// unimplemented!("{:#?} `{}`, `{}`", entry.ident == sample_bool.to_string(), entry.ident, sample_bool)
},
Type::TraitObject(_) => unimplemented!("boom: TraitObject"),
Type::ImplTrait(_) => unimplemented!("boom: ImplTrait"),
Type::Paren(_) => unimplemented!("boom: Paren"),
Type::Group(_) => unimplemented!("boom: Group"),
Type::Infer(_) => unimplemented!("boom: Infer"),
Type::Macro(_) => unimplemented!("boom: Macro"),
Type::Verbatim(_) => unimplemented!("boom: Verbatim"),
}
}).collect();

let _typed_writer_types : Vec<proc_macro2::TokenStream> = fields.iter().map(move |f| {
match &f.ty {
Type::Slice(_) => unimplemented!("boom: Slice"),
Type::Array(_) => unimplemented!("boom: Array"),
Type::Ptr(_) => unimplemented!("boom: Ptr"),
Type::Reference(_) => unimplemented!("boom: Reference"),
Type::BareFn(_) => unimplemented!("boom: BareFn"),
Type::Never(_) => unimplemented!("boom: Never"),
Type::Tuple(_) => unimplemented!("boom: Tuple"),
Type::Path(syn::TypePath{ path: syn::Path{ segments /* syn::punctuated::Punctuated{inner,..} */, .. }, ..} ) => {
let entry = segments.iter().next().unwrap();
let string = entry.ident.to_string();
match &string[..] {
"bool" => quote!{ parquet::data_type::BoolType },
o => unimplemented!("don't know {}", o)
}
},
Type::TraitObject(_) => unimplemented!("boom: TraitObject"),
Type::ImplTrait(_) => unimplemented!("boom: ImplTrait"),
Type::Paren(_) => unimplemented!("boom: Paren"),
Type::Group(_) => unimplemented!("boom: Group"),
Type::Infer(_) => unimplemented!("boom: Infer"),
Type::Macro(_) => unimplemented!("boom: Macro"),
Type::Verbatim(_) => unimplemented!("boom: Verbatim"),
}
}).collect();

let column_writer_variant : Vec<proc_macro2::TokenStream> = fields.iter().map(move |f| {
match &f.ty {
Type::Slice(_) => unimplemented!("boom: Slice"),
Type::Array(_) => unimplemented!("boom: Array"),
Type::Ptr(_) => unimplemented!("boom: Ptr"),
Type::Reference(_) => unimplemented!("boom: Reference"),
Type::BareFn(_) => unimplemented!("boom: BareFn"),
Type::Never(_) => unimplemented!("boom: Never"),
Type::Tuple(_) => unimplemented!("boom: Tuple"),
Type::Path(syn::TypePath{ path: syn::Path{ segments /* syn::punctuated::Punctuated{inner,..} */, .. }, ..} ) => {
let entry = segments.iter().next().unwrap();
let string = entry.ident.to_string();
match &string[..] {
"bool" => quote!{ parquet::column::writer::ColumnWriter::BoolColumnWriter },
o => unimplemented!("don't know {}", o)
}
},
Type::TraitObject(_) => unimplemented!("boom: TraitObject"),
Type::ImplTrait(_) => unimplemented!("boom: ImplTrait"),
Type::Paren(_) => unimplemented!("boom: Paren"),
Type::Group(_) => unimplemented!("boom: Group"),
Type::Infer(_) => unimplemented!("boom: Infer"),
Type::Macro(_) => unimplemented!("boom: Macro"),
Type::Verbatim(_) => unimplemented!("boom: Verbatim"),
}
}).collect();

let expanded = quote! {
impl RecordWriter<#ident> for &[#ident] {
fn write_to_row_group(&self, row_group_writer: &mut Box<parquet::file::writer::RowGroupWriter>) {
let mut row_group_writer = row_group_writer;
#(
{
let vals : Vec<#attrs_types> = self.iter().map(|x| x.#attrs).collect();
let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
if let #column_writer_variant(ref mut typed) = column_writer {
typed.write_batch(&vals[..], None, None).unwrap();

}
row_group_writer.close_column(column_writer).unwrap();
// TODO: the below did not work, I could not get the row_group to close the typed_writer
// perhaps this method is only intended for testing? should it be behind a #[cfg(test)]?
// let mut typed_writer : parquet::column::writer::ColumnWriterImpl<#typed_writer_types> = parquet::column::writer::get_typed_column_writer(column_writer);
// typed_writer.write_batch(&vals[..], None, None).unwrap();
// typed_writer.close().unwrap();
}
);*
}
}
};

// Hand the output tokens back to the compiler
proc_macro::TokenStream::from(expanded)
}
9 changes: 9 additions & 0 deletions parquet_derive_test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "parquet_derive_test"
version = "0.1.0"
authors = ["Xavier Lange <xrlange@gmail.com>"]
edition = "2018"

[dependencies]
parquet = {path = ".."}
parquet_derive = {path = "../parquet_derive"}
88 changes: 88 additions & 0 deletions parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
extern crate parquet;
#[macro_use]
extern crate parquet_derive;

//use parquet::file::writer::SerializedRowGroupWriter;
use parquet::file::writer::RecordWriter;

/// Test record for the `ParquetRecordWriter` derive. Only `bool` fields are
/// present because the derive currently maps only `bool` to a column writer;
/// the lifetime-carrying `&str` variant is kept commented out until
/// references are supported.
#[derive(ParquetRecordWriter)]
//struct ACompleteRecord<'a> {
struct ACompleteRecord {
  pub a_bool: bool,
  pub a2_bool: bool,
  // pub a_str: &'a str
}

//impl RecordWriter<DumbRecord> for &[DumbRecord] {
// fn write_to_row_group(&self, row_group_writer: SerializedRowGroupWriter) {
// unimplemented!()
// }
//}

#[cfg(test)]
mod tests {
    use super::*;

    use std::rc::Rc;
    use parquet::schema::{
        parser::parse_message_type
    };
    use parquet::file::{
        properties::WriterProperties,
        writer::{ FileWriter, SerializedFileWriter }
    };
    use std::{fs, env, io::Write};

    /// End-to-end smoke test: derive `RecordWriter` for `ACompleteRecord`,
    /// then write a two-boolean-column row group into a temp parquet file.
    /// Only checks that every step unwraps cleanly — the file contents are
    /// not read back.
    #[test]
    fn hello() {
        let file = get_temp_file("test_parquet_derive_hello", &[]);
        // Schema must list the fields in struct declaration order, since the
        // generated impl writes columns in that order.
        let schema_str = "message schema {
            REQUIRED boolean a_bool;
            REQUIRED boolean a2_bool;
        }";
        let schema = Rc::new(parse_message_type(schema_str).unwrap());

        let props = Rc::new(WriterProperties::builder().build());
        let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();

        // Kept for the commented-out `a_str` field on `ACompleteRecord`;
        // currently unused.
        let a_str = "hi mom".to_owned();
        let drs : Vec<ACompleteRecord> = vec![
            ACompleteRecord {
                a_bool: true,
                a2_bool: false,
                // a_str: &a_str[..],
            }
        ];
        // The derived impl is on `&[ACompleteRecord]`, so write via a slice.
        let chunks = &drs[..];

        let mut row_group = writer.next_row_group().unwrap();
        chunks.write_to_row_group(&mut row_group);
        writer.close_row_group(row_group).unwrap();
        writer.close().unwrap();
    }

    /// Returns file handle for a temp file in 'target' directory with a provided content
    pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
        // build tmp path to a file in "target/debug/testdata"
        let mut path_buf = env::current_dir().unwrap();
        path_buf.push("target");
        path_buf.push("debug");
        path_buf.push("testdata");
        fs::create_dir_all(&path_buf).unwrap();
        path_buf.push(file_name);

        // write file content
        let mut tmp_file = fs::File::create(path_buf.as_path()).unwrap();
        tmp_file.write_all(content).unwrap();
        tmp_file.sync_all().unwrap();

        // return file handle for both read and write
        let file = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .open(path_buf.as_path());
        assert!(file.is_ok());
        file.unwrap()
    }

}
26 changes: 14 additions & 12 deletions src/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
//!
//! Provides access to file and row group readers and writers, record API, metadata, etc.
//!
//! See [`reader::SerializedFileReader`] or [`writer::SerializedFileWriter`] for a
//! starting reference, [`metadata::ParquetMetaData`] for file metadata,
//! and [`statistics`] for working with statistics.
//! See [`reader::SerializedFileReader`](reader/struct.SerializedFileReader.html) or
//! [`writer::SerializedFileWriter`](writer/struct.SerializedFileWriter.html) for a
//! starting reference, [`metadata::ParquetMetaData`](metadata/index.html) for file
//! metadata, and [`statistics`](statistics/index.html) for working with statistics.
//!
//! # Example of reading an existing file
//!
//! ```rust
//! use std::fs::File;
//! use std::path::Path;
//! use parquet::file::reader::{FileReader, SerializedFileReader};
//! use std::{fs::File, path::Path};
//!
//! let path = Path::new("data/alltypes_plain.parquet");
//! let file = File::open(&path).unwrap();
Expand All @@ -44,13 +44,15 @@
//! # Example of writing a new file
//!
//! ```rust
//!use std::fs;
//! use std::path::Path;
//! use std::rc::Rc;
//! use std::{fs, path::Path, rc::Rc};
//!
//! use parquet::file::properties::WriterProperties;
//! use parquet::file::writer::{FileWriter, SerializedFileWriter};
//! use parquet::schema::parser::parse_message_type;
//! use parquet::{
//! file::{
//! properties::WriterProperties,
//! writer::{FileWriter, SerializedFileWriter},
//! },
//! schema::parser::parse_message_type,
//! };
//!
//! let path = Path::new("target/debug/examples/sample.parquet");
//!
Expand Down Expand Up @@ -78,8 +80,8 @@
pub mod metadata;
pub mod properties;
pub mod reader;
pub mod writer;
pub mod statistics;
pub mod writer;

const FOOTER_SIZE: usize = 8;
const PARQUET_MAGIC: [u8; 4] = [b'P', b'A', b'R', b'1'];
5 changes: 5 additions & 0 deletions src/file/writer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Record-level writing: the `RecordWriter` trait implemented by
// `#[derive(ParquetRecordWriter)]`.
mod record_writer;
pub use self::record_writer::*;

// Serialized file / row-group writer implementation (presumably the former
// `src/file/writer.rs`, relocated under `writer/` — confirm against history).
mod writer;
pub use self::writer::*;
5 changes: 5 additions & 0 deletions src/file/writer/record_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use super::RowGroupWriter;

/// Writes a batch of records of type `T` into an open row group. Implemented
/// for `&[T]` by `#[derive(ParquetRecordWriter)]`.
pub trait RecordWriter<T> {
    /// Writes every record in `self` into `row_group_writer`, one column per
    /// field, in field declaration order.
    ///
    /// NOTE(review): `&mut Box<RowGroupWriter>` double-indirects; a plain
    /// `&mut RowGroupWriter` would be the lighter borrow — confirm the derive
    /// macro's generated impl before changing, since it must match exactly.
    fn write_to_row_group(&self, row_group_writer: &mut Box<RowGroupWriter>);
}
Loading

0 comments on commit 38ec590

Please sign in to comment.