use cpp bindings to make nar

cy 2025-04-26 21:08:01 -04:00
parent 0fedae9334
commit 85fefe9e77
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
7 changed files with 66 additions and 50 deletions


@@ -23,6 +23,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::Result;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
@@ -125,7 +126,7 @@ impl AsyncWriteAdapter {
writer.write_all(&v).await?;
}
Err(e) => {
return Err(e);
return Err(e.into());
}
}
}
@@ -139,7 +140,7 @@ impl AsyncWriteAdapter {
}
impl Stream for AsyncWriteAdapter {
type Item = Result<Vec<u8>>;
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.receiver.poll_recv(cx) {
@@ -147,9 +148,12 @@ impl Stream for AsyncWriteAdapter {
Poll::Ready(Some(message)) => {
use AsyncWriteMessage::*;
match message {
Data(v) => Poll::Ready(Some(Ok(v))),
Data(v) => Poll::Ready(Some(Ok(v.into()))),
Error(exception) => {
let error = anyhow::Error::msg(format!("cxx error: {exception}"));
let error = std::io::Error::new(
io::ErrorKind::Other,
format!("cxx error: {exception}"),
);
Poll::Ready(Some(Err(error)))
}
Eof => {
@@ -160,7 +164,7 @@ impl Stream for AsyncWriteAdapter {
}
Poll::Ready(None) => {
if !self.eof {
Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe).into())))
Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe))))
} else {
Poll::Ready(None)
}
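
The adapter's item type changes from anyhow::Result<Vec<u8>> to std::io::Result<Bytes> so the stream can be handed straight to tokio_util::io::StreamReader, which expects a Stream of Result<B, E> with B: Buf and E: Into<io::Error>. A minimal sketch of that wrapping, using a stand-in stream instead of the real adapter:

use bytes::Bytes;
use futures::stream;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-in for AsyncWriteAdapter: any stream of io::Result<Bytes> works.
    let chunks = stream::iter([
        Ok::<Bytes, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"nar")),
    ]);

    // StreamReader exposes the chunk stream as AsyncRead.
    let mut reader = StreamReader::new(chunks);
    let mut out = Vec::new();
    reader.read_to_end(&mut out).await?;
    assert_eq!(out, b"hello nar");
    Ok(())
}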
@@ -208,6 +212,13 @@ mod ffi {
include_derivers: bool,
) -> Result<UniquePtr<CxxVector<CxxString>>>;
/// Creates a NAR dump from a path.
fn nar_from_path(
self: Pin<&mut CNixStore>,
base_name: Vec<u8>,
sender: Box<AsyncWriteSender>,
) -> Result<()>;
/// Obtains a handle to the Nix store.
fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
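
For context, the new binding lives inside the crate's cxx bridge. A rough sketch of how such a declaration is usually framed is below; the extern blocks and the include path are assumptions, and only the nar_from_path and open_nix_store signatures come from this diff. cxx maps the Result<()> return onto C++ exceptions, and Box<AsyncWriteSender> crosses into C++ as an opaque rust::Box:

#[cxx::bridge]
mod ffi {
    extern "Rust" {
        // Opaque Rust type the C++ side receives by Box and writes NAR bytes into.
        type AsyncWriteSender;
    }

    unsafe extern "C++" {
        include!("store.hpp"); // hypothetical header name

        type CNixStore;

        /// Creates a NAR dump from a path.
        fn nar_from_path(
            self: Pin<&mut CNixStore>,
            base_name: Vec<u8>,
            sender: Box<AsyncWriteSender>,
        ) -> Result<()>;

        /// Obtains a handle to the Nix store.
        fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
    }
}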


@@ -108,6 +108,17 @@ std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePat
return std::make_unique<std::vector<std::string>>(result);
}
void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
RustSink sink(std::move(sender));
std::string_view sv((const char *)base_name.data(), base_name.size());
nix::StorePath store_path(sv);
// exceptions will be thrown into Rust
this->store->narFromPath(store_path, sink);
sink.eof();
}
std::unique_ptr<CNixStore> open_nix_store() {
return std::make_unique<CNixStore>();
}


@@ -79,6 +79,7 @@ public:
bool flip_direction,
bool include_outputs,
bool include_derivers);
void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
};
std::unique_ptr<CNixStore> open_nix_store();


@@ -1,25 +1,20 @@
use anyhow::{Context, Result};
use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use nix_compat::{
narinfo::{self, NarInfo},
store_path::StorePath,
};
use sha2::{Digest, Sha256};
use std::mem::take;
use tempfile::{NamedTempFile, TempPath};
use tokio::{
fs::File,
io::{AsyncRead, BufReader},
process::Command,
};
use std::{mem::take, sync::Arc};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;
use tracing::debug;
use crate::path_info::PathInfo;
use crate::store::Store;
pub struct MakeNar<'a> {
path_info: &'a PathInfo,
nar_file: TempPath,
store: Arc<Store>,
nar_hasher: Sha256,
/// hash of compressed nar file
file_hasher: Sha256,
@@ -28,12 +23,10 @@ pub struct MakeNar<'a> {
}
impl<'a> MakeNar<'a> {
pub fn new(path_info: &'a PathInfo) -> Result<Self> {
pub fn new(path_info: &'a PathInfo, store: Arc<Store>) -> Result<Self> {
Ok(Self {
path_info,
nar_file: NamedTempFile::new()
.context("create tempfile for nar")?
.into_temp_path(),
store,
nar_hasher: Sha256::new(),
file_hasher: Sha256::new(),
nar_size: 0,
@@ -41,32 +34,12 @@ impl<'a> MakeNar<'a> {
})
}
pub async fn make(&self) -> Result<()> {
let path = self.path_info.absolute_path();
let out = std::fs::File::options()
.write(true)
.truncate(true)
.open(&self.nar_file)?;
debug!("dumping nar for: {}", path);
Ok(Command::new("nix")
.arg("nar")
.arg("pack")
.arg(self.path_info.absolute_path())
.kill_on_drop(true)
.stdout(out)
.spawn()?
.wait()
.await?
.exit_ok()?)
}
/// Returns a compressed nar reader which can be uploaded. File hash will be available when
/// everything is read
pub async fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
let nar_file = File::open(&self.nar_file).await?;
pub fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
let nar_reader = self.store.nar_from_path(self.path_info.path.clone());
// reader that hashes as nar is read
let nar_reader = InspectReader::new(nar_file, |x| {
let nar_reader = InspectReader::new(nar_reader, |x| {
self.nar_size += x.len() as u64;
self.nar_hasher.update(x);
});
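
The tail of compress_and_hash is cut off above. Judging from the imports (ZstdEncoder, Level, BufReader, InspectReader), the likely shape of the full chain is: hash the raw NAR as it streams out of the store, compress it with zstd, then hash the compressed bytes for the narinfo file hash. A sketch of that chain under those assumptions, with local hashers standing in for the struct fields:

use async_compression::{Level, tokio::bufread::ZstdEncoder};
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;

fn compress_and_hash_sketch<'a>(
    nar_reader: impl AsyncRead + 'a,
    nar_hasher: &'a mut Sha256,
    file_hasher: &'a mut Sha256,
) -> impl AsyncRead + 'a {
    // Hash the uncompressed NAR as it is read from the store.
    let nar_reader = InspectReader::new(nar_reader, |x| nar_hasher.update(x));
    // zstd-compress; the bufread encoder wants an AsyncBufRead, hence BufReader.
    let compressed = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
    // Hash the compressed output to obtain the file hash.
    InspectReader::new(compressed, |x| file_hasher.update(x))
}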


@@ -169,8 +169,9 @@ impl Push {
uploads.push(tokio::spawn({
let s3 = self.s3.clone();
let store = self.store.clone();
async move {
let res = uploader.upload(s3).await;
let res = uploader.upload(s3, store).await;
drop(permit);
self.upload_count.fetch_add(1, Ordering::Relaxed);
res


@@ -2,9 +2,13 @@ use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc};
use anyhow::{Context, Result};
use nix_compat::store_path::StorePath;
use tokio::task;
use tokio::{io::AsyncRead, task};
use tokio_util::io::StreamReader;
use crate::{bindings, path_info::PathInfo};
use crate::{
bindings::{self, AsyncWriteAdapter},
path_info::PathInfo,
};
pub struct Store {
inner: Arc<bindings::FfiNixStore>,
@@ -75,4 +79,20 @@ impl Store {
.await
.unwrap()
}
pub fn nar_from_path(&self, store_path: StorePath<String>) -> impl AsyncRead {
let inner = self.inner.clone();
let (adapter, mut sender) = AsyncWriteAdapter::new();
let base_name = store_path.to_string().as_bytes().to_vec();
tokio::task::spawn_blocking(move || {
// Send all exceptions through the channel, and ignore errors
// during sending (the channel may have been closed).
if let Err(e) = inner.store().nar_from_path(base_name, sender.clone()) {
let _ = sender.rust_error(e);
}
});
StreamReader::new(adapter)
}
}
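
A hypothetical caller of the new Store::nar_from_path (the store setup is outside this diff); a C++ exception raised by narFromPath travels through the sender and surfaces from the reader as an io::Error:

use nix_compat::store_path::StorePath;
use tokio::io::AsyncReadExt;

async fn dump_nar(store: &Store, path: StorePath<String>) -> anyhow::Result<Vec<u8>> {
    let reader = store.nar_from_path(path);
    // The opaque impl AsyncRead is not declared Unpin, so pin it before reading.
    tokio::pin!(reader);
    let mut nar = Vec::new();
    reader.read_to_end(&mut nar).await?;
    Ok(nar)
}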


@@ -7,7 +7,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, trace};
use ulid::Ulid;
use crate::{make_nar::MakeNar, path_info::PathInfo};
use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store};
const CHUNK_SIZE: usize = 1024 * 1024 * 5;
@@ -24,9 +24,8 @@ impl<'a> Uploader<'a> {
Ok(Self { signing_key, path })
}
pub async fn upload(&self, s3: Arc<AmazonS3>) -> Result<()> {
let mut nar = MakeNar::new(&self.path)?;
nar.make().await?;
pub async fn upload(&self, s3: Arc<AmazonS3>, store: Arc<Store>) -> Result<()> {
let mut nar = MakeNar::new(&self.path, store)?;
// we don't know what the hash of the compressed file will be so upload to a
// temp location for now
@@ -35,7 +34,7 @@ impl<'a> Uploader<'a> {
debug!("uploading to temp path: {}", temp_path);
// compress and upload nar
let mut file_reader = nar.compress_and_hash().await?;
let mut file_reader = nar.compress_and_hash()?;
loop {
let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
let n = file_reader.read_buf(&mut buf).await?;
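
The upload loop is cut off here; the usual completion of such a read_buf loop is sketched below. upload_chunk is a hypothetical stand-in for the actual S3 multipart call and is not part of this commit:

use bytes::{Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncReadExt};

const CHUNK_SIZE: usize = 1024 * 1024 * 5;

async fn upload_in_chunks(mut file_reader: impl AsyncRead + Unpin) -> anyhow::Result<()> {
    loop {
        let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
        let n = file_reader.read_buf(&mut buf).await?;
        if n == 0 {
            // EOF: the compressed NAR has been fully streamed.
            break;
        }
        upload_chunk(buf.freeze()).await?;
    }
    Ok(())
}

// Stub standing in for the real multipart upload step.
async fn upload_chunk(_part: Bytes) -> anyhow::Result<()> {
    Ok(())
}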