Compare commits
2 commits: 70739d7dcf ... e58cf2bbd0

| Author | SHA1 | Date |
|---|---|---|
| | e58cf2bbd0 | |
| | f51099b911 | |
7 changed files with 344 additions and 239 deletions
1  .gitignore (vendored)

@@ -1,2 +1,3 @@
 /target
 .direnv
+result
10  Cargo.lock (generated)

@@ -76,6 +76,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]

+[[package]]
+name = "anyhow"
+version = "1.0.97"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"

@@ -799,13 +805,16 @@ dependencies = [
 name = "nixcp"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "clap",
  "env_logger",
  "log",
+ "regex",
  "reqwest",
  "serde",
  "serde_json",
  "tokio",
+ "url",
 ]

 [[package]]

@@ -1469,6 +1478,7 @@ dependencies = [
  "form_urlencoded",
  "idna",
  "percent-encoding",
+ "serde",
 ]

 [[package]]
3  Cargo.toml

@@ -4,10 +4,13 @@ version = "0.1.0"
 edition = "2024"

 [dependencies]
+anyhow = "1.0.97"
 clap = { version = "4.5.34", features = ["derive"] }
 env_logger = "0.11.7"
 log = "0.4.27"
+regex = "1.11.1"
 reqwest = "0.12.15"
 serde = { version = "1.0.219", features = [ "derive" ]}
 serde_json = "1.0.140"
 tokio = { version = "1.44.1", features = [ "full" ]}
+url = { version = "2.5.4", features = [ "serde" ]}
2  src/cli.rs (new file)

@@ -0,0 +1,2 @@
269  src/main.rs

@@ -1,97 +1,36 @@
 #![feature(let_chains)]
+#![feature(extend_one)]

-use std::path::Path;
-use std::sync::{
-    Arc, Mutex, RwLock,
-    atomic::{AtomicUsize, Ordering},
-};
+use anyhow::Result;
+use clap::{Parser, Subcommand};

-use clap::Parser;
-use log::{debug, trace};
-use serde::{Deserialize, Serialize};
-use tokio::process::Command;
-use tokio::sync::{Semaphore, mpsc};
+use nixcp::NixCp;

-const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
+mod cli;
+mod nixcp;
+mod path_info;

-// nix path-info --derivation --json
-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct PathInfo {
-    ca: String,
-    nar_hash: String,
-    nar_size: u32,
-    path: String,
-    references: Vec<String>,
-    registration_time: u32,
-    valid: bool,
-}
-
-impl PathInfo {
-    // find derivations related to package
-    async fn from_package(package: &str, recursive: bool) -> Vec<Self> {
-        let mut args = vec!["path-info", "--derivation", "--json"];
-        if recursive {
-            args.push("--recursive");
-        }
-        let path_infos = Command::new("nix")
-            .args(args)
-            .arg(package)
-            .output()
-            .await
-            .expect("path-info failed");
-
-        let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout)
-            .expect("no derivations found for this package");
-        debug!("PathInfo's from nix path-info: {:#?}", path_infos);
-        path_infos
-    }
-
-    // find store paths related to derivation
-    async fn get_store_paths(&self) -> Vec<String> {
-        let mut store_paths: Vec<String> = Vec::new();
-        let nix_store_cmd = Command::new("nix-store")
-            .arg("--query")
-            .arg("--requisites")
-            .arg("--include-outputs")
-            .arg(&self.path)
-            .output()
-            .await
-            .expect("nix-store cmd failed");
-
-        let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
-        for store_path in nix_store_out.split_whitespace().map(ToString::to_string) {
-            store_paths.push(store_path);
-        }
-        store_paths
-    }
-}
-
-#[derive(Parser)]
-#[command(version, about, long_about = None)]
+#[derive(Parser, Debug)]
+#[command(version, name = "nixcp")]
 struct Cli {
-    /// Package to upload to the binary cache
-    package: String,
+    #[command(subcommand)]
+    command: Commands,

     /// Address of the binary cache (passed to nix copy --to)
     #[arg(long, value_name = "BINARY CACHE")]
     to: String,

     /// Upstream cache to check against. Can be specified multiple times.
-    /// cache.nixos.org is always included
-    #[arg(long, short)]
-    upstream_cache: Vec<String>,
+    /// cache.nixos.org is always included (unless --no-nixos-cache is passed)
+    #[arg(long = "upstream-cache", short)]
+    upstream_caches: Vec<String>,

-    /// Whether to pass --recursive to nix path-info. Can queue a huge number of paths to upload
-    #[arg(long, short, default_value_t = false)]
-    recursive: bool,
-
     /// Concurrent upstream cache checkers
     #[arg(long, default_value_t = 32)]
     upstream_checker_concurrency: u8,

     /// Concurrent uploaders
-    #[arg(long, default_value_t = 16)]
+    #[arg(long, default_value_t = 4)]
     uploader_concurrency: u8,

     /// Concurrent nix-store commands to run

@@ -99,173 +38,27 @@ struct Cli {
     nix_store_concurrency: u8,
 }

+#[derive(Debug, Subcommand)]
+enum Commands {
+    Push {
+        /// Package or store path to upload
+        /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
+        package: String,
+    },
+}
+
 #[tokio::main]
-async fn main() {
+async fn main() -> Result<()> {
     env_logger::init();
     let cli = Cli::parse();
-    let package = &cli.package;
-    let binary_cache = cli.to;
-    let mut upstream_caches = cli.upstream_cache;
-    for upstream in UPSTREAM_CACHES {
-        upstream_caches.push(upstream.to_string());
-    }
-    debug!("package: {}", package);
-    debug!("binary cache: {}", binary_cache);
-    debug!("upstream caches: {:#?}", upstream_caches);
-
-    println!("querying nix path-info");
-    let derivations = PathInfo::from_package(package, cli.recursive).await;
-    println!("got {} derivations", derivations.len());
-
-    println!("querying nix-store");
-    let mut handles = Vec::new();
-    let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
-    let store_paths = Arc::new(RwLock::new(Vec::new()));
-
-    for derivation in derivations {
-        let store_paths = Arc::clone(&store_paths);
-        let permit = Arc::clone(&concurrency);
-        handles.push(tokio::spawn(async move {
-            let _permit = permit.acquire_owned().await.unwrap();
-            let paths = derivation.get_store_paths().await;
-            store_paths.write().unwrap().extend(paths);
-        }));
-    }
-    // resolve store paths for all derivations before we move on
-    for handle in handles {
-        handle.await.unwrap();
-    }
-    println!("got {} store paths", store_paths.read().unwrap().len());
-
-    let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());
-
-    println!("spawning check_upstream");
-    handles = Vec::new();
-    handles.push(tokio::spawn(async move {
-        check_upstream(
-            store_paths,
-            cacheable_tx,
-            cli.upstream_checker_concurrency,
-            Arc::new(upstream_caches),
-        )
-        .await;
-    }));
-
-    println!("spawning uploader");
-    handles.push(tokio::spawn(async move {
-        uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await;
-    }));
-
-    // make sure all threads are done
-    for handle in handles {
-        handle.await.unwrap();
-    }
-}
-
-// filter out store paths that exist in upstream caches
-async fn check_upstream(
-    store_paths: Arc<RwLock<Vec<String>>>,
-    cacheable_tx: mpsc::Sender<String>,
-    concurrency: u8,
-    upstream_caches: Arc<Vec<String>>,
-) {
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
-    let c_store_paths = Arc::clone(&store_paths);
-    let store_paths = c_store_paths.read().unwrap().clone();
-
-    for store_path in store_paths {
-        let tx = cacheable_tx.clone();
-        let upstream_caches = Arc::clone(&upstream_caches);
-        let concurrency = Arc::clone(&concurrency);
-
-        tokio::spawn(async move {
-            let _permit = concurrency.acquire().await.unwrap();
-            let basename = Path::new(&store_path)
-                .file_name()
-                .unwrap()
-                .to_str()
-                .unwrap()
-                .to_string();
-            let hash = basename.split("-").next().unwrap();
-
-            let mut hit = false;
-            for upstream in upstream_caches.as_ref() {
-                let mut uri = upstream.clone();
-                uri.push_str(format!("/{}.narinfo", hash).as_str());
-
-                let res_status = reqwest::Client::new()
-                    .head(uri)
-                    .send()
-                    .await
-                    .map(|x| x.status());
-
-                if let Ok(res_status) = res_status
-                    && res_status.is_success()
-                {
-                    debug!("{} was a hit upstream: {}", store_path, upstream);
-                    hit = true;
-                    break;
-                }
-            }
-            if !hit {
-                trace!("sending {}", store_path);
-                tx.send(store_path).await.unwrap();
-            }
-        });
-    }
-}
-
-async fn uploader(
-    cacheable_rx: &mut mpsc::Receiver<String>,
-    binary_cache: String,
-    concurrency: u8,
-) {
-    let upload_count = Arc::new(AtomicUsize::new(0));
-    let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
-    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
-    let mut handles = Vec::new();
-
-    loop {
-        if let Some(path_to_upload) = cacheable_rx.recv().await {
-            let concurrency = Arc::clone(&concurrency);
-            let failures = Arc::clone(&failures);
-            let binary_cache = binary_cache.clone();
-            let upload_count = Arc::clone(&upload_count);
-
-            handles.push(tokio::spawn(async move {
-                let _permit = concurrency.acquire().await.unwrap();
-                println!("uploading: {}", path_to_upload);
-                if Command::new("nix")
-                    .arg("copy")
-                    .arg("--to")
-                    .arg(&binary_cache)
-                    .arg(&path_to_upload)
-                    .output()
-                    .await
-                    .is_err()
-                {
-                    println!("WARN: upload failed: {}", path_to_upload);
-                    failures.lock().unwrap().push(path_to_upload);
-                } else {
-                    upload_count.fetch_add(1, Ordering::Relaxed);
-                }
-            }));
-        } else {
-            // make sure all threads are done
-            for handle in handles {
-                handle.await.unwrap();
-            }
-            println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
-
-            let failures = failures.lock().unwrap();
-            if !failures.is_empty() {
-                println!("failed to upload these paths: ");
-                for failure in failures.iter() {
-                    print!("{}", failure);
-                }
-                println!();
-            }
-            break;
-        }
-    }
-}
+    let nixcp = Box::leak(Box::new(NixCp::with_upstreams(&cli.upstream_caches)?));
+
+    match &cli.command {
+        Commands::Push { package } => {
+            nixcp.paths_from_package(package).await?;
+            nixcp.run().await;
+        }
+    }
+
+    Ok(())
+}
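For orientation, here is a minimal, self-contained sketch (not part of the diff) of how the new `Push` subcommand interface parses; the cache address and package name below are placeholders fed in via `parse_from`:

```rust
use clap::{Parser, Subcommand};

#[derive(Parser, Debug)]
#[command(name = "nixcp")]
struct Cli {
    #[command(subcommand)]
    command: Commands,

    /// Address of the binary cache (passed to nix copy --to)
    #[arg(long, value_name = "BINARY CACHE")]
    to: String,
}

#[derive(Debug, Subcommand)]
enum Commands {
    Push {
        /// Package or store path to upload
        package: String,
    },
}

fn main() {
    // Placeholder argv; the cache URL and package name are made up.
    let cli = Cli::parse_from(["nixcp", "--to", "s3://example-cache", "push", "nixpkgs#hello"]);
    match &cli.command {
        Commands::Push { package } => println!("would push {package} to {}", cli.to),
    }
}
```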
135  src/nixcp.rs (new file)

@@ -0,0 +1,135 @@
+use std::{
+    iter::once,
+    sync::{
+        Arc, Mutex,
+        atomic::{AtomicUsize, Ordering},
+    },
+};
+
+use crate::path_info::PathInfo;
+use anyhow::{Context, Result};
+use log::{info, warn};
+use tokio::{
+    process::Command,
+    sync::{RwLock, Semaphore, mpsc},
+};
+use url::Url;
+
+pub struct NixCp {
+    upstream_caches: Arc<Vec<Url>>,
+    store_paths: Arc<RwLock<Vec<PathInfo>>>,
+}
+
+impl NixCp {
+    pub fn with_upstreams(new_upstreams: &[String]) -> Result<Self> {
+        let mut upstreams = Vec::with_capacity(new_upstreams.len() + 1);
+        for upstream in new_upstreams
+            .iter()
+            .chain(once(&"https://cache.nixos.org".to_string()))
+        {
+            upstreams
+                .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
+        }
+        Ok(Self {
+            upstream_caches: Arc::new(upstreams),
+            store_paths: Arc::new(RwLock::new(Vec::new())),
+        })
+    }
+
+    pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
+        let path_info = PathInfo::from_path(package).await?;
+        self.store_paths
+            .write()
+            .await
+            .extend(path_info.get_closure().await?);
+        info!("found {} store paths", self.store_paths.read().await.len());
+
+        Ok(())
+    }
+
+    pub async fn run(&'static self) {
+        let (tx, rx) = mpsc::channel(10);
+        let tx = Arc::new(tx);
+        tokio::spawn(self.filter_from_upstream(tx));
+        tokio::spawn(self.uploader("".to_string(), rx));
+    }
+
+    /// filter paths that are on upstream and send to `tx`
+    async fn filter_from_upstream(&self, tx: Arc<mpsc::Sender<String>>) {
+        let permits = Arc::new(Semaphore::new(10));
+        let mut handles = Vec::with_capacity(10);
+        let store_paths = self.store_paths.read().await.clone();
+
+        for path in store_paths.into_iter() {
+            if path.check_upstream_signature(&self.upstream_caches) {
+                continue;
+            }
+            handles.push({
+                let permits = permits.clone();
+                let tx = tx.clone();
+                let upstream_caches = self.upstream_caches.clone();
+                tokio::spawn(async move {
+                    let _permit = permits.acquire().await.unwrap();
+
+                    if !path.check_upstream_hit(upstream_caches.as_slice()).await {
+                        tx.send(path.to_string()).await.unwrap();
+                    }
+                })
+            });
+        }
+
+        for handle in handles {
+            handle.await.unwrap();
+        }
+    }
+
+    async fn uploader(&self, cache: String, mut rx: mpsc::Receiver<String>) {
+        let upload_count = Arc::new(AtomicUsize::new(0));
+        let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+        let permits = Arc::new(Semaphore::new(10));
+        let mut handles = Vec::with_capacity(10);
+
+        loop {
+            if let Some(path_to_upload) = rx.recv().await {
+                let permits = Arc::clone(&permits);
+                let failures = Arc::clone(&failures);
+                let binary_cache = cache.clone();
+                let upload_count = Arc::clone(&upload_count);
+
+                handles.push(tokio::spawn(async move {
+                    let _permit = permits.acquire().await.unwrap();
+                    info!("uploading: {}", path_to_upload.to_string());
+                    if Command::new("nix")
+                        .arg("copy")
+                        .arg("--to")
+                        .arg(&binary_cache)
+                        .arg(&path_to_upload.to_string())
+                        .output()
+                        .await
+                        .is_err()
+                    {
+                        warn!("upload failed: {}", path_to_upload);
+                        failures.lock().unwrap().push(path_to_upload);
+                    } else {
+                        upload_count.fetch_add(1, Ordering::Relaxed);
+                    }
+                }));
+            } else {
+                // make sure all threads are done
+                for handle in handles {
+                    handle.await.unwrap();
+                }
+                println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
+
+                let failures = failures.lock().unwrap();
+                if !failures.is_empty() {
+                    warn!("failed to upload these paths: ");
+                    for failure in failures.iter() {
+                        warn!("{}", failure);
+                    }
+                }
+                break;
+            }
+        }
+    }
+}
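`filter_from_upstream` and `uploader` above both follow the same tokio pattern: one spawned task per store path, with a `Semaphore` capping concurrency at 10 permits. A stripped-down sketch of just that pattern in isolation (illustrative, not from the diff):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 10 tasks hold a permit at a time; the rest wait in acquire().
    let permits = Arc::new(Semaphore::new(10));
    let mut handles = Vec::new();

    for i in 0..50 {
        let permits = Arc::clone(&permits);
        handles.push(tokio::spawn(async move {
            let _permit = permits.acquire().await.unwrap();
            // Stand-in for the real work (a narinfo HEAD request or `nix copy`).
            println!("processing item {i}");
        }));
    }

    // Wait for every task, mirroring the join loops in nixcp.rs.
    for handle in handles {
        handle.await.unwrap();
    }
}
```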
161  src/path_info.rs (new file)

@@ -0,0 +1,161 @@
+use std::{collections::HashSet, path::Path};
+
+use anyhow::{Context, Result};
+use log::trace;
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+use tokio::process::Command;
+use url::Url;
+
+// nix path-info --derivation --json
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct PathInfo {
+    deriver: String,
+    path: String,
+    signatures: Vec<String>,
+}
+impl PathInfo {
+    /// get PathInfo for a package or a store path
+    pub async fn from_path(path: &str) -> Result<Self> {
+        let path_info = Command::new("nix")
+            .arg("path-info")
+            .arg("--json")
+            .arg(path)
+            .output()
+            .await
+            .context("`nix path-info` failed for {package}")?;
+
+        Ok(serde_json::from_slice(&path_info.stdout)?)
+    }
+
+    pub async fn get_closure(&self) -> Result<Vec<Self>> {
+        let nix_store_cmd = Command::new("nix-store")
+            .arg("--query")
+            .arg("--requisites")
+            .arg("--include-outputs")
+            .arg(&self.deriver)
+            .output()
+            .await
+            .expect("nix-store cmd failed");
+
+        let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
+        let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
+        let mut closure = Vec::with_capacity(nix_store_paths.len());
+        for path in nix_store_paths {
+            closure.push(Self::from_path(path).await?);
+        }
+        Ok(closure)
+    }
+
+    pub fn get_path(&self) -> &Path {
+        &Path::new(&self.path)
+    }
+
+    /// checks if the path is signed by any upstream. if it is, we assume a cache hit.
+    /// the name of the cache in the signature does not have to be the domain of the cache.
+    /// in fact, it can be any random string. but, most often it is, and this saves us
+    /// a request.
+    pub fn check_upstream_signature(&self, upstreams: &[Url]) -> bool {
+        let upstreams: HashSet<_> = upstreams.iter().filter_map(|x| x.domain()).collect();
+
+        // some caches use names prefixed with -<some number>
+        // e.g. cache.nixos.org-1, nix-community.cachix.org-1
+        let re = Regex::new(r"-\d+$").expect("regex should be valid");
+        for signee in self.signees().iter().map(|&x| re.replace(x, "")) {
+            if upstreams.contains(signee.as_ref()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    fn signees(&self) -> Vec<&str> {
+        let signees: Vec<_> = self
+            .signatures
+            .iter()
+            .filter_map(|signature| Some(signature.split_once(":")?.0))
+            .collect();
+        trace!("signees for {}: {:?}", self.path, signees);
+        signees
+    }
+
+    pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
+        let basename = self.get_path().file_name().unwrap().to_str().unwrap();
+        let hash = basename.split_once("-").unwrap().0;
+
+        for upstream in upstreams {
+            let upstream = upstream
+                .join(format!("{hash}/.narinfo").as_str())
+                .expect("adding <hash>.narinfo should make a valid url");
+            let res_status = reqwest::Client::new()
+                .head(upstream.as_str())
+                .send()
+                .await
+                .map(|x| x.status());
+
+            match &res_status {
+                Ok(status) => return status.is_success(),
+                Err(_) => return false,
+            }
+        }
+        false
+    }
+}
+
+impl ToString for PathInfo {
+    fn to_string(&self) -> String {
+        self.path.clone()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn get_signees_from_path_info() {
+        let path_info = PathInfo {
+            deriver: "".to_string(),
+            path: "".to_string(),
+            signatures: vec![
+                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
+                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
+            ],
+        };
+        let signees = path_info.signees();
+        assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
+    }
+
+    #[test]
+    fn match_upstream_cache_from_signature() {
+        let path_info = PathInfo {
+            deriver: "".to_string(),
+            path: "".to_string(),
+            signatures: vec![
+                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
+                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
+                "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
+            ],
+        };
+        assert_eq!(
+            path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]),
+            true
+        );
+        assert_eq!(
+            path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]),
+            true
+        );
+        assert_eq!(
+            path_info.check_upstream_signature(&[
+                Url::parse("https://nix-community.cachix.org").unwrap()
+            ]),
+            true
+        );
+        assert_eq!(
+            path_info
+                .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
+            false
+        );
+    }
+}
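The signature check in `check_upstream_signature` rests on one normalization step: a cache's key name is usually its domain plus a numeric suffix (e.g. `cache.nixos.org-1`). A standalone sketch of just that step (illustrative; the sample names are taken from the tests above):

```rust
use regex::Regex;

fn main() {
    // Key names like "cache.nixos.org-1" carry a numeric suffix; stripping it
    // recovers the cache domain so it can be matched against upstream URLs.
    let re = Regex::new(r"-\d+$").expect("regex should be valid");
    for signee in ["cache.nixos.org-1", "nix-community.cachix.org-1", "nixcache.cy7.sh"] {
        println!("{signee} -> {}", re.replace(signee, ""));
    }
}
```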