Compare commits
No commits in common. "97b35ef080ac440954e5a52aedbe33102eee6b62" and "341424f663ab7ab5ad8e71b90d11a73670f5ab89" have entirely different histories.
97b35ef080
...
341424f663
4 changed files with 65 additions and 49 deletions
49
Cargo.lock
generated
49
Cargo.lock
generated
|
@ -730,6 +730,16 @@ version = "0.7.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856"
|
checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "lock_api"
|
||||||
|
version = "0.4.12"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17"
|
||||||
|
dependencies = [
|
||||||
|
"autocfg",
|
||||||
|
"scopeguard",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "log"
|
name = "log"
|
||||||
version = "0.4.27"
|
version = "0.4.27"
|
||||||
|
@ -857,6 +867,29 @@ dependencies = [
|
||||||
"vcpkg",
|
"vcpkg",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parking_lot"
|
||||||
|
version = "0.12.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27"
|
||||||
|
dependencies = [
|
||||||
|
"lock_api",
|
||||||
|
"parking_lot_core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "parking_lot_core"
|
||||||
|
version = "0.9.10"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
"redox_syscall",
|
||||||
|
"smallvec",
|
||||||
|
"windows-targets 0.52.6",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "percent-encoding"
|
name = "percent-encoding"
|
||||||
version = "2.3.1"
|
version = "2.3.1"
|
||||||
|
@ -920,6 +953,15 @@ version = "5.2.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
|
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "redox_syscall"
|
||||||
|
version = "0.5.10"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex"
|
name = "regex"
|
||||||
version = "1.11.1"
|
version = "1.11.1"
|
||||||
|
@ -1086,6 +1128,12 @@ dependencies = [
|
||||||
"windows-sys 0.59.0",
|
"windows-sys 0.59.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "scopeguard"
|
||||||
|
version = "1.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "security-framework"
|
name = "security-framework"
|
||||||
version = "2.11.1"
|
version = "2.11.1"
|
||||||
|
@ -1296,6 +1344,7 @@ dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"libc",
|
"libc",
|
||||||
"mio",
|
"mio",
|
||||||
|
"parking_lot",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"signal-hook-registry",
|
"signal-hook-registry",
|
||||||
"socket2",
|
"socket2",
|
||||||
|
|
|
@ -10,4 +10,4 @@ log = "0.4.27"
|
||||||
reqwest = "0.12.15"
|
reqwest = "0.12.15"
|
||||||
serde = { version = "1.0.219", features = [ "derive" ]}
|
serde = { version = "1.0.219", features = [ "derive" ]}
|
||||||
serde_json = "1.0.140"
|
serde_json = "1.0.140"
|
||||||
tokio = { version = "1.44.1", features = [ "rt", "rt-multi-thread", "macros", "sync", "process" ]}
|
tokio = { version = "1.44.1", features = [ "full" ]}
|
||||||
|
|
|
@ -20,17 +20,15 @@
|
||||||
(import inputs.rust-overlay)
|
(import inputs.rust-overlay)
|
||||||
];
|
];
|
||||||
};
|
};
|
||||||
toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
|
craneLib = (crane.mkLib pkgs).overrideToolchain(p: p.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml);
|
||||||
craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
|
|
||||||
in
|
in
|
||||||
{
|
{
|
||||||
devShells.default = pkgs.mkShell {
|
devShells.default = craneLib.devShell {
|
||||||
nativeBuildInputs = with pkgs; [
|
nativeBuildInputs = with pkgs; [
|
||||||
pkg-config
|
pkg-config
|
||||||
];
|
];
|
||||||
buildInputs = with pkgs; [
|
buildInputs = with pkgs; [
|
||||||
openssl
|
openssl
|
||||||
toolchain
|
|
||||||
];
|
];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
57
src/main.rs
57
src/main.rs
|
@ -1,16 +1,16 @@
|
||||||
#![feature(let_chains)]
|
#![feature(let_chains)]
|
||||||
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::process::Command;
|
||||||
use std::sync::mpsc;
|
use std::sync::mpsc;
|
||||||
use std::sync::{
|
use std::sync::{
|
||||||
Arc, Mutex, RwLock,
|
Arc, Mutex,
|
||||||
atomic::{AtomicUsize, Ordering},
|
atomic::{AtomicUsize, Ordering},
|
||||||
};
|
};
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use log::{debug, trace};
|
use log::{debug, trace};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::process::Command;
|
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
|
const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
|
||||||
|
@ -30,15 +30,13 @@ struct PathInfo {
|
||||||
|
|
||||||
impl PathInfo {
|
impl PathInfo {
|
||||||
// find derivations related to package
|
// find derivations related to package
|
||||||
async fn from_package(package: &str) -> Vec<Self> {
|
fn from_package(package: &str) -> Vec<Self> {
|
||||||
let path_infos = Command::new("nix")
|
let path_infos = Command::new("nix")
|
||||||
.arg("path-info")
|
.arg("path-info")
|
||||||
.arg("--derivation")
|
.arg("--derivation")
|
||||||
.arg("--recursive")
|
|
||||||
.arg("--json")
|
.arg("--json")
|
||||||
.arg(package)
|
.arg(package)
|
||||||
.output()
|
.output()
|
||||||
.await
|
|
||||||
.expect("path-info failed");
|
.expect("path-info failed");
|
||||||
|
|
||||||
let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout).unwrap();
|
let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout).unwrap();
|
||||||
|
@ -47,7 +45,7 @@ impl PathInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
// find store paths related to derivation
|
// find store paths related to derivation
|
||||||
async fn get_store_paths(&self) -> Vec<String> {
|
fn get_store_paths(&self) -> Vec<String> {
|
||||||
let mut store_paths: Vec<String> = Vec::new();
|
let mut store_paths: Vec<String> = Vec::new();
|
||||||
let nix_store_cmd = Command::new("nix-store")
|
let nix_store_cmd = Command::new("nix-store")
|
||||||
.arg("--query")
|
.arg("--query")
|
||||||
|
@ -55,7 +53,6 @@ impl PathInfo {
|
||||||
.arg("--include-outputs")
|
.arg("--include-outputs")
|
||||||
.arg(&self.path)
|
.arg(&self.path)
|
||||||
.output()
|
.output()
|
||||||
.await
|
|
||||||
.expect("nix-store cmd failed");
|
.expect("nix-store cmd failed");
|
||||||
|
|
||||||
let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
|
let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
|
||||||
|
@ -88,10 +85,6 @@ struct Cli {
|
||||||
/// Concurrent uploaders
|
/// Concurrent uploaders
|
||||||
#[arg(long, default_value_t = 10)]
|
#[arg(long, default_value_t = 10)]
|
||||||
uploader_concurrency: u8,
|
uploader_concurrency: u8,
|
||||||
|
|
||||||
/// Concurrent nix-store commands to run
|
|
||||||
#[arg(long, default_value_t = 50)]
|
|
||||||
nix_store_concurrency: u8,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
@ -109,33 +102,15 @@ async fn main() {
|
||||||
debug!("upstream caches: {:#?}", upstream_caches);
|
debug!("upstream caches: {:#?}", upstream_caches);
|
||||||
|
|
||||||
println!("querying nix path-info");
|
println!("querying nix path-info");
|
||||||
let derivations = PathInfo::from_package(package).await;
|
let path_infos = PathInfo::from_package(package);
|
||||||
println!("got {} derivations", derivations.len());
|
|
||||||
|
|
||||||
println!("querying nix-store");
|
println!("querying nix-store");
|
||||||
let mut handles = Vec::new();
|
let store_paths = path_infos[0].get_store_paths();
|
||||||
let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
|
|
||||||
let store_paths = Arc::new(RwLock::new(Vec::new()));
|
|
||||||
|
|
||||||
for derivation in derivations {
|
|
||||||
let store_paths = Arc::clone(&store_paths);
|
|
||||||
let permit = Arc::clone(&concurrency);
|
|
||||||
handles.push(tokio::spawn(async move {
|
|
||||||
let _permit = permit.acquire_owned().await.unwrap();
|
|
||||||
let paths = derivation.get_store_paths().await;
|
|
||||||
store_paths.write().unwrap().extend(paths);
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
// resolve store paths for all derivations before we move on
|
|
||||||
for handle in handles {
|
|
||||||
handle.await.unwrap();
|
|
||||||
}
|
|
||||||
println!("got {} store paths", store_paths.read().unwrap().len());
|
|
||||||
|
|
||||||
let (cacheable_tx, cacheable_rx) = mpsc::channel();
|
let (cacheable_tx, cacheable_rx) = mpsc::channel();
|
||||||
|
|
||||||
|
let mut handles = Vec::new();
|
||||||
|
|
||||||
println!("spawning check_upstream");
|
println!("spawning check_upstream");
|
||||||
handles = Vec::new();
|
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
check_upstream(
|
check_upstream(
|
||||||
store_paths,
|
store_paths,
|
||||||
|
@ -159,22 +134,19 @@ async fn main() {
|
||||||
|
|
||||||
// filter out store paths that exist in upstream caches
|
// filter out store paths that exist in upstream caches
|
||||||
async fn check_upstream(
|
async fn check_upstream(
|
||||||
store_paths: Arc<RwLock<Vec<String>>>,
|
store_paths: Vec<String>,
|
||||||
cacheable_tx: mpsc::Sender<String>,
|
cacheable_tx: mpsc::Sender<String>,
|
||||||
concurrency: u8,
|
concurrency: u8,
|
||||||
upstream_caches: Arc<Vec<String>>,
|
upstream_caches: Arc<Vec<String>>,
|
||||||
) {
|
) {
|
||||||
let concurrency = Arc::new(Semaphore::new(concurrency.into()));
|
let concurrent = Semaphore::new(concurrency.into());
|
||||||
let c_store_paths = Arc::clone(&store_paths);
|
|
||||||
let store_paths = c_store_paths.read().unwrap().clone();
|
|
||||||
|
|
||||||
for store_path in store_paths {
|
for store_path in store_paths {
|
||||||
|
let _ = concurrent.acquire().await.unwrap();
|
||||||
let tx = cacheable_tx.clone();
|
let tx = cacheable_tx.clone();
|
||||||
let upstream_caches = Arc::clone(&upstream_caches);
|
let upstream_caches = Arc::clone(&upstream_caches);
|
||||||
let concurrency = Arc::clone(&concurrency);
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _permit = concurrency.acquire().await.unwrap();
|
|
||||||
let basename = Path::new(&store_path)
|
let basename = Path::new(&store_path)
|
||||||
.file_name()
|
.file_name()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -213,18 +185,16 @@ async fn check_upstream(
|
||||||
async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) {
|
async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) {
|
||||||
let upload_count = Arc::new(AtomicUsize::new(0));
|
let upload_count = Arc::new(AtomicUsize::new(0));
|
||||||
let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
||||||
let concurrency = Arc::new(Semaphore::new(concurrency.into()));
|
let concurrent = Semaphore::new(concurrency.into());
|
||||||
let mut handles = Vec::new();
|
let mut handles = Vec::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok(path_to_upload) = cacheable_rx.recv() {
|
if let Ok(path_to_upload) = cacheable_rx.recv() {
|
||||||
let concurrency = Arc::clone(&concurrency);
|
let _ = concurrent.acquire().await.unwrap();
|
||||||
let failures = Arc::clone(&failures);
|
let failures = Arc::clone(&failures);
|
||||||
let binary_cache = binary_cache.clone();
|
let binary_cache = binary_cache.clone();
|
||||||
let upload_count = Arc::clone(&upload_count);
|
let upload_count = Arc::clone(&upload_count);
|
||||||
|
|
||||||
handles.push(tokio::spawn(async move {
|
handles.push(tokio::spawn(async move {
|
||||||
let _permit = concurrency.acquire().await.unwrap();
|
|
||||||
println!("uploading: {}", path_to_upload);
|
println!("uploading: {}", path_to_upload);
|
||||||
if Command::new("nix")
|
if Command::new("nix")
|
||||||
.arg("copy")
|
.arg("copy")
|
||||||
|
@ -232,7 +202,6 @@ async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, co
|
||||||
.arg(&binary_cache)
|
.arg(&binary_cache)
|
||||||
.arg(&path_to_upload)
|
.arg(&path_to_upload)
|
||||||
.output()
|
.output()
|
||||||
.await
|
|
||||||
.is_err()
|
.is_err()
|
||||||
{
|
{
|
||||||
println!("WARN: upload failed: {}", path_to_upload);
|
println!("WARN: upload failed: {}", path_to_upload);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue