diff --git a/src/path_info.rs b/src/path_info.rs index 0f05a87..6dcbb53 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -25,8 +25,12 @@ impl PathInfo { pub async fn from_path(path: &str) -> Result { debug!("query nix path-info for {path}"); // use lix cause nix would return a json map instead of an array + // json output is not stable and could break in future + // TODO figure out a better way let nix_cmd = Command::new("nix") .arg("run") + .arg("--experimental-features") + .arg("nix-command flakes") .arg("github:nixos/nixpkgs/nixos-unstable#lix") .arg("--") .arg("path-info") diff --git a/src/push.rs b/src/push.rs index 5adab30..719d3a8 100644 --- a/src/push.rs +++ b/src/push.rs @@ -12,7 +12,7 @@ use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; -use tokio::sync::{RwLock, Semaphore, mpsc}; +use tokio::sync::{RwLock, mpsc}; use tracing::{debug, info, trace}; use url::Url; @@ -30,6 +30,8 @@ pub struct Push { upstream_hit_count: AtomicUsize, // paths that we skipped cause they are already on our cache already_exists_count: AtomicUsize, + // paths that we uploaded + upload_count: AtomicUsize, } impl Push { @@ -68,6 +70,7 @@ impl Push { signature_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0), + upload_count: AtomicUsize::new(0), }) } @@ -98,7 +101,6 @@ impl Push { /// filter paths that are on upstream and send to `tx` async fn filter_from_upstream(&'static self, tx: mpsc::Sender) { - let permits = Arc::new(Semaphore::new(10)); let mut handles = Vec::with_capacity(10); let store_paths = self.store_paths.read().await.clone(); @@ -109,11 +111,8 @@ impl Push { continue; } handles.push({ - let permits = permits.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - if !path .check_upstream_hit(self.upstream_caches.as_slice()) .await @@ -143,16 +142,13 @@ impl Push { } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { - let upload_count = AtomicUsize::new(0); - let permits = Arc::new(Semaphore::new(10)); let mut uploads = Vec::with_capacity(10); loop { if let Some(path_to_upload) = rx.recv().await { - let permits = Arc::clone(&permits); let absolute_path = path_to_upload.absolute_path(); - info!("uploading: {}", absolute_path); + println!("uploading: {}", absolute_path); let uploader = Uploader::new( &self.signing_key, path_to_upload, @@ -161,8 +157,9 @@ impl Push { )?; uploads.push(tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - uploader.upload().await + let res = uploader.upload().await; + self.upload_count.fetch_add(1, Ordering::Relaxed); + res })); } else { join_all(uploads) @@ -171,7 +168,7 @@ impl Push { .flatten() .collect::>>()?; - println!("uploaded: {}", upload_count.load(Ordering::Relaxed)); + println!("uploaded: {}", self.upload_count.load(Ordering::Relaxed)); println!( "skipped because of signature match: {}", self.signature_hit_count.load(Ordering::Relaxed)