From f51099b911dc0377f145259fe1ec21d7680de4aa Mon Sep 17 00:00:00 2001
From: cy
Date: Sun, 13 Apr 2025 01:33:15 -0400
Subject: [PATCH] reformat and stuff ig

---
 .gitignore       |   1 +
 Cargo.lock       |  10 +++
 Cargo.toml       |   3 +
 src/cli.rs       |   2 +
 src/main.rs      | 162 +++++++++++++----------------------------
 src/nixcp.rs     |  64 +++++++++++++++++++
 src/path_info.rs | 155 +++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 279 insertions(+), 118 deletions(-)
 create mode 100644 src/cli.rs
 create mode 100644 src/nixcp.rs
 create mode 100644 src/path_info.rs

diff --git a/.gitignore b/.gitignore
index 2d5df85..9624092 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
 /target
 .direnv
+result
diff --git a/Cargo.lock b/Cargo.lock
index d16b9d3..5cf46a7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -76,6 +76,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "anyhow"
+version = "1.0.97"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"
@@ -799,13 +805,16 @@
 name = "nixcp"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "clap",
  "env_logger",
  "log",
+ "regex",
  "reqwest",
  "serde",
  "serde_json",
  "tokio",
+ "url",
 ]
 
 [[package]]
@@ -1469,6 +1478,7 @@ dependencies = [
  "form_urlencoded",
  "idna",
  "percent-encoding",
+ "serde",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 2428f7e..fe2d773 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,10 +4,13 @@ version = "0.1.0"
 edition = "2024"
 
 [dependencies]
+anyhow = "1.0.97"
 clap = { version = "4.5.34", features = ["derive"] }
 env_logger = "0.11.7"
 log = "0.4.27"
+regex = "1.11.1"
 reqwest = "0.12.15"
 serde = { version = "1.0.219", features = [ "derive" ]}
 serde_json = "1.0.140"
 tokio = { version = "1.44.1", features = [ "full" ]}
+url = { version = "2.5.4", features = [ "serde" ]}
diff --git a/src/cli.rs b/src/cli.rs
new file mode 100644
index 0000000..139597f
--- /dev/null
+++ b/src/cli.rs
@@ -0,0 +1,2 @@
+
+
diff --git a/src/main.rs b/src/main.rs
index 27abf93..976efb9 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,5 @@
 #![feature(let_chains)]
+#![feature(extend_one)]
 
 use std::path::Path;
 use std::sync::{
@@ -6,92 +7,39 @@ use std::sync::{
     atomic::{AtomicUsize, Ordering},
 };
 
-use clap::Parser;
+use anyhow::Result;
+use clap::{Parser, Subcommand};
 use log::{debug, trace};
-use serde::{Deserialize, Serialize};
 use tokio::process::Command;
 use tokio::sync::{Semaphore, mpsc};
 
-const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
+use nixcp::NixCp;
 
-// nix path-info --derivation --json
-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct PathInfo {
-    ca: String,
-    nar_hash: String,
-    nar_size: u32,
-    path: String,
-    references: Vec<String>,
-    registration_time: u32,
-    valid: bool,
-}
+mod cli;
+mod nixcp;
+mod path_info;
 
-impl PathInfo {
-    // find derivations related to package
-    async fn from_package(package: &str, recursive: bool) -> Vec<PathInfo> {
-        let mut args = vec!["path-info", "--derivation", "--json"];
-        if recursive {
-            args.push("--recursive");
-        }
-        let path_infos = Command::new("nix")
-            .args(args)
-            .arg(package)
-            .output()
-            .await
-            .expect("path-info failed");
-
-        let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout)
-            .expect("no derivations found for this package");
-        debug!("PathInfo's from nix path-info: {:#?}", path_infos);
-        path_infos
-    }
-
-    // find store paths related to derivation
-    async fn get_store_paths(&self) -> Vec<String> {
-        let mut store_paths: Vec<String> = Vec::new();
-        let nix_store_cmd = Command::new("nix-store")
-            .arg("--query")
-            .arg("--requisites")
-            .arg("--include-outputs")
-            .arg(&self.path)
-            .output()
-            .await
-            .expect("nix-store cmd failed");
-
-        let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
-        for store_path in nix_store_out.split_whitespace().map(ToString::to_string) {
-            store_paths.push(store_path);
-        }
-        store_paths
-    }
-}
 
-#[derive(Parser)]
-#[command(version, about, long_about = None)]
+#[derive(Parser, Debug)]
+#[command(version, name = "nixcp")]
 struct Cli {
-    /// Package to upload to the binary cache
-    package: String,
+    #[command(subcommand)]
+    command: Commands,
 
     /// Address of the binary cache (passed to nix copy --to)
     #[arg(long, value_name = "BINARY CACHE")]
     to: String,
 
     /// Upstream cache to check against. Can be specified multiple times.
-    /// cache.nixos.org is always included
-    #[arg(long, short)]
-    upstream_cache: Vec<String>,
+    /// cache.nixos.org is always included (unless --no-nixos-cache is passed)
+    #[arg(long = "upstream-cache", short)]
+    upstream_caches: Vec<String>,
 
-    /// Whether to pass --recursive to nix path-info. Can queue a huge number of paths to upload
-    #[arg(long, short, default_value_t = false)]
-    recursive: bool,
-
     /// Concurrent upstream cache checkers
     #[arg(long, default_value_t = 32)]
     upstream_checker_concurrency: u8,
 
     /// Concurrent uploaders
-    #[arg(long, default_value_t = 16)]
+    #[arg(long, default_value_t = 4)]
     uploader_concurrency: u8,
 
     /// Concurrent nix-store commands to run
@@ -99,67 +47,45 @@ struct Cli {
     nix_store_concurrency: u8,
 }
 
+#[derive(Debug, Subcommand)]
+enum Commands {
+    Push {
+        /// Package or store path to upload
+        /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
+        package: String,
+    },
+}
+
 #[tokio::main]
-async fn main() {
+async fn main() -> Result<()> {
     env_logger::init();
     let cli = Cli::parse();
-    let package = &cli.package;
-    let binary_cache = cli.to;
-    let mut upstream_caches = cli.upstream_cache;
-    for upstream in UPSTREAM_CACHES {
-        upstream_caches.push(upstream.to_string());
+    let mut nixcp = NixCp::new();
+    nixcp.add_upstreams(&cli.upstream_caches)?;
+
+    match &cli.command {
+        Commands::Push { package } => {
+            nixcp.paths_from_package(package).await?;
+        }
     }
-    debug!("package: {}", package);
-    debug!("binary cache: {}", binary_cache);
-    debug!("upstream caches: {:#?}", upstream_caches);
 
-    println!("querying nix path-info");
-    let derivations = PathInfo::from_package(package, cli.recursive).await;
-    println!("got {} derivations", derivations.len());
+    Ok(())
 
-    println!("querying nix-store");
-    let mut handles = Vec::new();
-    let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
-    let store_paths = Arc::new(RwLock::new(Vec::new()));
+    /*
+    let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());
 
-    for derivation in derivations {
-        let store_paths = Arc::clone(&store_paths);
-        let permit = Arc::clone(&concurrency);
+    println!("spawning check_upstream");
+
+    println!("spawning uploader");
         handles.push(tokio::spawn(async move {
-            let _permit = permit.acquire_owned().await.unwrap();
-            let paths = derivation.get_store_paths().await;
-            store_paths.write().unwrap().extend(paths);
+            uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await;
         }));
-    }
 
-    // resolve store paths for all derivations before we move on
-    for handle in handles {
-        handle.await.unwrap();
-    }
-    println!("got {} store paths", store_paths.read().unwrap().len());
-    let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());
-
-    println!("spawning check_upstream");
-    handles = Vec::new();
-    handles.push(tokio::spawn(async move {
-        check_upstream(
-            store_paths,
-            cacheable_tx,
-            cli.upstream_checker_concurrency,
-            Arc::new(upstream_caches),
-        )
-        .await;
-    }));
-
-    println!("spawning uploader");
-    handles.push(tokio::spawn(async move {
-        uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await;
-    }));
-
-    // make sure all threads are done
-    for handle in handles {
-        handle.await.unwrap();
-    }
+    // make sure all threads are done
+    for handle in handles {
+        handle.await.unwrap();
+    }
+    */
 }
 
 // filter out store paths that exist in upstream caches
diff --git a/src/nixcp.rs b/src/nixcp.rs
new file mode 100644
index 0000000..74e9fa5
--- /dev/null
+++ b/src/nixcp.rs
@@ -0,0 +1,64 @@
+use std::sync::Arc;
+
+use crate::path_info::PathInfo;
+use anyhow::{Context, Result};
+use log::info;
+use tokio::sync::{Semaphore, mpsc};
+use url::Url;
+
+pub struct NixCp {
+    upstream_caches: Arc<Vec<Url>>,
+    store_paths: Vec<PathInfo>,
+}
+
+impl NixCp {
+    pub fn new() -> Self {
+        Self {
+            upstream_caches: Arc::new(vec![Url::parse("https://cache.nixos.org").unwrap()]),
+            store_paths: Vec::new(),
+        }
+    }
+
+    pub fn add_upstreams(&mut self, upstreams: &[String]) -> Result<()> {
+        let caches = Arc::get_mut(&mut self.upstream_caches).expect("upstream list not shared yet");
+        for upstream in upstreams {
+            caches
+                .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
+        }
+        Ok(())
+    }
+
+    pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
+        let path_info = PathInfo::from_path(package).await?;
+        self.store_paths = path_info.get_closure().await?;
+        info!("found {} store paths", self.store_paths.len());
+
+        Ok(())
+    }
+
+    pub async fn run(&mut self) {}
+
+    /// filter paths that are on upstream and send to `tx`
+    async fn filter_from_upstream(&self, tx: mpsc::Sender<PathInfo>) {
+        let permits = Arc::new(Semaphore::new(10));
+        let mut handles = Vec::new();
+        for path in &self.store_paths {
+            if path.check_upstream_signature(&self.upstream_caches) {
+                continue;
+            }
+            let permits = permits.clone();
+            let tx = tx.clone();
+            let (path, upstreams) = (path.clone(), self.upstream_caches.clone()); // tasks need owned data
+            handles.push(tokio::spawn(async move {
+                let _permit = permits.acquire().await.unwrap();
+                if !path.check_upstream_hit(&upstreams).await {
+                    tx.send(path).await.unwrap();
+                }
+            }));
+        }
+
+        for handle in handles {
+            handle.await.unwrap();
+        }
+    }
+}
diff --git a/src/path_info.rs b/src/path_info.rs
new file mode 100644
index 0000000..1213c1b
--- /dev/null
+++ b/src/path_info.rs
@@ -0,0 +1,155 @@
+use std::{collections::HashSet, path::Path};
+
+use anyhow::{Context, Result};
+use log::trace;
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+use tokio::process::Command;
+use url::Url;
+
+// nix path-info --json
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct PathInfo {
+    deriver: String,
+    path: String,
+    signatures: Vec<String>,
+}
+impl PathInfo {
+    /// get PathInfo for a package or a store path
+    pub async fn from_path(path: &str) -> Result<Self> {
+        let path_info = Command::new("nix")
+            .arg("path-info")
+            .arg("--json")
+            .arg(path)
+            .output()
+            .await
+            .with_context(|| format!("`nix path-info` failed for {path}"))?;
+
+        Ok(serde_json::from_slice(&path_info.stdout)?)
+    }
+
+    pub async fn get_closure(&self) -> Result<Vec<Self>> {
+        let nix_store_cmd = Command::new("nix-store")
+            .arg("--query")
+            .arg("--requisites")
+            .arg("--include-outputs")
+            .arg(&self.deriver)
+            .output()
+            .await
+            .context("nix-store cmd failed")?;
+
+        let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
+        let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
+        let mut closure = Vec::with_capacity(nix_store_paths.len());
+        for path in nix_store_paths {
+            closure.push(Self::from_path(path).await?);
+        }
+        Ok(closure)
+    }
+
+    pub fn get_path(&self) -> &Path {
+        Path::new(&self.path)
+    }
+
+    /// checks if the path is signed by any upstream. if it is, we assume a cache hit.
+    /// the name of the cache in the signature does not have to be the domain of the cache.
+    /// in fact, it can be any random string. but, most often it is, and this saves us
+    /// a request.
+    pub fn check_upstream_signature(&self, upstreams: &[Url]) -> bool {
+        let upstreams: HashSet<_> = upstreams.iter().filter_map(|x| x.domain()).collect();
+
+        // some caches use names suffixed with -<number>
+        // e.g. cache.nixos.org-1, nix-community.cachix.org-1
+        let re = Regex::new(r"-\d+$").expect("regex should be valid");
+        for signee in self.signees().iter().map(|&x| re.replace(x, "")) {
+            if upstreams.contains(signee.as_ref()) {
+                return true;
+            }
+        }
+        false
+    }
+
+    fn signees(&self) -> Vec<&str> {
+        let signees: Vec<_> = self
+            .signatures
+            .iter()
+            .filter_map(|signature| Some(signature.split_once(":")?.0))
+            .collect();
+        trace!("signees for {}: {:?}", self.path, signees);
+        signees
+    }
+
+    pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
+        let basename = self.get_path().file_name().unwrap().to_str().unwrap();
+        let hash = basename.split_once("-").unwrap().0;
+
+        for upstream in upstreams {
+            let upstream = upstream
+                .join(format!("{hash}.narinfo").as_str())
+                .expect("adding .narinfo should make a valid url");
+            let res_status = reqwest::Client::new()
+                .head(upstream.as_str())
+                .send()
+                .await
+                .map(|x| x.status());
+
+            // a hit on any upstream is conclusive; on a miss or error, try the next one
+            if res_status.map(|s| s.is_success()).unwrap_or(false) {
+                return true;
+            }
+        }
+        false
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn get_signees_from_path_info() {
+        let path_info = PathInfo {
+            deriver: "".to_string(),
+            path: "".to_string(),
+            signatures: vec![
+                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
+                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
+            ],
+        };
+        let signees = path_info.signees();
+        assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
+    }
+
+    #[test]
+    fn match_upstream_cache_from_signature() {
+        let path_info = PathInfo {
+            deriver: "".to_string(),
+            path: "".to_string(),
+            signatures: vec![
+                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
+                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
+                "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
+            ],
+        };
+        assert_eq!(
+            path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]),
+            true
+        );
+        assert_eq!(
+            path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]),
+            true
+        );
+        assert_eq!(
+            path_info.check_upstream_signature(&[
+                Url::parse("https://nix-community.cachix.org").unwrap()
+            ]),
+            true
+        );
+        assert_eq!(
+            path_info
+                .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
+            false
+        );
+    }
+}
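
A worked example of the two checks in path_info.rs, reusing the store path from
the Push doc comment: for /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1,
check_upstream_signature() trims the trailing "-1" off a signature name such as
cache.nixos.org-1 and compares the result against the upstream domains, while
check_upstream_hit() sends HEAD https://cache.nixos.org/y4qpcibkj767szhjb58i2sidmz8m24hb.narinfo
and takes a 2xx as a hit. Assuming the CLI keeps its current shape, an
invocation would look like `nixcp --to <binary cache> push nixpkgs#hello`
(with clap, the non-global --to flag goes before the subcommand).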
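The empty NixCp::run() and the commented-out channel wiring in main.rs point at
the intended flow: filter_from_upstream() checks every path in the closure and
sends the cache misses over an mpsc channel while an uploader drains it. A
minimal sketch of what run() could do under that design; the upload step is
hypothetical, since this patch implements none:

    use tokio::sync::mpsc;

    impl NixCp {
        pub async fn run(&mut self) {
            // paths that miss every upstream arrive on `rx`
            let (tx, mut rx) = mpsc::channel::<PathInfo>(16);

            let uploader = async move {
                while let Some(path) = rx.recv().await {
                    // hypothetical: a later commit could shell out to
                    // `nix copy --to <cache>` (or an S3 client) here
                    println!("would upload {}", path.get_path().display());
                }
            };

            // run producer and consumer concurrently; once the filter is done,
            // every Sender clone is dropped and the uploader loop ends
            tokio::join!(self.filter_from_upstream(tx), uploader);
        }
    }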