From ac4b2ba136e7fa82336aa91c2088ff407e49e603 Mon Sep 17 00:00:00 2001 From: cy Date: Sat, 26 Apr 2025 00:39:22 -0400 Subject: [PATCH] fancy nar cooking --- src/push.rs | 2 +- src/uploader.rs | 43 ++++++++++++++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/push.rs b/src/push.rs index 8177864..18f74b9 100644 --- a/src/push.rs +++ b/src/push.rs @@ -160,7 +160,7 @@ impl Push { } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { - let mut uploads = Vec::with_capacity(10); + let mut uploads = Vec::new(); loop { if let Some(path_to_upload) = rx.recv().await { diff --git a/src/uploader.rs b/src/uploader.rs index 390c3ac..8f4efaa 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -21,7 +21,7 @@ use tokio::{ pin, }; use tokio_stream::wrappers::ReadDirStream; -use tokio_util::io::InspectReader; +use tokio_util::io::{InspectReader, read_buf}; use tracing::debug; use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store}; @@ -55,7 +55,16 @@ impl<'a> Uploader<'a> { pub async fn upload(&self) -> Result<()> { let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?; - self.make_nar(&mut nar_temp); + self.make_nar(&mut nar_temp).await; + + // this goes to the .narinfo file + let mut nar_hasher = Sha256::new(); + // this is the URL for file .narinfo points to + let mut file_hasher = Sha256::new(); + let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher); + + let buf = BytesMut::with_capacity(MULTIPART_CUTOFF); + let if first_chunk.len() < MULTIPART_CUTOFF { let put_object = self @@ -163,10 +172,6 @@ impl<'a> Uploader<'a> { Ok(nar_info) } - fn nar_url(&self, compressed_nar_hash: &[u8]) -> String { - format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash)) - } - async fn make_nar(&self, sink: &mut File) -> Result<()> { let nar = nar::open(sink).await?; let path = self.path.absolute_path(); @@ -227,9 +232,29 @@ async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Resul Ok(()) } -async fn compress_and_hash_nar(nar: File, nar_hasher: &mut Sha256) -> impl AsyncRead { - let nar_reader = InspectReader::new(nar, |x| nar_hasher.update(x)); +fn compress_and_hash_nar( + nar_file: File, + nar_hasher: &mut Sha256, + compressed_nar_hasher: &mut Sha256, +) -> impl AsyncRead { + let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x)); let nar_buf_reader = BufReader::new(nar_reader); - ZstdEncoder::with_quality(nar_buf_reader, Level::Default) + let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default); + InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x)) +} + +fn nar_url(compressed_nar_hash: &[u8]) -> String { + format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash)) +} + +async fn read_buf_nar(stream: &mut S, mut buf: BytesMut) -> Result { + while buf.len() < buf.capacity() { + let n = read_buf(stream, &mut buf).await?; + + if n == 0 { + break; + } + } + Ok(buf.freeze()) }