Compare commits

...

3 commits

Author SHA1 Message Date
cy
97b35ef080
fix concurrency limiting and use --recursive 2025-04-01 14:35:08 -04:00
cy
0612ea6530
fix cargo and flake stuff 2025-04-01 14:33:40 -04:00
cy
6fc3b1c0bc
Revert "use cranelib devshell"
This reverts commit 341424f663.
2025-04-01 12:40:28 -04:00
4 changed files with 49 additions and 65 deletions

49
Cargo.lock generated
View file

@ -730,16 +730,6 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856"
[[package]]
name = "lock_api"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
dependencies = [
"autocfg",
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.27" version = "0.4.27"
@ -867,29 +857,6 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "parking_lot"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-targets 0.52.6",
]
[[package]] [[package]]
name = "percent-encoding" name = "percent-encoding"
version = "2.3.1" version = "2.3.1"
@ -953,15 +920,6 @@ version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "redox_syscall"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.11.1" version = "1.11.1"
@ -1128,12 +1086,6 @@ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.59.0",
] ]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "security-framework" name = "security-framework"
version = "2.11.1" version = "2.11.1"
@ -1344,7 +1296,6 @@ dependencies = [
"bytes", "bytes",
"libc", "libc",
"mio", "mio",
"parking_lot",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",

View file

@ -10,4 +10,4 @@ log = "0.4.27"
reqwest = "0.12.15" reqwest = "0.12.15"
serde = { version = "1.0.219", features = [ "derive" ]} serde = { version = "1.0.219", features = [ "derive" ]}
serde_json = "1.0.140" serde_json = "1.0.140"
tokio = { version = "1.44.1", features = [ "full" ]} tokio = { version = "1.44.1", features = [ "rt", "rt-multi-thread", "macros", "sync", "process" ]}

View file

@ -20,15 +20,17 @@
(import inputs.rust-overlay) (import inputs.rust-overlay)
]; ];
}; };
craneLib = (crane.mkLib pkgs).overrideToolchain(p: p.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml); toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
in in
{ {
devShells.default = craneLib.devShell { devShells.default = pkgs.mkShell {
nativeBuildInputs = with pkgs; [ nativeBuildInputs = with pkgs; [
pkg-config pkg-config
]; ];
buildInputs = with pkgs; [ buildInputs = with pkgs; [
openssl openssl
toolchain
]; ];
}; };

View file

@ -1,16 +1,16 @@
#![feature(let_chains)] #![feature(let_chains)]
use std::path::Path; use std::path::Path;
use std::process::Command;
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::{ use std::sync::{
Arc, Mutex, Arc, Mutex, RwLock,
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
}; };
use clap::Parser; use clap::Parser;
use log::{debug, trace}; use log::{debug, trace};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tokio::sync::Semaphore; use tokio::sync::Semaphore;
const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"]; const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
@ -30,13 +30,15 @@ struct PathInfo {
impl PathInfo { impl PathInfo {
// find derivations related to package // find derivations related to package
fn from_package(package: &str) -> Vec<Self> { async fn from_package(package: &str) -> Vec<Self> {
let path_infos = Command::new("nix") let path_infos = Command::new("nix")
.arg("path-info") .arg("path-info")
.arg("--derivation") .arg("--derivation")
.arg("--recursive")
.arg("--json") .arg("--json")
.arg(package) .arg(package)
.output() .output()
.await
.expect("path-info failed"); .expect("path-info failed");
let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout).unwrap(); let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout).unwrap();
@ -45,7 +47,7 @@ impl PathInfo {
} }
// find store paths related to derivation // find store paths related to derivation
fn get_store_paths(&self) -> Vec<String> { async fn get_store_paths(&self) -> Vec<String> {
let mut store_paths: Vec<String> = Vec::new(); let mut store_paths: Vec<String> = Vec::new();
let nix_store_cmd = Command::new("nix-store") let nix_store_cmd = Command::new("nix-store")
.arg("--query") .arg("--query")
@ -53,6 +55,7 @@ impl PathInfo {
.arg("--include-outputs") .arg("--include-outputs")
.arg(&self.path) .arg(&self.path)
.output() .output()
.await
.expect("nix-store cmd failed"); .expect("nix-store cmd failed");
let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap(); let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
@ -85,6 +88,10 @@ struct Cli {
/// Concurrent uploaders /// Concurrent uploaders
#[arg(long, default_value_t = 10)] #[arg(long, default_value_t = 10)]
uploader_concurrency: u8, uploader_concurrency: u8,
/// Concurrent nix-store commands to run
#[arg(long, default_value_t = 50)]
nix_store_concurrency: u8,
} }
#[tokio::main] #[tokio::main]
@ -102,15 +109,33 @@ async fn main() {
debug!("upstream caches: {:#?}", upstream_caches); debug!("upstream caches: {:#?}", upstream_caches);
println!("querying nix path-info"); println!("querying nix path-info");
let path_infos = PathInfo::from_package(package); let derivations = PathInfo::from_package(package).await;
println!("got {} derivations", derivations.len());
println!("querying nix-store"); println!("querying nix-store");
let store_paths = path_infos[0].get_store_paths(); let mut handles = Vec::new();
let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
let store_paths = Arc::new(RwLock::new(Vec::new()));
for derivation in derivations {
let store_paths = Arc::clone(&store_paths);
let permit = Arc::clone(&concurrency);
handles.push(tokio::spawn(async move {
let _permit = permit.acquire_owned().await.unwrap();
let paths = derivation.get_store_paths().await;
store_paths.write().unwrap().extend(paths);
}));
}
// resolve store paths for all derivations before we move on
for handle in handles {
handle.await.unwrap();
}
println!("got {} store paths", store_paths.read().unwrap().len());
let (cacheable_tx, cacheable_rx) = mpsc::channel(); let (cacheable_tx, cacheable_rx) = mpsc::channel();
let mut handles = Vec::new();
println!("spawning check_upstream"); println!("spawning check_upstream");
handles = Vec::new();
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
check_upstream( check_upstream(
store_paths, store_paths,
@ -134,19 +159,22 @@ async fn main() {
// filter out store paths that exist in upstream caches // filter out store paths that exist in upstream caches
async fn check_upstream( async fn check_upstream(
store_paths: Vec<String>, store_paths: Arc<RwLock<Vec<String>>>,
cacheable_tx: mpsc::Sender<String>, cacheable_tx: mpsc::Sender<String>,
concurrency: u8, concurrency: u8,
upstream_caches: Arc<Vec<String>>, upstream_caches: Arc<Vec<String>>,
) { ) {
let concurrent = Semaphore::new(concurrency.into()); let concurrency = Arc::new(Semaphore::new(concurrency.into()));
let c_store_paths = Arc::clone(&store_paths);
let store_paths = c_store_paths.read().unwrap().clone();
for store_path in store_paths { for store_path in store_paths {
let _ = concurrent.acquire().await.unwrap();
let tx = cacheable_tx.clone(); let tx = cacheable_tx.clone();
let upstream_caches = Arc::clone(&upstream_caches); let upstream_caches = Arc::clone(&upstream_caches);
let concurrency = Arc::clone(&concurrency);
tokio::spawn(async move { tokio::spawn(async move {
let _permit = concurrency.acquire().await.unwrap();
let basename = Path::new(&store_path) let basename = Path::new(&store_path)
.file_name() .file_name()
.unwrap() .unwrap()
@ -185,16 +213,18 @@ async fn check_upstream(
async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) { async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) {
let upload_count = Arc::new(AtomicUsize::new(0)); let upload_count = Arc::new(AtomicUsize::new(0));
let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new())); let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let concurrent = Semaphore::new(concurrency.into()); let concurrency = Arc::new(Semaphore::new(concurrency.into()));
let mut handles = Vec::new(); let mut handles = Vec::new();
loop { loop {
if let Ok(path_to_upload) = cacheable_rx.recv() { if let Ok(path_to_upload) = cacheable_rx.recv() {
let _ = concurrent.acquire().await.unwrap(); let concurrency = Arc::clone(&concurrency);
let failures = Arc::clone(&failures); let failures = Arc::clone(&failures);
let binary_cache = binary_cache.clone(); let binary_cache = binary_cache.clone();
let upload_count = Arc::clone(&upload_count); let upload_count = Arc::clone(&upload_count);
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
let _permit = concurrency.acquire().await.unwrap();
println!("uploading: {}", path_to_upload); println!("uploading: {}", path_to_upload);
if Command::new("nix") if Command::new("nix")
.arg("copy") .arg("copy")
@ -202,6 +232,7 @@ async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, co
.arg(&binary_cache) .arg(&binary_cache)
.arg(&path_to_upload) .arg(&path_to_upload)
.output() .output()
.await
.is_err() .is_err()
{ {
println!("WARN: upload failed: {}", path_to_upload); println!("WARN: upload failed: {}", path_to_upload);