diff --git a/Cargo.lock b/Cargo.lock index a3e049c..2a630e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -968,6 +968,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humansize" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" +dependencies = [ + "libm", +] + [[package]] name = "humantime" version = "2.2.0" @@ -1304,6 +1313,12 @@ version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +[[package]] +name = "libm" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72" + [[package]] name = "libmimalloc-sys" version = "0.1.42" @@ -1491,6 +1506,7 @@ dependencies = [ "cxx-build", "ed25519-dalek", "futures", + "humansize", "nix-compat", "object_store", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index 6ca052f..272be3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ bytes = "1.10.1" object_store = { version = "0.12.0", features = ["aws"] } ulid = "1.2.1" tracing-subscriber = "0.3.19" +humansize = "2.1.3" [build-dependencies] cxx-build = "1.0" diff --git a/flake.nix b/flake.nix index 01d135a..fb86d40 100644 --- a/flake.nix +++ b/flake.nix @@ -59,7 +59,6 @@ devShells.default = craneLib.devShell { inputsFrom = [ nixcp ]; - RUST_LOG = "nixcp=debug"; RUST_BACKGRACE = 1; # for cpp bindings to work NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include"; diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs index 450bc98..46b86dd 100644 --- a/src/bindings/mod.rs +++ b/src/bindings/mod.rs @@ -150,10 +150,7 @@ impl Stream for AsyncWriteAdapter { match message { Data(v) => Poll::Ready(Some(Ok(v.into()))), Error(exception) => { - let error = std::io::Error::new( - io::ErrorKind::Other, - format!("cxx error: {exception}"), - ); + let error = std::io::Error::other(format!("cxx error: {exception}")); Poll::Ready(Some(Err(error))) } Eof => { diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..dfbab4f --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,65 @@ +use std::path::PathBuf; + +use clap::{Args, Parser, Subcommand}; + +mod bindings; +mod cli; +pub mod make_nar; +pub mod path_info; +pub mod push; +pub mod store; +mod uploader; + +#[derive(Parser, Debug)] +#[command(version)] +#[command(name = "nixcp")] +#[command(about = "Upload store paths to a s3 binary cache")] +#[command(long_about = None)] +pub struct Cli { + #[command(subcommand)] + pub command: Commands, + + /// Whether to enable tokio-console + #[arg(long)] + pub tokio_console: bool, +} + +#[derive(Debug, Subcommand)] +pub enum Commands { + #[command(arg_required_else_help = true)] + Push(PushArgs), +} + +#[derive(Debug, Args)] +pub struct PushArgs { + /// The s3 bucket to upload to + #[arg(long, value_name = "bucket name")] + bucket: String, + + /// Upstream cache to check against. Can be specified multiple times. + /// cache.nixos.org is always included. + #[arg(long = "upstream", short, value_name = "nixcache.example.com")] + upstreams: Vec, + + /// Path to the file containing signing key + /// e.g. ~/cache-priv-key.pem + #[arg(long)] + signing_key: String, + + /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1 + #[arg(long)] + region: Option, + + /// If unspecifed, will get it from AWS_ENDPOINT envar + /// e.g. https://s3.example.com + #[arg(long)] + endpoint: Option, + + #[arg(long)] + skip_signature_check: bool, + + /// Path to upload + /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 + #[arg(value_name = "PATH")] + pub paths: Vec, +} diff --git a/src/main.rs b/src/main.rs index 0fefdf5..1afe4b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,76 +1,10 @@ -#![feature(let_chains)] -#![feature(exit_status_error)] - -use std::path::PathBuf; - use anyhow::{Context, Result}; -use clap::{Args, Parser, Subcommand}; +use clap::Parser; use tracing_subscriber::{EnvFilter, prelude::*}; -use push::Push; -use store::Store; - -mod bindings; -mod cli; -mod make_nar; -mod path_info; -mod push; -mod store; -mod uploader; - -#[derive(Parser, Debug)] -#[command(version)] -#[command(name = "nixcp")] -#[command(about = "Upload store paths to a s3 binary cache")] -#[command(long_about = None)] -struct Cli { - #[command(subcommand)] - command: Commands, - - /// Whether to enable tokio-console - #[arg(long)] - tokio_console: bool, -} - -#[derive(Debug, Subcommand)] -enum Commands { - #[command(arg_required_else_help = true)] - Push(PushArgs), -} - -#[derive(Debug, Args)] -pub struct PushArgs { - /// The s3 bucket to upload to - #[arg(long, value_name = "bucket name")] - bucket: String, - - /// Upstream cache to check against. Can be specified multiple times. - /// cache.nixos.org is always included. - #[arg(long = "upstream", short, value_name = "nixcache.example.com")] - upstreams: Vec, - - /// Path to the file containing signing key - /// e.g. ~/cache-priv-key.pem - #[arg(long)] - signing_key: String, - - /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1 - #[arg(long)] - region: Option, - - /// If unspecifed, will get it from AWS_ENDPOINT envar - /// e.g. https://s3.example.com - #[arg(long)] - endpoint: Option, - - #[arg(long)] - skip_signature_check: bool, - - /// Path to upload - /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 - #[arg(value_name = "PATH")] - paths: Vec, -} +use nixcp::push::Push; +use nixcp::store::Store; +use nixcp::{Cli, Commands}; #[tokio::main] async fn main() -> Result<()> { diff --git a/src/make_nar.rs b/src/make_nar.rs index b5c1ab2..97d6b1f 100644 --- a/src/make_nar.rs +++ b/src/make_nar.rs @@ -15,10 +15,10 @@ use crate::store::Store; pub struct MakeNar<'a> { path_info: &'a PathInfo, store: Arc, - nar_hasher: Sha256, + pub nar_hasher: Sha256, /// hash of compressed nar file file_hasher: Sha256, - nar_size: u64, + pub nar_size: u64, file_size: u64, } diff --git a/src/path_info.rs b/src/path_info.rs index beea6b9..1e1282d 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, anyhow}; use futures::future::join_all; use nix_compat::nixbase32; use nix_compat::store_path::StorePath; @@ -13,7 +13,7 @@ use url::Url; use crate::store::Store; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct PathInfo { pub path: StorePath, pub signatures: Vec, @@ -22,33 +22,44 @@ pub struct PathInfo { } impl PathInfo { - pub async fn from_path(path: &Path, store: &Store) -> Result { - debug!("query path info for {:?}", path); + pub async fn from_derivation(drv: &Path, store: &Store) -> Result { + debug!("query path info for {:?}", drv); - let derivation = match path.extension() { - Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(), + let derivation = match drv.extension() { + Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(), _ => { &Command::new("nix") .arg("path-info") .arg("--derivation") - .arg(path) + .arg(drv) .output() .await - .context(format!("run command: nix path-info --derivaiton {path:?}"))? + .context(format!("run command: nix path-info --derivaiton {drv:?}"))? .stdout } }; let derivation = String::from_utf8_lossy(derivation); debug!("derivation: {derivation}"); - let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes()) - .context("storepath from derivation")?; + if derivation.is_empty() { + return Err(anyhow!( + "nix path-info did not return a derivation for {drv:#?}" + )); + } + + Self::from_path(derivation.trim(), store).await + } + + pub async fn from_path(path: &str, store: &Store) -> Result { + let store_path = + StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?; store .query_path_info(store_path) .await - .context("query pathinfo for derivation") + .context("query pathinfo for path") } + // TODO: skip call to query_path_info and return Vec? pub async fn get_closure(&self, store: &Store) -> Result> { let futs = store .compute_fs_closure(self.path.clone()) diff --git a/src/push.rs b/src/push.rs index 8943355..bf25ea1 100644 --- a/src/push.rs +++ b/src/push.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, fs, iter::once, path::PathBuf, @@ -10,6 +11,7 @@ use std::{ use anyhow::{Context, Result}; use futures::future::join_all; +use humansize::{DECIMAL, format_size}; use nix_compat::narinfo::{self, SigningKey}; use object_store::aws::{AmazonS3, AmazonS3Builder}; use tokio::sync::{RwLock, Semaphore, mpsc}; @@ -20,7 +22,7 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader}; pub struct Push { upstream_caches: Vec, - store_paths: Arc>>, + store_paths: Arc>>, signing_key: SigningKey, store: Arc, s3: Arc, @@ -60,7 +62,7 @@ impl Push { Ok(Self { upstream_caches: upstreams, - store_paths: Arc::new(RwLock::new(Vec::new())), + store_paths: Arc::new(RwLock::new(HashSet::new())), signing_key, store: Arc::new(store), s3: Arc::new(s3_builder.build()?), @@ -78,7 +80,7 @@ impl Push { let store = self.store.clone(); futs.push(tokio::spawn(async move { - let path_info = PathInfo::from_path(path.as_path(), &store) + let path_info = PathInfo::from_derivation(path.as_path(), &store) .await .context("get path info for path")?; debug!("path-info for {path:?}: {path_info:?}"); @@ -157,34 +159,24 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let mut uploads = Vec::new(); - let permits = Arc::new(Semaphore::new(16)); - let big_permits = Arc::new(Semaphore::new(5)); + let permits = Arc::new(Semaphore::new(10)); loop { let permits = permits.clone(); - let big_permits = big_permits.clone(); if let Some(path_to_upload) = rx.recv().await { - debug!("upload permits available: {}", permits.available_permits()); - let mut permit = permits.acquire_owned().await.unwrap(); - uploads.push(tokio::spawn({ - // a large directory may have many files and end up causing "too many open files" - if PathBuf::from(path_to_upload.absolute_path()).is_dir() - && path_to_upload.nar_size > 5 * 1024 * 1024 - { - debug!( - "upload big permits available: {}", - big_permits.available_permits() - ); - // drop regular permit and take the big one - permit = big_permits.acquire_owned().await.unwrap(); - } - + // large uploads will be concurrently uploaded with multipart anyway so don't spawn + // too much of them + let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 { + Some(permits.acquire_owned().await.unwrap()) + } else { + None + }; println!( "uploading: {} (size: {})", path_to_upload.absolute_path(), - path_to_upload.nar_size + format_size(path_to_upload.nar_size, DECIMAL) ); let uploader = Uploader::new(&self.signing_key, path_to_upload)?; let s3 = self.s3.clone(); diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 0000000..3870a1d --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,32 @@ +#![allow(dead_code)] + +use std::process::Command; +use std::sync::Arc; + +use nixcp::store::Store; + +pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello"; +pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv"; +pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1"; + +pub struct Context { + pub store: Arc, +} + +impl Context { + fn new() -> Self { + // hello must be in the store + Command::new("nix") + .arg("build") + .arg("--no-link") + .arg(HELLO) + .status() + .unwrap(); + let store = Arc::new(Store::connect().expect("connect to nix store")); + Self { store } + } +} + +pub fn context() -> Context { + Context::new() +} diff --git a/tests/nar.rs b/tests/nar.rs new file mode 100644 index 0000000..5ebadc5 --- /dev/null +++ b/tests/nar.rs @@ -0,0 +1,26 @@ +use crate::common::HELLO_PATH; +use nix_compat::nixbase32; +use nixcp::make_nar::MakeNar; +use nixcp::path_info::PathInfo; +use sha2::Digest; +use tokio::io::AsyncReadExt; + +mod common; + +#[tokio::test] +async fn nar_size_and_hash() { + let ctx = common::context(); + let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap(); + + let mut nar = MakeNar::new(&path_info, ctx.store).unwrap(); + let mut reader = nar.compress_and_hash().unwrap(); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await.unwrap(); + drop(reader); + + assert_eq!(nar.nar_size, 234680); + + let nar_hash = nar.nar_hasher.finalize(); + let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w"; + assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash); +} diff --git a/tests/path_info.rs b/tests/path_info.rs new file mode 100644 index 0000000..57738fd --- /dev/null +++ b/tests/path_info.rs @@ -0,0 +1,37 @@ +use nixcp::path_info::PathInfo; +use std::path::PathBuf; + +use crate::common::{HELLO, HELLO_DRV, HELLO_PATH}; + +mod common; + +#[tokio::test] +async fn path_info_from_package() { + let ctx = common::context(); + let path = PathBuf::from(HELLO); + let path_info = PathInfo::from_derivation(&path, &ctx.store) + .await + .expect("get pathinfo from package"); + assert_eq!(path_info.path.to_string(), HELLO_DRV); +} + +#[tokio::test] +async fn path_info_from_path() { + let ctx = common::context(); + let path = PathBuf::from(HELLO_PATH); + let path_info = PathInfo::from_derivation(&path, &ctx.store) + .await + .expect("get pathinfo from package"); + assert_eq!(path_info.path.to_string(), HELLO_DRV); +} + +#[tokio::test] +async fn closure() { + let ctx = common::context(); + let path = PathBuf::from(HELLO); + let path_info = PathInfo::from_derivation(&path, &ctx.store) + .await + .expect("get pathinfo from package"); + let closure = path_info.get_closure(&ctx.store).await.unwrap(); + assert_eq!(closure.len(), 472); +}