diff --git a/.gitignore b/.gitignore
index 2d5df85..9624092 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 /target
 .direnv
+result
diff --git a/Cargo.lock b/Cargo.lock
index d16b9d3..5cf46a7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -76,6 +76,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "anyhow"
+version = "1.0.97"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"
@@ -799,13 +805,16 @@ dependencies = [
 name = "nixcp"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "clap",
  "env_logger",
  "log",
+ "regex",
  "reqwest",
  "serde",
  "serde_json",
  "tokio",
+ "url",
 ]
 
 [[package]]
@@ -1469,6 +1478,7 @@ dependencies = [
  "form_urlencoded",
  "idna",
  "percent-encoding",
+ "serde",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 2428f7e..fe2d773 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,10 +4,13 @@ version = "0.1.0"
 edition = "2024"
 
 [dependencies]
+anyhow = "1.0.97"
 clap = { version = "4.5.34", features = ["derive"] }
 env_logger = "0.11.7"
 log = "0.4.27"
+regex = "1.11.1"
 reqwest = "0.12.15"
 serde = { version = "1.0.219", features = [ "derive" ]}
 serde_json = "1.0.140"
 tokio = { version = "1.44.1", features = [ "full" ]}
+url = { version = "2.5.4", features = [ "serde" ]}
diff --git a/src/cli.rs b/src/cli.rs
new file mode 100644
index 0000000..139597f
--- /dev/null
+++ b/src/cli.rs
@@ -0,0 +1,2 @@
+
+
diff --git a/src/main.rs b/src/main.rs
index 27abf93..e2773cc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,97 +1,36 @@
 #![feature(let_chains)]
+#![feature(extend_one)]
 
-use std::path::Path;
-use std::sync::{
-    Arc, Mutex, RwLock,
-    atomic::{AtomicUsize, Ordering},
-};
+use anyhow::Result;
+use clap::{Parser, Subcommand};
 
-use clap::Parser;
-use log::{debug, trace};
-use serde::{Deserialize, Serialize};
-use tokio::process::Command;
-use tokio::sync::{Semaphore, mpsc};
+use nixcp::NixCp;
 
-const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
+mod cli;
+mod nixcp;
+mod path_info;
 
-// nix path-info --derivation --json
-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct PathInfo {
-    ca: String,
-    nar_hash: String,
-    nar_size: u32,
-    path: String,
-    references: Vec<String>,
-    registration_time: u32,
-    valid: bool,
-}
-
-impl PathInfo {
-    // find derivations related to package
-    async fn from_package(package: &str, recursive: bool) -> Vec<Self> {
-        let mut args = vec!["path-info", "--derivation", "--json"];
-        if recursive {
-            args.push("--recursive");
-        }
-        let path_infos = Command::new("nix")
-            .args(args)
-            .arg(package)
-            .output()
-            .await
-            .expect("path-info failed");
-
-        let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout)
-            .expect("no derivations found for this package");
-        debug!("PathInfo's from nix path-info: {:#?}", path_infos);
-        path_infos
-    }
-
-    // find store paths related to derivation
-    async fn get_store_paths(&self) -> Vec<String> {
-        let mut store_paths: Vec<String> = Vec::new();
-        let nix_store_cmd = Command::new("nix-store")
-            .arg("--query")
-            .arg("--requisites")
-            .arg("--include-outputs")
-            .arg(&self.path)
-            .output()
-            .await
-            .expect("nix-store cmd failed");
-
-        let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
-        for store_path in nix_store_out.split_whitespace().map(ToString::to_string) {
-            store_paths.push(store_path);
-        }
-        store_paths
-    }
-}
-
-#[derive(Parser)]
-#[command(version, about, long_about = None)]
+#[derive(Parser, Debug)]
+#[command(version, name = "nixcp")]
 struct Cli {
-    /// Package to upload to the binary cache
-    package: String,
+    #[command(subcommand)]
+    command: Commands,
 
     /// Address of the binary cache (passed to nix copy --to)
     #[arg(long, value_name = "BINARY CACHE")]
     to: String,
 
     /// Upstream cache to check against. Can be specified multiple times.
-    /// cache.nixos.org is always included
-    #[arg(long, short)]
-    upstream_cache: Vec<String>,
-
-    /// Whether to pass --recursive to nix path-info. Can queue a huge number of paths to upload
-    #[arg(long, short, default_value_t = false)]
-    recursive: bool,
+    /// cache.nixos.org is always included (unless --no-nixos-cache is passed)
+    #[arg(long = "upstream-cache", short)]
+    upstream_caches: Vec<String>,
 
     /// Concurrent upstream cache checkers
     #[arg(long, default_value_t = 32)]
     upstream_checker_concurrency: u8,
 
     /// Concurrent uploaders
-    #[arg(long, default_value_t = 16)]
+    #[arg(long, default_value_t = 4)]
     uploader_concurrency: u8,
 
     /// Concurrent nix-store commands to run
@@ -99,173 +38,27 @@ struct Cli {
     nix_store_concurrency: u8,
 }
 
+#[derive(Debug, Subcommand)]
+enum Commands {
+    Push {
+        /// Package or store path to upload
+        /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
+        package: String,
+    },
+}
+
 #[tokio::main]
-async fn main() {
+async fn main() -> Result<()> {
     env_logger::init();
     let cli = Cli::parse();
-    let package = &cli.package;
-    let binary_cache = cli.to;
-    let mut upstream_caches = cli.upstream_cache;
-    for upstream in UPSTREAM_CACHES {
-        upstream_caches.push(upstream.to_string());
-    }
-    debug!("package: {}", package);
-    debug!("binary cache: {}", binary_cache);
-    debug!("upstream caches: {:#?}", upstream_caches);
+    let nixcp = Box::leak(Box::new(NixCp::with_upstreams(&cli.upstream_caches)?));
 
-    println!("querying nix path-info");
-    let derivations = PathInfo::from_package(package, cli.recursive).await;
-    println!("got {} derivations", derivations.len());
-
-    println!("querying nix-store");
-    let mut handles = Vec::new();
-    let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
-    let store_paths = Arc::new(RwLock::new(Vec::new()));
-
-    for derivation in derivations {
-        let store_paths = Arc::clone(&store_paths);
-        let permit = Arc::clone(&concurrency);
-        handles.push(tokio::spawn(async move {
-            let _permit = permit.acquire_owned().await.unwrap();
-            let paths = derivation.get_store_paths().await;
-            store_paths.write().unwrap().extend(paths);
-        }));
-    }
-    // resolve store paths for all derivations before we move on
-    for handle in handles {
-        handle.await.unwrap();
-    }
-    println!("got {} store paths", store_paths.read().unwrap().len());
-
-    let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());
-
-    println!("spawning check_upstream");
-    handles = Vec::new();
-    handles.push(tokio::spawn(async move {
-        check_upstream(
-            store_paths,
-            cacheable_tx,
-            cli.upstream_checker_concurrency,
-            Arc::new(upstream_caches),
-        )
-        .await;
-    }));
-
-    println!("spawning uploader");
-    handles.push(tokio::spawn(async move {
-        uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await;
-    }));
-
-    // make sure all threads are done
-    for handle in handles {
-        handle.await.unwrap();
-    }
-}
-
-// filter out store paths that exist in upstream caches
-async fn check_upstream(
-    store_paths: Arc<RwLock<Vec<String>>>,
-    cacheable_tx: mpsc::Sender<String>,
-    concurrency: u8,
-    upstream_caches: Arc<Vec<String>>,
-) {
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
-    let c_store_paths = Arc::clone(&store_paths);
-    let store_paths = c_store_paths.read().unwrap().clone();
-
-    for store_path in store_paths {
-        let tx = cacheable_tx.clone();
-        let upstream_caches = Arc::clone(&upstream_caches);
-        let concurrency = Arc::clone(&concurrency);
-
-        tokio::spawn(async move {
-            let _permit = concurrency.acquire().await.unwrap();
-            let basename = Path::new(&store_path)
-                .file_name()
-                .unwrap()
-                .to_str()
-                .unwrap()
-                .to_string();
-            let hash = basename.split("-").next().unwrap();
-
-            let mut hit = false;
-            for upstream in upstream_caches.as_ref() {
-                let mut uri = upstream.clone();
-                uri.push_str(format!("/{}.narinfo", hash).as_str());
-
-                let res_status = reqwest::Client::new()
-                    .head(uri)
-                    .send()
-                    .await
-                    .map(|x| x.status());
-
-                if let Ok(res_status) = res_status
-                    && res_status.is_success()
-                {
-                    debug!("{} was a hit upstream: {}", store_path, upstream);
-                    hit = true;
-                    break;
-                }
-            }
-            if !hit {
-                trace!("sending {}", store_path);
-                tx.send(store_path).await.unwrap();
-            }
-        });
-    }
-}
-
-async fn uploader(
-    cacheable_rx: &mut mpsc::Receiver<String>,
-    binary_cache: String,
-    concurrency: u8,
-) {
-    let upload_count = Arc::new(AtomicUsize::new(0));
-    let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
-    let mut handles = Vec::new();
-
-    loop {
-        if let Some(path_to_upload) = cacheable_rx.recv().await {
-            let concurrency = Arc::clone(&concurrency);
-            let failures = Arc::clone(&failures);
-            let binary_cache = binary_cache.clone();
-            let upload_count = Arc::clone(&upload_count);
-
-            handles.push(tokio::spawn(async move {
-                let _permit = concurrency.acquire().await.unwrap();
-                println!("uploading: {}", path_to_upload);
-                if Command::new("nix")
-                    .arg("copy")
-                    .arg("--to")
-                    .arg(&binary_cache)
-                    .arg(&path_to_upload)
-                    .output()
-                    .await
-                    .is_err()
-                {
-                    println!("WARN: upload failed: {}", path_to_upload);
-                    failures.lock().unwrap().push(path_to_upload);
-                } else {
-                    upload_count.fetch_add(1, Ordering::Relaxed);
-                }
-            }));
-        } else {
-            // make sure all threads are done
-            for handle in handles {
-                handle.await.unwrap();
-            }
-            println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
-
-            let failures = failures.lock().unwrap();
-            if !failures.is_empty() {
-                println!("failed to upload these paths: ");
-                for failure in failures.iter() {
-                    print!("{}", failure);
-                }
-                println!();
-            }
-            break;
+    match &cli.command {
+        Commands::Push { package } => {
+            nixcp.paths_from_package(package).await?;
+            nixcp.run().await;
         }
     }
+
+    Ok(())
 }
diff --git a/src/nixcp.rs b/src/nixcp.rs
new file mode 100644
index 0000000..057af55
--- /dev/null
+++ b/src/nixcp.rs
@@ -0,0 +1,135 @@
+use std::{
+    iter::once,
+    sync::{
+        Arc, Mutex,
+        atomic::{AtomicUsize, Ordering},
+    },
+};
+
+use crate::path_info::PathInfo;
+use anyhow::{Context, Result};
+use log::{info, warn};
+use tokio::{
+    process::Command,
+    sync::{RwLock, Semaphore, mpsc},
+};
+use url::Url;
+
+pub struct NixCp {
+    upstream_caches: Arc<Vec<Url>>,
+    store_paths: Arc<RwLock<Vec<PathInfo>>>,
+}
+
+impl NixCp {
+    pub fn with_upstreams(new_upstreams: &[String]) -> Result<Self> {
+        let mut upstreams = Vec::with_capacity(new_upstreams.len() + 1);
+        for upstream in new_upstreams
+            .iter()
+            .chain(once(&"https://cache.nixos.org".to_string()))
+        {
+            upstreams
+                .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
+        }
+        Ok(Self {
+            upstream_caches: Arc::new(upstreams),
+            store_paths: Arc::new(RwLock::new(Vec::new())),
+        })
+    }
+
+    pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
+        let path_info = PathInfo::from_path(package).await?;
+        self.store_paths
+            .write()
+            .await
+            .extend(path_info.get_closure().await?);
+        info!("found {} store paths", self.store_paths.read().await.len());
+
+        Ok(())
+    }
+
+    pub async fn run(&'static self) {
+        let (tx, rx) = mpsc::channel(10);
+        let tx = Arc::new(tx);
+        tokio::spawn(self.filter_from_upstream(tx));
+        tokio::spawn(self.uploader("".to_string(), rx));
+    }
+
+    /// filter paths that are on upstream and send to `tx`
+    async fn filter_from_upstream(&self, tx: Arc<mpsc::Sender<String>>) {
+        let permits = Arc::new(Semaphore::new(10));
+        let mut handles = Vec::with_capacity(10);
+        let store_paths = self.store_paths.read().await.clone();
+
+        for path in store_paths.into_iter() {
+            if path.check_upstream_signature(&self.upstream_caches) {
+                continue;
+            }
+            handles.push({
+                let permits = permits.clone();
+                let tx = tx.clone();
+                let upstream_caches = self.upstream_caches.clone();
+                tokio::spawn(async move {
+                    let _permit = permits.acquire().await.unwrap();
+
+                    if !path.check_upstream_hit(upstream_caches.as_slice()).await {
+                        tx.send(path.to_string()).await.unwrap();
+                    }
+                })
+            });
+        }
+
+        for handle in handles {
+            handle.await.unwrap();
+        }
+    }
+
+    async fn uploader(&self, cache: String, mut rx: mpsc::Receiver<String>) {
+        let upload_count = Arc::new(AtomicUsize::new(0));
+        let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+        let permits = Arc::new(Semaphore::new(10));
+        let mut handles = Vec::with_capacity(10);
+
+        loop {
+            if let Some(path_to_upload) = rx.recv().await {
+                let permits = Arc::clone(&permits);
+                let failures = Arc::clone(&failures);
+                let binary_cache = cache.clone();
+                let upload_count = Arc::clone(&upload_count);
+
+                handles.push(tokio::spawn(async move {
+                    let _permit = permits.acquire().await.unwrap();
+                    info!("uploading: {}", path_to_upload.to_string());
+                    if Command::new("nix")
+                        .arg("copy")
+                        .arg("--to")
+                        .arg(&binary_cache)
+                        .arg(&path_to_upload.to_string())
+                        .output()
+                        .await
+                        .is_err()
+                    {
+                        warn!("upload failed: {}", path_to_upload);
+                        failures.lock().unwrap().push(path_to_upload);
+                    } else {
+                        upload_count.fetch_add(1, Ordering::Relaxed);
+                    }
+                }));
+            } else {
+                // make sure all threads are done
+                for handle in handles {
+                    handle.await.unwrap();
+                }
+                println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
+
+                let failures = failures.lock().unwrap();
+                if !failures.is_empty() {
+                    warn!("failed to upload these paths: ");
+                    for failure in failures.iter() {
+                        warn!("{}", failure);
+                    }
+                }
+                break;
+            }
+        }
+    }
+}
diff --git a/src/path_info.rs b/src/path_info.rs
new file mode 100644
index 0000000..e1cf25a
--- /dev/null
+++ b/src/path_info.rs
@@ -0,0 +1,161 @@
+use std::{collections::HashSet, path::Path};
+
+use anyhow::{Context, Result};
+use log::trace;
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+use tokio::process::Command;
+use url::Url;
+
+// nix path-info --derivation --json
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct PathInfo {
+    deriver: String,
+    path: String,
+    signatures: Vec<String>,
+}
+impl PathInfo {
+    /// get PathInfo for a package or a store path
+    pub async fn from_path(path: &str) -> Result<Self> {
+        let path_info = Command::new("nix")
+            .arg("path-info")
+            .arg("--json")
+            .arg(path)
+            .output()
+            .await
+            .context("`nix path-info` failed for {package}")?;
+
+        Ok(serde_json::from_slice(&path_info.stdout)?)
+    }
+
+    pub async fn get_closure(&self) -> Result<Vec<Self>> {
+        let nix_store_cmd = Command::new("nix-store")
+            .arg("--query")
+            .arg("--requisites")
+            .arg("--include-outputs")
+            .arg(&self.deriver)
+            .output()
+            .await
+            .expect("nix-store cmd failed");
+
+        let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
+        let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
+        let mut closure = Vec::with_capacity(nix_store_paths.len());
+        for path in nix_store_paths {
+            closure.push(Self::from_path(path).await?);
+        }
+        Ok(closure)
+    }
+
+    pub fn get_path(&self) -> &Path {
+        &Path::new(&self.path)
+    }
+
+    /// checks if the path is signed by any upstream. if it is, we assume a cache hit.
+    /// the name of the cache in the signature does not have to be the domain of the cache.
+    /// in fact, it can be any random string. but, most often it is, and this saves us
+    /// a request.
+    pub fn check_upstream_signature(&self, upstreams: &[Url]) -> bool {
+        let upstreams: HashSet<_> = upstreams.iter().filter_map(|x| x.domain()).collect();
+
+        // some caches use names prefixed with -
+        // e.g. cache.nixos.org-1, nix-community.cachix.org-1
+        let re = Regex::new(r"-\d+$").expect("regex should be valid");
+        for signee in self.signees().iter().map(|&x| re.replace(x, "")) {
+            if upstreams.contains(signee.as_ref()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    fn signees(&self) -> Vec<&str> {
+        let signees: Vec<_> = self
+            .signatures
+            .iter()
+            .filter_map(|signature| Some(signature.split_once(":")?.0))
+            .collect();
+        trace!("signees for {}: {:?}", self.path, signees);
+        signees
+    }
+
+    pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
+        let basename = self.get_path().file_name().unwrap().to_str().unwrap();
+        let hash = basename.split_once("-").unwrap().0;
+
+        for upstream in upstreams {
+            let upstream = upstream
+                .join(format!("{hash}/.narinfo").as_str())
+                .expect("adding .narinfo should make a valid url");
+            let res_status = reqwest::Client::new()
+                .head(upstream.as_str())
+                .send()
+                .await
+                .map(|x| x.status());
+
+            match &res_status {
+                Ok(status) => return status.is_success(),
+                Err(_) => return false,
+            }
+        }
+        false
+    }
+}
+
+impl ToString for PathInfo {
+    fn to_string(&self) -> String {
+        self.path.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn get_signees_from_path_info() {
+        let path_info = PathInfo {
+            deriver: "".to_string(),
+            path: "".to_string(),
+            signatures: vec![
+                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
+                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
+            ],
+        };
+        let signees = path_info.signees();
+        assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
+    }
+
+    #[test]
+    fn match_upstream_cache_from_signature() {
+        let path_info = PathInfo {
+            deriver: "".to_string(),
+            path: "".to_string(),
+            signatures: vec![
+                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
+                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
+                "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
+            ],
+        };
+        assert_eq!(
+            path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]),
+            true
+        );
+        assert_eq!(
+            path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]),
+            true
+        );
+        assert_eq!(
+            path_info.check_upstream_signature(&[
+                Url::parse("https://nix-community.cachix.org").unwrap()
+            ]),
+            true
+        );
+        assert_eq!(
+            path_info
+                .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
+            false
+        );
+    }
+}