diff --git a/Cargo.lock b/Cargo.lock
index 9fa7a11..2608cf9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2228,6 +2228,7 @@ dependencies = [
  "async-compression",
  "aws-config",
  "aws-sdk-s3",
+ "bytes",
  "clap",
  "console-subscriber",
  "cxx",
@@ -2241,7 +2242,10 @@ dependencies = [
  "serde",
  "serde_json",
  "sha2",
+ "tempfile",
  "tokio",
+ "tokio-stream",
+ "tokio-util",
  "tracing",
  "url",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index ee646cf..2430fdd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,10 @@ tracing = "0.1.41"
 url = { version = "2.5.4", features = [ "serde" ]}
 cxx = "1.0"
 console-subscriber = "0.4.1"
+bytes = "1.10.1"
+tokio-stream = { version = "0.1.17", features = ["fs"] }
+tempfile = "3.19.1"
+tokio-util = { version = "0.7.14", features = ["io-util"] }
 
 [build-dependencies]
 cxx-build = "1.0"
diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs
index 61a32af..701a15e 100644
--- a/src/bindings/mod.rs
+++ b/src/bindings/mod.rs
@@ -211,6 +211,13 @@ mod ffi {
         /// Obtains a handle to the Nix store.
         fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
 
+        /// Creates a NAR dump from a path.
+        fn nar_from_path(
+            self: Pin<&mut CNixStore>,
+            base_name: Vec<u8>,
+            sender: Box<AsyncWriteSender>,
+        ) -> Result<()>;
+
         // =========
         // CPathInfo
         // =========
diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp
index 3914de1..326e878 100644
--- a/src/bindings/nix.cpp
+++ b/src/bindings/nix.cpp
@@ -108,6 +108,17 @@ std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePat
   return std::make_unique<std::vector<std::string>>(result);
 }
 
+void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
+  RustSink sink(std::move(sender));
+
+  std::string_view sv((const char *)base_name.data(), base_name.size());
+  nix::StorePath store_path(sv);
+
+  // exceptions will be thrown into Rust
+  this->store->narFromPath(store_path, sink);
+  sink.eof();
+}
+
 std::unique_ptr<CNixStore> open_nix_store() {
   return std::make_unique<CNixStore>();
 }
diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp
index 9f4964a..5c79a33 100644
--- a/src/bindings/nix.hpp
+++ b/src/bindings/nix.hpp
@@ -79,6 +79,7 @@ public:
                           bool flip_direction,
                           bool include_outputs,
                           bool include_derivers);
+  void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
 };
 
 std::unique_ptr<CNixStore> open_nix_store();
diff --git a/src/push.rs b/src/push.rs
index 8177864..18f74b9 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -160,7 +160,7 @@ impl Push {
     }
 
     async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
-        let mut uploads = Vec::with_capacity(10);
+        let mut uploads = Vec::new();
 
         loop {
             if let Some(path_to_upload) = rx.recv().await {
diff --git a/src/store.rs b/src/store.rs
index 4499243..0433362 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -4,7 +4,10 @@ use anyhow::{Context, Result};
 use nix_compat::store_path::StorePath;
 use tokio::task;
 
-use crate::{bindings, path_info::PathInfo};
+use crate::{
+    bindings::{self, AsyncWriteAdapter},
+    path_info::PathInfo,
+};
 
 pub struct Store {
     inner: Arc<bindings::FfiNixStore>,
@@ -75,4 +78,20 @@ impl Store {
         .await
         .unwrap()
     }
+
+    pub fn stream_nar(&self, path: StorePath) -> AsyncWriteAdapter {
+        let inner = self.inner.clone();
+        let (adapter, mut sender) = AsyncWriteAdapter::new();
+
+        task::spawn_blocking(move || {
+            // report FFI errors (e.g. C++ exceptions) through the adapter
+            if let Err(e) = inner
+                .store()
+                .nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone())
+            {
+                let _ = sender.rust_error(e);
+            }
+        });
+        adapter
+    }
 }
diff --git a/src/uploader.rs b/src/uploader.rs
index eb955a2..8f4efaa 100644
--- a/src/uploader.rs
+++ b/src/uploader.rs
@@ -1,20 +1,30 @@
+use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf};
+
 use anyhow::Result;
 use async_compression::{Level, tokio::bufread::ZstdEncoder};
 use aws_sdk_s3::{
     self as s3,
     types::{CompletedMultipartUpload, CompletedPart},
 };
-use futures::future::join_all;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::{future::join_all, stream::TryStreamExt};
 use nix_compat::{
+    nar::writer::r#async as nar,
     narinfo::{self, NarInfo, SigningKey},
     nixbase32,
     store_path::StorePath,
 };
 use sha2::{Digest, Sha256};
-use tokio::{io::AsyncReadExt, process::Command};
+use tokio::{
+    fs::{File, read_dir, read_link, symlink_metadata},
+    io::{AsyncRead, BufReader},
+    pin,
+};
+use tokio_stream::wrappers::ReadDirStream;
+use tokio_util::io::{InspectReader, read_buf};
 use tracing::debug;
 
-use crate::path_info::PathInfo;
+use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};
 
 const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
 
@@ -23,7 +33,7 @@ pub struct Uploader<'a> {
     path: PathInfo,
     s3_client: &'a s3::Client,
     bucket: String,
-    hash: Sha256,
+    store: &'a Store,
 }
 
 impl<'a> Uploader<'a> {
@@ -32,38 +42,41 @@ impl<'a> Uploader<'a> {
         path: PathInfo,
         s3_client: &'a s3::Client,
         bucket: String,
+        store: &'a Store,
     ) -> Result<Self> {
         Ok(Self {
             signing_key,
             path,
             s3_client,
             bucket,
-            hash: Sha256::new(),
+            store,
         })
     }
 
     pub async fn upload(&self) -> Result<()> {
-        let nar = self.make_nar().await?;
-        let mut nar_info = self.narinfo_from_nar(&nar)?;
-        let nar = self.compress_nar(&nar).await;
-
-        // update fields that we know after compression
-        let mut hasher = Sha256::new();
-        hasher.update(&nar);
-        let hash: [u8; 32] = hasher.finalize().into();
-        let nar_url = self.nar_url(&hash);
-        nar_info.file_hash = Some(hash);
-        nar_info.file_size = Some(nar.len() as u64);
-        nar_info.url = nar_url.as_str();
-        debug!("uploading nar with key: {nar_url}");
-
-        if nar.len() < MULTIPART_CUTOFF {
+        // write the NAR to a named temp file, then reopen it for reading
+        let nar_temp = tempfile::Builder::new().tempfile()?;
+        let mut nar_writer = File::create(nar_temp.path()).await?;
+        self.make_nar(&mut nar_writer).await?;
+        let nar_file = File::open(nar_temp.path()).await?;
+
+        // hash of the uncompressed NAR; this goes into the .narinfo file
+        let mut nar_hasher = Sha256::new();
+        // hash of the compressed file that the .narinfo will point to
+        let mut file_hasher = Sha256::new();
+        let nar_reader = compress_and_hash_nar(nar_file, &mut nar_hasher, &mut file_hasher);
+        pin!(nar_reader);
+
+        let buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
+        let first_chunk = read_buf_nar(&mut nar_reader, buf).await?;
+
+        if first_chunk.len() < MULTIPART_CUTOFF {
             let put_object = self
                 .s3_client
                 .put_object()
                 .bucket(&self.bucket)
                 .key(&nar_url)
-                .body(nar.into())
+                .body(first_chunk.into())
                 .send()
                 .await?;
             debug!("put object: {:#?}", put_object);
@@ -139,16 +152,6 @@ impl<'a> Uploader<'a> {
         Ok(())
     }
 
-    async fn make_nar(&self) -> Result<Vec<u8>> {
-        Ok(Command::new("nix")
-            .arg("nar")
-            .arg("dump-path")
-            .arg(self.path.absolute_path())
-            .output()
-            .await?
-            .stdout)
-    }
-
     fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
         let mut hasher = Sha256::new();
         hasher.update(nar);
@@ -173,18 +176,90 @@ impl<'a> Uploader<'a> {
         Ok(nar_info)
     }
 
-    fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
-        let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
-        format!("nar/{compressed_nar_hash}.nar.zst")
-    }
+    async fn make_nar(&self, sink: &mut File) -> Result<()> {
+        let nar = nar::open(sink).await?;
+        let path = self.path.absolute_path();
+        // symlink_metadata so that a symlink at the root is not followed
+        let metadata = symlink_metadata(&path).await?;
 
-    async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
-        let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
-        let mut compressed = Vec::with_capacity(nar.len());
-        encoder
-            .read_to_end(&mut compressed)
-            .await
-            .expect("should compress just fine");
-        compressed
+        if metadata.is_symlink() {
+            let target = read_link(&path).await?;
+            nar.symlink(target.as_os_str().as_encoded_bytes()).await?;
+        } else if metadata.is_dir() {
+            let mut nar = nar.directory().await?;
+            nar_from_dir(path.into(), &mut nar).await?;
+            nar.close().await?;
+        } else if metadata.is_file() {
+            let perms = metadata.permissions().mode();
+            // Nix considers a file executable when the owner-execute bit is set
+            let executable = (perms & 0o100) != 0;
+
+            let mut file = BufReader::new(File::open(&path).await?);
+            nar.file(executable, metadata.len(), &mut file).await?;
+        }
+
+        Ok(())
     }
 }
+
+async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> {
+    let root = ReadDirStream::new(read_dir(&path).await?);
+    let entries = root
+        .map_ok(|x| (x.file_name(), x))
+        .try_collect::<BTreeMap<_, _>>()
+        .await?;
+
+    // directory entries must be written in ascending order of name,
+    // which the BTreeMap gives us for free
+    for (name, entry) in entries.iter() {
+        let node = node.entry(name.as_encoded_bytes()).await?;
+        let metadata = entry.metadata().await?;
+
+        if metadata.is_symlink() {
+            let target = read_link(entry.path()).await?;
+            node.symlink(target.as_os_str().as_encoded_bytes()).await?;
+        } else if metadata.is_dir() {
+            let mut node = node.directory().await?;
+            Box::pin(nar_from_dir(entry.path(), &mut node)).await?;
+            node.close().await?;
+        } else if metadata.is_file() {
+            let perms = metadata.permissions().mode();
+            // Nix considers a file executable when the owner-execute bit is set
+            let executable = (perms & 0o100) != 0;
+
+            let mut file = BufReader::new(File::open(entry.path()).await?);
+            node.file(executable, metadata.len(), &mut file).await?;
+        }
+    }
+    Ok(())
+}
+
+fn compress_and_hash_nar(
+    nar_file: File,
+    nar_hasher: &mut Sha256,
+    compressed_nar_hasher: &mut Sha256,
+) -> impl AsyncRead {
+    // hash the raw NAR bytes as they are read out of the temp file
+    let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x));
+    let nar_buf_reader = BufReader::new(nar_reader);
+
+    // compress, then hash the compressed bytes as they are read out
+    let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default);
+    InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x))
+}
+
+fn nar_url(compressed_nar_hash: &[u8]) -> String {
+    format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
+}
+
+async fn read_buf_nar<S: AsyncRead + Unpin>(stream: &mut S, mut buf: BytesMut) -> Result<Bytes> {
+    // fill the buffer to capacity unless the stream ends first
+    while buf.len() < buf.capacity() {
+        let n = read_buf(stream, &mut buf).await?;
+
+        if n == 0 {
+            break;
+        }
+    }
+    Ok(buf.freeze())
+}
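
Two notes on pieces this diff references but does not show. First, AsyncWriteAdapter and AsyncWriteSender live in src/bindings and are untouched here: nar_from_path hands the boxed sender to C++, where RustSink pushes each chunk produced by narFromPath back into Rust and finally signals EOF, while stream_nar returns the receiving adapter to the caller. A minimal sketch of what such a pair could look like, assuming an unbounded tokio mpsc channel and String errors; the actual definitions may differ:

    use bytes::Bytes;
    use tokio::sync::mpsc;

    /// Hypothetical message type: a chunk of NAR data or an error.
    type ChunkResult = Result<Bytes, String>;

    #[derive(Clone)]
    pub struct AsyncWriteSender {
        tx: mpsc::UnboundedSender<ChunkResult>,
    }

    impl AsyncWriteSender {
        /// Called (through cxx) for every chunk the C++ RustSink writes.
        pub fn send(&mut self, data: &[u8]) {
            let _ = self.tx.send(Ok(Bytes::copy_from_slice(data)));
        }

        /// Reports a Rust-side failure, e.g. a C++ exception surfaced by cxx.
        pub fn rust_error(&mut self, e: impl std::fmt::Display) {
            let _ = self.tx.send(Err(e.to_string()));
        }
    }

    pub struct AsyncWriteAdapter {
        rx: mpsc::UnboundedReceiver<ChunkResult>,
    }

    impl AsyncWriteAdapter {
        pub fn new() -> (Self, AsyncWriteSender) {
            let (tx, rx) = mpsc::unbounded_channel();
            (Self { rx }, AsyncWriteSender { tx })
        }
    }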
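
Second, nothing in this diff consumes stream_nar yet. If the adapter also implements Stream<Item = Result<Bytes, E>> with E convertible to an I/O error, which is consistent with the sketch above, tokio_util's StreamReader would adapt it to the AsyncRead shape that compress_and_hash_nar expects. A hypothetical glue function; nar_reader_from_store is not code from this repository:

    use futures::TryStreamExt;
    use nix_compat::store_path::StorePath;
    use tokio::io::AsyncRead;
    use tokio_util::io::StreamReader;

    use crate::store::Store;

    /// Hypothetical helper: expose the NAR chunk stream as an AsyncRead.
    fn nar_reader_from_store(store: &Store, path: StorePath) -> impl AsyncRead {
        // map the adapter's error type into std::io::Error for StreamReader
        StreamReader::new(store.stream_nar(path).map_err(std::io::Error::other))
    }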
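
Finally, a subtlety in compress_and_hash_nar: the returned reader holds &mut borrows on both hashers, so the compressed-file hash, and with it the nar_url value the .narinfo will point at, can only be computed after the stream has been fully consumed and the reader dropped. That is why upload() above still has no nar_url binding before the put_object call; the missing step has to look roughly like this sketch (hash_compressed_nar is a hypothetical helper that drains into a Vec where the real code would upload the bytes):

    use anyhow::Result;
    use sha2::{Digest, Sha256};
    use tokio::{fs::File, io::AsyncReadExt, pin};

    async fn hash_compressed_nar(nar_file: File) -> Result<(String, [u8; 32])> {
        let mut nar_hasher = Sha256::new();
        let mut file_hasher = Sha256::new();
        {
            let reader = compress_and_hash_nar(nar_file, &mut nar_hasher, &mut file_hasher);
            pin!(reader);
            // drain the stream; the real caller uploads these bytes instead
            let mut sink = Vec::new();
            reader.read_to_end(&mut sink).await?;
        } // reader dropped here, releasing the &mut borrows on the hashers
        let nar_hash: [u8; 32] = nar_hasher.finalize().into();
        let file_hash: [u8; 32] = file_hasher.finalize().into();
        // the compressed-file hash names the object: nar/<base32>.nar.zst
        Ok((nar_url(&file_hash), nar_hash))
    }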