NOTE(review): line breaks, indentation and the generic arguments on
`Arc`/`Vec`/`mpsc::Sender` were reconstructed from a flattened copy of this
patch; any type parameters on `SigningKey` were not recoverable. Confirm the
context lines against the tree before applying.

diff --git a/src/nixcp.rs b/src/nixcp.rs
index a215923..f5b3dce 100644
--- a/src/nixcp.rs
+++ b/src/nixcp.rs
@@ -19,7 +19,7 @@ use url::Url;
 use crate::{Cli, path_info::PathInfo, uploader::Uploader};
 
 pub struct NixCp {
-    upstream_caches: Arc<Vec<Url>>,
+    upstream_caches: Vec<Url>,
     store_paths: Arc<RwLock<Vec<PathInfo>>>,
     s3_client: s3::Client,
     signing_key: SigningKey,
@@ -28,6 +28,8 @@ pub struct NixCp {
     signature_hit_count: AtomicUsize,
     // paths that we skipped cause we found it on an upstream
     upstream_hit_count: AtomicUsize,
+    // paths that we skipped cause they are already on our cache
+    already_exists_count: AtomicUsize,
 }
 
 impl NixCp {
@@ -58,13 +60,14 @@ impl NixCp {
         let s3_client = s3::Client::new(&s3_config.load().await);
 
         Ok(Self {
-            upstream_caches: Arc::new(upstreams),
+            upstream_caches: upstreams,
             store_paths: Arc::new(RwLock::new(Vec::new())),
             s3_client,
             signing_key,
             bucket: cli.bucket.clone(),
             signature_hit_count: AtomicUsize::new(0),
             upstream_hit_count: AtomicUsize::new(0),
+            already_exists_count: AtomicUsize::new(0),
         })
     }
 
@@ -86,7 +89,6 @@ impl NixCp {
 
     pub async fn run(&'static self) -> Result<()> {
         let (tx, rx) = mpsc::channel(10);
-        let tx = Arc::new(tx);
         let filter = tokio::spawn(self.filter_from_upstream(tx));
         let upload = tokio::spawn(self.upload(rx));
         filter.await?;
@@ -95,7 +97,7 @@ impl NixCp {
     }
 
     /// filter paths that are on upstream and send to `tx`
-    async fn filter_from_upstream(&'static self, tx: Arc<mpsc::Sender<PathInfo>>) {
+    async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
         let permits = Arc::new(Semaphore::new(10));
         let mut handles = Vec::with_capacity(10);
         let store_paths = self.store_paths.read().await.clone();
@@ -109,12 +111,22 @@ impl NixCp {
             handles.push({
                 let permits = permits.clone();
                 let tx = tx.clone();
-                let upstream_caches = self.upstream_caches.clone();
                 tokio::spawn(async move {
                     let _permit = permits.acquire().await.unwrap();
 
-                    if !path.check_upstream_hit(upstream_caches.as_slice()).await {
-                        tx.send(path).await.unwrap();
+                    if !path
+                        .check_upstream_hit(self.upstream_caches.as_slice())
+                        .await
+                    {
+                        if path
+                            .check_if_already_exists(&self.s3_client, self.bucket.clone())
+                            .await
+                        {
+                            trace!("skip {} (already exists)", path.absolute_path());
+                            self.already_exists_count.fetch_add(1, Ordering::Relaxed);
+                        } else {
+                            tx.send(path).await.unwrap();
+                        }
                     } else {
                         trace!("skip {} (upstream hit)", path.absolute_path());
                         self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
@@ -169,6 +181,10 @@ impl NixCp {
                     "skipped because of upstream hit: {}",
                     self.upstream_hit_count.load(Ordering::Relaxed)
                 );
+                println!(
+                    "skipped because already exist: {}",
+                    self.already_exists_count.load(Ordering::Relaxed)
+                );
                 break;
             }
         }
diff --git a/src/path_info.rs b/src/path_info.rs
index 57e0a5b..2bf05ea 100644
--- a/src/path_info.rs
+++ b/src/path_info.rs
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
 
 use anyhow::{Context, Error, Result};
+use aws_sdk_s3 as s3;
 use nix_compat::store_path::StorePath;
 use nix_compat::{nixbase32, nixhash::CAHash};
 use regex::Regex;
@@ -99,8 +100,9 @@ impl PathInfo {
     pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
         for upstream in upstreams {
             let upstream = upstream
-                .join(format!("{}/.narinfo", self.digest()).as_str())
+                .join(format!("{}.narinfo", self.digest()).as_str())
                 .expect("adding .narinfo should make a valid url");
+            debug!("querying {}", upstream);
             let res_status = reqwest::Client::new()
                 .head(upstream.as_str())
                 .send()
@@ -121,6 +123,19 @@ impl PathInfo {
     pub fn digest(&self) -> String {
         nixbase32::encode(self.path.digest())
     }
+
+    /// Returns `true` if a `head_object` request for `{digest}.narinfo` in `bucket` succeeds.
+    ///
+    /// Any error from the request is treated as "not present" and yields `false`.
+    pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
+        s3_client
+            .head_object()
+            .bucket(bucket)
+            .key(format!("{}.narinfo", self.digest()))
+            .send()
+            .await
+            .is_ok()
+    }
 }
 
 /*