many changes

cy 2025-04-13 20:17:54 -04:00
parent 681ee5e826
commit 2c252a42c5
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
6 changed files with 1470 additions and 113 deletions

Cargo.lock (generated, 1257 lines changed)

File diff suppressed because it is too large.

Cargo.toml

@@ -6,9 +6,12 @@ edition = "2024"
 [dependencies]
 anyhow = "1.0.97"
 async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
+aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
+aws-sdk-s3 = "1.82.0"
 clap = { version = "4.5.34", features = ["derive"] }
 ed25519-dalek = "2.1.1"
 env_logger = "0.11.7"
+futures = "0.3.31"
 log = "0.4.27"
 nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
 regex = "1.11.1"

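Context for the two new AWS crates, not part of the commit itself: aws-config discovers credentials and region from the environment, and aws-sdk-s3 can talk to any S3-compatible endpoint. A minimal sketch of the pairing, assuming a tokio runtime and a placeholder region (the commit's actual wiring is in src/nixcp.rs below):

```rust
use aws_config::Region;
use aws_sdk_s3 as s3;

#[tokio::main]
async fn main() {
    // The region here is a placeholder; with no explicit region,
    // aws-config falls back to AWS_DEFAULT_REGION and the shared config files.
    let config = aws_config::from_env()
        .region(Region::new("us-east-1"))
        .load()
        .await;
    let client = s3::Client::new(&config);

    // Smoke test: list buckets visible to the configured credentials.
    match client.list_buckets().send().await {
        Ok(resp) => println!("{} buckets", resp.buckets().len()),
        Err(e) => eprintln!("s3 error: {e}"),
    }
}
```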
src/main.rs

@@ -1,7 +1,8 @@
 #![feature(let_chains)]
 #![feature(extend_one)]
+#![feature(array_chunks)]

-use anyhow::Result;
+use anyhow::{Context, Result};
 use clap::{Parser, Subcommand};
 use nixcp::NixCp;
@@ -17,14 +18,32 @@ struct Cli {
     #[command(subcommand)]
     command: Commands,

-    /// Address of the binary cache (passed to nix copy --to)
-    #[arg(long, value_name = "BINARY CACHE")]
-    to: String,
+    /// The s3 bucket to upload to
+    #[arg(long, value_name = "bucket name")]
+    bucket: String,

     /// Upstream cache to check against. Can be specified multiple times.
     /// cache.nixos.org is always included
-    #[arg(long = "upstream-cache", short)]
-    upstream_caches: Vec<String>,
+    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
+    upstreams: Vec<String>,
+
+    /// Path to the file containing signing key
+    /// e.g. ~/cache-priv-key.pem
+    #[arg(long)]
+    signing_key: String,
+
+    /// If unspecified, will get it from AWS_DEFAULT_REGION env var or the AWS default
+    #[arg(long)]
+    region: Option<String>,
+
+    /// If unspecified, will get it from AWS_ENDPOINT_URL env var or the AWS default
+    /// e.g. s3.example.com
+    #[arg(long)]
+    endpoint: Option<String>,
+
+    /// AWS profile to use
+    #[arg(long)]
+    profile: Option<String>,
 }

 #[derive(Debug, Subcommand)]
@@ -32,6 +51,7 @@ enum Commands {
     Push {
         /// Package or store path to upload
         /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
+        #[arg(value_name = "package or store path")]
         package: String,
     },
 }
@@ -40,12 +60,15 @@ enum Commands {
 async fn main() -> Result<()> {
     env_logger::init();
     let cli = Cli::parse();
-    let nixcp = Box::leak(Box::new(NixCp::with_upstreams(&cli.upstream_caches)?));
+    let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?));

     match &cli.command {
         Commands::Push { package } => {
-            nixcp.paths_from_package(package).await?;
-            nixcp.run().await;
+            nixcp
+                .paths_from_package(package)
+                .await
+                .context("nixcp get paths from package")?;
+            nixcp.run().await.context("nixcp run")?;
         }
     }

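A sketch of how the reworked CLI is expected to parse, using a reduced mirror of the Cli struct above; the bucket name, key path, and package are placeholder values, not anything from the commit:

```rust
use clap::{Parser, Subcommand};

/// Reduced mirror of the Cli above, for illustration only.
#[derive(Parser, Debug)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
    /// The s3 bucket to upload to
    #[arg(long)]
    bucket: String,
    /// Upstream caches to check against
    #[arg(long = "upstream", short)]
    upstreams: Vec<String>,
    /// Path to the file containing the signing key
    #[arg(long)]
    signing_key: String,
}

#[derive(Subcommand, Debug)]
enum Commands {
    Push { package: String },
}

fn main() {
    // Placeholder values; top-level flags precede the subcommand.
    let cli = Cli::parse_from([
        "nixcp",
        "--bucket", "my-cache",
        "--signing-key", "/tmp/cache-priv-key.pem",
        "-u", "https://nixcache.example.com",
        "push", "nixpkgs#hello",
    ]);
    assert_eq!(cli.bucket, "my-cache");
    let Commands::Push { package } = cli.command;
    assert_eq!(package, "nixpkgs#hello");
}
```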
src/nixcp.rs

@@ -1,4 +1,5 @@
 use std::{
+    fs,
     iter::once,
     sync::{
         Arc, Mutex,
@@ -6,56 +7,86 @@ use std::{
     },
 };

-use crate::path_info::PathInfo;
 use anyhow::{Context, Result};
-use log::{info, warn};
-use tokio::{
-    process::Command,
-    sync::{RwLock, Semaphore, mpsc},
-};
+use aws_config::Region;
+use aws_sdk_s3 as s3;
+use futures::future::join_all;
+use log::{debug, info, warn};
+use nix_compat::narinfo::{self, SigningKey};
+use tokio::sync::{RwLock, Semaphore, mpsc};
 use url::Url;

+use crate::{Cli, path_info::PathInfo, uploader::Uploader};
+
 pub struct NixCp {
     upstream_caches: Arc<Vec<Url>>,
     store_paths: Arc<RwLock<Vec<PathInfo>>>,
+    s3_client: s3::Client,
+    signing_key: SigningKey<ed25519_dalek::SigningKey>,
+    bucket: String,
 }

 impl NixCp {
-    pub fn with_upstreams(new_upstreams: &[String]) -> Result<Self> {
-        let mut upstreams = Vec::with_capacity(new_upstreams.len() + 1);
-        for upstream in new_upstreams
+    pub async fn new(cli: &Cli) -> Result<Self> {
+        let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
+        for upstream in cli
+            .upstreams
             .iter()
             .chain(once(&"https://cache.nixos.org".to_string()))
         {
             upstreams
                 .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
         }
+
+        let key = fs::read_to_string(&cli.signing_key)?;
+        let signing_key = narinfo::parse_keypair(key.as_str())?.0;
+
+        let mut s3_config = aws_config::from_env();
+        if let Some(region) = &cli.region {
+            s3_config = s3_config.region(Region::new(region.clone()));
+        }
+        if let Some(endpoint) = &cli.endpoint {
+            s3_config = s3_config.endpoint_url(endpoint);
+        }
+        if let Some(profile) = &cli.profile {
+            s3_config = s3_config.profile_name(profile);
+        }
+        let s3_client = s3::Client::new(&s3_config.load().await);
+
         Ok(Self {
             upstream_caches: Arc::new(upstreams),
             store_paths: Arc::new(RwLock::new(Vec::new())),
+            s3_client,
+            signing_key,
+            bucket: cli.bucket.clone(),
         })
     }

     pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
-        let path_info = PathInfo::from_path(package).await?;
-        self.store_paths
-            .write()
-            .await
-            .extend(path_info.get_closure().await?);
+        let path_info = PathInfo::from_path(package)
+            .await
+            .context("get path info for package")?;
+        debug!("path-info for {package}: {:?}", path_info);
+
+        self.store_paths.write().await.extend(
+            path_info
+                .get_closure()
+                .await
+                .context("closure from path info")?,
+        );
+
         info!("found {} store paths", self.store_paths.read().await.len());

         Ok(())
     }

-    pub async fn run(&'static self) {
+    pub async fn run(&'static self) -> Result<()> {
         let (tx, rx) = mpsc::channel(10);
         let tx = Arc::new(tx);
         tokio::spawn(self.filter_from_upstream(tx));
-        tokio::spawn(self.uploader("".to_string(), rx));
+        self.upload(rx).await
     }

     /// filter paths that are on upstream and send to `tx`
-    async fn filter_from_upstream(&self, tx: Arc<mpsc::Sender<String>>) {
+    async fn filter_from_upstream(&self, tx: Arc<mpsc::Sender<PathInfo>>) {
         let permits = Arc::new(Semaphore::new(10));
         let mut handles = Vec::with_capacity(10);
         let store_paths = self.store_paths.read().await.clone();
@@ -72,7 +103,7 @@ impl NixCp {
                 let _permit = permits.acquire().await.unwrap();
                 if !path.check_upstream_hit(upstream_caches.as_slice()).await {
-                    tx.send(path.absolute_path()).await.unwrap();
+                    tx.send(path).await.unwrap();
                 }
             })
         });
@@ -83,42 +114,32 @@ impl NixCp {
         }
     }

-    async fn uploader(&self, cache: String, mut rx: mpsc::Receiver<String>) {
-        let upload_count = Arc::new(AtomicUsize::new(0));
+    async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
+        let upload_count = AtomicUsize::new(0);
         let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
         let permits = Arc::new(Semaphore::new(10));
-        let mut handles = Vec::with_capacity(10);
+        let mut uploads = Vec::with_capacity(10);

         loop {
             if let Some(path_to_upload) = rx.recv().await {
                 let permits = Arc::clone(&permits);
-                let failures = Arc::clone(&failures);
-                let binary_cache = cache.clone();
-                let upload_count = Arc::clone(&upload_count);
+                let absolute_path = path_to_upload.absolute_path();

-                handles.push(tokio::spawn(async move {
-                    let _permit = permits.acquire().await.unwrap();
-                    info!("uploading: {}", path_to_upload.to_string());
-                    if Command::new("nix")
-                        .arg("copy")
-                        .arg("--to")
-                        .arg(&binary_cache)
-                        .arg(&path_to_upload.to_string())
-                        .output()
-                        .await
-                        .is_err()
-                    {
-                        warn!("upload failed: {}", path_to_upload);
-                        failures.lock().unwrap().push(path_to_upload);
-                    } else {
-                        upload_count.fetch_add(1, Ordering::Relaxed);
-                    }
-                }));
-            } else {
-                // make sure all threads are done
-                for handle in handles {
-                    handle.await.unwrap();
-                }
+                info!("uploading: {}", absolute_path);
+                let uploader = Uploader::new(
+                    &self.signing_key,
+                    path_to_upload,
+                    &self.s3_client,
+                    self.bucket.clone(),
+                )?;
+                let fut = tokio::spawn({
+                    let _permit = permits.acquire().await.unwrap();
+                    async move { uploader.upload().await }
+                });
+                uploads.push(fut);
+            } else {
+                join_all(uploads).await;
                 println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));

                 let failures = failures.lock().unwrap();
@@ -131,5 +152,6 @@ impl NixCp {
                 break;
             }
         }
+        Ok(())
     }
 }

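The new run/filter_from_upstream/upload trio is a bounded producer-consumer pipeline: a channel carries PathInfo values, a semaphore caps concurrent uploads, and join_all drains the in-flight tasks once the channel closes. A self-contained sketch of that pattern, with integers standing in for store paths:

```rust
use std::sync::Arc;

use futures::future::join_all;
use tokio::sync::{Semaphore, mpsc};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(10);
    // Cap concurrent "uploads" at 10, like the Semaphore in upload().
    let permits = Arc::new(Semaphore::new(10));

    // Producer task, standing in for filter_from_upstream.
    tokio::spawn(async move {
        for i in 0..25 {
            tx.send(i).await.unwrap();
        }
        // tx is dropped here, closing the channel so recv() yields None.
    });

    let mut jobs = Vec::new();
    while let Some(item) = rx.recv().await {
        let permits = Arc::clone(&permits);
        jobs.push(tokio::spawn(async move {
            let _permit = permits.acquire().await.unwrap();
            println!("uploaded {item}"); // real work goes here
        }));
    }
    // Like join_all(uploads) in upload(): wait for all in-flight tasks.
    join_all(jobs).await;
}
```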
src/path_info.rs

@@ -1,7 +1,7 @@
-use std::{collections::HashSet, path::Path};
+use std::collections::HashSet;

-use anyhow::{Context, Result};
-use log::trace;
+use anyhow::{Context, Error, Result};
+use log::{debug, error, trace};
 use nix_compat::nixhash::CAHash;
 use nix_compat::store_path::StorePath;
 use regex::Regex;
@@ -22,7 +22,8 @@ pub struct PathInfo {
 impl PathInfo {
     /// get PathInfo for a package or a store path
     pub async fn from_path(path: &str) -> Result<Self> {
-        let path_info = Command::new("nix")
+        debug!("query nix path-info for {path}");
+        let nix_cmd = Command::new("nix")
             .arg("path-info")
             .arg("--json")
             .arg(path)
@@ -30,15 +31,30 @@ impl PathInfo {
             .await
             .context("`nix path-info` failed for {package}")?;

-        Ok(serde_json::from_slice(&path_info.stdout)?)
+        // nix path-info returns an array with one element
+        match serde_json::from_slice::<Vec<_>>(&nix_cmd.stdout)
+            .context("parse path info from stdout")
+        {
+            Ok(path_info) => path_info
+                .into_iter()
+                .next()
+                .ok_or_else(|| Error::msg("nix path-info returned empty")),
+            Err(e) => {
+                error!(
+                    "Failed to parse data from `nix path-info`. The path may not exist on your system."
+                );
+                Err(e)
+            }
+        }
     }

     pub async fn get_closure(&self) -> Result<Vec<Self>> {
+        debug!("query nix-store for {}", self.absolute_path());
         let nix_store_cmd = Command::new("nix-store")
             .arg("--query")
             .arg("--requisites")
             .arg("--include-outputs")
-            .arg(self.deriver.to_string())
+            .arg(self.absolute_path())
             .output()
             .await
             .expect("nix-store cmd failed");
@@ -67,7 +83,7 @@ impl PathInfo {
                 return true;
             }
         }
-        return false;
+        false
     }

     fn signees(&self) -> Vec<&str> {
@@ -94,9 +110,8 @@ impl PathInfo {
                 .await
                 .map(|x| x.status());

-            match &res_status {
-                Ok(status) => return status.is_success(),
-                Err(_) => return false,
+            if res_status.map(|code| code.is_success()).unwrap_or_default() {
+                return true;
             }
         }
         false
@@ -105,6 +120,10 @@ impl PathInfo {
     pub fn absolute_path(&self) -> String {
         self.path.to_absolute_path()
     }
+
+    pub fn digest(&self) -> &str {
+        str::from_utf8(self.path.digest()).expect("digest should be valid string")
+    }
 }

 /*

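The from_path rewrite leans on `nix path-info --json` returning a JSON array with one element per queried path. A standalone sketch of that parse, assuming serde's derive feature and an illustrative subset of the fields (the commit deserializes into its richer PathInfo type instead):

```rust
use anyhow::{Context, Result};
use serde::Deserialize;

/// Illustrative subset of the fields `nix path-info --json` emits.
#[derive(Debug, Deserialize)]
struct MiniPathInfo {
    path: String,
    #[serde(default)]
    signatures: Vec<String>,
}

fn parse_path_info(stdout: &[u8]) -> Result<MiniPathInfo> {
    // nix path-info returns a JSON array with one element per queried path.
    serde_json::from_slice::<Vec<MiniPathInfo>>(stdout)
        .context("parse path info from stdout")?
        .into_iter()
        .next()
        .ok_or_else(|| anyhow::Error::msg("nix path-info returned empty"))
}

fn main() -> Result<()> {
    let sample = br#"[{"path": "/nix/store/abc-hello-2.12.1", "signatures": []}]"#;
    println!("{:?}", parse_path_info(sample)?);
    Ok(())
}
```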
src/uploader.rs

@@ -1,36 +1,139 @@
 use anyhow::Result;
 use async_compression::{Level, tokio::bufread::ZstdEncoder};
-use ed25519_dalek;
+use aws_sdk_s3::{
+    self as s3,
+    types::{CompletedMultipartUpload, CompletedPart},
+};
+use futures::future::join_all;
+use log::debug;
 use nix_compat::{
-    narinfo::{self, NarInfo},
+    narinfo::{self, NarInfo, SigningKey},
     nixbase32,
     store_path::StorePath,
 };
 use sha2::{Digest, Sha256};
-use std::fs;
 use tokio::{io::AsyncReadExt, process::Command};

 use crate::path_info::PathInfo;

-pub struct Uploader {
-    signing_key: narinfo::SigningKey<ed25519_dalek::SigningKey>,
+const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
+
+pub struct Uploader<'a> {
+    signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
     path: PathInfo,
-    compression: Option<String>,
+    s3_client: &'a s3::Client,
+    bucket: String,
 }

-impl Uploader {
-    pub fn new(key_file: &str, path: PathInfo) -> Result<Self> {
-        let key = fs::read_to_string(key_file)?;
-        let signing_key = narinfo::parse_keypair(key.as_str())?.0;
+impl<'a> Uploader<'a> {
+    pub fn new(
+        signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
+        path: PathInfo,
+        s3_client: &'a s3::Client,
+        bucket: String,
+    ) -> Result<Self> {
         Ok(Self {
             signing_key,
             path,
-            // TODO: support other algorithms
-            compression: Some("zstd".to_string()),
+            s3_client,
+            bucket,
         })
     }

-    pub async fn make_nar(&self) -> Result<Vec<u8>> {
+    pub async fn upload(&self) -> Result<()> {
+        let nar = self.make_nar().await?;
+        let mut nar_info = self.narinfo_from_nar(&nar)?;
+        let nar = self.compress_nar(&nar).await;
+
+        // update fields that we know after compression
+        nar_info.file_size = Some(nar.len() as u64);
+        let mut hasher = Sha256::new();
+        hasher.update(&nar);
+        nar_info.file_hash = Some(hasher.finalize().into());
+
+        let nar_url = self.nar_url(&nar);
+        if nar.len() < MULTIPART_CUTOFF {
+            let put_object = self
+                .s3_client
+                .put_object()
+                .bucket(&self.bucket)
+                .key(&nar_url)
+                .body(nar.into())
+                .send()
+                .await?;
+            debug!("put object: {:#?}", put_object);
+        } else {
+            let multipart = self
+                .s3_client
+                .create_multipart_upload()
+                .bucket(&self.bucket)
+                .key(&nar_url)
+                .send()
+                .await?;
+            let upload_id = multipart.upload_id().unwrap();
+
+            let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
+            let chunks = nar.array_chunks::<MULTIPART_CUTOFF>();
+            for (i, chunk) in chunks.enumerate() {
+                parts.push(tokio::task::spawn(
+                    self.s3_client
+                        .upload_part()
+                        .bucket(&self.bucket)
+                        .key(&nar_url)
+                        .upload_id(upload_id)
+                        .part_number(i as i32 + 1)
+                        .body(chunk.to_vec().into())
+                        .send(),
+                ));
+            }
+
+            let completed_parts = join_all(parts)
+                .await
+                .into_iter()
+                .flatten()
+                .collect::<Result<Vec<_>, _>>()?
+                .into_iter()
+                .enumerate()
+                .map(|(i, part)| {
+                    CompletedPart::builder()
+                        .set_e_tag(part.e_tag().map(ToString::to_string))
+                        .set_part_number(Some(i as i32 + 1))
+                        .set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
+                        .build()
+                })
+                .collect::<Vec<_>>();
+
+            let completed_mp_upload = CompletedMultipartUpload::builder()
+                .set_parts(Some(completed_parts))
+                .build();
+
+            let complete_mp_upload = self
+                .s3_client
+                .complete_multipart_upload()
+                .bucket(&self.bucket)
+                .key(&nar_url)
+                .upload_id(upload_id)
+                .multipart_upload(completed_mp_upload)
+                .send()
+                .await?;
+
+            debug!("complete multipart upload: {:#?}", complete_mp_upload);
+        }
+
+        nar_info.add_signature(self.signing_key);
+        self.s3_client
+            .put_object()
+            .bucket(&self.bucket)
+            .key(format!("{}.narinfo", self.path.digest()))
+            .body(nar_info.to_string().as_bytes().to_vec().into())
+            .send()
+            .await?;
+
+        Ok(())
+    }
+
+    async fn make_nar(&self) -> Result<Vec<u8>> {
         Ok(Command::new("nix")
             .arg("nar")
             .arg("dump-path")
@@ -40,7 +143,7 @@ impl Uploader {
             .stdout)
     }

-    pub fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
+    fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
         let mut hasher = Sha256::new();
         hasher.update(nar);
         let nar_hash: [u8; 32] = hasher.finalize().into();
@@ -54,7 +157,7 @@ impl Uploader {
             ca: self.path.ca.clone(),
             system: None,
             deriver: Some(self.path.deriver.as_ref()),
-            compression: self.compression.as_ref().map(String::as_str),
+            compression: Some("zstd"),
             file_hash: None,
             file_size: None,
             url: "",
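One caveat worth knowing around MULTIPART_CUTOFF: 5 MiB is S3's minimum part size, and the nightly array_chunks::<N>() iterator yields only complete N-byte chunks, so a tail shorter than the cutoff has to be picked up separately via its remainder(). A standalone sketch of part splitting with slice::chunks, which does include the short final chunk (this is an illustration of the splitting logic, not the commit's code):

```rust
const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5; // S3's minimum part size is 5 MiB

/// Split a NAR into numbered parts. Unlike array_chunks::<N>(),
/// slice::chunks(N) also yields the final chunk even when it is short.
fn to_parts(nar: &[u8]) -> Vec<(i32, Vec<u8>)> {
    nar.chunks(MULTIPART_CUTOFF)
        .enumerate()
        .map(|(i, chunk)| (i as i32 + 1, chunk.to_vec())) // parts are 1-indexed
        .collect()
}

fn main() {
    let nar = vec![0u8; MULTIPART_CUTOFF + 123];
    let parts = to_parts(&nar);
    assert_eq!(parts.len(), 2);
    assert_eq!(parts[1].1.len(), 123); // the tail is preserved
}
```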