From 03530a2afa11c4a2e8c9908b8a17217c138f7fb7 Mon Sep 17 00:00:00 2001 From: cy Date: Tue, 15 Apr 2025 18:53:45 -0400 Subject: [PATCH 1/4] don't limit stuff with semaphore and use println for uploading output --- src/push.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/push.rs b/src/push.rs index 5adab30..896b556 100644 --- a/src/push.rs +++ b/src/push.rs @@ -12,7 +12,7 @@ use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; -use tokio::sync::{RwLock, Semaphore, mpsc}; +use tokio::sync::{RwLock, mpsc}; use tracing::{debug, info, trace}; use url::Url; @@ -98,7 +98,6 @@ impl Push { /// filter paths that are on upstream and send to `tx` async fn filter_from_upstream(&'static self, tx: mpsc::Sender) { - let permits = Arc::new(Semaphore::new(10)); let mut handles = Vec::with_capacity(10); let store_paths = self.store_paths.read().await.clone(); @@ -109,11 +108,8 @@ impl Push { continue; } handles.push({ - let permits = permits.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - if !path .check_upstream_hit(self.upstream_caches.as_slice()) .await @@ -144,15 +140,13 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let upload_count = AtomicUsize::new(0); - let permits = Arc::new(Semaphore::new(10)); let mut uploads = Vec::with_capacity(10); loop { if let Some(path_to_upload) = rx.recv().await { - let permits = Arc::clone(&permits); let absolute_path = path_to_upload.absolute_path(); - info!("uploading: {}", absolute_path); + println!("uploading: {}", absolute_path); let uploader = Uploader::new( &self.signing_key, path_to_upload, @@ -160,10 +154,7 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - uploader.upload().await - })); + uploads.push(tokio::spawn(async move { uploader.upload().await })); } else { join_all(uploads) .await From 489572083183aa25902635f306d8f4625cfd46ce Mon Sep 17 00:00:00 2001 From: cy Date: Tue, 15 Apr 2025 20:00:55 -0400 Subject: [PATCH 2/4] fix upload count --- src/push.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/push.rs b/src/push.rs index 896b556..719d3a8 100644 --- a/src/push.rs +++ b/src/push.rs @@ -30,6 +30,8 @@ pub struct Push { upstream_hit_count: AtomicUsize, // paths that we skipped cause they are already on our cache already_exists_count: AtomicUsize, + // paths that we uploaded + upload_count: AtomicUsize, } impl Push { @@ -68,6 +70,7 @@ impl Push { signature_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0), + upload_count: AtomicUsize::new(0), }) } @@ -139,7 +142,6 @@ impl Push { } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { - let upload_count = AtomicUsize::new(0); let mut uploads = Vec::with_capacity(10); loop { @@ -154,7 +156,11 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { uploader.upload().await })); + uploads.push(tokio::spawn(async move { + let res = uploader.upload().await; + self.upload_count.fetch_add(1, Ordering::Relaxed); + res + })); } else { join_all(uploads) .await @@ -162,7 +168,7 @@ impl Push { .flatten() .collect::>>()?; - println!("uploaded: {}", upload_count.load(Ordering::Relaxed)); + println!("uploaded: {}", self.upload_count.load(Ordering::Relaxed)); println!( "skipped because of signature match: {}", self.signature_hit_count.load(Ordering::Relaxed) From 20c13a86be3d571d59c1e5c82f3fdb558225d4c0 Mon Sep 17 00:00:00 2001 From: cy Date: Tue, 15 Apr 2025 20:06:46 -0400 Subject: [PATCH 3/4] use experimental-features flag when executing nix run --- src/path_info.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/path_info.rs b/src/path_info.rs index 0f05a87..6dcbb53 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -25,8 +25,12 @@ impl PathInfo { pub async fn from_path(path: &str) -> Result { debug!("query nix path-info for {path}"); // use lix cause nix would return a json map instead of an array + // json output is not stable and could break in future + // TODO figure out a better way let nix_cmd = Command::new("nix") .arg("run") + .arg("--experimental-features") + .arg("nix-command flakes") .arg("github:nixos/nixpkgs/nixos-unstable#lix") .arg("--") .arg("path-info") From 39792cdd403ab785830d65b767af0f4d3417e5f1 Mon Sep 17 00:00:00 2001 From: cy Date: Tue, 15 Apr 2025 20:09:09 -0400 Subject: [PATCH 4/4] cargo update --- Cargo.lock | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 419ee1b..aee2282 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,9 +84,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.97" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" [[package]] name = "async-compression" @@ -414,7 +414,7 @@ dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", "aws-smithy-types", - "h2 0.4.8", + "h2 0.4.9", "http 0.2.12", "http 1.3.1", "http-body 0.4.6", @@ -425,7 +425,7 @@ dependencies = [ "hyper-util", "pin-project-lite", "rustls 0.21.12", - "rustls 0.23.25", + "rustls 0.23.26", "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", @@ -1298,9 +1298,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2" +checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633" dependencies = [ "atomic-waker", "bytes", @@ -1457,7 +1457,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.8", + "h2 0.4.9", "http 1.3.1", "http-body 1.0.1", "httparse", @@ -1494,7 +1494,7 @@ dependencies = [ "http 1.3.1", "hyper 1.6.0", "hyper-util", - "rustls 0.23.25", + "rustls 0.23.26", "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", @@ -1748,9 +1748,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.171" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libloading" @@ -1780,9 +1780,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "linux-raw-sys" -version = "0.9.3" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" [[package]] name = "litemap" @@ -2310,7 +2310,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2 0.4.8", + "h2 0.4.9", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -2404,14 +2404,14 @@ dependencies = [ [[package]] name = "rustix" -version = "1.0.3" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96" +checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys 0.9.3", + "linux-raw-sys 0.9.4", "windows-sys 0.59.0", ] @@ -2429,9 +2429,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.25" +version = "0.23.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c" +checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0" dependencies = [ "aws-lc-rs", "once_cell", @@ -2837,7 +2837,7 @@ dependencies = [ "fastrand", "getrandom 0.3.2", "once_cell", - "rustix 1.0.3", + "rustix 1.0.5", "windows-sys 0.59.0", ] @@ -2966,7 +2966,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls 0.23.25", + "rustls 0.23.26", "tokio", ]