diff --git a/Cargo.lock b/Cargo.lock index 86744e5..901bf4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -647,7 +647,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4" dependencies = [ "memchr", - "regex-automata", + "regex-automata 0.4.9", "serde", ] @@ -1052,29 +1052,6 @@ dependencies = [ "syn", ] -[[package]] -name = "env_filter" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" -dependencies = [ - "log", - "regex", -] - -[[package]] -name = "env_logger" -version = "0.11.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697" -dependencies = [ - "anstream", - "anstyle", - "env_filter", - "jiff", - "log", -] - [[package]] name = "equivalent" version = "1.0.2" @@ -1737,30 +1714,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "jiff" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c102670231191d07d37a35af3eb77f1f0dbf7a71be51a962dcd57ea607be7260" -dependencies = [ - "jiff-static", - "log", - "portable-atomic", - "portable-atomic-util", - "serde", -] - -[[package]] -name = "jiff-static" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4cdde31a9d349f1b1f51a0b3714a5940ac022976f4b49485fc04be052b183b4c" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "jobserver" version = "0.1.33" @@ -1862,6 +1815,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "md-5" version = "0.10.6" @@ -1984,9 +1946,7 @@ dependencies = [ "aws-sdk-s3", "clap", "ed25519-dalek", - "env_logger", "futures", - "log", "nix-compat", "regex", "reqwest", @@ -1994,6 +1954,8 @@ dependencies = [ "serde_json", "sha2", "tokio", + "tracing", + "tracing-subscriber", "url", ] @@ -2016,6 +1978,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -2126,6 +2098,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.11.1" @@ -2204,21 +2182,6 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" -[[package]] -name = "portable-atomic" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" - -[[package]] -name = "portable-atomic-util" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" -dependencies = [ - "portable-atomic", -] - [[package]] name = "powerfmt" version = "0.2.0" @@ -2294,8 +2257,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.9", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2306,7 +2278,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] [[package]] @@ -2315,6 +2287,12 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -2692,6 +2670,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2874,6 +2861,16 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" +dependencies = [ + "cfg-if", + "once_cell", +] + [[package]] name = "time" version = "0.3.41" @@ -3059,6 +3056,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3127,6 +3154,12 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -3262,6 +3295,28 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index 71b2a41..f7bc3f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,7 @@ aws-config = { version = "1.6.1", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.82.0" clap = { version = "4.5.34", features = ["derive"] } ed25519-dalek = "2.1.1" -env_logger = "0.11.7" futures = "0.3.31" -log = "0.4.27" nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" } regex = "1.11.1" reqwest = "0.12.15" @@ -20,4 +18,6 @@ serde = { version = "1.0.219", features = [ "derive" ]} serde_json = "1.0.140" sha2 = "0.10.8" tokio = { version = "1.44.1", features = [ "full" ]} +tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19", features = ["env-filter"]} url = { version = "2.5.4", features = [ "serde" ]} diff --git a/src/main.rs b/src/main.rs index c4e3d05..3a7cc78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,9 @@ #![feature(let_chains)] #![feature(extend_one)] -#![feature(array_chunks)] use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; +use tracing_subscriber::{EnvFilter, FmtSubscriber}; use nixcp::NixCp; @@ -37,7 +37,7 @@ struct Cli { region: Option, /// If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default - /// e.g. s3.example.com + /// e.g. https://s3.example.com #[arg(long)] endpoint: Option, @@ -58,7 +58,10 @@ enum Commands { #[tokio::main] async fn main() -> Result<()> { - env_logger::init(); + let filter = EnvFilter::from_default_env(); + let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); + tracing::subscriber::set_global_default(subscriber)?; + let cli = Cli::parse(); let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?)); diff --git a/src/nixcp.rs b/src/nixcp.rs index 279c20d..a215923 100644 --- a/src/nixcp.rs +++ b/src/nixcp.rs @@ -2,7 +2,7 @@ use std::{ fs, iter::once, sync::{ - Arc, Mutex, + Arc, atomic::{AtomicUsize, Ordering}, }, }; @@ -11,9 +11,9 @@ use anyhow::{Context, Result}; use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; -use log::{debug, info, warn}; use nix_compat::narinfo::{self, SigningKey}; use tokio::sync::{RwLock, Semaphore, mpsc}; +use tracing::{debug, info, trace}; use url::Url; use crate::{Cli, path_info::PathInfo, uploader::Uploader}; @@ -24,6 +24,10 @@ pub struct NixCp { s3_client: s3::Client, signing_key: SigningKey, bucket: String, + // paths that we skipped cause of a signature match + signature_hit_count: AtomicUsize, + // paths that we skipped cause we found it on an upstream + upstream_hit_count: AtomicUsize, } impl NixCp { @@ -59,6 +63,8 @@ impl NixCp { s3_client, signing_key, bucket: cli.bucket.clone(), + signature_hit_count: AtomicUsize::new(0), + upstream_hit_count: AtomicUsize::new(0), }) } @@ -81,18 +87,23 @@ impl NixCp { pub async fn run(&'static self) -> Result<()> { let (tx, rx) = mpsc::channel(10); let tx = Arc::new(tx); - tokio::spawn(self.filter_from_upstream(tx)); - self.upload(rx).await + let filter = tokio::spawn(self.filter_from_upstream(tx)); + let upload = tokio::spawn(self.upload(rx)); + filter.await?; + upload.await??; + Ok(()) } /// filter paths that are on upstream and send to `tx` - async fn filter_from_upstream(&self, tx: Arc>) { + async fn filter_from_upstream(&'static self, tx: Arc>) { let permits = Arc::new(Semaphore::new(10)); let mut handles = Vec::with_capacity(10); let store_paths = self.store_paths.read().await.clone(); for path in store_paths.into_iter() { if path.check_upstream_signature(&self.upstream_caches) { + trace!("skip {} (signature match)", path.absolute_path()); + self.signature_hit_count.fetch_add(1, Ordering::Release); continue; } handles.push({ @@ -104,19 +115,23 @@ impl NixCp { if !path.check_upstream_hit(upstream_caches.as_slice()).await { tx.send(path).await.unwrap(); + } else { + trace!("skip {} (upstream hit)", path.absolute_path()); + self.upstream_hit_count.fetch_add(1, Ordering::Relaxed); } }) }); } - for handle in handles { - handle.await.unwrap(); - } + join_all(handles) + .await + .into_iter() + .collect::>() + .unwrap(); } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let upload_count = AtomicUsize::new(0); - let failures: Arc>> = Arc::new(Mutex::new(Vec::new())); let permits = Arc::new(Semaphore::new(10)); let mut uploads = Vec::with_capacity(10); @@ -139,16 +154,21 @@ impl NixCp { }); uploads.push(fut); } else { - join_all(uploads).await; - println!("uploaded {} paths", upload_count.load(Ordering::Relaxed)); + join_all(uploads) + .await + .into_iter() + .flatten() + .collect::>>()?; - let failures = failures.lock().unwrap(); - if !failures.is_empty() { - warn!("failed to upload these paths: "); - for failure in failures.iter() { - warn!("{}", failure); - } - } + println!("uploaded: {}", upload_count.load(Ordering::Relaxed)); + println!( + "skipped because of signature match: {}", + self.signature_hit_count.load(Ordering::Relaxed) + ); + println!( + "skipped because of upstream hit: {}", + self.upstream_hit_count.load(Ordering::Relaxed) + ); break; } } diff --git a/src/path_info.rs b/src/path_info.rs index 746c344..57e0a5b 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -1,12 +1,12 @@ use std::collections::HashSet; use anyhow::{Context, Error, Result}; -use log::{debug, error, trace}; -use nix_compat::nixhash::CAHash; use nix_compat::store_path::StorePath; +use nix_compat::{nixbase32, nixhash::CAHash}; use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::process::Command; +use tracing::{debug, error, trace}; use url::Url; // nix path-info --derivation --json @@ -97,12 +97,9 @@ impl PathInfo { } pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool { - let hash = - String::from_utf8(self.path.digest().to_vec()).expect("should be a valid string"); - for upstream in upstreams { let upstream = upstream - .join(format!("{hash}/.narinfo").as_str()) + .join(format!("{}/.narinfo", self.digest()).as_str()) .expect("adding .narinfo should make a valid url"); let res_status = reqwest::Client::new() .head(upstream.as_str()) @@ -121,8 +118,8 @@ impl PathInfo { self.path.to_absolute_path() } - pub fn digest(&self) -> &str { - str::from_utf8(self.path.digest()).expect("digest should be valid string") + pub fn digest(&self) -> String { + nixbase32::encode(self.path.digest()) } } diff --git a/src/uploader.rs b/src/uploader.rs index 81f4be8..e52e4ae 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -5,7 +5,6 @@ use aws_sdk_s3::{ types::{CompletedMultipartUpload, CompletedPart}, }; use futures::future::join_all; -use log::debug; use nix_compat::{ narinfo::{self, NarInfo, SigningKey}, nixbase32, @@ -13,6 +12,7 @@ use nix_compat::{ }; use sha2::{Digest, Sha256}; use tokio::{io::AsyncReadExt, process::Command}; +use tracing::debug; use crate::path_info::PathInfo; @@ -46,11 +46,14 @@ impl<'a> Uploader<'a> { let nar = self.compress_nar(&nar).await; // update fields that we know after compression - nar_info.file_size = Some(nar.len() as u64); let mut hasher = Sha256::new(); hasher.update(&nar); - nar_info.file_hash = Some(hasher.finalize().into()); - let nar_url = self.nar_url(&nar); + let hash: [u8; 32] = hasher.finalize().into(); + let nar_url = self.nar_url(&hash); + nar_info.file_hash = Some(hash); + nar_info.file_size = Some(nar.len() as u64); + nar_info.url = nar_url.as_str(); + debug!("uploading nar with key: {nar_url}"); if nar.len() < MULTIPART_CUTOFF { let put_object = self @@ -73,7 +76,7 @@ impl<'a> Uploader<'a> { let upload_id = multipart.upload_id().unwrap(); let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF); - let chunks = nar.array_chunks::(); + let chunks = nar.chunks(MULTIPART_CUTOFF); for (i, chunk) in chunks.enumerate() { parts.push(tokio::task::spawn( self.s3_client @@ -120,12 +123,12 @@ impl<'a> Uploader<'a> { debug!("complete multipart upload: {:#?}", complete_mp_upload); } - nar_info.add_signature(self.signing_key); - + let narinfo_url = format!("{}.narinfo", self.path.digest()); + debug!("uploading narinfo with key {narinfo_url}"); self.s3_client .put_object() .bucket(&self.bucket) - .key(format!("{}.narinfo", self.path.digest())) + .key(narinfo_url) .body(nar_info.to_string().as_bytes().to_vec().into()) .send() .await?; @@ -147,7 +150,7 @@ impl<'a> Uploader<'a> { let mut hasher = Sha256::new(); hasher.update(nar); let nar_hash: [u8; 32] = hasher.finalize().into(); - let nar_info = NarInfo { + let mut nar_info = NarInfo { flags: narinfo::Flags::empty(), store_path: self.path.path.as_ref(), nar_hash, @@ -162,6 +165,8 @@ impl<'a> Uploader<'a> { file_size: None, url: "", }; + // signature consists of: store_path, nar_hash, nar_size, and references + nar_info.add_signature(self.signing_key); Ok(nar_info) }