diff --git a/Cargo.lock b/Cargo.lock index 9fa7a11..2608cf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2228,6 +2228,7 @@ dependencies = [ "async-compression", "aws-config", "aws-sdk-s3", + "bytes", "clap", "console-subscriber", "cxx", @@ -2241,7 +2242,10 @@ dependencies = [ "serde", "serde_json", "sha2", + "tempfile", "tokio", + "tokio-stream", + "tokio-util", "tracing", "url", ] diff --git a/Cargo.toml b/Cargo.toml index ee646cf..2430fdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,10 @@ tracing = "0.1.41" url = { version = "2.5.4", features = [ "serde" ]} cxx = "1.0" console-subscriber = "0.4.1" +bytes = "1.10.1" +tokio-stream = { version = "0.1.17", features = ["fs"] } +tempfile = "3.19.1" +tokio-util = { version = "0.7.14", features = ["io"] } [build-dependencies] cxx-build = "1.0" diff --git a/src/store.rs b/src/store.rs index 9aa5b44..0433362 100644 --- a/src/store.rs +++ b/src/store.rs @@ -79,7 +79,7 @@ impl Store { .unwrap() } - pub fn make_nar(&self, path: StorePath) -> AsyncWriteAdapter { + pub fn stream_nar(&self, path: StorePath) -> AsyncWriteAdapter { let inner = self.inner.clone(); let (adapter, mut sender) = AsyncWriteAdapter::new(); diff --git a/src/uploader.rs b/src/uploader.rs index f77a3d3..390c3ac 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -1,20 +1,30 @@ +use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf}; + use anyhow::Result; use async_compression::{Level, tokio::bufread::ZstdEncoder}; use aws_sdk_s3::{ self as s3, types::{CompletedMultipartUpload, CompletedPart}, }; -use futures::future::join_all; +use bytes::{BufMut, Bytes, BytesMut}; +use futures::{future::join_all, stream::TryStreamExt}; use nix_compat::{ + nar::writer::r#async as nar, narinfo::{self, NarInfo, SigningKey}, nixbase32, store_path::StorePath, }; use sha2::{Digest, Sha256}; -use tokio::{io::AsyncReadExt, process::Command}; +use tokio::{ + fs::{File, read_dir, read_link}, + io::{AsyncRead, BufReader}, + pin, +}; +use tokio_stream::wrappers::ReadDirStream; +use tokio_util::io::InspectReader; use tracing::debug; -use crate::path_info::PathInfo; +use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store}; const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5; @@ -23,7 +33,7 @@ pub struct Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, - hash: Sha256, + store: &'a Store, } impl<'a> Uploader<'a> { @@ -32,38 +42,28 @@ impl<'a> Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, + store: &'a Store, ) -> Result { Ok(Self { signing_key, path, s3_client, bucket, - hash: Sha256::new(), + store, }) } pub async fn upload(&self) -> Result<()> { - let nar = self.make_nar().await?; - let mut nar_info = self.narinfo_from_nar(&nar)?; - let nar = self.compress_nar(&nar).await; + let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?; + self.make_nar(&mut nar_temp); - // update fields that we know after compression - let mut hasher = Sha256::new(); - hasher.update(&nar); - let hash: [u8; 32] = hasher.finalize().into(); - let nar_url = self.nar_url(&hash); - nar_info.file_hash = Some(hash); - nar_info.file_size = Some(nar.len() as u64); - nar_info.url = nar_url.as_str(); - debug!("uploading nar with key: {nar_url}"); - - if nar.len() < MULTIPART_CUTOFF { + if first_chunk.len() < MULTIPART_CUTOFF { let put_object = self .s3_client .put_object() .bucket(&self.bucket) .key(&nar_url) - .body(nar.into()) + .body(first_chunk.into()) .send() .await?; debug!("put object: {:#?}", put_object); @@ -164,17 +164,72 @@ impl<'a> Uploader<'a> { } fn nar_url(&self, compressed_nar_hash: &[u8]) -> String { - let compressed_nar_hash = nixbase32::encode(compressed_nar_hash); - format!("nar/{compressed_nar_hash}.nar.zst") + format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash)) } - async fn compress_nar(&self, nar: &[u8]) -> Vec { - let mut encoder = ZstdEncoder::with_quality(nar, Level::Default); - let mut compressed = Vec::with_capacity(nar.len()); - encoder - .read_to_end(&mut compressed) - .await - .expect("should compress just fine"); - compressed + async fn make_nar(&self, sink: &mut File) -> Result<()> { + let nar = nar::open(sink).await?; + let path = self.path.absolute_path(); + let metadata = File::open(&path).await?.metadata().await?; + + if metadata.is_symlink() { + let target = read_link(&path).await?; + nar.symlink(target.as_os_str().as_encoded_bytes()).await; + } else if metadata.is_dir() { + let mut nar = nar.directory().await?; + nar_from_dir(path.into(), &mut nar).await; + nar.close().await; + } else if metadata.is_file() { + let perms = metadata.permissions().mode(); + let mut executable = false; + if (perms & 0o700) == 0o700 { + executable = true; + } + + let mut file = BufReader::new(File::open(&path).await?); + nar.file(executable, metadata.len(), &mut file).await; + } + + Ok(()) } } + +async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> { + let root = ReadDirStream::new(read_dir(&path).await?); + let entries = root + .map_ok(|x| (x.file_name(), x)) + .try_collect::>() + .await?; + + // directory entries must be written in ascending order of name + for (name, entry) in entries.iter() { + let node = node.entry(name.as_encoded_bytes()).await?; + let metadata = entry.metadata().await?; + + if metadata.is_symlink() { + let target = read_link(entry.path()).await?; + node.symlink(target.as_os_str().as_encoded_bytes()).await; + } else if metadata.is_dir() { + let mut node = node.directory().await?; + Box::pin(nar_from_dir(entry.path(), &mut node)).await; + node.close().await; + } else if metadata.is_file() { + let perms = metadata.permissions().mode(); + let mut executable = false; + if (perms & 0o700) == 0o700 { + executable = true; + } + + let mut file = BufReader::new(File::open(entry.path()).await?); + node.file(executable, metadata.len(), &mut file).await; + } + } + Ok(()) +} + +async fn compress_and_hash_nar(nar: File, nar_hasher: &mut Sha256) -> impl AsyncRead { + let nar_reader = InspectReader::new(nar, |x| nar_hasher.update(x)); + let nar_buf_reader = BufReader::new(nar_reader); + + ZstdEncoder::with_quality(nar_buf_reader, Level::Default) +}