fix chunking bug

Author: cy
Date: 2025-04-13 23:52:31 -04:00
Parent: b1134d5d6e
Commit: 57a7ab944b
Signed by: cy (SSH key fingerprint SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts)
3 changed files with 17 additions and 18 deletions

src/main.rs

@@ -1,6 +1,5 @@
 #![feature(let_chains)]
 #![feature(extend_one)]
-#![feature(array_chunks)]

 use anyhow::{Context, Result};
 use clap::{Parser, Subcommand};
@@ -38,7 +37,7 @@ struct Cli {
     region: Option<String>,

     /// If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default
-    /// e.g. s3.example.com
+    /// e.g. https://s3.example.com
     #[arg(long)]
     endpoint: Option<String>,
@@ -62,6 +61,7 @@ async fn main() -> Result<()> {
     let filter = EnvFilter::from_default_env();
     let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
     tracing::subscriber::set_global_default(subscriber)?;
+
     let cli = Cli::parse();
     let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?));

(NixCp impl module; filename not preserved)

@@ -2,7 +2,7 @@ use std::{
     fs,
     iter::once,
     sync::{
-        Arc, Mutex,
+        Arc,
         atomic::{AtomicUsize, Ordering},
     },
 };
@@ -13,7 +13,7 @@ use aws_sdk_s3 as s3;
 use futures::future::join_all;
 use nix_compat::narinfo::{self, SigningKey};
 use tokio::sync::{RwLock, Semaphore, mpsc};
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, trace};
 use url::Url;

 use crate::{Cli, path_info::PathInfo, uploader::Uploader};
@@ -132,7 +132,6 @@ impl NixCp {
     async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
         let upload_count = AtomicUsize::new(0);
-        let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
         let permits = Arc::new(Semaphore::new(10));
         let mut uploads = Vec::with_capacity(10);
@@ -155,7 +154,12 @@ impl NixCp {
                 });
                 uploads.push(fut);
             } else {
-                join_all(uploads).await;
+                join_all(uploads)
+                    .await
+                    .into_iter()
+                    .flatten()
+                    .collect::<Result<Vec<_>>>()?;
                 println!("uploaded: {}", upload_count.load(Ordering::Relaxed));
                 println!(
                     "skipped because of signature match: {}",
@@ -165,14 +169,6 @@ impl NixCp {
                     "skipped because of upstream hit: {}",
                     self.upstream_hit_count.load(Ordering::Relaxed)
                 );
-                let failures = failures.lock().unwrap();
-                if !failures.is_empty() {
-                    warn!("failed to upload these paths: ");
-                    for failure in failures.iter() {
-                        warn!("{}", failure);
-                    }
-                }
                 break;
             }
         }

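Note on the error-handling change above: the old `join_all(uploads).await;` discarded every task's result, which is why the hand-rolled `failures` list and `warn!` loop existed at all. The new chain propagates the first upload error through `?` instead. A minimal sketch of how the nested results collapse, with dummy tasks standing in for the real uploads (assumes tokio, futures, and anyhow, as in this crate):

use anyhow::{Result, anyhow};
use futures::future::join_all;

#[tokio::main]
async fn main() -> Result<()> {
    // Dummy tasks standing in for the spawned uploads; like the real ones,
    // each JoinHandle wraps an anyhow::Result.
    let uploads = vec![
        tokio::spawn(async { Ok::<(), anyhow::Error>(()) }),
        tokio::spawn(async { Err(anyhow!("upload failed")) }),
    ];

    // join_all(...).await yields Vec<Result<Result<(), Error>, JoinError>>.
    // flatten() strips the JoinHandle layer (silently skipping any task that
    // panicked); collect::<Result<...>>() stops at the first upload error,
    // which ? then propagates, so this main exits nonzero.
    join_all(uploads)
        .await
        .into_iter()
        .flatten()
        .collect::<Result<Vec<_>>>()?;

    Ok(())
}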
src/uploader.rs

@@ -46,13 +46,14 @@ impl<'a> Uploader<'a> {
         let nar = self.compress_nar(&nar).await;

         // update fields that we know after compression
-        nar_info.file_size = Some(nar.len() as u64);
         let mut hasher = Sha256::new();
         hasher.update(&nar);
         let hash: [u8; 32] = hasher.finalize().into();
         let nar_url = self.nar_url(&hash);
         nar_info.file_hash = Some(hash);
-        debug!("uploading to bucket with key: {nar_url}");
+        nar_info.file_size = Some(nar.len() as u64);
+        nar_info.url = nar_url.as_str();
+        debug!("uploading nar with key: {nar_url}");

         if nar.len() < MULTIPART_CUTOFF {
             let put_object = self
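Why the hunk above is more than a reshuffle: the old code computed `nar_url` but, at least in this hunk, never wrote it back to `nar_info.url`, so the uploaded `.narinfo` could point at the wrong nar key. Since `nar_url` is derived from the hash of the compressed nar, it can only be assigned after compression, right before the narinfo is serialized. A self-contained sketch of the fixed ordering, using a hypothetical stand-in for `nix_compat::narinfo::NarInfo` and an illustrative key scheme (the real field set and serializer live in nix_compat):

use sha2::{Digest, Sha256};

// Hypothetical stand-in for nix_compat::narinfo::NarInfo, trimmed to the
// three fields this hunk touches.
struct NarInfo<'a> {
    url: &'a str,
    file_hash: Option<[u8; 32]>,
    file_size: Option<u64>,
}

impl std::fmt::Display for NarInfo<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The real serializer emits more fields; URL is the one that was stale.
        writeln!(f, "URL: {}", self.url)?;
        if let Some(hash) = self.file_hash {
            writeln!(f, "FileHash: sha256:{}", hex::encode(hash))?;
        }
        if let Some(size) = self.file_size {
            writeln!(f, "FileSize: {size}")?;
        }
        Ok(())
    }
}

fn main() {
    let nar = vec![0u8; 1024]; // pretend: the nar *after* compression
    let hash: [u8; 32] = Sha256::digest(&nar).into();
    let nar_url = format!("nar/{}.nar.zst", hex::encode(hash)); // illustrative key scheme

    // Hash, size, and url all describe the compressed nar, and url is set
    // before the narinfo text is built and uploaded.
    let nar_info = NarInfo {
        url: &nar_url,
        file_hash: Some(hash),
        file_size: Some(nar.len() as u64),
    };
    assert!(nar_info.to_string().contains("nar/"));
    println!("{nar_info}");
}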
@@ -75,7 +76,7 @@ impl<'a> Uploader<'a> {
             let upload_id = multipart.upload_id().unwrap();

             let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
-            let chunks = nar.array_chunks::<MULTIPART_CUTOFF>();
+            let chunks = nar.chunks(MULTIPART_CUTOFF);
             for (i, chunk) in chunks.enumerate() {
                 parts.push(tokio::task::spawn(
                     self.s3_client
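This one-line change is the chunking bug in the commit title. The nightly `array_chunks::<N>()` iterator yields only complete N-byte arrays and leaves any shorter tail in a separate `remainder()`, so the final partial part of a multipart upload was silently dropped and the stored nar came out truncated. Stable `chunks(N)` yields the short last chunk too, which is also what lets main.rs drop the `#![feature(array_chunks)]` gate. A tiny demonstration with a stand-in cutoff:

fn main() {
    const CUTOFF: usize = 4; // stand-in for MULTIPART_CUTOFF
    let nar = vec![1u8; 10]; // two full 4-byte parts plus a 2-byte tail

    // Before: nar.array_chunks::<CUTOFF>() (nightly) yields only the two
    // complete [u8; 4] chunks; the 2-byte tail sits in .remainder(), which
    // the upload loop never touched.
    //
    // After: chunks(CUTOFF) yields the tail as a final short chunk.
    let parts: Vec<&[u8]> = nar.chunks(CUTOFF).collect();
    assert_eq!(parts.len(), 3);
    assert_eq!(parts[2], &[1u8, 1][..]); // the bytes that used to be dropped
}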
@@ -122,10 +123,12 @@ impl<'a> Uploader<'a> {
             debug!("complete multipart upload: {:#?}", complete_mp_upload);
         }

+        let narinfo_url = format!("{}.narinfo", self.path.digest());
+        debug!("uploading narinfo with key {narinfo_url}");
         self.s3_client
             .put_object()
             .bucket(&self.bucket)
-            .key(format!("{}.narinfo", self.path.digest()))
+            .key(narinfo_url)
             .body(nar_info.to_string().as_bytes().to_vec().into())
             .send()
             .await?;