Compare commits

3 commits

Author  SHA1        Message                                              Date
cy      ac4b2ba136  fancy nar cooking                                    2025-04-26 00:39:22 -04:00
cy      4808671071  some progress on using nix-compat for nar creation   2025-04-18 21:23:52 -04:00
cy      a17fa92c78  some progress at an attempt                          2025-04-18 00:50:21 -04:00
8 changed files with 158 additions and 43 deletions

Cargo.lock (generated)

@@ -2228,6 +2228,7 @@ dependencies = [
  "async-compression",
  "aws-config",
  "aws-sdk-s3",
+ "bytes",
  "clap",
  "console-subscriber",
  "cxx",
@@ -2241,7 +2242,10 @@ dependencies = [
  "serde",
  "serde_json",
  "sha2",
+ "tempfile",
  "tokio",
+ "tokio-stream",
+ "tokio-util",
  "tracing",
  "url",
 ]

Cargo.toml

@@ -22,6 +22,10 @@ tracing = "0.1.41"
 url = { version = "2.5.4", features = [ "serde" ]}
 cxx = "1.0"
 console-subscriber = "0.4.1"
+bytes = "1.10.1"
+tokio-stream = { version = "0.1.17", features = ["fs"] }
+tempfile = "3.19.1"
+tokio-util = { version = "0.7.14", features = ["io"] }
 
 [build-dependencies]
 cxx-build = "1.0"


@@ -211,6 +211,13 @@ mod ffi {
         /// Obtains a handle to the Nix store.
         fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
 
+        /// Creates a NAR dump from a path.
+        fn nar_from_path(
+            self: Pin<&mut CNixStore>,
+            base_name: Vec<u8>,
+            sender: Box<AsyncWriteSender>,
+        ) -> Result<()>;
+
         // =========
         // CPathInfo
         // =========

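Note: the bridge above hands a Box<AsyncWriteSender> to C++, but neither AsyncWriteSender nor AsyncWriteAdapter is defined anywhere in this compare. The sketch below is an assumption about their shape, an unbounded mpsc channel whose sending half crosses the FFI boundary and whose receiving half is polled as a stream of chunks; only the names are taken from the call sites in the diff, the bodies are hypothetical.

use bytes::Bytes;
use tokio::sync::mpsc;

// One message per write performed by the C++ sink.
#[derive(Debug)]
enum Message {
    Data(Vec<u8>),  // a chunk of NAR bytes
    Error(String),  // an error forwarded from the C++ side
    Eof,            // the sink was closed cleanly
}

#[derive(Clone)]
pub struct AsyncWriteSender {
    sender: mpsc::UnboundedSender<Message>, // this half is boxed and given to C++
}

impl AsyncWriteSender {
    pub fn send(&mut self, data: &[u8]) {
        let _ = self.sender.send(Message::Data(data.to_vec()));
    }

    pub fn eof(&mut self) {
        let _ = self.sender.send(Message::Eof);
    }

    pub fn rust_error(&mut self, e: impl std::fmt::Display) {
        let _ = self.sender.send(Message::Error(e.to_string()));
    }
}

pub struct AsyncWriteAdapter {
    receiver: mpsc::UnboundedReceiver<Message>, // polled from async Rust
}

impl AsyncWriteAdapter {
    pub fn new() -> (Self, Box<AsyncWriteSender>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        (Self { receiver }, Box::new(AsyncWriteSender { sender }))
    }
}

impl futures::Stream for AsyncWriteAdapter {
    type Item = anyhow::Result<Bytes>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use std::task::Poll;
        match self.receiver.poll_recv(cx) {
            Poll::Ready(Some(Message::Data(d))) => Poll::Ready(Some(Ok(Bytes::from(d)))),
            Poll::Ready(Some(Message::Error(e))) => Poll::Ready(Some(Err(anyhow::anyhow!(e)))),
            Poll::Ready(Some(Message::Eof)) | Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}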

@@ -108,6 +108,17 @@ std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePat
     return std::make_unique<std::vector<std::string>>(result);
 }
 
+void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
+    RustSink sink(std::move(sender));
+
+    std::string_view sv((const char *)base_name.data(), base_name.size());
+    nix::StorePath store_path(sv);
+
+    // exceptions will be thrown into Rust
+    this->store->narFromPath(store_path, sink);
+    sink.eof();
+}
+
 std::unique_ptr<CNixStore> open_nix_store() {
     return std::make_unique<CNixStore>();
 }


@@ -79,6 +79,7 @@ public:
         bool flip_direction,
         bool include_outputs,
         bool include_derivers);
+    void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
 };
 
 std::unique_ptr<CNixStore> open_nix_store();


@@ -160,7 +160,7 @@ impl Push {
     }
 
     async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
-        let mut uploads = Vec::with_capacity(10);
+        let mut uploads = Vec::new();
 
         loop {
             if let Some(path_to_upload) = rx.recv().await {


@@ -4,7 +4,10 @@ use anyhow::{Context, Result};
 use nix_compat::store_path::StorePath;
 use tokio::task;
 
-use crate::{bindings, path_info::PathInfo};
+use crate::{
+    bindings::{self, AsyncWriteAdapter},
+    path_info::PathInfo,
+};
 
 pub struct Store {
     inner: Arc<bindings::FfiNixStore>,
@@ -75,4 +78,19 @@ impl Store {
             .await
             .unwrap()
     }
+
+    pub fn stream_nar(&self, path: StorePath<String>) -> AsyncWriteAdapter {
+        let inner = self.inner.clone();
+        let (adapter, mut sender) = AsyncWriteAdapter::new();
+        task::spawn_blocking(move || {
+            if let Err(e) = inner
+                .store()
+                .nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone())
+            {
+                let _ = sender.rust_error(e);
+            }
+        });
+
+        adapter
+    }
 }

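The adapter returned by stream_nar is meant to be consumed as an async stream while narFromPath runs on the spawn_blocking thread. A hypothetical caller, assuming AsyncWriteAdapter implements Stream<Item = anyhow::Result<Bytes>> as sketched earlier (Store, StorePath, and stream_nar are the names from this diff; nar_sha256 is an illustration, not code from the repo):

use futures::TryStreamExt;
use sha2::{Digest, Sha256};

// Hash a NAR without materializing it in memory: chunks are polled here as
// the blocking dump on the worker thread produces them.
async fn nar_sha256(store: &Store, path: StorePath<String>) -> anyhow::Result<[u8; 32]> {
    let mut stream = store.stream_nar(path);
    let mut hasher = Sha256::new();
    while let Some(chunk) = stream.try_next().await? {
        hasher.update(&chunk);
    }
    Ok(hasher.finalize().into())
}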

@@ -1,20 +1,30 @@
+use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf};
+
 use anyhow::Result;
 use async_compression::{Level, tokio::bufread::ZstdEncoder};
 use aws_sdk_s3::{
     self as s3,
     types::{CompletedMultipartUpload, CompletedPart},
 };
-use futures::future::join_all;
+use bytes::{BufMut, Bytes, BytesMut};
+use futures::{future::join_all, stream::TryStreamExt};
 use nix_compat::{
+    nar::writer::r#async as nar,
     narinfo::{self, NarInfo, SigningKey},
     nixbase32,
     store_path::StorePath,
 };
 use sha2::{Digest, Sha256};
-use tokio::{io::AsyncReadExt, process::Command};
+use tokio::{
+    fs::{File, read_dir, read_link},
+    io::{AsyncRead, BufReader},
+    pin,
+};
+use tokio_stream::wrappers::ReadDirStream;
+use tokio_util::io::{InspectReader, read_buf};
 use tracing::debug;
 
-use crate::path_info::PathInfo;
+use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};
 
 const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
 
@@ -23,7 +33,7 @@ pub struct Uploader<'a> {
     path: PathInfo,
     s3_client: &'a s3::Client,
     bucket: String,
-    hash: Sha256,
+    store: &'a Store,
 }
 
 impl<'a> Uploader<'a> {
@@ -32,38 +42,37 @@ impl<'a> Uploader<'a> {
         path: PathInfo,
         s3_client: &'a s3::Client,
         bucket: String,
+        store: &'a Store,
     ) -> Result<Self> {
         Ok(Self {
             signing_key,
             path,
             s3_client,
             bucket,
-            hash: Sha256::new(),
+            store,
         })
     }
 
     pub async fn upload(&self) -> Result<()> {
-        let nar = self.make_nar().await?;
-        let mut nar_info = self.narinfo_from_nar(&nar)?;
-        let nar = self.compress_nar(&nar).await;
-
-        // update fields that we know after compression
-        let mut hasher = Sha256::new();
-        hasher.update(&nar);
-        let hash: [u8; 32] = hasher.finalize().into();
-        let nar_url = self.nar_url(&hash);
-        nar_info.file_hash = Some(hash);
-        nar_info.file_size = Some(nar.len() as u64);
-        nar_info.url = nar_url.as_str();
-        debug!("uploading nar with key: {nar_url}");
-
-        if nar.len() < MULTIPART_CUTOFF {
+        let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?;
+        self.make_nar(&mut nar_temp).await;
+
+        // this goes to the .narinfo file
+        let mut nar_hasher = Sha256::new();
+        // this is the URL for file .narinfo points to
+        let mut file_hasher = Sha256::new();
+        let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher);
+
+        let buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
+        let
+
+        if first_chunk.len() < MULTIPART_CUTOFF {
             let put_object = self
                 .s3_client
                 .put_object()
                 .bucket(&self.bucket)
                 .key(&nar_url)
-                .body(nar.into())
+                .body(first_chunk.into())
                 .send()
                 .await?;
             debug!("put object: {:#?}", put_object);
@@ -139,16 +148,6 @@ impl<'a> Uploader<'a> {
         Ok(())
     }
 
-    async fn make_nar(&self) -> Result<Vec<u8>> {
-        Ok(Command::new("nix")
-            .arg("nar")
-            .arg("dump-path")
-            .arg(self.path.absolute_path())
-            .output()
-            .await?
-            .stdout)
-    }
-
     fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
         let mut hasher = Sha256::new();
         hasher.update(nar);
@@ -173,18 +172,89 @@ impl<'a> Uploader<'a> {
         Ok(nar_info)
     }
 
-    fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
-        let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
-        format!("nar/{compressed_nar_hash}.nar.zst")
-    }
-
-    async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
-        let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
-        let mut compressed = Vec::with_capacity(nar.len());
-        encoder
-            .read_to_end(&mut compressed)
-            .await
-            .expect("should compress just fine");
-        compressed
-    }
+    async fn make_nar(&self, sink: &mut File) -> Result<()> {
+        let nar = nar::open(sink).await?;
+
+        let path = self.path.absolute_path();
+        let metadata = File::open(&path).await?.metadata().await?;
+
+        if metadata.is_symlink() {
+            let target = read_link(&path).await?;
+            nar.symlink(target.as_os_str().as_encoded_bytes()).await;
+        } else if metadata.is_dir() {
+            let mut nar = nar.directory().await?;
+            nar_from_dir(path.into(), &mut nar).await;
+            nar.close().await;
+        } else if metadata.is_file() {
+            let perms = metadata.permissions().mode();
+            let mut executable = false;
+            if (perms & 0o700) == 0o700 {
+                executable = true;
+            }
+
+            let mut file = BufReader::new(File::open(&path).await?);
+            nar.file(executable, metadata.len(), &mut file).await;
+        }
+
+        Ok(())
+    }
 }
+
+async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> {
+    let root = ReadDirStream::new(read_dir(&path).await?);
+    let entries = root
+        .map_ok(|x| (x.file_name(), x))
+        .try_collect::<BTreeMap<_, _>>()
+        .await?;
+
+    // directory entries must be written in ascending order of name
+    for (name, entry) in entries.iter() {
+        let node = node.entry(name.as_encoded_bytes()).await?;
+        let metadata = entry.metadata().await?;
+
+        if metadata.is_symlink() {
+            let target = read_link(entry.path()).await?;
+            node.symlink(target.as_os_str().as_encoded_bytes()).await;
+        } else if metadata.is_dir() {
+            let mut node = node.directory().await?;
+            Box::pin(nar_from_dir(entry.path(), &mut node)).await;
+            node.close().await;
+        } else if metadata.is_file() {
+            let perms = metadata.permissions().mode();
+            let mut executable = false;
+            if (perms & 0o700) == 0o700 {
+                executable = true;
+            }
+
+            let mut file = BufReader::new(File::open(entry.path()).await?);
+            node.file(executable, metadata.len(), &mut file).await;
+        }
+    }
+
+    Ok(())
+}
+
+fn compress_and_hash_nar(
+    nar_file: File,
+    nar_hasher: &mut Sha256,
+    compressed_nar_hasher: &mut Sha256,
+) -> impl AsyncRead {
+    let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x));
+    let nar_buf_reader = BufReader::new(nar_reader);
+
+    let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default);
+    InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x))
+}
+
+fn nar_url(compressed_nar_hash: &[u8]) -> String {
+    format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
+}
+
+async fn read_buf_nar<S: AsyncRead + Unpin>(stream: &mut S, mut buf: BytesMut) -> Result<Bytes> {
+    while buf.len() < buf.capacity() {
+        let n = read_buf(stream, &mut buf).await?;
+        if n == 0 {
+            break;
+        }
    }
+
+    Ok(buf.freeze())
+}
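The body of upload() is partly truncated in this compare view (the bare "let" above), so the exact wiring between the new helpers is not shown. Below is a hedged sketch of how they plausibly compose, using only names that appear in the diff; the first_chunk binding and the upload_sketch function itself are assumptions, and the sketch would live in the same module as compress_and_hash_nar, read_buf_nar, and MULTIPART_CUTOFF:

use bytes::BytesMut;
use sha2::{Digest, Sha256};
use tokio::{fs::File, pin};

// Compression and both hashes are computed lazily as the reader is polled;
// nothing beyond the chunk currently in flight is buffered in memory.
async fn upload_sketch(nar_temp: File) -> anyhow::Result<()> {
    let mut nar_hasher = Sha256::new();  // uncompressed NAR hash -> NarHash field
    let mut file_hasher = Sha256::new(); // compressed hash -> FileHash and nar_url
    let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher);
    pin!(nar_reader);

    // Pull up to MULTIPART_CUTOFF bytes to decide how to ship the object.
    let first_chunk =
        read_buf_nar(&mut nar_reader, BytesMut::with_capacity(MULTIPART_CUTOFF)).await?;

    if first_chunk.len() < MULTIPART_CUTOFF {
        // The whole compressed NAR fit in one chunk: a single PutObject
        // suffices, and both hashers can be finalized immediately.
    } else {
        // Otherwise keep calling read_buf_nar and upload each frozen chunk
        // as one part of an S3 multipart upload, completing it at EOF.
    }
    Ok(())
}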