From 85fefe9e7778423820860d1a267869f3b7295294 Mon Sep 17 00:00:00 2001
From: cy
Date: Sat, 26 Apr 2025 21:08:01 -0400
Subject: [PATCH] use cpp bindings to make nar

---
 src/bindings/mod.rs  | 21 +++++++++++++++-----
 src/bindings/nix.cpp | 11 +++++++++++
 src/bindings/nix.hpp |  1 +
 src/make_nar.rs      | 47 ++++++++++----------------------------------
 src/push.rs          |  3 ++-
 src/store.rs         | 24 ++++++++++++++++++++--
 src/uploader.rs      |  9 ++++-----
 7 files changed, 66 insertions(+), 50 deletions(-)

diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs
index 61a32af..8084dff 100644
--- a/src/bindings/mod.rs
+++ b/src/bindings/mod.rs
@@ -23,6 +23,7 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 
 use anyhow::Result;
+use bytes::Bytes;
 use futures::stream::{Stream, StreamExt};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 
@@ -125,7 +126,7 @@ impl AsyncWriteAdapter {
                     writer.write_all(&v).await?;
                 }
                 Err(e) => {
-                    return Err(e);
+                    return Err(e.into());
                 }
             }
         }
@@ -139,7 +140,7 @@ impl AsyncWriteAdapter {
 }
 
 impl Stream for AsyncWriteAdapter {
-    type Item = Result<Vec<u8>>;
+    type Item = std::io::Result<Bytes>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         match self.receiver.poll_recv(cx) {
@@ -147,9 +148,12 @@
             Poll::Ready(Some(message)) => {
                 use AsyncWriteMessage::*;
                 match message {
-                    Data(v) => Poll::Ready(Some(Ok(v))),
+                    Data(v) => Poll::Ready(Some(Ok(v.into()))),
                     Error(exception) => {
-                        let error = anyhow::Error::msg(format!("cxx error: {exception}"));
+                        let error = std::io::Error::new(
+                            io::ErrorKind::Other,
+                            format!("cxx error: {exception}"),
+                        );
                         Poll::Ready(Some(Err(error)))
                     }
                     Eof => {
@@ -160,7 +164,7 @@
             }
             Poll::Ready(None) => {
                 if !self.eof {
-                    Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe).into())))
+                    Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe))))
                 } else {
                     Poll::Ready(None)
                 }
@@ -208,6 +212,13 @@ mod ffi {
             include_derivers: bool,
         ) -> Result<UniquePtr<CxxVector<CxxString>>>;
 
+        /// Creates a NAR dump from a path.
+        fn nar_from_path(
+            self: Pin<&mut CNixStore>,
+            base_name: Vec<u8>,
+            sender: Box<AsyncWriteSender>,
+        ) -> Result<()>;
+
         /// Obtains a handle to the Nix store.
         fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
 
diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp
index 3914de1..326e878 100644
--- a/src/bindings/nix.cpp
+++ b/src/bindings/nix.cpp
@@ -108,6 +108,17 @@ std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePat
     return std::make_unique<std::vector<std::string>>(result);
 }
 
+void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
+    RustSink sink(std::move(sender));
+
+    std::string_view sv((const char *)base_name.data(), base_name.size());
+    nix::StorePath store_path(sv);
+
+    // exceptions will be thrown into Rust
+    this->store->narFromPath(store_path, sink);
+    sink.eof();
+}
+
 std::unique_ptr<CNixStore> open_nix_store() {
     return std::make_unique<CNixStore>();
 }
diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp
index 9f4964a..5c79a33 100644
--- a/src/bindings/nix.hpp
+++ b/src/bindings/nix.hpp
@@ -79,6 +79,7 @@ public:
         bool flip_direction,
         bool include_outputs,
         bool include_derivers);
+    void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
 };
 
 std::unique_ptr<CNixStore> open_nix_store();
diff --git a/src/make_nar.rs b/src/make_nar.rs
index 19a112b..b5c1ab2 100644
--- a/src/make_nar.rs
+++ b/src/make_nar.rs
@@ -1,25 +1,20 @@
-use anyhow::{Context, Result};
+use anyhow::Result;
 use async_compression::{Level, tokio::bufread::ZstdEncoder};
 use nix_compat::{
     narinfo::{self, NarInfo},
     store_path::StorePath,
 };
 use sha2::{Digest, Sha256};
-use std::mem::take;
-use tempfile::{NamedTempFile, TempPath};
-use tokio::{
-    fs::File,
-    io::{AsyncRead, BufReader},
-    process::Command,
-};
+use std::{mem::take, sync::Arc};
+use tokio::io::{AsyncRead, BufReader};
 use tokio_util::io::InspectReader;
-use tracing::debug;
 
 use crate::path_info::PathInfo;
+use crate::store::Store;
 
 pub struct MakeNar<'a> {
     path_info: &'a PathInfo,
-    nar_file: TempPath,
+    store: Arc<Store>,
     nar_hasher: Sha256,
     /// hash of compressed nar file
     file_hasher: Sha256,
@@ -28,12 +23,10 @@
 }
 
 impl<'a> MakeNar<'a> {
-    pub fn new(path_info: &'a PathInfo) -> Result<Self> {
+    pub fn new(path_info: &'a PathInfo, store: Arc<Store>) -> Result<Self> {
         Ok(Self {
             path_info,
-            nar_file: NamedTempFile::new()
-                .context("create tempfile for nar")?
-                .into_temp_path(),
+            store,
             nar_hasher: Sha256::new(),
             file_hasher: Sha256::new(),
             nar_size: 0,
         })
     }
 
-    pub async fn make(&self) -> Result<()> {
-        let path = self.path_info.absolute_path();
-        let out = std::fs::File::options()
-            .write(true)
-            .truncate(true)
-            .open(&self.nar_file)?;
-
-        debug!("dumping nar for: {}", path);
-        Ok(Command::new("nix")
-            .arg("nar")
-            .arg("pack")
-            .arg(self.path_info.absolute_path())
-            .kill_on_drop(true)
-            .stdout(out)
-            .spawn()?
-            .wait()
-            .await?
-            .exit_ok()?)
-    }
-
     /// Returns a compressed nar reader which can be uploaded. File hash will be available when
     /// everything is read
-    pub async fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
-        let nar_file = File::open(&self.nar_file).await?;
+    pub fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
+        let nar_reader = self.store.nar_from_path(self.path_info.path.clone());
 
         // reader that hashes as nar is read
-        let nar_reader = InspectReader::new(nar_file, |x| {
+        let nar_reader = InspectReader::new(nar_reader, |x| {
             self.nar_size += x.len() as u64;
             self.nar_hasher.update(x);
         });
diff --git a/src/push.rs b/src/push.rs
index 93a28da..57a43d2 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -169,8 +169,9 @@ impl Push {
 
         uploads.push(tokio::spawn({
             let s3 = self.s3.clone();
+            let store = self.store.clone();
             async move {
-                let res = uploader.upload(s3).await;
+                let res = uploader.upload(s3, store).await;
                 drop(permit);
                 self.upload_count.fetch_add(1, Ordering::Relaxed);
                 res
diff --git a/src/store.rs b/src/store.rs
index 763cb3d..7589e94 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -2,9 +2,13 @@ use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc};
 
 use anyhow::{Context, Result};
 use nix_compat::store_path::StorePath;
-use tokio::task;
+use tokio::{io::AsyncRead, task};
+use tokio_util::io::StreamReader;
 
-use crate::{bindings, path_info::PathInfo};
+use crate::{
+    bindings::{self, AsyncWriteAdapter},
+    path_info::PathInfo,
+};
 
 pub struct Store {
     inner: Arc<bindings::FfiNixStore>,
@@ -75,4 +79,20 @@ impl Store {
         .await
         .unwrap()
     }
+
+    pub fn nar_from_path(&self, store_path: StorePath<String>) -> impl AsyncRead {
+        let inner = self.inner.clone();
+        let (adapter, mut sender) = AsyncWriteAdapter::new();
+        let base_name = store_path.to_string().as_bytes().to_vec();
+
+        tokio::task::spawn_blocking(move || {
+            // Send all exceptions through the channel, and ignore errors
+            // during sending (the channel may have been closed).
+            if let Err(e) = inner.store().nar_from_path(base_name, sender.clone()) {
+                let _ = sender.rust_error(e);
+            }
+        });
+
+        StreamReader::new(adapter)
+    }
 }
diff --git a/src/uploader.rs b/src/uploader.rs
index a2c2766..c829a79 100644
--- a/src/uploader.rs
+++ b/src/uploader.rs
@@ -7,7 +7,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tracing::{debug, trace};
 use ulid::Ulid;
 
-use crate::{make_nar::MakeNar, path_info::PathInfo};
+use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store};
 
 const CHUNK_SIZE: usize = 1024 * 1024 * 5;
 
@@ -24,9 +24,8 @@ impl<'a> Uploader<'a> {
         Ok(Self { signing_key, path })
     }
 
-    pub async fn upload(&self, s3: Arc) -> Result<()> {
-        let mut nar = MakeNar::new(&self.path)?;
-        nar.make().await?;
+    pub async fn upload(&self, s3: Arc, store: Arc<Store>) -> Result<()> {
+        let mut nar = MakeNar::new(&self.path, store)?;
 
         // we don't know what the hash of the compressed file will be so upload to a
        // temp location for now
@@ -35,7 +34,7 @@
         debug!("uploading to temp path: {}", temp_path);
 
         // compress and upload nar
-        let mut file_reader = nar.compress_and_hash().await?;
+        let mut file_reader = nar.compress_and_hash()?;
         loop {
             let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
             let n = file_reader.read_buf(&mut buf).await?;
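
Note (illustration, not part of the patch): the AsyncWriteAdapter/StreamReader plumbing above amounts to a blocking producer feeding a tokio mpsc channel whose receiver is exposed as a Stream of io::Result<Bytes> and wrapped in tokio_util::io::StreamReader to obtain an AsyncRead. Below is a minimal, self-contained sketch of that same pattern using plain tokio types; it assumes tokio, tokio-stream, tokio-util, and bytes as dependencies, and the chunk contents and names are made up for the example.

    use bytes::Bytes;
    use tokio::{io::AsyncReadExt, sync::mpsc, task};
    use tokio_stream::wrappers::ReceiverStream;
    use tokio_util::io::StreamReader;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Channel carrying data chunks (or errors) from the blocking producer.
        let (tx, rx) = mpsc::channel::<std::io::Result<Bytes>>(16);

        // Stand-in for the C++ side: narFromPath writing into RustSink.
        task::spawn_blocking(move || {
            for chunk in [&b"hello "[..], &b"nar"[..]] {
                // Ignore send errors: the reader may have been dropped.
                let _ = tx.blocking_send(Ok(Bytes::copy_from_slice(chunk)));
            }
            // Dropping tx ends the stream, which the reader sees as EOF.
        });

        // StreamReader turns a Stream of io::Result<Bytes> into an AsyncRead.
        let mut reader = StreamReader::new(ReceiverStream::new(rx));
        let mut out = Vec::new();
        reader.read_to_end(&mut out).await?;
        assert_eq!(out, b"hello nar".to_vec());
        Ok(())
    }

In this simplified sketch, dropping the sender is treated as end of stream; the patch instead sends an explicit Eof message, so a sender dropped mid-dump can be distinguished from a clean end of the NAR (the BrokenPipe branch in poll_next).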