improve concurrency control; use nar_size from cpathinfo

This commit is contained in:
cy 2025-04-27 01:23:45 -04:00
parent ca97aebd7a
commit e5336d304d
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
6 changed files with 33 additions and 8 deletions

View file

@ -228,6 +228,8 @@ mod ffi {
/// Mid-level wrapper for the `nix::ValidPathInfo` struct. /// Mid-level wrapper for the `nix::ValidPathInfo` struct.
type CPathInfo; type CPathInfo;
/// Returns the size of the NAR.
fn nar_size(self: Pin<&mut CPathInfo>) -> u64;
/// Returns the references of the store path. /// Returns the references of the store path.
fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>; fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;

View file

@ -57,6 +57,10 @@ void RustSink::eof() {
CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {} CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
uint64_t CPathInfo::nar_size() {
return this->pi->narSize;
}
std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() { std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() {
std::vector<std::string> result; std::vector<std::string> result;
for (auto&& elem : this->pi->sigs) { for (auto&& elem : this->pi->sigs) {

View file

@ -65,6 +65,7 @@ public:
CPathInfo(nix::ref<const nix::ValidPathInfo> pi); CPathInfo(nix::ref<const nix::ValidPathInfo> pi);
std::unique_ptr<std::vector<std::string>> sigs(); std::unique_ptr<std::vector<std::string>> sigs();
std::unique_ptr<std::vector<std::string>> references(); std::unique_ptr<std::vector<std::string>> references();
uint64_t nar_size();
}; };
class CNixStore { class CNixStore {

View file

@ -18,6 +18,7 @@ pub struct PathInfo {
pub path: StorePath<String>, pub path: StorePath<String>,
pub signatures: Vec<String>, pub signatures: Vec<String>,
pub references: Vec<StorePath<String>>, pub references: Vec<StorePath<String>>,
pub nar_size: u64,
} }
impl PathInfo { impl PathInfo {
@ -90,7 +91,7 @@ impl PathInfo {
let upstream = upstream let upstream = upstream
.join(self.narinfo_path().as_ref()) .join(self.narinfo_path().as_ref())
.expect("adding <hash>.narinfo should make a valid url"); .expect("adding <hash>.narinfo should make a valid url");
debug!("querying {}", upstream); trace!("querying {}", upstream);
let res_status = reqwest::Client::new() let res_status = reqwest::Client::new()
.head(upstream.as_str()) .head(upstream.as_str())
.send() .send()

View file

@ -114,18 +114,22 @@ impl Push {
/// filter paths that are on upstream and send to `tx` /// filter paths that are on upstream and send to `tx`
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) { async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
let mut handles = Vec::with_capacity(10); let mut handles = Vec::new();
let store_paths = self.store_paths.read().await.clone(); let store_paths = self.store_paths.read().await.clone();
// limit number of inflight requests
let inflight_permits = Arc::new(Semaphore::new(32));
for path in store_paths.into_iter() { for path in store_paths.into_iter() {
if path.check_upstream_signature(&self.upstream_caches) { if path.check_upstream_signature(&self.upstream_caches) {
debug!("skip {} (signature match)", path.absolute_path()); debug!("skip {} (signature match)", path.absolute_path());
self.signature_hit_count.fetch_add(1, Ordering::Release); self.signature_hit_count.fetch_add(1, Ordering::Relaxed);
continue; continue;
} }
handles.push({ handles.push({
let tx = tx.clone(); let tx = tx.clone();
let inflight_permits = inflight_permits.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _permit = inflight_permits.acquire().await.unwrap();
if !path if !path
.check_upstream_hit(self.upstream_caches.as_slice()) .check_upstream_hit(self.upstream_caches.as_slice())
.await .await
@ -153,24 +157,35 @@ impl Push {
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> { async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let mut uploads = Vec::new(); let mut uploads = Vec::new();
let permits = Arc::new(Semaphore::new(10)); let permits = Arc::new(Semaphore::new(16));
let big_permits = Arc::new(Semaphore::new(2)); let big_permits = Arc::new(Semaphore::new(5));
loop { loop {
let permits = permits.clone(); let permits = permits.clone();
let big_permits = big_permits.clone(); let big_permits = big_permits.clone();
if let Some(path_to_upload) = rx.recv().await { if let Some(path_to_upload) = rx.recv().await {
debug!("upload permits available: {}", permits.available_permits());
let mut permit = permits.acquire_owned().await.unwrap(); let mut permit = permits.acquire_owned().await.unwrap();
uploads.push(tokio::spawn({ uploads.push(tokio::spawn({
// directory may have many files and end up causing "too many open files" // a large directory may have many files and end up causing "too many open files"
if PathBuf::from(path_to_upload.absolute_path()).is_dir() { if PathBuf::from(path_to_upload.absolute_path()).is_dir()
&& path_to_upload.nar_size > 5 * 1024 * 1024
{
debug!(
"upload big permits available: {}",
big_permits.available_permits()
);
// drop regular permit and take the big one // drop regular permit and take the big one
permit = big_permits.acquire_owned().await.unwrap(); permit = big_permits.acquire_owned().await.unwrap();
} }
println!("uploading: {}", path_to_upload.absolute_path()); println!(
"uploading: {} (size: {})",
path_to_upload.absolute_path(),
path_to_upload.nar_size
);
let uploader = Uploader::new(&self.signing_key, path_to_upload)?; let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
let s3 = self.s3.clone(); let s3 = self.s3.clone();
let store = self.store.clone(); let store = self.store.clone();

View file

@ -69,11 +69,13 @@ impl Store {
.map(|x| StorePath::from_bytes(x.as_bytes())) .map(|x| StorePath::from_bytes(x.as_bytes()))
.collect::<Result<_, _>>() .collect::<Result<_, _>>()
.context("get references from pathinfo")?; .context("get references from pathinfo")?;
let nar_size = c_path_info.pin_mut().nar_size();
Ok(PathInfo { Ok(PathInfo {
path, path,
signatures, signatures,
references, references,
nar_size,
}) })
}) })
.await .await