diff --git a/src/main.rs b/src/main.rs index 171b390..a7eb618 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,10 @@ +use std::process::{Command, Stdio}; +use std::sync::mpsc; use std::{env, path::Path}; -use std::process::Command; +use log::{debug, trace}; use serde::{Deserialize, Serialize}; use serde_json; -use log::debug; use tokio; const UPSTREAM_CACHES: &'static [&'static str] = &[ @@ -66,18 +67,81 @@ async fn main() { let args: Vec = env::args().collect(); let package = &args[1]; debug!("package: {}", package); + + println!("querying nix path-info"); let path_infos = PathInfo::from_package(package); - // filter out store paths that exist in upstream caches + println!("querying nix-store"); let store_paths = path_infos[0].get_store_paths(); + let (cacheable_tx, cacheable_rx) = mpsc::channel(); + + println!("spawning check_upstream"); + tokio::spawn(async move { + check_upstream(store_paths, cacheable_tx).await; + }); + + println!("spawning uploader"); + tokio::spawn(async move { + uploader(cacheable_rx).await; + }).await.unwrap(); +} + +// filter out store paths that exist in upstream caches +async fn check_upstream(store_paths: Vec, cacheable_tx: mpsc::Sender) { for store_path in store_paths { - let basename = Path::new(&store_path).file_name().unwrap().to_str().unwrap().to_string(); + let basename = Path::new(&store_path) + .file_name() + .unwrap() + .to_str() + .unwrap() + .to_string(); let hash = basename.split("-").nth(0).unwrap(); + + let mut hit = false; for upstream in UPSTREAM_CACHES { let mut uri = String::from(*upstream); uri.push_str(format!("/{}.narinfo", hash).as_str()); - let res_status = reqwest::Client::new().head(uri).send().await.unwrap().status(); - println!("{} responded with {}", *upstream, res_status); + + let res_status = reqwest::Client::new() + .head(uri) + .send() + .await + .unwrap() + .status(); + + if res_status.is_success() { + debug!("{} was a hit upstream: {}", store_path, upstream); + hit = true; + break; + } + } + if !hit { + trace!("sending {}", store_path); + cacheable_tx.send(store_path).unwrap(); + } + } +} + +async fn uploader(cacheable_rx: mpsc::Receiver) { + let mut count = 0; + loop { + if let Ok(path_to_upload) = cacheable_rx.recv() { + trace!("to upload: {}", path_to_upload); + if Command::new("nix") + .arg("copy") + .arg("--to") + .arg("s3://nixcache?endpoint=s3.cy7.sh&secret-key=/home/yt/cache-priv-key.pem") + .arg(&path_to_upload) + .output() + .is_err() + { + println!("WARN: upload failed: {}", path_to_upload); + } else { + count += 1; + } + } else { + println!("uploaded {} paths", count); + break; } } }