diff --git a/Cargo.lock b/Cargo.lock
index 30716eb..d16b9d3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -730,6 +730,16 @@ version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856"
 
+[[package]]
+name = "lock_api"
+version = "0.4.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
+dependencies = [
+ "autocfg",
+ "scopeguard",
+]
+
 [[package]]
 name = "log"
 version = "0.4.27"
@@ -857,6 +867,29 @@ dependencies = [
  "vcpkg",
 ]
 
+[[package]]
+name = "parking_lot"
+version = "0.12.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
+dependencies = [
+ "lock_api",
+ "parking_lot_core",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.9.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "windows-targets 0.52.6",
+]
+
 [[package]]
 name = "percent-encoding"
 version = "2.3.1"
@@ -920,6 +953,15 @@ version = "5.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
 
+[[package]]
+name = "redox_syscall"
+version = "0.5.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1"
+dependencies = [
+ "bitflags",
+]
+
 [[package]]
 name = "regex"
 version = "1.11.1"
@@ -1086,6 +1128,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "scopeguard"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+
 [[package]]
 name = "security-framework"
 version = "2.11.1"
@@ -1296,6 +1344,7 @@ dependencies = [
  "bytes",
  "libc",
  "mio",
+ "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
  "socket2",
diff --git a/Cargo.toml b/Cargo.toml
index a037e6f..2428f7e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,4 +10,4 @@ log = "0.4.27"
 reqwest = "0.12.15"
 serde = { version = "1.0.219", features = [ "derive" ]}
 serde_json = "1.0.140"
-tokio = { version = "1.44.1", features = [ "rt", "rt-multi-thread", "macros", "sync", "process" ]}
+tokio = { version = "1.44.1", features = [ "full" ]}
diff --git a/flake.nix b/flake.nix
index c263f49..38af66d 100644
--- a/flake.nix
+++ b/flake.nix
@@ -20,17 +20,15 @@
             (import inputs.rust-overlay)
           ];
         };
