diff --git a/Cargo.lock b/Cargo.lock index 2ddd37e8..bfe70fb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,7 @@ dependencies = [ "thousands", "tokio", "tokio-stream", + "tokio-util", "tracing", "udf_metrics", "url", diff --git a/Cargo.toml b/Cargo.toml index 1e218ea6..a0d08149 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,7 +147,7 @@ tokio-postgres = { version = "0.7.10", features = [ "with-serde_json-1" ] } tokio-process-stream = { version = "0.4.0" } tokio-stream = { version = "0.1", features = [ "io-util", "sync", "signal" ] } tokio-tungstenite = { version = "0.21.0", features = [ "native-tls-vendored" ] } -tokio-util = { version = "0.7.13" } +tokio-util = { version = "0.7.13", features = [ "io" ] } tonic = { version = "0.12.3", features = [ "gzip" ] } tonic-build = "0.12.3" tonic-health = "0.12.3" diff --git a/crates/application/Cargo.toml b/crates/application/Cargo.toml index 9e4909d0..80525fd0 100644 --- a/crates/application/Cargo.toml +++ b/crates/application/Cargo.toml @@ -67,6 +67,7 @@ thiserror = { workspace = true } thousands = { workspace = true } tokio = { workspace = true } tokio-stream = { workspace = true } +tokio-util = { workspace = true } tracing = { workspace = true } udf_metrics = { path = "../udf_metrics" } url = { workspace = true } diff --git a/crates/application/src/exports/export_storage.rs b/crates/application/src/exports/export_storage.rs index 5b228561..12e56001 100644 --- a/crates/application/src/exports/export_storage.rs +++ b/crates/application/src/exports/export_storage.rs @@ -31,6 +31,7 @@ use serde::{ }; use serde_json::json; use storage::StorageExt; +use tokio_util::io::StreamReader; use usage_tracking::{ FunctionUsageTracker, StorageCallTracker, @@ -143,7 +144,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>( file_stream.content_length as u64, ); zip_snapshot_upload - .stream_full_file(path, file_stream.stream) + .stream_full_file(path, StreamReader::new(file_stream.stream)) .await?; } Ok(()) diff --git a/crates/application/src/exports/zip_uploader.rs b/crates/application/src/exports/zip_uploader.rs index 4f0ebddf..f5108c61 100644 --- a/crates/application/src/exports/zip_uploader.rs +++ b/crates/application/src/exports/zip_uploader.rs @@ -15,9 +15,8 @@ use common::{ types::TableName, }; use futures::{ - stream::BoxStream, + pin_mut, AsyncWriteExt, - TryStreamExt, }; use serde_json::{ json, @@ -28,6 +27,7 @@ use shape_inference::{ ShapeConfig, }; use storage::ChannelWriter; +use tokio::io::AsyncBufRead; use value::export::ValueFormat; static AFTER_DOCUMENTS_CLEAN: Bytes = Bytes::from_static("\n".as_bytes()); @@ -96,35 +96,22 @@ impl<'a> ZipSnapshotUpload<'a> { let writer = ZipFileWriter::new(out); let mut zip_snapshot_upload = Self { writer }; zip_snapshot_upload - .write_full_file(format!("README.md"), README_MD_CONTENTS) + .stream_full_file("README.md".to_owned(), README_MD_CONTENTS.as_bytes()) .await?; Ok(zip_snapshot_upload) } - async fn write_full_file(&mut self, path: String, contents: &str) -> anyhow::Result<()> { - let builder = ZipEntryBuilder::new(path, Compression::Deflate) - .unix_permissions(ZIP_ENTRY_PERMISSIONS); - let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; - entry_writer - .compat_mut_write() - .write_all(contents.as_bytes()) - .await?; - entry_writer.close().await?; - Ok(()) - } - #[minitrace::trace] pub async fn stream_full_file( &mut self, path: String, - mut contents: BoxStream<'_, std::io::Result>, + contents: impl AsyncBufRead, ) -> anyhow::Result<()> { let builder = ZipEntryBuilder::new(path, Compression::Deflate) .unix_permissions(ZIP_ENTRY_PERMISSIONS); let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; - while let Some(chunk) = contents.try_next().await? { - entry_writer.compat_mut_write().write_all(&chunk).await?; - } + pin_mut!(contents); + tokio::io::copy_buf(&mut contents, &mut entry_writer).await?; entry_writer.close().await?; Ok(()) }