From 0fedae933465d2b1935af2ff0498118393203ed1 Mon Sep 17 00:00:00 2001
From: cy
Date: Sat, 26 Apr 2025 18:21:49 -0400
Subject: [PATCH] try to fix fd issues

---
 src/make_nar.rs | 36 +++++++++++++++++++++++++++---------
 src/push.rs     |  3 ++-
 src/uploader.rs | 18 +++++++++++-------
 3 files changed, 40 insertions(+), 17 deletions(-)

diff --git a/src/make_nar.rs b/src/make_nar.rs
index e1ca57c..19a112b 100644
--- a/src/make_nar.rs
+++ b/src/make_nar.rs
@@ -6,19 +6,20 @@ use nix_compat::{
 };
 use sha2::{Digest, Sha256};
 use std::mem::take;
-use tempfile::NamedTempFile;
+use tempfile::{NamedTempFile, TempPath};
 use tokio::{
     fs::File,
     io::{AsyncRead, BufReader},
     process::Command,
 };
 use tokio_util::io::InspectReader;
+use tracing::debug;
 
 use crate::path_info::PathInfo;
 
 pub struct MakeNar<'a> {
     path_info: &'a PathInfo,
-    nar_file: NamedTempFile,
+    nar_file: TempPath,
     nar_hasher: Sha256,
     /// hash of compressed nar file
     file_hasher: Sha256,
@@ -30,7 +31,9 @@ impl<'a> MakeNar<'a> {
     pub fn new(path_info: &'a PathInfo) -> Result<Self> {
         Ok(Self {
             path_info,
-            nar_file: NamedTempFile::new().context("crated tempfile for nar")?,
+            nar_file: NamedTempFile::new()
+                .context("create tempfile for nar")?
+                .into_temp_path(),
             nar_hasher: Sha256::new(),
             file_hasher: Sha256::new(),
             nar_size: 0,
@@ -39,12 +42,19 @@ impl<'a> MakeNar<'a> {
     }
 
     pub async fn make(&self) -> Result<()> {
+        let path = self.path_info.absolute_path();
+        let out = std::fs::File::options()
+            .write(true)
+            .truncate(true)
+            .open(&self.nar_file)?;
+
+        debug!("dumping nar for: {}", path);
         Ok(Command::new("nix")
             .arg("nar")
-            .arg("dump-path")
+            .arg("pack")
             .arg(self.path_info.absolute_path())
             .kill_on_drop(true)
-            .stdout(self.nar_file.reopen()?)
+            .stdout(out)
             .spawn()?
             .wait()
             .await?
@@ -54,22 +64,30 @@
     /// Returns a compressed nar reader which can be uploaded. File hash will be available when
     /// everything is read
     pub async fn compress_and_hash(&mut self) -> Result<impl AsyncRead + '_> {
-        let nar_file = File::from_std(self.nar_file.reopen()?);
+        let nar_file = File::open(&self.nar_file).await?;
         // reader that hashes as nar is read
-        let nar_reader = InspectReader::new(nar_file, |x| self.nar_hasher.update(x));
+        let nar_reader = InspectReader::new(nar_file, |x| {
+            self.nar_size += x.len() as u64;
+            self.nar_hasher.update(x);
+        });
 
         let encoder = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
         // reader that updates file_hash as the compressed nar is read
-        Ok(InspectReader::new(encoder, |x| self.file_hasher.update(x)))
+        Ok(InspectReader::new(encoder, |x| {
+            self.file_size += x.len() as u64;
+            self.file_hasher.update(x);
+        }))
     }
 
     /// Returns *unsigned* narinfo. `url` must be updated before uploading
     pub fn get_narinfo(&mut self) -> Result<NarInfo> {
         let file_hash = take(&mut self.file_hasher).finalize().into();
+        let nar_hash = take(&mut self.nar_hasher).finalize().into();
+
         Ok(NarInfo {
             flags: narinfo::Flags::empty(),
             store_path: self.path_info.path.as_ref(),
-            nar_hash: take(&mut self.nar_hasher).finalize().into(),
+            nar_hash,
             nar_size: self.nar_size,
             references: self
                 .path_info
diff --git a/src/push.rs b/src/push.rs
index c5a4229..93a28da 100644
--- a/src/push.rs
+++ b/src/push.rs
@@ -18,7 +18,7 @@ use url::Url;
 
 use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
 
-const UPLOAD_CONCURRENCY: usize = 32;
+const UPLOAD_CONCURRENCY: usize = 5;
 
 pub struct Push {
     upstream_caches: Vec<Url>,
@@ -159,6 +159,7 @@ impl Push {
 
         loop {
             let permits = permits.clone();
+            debug!("upload permits available: {}", permits.available_permits());
             let permit = permits.acquire_owned().await.unwrap();
 
             if let Some(path_to_upload) = rx.recv().await {
diff --git a/src/uploader.rs b/src/uploader.rs
index 0832a58..a2c2766 100644
--- a/src/uploader.rs
+++ b/src/uploader.rs
@@ -32,21 +32,22 @@ impl<'a> Uploader<'a> {
         // temp location for now
         let temp_path = Path::parse(Ulid::new().to_string())?;
         let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone());
+        debug!("uploading to temp path: {}", temp_path);
 
         // compress and upload nar
        let mut file_reader = nar.compress_and_hash().await?;
-        let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
-        debug!("uploading to temp path: {}", temp_path);
-        while let n = file_reader.read_buf(&mut buf).await?
-            && n != 0
-        {
-            s3_writer.write_all_buf(&mut buf).await?;
+        loop {
+            let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
+            let n = file_reader.read_buf(&mut buf).await?;
+            s3_writer.put(buf.freeze()).await?;
+            if n == 0 {
+                break;
+            }
         }
         drop(file_reader);
 
         let mut nar_info = nar.get_narinfo()?;
         nar_info.add_signature(self.signing_key);
-        trace!("narinfo: {:#}", nar_info);
 
         // now that we can calculate the file_hash move the nar to where it should be
         let real_path = nar_url(
@@ -55,6 +56,8 @@
                 .expect("file hash must be known at this point"),
         );
         debug!("moving {} to {}", temp_path, real_path);
+        // the temp object must be done uploading
+        s3_writer.shutdown().await?;
         // this is implemented as a copy-and-delete
         s3.rename(&temp_path, &real_path).await?;
         // set nar url in narinfo
@@ -63,6 +66,7 @@
         // upload narinfo
         let narinfo_path = self.path.narinfo_path();
         debug!("uploading narinfo: {}", narinfo_path);
+        trace!("narinfo: {:#}", nar_info);
         s3.put(&narinfo_path, nar_info.to_string().into()).await?;
 
         Ok(())
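
Addendum (illustration, not part of the patch): besides lowering UPLOAD_CONCURRENCY and shutting the S3 writer down before the rename, the central fd change above is in make_nar.rs, which stops holding a NamedTempFile (whose reopen() handed out an extra descriptor each time) and instead keeps only a TempPath, opening the file on demand for writing and again for reading. A minimal, self-contained sketch of that pattern follows; it assumes only the tempfile crate already used by the diff, and the file contents and names here are placeholders, not code from the project.

// Sketch only: create a temp file, keep just its path, and open it
// separately for the write phase and the read phase so no descriptor
// stays open in between (the file is still removed when `path` drops).
use std::io::{Read, Write};

use tempfile::NamedTempFile;

fn main() -> std::io::Result<()> {
    // The open handle (and its fd) is dropped right here; only the path survives.
    let path = NamedTempFile::new()?.into_temp_path();

    // Writer fd lives only for this block (mirrors MakeNar::make writing the nar).
    {
        let mut out = std::fs::File::options()
            .write(true)
            .truncate(true)
            .open(&path)?;
        out.write_all(b"nar bytes would go here")?;
    }

    // Reader fd lives only while the contents are consumed
    // (mirrors MakeNar::compress_and_hash reopening the nar).
    let mut contents = Vec::new();
    std::fs::File::open(&path)?.read_to_end(&mut contents)?;
    assert_eq!(&contents[..], b"nar bytes would go here");
    Ok(())
}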