diff --git a/Cargo.lock b/Cargo.lock index 901bf4b..86744e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -647,7 +647,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4" dependencies = [ "memchr", - "regex-automata 0.4.9", + "regex-automata", "serde", ] @@ -1052,6 +1052,29 @@ dependencies = [ "syn", ] +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -1714,6 +1737,30 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jiff" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c102670231191d07d37a35af3eb77f1f0dbf7a71be51a962dcd57ea607be7260" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde", +] + +[[package]] +name = "jiff-static" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cdde31a9d349f1b1f51a0b3714a5940ac022976f4b49485fc04be052b183b4c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "jobserver" version = "0.1.33" @@ -1815,15 +1862,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "matchers" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" -dependencies = [ - "regex-automata 0.1.10", -] - [[package]] name = "md-5" version = "0.10.6" @@ -1946,7 +1984,9 @@ dependencies = [ "aws-sdk-s3", "clap", "ed25519-dalek", + "env_logger", "futures", + "log", "nix-compat", "regex", "reqwest", @@ -1954,8 +1994,6 @@ dependencies = [ "serde_json", "sha2", "tokio", - "tracing", - "tracing-subscriber", "url", ] @@ -1978,16 +2016,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "num-conv" version = "0.1.0" @@ -2098,12 +2126,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "p256" version = "0.11.1" @@ -2182,6 +2204,21 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" +[[package]] +name = "portable-atomic" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -2257,17 +2294,8 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -2278,7 +2306,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax", ] [[package]] @@ -2287,12 +2315,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.5" @@ -2670,15 +2692,6 @@ dependencies = [ "digest", ] -[[package]] -name = "sharded-slab" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" -dependencies = [ - "lazy_static", -] - [[package]] name = "shlex" version = "1.3.0" @@ -2861,16 +2874,6 @@ dependencies = [ "syn", ] -[[package]] -name = "thread_local" -version = "1.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" -dependencies = [ - "cfg-if", - "once_cell", -] - [[package]] name = "time" version = "0.3.41" @@ -3056,36 +3059,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", - "valuable", -] - -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" -dependencies = [ - "matchers", - "nu-ansi-term", - "once_cell", - "regex", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", ] [[package]] @@ -3154,12 +3127,6 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" -[[package]] -name = "valuable" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" - [[package]] name = "vcpkg" version = "0.2.15" @@ -3295,28 +3262,6 @@ dependencies = [ "rustix 0.38.44", ] -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - [[package]] name = "windows-link" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index f7bc3f0..71b2a41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,9 @@ aws-config = { version = "1.6.1", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.82.0" clap = { version = "4.5.34", features = ["derive"] } ed25519-dalek = "2.1.1" +env_logger = "0.11.7" futures = "0.3.31" +log = "0.4.27" nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" } regex = "1.11.1" reqwest = "0.12.15" @@ -18,6 +20,4 @@ serde = { version = "1.0.219", features = [ "derive" ]} serde_json = "1.0.140" sha2 = "0.10.8" tokio = { version = "1.44.1", features = [ "full" ]} -tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19", features = ["env-filter"]} url = { version = "2.5.4", features = [ "serde" ]} diff --git a/src/main.rs b/src/main.rs index 3a7cc78..c4e3d05 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,9 @@ #![feature(let_chains)] #![feature(extend_one)] +#![feature(array_chunks)] use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; use nixcp::NixCp; @@ -37,7 +37,7 @@ struct Cli { region: Option, /// If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default - /// e.g. https://s3.example.com + /// e.g. s3.example.com #[arg(long)] endpoint: Option, @@ -58,10 +58,7 @@ enum Commands { #[tokio::main] async fn main() -> Result<()> { - let filter = EnvFilter::from_default_env(); - let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); - tracing::subscriber::set_global_default(subscriber)?; - + env_logger::init(); let cli = Cli::parse(); let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?)); diff --git a/src/nixcp.rs b/src/nixcp.rs index a215923..279c20d 100644 --- a/src/nixcp.rs +++ b/src/nixcp.rs @@ -2,7 +2,7 @@ use std::{ fs, iter::once, sync::{ - Arc, + Arc, Mutex, atomic::{AtomicUsize, Ordering}, }, }; @@ -11,9 +11,9 @@ use anyhow::{Context, Result}; use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; +use log::{debug, info, warn}; use nix_compat::narinfo::{self, SigningKey}; use tokio::sync::{RwLock, Semaphore, mpsc}; -use tracing::{debug, info, trace}; use url::Url; use crate::{Cli, path_info::PathInfo, uploader::Uploader}; @@ -24,10 +24,6 @@ pub struct NixCp { s3_client: s3::Client, signing_key: SigningKey, bucket: String, - // paths that we skipped cause of a signature match - signature_hit_count: AtomicUsize, - // paths that we skipped cause we found it on an upstream - upstream_hit_count: AtomicUsize, } impl NixCp { @@ -63,8 +59,6 @@ impl NixCp { s3_client, signing_key, bucket: cli.bucket.clone(), - signature_hit_count: AtomicUsize::new(0), - upstream_hit_count: AtomicUsize::new(0), }) } @@ -87,23 +81,18 @@ impl NixCp { pub async fn run(&'static self) -> Result<()> { let (tx, rx) = mpsc::channel(10); let tx = Arc::new(tx); - let filter = tokio::spawn(self.filter_from_upstream(tx)); - let upload = tokio::spawn(self.upload(rx)); - filter.await?; - upload.await??; - Ok(()) + tokio::spawn(self.filter_from_upstream(tx)); + self.upload(rx).await } /// filter paths that are on upstream and send to `tx` - async fn filter_from_upstream(&'static self, tx: Arc>) { + async fn filter_from_upstream(&self, tx: Arc>) { let permits = Arc::new(Semaphore::new(10)); let mut handles = Vec::with_capacity(10); let store_paths = self.store_paths.read().await.clone(); for path in store_paths.into_iter() { if path.check_upstream_signature(&self.upstream_caches) { - trace!("skip {} (signature match)", path.absolute_path()); - self.signature_hit_count.fetch_add(1, Ordering::Release); continue; } handles.push({ @@ -115,23 +104,19 @@ impl NixCp { if !path.check_upstream_hit(upstream_caches.as_slice()).await { tx.send(path).await.unwrap(); - } else { - trace!("skip {} (upstream hit)", path.absolute_path()); - self.upstream_hit_count.fetch_add(1, Ordering::Relaxed); } }) }); } - join_all(handles) - .await - .into_iter() - .collect::>() - .unwrap(); + for handle in handles { + handle.await.unwrap(); + } } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let upload_count = AtomicUsize::new(0); + let failures: Arc>> = Arc::new(Mutex::new(Vec::new())); let permits = Arc::new(Semaphore::new(10)); let mut uploads = Vec::with_capacity(10); @@ -154,21 +139,16 @@ impl NixCp { }); uploads.push(fut); } else { - join_all(uploads) - .await - .into_iter() - .flatten() - .collect::>>()?; + join_all(uploads).await; + println!("uploaded {} paths", upload_count.load(Ordering::Relaxed)); - println!("uploaded: {}", upload_count.load(Ordering::Relaxed)); - println!( - "skipped because of signature match: {}", - self.signature_hit_count.load(Ordering::Relaxed) - ); - println!( - "skipped because of upstream hit: {}", - self.upstream_hit_count.load(Ordering::Relaxed) - ); + let failures = failures.lock().unwrap(); + if !failures.is_empty() { + warn!("failed to upload these paths: "); + for failure in failures.iter() { + warn!("{}", failure); + } + } break; } } diff --git a/src/path_info.rs b/src/path_info.rs index 57e0a5b..746c344 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -1,12 +1,12 @@ use std::collections::HashSet; use anyhow::{Context, Error, Result}; +use log::{debug, error, trace}; +use nix_compat::nixhash::CAHash; use nix_compat::store_path::StorePath; -use nix_compat::{nixbase32, nixhash::CAHash}; use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::process::Command; -use tracing::{debug, error, trace}; use url::Url; // nix path-info --derivation --json @@ -97,9 +97,12 @@ impl PathInfo { } pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool { + let hash = + String::from_utf8(self.path.digest().to_vec()).expect("should be a valid string"); + for upstream in upstreams { let upstream = upstream - .join(format!("{}/.narinfo", self.digest()).as_str()) + .join(format!("{hash}/.narinfo").as_str()) .expect("adding .narinfo should make a valid url"); let res_status = reqwest::Client::new() .head(upstream.as_str()) @@ -118,8 +121,8 @@ impl PathInfo { self.path.to_absolute_path() } - pub fn digest(&self) -> String { - nixbase32::encode(self.path.digest()) + pub fn digest(&self) -> &str { + str::from_utf8(self.path.digest()).expect("digest should be valid string") } } diff --git a/src/uploader.rs b/src/uploader.rs index e52e4ae..81f4be8 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -5,6 +5,7 @@ use aws_sdk_s3::{ types::{CompletedMultipartUpload, CompletedPart}, }; use futures::future::join_all; +use log::debug; use nix_compat::{ narinfo::{self, NarInfo, SigningKey}, nixbase32, @@ -12,7 +13,6 @@ use nix_compat::{ }; use sha2::{Digest, Sha256}; use tokio::{io::AsyncReadExt, process::Command}; -use tracing::debug; use crate::path_info::PathInfo; @@ -46,14 +46,11 @@ impl<'a> Uploader<'a> { let nar = self.compress_nar(&nar).await; // update fields that we know after compression + nar_info.file_size = Some(nar.len() as u64); let mut hasher = Sha256::new(); hasher.update(&nar); - let hash: [u8; 32] = hasher.finalize().into(); - let nar_url = self.nar_url(&hash); - nar_info.file_hash = Some(hash); - nar_info.file_size = Some(nar.len() as u64); - nar_info.url = nar_url.as_str(); - debug!("uploading nar with key: {nar_url}"); + nar_info.file_hash = Some(hasher.finalize().into()); + let nar_url = self.nar_url(&nar); if nar.len() < MULTIPART_CUTOFF { let put_object = self @@ -76,7 +73,7 @@ impl<'a> Uploader<'a> { let upload_id = multipart.upload_id().unwrap(); let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF); - let chunks = nar.chunks(MULTIPART_CUTOFF); + let chunks = nar.array_chunks::(); for (i, chunk) in chunks.enumerate() { parts.push(tokio::task::spawn( self.s3_client @@ -123,12 +120,12 @@ impl<'a> Uploader<'a> { debug!("complete multipart upload: {:#?}", complete_mp_upload); } - let narinfo_url = format!("{}.narinfo", self.path.digest()); - debug!("uploading narinfo with key {narinfo_url}"); + nar_info.add_signature(self.signing_key); + self.s3_client .put_object() .bucket(&self.bucket) - .key(narinfo_url) + .key(format!("{}.narinfo", self.path.digest())) .body(nar_info.to_string().as_bytes().to_vec().into()) .send() .await?; @@ -150,7 +147,7 @@ impl<'a> Uploader<'a> { let mut hasher = Sha256::new(); hasher.update(nar); let nar_hash: [u8; 32] = hasher.finalize().into(); - let mut nar_info = NarInfo { + let nar_info = NarInfo { flags: narinfo::Flags::empty(), store_path: self.path.path.as_ref(), nar_hash, @@ -165,8 +162,6 @@ impl<'a> Uploader<'a> { file_size: None, url: "", }; - // signature consists of: store_path, nar_hash, nar_size, and references - nar_info.add_signature(self.signing_key); Ok(nar_info) }