diff --git a/src/main.rs b/src/main.rs index 45348b3..3a7cc78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,5 @@ #![feature(let_chains)] #![feature(extend_one)] -#![feature(array_chunks)] use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; @@ -38,7 +37,7 @@ struct Cli { region: Option, /// If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default - /// e.g. s3.example.com + /// e.g. https://s3.example.com #[arg(long)] endpoint: Option, @@ -62,6 +61,7 @@ async fn main() -> Result<()> { let filter = EnvFilter::from_default_env(); let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); tracing::subscriber::set_global_default(subscriber)?; + let cli = Cli::parse(); let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?)); diff --git a/src/nixcp.rs b/src/nixcp.rs index c0298bd..a215923 100644 --- a/src/nixcp.rs +++ b/src/nixcp.rs @@ -2,7 +2,7 @@ use std::{ fs, iter::once, sync::{ - Arc, Mutex, + Arc, atomic::{AtomicUsize, Ordering}, }, }; @@ -13,7 +13,7 @@ use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; use tokio::sync::{RwLock, Semaphore, mpsc}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, info, trace}; use url::Url; use crate::{Cli, path_info::PathInfo, uploader::Uploader}; @@ -132,7 +132,6 @@ impl NixCp { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let upload_count = AtomicUsize::new(0); - let failures: Arc>> = Arc::new(Mutex::new(Vec::new())); let permits = Arc::new(Semaphore::new(10)); let mut uploads = Vec::with_capacity(10); @@ -155,7 +154,12 @@ impl NixCp { }); uploads.push(fut); } else { - join_all(uploads).await; + join_all(uploads) + .await + .into_iter() + .flatten() + .collect::>>()?; + println!("uploaded: {}", upload_count.load(Ordering::Relaxed)); println!( "skipped because of signature match: {}", @@ -165,14 +169,6 @@ impl NixCp { "skipped because of upstream hit: {}", self.upstream_hit_count.load(Ordering::Relaxed) ); - - let failures = failures.lock().unwrap(); - if !failures.is_empty() { - warn!("failed to upload these paths: "); - for failure in failures.iter() { - warn!("{}", failure); - } - } break; } } diff --git a/src/uploader.rs b/src/uploader.rs index 558e411..e52e4ae 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -46,13 +46,14 @@ impl<'a> Uploader<'a> { let nar = self.compress_nar(&nar).await; // update fields that we know after compression - nar_info.file_size = Some(nar.len() as u64); let mut hasher = Sha256::new(); hasher.update(&nar); let hash: [u8; 32] = hasher.finalize().into(); let nar_url = self.nar_url(&hash); nar_info.file_hash = Some(hash); - debug!("uploading to bucket with key: {nar_url}"); + nar_info.file_size = Some(nar.len() as u64); + nar_info.url = nar_url.as_str(); + debug!("uploading nar with key: {nar_url}"); if nar.len() < MULTIPART_CUTOFF { let put_object = self @@ -75,7 +76,7 @@ impl<'a> Uploader<'a> { let upload_id = multipart.upload_id().unwrap(); let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF); - let chunks = nar.array_chunks::(); + let chunks = nar.chunks(MULTIPART_CUTOFF); for (i, chunk) in chunks.enumerate() { parts.push(tokio::task::spawn( self.s3_client @@ -122,10 +123,12 @@ impl<'a> Uploader<'a> { debug!("complete multipart upload: {:#?}", complete_mp_upload); } + let narinfo_url = format!("{}.narinfo", self.path.digest()); + debug!("uploading narinfo with key {narinfo_url}"); self.s3_client .put_object() .bucket(&self.bucket) - .key(format!("{}.narinfo", self.path.digest())) + .key(narinfo_url) .body(nar_info.to_string().as_bytes().to_vec().into()) .send() .await?;