Compare commits
3 commits
main
...
2025-04-15
Author | SHA1 | Date | |
---|---|---|---|
ac4b2ba136 | |||
4808671071 | |||
a17fa92c78 |
8 changed files with 158 additions and 43 deletions
4
Cargo.lock
generated
4
Cargo.lock
generated
|
@ -2228,6 +2228,7 @@ dependencies = [
|
||||||
"async-compression",
|
"async-compression",
|
||||||
"aws-config",
|
"aws-config",
|
||||||
"aws-sdk-s3",
|
"aws-sdk-s3",
|
||||||
|
"bytes",
|
||||||
"clap",
|
"clap",
|
||||||
"console-subscriber",
|
"console-subscriber",
|
||||||
"cxx",
|
"cxx",
|
||||||
|
@ -2241,7 +2242,10 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha2",
|
"sha2",
|
||||||
|
"tempfile",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-stream",
|
||||||
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
|
@ -22,6 +22,10 @@ tracing = "0.1.41"
|
||||||
url = { version = "2.5.4", features = [ "serde" ]}
|
url = { version = "2.5.4", features = [ "serde" ]}
|
||||||
cxx = "1.0"
|
cxx = "1.0"
|
||||||
console-subscriber = "0.4.1"
|
console-subscriber = "0.4.1"
|
||||||
|
bytes = "1.10.1"
|
||||||
|
tokio-stream = { version = "0.1.17", features = ["fs"] }
|
||||||
|
tempfile = "3.19.1"
|
||||||
|
tokio-util = { version = "0.7.14", features = ["io"] }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
cxx-build = "1.0"
|
cxx-build = "1.0"
|
||||||
|
|
|
@ -211,6 +211,13 @@ mod ffi {
|
||||||
/// Obtains a handle to the Nix store.
|
/// Obtains a handle to the Nix store.
|
||||||
fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
|
fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
|
||||||
|
|
||||||
|
/// Creates a NAR dump from a path.
|
||||||
|
fn nar_from_path(
|
||||||
|
self: Pin<&mut CNixStore>,
|
||||||
|
base_name: Vec<u8>,
|
||||||
|
sender: Box<AsyncWriteSender>,
|
||||||
|
) -> Result<()>;
|
||||||
|
|
||||||
// =========
|
// =========
|
||||||
// CPathInfo
|
// CPathInfo
|
||||||
// =========
|
// =========
|
||||||
|
|
|
@ -108,6 +108,17 @@ std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePat
|
||||||
return std::make_unique<std::vector<std::string>>(result);
|
return std::make_unique<std::vector<std::string>>(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
|
||||||
|
RustSink sink(std::move(sender));
|
||||||
|
|
||||||
|
std::string_view sv((const char *)base_name.data(), base_name.size());
|
||||||
|
nix::StorePath store_path(sv);
|
||||||
|
|
||||||
|
// exceptions will be thrown into Rust
|
||||||
|
this->store->narFromPath(store_path, sink);
|
||||||
|
sink.eof();
|
||||||
|
}
|
||||||
|
|
||||||
std::unique_ptr<CNixStore> open_nix_store() {
|
std::unique_ptr<CNixStore> open_nix_store() {
|
||||||
return std::make_unique<CNixStore>();
|
return std::make_unique<CNixStore>();
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,6 +79,7 @@ public:
|
||||||
bool flip_direction,
|
bool flip_direction,
|
||||||
bool include_outputs,
|
bool include_outputs,
|
||||||
bool include_derivers);
|
bool include_derivers);
|
||||||
|
void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
|
||||||
};
|
};
|
||||||
|
|
||||||
std::unique_ptr<CNixStore> open_nix_store();
|
std::unique_ptr<CNixStore> open_nix_store();
|
||||||
|
|
|
@ -160,7 +160,7 @@ impl Push {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||||
let mut uploads = Vec::with_capacity(10);
|
let mut uploads = Vec::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Some(path_to_upload) = rx.recv().await {
|
if let Some(path_to_upload) = rx.recv().await {
|
||||||
|
|
20
src/store.rs
20
src/store.rs
|
@ -4,7 +4,10 @@ use anyhow::{Context, Result};
|
||||||
use nix_compat::store_path::StorePath;
|
use nix_compat::store_path::StorePath;
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
use crate::{bindings, path_info::PathInfo};
|
use crate::{
|
||||||
|
bindings::{self, AsyncWriteAdapter},
|
||||||
|
path_info::PathInfo,
|
||||||
|
};
|
||||||
|
|
||||||
pub struct Store {
|
pub struct Store {
|
||||||
inner: Arc<bindings::FfiNixStore>,
|
inner: Arc<bindings::FfiNixStore>,
|
||||||
|
@ -75,4 +78,19 @@ impl Store {
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn stream_nar(&self, path: StorePath<String>) -> AsyncWriteAdapter {
|
||||||
|
let inner = self.inner.clone();
|
||||||
|
let (adapter, mut sender) = AsyncWriteAdapter::new();
|
||||||
|
|
||||||
|
task::spawn_blocking(move || {
|
||||||
|
if let Err(e) = inner
|
||||||
|
.store()
|
||||||
|
.nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone())
|
||||||
|
{
|
||||||
|
let _ = sender.rust_error(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
adapter
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
152
src/uploader.rs
152
src/uploader.rs
|
@ -1,20 +1,30 @@
|
||||||
|
use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use async_compression::{Level, tokio::bufread::ZstdEncoder};
|
use async_compression::{Level, tokio::bufread::ZstdEncoder};
|
||||||
use aws_sdk_s3::{
|
use aws_sdk_s3::{
|
||||||
self as s3,
|
self as s3,
|
||||||
types::{CompletedMultipartUpload, CompletedPart},
|
types::{CompletedMultipartUpload, CompletedPart},
|
||||||
};
|
};
|
||||||
use futures::future::join_all;
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
|
use futures::{future::join_all, stream::TryStreamExt};
|
||||||
use nix_compat::{
|
use nix_compat::{
|
||||||
|
nar::writer::r#async as nar,
|
||||||
narinfo::{self, NarInfo, SigningKey},
|
narinfo::{self, NarInfo, SigningKey},
|
||||||
nixbase32,
|
nixbase32,
|
||||||
store_path::StorePath,
|
store_path::StorePath,
|
||||||
};
|
};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
use tokio::{io::AsyncReadExt, process::Command};
|
use tokio::{
|
||||||
|
fs::{File, read_dir, read_link},
|
||||||
|
io::{AsyncRead, BufReader},
|
||||||
|
pin,
|
||||||
|
};
|
||||||
|
use tokio_stream::wrappers::ReadDirStream;
|
||||||
|
use tokio_util::io::{InspectReader, read_buf};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::path_info::PathInfo;
|
use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};
|
||||||
|
|
||||||
const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
|
const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
|
||||||
|
|
||||||
|
@ -23,7 +33,7 @@ pub struct Uploader<'a> {
|
||||||
path: PathInfo,
|
path: PathInfo,
|
||||||
s3_client: &'a s3::Client,
|
s3_client: &'a s3::Client,
|
||||||
bucket: String,
|
bucket: String,
|
||||||
hash: Sha256,
|
store: &'a Store,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> Uploader<'a> {
|
impl<'a> Uploader<'a> {
|
||||||
|
@ -32,38 +42,37 @@ impl<'a> Uploader<'a> {
|
||||||
path: PathInfo,
|
path: PathInfo,
|
||||||
s3_client: &'a s3::Client,
|
s3_client: &'a s3::Client,
|
||||||
bucket: String,
|
bucket: String,
|
||||||
|
store: &'a Store,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
signing_key,
|
signing_key,
|
||||||
path,
|
path,
|
||||||
s3_client,
|
s3_client,
|
||||||
bucket,
|
bucket,
|
||||||
hash: Sha256::new(),
|
store,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn upload(&self) -> Result<()> {
|
pub async fn upload(&self) -> Result<()> {
|
||||||
let nar = self.make_nar().await?;
|
let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?;
|
||||||
let mut nar_info = self.narinfo_from_nar(&nar)?;
|
self.make_nar(&mut nar_temp).await;
|
||||||
let nar = self.compress_nar(&nar).await;
|
|
||||||
|
|
||||||
// update fields that we know after compression
|
// this goes to the .narinfo file
|
||||||
let mut hasher = Sha256::new();
|
let mut nar_hasher = Sha256::new();
|
||||||
hasher.update(&nar);
|
// this is the URL for file .narinfo points to
|
||||||
let hash: [u8; 32] = hasher.finalize().into();
|
let mut file_hasher = Sha256::new();
|
||||||
let nar_url = self.nar_url(&hash);
|
let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher);
|
||||||
nar_info.file_hash = Some(hash);
|
|
||||||
nar_info.file_size = Some(nar.len() as u64);
|
|
||||||
nar_info.url = nar_url.as_str();
|
|
||||||
debug!("uploading nar with key: {nar_url}");
|
|
||||||
|
|
||||||
if nar.len() < MULTIPART_CUTOFF {
|
let buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
|
||||||
|
let
|
||||||
|
|
||||||
|
if first_chunk.len() < MULTIPART_CUTOFF {
|
||||||
let put_object = self
|
let put_object = self
|
||||||
.s3_client
|
.s3_client
|
||||||
.put_object()
|
.put_object()
|
||||||
.bucket(&self.bucket)
|
.bucket(&self.bucket)
|
||||||
.key(&nar_url)
|
.key(&nar_url)
|
||||||
.body(nar.into())
|
.body(first_chunk.into())
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
debug!("put object: {:#?}", put_object);
|
debug!("put object: {:#?}", put_object);
|
||||||
|
@ -139,16 +148,6 @@ impl<'a> Uploader<'a> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn make_nar(&self) -> Result<Vec<u8>> {
|
|
||||||
Ok(Command::new("nix")
|
|
||||||
.arg("nar")
|
|
||||||
.arg("dump-path")
|
|
||||||
.arg(self.path.absolute_path())
|
|
||||||
.output()
|
|
||||||
.await?
|
|
||||||
.stdout)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
|
fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
hasher.update(nar);
|
hasher.update(nar);
|
||||||
|
@ -173,18 +172,89 @@ impl<'a> Uploader<'a> {
|
||||||
Ok(nar_info)
|
Ok(nar_info)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
|
async fn make_nar(&self, sink: &mut File) -> Result<()> {
|
||||||
let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
|
let nar = nar::open(sink).await?;
|
||||||
format!("nar/{compressed_nar_hash}.nar.zst")
|
let path = self.path.absolute_path();
|
||||||
}
|
let metadata = File::open(&path).await?.metadata().await?;
|
||||||
|
|
||||||
async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
|
if metadata.is_symlink() {
|
||||||
let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
|
let target = read_link(&path).await?;
|
||||||
let mut compressed = Vec::with_capacity(nar.len());
|
nar.symlink(target.as_os_str().as_encoded_bytes()).await;
|
||||||
encoder
|
} else if metadata.is_dir() {
|
||||||
.read_to_end(&mut compressed)
|
let mut nar = nar.directory().await?;
|
||||||
.await
|
nar_from_dir(path.into(), &mut nar).await;
|
||||||
.expect("should compress just fine");
|
nar.close().await;
|
||||||
compressed
|
} else if metadata.is_file() {
|
||||||
|
let perms = metadata.permissions().mode();
|
||||||
|
let mut executable = false;
|
||||||
|
if (perms & 0o700) == 0o700 {
|
||||||
|
executable = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut file = BufReader::new(File::open(&path).await?);
|
||||||
|
nar.file(executable, metadata.len(), &mut file).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> {
|
||||||
|
let root = ReadDirStream::new(read_dir(&path).await?);
|
||||||
|
let entries = root
|
||||||
|
.map_ok(|x| (x.file_name(), x))
|
||||||
|
.try_collect::<BTreeMap<_, _>>()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// directory entries must be written in ascending order of name
|
||||||
|
for (name, entry) in entries.iter() {
|
||||||
|
let node = node.entry(name.as_encoded_bytes()).await?;
|
||||||
|
let metadata = entry.metadata().await?;
|
||||||
|
|
||||||
|
if metadata.is_symlink() {
|
||||||
|
let target = read_link(entry.path()).await?;
|
||||||
|
node.symlink(target.as_os_str().as_encoded_bytes()).await;
|
||||||
|
} else if metadata.is_dir() {
|
||||||
|
let mut node = node.directory().await?;
|
||||||
|
Box::pin(nar_from_dir(entry.path(), &mut node)).await;
|
||||||
|
node.close().await;
|
||||||
|
} else if metadata.is_file() {
|
||||||
|
let perms = metadata.permissions().mode();
|
||||||
|
let mut executable = false;
|
||||||
|
if (perms & 0o700) == 0o700 {
|
||||||
|
executable = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut file = BufReader::new(File::open(entry.path()).await?);
|
||||||
|
node.file(executable, metadata.len(), &mut file).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn compress_and_hash_nar(
|
||||||
|
nar_file: File,
|
||||||
|
nar_hasher: &mut Sha256,
|
||||||
|
compressed_nar_hasher: &mut Sha256,
|
||||||
|
) -> impl AsyncRead {
|
||||||
|
let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x));
|
||||||
|
let nar_buf_reader = BufReader::new(nar_reader);
|
||||||
|
|
||||||
|
let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default);
|
||||||
|
InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn nar_url(compressed_nar_hash: &[u8]) -> String {
|
||||||
|
format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_buf_nar<S: AsyncRead + Unpin>(stream: &mut S, mut buf: BytesMut) -> Result<Bytes> {
|
||||||
|
while buf.len() < buf.capacity() {
|
||||||
|
let n = read_buf(stream, &mut buf).await?;
|
||||||
|
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(buf.freeze())
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue