From ca97aebd7ac84b2fbeb88a545c6968795cc186ce Mon Sep 17 00:00:00 2001 From: cy Date: Sat, 26 Apr 2025 23:07:56 -0400 Subject: [PATCH] limit directories even more --- src/push.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/push.rs b/src/push.rs index 57a43d2..9a9e156 100644 --- a/src/push.rs +++ b/src/push.rs @@ -18,8 +18,6 @@ use url::Url; use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader}; -const UPLOAD_CONCURRENCY: usize = 5; - pub struct Push { upstream_caches: Vec, store_paths: Arc>>, @@ -105,7 +103,7 @@ impl Push { } pub async fn run(&'static self) -> Result<()> { - let (tx, rx) = mpsc::channel(UPLOAD_CONCURRENCY); + let (tx, rx) = mpsc::channel(1); let filter = tokio::spawn(self.filter_from_upstream(tx)); let upload = tokio::spawn(self.upload(rx)); @@ -155,19 +153,25 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let mut uploads = Vec::new(); - let permits = Arc::new(Semaphore::new(UPLOAD_CONCURRENCY)); + let permits = Arc::new(Semaphore::new(10)); + let big_permits = Arc::new(Semaphore::new(2)); loop { let permits = permits.clone(); - debug!("upload permits available: {}", permits.available_permits()); - let permit = permits.acquire_owned().await.unwrap(); + let big_permits = big_permits.clone(); if let Some(path_to_upload) = rx.recv().await { - println!("uploading: {}", path_to_upload.absolute_path()); - - let uploader = Uploader::new(&self.signing_key, path_to_upload)?; + let mut permit = permits.acquire_owned().await.unwrap(); uploads.push(tokio::spawn({ + // directory may have many files and end up causing "too many open files" + if PathBuf::from(path_to_upload.absolute_path()).is_dir() { + // drop regular permit and take the big one + permit = big_permits.acquire_owned().await.unwrap(); + } + + println!("uploading: {}", path_to_upload.absolute_path()); + let uploader = Uploader::new(&self.signing_key, path_to_upload)?; let s3 = self.s3.clone(); let store = self.store.clone(); async move {