use tracing for logs

This commit is contained in:
cy 2025-04-13 23:10:50 -04:00
parent 202b222b83
commit b1134d5d6e
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
6 changed files with 163 additions and 81 deletions

View file

@ -11,9 +11,9 @@ use anyhow::{Context, Result};
use aws_config::Region;
use aws_sdk_s3 as s3;
use futures::future::join_all;
use log::{debug, info, warn};
use nix_compat::narinfo::{self, SigningKey};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::{debug, info, trace, warn};
use url::Url;
use crate::{Cli, path_info::PathInfo, uploader::Uploader};
@ -24,6 +24,10 @@ pub struct NixCp {
s3_client: s3::Client,
signing_key: SigningKey<ed25519_dalek::SigningKey>,
bucket: String,
// paths that we skipped cause of a signature match
signature_hit_count: AtomicUsize,
// paths that we skipped cause we found it on an upstream
upstream_hit_count: AtomicUsize,
}
impl NixCp {
@ -59,6 +63,8 @@ impl NixCp {
s3_client,
signing_key,
bucket: cli.bucket.clone(),
signature_hit_count: AtomicUsize::new(0),
upstream_hit_count: AtomicUsize::new(0),
})
}
@ -81,18 +87,23 @@ impl NixCp {
pub async fn run(&'static self) -> Result<()> {
let (tx, rx) = mpsc::channel(10);
let tx = Arc::new(tx);
tokio::spawn(self.filter_from_upstream(tx));
self.upload(rx).await
let filter = tokio::spawn(self.filter_from_upstream(tx));
let upload = tokio::spawn(self.upload(rx));
filter.await?;
upload.await??;
Ok(())
}
/// filter paths that are on upstream and send to `tx`
async fn filter_from_upstream(&self, tx: Arc<mpsc::Sender<PathInfo>>) {
async fn filter_from_upstream(&'static self, tx: Arc<mpsc::Sender<PathInfo>>) {
let permits = Arc::new(Semaphore::new(10));
let mut handles = Vec::with_capacity(10);
let store_paths = self.store_paths.read().await.clone();
for path in store_paths.into_iter() {
if path.check_upstream_signature(&self.upstream_caches) {
trace!("skip {} (signature match)", path.absolute_path());
self.signature_hit_count.fetch_add(1, Ordering::Release);
continue;
}
handles.push({
@ -104,14 +115,19 @@ impl NixCp {
if !path.check_upstream_hit(upstream_caches.as_slice()).await {
tx.send(path).await.unwrap();
} else {
trace!("skip {} (upstream hit)", path.absolute_path());
self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
}
})
});
}
for handle in handles {
handle.await.unwrap();
}
join_all(handles)
.await
.into_iter()
.collect::<std::result::Result<(), _>>()
.unwrap();
}
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
@ -140,7 +156,15 @@ impl NixCp {
uploads.push(fut);
} else {
join_all(uploads).await;
println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
println!("uploaded: {}", upload_count.load(Ordering::Relaxed));
println!(
"skipped because of signature match: {}",
self.signature_hit_count.load(Ordering::Relaxed)
);
println!(
"skipped because of upstream hit: {}",
self.upstream_hit_count.load(Ordering::Relaxed)
);
let failures = failures.lock().unwrap();
if !failures.is_empty() {