diff --git a/src/main.rs b/src/main.rs
index 976efb9..e2773cc 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,17 +1,8 @@
 #![feature(let_chains)]
 #![feature(extend_one)]
 
-use std::path::Path;
-use std::sync::{
-    Arc, Mutex, RwLock,
-    atomic::{AtomicUsize, Ordering},
-};
-
 use anyhow::Result;
 use clap::{Parser, Subcommand};
-use log::{debug, trace};
-use tokio::process::Command;
-use tokio::sync::{Semaphore, mpsc};
 
 use nixcp::NixCp;
 
@@ -60,138 +51,14 @@ enum Commands {
 async fn main() -> Result<()> {
     env_logger::init();
     let cli = Cli::parse();
-    let mut nixcp = NixCp::new();
-    nixcp.add_upstreams(&cli.upstream_caches)?;
+    let nixcp = Box::leak(Box::new(NixCp::with_upstreams(&cli.upstream_caches)?));
 
     match &cli.command {
         Commands::Push { package } => {
             nixcp.paths_from_package(package).await?;
+            nixcp.run().await;
         }
     }
 
     Ok(())
-
-    /*
-    let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());
-
-    println!("spawning check_upstream");
-
-    println!("spawning uploader");
-    handles.push(tokio::spawn(async move {
-        uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await;
-    }));
-
-    // make sure all threads are done
-    for handle in handles {
-        handle.await.unwrap();
-    }
-    */
-}
-
-// filter out store paths that exist in upstream caches
-async fn check_upstream(
-    store_paths: Arc<RwLock<Vec<String>>>,
-    cacheable_tx: mpsc::Sender<String>,
-    concurrency: u8,
-    upstream_caches: Arc<Vec<String>>,
-) {
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
-    let c_store_paths = Arc::clone(&store_paths);
-    let store_paths = c_store_paths.read().unwrap().clone();
-
-    for store_path in store_paths {
-        let tx = cacheable_tx.clone();
-        let upstream_caches = Arc::clone(&upstream_caches);
-        let concurrency = Arc::clone(&concurrency);
-
-        tokio::spawn(async move {
-            let _permit = concurrency.acquire().await.unwrap();
-            let basename = Path::new(&store_path)
-                .file_name()
-                .unwrap()
-                .to_str()
-                .unwrap()
-                .to_string();
-            let hash = basename.split("-").next().unwrap();
-
-            let mut hit = false;
-            for upstream in upstream_caches.as_ref() {
-                let mut uri = upstream.clone();
-                uri.push_str(format!("/{}.narinfo", hash).as_str());
-
-                let res_status = reqwest::Client::new()
-                    .head(uri)
-                    .send()
-                    .await
-                    .map(|x| x.status());
-
-                if let Ok(res_status) = res_status
-                    && res_status.is_success()
-                {
-                    debug!("{} was a hit upstream: {}", store_path, upstream);
-                    hit = true;
-                    break;
-                }
-            }
-            if !hit {
-                trace!("sending {}", store_path);
-                tx.send(store_path).await.unwrap();
-            }
-        });
-    }
-}
-
-async fn uploader(
-    cacheable_rx: &mut mpsc::Receiver<String>,
-    binary_cache: String,
-    concurrency: u8,
-) {
-    let upload_count = Arc::new(AtomicUsize::new(0));
-    let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
-    let mut handles = Vec::new();
-
-    loop {
-        if let Some(path_to_upload) = cacheable_rx.recv().await {
-            let concurrency = Arc::clone(&concurrency);
-            let failures = Arc::clone(&failures);
-            let binary_cache = binary_cache.clone();
-            let upload_count = Arc::clone(&upload_count);
-
-            handles.push(tokio::spawn(async move {
-                let _permit = concurrency.acquire().await.unwrap();
-                println!("uploading: {}", path_to_upload);
-                if Command::new("nix")
-                    .arg("copy")
-                    .arg("--to")
-                    .arg(&binary_cache)
-                    .arg(&path_to_upload)
-                    .output()
-                    .await
-                    .is_err()
-                {
-                    println!("WARN: upload failed: {}", path_to_upload);
-                    failures.lock().unwrap().push(path_to_upload);
-                } else {
-                    upload_count.fetch_add(1, Ordering::Relaxed);
-                }
-            }));
-        } else {
-            // make sure all threads are done
-            for handle in handles {
-                handle.await.unwrap();
-            }
-            println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
-
-            let failures = failures.lock().unwrap();
-            if !failures.is_empty() {
-                println!("failed to upload these paths: ");
-                for failure in failures.iter() {
-                    print!("{}", failure);
-                }
-                println!();
-            }
-            break;
-        }
-    }
 }
diff --git a/src/nixcp.rs b/src/nixcp.rs
index 74e9fa5..057af55 100644
--- a/src/nixcp.rs
+++ b/src/nixcp.rs
@@ -1,64 +1,135 @@
-use std::sync::Arc;
+use std::{
+    iter::once,
+    sync::{
+        Arc, Mutex,
+        atomic::{AtomicUsize, Ordering},
+    },
+};
 
 use crate::path_info::PathInfo;
 use anyhow::{Context, Result};
-use log::info;
-use tokio::sync::{Semaphore, mpsc};
+use log::{info, warn};
+use tokio::{
+    process::Command,
+    sync::{RwLock, Semaphore, mpsc},
+};
 use url::Url;
 
 pub struct NixCp {
     upstream_caches: Arc<Vec<Url>>,
-    store_paths: Vec<PathInfo>,
+    store_paths: Arc<RwLock<Vec<PathInfo>>>,
 }
 
 impl NixCp {
-    pub fn new() -> Self {
-        Self {
-            upstream_caches: vec![Url::parse("https://cache.nixos.org").unwrap()],
-            store_paths: Vec::new(),
-        }
-    }
-
-    pub fn add_upstreams(&mut self, upstreams: &[String]) -> Result<()> {
-        self.upstream_caches.reserve(upstreams.len());
-        for upstream in upstreams {
-            self.upstream_caches
+    pub fn with_upstreams(new_upstreams: &[String]) -> Result<Self> {
+        let mut upstreams = Vec::with_capacity(new_upstreams.len() + 1);
+        for upstream in new_upstreams
+            .iter()
+            .chain(once(&"https://cache.nixos.org".to_string()))
+        {
+            upstreams
                 .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
         }
-        Ok(())
+        Ok(Self {
+            upstream_caches: Arc::new(upstreams),
+            store_paths: Arc::new(RwLock::new(Vec::new())),
+        })
     }
 
     pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
         let path_info = PathInfo::from_path(package).await?;
-        self.store_paths = path_info.get_closure().await?;
-        info!("found {} store paths", self.store_paths.len());
+        self.store_paths
+            .write()
+            .await
+            .extend(path_info.get_closure().await?);
+        info!("found {} store paths", self.store_paths.read().await.len());
 
         Ok(())
     }
 
-    pub async fn run(&mut self) {}
+    pub async fn run(&'static self) {
+        let (tx, rx) = mpsc::channel(10);
+        let tx = Arc::new(tx);
+        tokio::spawn(self.filter_from_upstream(tx));
+        tokio::spawn(self.uploader("".to_string(), rx));
+    }
 
     /// filter paths that are on upstream and send to `tx`
-    async fn filter_from_upstream(&self, tx: mpsc::Sender<&PathInfo>) {
+    async fn filter_from_upstream(&self, tx: Arc<mpsc::Sender<String>>) {
         let permits = Arc::new(Semaphore::new(10));
-        let mut handles = Vec::new();
-        for path in &self.store_paths {
+        let mut handles = Vec::with_capacity(10);
+        let store_paths = self.store_paths.read().await.clone();
+
+        for path in store_paths.into_iter() {
             if path.check_upstream_signature(&self.upstream_caches) {
                 continue;
             }
-            let permits = permits.clone();
-            let tx = tx.clone();
-            handles.push(tokio::spawn(async move {
-                let _permit = permits.acquire().await.unwrap();
+            handles.push({
+                let permits = permits.clone();
+                let tx = tx.clone();
+                let upstream_caches = self.upstream_caches.clone();
+                tokio::spawn(async move {
+                    let _permit = permits.acquire().await.unwrap();
 
-                if !path.check_upstream_hit(&self.upstream_caches).await {
-                    tx.send(path);
-                }
-            }));
+                    if !path.check_upstream_hit(upstream_caches.as_slice()).await {
+                        tx.send(path.to_string()).await.unwrap();
+                    }
+                })
+            });
         }
 
         for handle in handles {
             handle.await.unwrap();
         }
     }
+
+    async fn uploader(&self, cache: String, mut rx: mpsc::Receiver<String>) {
+        let upload_count = Arc::new(AtomicUsize::new(0));
+        let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+        let permits = Arc::new(Semaphore::new(10));
+        let mut handles = Vec::with_capacity(10);
+
+        loop {
+            if let Some(path_to_upload) = rx.recv().await {
+                let permits = Arc::clone(&permits);
+                let failures = Arc::clone(&failures);
+                let binary_cache = cache.clone();
+                let upload_count = Arc::clone(&upload_count);
+
+                handles.push(tokio::spawn(async move {
+                    let _permit = permits.acquire().await.unwrap();
+                    info!("uploading: {}", path_to_upload.to_string());
+                    if Command::new("nix")
+                        .arg("copy")
+                        .arg("--to")
+                        .arg(&binary_cache)
+                        .arg(&path_to_upload.to_string())
+                        .output()
+                        .await
+                        .is_err()
+                    {
+                        warn!("upload failed: {}", path_to_upload);
+                        failures.lock().unwrap().push(path_to_upload);
+                    } else {
+                        upload_count.fetch_add(1, Ordering::Relaxed);
+                    }
+                }));
+            } else {
+                // make sure all threads are done
+                for handle in handles {
+                    handle.await.unwrap();
+                }
+                println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
+
+                let failures = failures.lock().unwrap();
+                if !failures.is_empty() {
+                    warn!("failed to upload these paths: ");
+                    for failure in failures.iter() {
+                        warn!("{}", failure);
+                    }
+                }
+                break;
+            }
+        }
+    }
 }
diff --git a/src/path_info.rs b/src/path_info.rs
index 1213c1b..e1cf25a 100644
--- a/src/path_info.rs
+++ b/src/path_info.rs
@@ -8,7 +8,7 @@ use tokio::process::Command;
 use url::Url;
 
 // nix path-info --derivation --json
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PathInfo {
     deriver: String,
@@ -103,6 +103,12 @@ impl PathInfo {
     }
 }
 
+impl ToString for PathInfo {
+    fn to_string(&self) -> String {
+        self.path.clone()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
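
A note on the `Box::leak(Box::new(...))` call in `main.rs`: `tokio::spawn` requires the future it runs to be `'static`, so `run` cannot borrow a stack-local `NixCp`. Leaking the box yields a `&'static NixCp`, which is why `run` can take `&'static self` and hand plain references to the spawned tasks. Below is a minimal standalone sketch of that pattern; the `App` type and its `label` field are illustrative, not part of this codebase, and it assumes a tokio dependency with the `macros` and `rt-multi-thread` features.

```rust
struct App {
    label: String,
}

impl App {
    // `&'static self` means the caller must supply a reference that lives
    // for the whole program, which satisfies tokio::spawn's 'static bound.
    async fn run(&'static self) {
        let worker = tokio::spawn(async move {
            // `self` here is a plain `&'static App`; no Arc plumbing needed.
            println!("working on {}", self.label);
        });
        worker.await.unwrap();
    }
}

#[tokio::main]
async fn main() {
    // Box::leak trades one never-freed allocation for a 'static reference,
    // which is fine for a value that lives until the process exits.
    let app: &'static App = Box::leak(Box::new(App {
        label: "demo".to_string(),
    }));
    app.run().await;
}
```

The alternative is storing `NixCp` behind an `Arc` and cloning it into every task; the leak avoids that plumbing at the cost of one allocation that is never reclaimed.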
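The `filter_from_upstream`/`uploader` pair in `nixcp.rs` forms a bounded fan-out pipeline: a `Semaphore` caps how many upstream checks run at once, cache misses are forwarded over a bounded mpsc channel, and the consumer loop ends when `recv()` returns `None` after every sender has been dropped. Here is a sketch of the same shape under stated assumptions: `is_cached_upstream` is a hypothetical stand-in for the `.narinfo` HEAD check, and the store paths are fabricated.

```rust
use std::sync::Arc;
use tokio::sync::{Semaphore, mpsc};

// Hypothetical stand-in for the upstream .narinfo existence check.
async fn is_cached_upstream(path: &str) -> bool {
    path.len() % 2 == 0 // placeholder predicate, just for the sketch
}

#[tokio::main]
async fn main() {
    let paths: Vec<String> = (0..25).map(|i| format!("/nix/store/{i}-pkg")).collect();
    let (tx, mut rx) = mpsc::channel::<String>(10);
    let permits = Arc::new(Semaphore::new(10));

    let producer = tokio::spawn(async move {
        let mut handles = Vec::new();
        for path in paths {
            let tx = tx.clone();
            let permits = permits.clone();
            handles.push(tokio::spawn(async move {
                // Holding a permit for the task's lifetime caps concurrency at 10.
                let _permit = permits.acquire().await.unwrap();
                if !is_cached_upstream(&path).await {
                    tx.send(path).await.unwrap();
                }
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
        // The original `tx` and all clones drop here, closing the channel.
    });

    // recv() yields None once the channel closes; the same condition breaks
    // the uploader loop in nixcp.rs above.
    while let Some(path) = rx.recv().await {
        println!("would upload {path}");
    }
    producer.await.unwrap();
}
```

One caveat the diff leaves open: as written, `run` returns as soon as the two tasks are spawned, so nothing in `main` waits for the pipeline to drain before the process exits.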