From 03530a2afa11c4a2e8c9908b8a17217c138f7fb7 Mon Sep 17 00:00:00 2001 From: cy Date: Tue, 15 Apr 2025 18:53:45 -0400 Subject: [PATCH 1/3] don't limit stuff with semaphore and use println for uploading output --- src/push.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/push.rs b/src/push.rs index 5adab30..896b556 100644 --- a/src/push.rs +++ b/src/push.rs @@ -12,7 +12,7 @@ use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; -use tokio::sync::{RwLock, Semaphore, mpsc}; +use tokio::sync::{RwLock, mpsc}; use tracing::{debug, info, trace}; use url::Url; @@ -98,7 +98,6 @@ impl Push { /// filter paths that are on upstream and send to `tx` async fn filter_from_upstream(&'static self, tx: mpsc::Sender) { - let permits = Arc::new(Semaphore::new(10)); let mut handles = Vec::with_capacity(10); let store_paths = self.store_paths.read().await.clone(); @@ -109,11 +108,8 @@ impl Push { continue; } handles.push({ - let permits = permits.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - if !path .check_upstream_hit(self.upstream_caches.as_slice()) .await @@ -144,15 +140,13 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let upload_count = AtomicUsize::new(0); - let permits = Arc::new(Semaphore::new(10)); let mut uploads = Vec::with_capacity(10); loop { if let Some(path_to_upload) = rx.recv().await { - let permits = Arc::clone(&permits); let absolute_path = path_to_upload.absolute_path(); - info!("uploading: {}", absolute_path); + println!("uploading: {}", absolute_path); let uploader = Uploader::new( &self.signing_key, path_to_upload, @@ -160,10 +154,7 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - uploader.upload().await - })); + uploads.push(tokio::spawn(async move { uploader.upload().await })); } else { join_all(uploads) .await From 489572083183aa25902635f306d8f4625cfd46ce Mon Sep 17 00:00:00 2001 From: cy Date: Tue, 15 Apr 2025 20:00:55 -0400 Subject: [PATCH 2/3] fix upload count --- src/push.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/push.rs b/src/push.rs index 896b556..719d3a8 100644 --- a/src/push.rs +++ b/src/push.rs @@ -30,6 +30,8 @@ pub struct Push { upstream_hit_count: AtomicUsize, // paths that we skipped cause they are already on our cache already_exists_count: AtomicUsize, + // paths that we uploaded + upload_count: AtomicUsize, } impl Push { @@ -68,6 +70,7 @@ impl Push { signature_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0), + upload_count: AtomicUsize::new(0), }) } @@ -139,7 +142,6 @@ impl Push { } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { - let upload_count = AtomicUsize::new(0); let mut uploads = Vec::with_capacity(10); loop { @@ -154,7 +156,11 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { uploader.upload().await })); + uploads.push(tokio::spawn(async move { + let res = uploader.upload().await; + self.upload_count.fetch_add(1, Ordering::Relaxed); + res + })); } else { join_all(uploads) .await @@ -162,7 +168,7 @@ impl Push { .flatten() .collect::>>()?; - println!("uploaded: {}", upload_count.load(Ordering::Relaxed)); + println!("uploaded: {}", self.upload_count.load(Ordering::Relaxed)); println!( "skipped because of signature match: {}", self.signature_hit_count.load(Ordering::Relaxed) From 20c13a86be3d571d59c1e5c82f3fdb558225d4c0 Mon Sep 17 00:00:00 2001 From: cy Date: Tue, 15 Apr 2025 20:06:46 -0400 Subject: [PATCH 3/3] use experimental-features flag when executing nix run --- src/path_info.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/path_info.rs b/src/path_info.rs index 0f05a87..6dcbb53 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -25,8 +25,12 @@ impl PathInfo { pub async fn from_path(path: &str) -> Result { debug!("query nix path-info for {path}"); // use lix cause nix would return a json map instead of an array + // json output is not stable and could break in future + // TODO figure out a better way let nix_cmd = Command::new("nix") .arg("run") + .arg("--experimental-features") + .arg("nix-command flakes") .arg("github:nixos/nixpkgs/nixos-unstable#lix") .arg("--") .arg("path-info")