Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Client refinement (#11)
Browse files Browse the repository at this point in the history
* enhance test for async api

* avoid dup file name corruption

* fmtted

* update version num

* update version num

* fix relative path

* update version num

* add log

* update version num

* fix an important typo

* update version num
  • Loading branch information
ScottLinnn authored May 1, 2024
1 parent 376db54 commit 7db5ef6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
6 changes: 3 additions & 3 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "istziio-client"
version = "0.1.7"
version = "0.1.11"
edition = "2018"
authors = ["Shuning Lin <86640627+ScottLinnn@users.noreply.github.com>", "J-HowHuang <howardhuang2001@gmail.com>"]
description = "Client application for ISTZIIO OLAP database I/O cache service"
Expand All @@ -12,7 +12,8 @@ readme = "README.md"
include = [
"src/client_api.rs",
"src/storage_client.rs",
"src/lib.rs"
"src/lib.rs",
"src/benchmark.rs"
]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -30,7 +31,6 @@ async-trait = "0.1"
rocket = { version = "0.5.0", features = ["json"] }
reqwest = { version = "0.11", features = ["stream", "json"] }


[[bin]]
name = "benchmark"
path = "src/benchmark.rs"
48 changes: 25 additions & 23 deletions client/src/storage_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,19 @@ impl StorageClientImpl {
}

pub fn local_cache_path() -> String {
String::from("./istziio_client_cache/")
let home = std::env::var("HOME").unwrap();
String::from(home + "/istziio_client_cache/")
}

/// Fetch all data of a table, call get_path() to get the file name that stores the table
pub async fn read_entire_table(&self, table: TableId) -> Result<Receiver<RecordBatch>> {
// let mut local_path = self.local_cache.clone();
let file_path = self.get_path(table)?;
let mut file_path = self.get_path(table)?;
// local_path.push_str(&file_path);

if !self.use_local_cache {
let start = std::time::Instant::now();
let _ = self.fetch_file(&file_path).await;
file_path = self.fetch_file(&file_path).await.unwrap();
let duration = start.elapsed();
println!("Time used to fetch file: {:?}", duration);
}
Expand All @@ -89,12 +90,12 @@ impl StorageClientImpl {

pub async fn read_entire_table_sync(&self, table: TableId) -> Result<Vec<RecordBatch>> {
// let mut local_path = self.local_cache.clone();
let file_path = self.get_path(table)?;
let mut file_path = self.get_path(table)?;
// local_path.push_str(&file_path);

if !self.use_local_cache {
let start = std::time::Instant::now();
let _ = self.fetch_file(&file_path).await;
file_path = self.fetch_file(&file_path).await.unwrap();
let duration = start.elapsed();
println!("Time used to fetch file: {:?}", duration);
}
Expand Down Expand Up @@ -142,7 +143,7 @@ impl StorageClientImpl {
}

/// Fetch file from I/O server
async fn fetch_file(&self, file_path: &str) -> Result<()> {
async fn fetch_file(&self, file_path: &str) -> Result<String> {
// First, trim the path to leave only the file name after the last '/'
let trimmed_path: Vec<&str> = file_path.split('/').collect();
let file_name = trimmed_path.last().ok_or_else(|| {
Expand All @@ -169,35 +170,35 @@ impl StorageClientImpl {

let mut file_path = self.local_cache.clone();

// If file_path exists, append a auto increaseing number to the file name
// and re-detect duplication, until a unique file name is found
file_path.push_str(file_name);
let mut dup_id = 0;
while Path::new(&file_path).exists() {
dup_id += 1;
file_path = self.local_cache.clone();
file_path.push_str(file_name);
file_path.push_str(&format!("_{}", dup_id));
}

let mut file = File::create(&file_path)?;

file.write_all(&file_contents)?;
println!("parquet written to {}", file_path);
// STREAM VERSION CODE - NOT IN USE!

// let mut file_path = self.local_cache.clone();

// file_path.push_str(file_name);
// let mut file = File::create(file_path)?;

// let mut stream = response.bytes_stream();
// while let Some(chunk) = stream.next().await {
// let data = chunk?;
// file.write_all(&data)?;
// }

// let duration = start.elapsed();
// println!("Time used to receive stream: {:?}", duration);

Ok(())
if dup_id > 0 {
Ok(file_name.to_string() + "_" + dup_id.to_string().as_str())
} else {
Ok(file_name.to_string())
}
}

async fn read_pqt_all(file_path: &str, sender: Sender<RecordBatch>) -> Result<()> {
// If the file exists, open it and read the data. Otherwise, call fetch_file to get the file
let mut local_path = StorageClientImpl::local_cache_path();

local_path.push_str(file_path);
println!("read_pqt_all Reading from local_path: {:?}", local_path);
let file = File::open(local_path)?;
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let mut reader = builder.build()?;
Expand All @@ -213,11 +214,12 @@ impl StorageClientImpl {
// print curr time
let start = std::time::Instant::now();
local_path.push_str(file_path);
print!(
println!(
"read_pqt_all_sync Reading from local_path: {:?}",
local_path
);
let file = File::open(local_path)?;
println!("File opened");
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
let mut reader = builder.build()?;
let mut result: Vec<RecordBatch> = Vec::new();
Expand Down

0 comments on commit 7db5ef6

Please sign in to comment.