From f51099b911dc0377f145259fe1ec21d7680de4aa Mon Sep 17 00:00:00 2001 From: cy Date: Sun, 13 Apr 2025 01:33:15 -0400 Subject: [PATCH 1/2] reformat and stuff ig --- .gitignore | 1 + Cargo.lock | 10 +++ Cargo.toml | 3 + src/cli.rs | 2 + src/main.rs | 162 +++++++++++++---------------------------------- src/nixcp.rs | 64 +++++++++++++++++++ src/path_info.rs | 155 +++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 279 insertions(+), 118 deletions(-) create mode 100644 src/cli.rs create mode 100644 src/nixcp.rs create mode 100644 src/path_info.rs diff --git a/.gitignore b/.gitignore index 2d5df85..9624092 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target .direnv +result diff --git a/Cargo.lock b/Cargo.lock index d16b9d3..5cf46a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "anyhow" +version = "1.0.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" + [[package]] name = "atomic-waker" version = "1.1.2" @@ -799,13 +805,16 @@ dependencies = [ name = "nixcp" version = "0.1.0" dependencies = [ + "anyhow", "clap", "env_logger", "log", + "regex", "reqwest", "serde", "serde_json", "tokio", + "url", ] [[package]] @@ -1469,6 +1478,7 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2428f7e..fe2d773 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,13 @@ version = "0.1.0" edition = "2024" [dependencies] +anyhow = "1.0.97" clap = { version = "4.5.34", features = ["derive"] } env_logger = "0.11.7" log = "0.4.27" +regex = "1.11.1" reqwest = "0.12.15" serde = { version = "1.0.219", features = [ "derive" ]} serde_json = "1.0.140" tokio = { version = "1.44.1", features = [ "full" ]} +url = { version = "2.5.4", features = [ "serde" ]} diff --git a/src/cli.rs b/src/cli.rs new file mode 100644 index 0000000..139597f --- /dev/null +++ b/src/cli.rs @@ -0,0 +1,2 @@ + + diff --git a/src/main.rs b/src/main.rs index 27abf93..976efb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ #![feature(let_chains)] +#![feature(extend_one)] use std::path::Path; use std::sync::{ @@ -6,92 +7,39 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, }; -use clap::Parser; +use anyhow::Result; +use clap::{Parser, Subcommand}; use log::{debug, trace}; -use serde::{Deserialize, Serialize}; use tokio::process::Command; use tokio::sync::{Semaphore, mpsc}; -const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"]; +use nixcp::NixCp; -// nix path-info --derivation --json -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -struct PathInfo { - ca: String, - nar_hash: String, - nar_size: u32, - path: String, - references: Vec, - registration_time: u32, - valid: bool, -} +mod cli; +mod nixcp; +mod path_info; -impl PathInfo { - // find derivations related to package - async fn from_package(package: &str, recursive: bool) -> Vec { - let mut args = vec!["path-info", "--derivation", "--json"]; - if recursive { - args.push("--recursive"); - } - let path_infos = Command::new("nix") - .args(args) - .arg(package) - .output() - .await - .expect("path-info failed"); - - let path_infos: Vec = serde_json::from_slice(&path_infos.stdout) - .expect("no derivations found for this package"); - debug!("PathInfo's from nix path-info: {:#?}", path_infos); - path_infos - } - - // find store paths related to derivation - async fn get_store_paths(&self) -> Vec { - let mut store_paths: Vec = Vec::new(); - let nix_store_cmd = Command::new("nix-store") - .arg("--query") - .arg("--requisites") - .arg("--include-outputs") - .arg(&self.path) - .output() - .await - .expect("nix-store cmd failed"); - - let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap(); - for store_path in nix_store_out.split_whitespace().map(ToString::to_string) { - store_paths.push(store_path); - } - store_paths - } -} - -#[derive(Parser)] -#[command(version, about, long_about = None)] +#[derive(Parser, Debug)] +#[command(version, name = "nixcp")] struct Cli { - /// Package to upload to the binary cache - package: String, + #[command(subcommand)] + command: Commands, /// Address of the binary cache (passed to nix copy --to) #[arg(long, value_name = "BINARY CACHE")] to: String, /// Upstream cache to check against. Can be specified multiple times. - /// cache.nixos.org is always included - #[arg(long, short)] - upstream_cache: Vec, - - /// Whether to pass --recursive to nix path-info. Can queue a huge number of paths to upload - #[arg(long, short, default_value_t = false)] - recursive: bool, + /// cache.nixos.org is always included (unless --no-nixos-cache is passed) + #[arg(long = "upstream-cache", short)] + upstream_caches: Vec, /// Concurrent upstream cache checkers #[arg(long, default_value_t = 32)] upstream_checker_concurrency: u8, /// Concurrent uploaders - #[arg(long, default_value_t = 16)] + #[arg(long, default_value_t = 4)] uploader_concurrency: u8, /// Concurrent nix-store commands to run @@ -99,67 +47,45 @@ struct Cli { nix_store_concurrency: u8, } +#[derive(Debug, Subcommand)] +enum Commands { + Push { + /// Package or store path to upload + /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 + package: String, + }, +} + #[tokio::main] -async fn main() { +async fn main() -> Result<()> { env_logger::init(); let cli = Cli::parse(); - let package = &cli.package; - let binary_cache = cli.to; - let mut upstream_caches = cli.upstream_cache; - for upstream in UPSTREAM_CACHES { - upstream_caches.push(upstream.to_string()); + let mut nixcp = NixCp::new(); + nixcp.add_upstreams(&cli.upstream_caches)?; + + match &cli.command { + Commands::Push { package } => { + nixcp.paths_from_package(package).await?; + } } - debug!("package: {}", package); - debug!("binary cache: {}", binary_cache); - debug!("upstream caches: {:#?}", upstream_caches); - println!("querying nix path-info"); - let derivations = PathInfo::from_package(package, cli.recursive).await; - println!("got {} derivations", derivations.len()); + Ok(()) - println!("querying nix-store"); - let mut handles = Vec::new(); - let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into())); - let store_paths = Arc::new(RwLock::new(Vec::new())); + /* + let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into()); - for derivation in derivations { - let store_paths = Arc::clone(&store_paths); - let permit = Arc::clone(&concurrency); + println!("spawning check_upstream"); + + println!("spawning uploader"); handles.push(tokio::spawn(async move { - let _permit = permit.acquire_owned().await.unwrap(); - let paths = derivation.get_store_paths().await; - store_paths.write().unwrap().extend(paths); + uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await; })); - } - // resolve store paths for all derivations before we move on - for handle in handles { - handle.await.unwrap(); - } - println!("got {} store paths", store_paths.read().unwrap().len()); - let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into()); - - println!("spawning check_upstream"); - handles = Vec::new(); - handles.push(tokio::spawn(async move { - check_upstream( - store_paths, - cacheable_tx, - cli.upstream_checker_concurrency, - Arc::new(upstream_caches), - ) - .await; - })); - - println!("spawning uploader"); - handles.push(tokio::spawn(async move { - uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await; - })); - - // make sure all threads are done - for handle in handles { - handle.await.unwrap(); - } + // make sure all threads are done + for handle in handles { + handle.await.unwrap(); + } + */ } // filter out store paths that exist in upstream caches diff --git a/src/nixcp.rs b/src/nixcp.rs new file mode 100644 index 0000000..74e9fa5 --- /dev/null +++ b/src/nixcp.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use crate::path_info::PathInfo; +use anyhow::{Context, Result}; +use log::info; +use tokio::sync::{Semaphore, mpsc}; +use url::Url; + +pub struct NixCp { + upstream_caches: Arc>, + store_paths: Vec, +} + +impl NixCp { + pub fn new() -> Self { + Self { + upstream_caches: vec![Url::parse("https://cache.nixos.org").unwrap()], + store_paths: Vec::new(), + } + } + + pub fn add_upstreams(&mut self, upstreams: &[String]) -> Result<()> { + self.upstream_caches.reserve(upstreams.len()); + for upstream in upstreams { + self.upstream_caches + .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?); + } + Ok(()) + } + + pub async fn paths_from_package(&mut self, package: &str) -> Result<()> { + let path_info = PathInfo::from_path(package).await?; + self.store_paths = path_info.get_closure().await?; + info!("found {} store paths", self.store_paths.len()); + + Ok(()) + } + + pub async fn run(&mut self) {} + + /// filter paths that are on upstream and send to `tx` + async fn filter_from_upstream(&self, tx: mpsc::Sender<&PathInfo>) { + let permits = Arc::new(Semaphore::new(10)); + let mut handles = Vec::new(); + for path in &self.store_paths { + if path.check_upstream_signature(&self.upstream_caches) { + continue; + } + let permits = permits.clone(); + let tx = tx.clone(); + handles.push(tokio::spawn(async move { + let _permit = permits.acquire().await.unwrap(); + + if !path.check_upstream_hit(&self.upstream_caches).await { + tx.send(path); + } + })); + } + + for handle in handles { + handle.await.unwrap(); + } + } +} diff --git a/src/path_info.rs b/src/path_info.rs new file mode 100644 index 0000000..1213c1b --- /dev/null +++ b/src/path_info.rs @@ -0,0 +1,155 @@ +use std::{collections::HashSet, path::Path}; + +use anyhow::{Context, Result}; +use log::trace; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use tokio::process::Command; +use url::Url; + +// nix path-info --derivation --json +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PathInfo { + deriver: String, + path: String, + signatures: Vec, +} +impl PathInfo { + /// get PathInfo for a package or a store path + pub async fn from_path(path: &str) -> Result { + let path_info = Command::new("nix") + .arg("path-info") + .arg("--json") + .arg(path) + .output() + .await + .context("`nix path-info` failed for {package}")?; + + Ok(serde_json::from_slice(&path_info.stdout)?) + } + + pub async fn get_closure(&self) -> Result> { + let nix_store_cmd = Command::new("nix-store") + .arg("--query") + .arg("--requisites") + .arg("--include-outputs") + .arg(&self.deriver) + .output() + .await + .expect("nix-store cmd failed"); + + let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?; + let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect(); + let mut closure = Vec::with_capacity(nix_store_paths.len()); + for path in nix_store_paths { + closure.push(Self::from_path(path).await?); + } + Ok(closure) + } + + pub fn get_path(&self) -> &Path { + &Path::new(&self.path) + } + + /// checks if the path is signed by any upstream. if it is, we assume a cache hit. + /// the name of the cache in the signature does not have to be the domain of the cache. + /// in fact, it can be any random string. but, most often it is, and this saves us + /// a request. + pub fn check_upstream_signature(&self, upstreams: &[Url]) -> bool { + let upstreams: HashSet<_> = upstreams.iter().filter_map(|x| x.domain()).collect(); + + // some caches use names prefixed with - + // e.g. cache.nixos.org-1, nix-community.cachix.org-1 + let re = Regex::new(r"-\d+$").expect("regex should be valid"); + for signee in self.signees().iter().map(|&x| re.replace(x, "")) { + if upstreams.contains(signee.as_ref()) { + return true; + } + } + return false; + } + + fn signees(&self) -> Vec<&str> { + let signees: Vec<_> = self + .signatures + .iter() + .filter_map(|signature| Some(signature.split_once(":")?.0)) + .collect(); + trace!("signees for {}: {:?}", self.path, signees); + signees + } + + pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool { + let basename = self.get_path().file_name().unwrap().to_str().unwrap(); + let hash = basename.split_once("-").unwrap().0; + + for upstream in upstreams { + let upstream = upstream + .join(format!("{hash}/.narinfo").as_str()) + .expect("adding .narinfo should make a valid url"); + let res_status = reqwest::Client::new() + .head(upstream.as_str()) + .send() + .await + .map(|x| x.status()); + + match &res_status { + Ok(status) => return status.is_success(), + Err(_) => return false, + } + } + false + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn get_signees_from_path_info() { + let path_info = PathInfo { + deriver: "".to_string(), + path: "".to_string(), + signatures: vec![ + "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), + "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), + ], + }; + let signees = path_info.signees(); + assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]); + } + + #[test] + fn match_upstream_cache_from_signature() { + let path_info = PathInfo { + deriver: "".to_string(), + path: "".to_string(), + signatures: vec![ + "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), + "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), + "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(), + ], + }; + assert_eq!( + path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]), + true + ); + assert_eq!( + path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]), + true + ); + assert_eq!( + path_info.check_upstream_signature(&[ + Url::parse("https://nix-community.cachix.org").unwrap() + ]), + true + ); + assert_eq!( + path_info + .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]), + false + ); + } +} From e58cf2bbd0451b598c5bc722b8ab3ce6bc8388e5 Mon Sep 17 00:00:00 2001 From: cy Date: Sun, 13 Apr 2025 03:27:55 -0400 Subject: [PATCH 2/2] maybe done refactoring but not tested --- src/main.rs | 137 +---------------------------------------------- src/nixcp.rs | 131 +++++++++++++++++++++++++++++++++----------- src/path_info.rs | 8 ++- 3 files changed, 110 insertions(+), 166 deletions(-) diff --git a/src/main.rs b/src/main.rs index 976efb9..e2773cc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,8 @@ #![feature(let_chains)] #![feature(extend_one)] -use std::path::Path; -use std::sync::{ - Arc, Mutex, RwLock, - atomic::{AtomicUsize, Ordering}, -}; - use anyhow::Result; use clap::{Parser, Subcommand}; -use log::{debug, trace}; -use tokio::process::Command; -use tokio::sync::{Semaphore, mpsc}; use nixcp::NixCp; @@ -60,138 +51,14 @@ enum Commands { async fn main() -> Result<()> { env_logger::init(); let cli = Cli::parse(); - let mut nixcp = NixCp::new(); - nixcp.add_upstreams(&cli.upstream_caches)?; + let nixcp = Box::leak(Box::new(NixCp::with_upstreams(&cli.upstream_caches)?)); match &cli.command { Commands::Push { package } => { nixcp.paths_from_package(package).await?; + nixcp.run().await; } } Ok(()) - - /* - let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into()); - - println!("spawning check_upstream"); - - println!("spawning uploader"); - handles.push(tokio::spawn(async move { - uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await; - })); - - // make sure all threads are done - for handle in handles { - handle.await.unwrap(); - } - */ -} - -// filter out store paths that exist in upstream caches -async fn check_upstream( - store_paths: Arc>>, - cacheable_tx: mpsc::Sender, - concurrency: u8, - upstream_caches: Arc>, -) { - let concurrency = Arc::new(Semaphore::new(concurrency.into())); - let c_store_paths = Arc::clone(&store_paths); - let store_paths = c_store_paths.read().unwrap().clone(); - - for store_path in store_paths { - let tx = cacheable_tx.clone(); - let upstream_caches = Arc::clone(&upstream_caches); - let concurrency = Arc::clone(&concurrency); - - tokio::spawn(async move { - let _permit = concurrency.acquire().await.unwrap(); - let basename = Path::new(&store_path) - .file_name() - .unwrap() - .to_str() - .unwrap() - .to_string(); - let hash = basename.split("-").next().unwrap(); - - let mut hit = false; - for upstream in upstream_caches.as_ref() { - let mut uri = upstream.clone(); - uri.push_str(format!("/{}.narinfo", hash).as_str()); - - let res_status = reqwest::Client::new() - .head(uri) - .send() - .await - .map(|x| x.status()); - - if let Ok(res_status) = res_status - && res_status.is_success() - { - debug!("{} was a hit upstream: {}", store_path, upstream); - hit = true; - break; - } - } - if !hit { - trace!("sending {}", store_path); - tx.send(store_path).await.unwrap(); - } - }); - } -} - -async fn uploader( - cacheable_rx: &mut mpsc::Receiver, - binary_cache: String, - concurrency: u8, -) { - let upload_count = Arc::new(AtomicUsize::new(0)); - let failures: Arc>> = Arc::new(Mutex::new(Vec::new())); - let concurrency = Arc::new(Semaphore::new(concurrency.into())); - let mut handles = Vec::new(); - - loop { - if let Some(path_to_upload) = cacheable_rx.recv().await { - let concurrency = Arc::clone(&concurrency); - let failures = Arc::clone(&failures); - let binary_cache = binary_cache.clone(); - let upload_count = Arc::clone(&upload_count); - - handles.push(tokio::spawn(async move { - let _permit = concurrency.acquire().await.unwrap(); - println!("uploading: {}", path_to_upload); - if Command::new("nix") - .arg("copy") - .arg("--to") - .arg(&binary_cache) - .arg(&path_to_upload) - .output() - .await - .is_err() - { - println!("WARN: upload failed: {}", path_to_upload); - failures.lock().unwrap().push(path_to_upload); - } else { - upload_count.fetch_add(1, Ordering::Relaxed); - } - })); - } else { - // make sure all threads are done - for handle in handles { - handle.await.unwrap(); - } - println!("uploaded {} paths", upload_count.load(Ordering::Relaxed)); - - let failures = failures.lock().unwrap(); - if !failures.is_empty() { - println!("failed to upload these paths: "); - for failure in failures.iter() { - print!("{}", failure); - } - println!(); - } - break; - } - } } diff --git a/src/nixcp.rs b/src/nixcp.rs index 74e9fa5..057af55 100644 --- a/src/nixcp.rs +++ b/src/nixcp.rs @@ -1,64 +1,135 @@ -use std::sync::Arc; +use std::{ + iter::once, + sync::{ + Arc, Mutex, + atomic::{AtomicUsize, Ordering}, + }, +}; use crate::path_info::PathInfo; use anyhow::{Context, Result}; -use log::info; -use tokio::sync::{Semaphore, mpsc}; +use log::{info, warn}; +use tokio::{ + process::Command, + sync::{RwLock, Semaphore, mpsc}, +}; use url::Url; pub struct NixCp { upstream_caches: Arc>, - store_paths: Vec, + store_paths: Arc>>, } impl NixCp { - pub fn new() -> Self { - Self { - upstream_caches: vec![Url::parse("https://cache.nixos.org").unwrap()], - store_paths: Vec::new(), - } - } - - pub fn add_upstreams(&mut self, upstreams: &[String]) -> Result<()> { - self.upstream_caches.reserve(upstreams.len()); - for upstream in upstreams { - self.upstream_caches + pub fn with_upstreams(new_upstreams: &[String]) -> Result { + let mut upstreams = Vec::with_capacity(new_upstreams.len() + 1); + for upstream in new_upstreams + .iter() + .chain(once(&"https://cache.nixos.org".to_string())) + { + upstreams .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?); } - Ok(()) + Ok(Self { + upstream_caches: Arc::new(upstreams), + store_paths: Arc::new(RwLock::new(Vec::new())), + }) } pub async fn paths_from_package(&mut self, package: &str) -> Result<()> { let path_info = PathInfo::from_path(package).await?; - self.store_paths = path_info.get_closure().await?; - info!("found {} store paths", self.store_paths.len()); + self.store_paths + .write() + .await + .extend(path_info.get_closure().await?); + info!("found {} store paths", self.store_paths.read().await.len()); Ok(()) } - pub async fn run(&mut self) {} + pub async fn run(&'static self) { + let (tx, rx) = mpsc::channel(10); + let tx = Arc::new(tx); + tokio::spawn(self.filter_from_upstream(tx)); + tokio::spawn(self.uploader("".to_string(), rx)); + } /// filter paths that are on upstream and send to `tx` - async fn filter_from_upstream(&self, tx: mpsc::Sender<&PathInfo>) { + async fn filter_from_upstream(&self, tx: Arc>) { let permits = Arc::new(Semaphore::new(10)); - let mut handles = Vec::new(); - for path in &self.store_paths { + let mut handles = Vec::with_capacity(10); + let store_paths = self.store_paths.read().await.clone(); + + for path in store_paths.into_iter() { if path.check_upstream_signature(&self.upstream_caches) { continue; } - let permits = permits.clone(); - let tx = tx.clone(); - handles.push(tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); + handles.push({ + let permits = permits.clone(); + let tx = tx.clone(); + let upstream_caches = self.upstream_caches.clone(); + tokio::spawn(async move { + let _permit = permits.acquire().await.unwrap(); - if !path.check_upstream_hit(&self.upstream_caches).await { - tx.send(path); - } - })); + if !path.check_upstream_hit(upstream_caches.as_slice()).await { + tx.send(path.to_string()).await.unwrap(); + } + }) + }); } for handle in handles { handle.await.unwrap(); } } + + async fn uploader(&self, cache: String, mut rx: mpsc::Receiver) { + let upload_count = Arc::new(AtomicUsize::new(0)); + let failures: Arc>> = Arc::new(Mutex::new(Vec::new())); + let permits = Arc::new(Semaphore::new(10)); + let mut handles = Vec::with_capacity(10); + + loop { + if let Some(path_to_upload) = rx.recv().await { + let permits = Arc::clone(&permits); + let failures = Arc::clone(&failures); + let binary_cache = cache.clone(); + let upload_count = Arc::clone(&upload_count); + + handles.push(tokio::spawn(async move { + let _permit = permits.acquire().await.unwrap(); + info!("uploading: {}", path_to_upload.to_string()); + if Command::new("nix") + .arg("copy") + .arg("--to") + .arg(&binary_cache) + .arg(&path_to_upload.to_string()) + .output() + .await + .is_err() + { + warn!("upload failed: {}", path_to_upload); + failures.lock().unwrap().push(path_to_upload); + } else { + upload_count.fetch_add(1, Ordering::Relaxed); + } + })); + } else { + // make sure all threads are done + for handle in handles { + handle.await.unwrap(); + } + println!("uploaded {} paths", upload_count.load(Ordering::Relaxed)); + + let failures = failures.lock().unwrap(); + if !failures.is_empty() { + warn!("failed to upload these paths: "); + for failure in failures.iter() { + warn!("{}", failure); + } + } + break; + } + } + } } diff --git a/src/path_info.rs b/src/path_info.rs index 1213c1b..e1cf25a 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -8,7 +8,7 @@ use tokio::process::Command; use url::Url; // nix path-info --derivation --json -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PathInfo { deriver: String, @@ -103,6 +103,12 @@ impl PathInfo { } } +impl ToString for PathInfo { + fn to_string(&self) -> String { + self.path.clone() + } +} + #[cfg(test)] mod tests { use super::*;