From 6fc3b1c0bc808da061dd1b55be8cb8e75920d914 Mon Sep 17 00:00:00 2001
From: cy
Date: Tue, 1 Apr 2025 12:40:28 -0400
Subject: [PATCH 1/3] Revert "use cranelib devshell"

This reverts commit 341424f663ab7ab5ad8e71b90d11a73670f5ab89.
---
 flake.nix | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flake.nix b/flake.nix
index 38af66d..53b074e 100644
--- a/flake.nix
+++ b/flake.nix
@@ -23,7 +23,7 @@
       craneLib = (crane.mkLib pkgs).overrideToolchain(p: p.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml);
     in
     {
-      devShells.default = craneLib.devShell {
+      devShells.default = pkgs.mkShell {
         nativeBuildInputs = with pkgs; [
           pkg-config
         ];

From 0612ea653049dfc3ead006bbc0d2287bf3f48a8b Mon Sep 17 00:00:00 2001
From: cy
Date: Tue, 1 Apr 2025 14:33:40 -0400
Subject: [PATCH 2/3] fix cargo and flake stuff

---
 Cargo.lock | 49 -------------------------------------------------
 Cargo.toml |  2 +-
 flake.nix  |  4 +++-
 3 files changed, 4 insertions(+), 51 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d16b9d3..30716eb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -730,16 +730,6 @@ version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856"
 
-[[package]]
-name = "lock_api"
-version = "0.4.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
-dependencies = [
- "autocfg",
- "scopeguard",
-]
-
 [[package]]
 name = "log"
 version = "0.4.27"
@@ -867,29 +857,6 @@ dependencies = [
  "vcpkg",
 ]
 
-[[package]]
-name = "parking_lot"
-version = "0.12.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
-dependencies = [
- "lock_api",
- "parking_lot_core",
-]
-
-[[package]]
-name = "parking_lot_core"
-version = "0.9.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
-dependencies = [
- "cfg-if",
- "libc",
- "redox_syscall",
- "smallvec",
- "windows-targets 0.52.6",
-]
-
 [[package]]
 name = "percent-encoding"
 version = "2.3.1"
@@ -953,15 +920,6 @@ version = "5.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
 
-[[package]]
-name = "redox_syscall"
-version = "0.5.10"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1"
-dependencies = [
- "bitflags",
-]
-
 [[package]]
 name = "regex"
 version = "1.11.1"
@@ -1128,12 +1086,6 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
-[[package]]
-name = "scopeguard"
-version = "1.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
-
 [[package]]
 name = "security-framework"
 version = "2.11.1"
@@ -1344,7 +1296,6 @@ dependencies = [
  "bytes",
  "libc",
  "mio",
- "parking_lot",
 "pin-project-lite",
 "signal-hook-registry",
 "socket2",
diff --git a/Cargo.toml b/Cargo.toml
index 2428f7e..a037e6f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,4 +10,4 @@ log = "0.4.27"
 reqwest = "0.12.15"
 serde = { version = "1.0.219", features = [ "derive" ]}
 serde_json = "1.0.140"
-tokio = { version = "1.44.1", features = [ "full" ]}
+tokio = { version = "1.44.1", features = [ "rt", "rt-multi-thread", "macros", "sync", "process" ]}
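Note (editor's addition, not part of the patch): trimming tokio's features from "full" to the explicit list above is what drops parking_lot, lock_api, parking_lot_core, redox_syscall, and scopeguard from Cargo.lock, since tokio's "full" feature enables its optional parking_lot integration. A minimal sketch of the tokio surface src/main.rs relies on, which still compiles with only rt, rt-multi-thread, macros, sync, and process enabled (the nix invocation is purely illustrative):

    use tokio::process::Command;
    use tokio::sync::Semaphore;

    #[tokio::main] // needs "macros" and "rt-multi-thread"
    async fn main() {
        let limit = Semaphore::new(2); // needs "sync"
        let _permit = limit.acquire().await.unwrap();

        // needs "process": async replacement for std::process::Command
        let out = Command::new("nix")
            .arg("--version")
            .output()
            .await
            .expect("failed to run nix");
        println!("{}", String::from_utf8_lossy(&out.stdout));
    }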
diff --git a/flake.nix b/flake.nix
index 53b074e..c263f49 100644
--- a/flake.nix
+++ b/flake.nix
@@ -20,7 +20,8 @@
           (import inputs.rust-overlay)
         ];
       };
-      craneLib = (crane.mkLib pkgs).overrideToolchain(p: p.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml);
+      toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
+      craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
     in
     {
       devShells.default = pkgs.mkShell {
@@ -29,6 +30,7 @@
         ];
         buildInputs = with pkgs; [
           openssl
+          toolchain
         ];
       };
 

From 97b35ef080ac440954e5a52aedbe33102eee6b62 Mon Sep 17 00:00:00 2001
From: cy
Date: Tue, 1 Apr 2025 14:34:14 -0400
Subject: [PATCH 3/3] fix concurrency limiting and use --recursive

---
 src/main.rs | 57 +++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 44 insertions(+), 13 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 819460a..51c3134 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,16 +1,16 @@
 #![feature(let_chains)]
 
 use std::path::Path;
-use std::process::Command;
 use std::sync::mpsc;
 use std::sync::{
-    Arc, Mutex,
+    Arc, Mutex, RwLock,
     atomic::{AtomicUsize, Ordering},
 };
 
 use clap::Parser;
 use log::{debug, trace};
 use serde::{Deserialize, Serialize};
+use tokio::process::Command;
 use tokio::sync::Semaphore;
 
 const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
@@ -30,13 +30,15 @@ struct PathInfo {
 
 impl PathInfo {
     // find derivations related to package
-    fn from_package(package: &str) -> Vec<PathInfo> {
+    async fn from_package(package: &str) -> Vec<PathInfo> {
         let path_infos = Command::new("nix")
             .arg("path-info")
             .arg("--derivation")
+            .arg("--recursive")
             .arg("--json")
             .arg(package)
             .output()
+            .await
             .expect("path-info failed");
 
         let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout).unwrap();
@@ -45,7 +47,7 @@ impl PathInfo {
     }
 
     // find store paths related to derivation
-    fn get_store_paths(&self) -> Vec<String> {
+    async fn get_store_paths(&self) -> Vec<String> {
         let mut store_paths: Vec<String> = Vec::new();
         let nix_store_cmd = Command::new("nix-store")
             .arg("--query")
@@ -53,6 +55,7 @@ impl PathInfo {
             .arg("--include-outputs")
             .arg(&self.path)
             .output()
+            .await
             .expect("nix-store cmd failed");
 
         let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
@@ -85,6 +88,10 @@ struct Cli {
     /// Concurrent uploaders
     #[arg(long, default_value_t = 10)]
     uploader_concurrency: u8,
+
+    /// Concurrent nix-store commands to run
+    #[arg(long, default_value_t = 50)]
+    nix_store_concurrency: u8,
 }
 
 #[tokio::main]
@@ -102,15 +109,33 @@ async fn main() {
     debug!("upstream caches: {:#?}", upstream_caches);
 
     println!("querying nix path-info");
-    let path_infos = PathInfo::from_package(package);
+    let derivations = PathInfo::from_package(package).await;
+    println!("got {} derivations", derivations.len());
 
     println!("querying nix-store");
-    let store_paths = path_infos[0].get_store_paths();
+    let mut handles = Vec::new();
+    let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
+    let store_paths = Arc::new(RwLock::new(Vec::new()));
+
+    for derivation in derivations {
+        let store_paths = Arc::clone(&store_paths);
+        let permit = Arc::clone(&concurrency);
+        handles.push(tokio::spawn(async move {
+            let _permit = permit.acquire_owned().await.unwrap();
+            let paths = derivation.get_store_paths().await;
+            store_paths.write().unwrap().extend(paths);
+        }));
+    }
+    // resolve store paths for all derivations before we move on
+    for handle in handles {
+        handle.await.unwrap();
+    }
+    println!("got {} store paths", store_paths.read().unwrap().len());
+
     let (cacheable_tx, cacheable_rx) = mpsc::channel();
 
-    let mut handles = Vec::new();
 
-    println!("spawning check_upstream");
+    handles = Vec::new();
     handles.push(tokio::spawn(async move {
         check_upstream(
             store_paths,
@@ -134,19 +159,22 @@ async fn main() {
 
 // filter out store paths that exist in upstream caches
 async fn check_upstream(
-    store_paths: Vec<String>,
+    store_paths: Arc<RwLock<Vec<String>>>,
     cacheable_tx: mpsc::Sender<String>,
     concurrency: u8,
     upstream_caches: Arc<Vec<String>>,
 ) {
-    let concurrent = Semaphore::new(concurrency.into());
+    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
+    let c_store_paths = Arc::clone(&store_paths);
+    let store_paths = c_store_paths.read().unwrap().clone();
 
     for store_path in store_paths {
-        let _ = concurrent.acquire().await.unwrap();
         let tx = cacheable_tx.clone();
         let upstream_caches = Arc::clone(&upstream_caches);
+        let concurrency = Arc::clone(&concurrency);
 
         tokio::spawn(async move {
+            let _permit = concurrency.acquire().await.unwrap();
             let basename = Path::new(&store_path)
                 .file_name()
                 .unwrap()
@@ -185,16 +213,18 @@ async fn check_upstream(
 async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) {
     let upload_count = Arc::new(AtomicUsize::new(0));
     let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
-    let concurrent = Semaphore::new(concurrency.into());
+    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
     let mut handles = Vec::new();
+
     loop {
         if let Ok(path_to_upload) = cacheable_rx.recv() {
-            let _ = concurrent.acquire().await.unwrap();
+            let concurrency = Arc::clone(&concurrency);
             let failures = Arc::clone(&failures);
             let binary_cache = binary_cache.clone();
             let upload_count = Arc::clone(&upload_count);
 
             handles.push(tokio::spawn(async move {
+                let _permit = concurrency.acquire().await.unwrap();
                 println!("uploading: {}", path_to_upload);
                 if Command::new("nix")
                     .arg("copy")
@@ -202,6 +232,7 @@ async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) {
                     .arg(&binary_cache)
                     .arg(&path_to_upload)
                     .output()
+                    .await
                     .is_err()
                 {
                     println!("WARN: upload failed: {}", path_to_upload);
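Note (editor's addition, not part of the patch): the concurrency fix above hinges on where the semaphore permit lives. The old code ran "let _ = concurrent.acquire().await.unwrap();" before spawning, and a value assigned to "_" is dropped immediately, so nothing was actually limited; the new code acquires inside each spawned task and keeps the guard in "_permit" for the task's lifetime. A standalone sketch of that pattern, with illustrative names rather than ones from the patch:

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    #[tokio::main]
    async fn main() {
        // At most 3 tasks make progress at once.
        let limit = Arc::new(Semaphore::new(3));
        let mut handles = Vec::new();

        for i in 0..10 {
            let limit = Arc::clone(&limit);
            handles.push(tokio::spawn(async move {
                // Held until the end of the task; `let _ = ...` would drop it right here.
                let _permit = limit.acquire_owned().await.unwrap();
                println!("working on {i}");
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }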