From e5336d304d41d80cb9b07a4d9ffe3d9d480e6cf4 Mon Sep 17 00:00:00 2001 From: cy Date: Sun, 27 Apr 2025 01:23:45 -0400 Subject: [PATCH] improve concurrency control; use nar_size from cpathinfo --- src/bindings/mod.rs | 2 ++ src/bindings/nix.cpp | 4 ++++ src/bindings/nix.hpp | 1 + src/path_info.rs | 3 ++- src/push.rs | 29 ++++++++++++++++++++++------- src/store.rs | 2 ++ 6 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs index 8084dff..450bc98 100644 --- a/src/bindings/mod.rs +++ b/src/bindings/mod.rs @@ -228,6 +228,8 @@ mod ffi { /// Mid-level wrapper for the `nix::ValidPathInfo` struct. type CPathInfo; + /// Returns the size of the NAR. + fn nar_size(self: Pin<&mut CPathInfo>) -> u64; /// Returns the references of the store path. fn references(self: Pin<&mut CPathInfo>) -> UniquePtr>; diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp index 326e878..8bf2cb3 100644 --- a/src/bindings/nix.cpp +++ b/src/bindings/nix.cpp @@ -57,6 +57,10 @@ void RustSink::eof() { CPathInfo::CPathInfo(nix::ref pi) : pi(pi) {} +uint64_t CPathInfo::nar_size() { + return this->pi->narSize; +} + std::unique_ptr> CPathInfo::sigs() { std::vector result; for (auto&& elem : this->pi->sigs) { diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp index 5c79a33..87b3ebf 100644 --- a/src/bindings/nix.hpp +++ b/src/bindings/nix.hpp @@ -65,6 +65,7 @@ public: CPathInfo(nix::ref pi); std::unique_ptr> sigs(); std::unique_ptr> references(); + uint64_t nar_size(); }; class CNixStore { diff --git a/src/path_info.rs b/src/path_info.rs index f88e464..beea6b9 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -18,6 +18,7 @@ pub struct PathInfo { pub path: StorePath, pub signatures: Vec, pub references: Vec>, + pub nar_size: u64, } impl PathInfo { @@ -90,7 +91,7 @@ impl PathInfo { let upstream = upstream .join(self.narinfo_path().as_ref()) .expect("adding .narinfo should make a valid url"); - debug!("querying {}", upstream); + trace!("querying {}", upstream); let res_status = reqwest::Client::new() .head(upstream.as_str()) .send() diff --git a/src/push.rs b/src/push.rs index 9a9e156..8943355 100644 --- a/src/push.rs +++ b/src/push.rs @@ -114,18 +114,22 @@ impl Push { /// filter paths that are on upstream and send to `tx` async fn filter_from_upstream(&'static self, tx: mpsc::Sender) { - let mut handles = Vec::with_capacity(10); + let mut handles = Vec::new(); let store_paths = self.store_paths.read().await.clone(); + // limit number of inflight requests + let inflight_permits = Arc::new(Semaphore::new(32)); for path in store_paths.into_iter() { if path.check_upstream_signature(&self.upstream_caches) { debug!("skip {} (signature match)", path.absolute_path()); - self.signature_hit_count.fetch_add(1, Ordering::Release); + self.signature_hit_count.fetch_add(1, Ordering::Relaxed); continue; } handles.push({ let tx = tx.clone(); + let inflight_permits = inflight_permits.clone(); tokio::spawn(async move { + let _permit = inflight_permits.acquire().await.unwrap(); if !path .check_upstream_hit(self.upstream_caches.as_slice()) .await @@ -153,24 +157,35 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let mut uploads = Vec::new(); - let permits = Arc::new(Semaphore::new(10)); - let big_permits = Arc::new(Semaphore::new(2)); + let permits = Arc::new(Semaphore::new(16)); + let big_permits = Arc::new(Semaphore::new(5)); loop { let permits = permits.clone(); let big_permits = big_permits.clone(); if let Some(path_to_upload) = rx.recv().await { + debug!("upload permits available: {}", permits.available_permits()); let mut permit = permits.acquire_owned().await.unwrap(); uploads.push(tokio::spawn({ - // directory may have many files and end up causing "too many open files" - if PathBuf::from(path_to_upload.absolute_path()).is_dir() { + // a large directory may have many files and end up causing "too many open files" + if PathBuf::from(path_to_upload.absolute_path()).is_dir() + && path_to_upload.nar_size > 5 * 1024 * 1024 + { + debug!( + "upload big permits available: {}", + big_permits.available_permits() + ); // drop regular permit and take the big one permit = big_permits.acquire_owned().await.unwrap(); } - println!("uploading: {}", path_to_upload.absolute_path()); + println!( + "uploading: {} (size: {})", + path_to_upload.absolute_path(), + path_to_upload.nar_size + ); let uploader = Uploader::new(&self.signing_key, path_to_upload)?; let s3 = self.s3.clone(); let store = self.store.clone(); diff --git a/src/store.rs b/src/store.rs index 7589e94..9b925b2 100644 --- a/src/store.rs +++ b/src/store.rs @@ -69,11 +69,13 @@ impl Store { .map(|x| StorePath::from_bytes(x.as_bytes())) .collect::>() .context("get references from pathinfo")?; + let nar_size = c_path_info.pin_mut().nar_size(); Ok(PathInfo { path, signatures, references, + nar_size, }) }) .await