limit uploads with semaphore
This commit is contained in:
parent
84bbe5dcb4
commit
6806b96892
2 changed files with 12 additions and 5 deletions
16
src/push.rs
16
src/push.rs
|
@ -13,7 +13,7 @@ use aws_config::Region;
|
|||
use aws_sdk_s3 as s3;
|
||||
use futures::future::join_all;
|
||||
use nix_compat::narinfo::{self, SigningKey};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
||||
use tracing::{debug, trace};
|
||||
use url::Url;
|
||||
|
||||
|
@ -161,6 +161,7 @@ impl Push {
|
|||
|
||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||
let mut uploads = Vec::with_capacity(10);
|
||||
let permits = Arc::new(Semaphore::new(10));
|
||||
|
||||
loop {
|
||||
if let Some(path_to_upload) = rx.recv().await {
|
||||
|
@ -174,10 +175,15 @@ impl Push {
|
|||
self.bucket.clone(),
|
||||
)?;
|
||||
|
||||
uploads.push(tokio::spawn(async move {
|
||||
let res = uploader.upload().await;
|
||||
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
||||
res
|
||||
uploads.push(tokio::spawn({
|
||||
let permits = permits.clone();
|
||||
|
||||
async move {
|
||||
let _permit = permits.acquire().await;
|
||||
let res = uploader.upload().await;
|
||||
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
||||
res
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
join_all(uploads)
|
||||
|
|
|
@ -132,6 +132,7 @@ impl<'a> Uploader<'a> {
|
|||
.body(nar_info.to_string().as_bytes().to_vec().into())
|
||||
.send()
|
||||
.await?;
|
||||
debug!("done uploading narinfo");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue