refactor and bunch more improvements; use object_store for s3

cy 2025-04-26 12:37:07 -04:00
parent b1e59d0a6c
commit 81ce855dae
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
9 changed files with 535 additions and 1320 deletions

Cargo.lock (generated, 1454 changed lines): file diff suppressed because it is too large

Cargo.toml

@@ -6,8 +6,6 @@ edition = "2024"
 [dependencies]
 anyhow = "1.0.97"
 async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
-aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
-aws-sdk-s3 = "1.82.0"
 clap = { version = "4.5.34", features = ["derive"] }
 ed25519-dalek = "2.1.1"
 futures = "0.3.31"
@@ -22,6 +20,11 @@ tracing = "0.1.41"
 url = { version = "2.5.4", features = [ "serde" ]}
 cxx = "1.0"
 console-subscriber = "0.4.1"
+tempfile = "3.19.1"
+tokio-util = { version = "0.7.15", features = ["io"] }
+bytes = "1.10.1"
+object_store = { version = "0.12.0", features = ["aws"] }
+ulid = "1.2.1"
 
 [build-dependencies]
 cxx-build = "1.0"

flake.nix

@@ -35,6 +35,7 @@
             nix
             boost
             tokio-console
+            cargo-udeps
           ];
           NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
           RUST_LOG = "nixcp=debug";

src/main.rs

@@ -1,5 +1,6 @@
 #![feature(let_chains)]
 #![feature(extend_one)]
+#![feature(exit_status_error)]
 
 use std::path::PathBuf;
@@ -11,6 +12,7 @@ use store::Store;
 mod bindings;
 mod cli;
+mod make_nar;
 mod path_info;
 mod push;
 mod store;
@@ -48,19 +50,15 @@ pub struct PushArgs {
     #[arg(long)]
     signing_key: String,
 
-    /// If unspecified, will get it form AWS_DEFAULT_REGION envar
+    /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
     #[arg(long)]
     region: Option<String>,
 
-    /// If unspecifed, will get it from AWS_ENDPOINT_URL envar
+    /// If unspecifed, will get it from AWS_ENDPOINT envar
     /// e.g. https://s3.example.com
     #[arg(long)]
    endpoint: Option<String>,
 
-    /// AWS profile to use
-    #[arg(long)]
-    profile: Option<String>,
-
     #[arg(long)]
     skip_signature_check: bool,
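
With object_store, the region and endpoint flags layer on top of environment-driven defaults: per the doc comments above, the region falls back to AWS_DEFAULT_REGION (then us-east-1) and the endpoint to AWS_ENDPOINT, both of which `AmazonS3Builder::from_env()` picks up. Below is a hedged sketch of that layering; the `build_s3` helper and its parameters are illustrative and not part of this commit.

```rust
use object_store::aws::{AmazonS3, AmazonS3Builder};

// Hypothetical helper mirroring how CLI values can override env-driven config.
fn build_s3(
    bucket: &str,
    region: Option<&str>,
    endpoint: Option<&str>,
) -> object_store::Result<AmazonS3> {
    // from_env() reads credentials, region, endpoint, ... from AWS_* variables
    let mut builder = AmazonS3Builder::from_env().with_bucket_name(bucket);
    if let Some(region) = region {
        builder = builder.with_region(region);
    }
    if let Some(endpoint) = endpoint {
        // e.g. https://s3.example.com for S3-compatible, non-AWS stores
        builder = builder.with_endpoint(endpoint);
    }
    builder.build()
}

fn main() {
    match build_s3("my-cache-bucket", Some("us-east-1"), None) {
        Ok(_) => println!("S3 client configured"),
        Err(e) => eprintln!("failed to configure S3 client: {e}"),
    }
}
```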

src/make_nar.rs (new file, 92 lines)

@@ -0,0 +1,92 @@
+use anyhow::{Context, Result};
+use async_compression::{Level, tokio::bufread::ZstdEncoder};
+use nix_compat::{
+    narinfo::{self, NarInfo},
+    store_path::StorePath,
+};
+use sha2::{Digest, Sha256};
+use std::mem::take;
+use tempfile::NamedTempFile;
+use tokio::{
+    fs::File,
+    io::{AsyncRead, BufReader},
+    process::Command,
+};
+use tokio_util::io::InspectReader;
+
+use crate::path_info::PathInfo;
+
+pub struct MakeNar<'a> {
+    path_info: &'a PathInfo,
+    nar_file: NamedTempFile,
+    nar_hasher: Sha256,
+    /// hash of compressed nar file
+    file_hasher: Sha256,
+    nar_size: u64,
+    file_size: u64,
+}
+
+impl<'a> MakeNar<'a> {
+    pub fn new(path_info: &'a PathInfo) -> Result<Self> {
+        Ok(Self {
+            path_info,
+            nar_file: NamedTempFile::new().context("crated tempfile for nar")?,
+            nar_hasher: Sha256::new(),
+            file_hasher: Sha256::new(),
+            nar_size: 0,
+            file_size: 0,
+        })
+    }
+
+    pub async fn make(&self) -> Result<()> {
+        Ok(Command::new("nix")
+            .arg("nar")
+            .arg("dump-path")
+            .arg(self.path_info.absolute_path())
+            .kill_on_drop(true)
+            .stdout(self.nar_file.reopen()?)
+            .spawn()?
+            .wait()
+            .await?
+            .exit_ok()?)
+    }
+
+    /// Returns a compressed nar reader which can be uploaded. File hash will be available when
+    /// everything is read
+    pub async fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
+        let nar_file = File::from_std(self.nar_file.reopen()?);
+        // reader that hashes as nar is read
+        let nar_reader = InspectReader::new(nar_file, |x| self.nar_hasher.update(x));
+        let encoder = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
+        // reader that updates file_hash as the compressed nar is read
+        Ok(InspectReader::new(encoder, |x| self.file_hasher.update(x)))
+    }
+
+    /// Returns *unsigned* narinfo. `url` must be updated before uploading
+    pub fn get_narinfo(&mut self) -> Result<NarInfo> {
+        let file_hash = take(&mut self.file_hasher).finalize().into();
+
+        Ok(NarInfo {
+            flags: narinfo::Flags::empty(),
+            store_path: self.path_info.path.as_ref(),
+            nar_hash: take(&mut self.nar_hasher).finalize().into(),
+            nar_size: self.nar_size,
+            references: self
+                .path_info
+                .references
+                .iter()
+                .map(StorePath::as_ref)
+                .collect(),
+            signatures: Vec::new(),
+            ca: None,
+            system: None,
+            deriver: None,
+            compression: Some("zstd"),
+            file_hash: Some(file_hash),
+            file_size: Some(self.file_size),
+            url: "",
+        })
+        // signature consists of: store_path, nar_hash, nar_size, and references
+        // nar_info.add_signature(self.signing_key);
+    }
+}
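
MakeNar computes both hashes in a single pass by stacking readers: an `InspectReader` feeds every raw NAR byte into one `Sha256`, `ZstdEncoder` compresses the stream, and a second `InspectReader` hashes the compressed bytes as they are read out. Here is a self-contained sketch of that layering over an in-memory buffer instead of a `nix nar dump-path` stream; the payload and variable names are made up.

```rust
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use sha2::{Digest, Sha256};
use tokio::io::{AsyncReadExt, BufReader};
use tokio_util::io::InspectReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let data = b"example payload".as_slice();

    let mut raw_hasher = Sha256::new();
    let mut compressed_hasher = Sha256::new();

    // hash the uncompressed bytes as they are read
    let raw = InspectReader::new(data, |chunk| raw_hasher.update(chunk));
    // compress; ZstdEncoder wants an AsyncBufRead, hence the BufReader
    let encoder = ZstdEncoder::with_quality(BufReader::new(raw), Level::Default);
    // hash the compressed bytes as they stream out
    let mut reader = InspectReader::new(encoder, |chunk| compressed_hasher.update(chunk));

    let mut compressed = Vec::new();
    reader.read_to_end(&mut compressed).await?;
    drop(reader); // releases the mutable borrows held by the inspect closures

    let nar_hash: [u8; 32] = raw_hasher.finalize().into();
    let file_hash: [u8; 32] = compressed_hasher.finalize().into();
    println!("nar hash:  {nar_hash:02x?}");
    println!("file hash: {file_hash:02x?}");
    println!("file size: {} bytes", compressed.len());
    Ok(())
}
```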

src/path_info.rs

@@ -1,10 +1,10 @@
 use std::collections::HashSet;
 
 use anyhow::{Context, Result};
-use aws_sdk_s3 as s3;
 use futures::future::join_all;
 use nix_compat::nixbase32;
 use nix_compat::store_path::StorePath;
+use object_store::{ObjectStore, aws::AmazonS3, path::Path as ObjectPath};
 use regex::Regex;
 use std::path::Path;
 use tokio::process::Command;
@@ -82,13 +82,13 @@
             .filter_map(|signature| Some(signature.split_once(":")?.0))
             .collect();
         trace!("signers for {}: {:?}", self.path, signers);
-        return signers;
+        signers
     }
 
     pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
         for upstream in upstreams {
             let upstream = upstream
-                .join(format!("{}.narinfo", self.digest()).as_str())
+                .join(self.narinfo_path().as_ref())
                 .expect("adding <hash>.narinfo should make a valid url");
             debug!("querying {}", upstream);
             let res_status = reqwest::Client::new()
@@ -108,17 +108,12 @@
         self.path.to_absolute_path()
     }
 
-    pub fn digest(&self) -> String {
-        nixbase32::encode(self.path.digest())
+    pub fn narinfo_path(&self) -> ObjectPath {
+        ObjectPath::parse(format!("{}.narinfo", nixbase32::encode(self.path.digest())))
+            .expect("must parse to a valid object_store path")
     }
 
-    pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
-        s3_client
-            .head_object()
-            .bucket(bucket)
-            .key(format!("{}.narinfo", self.digest()))
-            .send()
-            .await
-            .is_ok()
+    pub async fn check_if_already_exists(&self, s3: &AmazonS3) -> bool {
+        s3.head(&self.narinfo_path()).await.is_ok()
     }
 }
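
With `object_store`, the existence check collapses to a HEAD request against the `<digest>.narinfo` key. A small sketch of the same check against the in-memory backend, with a placeholder digest standing in for the nixbase32-encoded store-path digest.

```rust
use object_store::{ObjectStore, memory::InMemory, path::Path};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();

    // placeholder for nixbase32::encode(store_path.digest())
    let digest = "xxxxxxxxplaceholderdigestxxxxxxx";
    let narinfo_path = Path::parse(format!("{digest}.narinfo"))
        .expect("must parse to a valid object_store path");

    // nothing uploaded yet: head() fails, so the path would be uploaded
    assert!(store.head(&narinfo_path).await.is_err());

    // after the narinfo is uploaded, head() succeeds and the path is skipped
    store
        .put(&narinfo_path, "StorePath: ...".to_string().into())
        .await?;
    assert!(store.head(&narinfo_path).await.is_ok());
    Ok(())
}
```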

src/push.rs

@@ -9,23 +9,23 @@ use std::{
 };
 
 use anyhow::{Context, Result};
-use aws_config::Region;
-use aws_sdk_s3 as s3;
 use futures::future::join_all;
 use nix_compat::narinfo::{self, SigningKey};
-use tokio::sync::{RwLock, mpsc};
+use object_store::aws::{AmazonS3, AmazonS3Builder};
+use tokio::sync::{RwLock, Semaphore, mpsc};
 use tracing::debug;
 use url::Url;
 
 use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
 
+const UPLOAD_CONCURRENCY: usize = 32;
+
 pub struct Push {
     upstream_caches: Vec<Url>,
     store_paths: Arc<RwLock<Vec<PathInfo>>>,
-    s3_client: s3::Client,
     signing_key: SigningKey<ed25519_dalek::SigningKey>,
-    bucket: String,
     store: Arc<Store>,
+    s3: Arc<AmazonS3>,
     // paths that we skipped cause of a signature match
     signature_hit_count: AtomicUsize,
     // paths that we skipped cause we found it on an upstream
@@ -51,25 +51,21 @@ impl Push {
         let key = fs::read_to_string(&cli.signing_key)?;
         let signing_key = narinfo::parse_keypair(key.as_str())?.0;
 
-        let mut s3_config = aws_config::from_env();
+        let mut s3_builder = AmazonS3Builder::from_env().with_bucket_name(&cli.bucket);
+
         if let Some(region) = &cli.region {
-            s3_config = s3_config.region(Region::new(region.clone()));
+            s3_builder = s3_builder.with_region(region);
         }
+
         if let Some(endpoint) = &cli.endpoint {
-            s3_config = s3_config.endpoint_url(endpoint);
-        }
-        if let Some(profile) = &cli.profile {
-            s3_config = s3_config.profile_name(profile);
+            s3_builder = s3_builder.with_endpoint(endpoint);
         }
 
-        let s3_client = s3::Client::new(&s3_config.load().await);
-
         Ok(Self {
             upstream_caches: upstreams,
             store_paths: Arc::new(RwLock::new(Vec::new())),
-            s3_client,
             signing_key,
-            bucket: cli.bucket.clone(),
             store: Arc::new(store),
+            s3: Arc::new(s3_builder.build()?),
             signature_hit_count: AtomicUsize::new(0),
             upstream_hit_count: AtomicUsize::new(0),
             already_exists_count: AtomicUsize::new(0),
@@ -109,9 +105,10 @@
     }
 
     pub async fn run(&'static self) -> Result<()> {
-        let (tx, rx) = mpsc::channel(10);
+        let (tx, rx) = mpsc::channel(UPLOAD_CONCURRENCY);
         let filter = tokio::spawn(self.filter_from_upstream(tx));
         let upload = tokio::spawn(self.upload(rx));
+
         filter.await?;
         upload.await??;
         Ok(())
@@ -135,10 +132,7 @@
                     .check_upstream_hit(self.upstream_caches.as_slice())
                     .await
                 {
-                    if path
-                        .check_if_already_exists(&self.s3_client, self.bucket.clone())
-                        .await
-                    {
+                    if path.check_if_already_exists(&self.s3).await {
                         debug!("skip {} (already exists)", path.absolute_path());
                         self.already_exists_count.fetch_add(1, Ordering::Relaxed);
                     } else {
@@ -160,22 +154,23 @@
     }
 
     async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
-        let mut uploads = Vec::with_capacity(10);
+        let mut uploads = Vec::new();
+        let permits = Arc::new(Semaphore::new(UPLOAD_CONCURRENCY));
 
         loop {
+            let permits = permits.clone();
+            let permit = permits.acquire_owned().await.unwrap();
+
            if let Some(path_to_upload) = rx.recv().await {
                 println!("uploading: {}", path_to_upload.absolute_path());
 
-                let uploader = Uploader::new(
-                    &self.signing_key,
-                    path_to_upload,
-                    &self.s3_client,
-                    self.bucket.clone(),
-                )?;
+                let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
                 uploads.push(tokio::spawn({
+                    let s3 = self.s3.clone();
                     async move {
-                        let res = uploader.upload().await;
+                        let res = uploader.upload(s3).await;
+                        drop(permit);
                         self.upload_count.fetch_add(1, Ordering::Relaxed);
                         res
                     }
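
The upload loop now caps in-flight uploads with an owned-permit `Semaphore` (UPLOAD_CONCURRENCY = 32) instead of relying only on the channel size: a permit is acquired before each path is spawned and dropped when that upload finishes. Below is a minimal sketch of the acquire-before-spawn pattern, with a sleep standing in for the upload work; the loop bound and task body are made up for illustration.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

const UPLOAD_CONCURRENCY: usize = 32;

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(UPLOAD_CONCURRENCY));
    let mut tasks = Vec::new();

    for i in 0..100 {
        // acquire before spawning, so the loop stalls once the limit is hit
        let permit = permits.clone().acquire_owned().await.unwrap();
        tasks.push(tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await; // pretend upload
            drop(permit); // release the slot for the next queued path
            i
        }));
    }

    for task in tasks {
        task.await.unwrap();
    }
}
```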

src/store.rs

@@ -28,13 +28,13 @@ impl Store {
                 inner
                     .store()
                     .compute_fs_closure(path.to_string().as_bytes(), false, true, true)?;
-                Ok(cxx_vector
+                cxx_vector
                     .iter()
                     .map(|x| {
                         StorePath::from_bytes(x.as_bytes())
                             .context("make StorePath from vector returned by compute_fs_closure")
                     })
-                    .collect::<Result<_, _>>()?)
+                    .collect::<Result<_, _>>()
             })
             .await
             .unwrap()
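
The dropped `Ok(...?)` wrapper was redundant because an iterator of `Result`s already collects straight into `Result<Vec<_>, _>`. A small illustration of that collect-with-context pattern; `parse_all` is a made-up helper, not part of this crate.

```rust
use anyhow::{Context, Result};

// Each fallible conversion is annotated with context, then collected directly
// into a Result; the first error short-circuits the collection.
fn parse_all(raw: &[&str]) -> Result<Vec<u32>> {
    raw.iter()
        .map(|s| s.parse::<u32>().context("parse closure entry as u32"))
        .collect()
}

fn main() -> Result<()> {
    assert_eq!(parse_all(&["1", "2", "3"])?, vec![1, 2, 3]);
    assert!(parse_all(&["1", "x"]).is_err());
    Ok(())
}
```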

src/uploader.rs

@@ -1,190 +1,75 @@
 use anyhow::Result;
-use async_compression::{Level, tokio::bufread::ZstdEncoder};
-use aws_sdk_s3::{
-    self as s3,
-    types::{CompletedMultipartUpload, CompletedPart},
-};
-use futures::future::join_all;
-use nix_compat::{
-    narinfo::{self, NarInfo, SigningKey},
-    nixbase32,
-    store_path::StorePath,
-};
-use sha2::{Digest, Sha256};
-use tokio::{io::AsyncReadExt, process::Command};
-use tracing::debug;
+use bytes::BytesMut;
+use nix_compat::{narinfo::SigningKey, nixbase32};
+use object_store::{ObjectStore, aws::AmazonS3, buffered::BufWriter, path::Path};
+use std::sync::Arc;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tracing::{debug, trace};
+use ulid::Ulid;
 
-use crate::path_info::PathInfo;
+use crate::{make_nar::MakeNar, path_info::PathInfo};
 
-const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
+const CHUNK_SIZE: usize = 1024 * 1024 * 5;
 
 pub struct Uploader<'a> {
     signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
     path: PathInfo,
-    s3_client: &'a s3::Client,
-    bucket: String,
-    hash: Sha256,
 }
 
 impl<'a> Uploader<'a> {
     pub fn new(
         signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
         path: PathInfo,
-        s3_client: &'a s3::Client,
-        bucket: String,
     ) -> Result<Self> {
-        Ok(Self {
-            signing_key,
-            path,
-            s3_client,
-            bucket,
-            hash: Sha256::new(),
-        })
+        Ok(Self { signing_key, path })
     }
 
-    pub async fn upload(&self) -> Result<()> {
-        let nar = self.make_nar().await?;
-        let mut nar_info = self.narinfo_from_nar(&nar)?;
-        let nar = self.compress_nar(&nar).await;
+    pub async fn upload(&self, s3: Arc<AmazonS3>) -> Result<()> {
+        let mut nar = MakeNar::new(&self.path)?;
+        nar.make().await?;
 
-        // update fields that we know after compression
-        let mut hasher = Sha256::new();
-        hasher.update(&nar);
-        let hash: [u8; 32] = hasher.finalize().into();
-        let nar_url = self.nar_url(&hash);
-        nar_info.file_hash = Some(hash);
-        nar_info.file_size = Some(nar.len() as u64);
-        nar_info.url = nar_url.as_str();
-        debug!("uploading nar with key: {nar_url}");
+        // we don't know what the hash of the compressed file will be so upload to a
+        // temp location for now
+        let temp_path = Path::parse(Ulid::new().to_string())?;
+        let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone());
 
-        if nar.len() < MULTIPART_CUTOFF {
-            let put_object = self
-                .s3_client
-                .put_object()
-                .bucket(&self.bucket)
-                .key(&nar_url)
-                .body(nar.into())
-                .send()
-                .await?;
-            debug!("put object: {:#?}", put_object);
-        } else {
-            let multipart = self
-                .s3_client
-                .create_multipart_upload()
-                .bucket(&self.bucket)
-                .key(&nar_url)
-                .send()
-                .await?;
-            let upload_id = multipart.upload_id().unwrap();
-
-            let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
-            let chunks = nar.chunks(MULTIPART_CUTOFF);
-            for (i, chunk) in chunks.enumerate() {
-                parts.push(tokio::task::spawn(
-                    self.s3_client
-                        .upload_part()
-                        .bucket(&self.bucket)
-                        .key(&nar_url)
-                        .upload_id(upload_id)
-                        .part_number(i as i32 + 1)
-                        .body(chunk.to_vec().into())
-                        .send(),
-                ));
-            }
-
-            let completed_parts = join_all(parts)
-                .await
-                .into_iter()
-                .flatten()
-                .collect::<Result<Vec<_>, _>>()?
-                .into_iter()
-                .enumerate()
-                .map(|(i, part)| {
-                    CompletedPart::builder()
-                        .set_e_tag(part.e_tag().map(ToString::to_string))
-                        .set_part_number(Some(i as i32 + 1))
-                        .set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
-                        .build()
-                })
-                .collect::<Vec<_>>();
-
-            let completed_mp_upload = CompletedMultipartUpload::builder()
-                .set_parts(Some(completed_parts))
-                .build();
-
-            let complete_mp_upload = self
-                .s3_client
-                .complete_multipart_upload()
-                .bucket(&self.bucket)
-                .key(&nar_url)
-                .upload_id(upload_id)
-                .multipart_upload(completed_mp_upload)
-                .send()
-                .await?;
-
-            debug!("complete multipart upload: {:#?}", complete_mp_upload);
+        // compress and upload nar
+        let mut file_reader = nar.compress_and_hash().await?;
+        let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
+        debug!("uploading to temp path: {}", temp_path);
+        while let n = file_reader.read_buf(&mut buf).await?
+            && n != 0
+        {
+            s3_writer.write_all_buf(&mut buf).await?;
         }
+        drop(file_reader);
 
-        let narinfo_url = format!("{}.narinfo", self.path.digest());
-        debug!("uploading narinfo with key {narinfo_url}");
-        self.s3_client
-            .put_object()
-            .bucket(&self.bucket)
-            .key(narinfo_url)
-            .body(nar_info.to_string().as_bytes().to_vec().into())
-            .send()
-            .await?;
-        debug!("done uploading narinfo");
+        let mut nar_info = nar.get_narinfo()?;
+        nar_info.add_signature(self.signing_key);
+        trace!("narinfo: {:#}", nar_info);
+
+        // now that we can calculate the file_hash move the nar to where it should be
+        let real_path = nar_url(
+            &nar_info
+                .file_hash
+                .expect("file hash must be known at this point"),
+        );
+        debug!("moving {} to {}", temp_path, real_path);
+        // this is implemented as a copy-and-delete
+        s3.rename(&temp_path, &real_path).await?;
+
+        // upload narinfo
+        let narinfo_path = self.path.narinfo_path();
+        debug!("uploading narinfo: {}", narinfo_path);
+        s3.put(&narinfo_path, nar_info.to_string().into()).await?;
 
         Ok(())
     }
+}
 
-    async fn make_nar(&self) -> Result<Vec<u8>> {
-        Ok(Command::new("nix")
-            .arg("nar")
-            .arg("dump-path")
-            .arg(self.path.absolute_path())
-            .output()
-            .await?
-            .stdout)
-    }
-
-    fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
-        let mut hasher = Sha256::new();
-        hasher.update(nar);
-        let nar_hash: [u8; 32] = hasher.finalize().into();
-        let mut nar_info = NarInfo {
-            flags: narinfo::Flags::empty(),
-            store_path: self.path.path.as_ref(),
-            nar_hash,
-            nar_size: nar.len() as u64,
-            references: self.path.references.iter().map(StorePath::as_ref).collect(),
-            signatures: Vec::new(),
-            ca: None,
-            system: None,
-            deriver: None,
-            compression: Some("zstd"),
-            file_hash: None,
-            file_size: None,
-            url: "",
-        };
-        // signature consists of: store_path, nar_hash, nar_size, and references
-        nar_info.add_signature(self.signing_key);
-        Ok(nar_info)
-    }
-
-    fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
-        let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
-        format!("nar/{compressed_nar_hash}.nar.zst")
-    }
-
-    async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
-        let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
-        let mut compressed = Vec::with_capacity(nar.len());
-        encoder
-            .read_to_end(&mut compressed)
-            .await
-            .expect("should compress just fine");
-        compressed
-    }
+/// calculate url where the compressed nar should be uploaded
+fn nar_url(file_hash: &[u8]) -> Path {
+    let compressed_nar_hash = nixbase32::encode(file_hash);
+    Path::parse(format!("nar/{compressed_nar_hash}.nar.zst"))
+        .expect("should parse to a valid object_store::path::Path")
 }
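
The rewritten uploader streams the compressed NAR through object_store's buffered writer into a ULID-named temporary key, then renames it to its hash-derived `nar/<hash>.nar.zst` location once the file hash is known. Below is a sketch of that write-then-rename flow against the in-memory backend; the payload bytes and final key are placeholders, and `shutdown()` is what finalizes the buffered (possibly multipart) write.

```rust
use std::sync::Arc;
use object_store::{ObjectStore, buffered::BufWriter, memory::InMemory, path::Path};
use tokio::io::AsyncWriteExt;
use ulid::Ulid;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());

    // temporary key: the compressed hash isn't known until the stream ends
    let temp_path = Path::parse(Ulid::new().to_string())?;
    let mut writer = BufWriter::new(store.clone(), temp_path.clone());
    writer.write_all(b"compressed nar bytes").await?;
    writer.shutdown().await?; // completes the buffered upload

    // once the file hash is known, move the object to its real location
    let real_path = Path::parse("nar/placeholderhash.nar.zst")?;
    store.rename(&temp_path, &real_path).await?; // copy-and-delete under the hood

    assert!(store.head(&real_path).await.is_ok());
    Ok(())
}
```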