reformat and stuff ig

Author: cy
Date: 2025-04-13 01:33:15 -04:00
Parent: 70739d7dcf
Commit: f51099b911
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
7 changed files with 279 additions and 118 deletions

.gitignore (vendored, 1 line changed)

@@ -1,2 +1,3 @@
 /target
 .direnv
+result

Cargo.lock (generated, 10 lines changed)

@@ -76,6 +76,12 @@ dependencies = [
  "windows-sys 0.59.0",
 ]
 
+[[package]]
+name = "anyhow"
+version = "1.0.97"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
+
 [[package]]
 name = "atomic-waker"
 version = "1.1.2"
@@ -799,13 +805,16 @@ dependencies = [
 name = "nixcp"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "clap",
  "env_logger",
  "log",
+ "regex",
  "reqwest",
  "serde",
  "serde_json",
  "tokio",
+ "url",
 ]
 
 [[package]]
@@ -1469,6 +1478,7 @@ dependencies = [
  "form_urlencoded",
  "idna",
  "percent-encoding",
+ "serde",
 ]
 
 [[package]]

Cargo.toml (3 lines changed)

@@ -4,10 +4,13 @@ version = "0.1.0"
 edition = "2024"
 
 [dependencies]
+anyhow = "1.0.97"
 clap = { version = "4.5.34", features = ["derive"] }
 env_logger = "0.11.7"
 log = "0.4.27"
+regex = "1.11.1"
 reqwest = "0.12.15"
 serde = { version = "1.0.219", features = [ "derive" ]}
 serde_json = "1.0.140"
 tokio = { version = "1.44.1", features = [ "full" ]}
+url = { version = "2.5.4", features = [ "serde" ]}

src/cli.rs (new file, 2 lines)

@@ -0,0 +1,2 @@

src/main.rs (162 lines changed)

@@ -1,4 +1,5 @@
 #![feature(let_chains)]
+#![feature(extend_one)]
 
 use std::path::Path;
 use std::sync::{
@@ -6,92 +7,39 @@ use std::sync::{
     atomic::{AtomicUsize, Ordering},
 };
 
-use clap::Parser;
+use anyhow::Result;
+use clap::{Parser, Subcommand};
 use log::{debug, trace};
-use serde::{Deserialize, Serialize};
 use tokio::process::Command;
 use tokio::sync::{Semaphore, mpsc};
 
-const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
+use nixcp::NixCp;
 
-// nix path-info --derivation --json
-#[derive(Debug, Serialize, Deserialize)]
-#[serde(rename_all = "camelCase")]
-struct PathInfo {
-    ca: String,
-    nar_hash: String,
-    nar_size: u32,
-    path: String,
-    references: Vec<String>,
-    registration_time: u32,
-    valid: bool,
-}
+mod cli;
+mod nixcp;
+mod path_info;
 
-impl PathInfo {
-    // find derivations related to package
-    async fn from_package(package: &str, recursive: bool) -> Vec<Self> {
-        let mut args = vec!["path-info", "--derivation", "--json"];
-        if recursive {
-            args.push("--recursive");
-        }
-        let path_infos = Command::new("nix")
-            .args(args)
-            .arg(package)
-            .output()
-            .await
-            .expect("path-info failed");
-        let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout)
-            .expect("no derivations found for this package");
-        debug!("PathInfo's from nix path-info: {:#?}", path_infos);
-        path_infos
-    }
-
-    // find store paths related to derivation
-    async fn get_store_paths(&self) -> Vec<String> {
-        let mut store_paths: Vec<String> = Vec::new();
-        let nix_store_cmd = Command::new("nix-store")
-            .arg("--query")
-            .arg("--requisites")
-            .arg("--include-outputs")
-            .arg(&self.path)
-            .output()
-            .await
-            .expect("nix-store cmd failed");
-        let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
-        for store_path in nix_store_out.split_whitespace().map(ToString::to_string) {
-            store_paths.push(store_path);
-        }
-        store_paths
-    }
-}
-
-#[derive(Parser)]
-#[command(version, about, long_about = None)]
+#[derive(Parser, Debug)]
+#[command(version, name = "nixcp")]
 struct Cli {
-    /// Package to upload to the binary cache
-    package: String,
+    #[command(subcommand)]
+    command: Commands,
 
     /// Address of the binary cache (passed to nix copy --to)
     #[arg(long, value_name = "BINARY CACHE")]
     to: String,
 
     /// Upstream cache to check against. Can be specified multiple times.
-    /// cache.nixos.org is always included
-    #[arg(long, short)]
-    upstream_cache: Vec<String>,
-
-    /// Whether to pass --recursive to nix path-info. Can queue a huge number of paths to upload
-    #[arg(long, short, default_value_t = false)]
-    recursive: bool,
+    /// cache.nixos.org is always included (unless --no-nixos-cache is passed)
+    #[arg(long = "upstream-cache", short)]
+    upstream_caches: Vec<String>,
 
     /// Concurrent upstream cache checkers
     #[arg(long, default_value_t = 32)]
     upstream_checker_concurrency: u8,
 
     /// Concurrent uploaders
-    #[arg(long, default_value_t = 16)]
+    #[arg(long, default_value_t = 4)]
     uploader_concurrency: u8,
 
     /// Concurrent nix-store commands to run
@@ -99,57 +47,34 @@ struct Cli {
     nix_store_concurrency: u8,
 }
 
+#[derive(Debug, Subcommand)]
+enum Commands {
+    Push {
+        /// Package or store path to upload
+        /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
+        package: String,
+    },
+}
+
 #[tokio::main]
-async fn main() {
+async fn main() -> Result<()> {
     env_logger::init();
     let cli = Cli::parse();
 
-    let package = &cli.package;
-    let binary_cache = cli.to;
-    let mut upstream_caches = cli.upstream_cache;
-    for upstream in UPSTREAM_CACHES {
-        upstream_caches.push(upstream.to_string());
-    }
-    debug!("package: {}", package);
-    debug!("binary cache: {}", binary_cache);
-    debug!("upstream caches: {:#?}", upstream_caches);
-    println!("querying nix path-info");
-    let derivations = PathInfo::from_package(package, cli.recursive).await;
-    println!("got {} derivations", derivations.len());
-    println!("querying nix-store");
-    let mut handles = Vec::new();
-    let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
-    let store_paths = Arc::new(RwLock::new(Vec::new()));
-    for derivation in derivations {
-        let store_paths = Arc::clone(&store_paths);
-        let permit = Arc::clone(&concurrency);
-        handles.push(tokio::spawn(async move {
-            let _permit = permit.acquire_owned().await.unwrap();
-            let paths = derivation.get_store_paths().await;
-            store_paths.write().unwrap().extend(paths);
-        }));
-    }
-    // resolve store paths for all derivations before we move on
-    for handle in handles {
-        handle.await.unwrap();
-    }
-    println!("got {} store paths", store_paths.read().unwrap().len());
+    let mut nixcp = NixCp::new();
+    nixcp.add_upstreams(&cli.upstream_caches)?;
 
+    match &cli.command {
+        Commands::Push { package } => {
+            nixcp.paths_from_package(package).await?;
+        }
+    }
+
+    Ok(())
+
+    /*
     let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());
 
     println!("spawning check_upstream");
-    handles = Vec::new();
-    handles.push(tokio::spawn(async move {
-        check_upstream(
-            store_paths,
-            cacheable_tx,
-            cli.upstream_checker_concurrency,
-            Arc::new(upstream_caches),
-        )
-        .await;
-    }));
 
     println!("spawning uploader");
     handles.push(tokio::spawn(async move {
@@ -160,6 +85,7 @@ async fn main() {
     for handle in handles {
         handle.await.unwrap();
    }
+    */
 }
 
 // filter out store paths that exist in upstream caches
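Editor's note: the new doc comment mentions a --no-nixos-cache flag that the Cli struct does not define yet, and the package argument now lives under a push subcommand. A minimal sketch of how the new surface parses, using clap's try_parse_from; the cache URL and package below are hypothetical example values, not anything from this commit:

// Sketch: exercising the new subcommand surface with clap's try_parse_from.
#[cfg(test)]
mod cli_shape {
    use super::*;
    use clap::Parser;

    #[test]
    fn parses_push_subcommand() {
        let cli = Cli::try_parse_from([
            "nixcp",
            "--to",
            "https://example-cache.invalid", // hypothetical cache address
            "push",
            "nixpkgs#hello", // hypothetical package
        ])
        .expect("push subcommand should parse");
        assert_eq!(cli.to, "https://example-cache.invalid");
        // Commands has a single variant, so this let-pattern is irrefutable
        let Commands::Push { package } = cli.command;
        assert_eq!(package, "nixpkgs#hello");
    }
}

Keeping --to at the top level means every future subcommand shares the cache address without redeclaring it.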

src/nixcp.rs (new file, 64 lines)

@@ -0,0 +1,64 @@
use std::sync::Arc;

use crate::path_info::PathInfo;
use anyhow::{Context, Result};
use log::info;
use tokio::sync::{Semaphore, mpsc};
use url::Url;

pub struct NixCp {
    upstream_caches: Arc<Vec<Url>>,
    store_paths: Vec<PathInfo>,
}

impl NixCp {
    pub fn new() -> Self {
        Self {
            upstream_caches: vec![Url::parse("https://cache.nixos.org").unwrap()],
            store_paths: Vec::new(),
        }
    }

    pub fn add_upstreams(&mut self, upstreams: &[String]) -> Result<()> {
        self.upstream_caches.reserve(upstreams.len());
        for upstream in upstreams {
            self.upstream_caches
                .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
        }
        Ok(())
    }

    pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
        let path_info = PathInfo::from_path(package).await?;
        self.store_paths = path_info.get_closure().await?;
        info!("found {} store paths", self.store_paths.len());
        Ok(())
    }

    pub async fn run(&mut self) {}

    /// filter paths that are on upstream and send to `tx`
    async fn filter_from_upstream(&self, tx: mpsc::Sender<&PathInfo>) {
        let permits = Arc::new(Semaphore::new(10));
        let mut handles = Vec::new();
        for path in &self.store_paths {
            if path.check_upstream_signature(&self.upstream_caches) {
                continue;
            }
            let permits = permits.clone();
            let tx = tx.clone();
            handles.push(tokio::spawn(async move {
                let _permit = permits.acquire().await.unwrap();
                if !path.check_upstream_hit(&self.upstream_caches).await {
                    tx.send(path);
                }
            }));
        }

        for handle in handles {
            handle.await.unwrap();
        }
    }
}
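Editor's note: as written, filter_from_upstream is unlikely to compile: upstream_caches is declared as Arc<Vec<Url>> yet new() stores a bare Vec and add_upstreams mutates through the Arc; tokio::spawn needs a 'static future but the task borrows path and self; and tx.send(...) returns a future that is never awaited. A sketch of the same filtering idea under two stated assumptions, that PathInfo implements Clone and that the channel carries owned PathInfo values:

    /// Sketch only (not part of this commit): a 'static-safe variant.
    /// Assumes `PathInfo: Clone` and `tx: mpsc::Sender<PathInfo>`.
    async fn filter_from_upstream(&self, tx: mpsc::Sender<PathInfo>) {
        let permits = Arc::new(Semaphore::new(10));
        let mut handles = Vec::new();
        for path in self.store_paths.clone() {
            // cheap synchronous signature check first; no task needed
            if path.check_upstream_signature(&self.upstream_caches) {
                continue;
            }
            let permits = permits.clone();
            let tx = tx.clone();
            // clone the Arc so each task owns its view of the upstream list
            let upstreams = self.upstream_caches.clone();
            handles.push(tokio::spawn(async move {
                let _permit = permits.acquire().await.unwrap();
                if !path.check_upstream_hit(&upstreams).await {
                    // send() on a bounded channel is async; it must be awaited
                    tx.send(path).await.unwrap();
                }
            }));
        }
        for handle in handles {
            handle.await.unwrap();
        }
    }

Cloning the Arc per task keeps the upstream list shared; cloning the paths trades some memory for a 'static task body.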

src/path_info.rs (new file, 155 lines)

@@ -0,0 +1,155 @@
use std::{collections::HashSet, path::Path};

use anyhow::{Context, Result};
use log::trace;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use url::Url;

// nix path-info --derivation --json
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PathInfo {
    deriver: String,
    path: String,
    signatures: Vec<String>,
}

impl PathInfo {
    /// get PathInfo for a package or a store path
    pub async fn from_path(path: &str) -> Result<Self> {
        let path_info = Command::new("nix")
            .arg("path-info")
            .arg("--json")
            .arg(path)
            .output()
            .await
            .context("`nix path-info` failed for {package}")?;
        Ok(serde_json::from_slice(&path_info.stdout)?)
    }

    pub async fn get_closure(&self) -> Result<Vec<Self>> {
        let nix_store_cmd = Command::new("nix-store")
            .arg("--query")
            .arg("--requisites")
            .arg("--include-outputs")
            .arg(&self.deriver)
            .output()
            .await
            .expect("nix-store cmd failed");

        let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
        let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
        let mut closure = Vec::with_capacity(nix_store_paths.len());
        for path in nix_store_paths {
            closure.push(Self::from_path(path).await?);
        }
        Ok(closure)
    }

    pub fn get_path(&self) -> &Path {
        &Path::new(&self.path)
    }

    /// checks if the path is signed by any upstream. if it is, we assume a cache hit.
    /// the name of the cache in the signature does not have to be the domain of the cache.
    /// in fact, it can be any random string. but, most often it is, and this saves us
    /// a request.
    pub fn check_upstream_signature(&self, upstreams: &[Url]) -> bool {
        let upstreams: HashSet<_> = upstreams.iter().filter_map(|x| x.domain()).collect();

        // some caches use names prefixed with -<some number>
        // e.g. cache.nixos.org-1, nix-community.cachix.org-1
        let re = Regex::new(r"-\d+$").expect("regex should be valid");
        for signee in self.signees().iter().map(|&x| re.replace(x, "")) {
            if upstreams.contains(signee.as_ref()) {
                return true;
            }
        }
        return false;
    }

    fn signees(&self) -> Vec<&str> {
        let signees: Vec<_> = self
            .signatures
            .iter()
            .filter_map(|signature| Some(signature.split_once(":")?.0))
            .collect();
        trace!("signees for {}: {:?}", self.path, signees);
        signees
    }

    pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
        let basename = self.get_path().file_name().unwrap().to_str().unwrap();
        let hash = basename.split_once("-").unwrap().0;

        for upstream in upstreams {
            let upstream = upstream
                .join(format!("{hash}/.narinfo").as_str())
                .expect("adding <hash>.narinfo should make a valid url");
            let res_status = reqwest::Client::new()
                .head(upstream.as_str())
                .send()
                .await
                .map(|x| x.status());

            match &res_status {
                Ok(status) => return status.is_success(),
                Err(_) => return false,
            }
        }
        false
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn get_signees_from_path_info() {
        let path_info = PathInfo {
            deriver: "".to_string(),
            path: "".to_string(),
            signatures: vec![
                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
            ],
        };
        let signees = path_info.signees();
        assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
    }

    #[test]
    fn match_upstream_cache_from_signature() {
        let path_info = PathInfo {
            deriver: "".to_string(),
            path: "".to_string(),
            signatures: vec![
                "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
                "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
                "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
            ],
        };
        assert_eq!(
            path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]),
            true
        );
        assert_eq!(
            path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]),
            true
        );
        assert_eq!(
            path_info.check_upstream_signature(&[
                Url::parse("https://nix-community.cachix.org").unwrap()
            ]),
            true
        );
        assert_eq!(
            path_info
                .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
            false
        );
    }
}
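Editor's note on check_upstream_hit: the match returns out of the loop on the first upstream's response, so a miss (or a transient network error) on the first cache hides hits on every later one. The narinfo path is also normally <hash>.narinfo at the cache root rather than <hash>/.narinfo. Relatedly, the .context("`nix path-info` failed for {package}") in from_path is a plain string literal, so {package} is never interpolated; format! or with_context would be needed. A sketch of a variant that probes each upstream in turn, assuming a per-upstream error should count as a miss for that upstream only:

    /// Sketch only (hypothetical fix, not in this commit): try every
    /// upstream instead of returning after the first response.
    pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
        let basename = self.get_path().file_name().unwrap().to_str().unwrap();
        let hash = basename.split_once("-").unwrap().0;

        for upstream in upstreams {
            // binary caches serve narinfo as <hash>.narinfo at the root
            let narinfo_url = upstream
                .join(&format!("{hash}.narinfo"))
                .expect("adding <hash>.narinfo should make a valid url");
            let hit = reqwest::Client::new()
                .head(narinfo_url.as_str())
                .send()
                .await
                .map(|res| res.status().is_success())
                .unwrap_or(false);
            if hit {
                return true;
            }
            // miss or request error: fall through and try the next upstream
        }
        false
    }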