don't limit stuff with semaphore and use println for uploading output

This commit is contained in:
cy 2025-04-15 18:53:45 -04:00
parent d4cac65247
commit 03530a2afa
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts

View file

@ -12,7 +12,7 @@ use aws_config::Region;
use aws_sdk_s3 as s3;
use futures::future::join_all;
use nix_compat::narinfo::{self, SigningKey};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tokio::sync::{RwLock, mpsc};
use tracing::{debug, info, trace};
use url::Url;
@ -98,7 +98,6 @@ impl Push {
/// filter paths that are on upstream and send to `tx`
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
let permits = Arc::new(Semaphore::new(10));
let mut handles = Vec::with_capacity(10);
let store_paths = self.store_paths.read().await.clone();
@ -109,11 +108,8 @@ impl Push {
continue;
}
handles.push({
let permits = permits.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap();
if !path
.check_upstream_hit(self.upstream_caches.as_slice())
.await
@ -144,15 +140,13 @@ impl Push {
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let upload_count = AtomicUsize::new(0);
let permits = Arc::new(Semaphore::new(10));
let mut uploads = Vec::with_capacity(10);
loop {
if let Some(path_to_upload) = rx.recv().await {
let permits = Arc::clone(&permits);
let absolute_path = path_to_upload.absolute_path();
info!("uploading: {}", absolute_path);
println!("uploading: {}", absolute_path);
let uploader = Uploader::new(
&self.signing_key,
path_to_upload,
@ -160,10 +154,7 @@ impl Push {
self.bucket.clone(),
)?;
uploads.push(tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap();
uploader.upload().await
}));
uploads.push(tokio::spawn(async move { uploader.upload().await }));
} else {
join_all(uploads)
.await