diff --git a/src/push.rs b/src/push.rs index 5adab30..896b556 100644 --- a/src/push.rs +++ b/src/push.rs @@ -12,7 +12,7 @@ use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; -use tokio::sync::{RwLock, Semaphore, mpsc}; +use tokio::sync::{RwLock, mpsc}; use tracing::{debug, info, trace}; use url::Url; @@ -98,7 +98,6 @@ impl Push { /// filter paths that are on upstream and send to `tx` async fn filter_from_upstream(&'static self, tx: mpsc::Sender) { - let permits = Arc::new(Semaphore::new(10)); let mut handles = Vec::with_capacity(10); let store_paths = self.store_paths.read().await.clone(); @@ -109,11 +108,8 @@ impl Push { continue; } handles.push({ - let permits = permits.clone(); let tx = tx.clone(); tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - if !path .check_upstream_hit(self.upstream_caches.as_slice()) .await @@ -144,15 +140,13 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let upload_count = AtomicUsize::new(0); - let permits = Arc::new(Semaphore::new(10)); let mut uploads = Vec::with_capacity(10); loop { if let Some(path_to_upload) = rx.recv().await { - let permits = Arc::clone(&permits); let absolute_path = path_to_upload.absolute_path(); - info!("uploading: {}", absolute_path); + println!("uploading: {}", absolute_path); let uploader = Uploader::new( &self.signing_key, path_to_upload, @@ -160,10 +154,7 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { - let _permit = permits.acquire().await.unwrap(); - uploader.upload().await - })); + uploads.push(tokio::spawn(async move { uploader.upload().await })); } else { join_all(uploads) .await