Skip to content

Commit

Permalink
Clean up ZipSnapshotUpload a bit (#33219)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 923d92714f4c3f913999297e3de23dd5b1229344
  • Loading branch information
goffrie authored and Convex, Inc. committed Jan 16, 2025
1 parent 8caeeac commit 84679db
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ tokio-postgres = { version = "0.7.10", features = [ "with-serde_json-1" ] }
tokio-process-stream = { version = "0.4.0" }
tokio-stream = { version = "0.1", features = [ "io-util", "sync", "signal" ] }
tokio-tungstenite = { version = "0.21.0", features = [ "native-tls-vendored" ] }
tokio-util = { version = "0.7.13" }
tokio-util = { version = "0.7.13", features = [ "io" ] }
tonic = { version = "0.12.3", features = [ "gzip" ] }
tonic-build = "0.12.3"
tonic-health = "0.12.3"
Expand Down
1 change: 1 addition & 0 deletions crates/application/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ thiserror = { workspace = true }
thousands = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
udf_metrics = { path = "../udf_metrics" }
url = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion crates/application/src/exports/export_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use serde::{
};
use serde_json::json;
use storage::StorageExt;
use tokio_util::io::StreamReader;
use usage_tracking::{
FunctionUsageTracker,
StorageCallTracker,
Expand Down Expand Up @@ -143,7 +144,7 @@ pub async fn write_storage_table<'a, 'b: 'a, RT: Runtime>(
file_stream.content_length as u64,
);
zip_snapshot_upload
.stream_full_file(path, file_stream.stream)
.stream_full_file(path, StreamReader::new(file_stream.stream))
.await?;
}
Ok(())
Expand Down
25 changes: 6 additions & 19 deletions crates/application/src/exports/zip_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ use common::{
types::TableName,
};
use futures::{
stream::BoxStream,
pin_mut,
AsyncWriteExt,
TryStreamExt,
};
use serde_json::{
json,
Expand All @@ -28,6 +27,7 @@ use shape_inference::{
ShapeConfig,
};
use storage::ChannelWriter;
use tokio::io::AsyncBufRead;
use value::export::ValueFormat;

static AFTER_DOCUMENTS_CLEAN: Bytes = Bytes::from_static("\n".as_bytes());
Expand Down Expand Up @@ -96,35 +96,22 @@ impl<'a> ZipSnapshotUpload<'a> {
let writer = ZipFileWriter::new(out);
let mut zip_snapshot_upload = Self { writer };
zip_snapshot_upload
.write_full_file(format!("README.md"), README_MD_CONTENTS)
.stream_full_file("README.md".to_owned(), README_MD_CONTENTS.as_bytes())
.await?;
Ok(zip_snapshot_upload)
}

async fn write_full_file(&mut self, path: String, contents: &str) -> anyhow::Result<()> {
let builder = ZipEntryBuilder::new(path, Compression::Deflate)
.unix_permissions(ZIP_ENTRY_PERMISSIONS);
let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?;
entry_writer
.compat_mut_write()
.write_all(contents.as_bytes())
.await?;
entry_writer.close().await?;
Ok(())
}

#[minitrace::trace]
pub async fn stream_full_file(
&mut self,
path: String,
mut contents: BoxStream<'_, std::io::Result<Bytes>>,
contents: impl AsyncBufRead,
) -> anyhow::Result<()> {
let builder = ZipEntryBuilder::new(path, Compression::Deflate)
.unix_permissions(ZIP_ENTRY_PERMISSIONS);
let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?;
while let Some(chunk) = contents.try_next().await? {
entry_writer.compat_mut_write().write_all(&chunk).await?;
}
pin_mut!(contents);
tokio::io::copy_buf(&mut contents, &mut entry_writer).await?;
entry_writer.close().await?;
Ok(())
}
Expand Down

0 comments on commit 84679db

Please sign in to comment.