diff --git a/Cargo.lock b/Cargo.lock
index 10459d6..d16b9d3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -142,6 +142,46 @@ version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
+[[package]]
+name = "clap"
+version = "4.5.34"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e958897981290da2a852763fe9cdb89cd36977a5d729023127095fa94d95e2ff"
+dependencies = [
+ "clap_builder",
+ "clap_derive",
+]
+
+[[package]]
+name = "clap_builder"
+version = "4.5.34"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "83b0f35019843db2160b5bb19ae09b4e6411ac33fc6a712003c33e03090e2489"
+dependencies = [
+ "anstream",
+ "anstyle",
+ "clap_lex",
+ "strsim",
+]
+
+[[package]]
+name = "clap_derive"
+version = "4.5.32"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09176aae279615badda0765c0c0b3f6ed53f4709118af73cf4655d85d1530cd7"
+dependencies = [
+ "heck",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "clap_lex"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6"
+
 [[package]]
 name = "colorchoice"
 version = "1.0.3"
@@ -352,6 +392,12 @@ version = "0.15.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
 
+[[package]]
+name = "heck"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+
 [[package]]
 name = "http"
 version = "1.3.1"
@@ -753,6 +799,7 @@
 name = "nixcp"
 version = "0.1.0"
 dependencies = [
+ "clap",
  "env_logger",
  "log",
  "reqwest",
@@ -1200,6 +1247,12 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
 
+[[package]]
+name = "strsim"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
+
 [[package]]
 name = "subtle"
 version = "2.6.1"
diff --git a/Cargo.toml b/Cargo.toml
index 1cc3006..2428f7e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,6 +4,7 @@ version = "0.1.0"
 edition = "2024"
 
 [dependencies]
+clap = { version = "4.5.34", features = ["derive"] }
 env_logger = "0.11.7"
 log = "0.4.27"
 reqwest = "0.12.15"
diff --git a/src/main.rs b/src/main.rs
index add8884..efdb8a2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,18 +1,18 @@
 #![feature(let_chains)]
 
-use std::process::{Command, Stdio};
+use std::process::Command;
 use std::sync::mpsc;
-use std::{env, path::Path};
+use std::path::Path;
+use std::sync::{Mutex, Arc, atomic::{AtomicUsize, Ordering}};
 
 use log::{debug, trace};
 use serde::{Deserialize, Serialize};
 use serde_json;
 use tokio::sync::Semaphore;
+use clap::Parser;
 
 const UPSTREAM_CACHES: &'static [&'static str] = &[
     "https://cache.nixos.org",
-    "https://nix-community.cachix.org",
-    "https://nixcache.cy7.sh",
 ];
 
 // nix path-info --derivation --json
@@ -63,12 +63,43 @@
     }
 }
 
+#[derive(Parser)]
+#[command(version, about, long_about = None)]
+struct Cli {
+    /// Package to upload to the binary cache
+    package: String,
+
+    /// Address of the binary cache (passed to nix copy --to)
+    #[arg(long, value_name = "BINARY CACHE")]
+    to: String,
+
+    /// Upstream cache to check against. Can be specified multiple times.
+    /// cache.nixos.org is always included
+    #[arg(long, short)]
+    upstream_cache: Vec<String>,
+
+    /// Concurrent upstream cache checkers
+    #[arg(long, default_value_t = 50)]
+    upstream_checker_concurrency: u8,
+
+    /// Concurrent uploaders
+    #[arg(long, default_value_t = 10)]
+    uploader_concurrency: u8,
+}
+
 #[tokio::main]
 async fn main() {
     env_logger::init();
-    let args: Vec<String> = env::args().collect();
-    let package = &args[1];
+    let cli = Cli::parse();
+    let package = &cli.package;
+    let binary_cache = cli.to;
+    let mut upstream_caches = cli.upstream_cache;
+    for upstream in UPSTREAM_CACHES {
+        upstream_caches.push(upstream.to_string());
+    }
     debug!("package: {}", package);
+    debug!("binary cache: {}", binary_cache);
+    debug!("upstream caches: {:#?}", upstream_caches);
 
     println!("querying nix path-info");
     let path_infos = PathInfo::from_package(package);
@@ -78,15 +109,15 @@
     let (cacheable_tx, cacheable_rx) = mpsc::channel();
 
     let mut handles = Vec::new();
-    
+
     println!("spawning check_upstream");
     handles.push(tokio::spawn(async move {
-        check_upstream(store_paths, cacheable_tx).await;
+        check_upstream(store_paths, cacheable_tx, cli.upstream_checker_concurrency, upstream_caches).await;
     }));
 
     println!("spawning uploader");
     handles.push(tokio::spawn(async move {
-        uploader(cacheable_rx).await;
+        uploader(cacheable_rx, binary_cache, cli.uploader_concurrency).await;
     }));
 
     // make sure all threads are done
@@ -96,8 +127,8 @@
 }
 
 // filter out store paths that exist in upstream caches
-async fn check_upstream(store_paths: Vec<String>, cacheable_tx: mpsc::Sender<String>) {
-    let concurrent = Semaphore::new(50);
+async fn check_upstream(store_paths: Vec<String>, cacheable_tx: mpsc::Sender<String>, concurrency: u8, upstream_caches: Vec<String>) {
+    let concurrent = Semaphore::new(concurrency.into());
    for store_path in store_paths {
         let _ = concurrent.acquire().await.unwrap();
         let tx = cacheable_tx.clone();
@@ -122,7 +153,7 @@ async fn check_upstream(store_paths: Vec<String>, cacheable_tx: mpsc::Sender<String>) {
@@ ... @@ async fn check_upstream(store_paths: Vec<String>, cacheable_tx: mpsc::Sender<String>) {
     }
 }
 
-async fn uploader(cacheable_rx: mpsc::Receiver<String>) {
-    let mut count = 0;
-    let concurrent = Semaphore::new(10);
+async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) {
+    let upload_count = Arc::new(AtomicUsize::new(0));
+    let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+    let concurrent = Semaphore::new(concurrency.into());
     let mut handles = Vec::new();
     loop {
         if let Ok(path_to_upload) = cacheable_rx.recv() {
             let _ = concurrent.acquire().await.unwrap();
+            let failures = Arc::clone(&failures);
+            let binary_cache = binary_cache.clone();
+            let upload_count = Arc::clone(&upload_count);
+
             handles.push(tokio::spawn(async move {
                 println!("uploading: {}", path_to_upload);
                 if Command::new("nix")
                     .arg("copy")
                     .arg("--to")
-                    .arg("s3://nixcache?endpoint=s3.cy7.sh&secret-key=/home/yt/cache-priv-key.pem")
+                    .arg(&binary_cache.to_string())
                     .arg(&path_to_upload)
                     .output()
                     .is_err()
                 {
                     println!("WARN: upload failed: {}", path_to_upload);
+                    failures.lock().unwrap().push(path_to_upload);
                 } else {
-                    count += 1;
+                    upload_count.fetch_add(1, Ordering::Relaxed);
                 }
             }));
         } else {
@@ -162,7 +199,16 @@ async fn uploader(cacheable_rx: mpsc::Receiver<String>) {
             for handle in handles {
                 handle.await.unwrap();
             }
-            println!("uploaded {} paths", count);
+            println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
+
+            let failures = failures.lock().unwrap();
+            if !failures.is_empty() {
+                println!("failed to upload these paths: ");
+                for failure in failures.iter() {
+                    print!("{}", failure);
+                }
+                println!();
+            }
             break;
         }
     }
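
For reference, a minimal self-contained sketch (not part of the diff) of how the new clap-derived Cli accepts arguments. The struct mirrors the one added in src/main.rs; the package name and cache address below are made-up example values, and Parser::parse_from is used in place of the Cli::parse() call in main so the example can supply its own argument list.

// Sketch of the CLI surface introduced by this diff; argument values are illustrative only.
use clap::Parser;

#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
    /// Package to upload to the binary cache
    package: String,

    /// Address of the binary cache (passed to nix copy --to)
    #[arg(long, value_name = "BINARY CACHE")]
    to: String,

    /// Upstream cache to check against. Can be specified multiple times.
    #[arg(long, short)]
    upstream_cache: Vec<String>,

    /// Concurrent upstream cache checkers
    #[arg(long, default_value_t = 50)]
    upstream_checker_concurrency: u8,

    /// Concurrent uploaders
    #[arg(long, default_value_t = 10)]
    uploader_concurrency: u8,
}

fn main() {
    // parse_from takes an explicit argv; the real binary calls Cli::parse().
    let cli = Cli::parse_from([
        "nixcp",
        "nixpkgs#hello",                         // positional `package` (example value)
        "--to",
        "s3://nixcache?endpoint=s3.example.org", // example cache address
        "--upstream-cache",
        "https://nix-community.cachix.org",      // repeatable flag, collected into a Vec
    ]);

    assert_eq!(cli.package, "nixpkgs#hello");
    assert_eq!(cli.upstream_cache.len(), 1);
    // flags left unspecified fall back to their declared defaults
    assert_eq!(cli.upstream_checker_concurrency, 50);
    assert_eq!(cli.uploader_concurrency, 10);
}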