diff --git a/src/push.rs b/src/push.rs index df8304d..dca2a9f 100644 --- a/src/push.rs +++ b/src/push.rs @@ -13,7 +13,7 @@ use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::{RwLock, Semaphore, mpsc}; use tracing::{debug, trace}; use url::Url; @@ -161,6 +161,7 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let mut uploads = Vec::with_capacity(10); + let permits = Arc::new(Semaphore::new(10)); loop { if let Some(path_to_upload) = rx.recv().await { @@ -174,10 +175,15 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { - let res = uploader.upload().await; - self.upload_count.fetch_add(1, Ordering::Relaxed); - res + uploads.push(tokio::spawn({ + let permits = permits.clone(); + + async move { + let _permit = permits.acquire().await; + let res = uploader.upload().await; + self.upload_count.fetch_add(1, Ordering::Relaxed); + res + } })); } else { join_all(uploads) diff --git a/src/uploader.rs b/src/uploader.rs index 95e03df..cacae2b 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -132,6 +132,7 @@ impl<'a> Uploader<'a> { .body(nar_info.to_string().as_bytes().to_vec().into()) .send() .await?; + debug!("done uploading narinfo"); Ok(()) }