-        toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
-        craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
+        craneLib = (crane.mkLib pkgs).overrideToolchain(p: p.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml);
       in
       {
-        devShells.default = pkgs.mkShell {
+        devShells.default = craneLib.devShell {
           nativeBuildInputs = with pkgs; [
             pkg-config
           ];
           buildInputs = with pkgs; [
             openssl
-            toolchain
           ];
         };
 
diff --git a/src/main.rs b/src/main.rs
index 51c3134..819460a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,16 +1,16 @@
 #![feature(let_chains)]
 
 use std::path::Path;
+use std::process::Command;
 use std::sync::mpsc;
 use std::sync::{
-    Arc, Mutex, RwLock,
+    Arc, Mutex,
     atomic::{AtomicUsize, Ordering},
 };
 
 use clap::Parser;
 use log::{debug, trace};
 use serde::{Deserialize, Serialize};
-use tokio::process::Command;
 use tokio::sync::Semaphore;
 
 const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
@@ -30,15 +30,13 @@ struct PathInfo {
 
 impl PathInfo {
     // find derivations related to package
-    async fn from_package(package: &str) -> Vec<Self> {
+    fn from_package(package: &str) -> Vec<Self> {
         let path_infos = Command::new("nix")
             .arg("path-info")
             .arg("--derivation")
-            .arg("--recursive")
             .arg("--json")
             .arg(package)
             .output()
-            .await
             .expect("path-info failed");
 
         let path_infos: Vec<Self> = serde_json::from_slice(&path_infos.stdout).unwrap();
@@ -47,7 +45,7 @@
     }
 
     // find store paths related to derivation
-    async fn get_store_paths(&self) -> Vec<String> {
+    fn get_store_paths(&self) -> Vec<String> {
         let mut store_paths: Vec<String> = Vec::new();
         let nix_store_cmd = Command::new("nix-store")
             .arg("--query")
@@ -55,7 +53,6 @@
             .arg("--include-outputs")
             .arg(&self.path)
             .output()
-            .await
             .expect("nix-store cmd failed");
 
         let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
@@ -88,10 +85,6 @@ struct Cli {
     /// Concurrent uploaders
     #[arg(long, default_value_t = 10)]
     uploader_concurrency: u8,
-
-    /// Concurrent nix-store commands to run
-    #[arg(long, default_value_t = 50)]
-    nix_store_concurrency: u8,
 }
 
 #[tokio::main]
@@ -109,33 +102,15 @@ async fn main() {
     debug!("upstream caches: {:#?}", upstream_caches);
 
     println!("querying nix path-info");
-    let derivations = PathInfo::from_package(package).await;
-    println!("got {} derivations", derivations.len());
+    let path_infos = PathInfo::from_package(package);
 
     println!("querying nix-store");
-    let mut handles = Vec::new();
-    let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
-    let store_paths = Arc::new(RwLock::new(Vec::new()));
-
-    for derivation in derivations {
-        let store_paths = Arc::clone(&store_paths);
-        let permit = Arc::clone(&concurrency);
-        handles.push(tokio::spawn(async move {
-            let _permit = permit.acquire_owned().await.unwrap();
-            let paths = derivation.get_store_paths().await;
-            store_paths.write().unwrap().extend(paths);
-        }));
-    }
-    // resolve store paths for all derivations before we move on
-    for handle in handles {
-        handle.await.unwrap();
-    }
-    println!("got {} store paths", store_paths.read().unwrap().len());
-
+    let store_paths = path_infos[0].get_store_paths();
     let (cacheable_tx, cacheable_rx) = mpsc::channel();
 
+    let mut handles = Vec::new();
+
     println!("spawning check_upstream");
-    handles = Vec::new();
     handles.push(tokio::spawn(async move {
         check_upstream(
             store_paths,
@@ -159,22 +134,19 @@ async fn main() {
 
 // filter out store paths that exist in upstream caches
 async fn check_upstream(
-    store_paths: Arc<RwLock<Vec<String>>>,
+    store_paths: Vec<String>,
     cacheable_tx: mpsc::Sender<String>,
     concurrency: u8,
     upstream_caches: Arc<Vec<String>>,
 ) {
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
-    let c_store_paths = Arc::clone(&store_paths);
-    let store_paths = c_store_paths.read().unwrap().clone();
+    let concurrent = Semaphore::new(concurrency.into());
 
     for store_path in store_paths {
+        let _ = concurrent.acquire().await.unwrap();
         let tx = cacheable_tx.clone();
         let upstream_caches = Arc::clone(&upstream_caches);
-        let concurrency = Arc::clone(&concurrency);
 
         tokio::spawn(async move {
-            let _permit = concurrency.acquire().await.unwrap();
             let basename = Path::new(&store_path)
                 .file_name()
                 .unwrap()
@@ -213,18 +185,16 @@ async fn check_upstream(
 async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) {
     let upload_count = Arc::new(AtomicUsize::new(0));
     let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
+    let concurrent = Semaphore::new(concurrency.into());
     let mut handles = Vec::new();
-
     loop {
         if let Ok(path_to_upload) = cacheable_rx.recv() {
-            let concurrency = Arc::clone(&concurrency);
+            let _ = concurrent.acquire().await.unwrap();
            let failures = Arc::clone(&failures);
             let binary_cache = binary_cache.clone();
             let upload_count = Arc::clone(&upload_count);
 
             handles.push(tokio::spawn(async move {
-                let _permit = concurrency.acquire().await.unwrap();
                 println!("uploading: {}", path_to_upload);
                 if Command::new("nix")
                     .arg("copy")
@@ -232,7 +202,6 @@
                     .arg(&binary_cache)
                     .arg(&path_to_upload)
                     .output()
-                    .await
                     .is_err()
                 {
                     println!("WARN: upload failed: {}", path_to_upload);