Merge branch '2025-04-12'

This commit is contained in:
cy 2025-04-15 20:07:08 -04:00
commit 55a097d45c
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
2 changed files with 13 additions and 12 deletions

View file

@ -25,8 +25,12 @@ impl PathInfo {
pub async fn from_path(path: &str) -> Result<Self> { pub async fn from_path(path: &str) -> Result<Self> {
debug!("query nix path-info for {path}"); debug!("query nix path-info for {path}");
// use lix cause nix would return a json map instead of an array // use lix cause nix would return a json map instead of an array
// json output is not stable and could break in future
// TODO figure out a better way
let nix_cmd = Command::new("nix") let nix_cmd = Command::new("nix")
.arg("run") .arg("run")
.arg("--experimental-features")
.arg("nix-command flakes")
.arg("github:nixos/nixpkgs/nixos-unstable#lix") .arg("github:nixos/nixpkgs/nixos-unstable#lix")
.arg("--") .arg("--")
.arg("path-info") .arg("path-info")

View file

@ -12,7 +12,7 @@ use aws_config::Region;
use aws_sdk_s3 as s3; use aws_sdk_s3 as s3;
use futures::future::join_all; use futures::future::join_all;
use nix_compat::narinfo::{self, SigningKey}; use nix_compat::narinfo::{self, SigningKey};
use tokio::sync::{RwLock, Semaphore, mpsc}; use tokio::sync::{RwLock, mpsc};
use tracing::{debug, info, trace}; use tracing::{debug, info, trace};
use url::Url; use url::Url;
@ -30,6 +30,8 @@ pub struct Push {
upstream_hit_count: AtomicUsize, upstream_hit_count: AtomicUsize,
// paths that we skipped cause they are already on our cache // paths that we skipped cause they are already on our cache
already_exists_count: AtomicUsize, already_exists_count: AtomicUsize,
// paths that we uploaded
upload_count: AtomicUsize,
} }
impl Push { impl Push {
@ -68,6 +70,7 @@ impl Push {
signature_hit_count: AtomicUsize::new(0), signature_hit_count: AtomicUsize::new(0),
upstream_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0),
already_exists_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0),
upload_count: AtomicUsize::new(0),
}) })
} }
@ -98,7 +101,6 @@ impl Push {
/// filter paths that are on upstream and send to `tx` /// filter paths that are on upstream and send to `tx`
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) { async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
let permits = Arc::new(Semaphore::new(10));
let mut handles = Vec::with_capacity(10); let mut handles = Vec::with_capacity(10);
let store_paths = self.store_paths.read().await.clone(); let store_paths = self.store_paths.read().await.clone();
@ -109,11 +111,8 @@ impl Push {
continue; continue;
} }
handles.push({ handles.push({
let permits = permits.clone();
let tx = tx.clone(); let tx = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap();
if !path if !path
.check_upstream_hit(self.upstream_caches.as_slice()) .check_upstream_hit(self.upstream_caches.as_slice())
.await .await
@ -143,16 +142,13 @@ impl Push {
} }
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> { async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let upload_count = AtomicUsize::new(0);
let permits = Arc::new(Semaphore::new(10));
let mut uploads = Vec::with_capacity(10); let mut uploads = Vec::with_capacity(10);
loop { loop {
if let Some(path_to_upload) = rx.recv().await { if let Some(path_to_upload) = rx.recv().await {
let permits = Arc::clone(&permits);
let absolute_path = path_to_upload.absolute_path(); let absolute_path = path_to_upload.absolute_path();
info!("uploading: {}", absolute_path); println!("uploading: {}", absolute_path);
let uploader = Uploader::new( let uploader = Uploader::new(
&self.signing_key, &self.signing_key,
path_to_upload, path_to_upload,
@ -161,8 +157,9 @@ impl Push {
)?; )?;
uploads.push(tokio::spawn(async move { uploads.push(tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap(); let res = uploader.upload().await;
uploader.upload().await self.upload_count.fetch_add(1, Ordering::Relaxed);
res
})); }));
} else { } else {
join_all(uploads) join_all(uploads)
@ -171,7 +168,7 @@ impl Push {
.flatten() .flatten()
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
println!("uploaded: {}", upload_count.load(Ordering::Relaxed)); println!("uploaded: {}", self.upload_count.load(Ordering::Relaxed));
println!( println!(
"skipped because of signature match: {}", "skipped because of signature match: {}",
self.signature_hit_count.load(Ordering::Relaxed) self.signature_hit_count.load(Ordering::Relaxed)