diff --git a/Cargo.lock b/Cargo.lock index 2a630e0..a3e049c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -968,15 +968,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humansize" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7" -dependencies = [ - "libm", -] - [[package]] name = "humantime" version = "2.2.0" @@ -1313,12 +1304,6 @@ version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" -[[package]] -name = "libm" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72" - [[package]] name = "libmimalloc-sys" version = "0.1.42" @@ -1506,7 +1491,6 @@ dependencies = [ "cxx-build", "ed25519-dalek", "futures", - "humansize", "nix-compat", "object_store", "pkg-config", diff --git a/Cargo.toml b/Cargo.toml index 272be3b..6ca052f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,6 @@ bytes = "1.10.1" object_store = { version = "0.12.0", features = ["aws"] } ulid = "1.2.1" tracing-subscriber = "0.3.19" -humansize = "2.1.3" [build-dependencies] cxx-build = "1.0" diff --git a/flake.nix b/flake.nix index fb86d40..01d135a 100644 --- a/flake.nix +++ b/flake.nix @@ -59,6 +59,7 @@ devShells.default = craneLib.devShell { inputsFrom = [ nixcp ]; + RUST_LOG = "nixcp=debug"; RUST_BACKGRACE = 1; # for cpp bindings to work NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include"; diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs index 46b86dd..450bc98 100644 --- a/src/bindings/mod.rs +++ b/src/bindings/mod.rs @@ -150,7 +150,10 @@ impl Stream for AsyncWriteAdapter { match message { Data(v) => Poll::Ready(Some(Ok(v.into()))), 
Error(exception) => { - let error = std::io::Error::other(format!("cxx error: {exception}")); + let error = std::io::Error::new( + io::ErrorKind::Other, + format!("cxx error: {exception}"), + ); Poll::Ready(Some(Err(error))) } Eof => { diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index dfbab4f..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::path::PathBuf; - -use clap::{Args, Parser, Subcommand}; - -mod bindings; -mod cli; -pub mod make_nar; -pub mod path_info; -pub mod push; -pub mod store; -mod uploader; - -#[derive(Parser, Debug)] -#[command(version)] -#[command(name = "nixcp")] -#[command(about = "Upload store paths to a s3 binary cache")] -#[command(long_about = None)] -pub struct Cli { - #[command(subcommand)] - pub command: Commands, - - /// Whether to enable tokio-console - #[arg(long)] - pub tokio_console: bool, -} - -#[derive(Debug, Subcommand)] -pub enum Commands { - #[command(arg_required_else_help = true)] - Push(PushArgs), -} - -#[derive(Debug, Args)] -pub struct PushArgs { - /// The s3 bucket to upload to - #[arg(long, value_name = "bucket name")] - bucket: String, - - /// Upstream cache to check against. Can be specified multiple times. - /// cache.nixos.org is always included. - #[arg(long = "upstream", short, value_name = "nixcache.example.com")] - upstreams: Vec, - - /// Path to the file containing signing key - /// e.g. ~/cache-priv-key.pem - #[arg(long)] - signing_key: String, - - /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1 - #[arg(long)] - region: Option, - - /// If unspecifed, will get it from AWS_ENDPOINT envar - /// e.g. https://s3.example.com - #[arg(long)] - endpoint: Option, - - #[arg(long)] - skip_signature_check: bool, - - /// Path to upload - /// e.g. 
./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 - #[arg(value_name = "PATH")] - pub paths: Vec, -} diff --git a/src/main.rs index 1afe4b6..0fefdf5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,76 @@ +#![feature(let_chains)] +#![feature(exit_status_error)] + +use std::path::PathBuf; + use anyhow::{Context, Result}; -use clap::Parser; +use clap::{Args, Parser, Subcommand}; use tracing_subscriber::{EnvFilter, prelude::*}; -use nixcp::push::Push; -use nixcp::store::Store; -use nixcp::{Cli, Commands}; +use push::Push; +use store::Store; + +mod bindings; +mod cli; +mod make_nar; +mod path_info; +mod push; +mod store; +mod uploader; + +#[derive(Parser, Debug)] +#[command(version)] +#[command(name = "nixcp")] +#[command(about = "Upload store paths to a s3 binary cache")] +#[command(long_about = None)] +struct Cli { + #[command(subcommand)] + command: Commands, + + /// Whether to enable tokio-console + #[arg(long)] + tokio_console: bool, +} + +#[derive(Debug, Subcommand)] +enum Commands { + #[command(arg_required_else_help = true)] + Push(PushArgs), +} + +#[derive(Debug, Args)] +pub struct PushArgs { + /// The s3 bucket to upload to + #[arg(long, value_name = "bucket name")] + bucket: String, + + /// Upstream cache to check against. Can be specified multiple times. + /// cache.nixos.org is always included. + #[arg(long = "upstream", short, value_name = "nixcache.example.com")] + upstreams: Vec, + + /// Path to the file containing signing key + /// e.g. ~/cache-priv-key.pem + #[arg(long)] + signing_key: String, + + /// If unspecified, will get it from AWS_DEFAULT_REGION envar or default to us-east-1 + #[arg(long)] + region: Option, + + /// If unspecified, will get it from AWS_ENDPOINT envar + /// e.g. https://s3.example.com + #[arg(long)] + endpoint: Option, + + #[arg(long)] + skip_signature_check: bool, + + /// Path to upload + /// e.g. 
./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 + #[arg(value_name = "PATH")] + paths: Vec, +} #[tokio::main] async fn main() -> Result<()> { diff --git a/src/make_nar.rs index 97d6b1f..b5c1ab2 100644 --- a/src/make_nar.rs +++ b/src/make_nar.rs @@ -15,10 +15,10 @@ use crate::store::Store; pub struct MakeNar<'a> { path_info: &'a PathInfo, store: Arc, - pub nar_hasher: Sha256, + nar_hasher: Sha256, /// hash of compressed nar file file_hasher: Sha256, - pub nar_size: u64, + nar_size: u64, file_size: u64, } diff --git a/src/path_info.rs index 1e1282d..beea6b9 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use anyhow::{Context, Result, anyhow}; +use anyhow::{Context, Result}; use futures::future::join_all; use nix_compat::nixbase32; use nix_compat::store_path::StorePath; @@ -13,7 +13,7 @@ use url::Url; use crate::store::Store; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone)] pub struct PathInfo { pub path: StorePath, pub signatures: Vec, @@ -22,44 +22,33 @@ pub struct PathInfo { } impl PathInfo { - pub async fn from_derivation(drv: &Path, store: &Store) -> Result { - debug!("query path info for {:?}", drv); + pub async fn from_path(path: &Path, store: &Store) -> Result { + debug!("query path info for {:?}", path); - let derivation = match drv.extension() { - Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(), + let derivation = match path.extension() { + Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(), _ => { &Command::new("nix") .arg("path-info") .arg("--derivation") - .arg(drv) + .arg(path) .output() .await - .context(format!("run command: nix path-info --derivaiton {drv:?}"))? + .context(format!("run command: nix path-info --derivation {path:?}"))? 
.stdout } }; let derivation = String::from_utf8_lossy(derivation); debug!("derivation: {derivation}"); - if derivation.is_empty() { - return Err(anyhow!( - "nix path-info did not return a derivation for {drv:#?}" - )); - } - - Self::from_path(derivation.trim(), store).await - } - - pub async fn from_path(path: &str, store: &Store) -> Result { - let store_path = - StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?; + let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes()) + .context("storepath from derivation")?; store .query_path_info(store_path) .await - .context("query pathinfo for path") + .context("query pathinfo for derivation") } - // TODO: skip call to query_path_info and return Vec? pub async fn get_closure(&self, store: &Store) -> Result> { let futs = store .compute_fs_closure(self.path.clone()) diff --git a/src/push.rs b/src/push.rs index bf25ea1..8943355 100644 --- a/src/push.rs +++ b/src/push.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashSet, fs, iter::once, path::PathBuf, @@ -11,7 +10,6 @@ use std::{ use anyhow::{Context, Result}; use futures::future::join_all; -use humansize::{DECIMAL, format_size}; use nix_compat::narinfo::{self, SigningKey}; use object_store::aws::{AmazonS3, AmazonS3Builder}; use tokio::sync::{RwLock, Semaphore, mpsc}; @@ -22,7 +20,7 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader}; pub struct Push { upstream_caches: Vec, - store_paths: Arc>>, + store_paths: Arc>>, signing_key: SigningKey, store: Arc, s3: Arc, @@ -62,7 +60,7 @@ impl Push { Ok(Self { upstream_caches: upstreams, - store_paths: Arc::new(RwLock::new(HashSet::new())), + store_paths: Arc::new(RwLock::new(Vec::new())), signing_key, store: Arc::new(store), s3: Arc::new(s3_builder.build()?), @@ -80,7 +78,7 @@ impl Push { let store = self.store.clone(); futs.push(tokio::spawn(async move { - let path_info = PathInfo::from_derivation(path.as_path(), &store) + let path_info = 
PathInfo::from_path(path.as_path(), &store) .await .context("get path info for path")?; debug!("path-info for {path:?}: {path_info:?}"); @@ -159,24 +157,34 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let mut uploads = Vec::new(); - let permits = Arc::new(Semaphore::new(10)); + let permits = Arc::new(Semaphore::new(16)); + let big_permits = Arc::new(Semaphore::new(5)); loop { let permits = permits.clone(); + let big_permits = big_permits.clone(); if let Some(path_to_upload) = rx.recv().await { + debug!("upload permits available: {}", permits.available_permits()); + let mut permit = permits.acquire_owned().await.unwrap(); + uploads.push(tokio::spawn({ - // large uploads will be concurrently uploaded with multipart anyway so don't spawn - // too much of them - let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 { - Some(permits.acquire_owned().await.unwrap()) - } else { - None - }; + // a large directory may have many files and end up causing "too many open files" + if PathBuf::from(path_to_upload.absolute_path()).is_dir() + && path_to_upload.nar_size > 5 * 1024 * 1024 + { + debug!( + "upload big permits available: {}", + big_permits.available_permits() + ); + // drop regular permit and take the big one + permit = big_permits.acquire_owned().await.unwrap(); + } + println!( "uploading: {} (size: {})", path_to_upload.absolute_path(), - format_size(path_to_upload.nar_size, DECIMAL) + path_to_upload.nar_size ); let uploader = Uploader::new(&self.signing_key, path_to_upload)?; let s3 = self.s3.clone(); diff --git a/tests/common/mod.rs b/tests/common/mod.rs deleted file mode 100644 index 3870a1d..0000000 --- a/tests/common/mod.rs +++ /dev/null @@ -1,32 +0,0 @@ -#![allow(dead_code)] - -use std::process::Command; -use std::sync::Arc; - -use nixcp::store::Store; - -pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello"; -pub const HELLO_DRV: &str = 
"iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv"; -pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1"; - -pub struct Context { - pub store: Arc, -} - -impl Context { - fn new() -> Self { - // hello must be in the store - Command::new("nix") - .arg("build") - .arg("--no-link") - .arg(HELLO) - .status() - .unwrap(); - let store = Arc::new(Store::connect().expect("connect to nix store")); - Self { store } - } -} - -pub fn context() -> Context { - Context::new() -} diff --git a/tests/nar.rs b/tests/nar.rs deleted file mode 100644 index 5ebadc5..0000000 --- a/tests/nar.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::common::HELLO_PATH; -use nix_compat::nixbase32; -use nixcp::make_nar::MakeNar; -use nixcp::path_info::PathInfo; -use sha2::Digest; -use tokio::io::AsyncReadExt; - -mod common; - -#[tokio::test] -async fn nar_size_and_hash() { - let ctx = common::context(); - let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap(); - - let mut nar = MakeNar::new(&path_info, ctx.store).unwrap(); - let mut reader = nar.compress_and_hash().unwrap(); - let mut buf = Vec::new(); - reader.read_to_end(&mut buf).await.unwrap(); - drop(reader); - - assert_eq!(nar.nar_size, 234680); - - let nar_hash = nar.nar_hasher.finalize(); - let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w"; - assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash); -} diff --git a/tests/path_info.rs b/tests/path_info.rs deleted file mode 100644 index 57738fd..0000000 --- a/tests/path_info.rs +++ /dev/null @@ -1,37 +0,0 @@ -use nixcp::path_info::PathInfo; -use std::path::PathBuf; - -use crate::common::{HELLO, HELLO_DRV, HELLO_PATH}; - -mod common; - -#[tokio::test] -async fn path_info_from_package() { - let ctx = common::context(); - let path = PathBuf::from(HELLO); - let path_info = PathInfo::from_derivation(&path, &ctx.store) - .await - .expect("get pathinfo from package"); - assert_eq!(path_info.path.to_string(), 
HELLO_DRV); -} - -#[tokio::test] -async fn path_info_from_path() { - let ctx = common::context(); - let path = PathBuf::from(HELLO_PATH); - let path_info = PathInfo::from_derivation(&path, &ctx.store) - .await - .expect("get pathinfo from package"); - assert_eq!(path_info.path.to_string(), HELLO_DRV); -} - -#[tokio::test] -async fn closure() { - let ctx = common::context(); - let path = PathBuf::from(HELLO); - let path_info = PathInfo::from_derivation(&path, &ctx.store) - .await - .expect("get pathinfo from package"); - let closure = path_info.get_closure(&ctx.store).await.unwrap(); - assert_eq!(closure.len(), 472); -}