limit directories even more
This commit is contained in:
parent
85fefe9e77
commit
ca97aebd7a
1 changed files with 13 additions and 9 deletions
22
src/push.rs
22
src/push.rs
|
@ -18,8 +18,6 @@ use url::Url;
|
||||||
|
|
||||||
use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
|
use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
|
||||||
|
|
||||||
const UPLOAD_CONCURRENCY: usize = 5;
|
|
||||||
|
|
||||||
pub struct Push {
|
pub struct Push {
|
||||||
upstream_caches: Vec<Url>,
|
upstream_caches: Vec<Url>,
|
||||||
store_paths: Arc<RwLock<Vec<PathInfo>>>,
|
store_paths: Arc<RwLock<Vec<PathInfo>>>,
|
||||||
|
@ -105,7 +103,7 @@ impl Push {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(&'static self) -> Result<()> {
|
pub async fn run(&'static self) -> Result<()> {
|
||||||
let (tx, rx) = mpsc::channel(UPLOAD_CONCURRENCY);
|
let (tx, rx) = mpsc::channel(1);
|
||||||
let filter = tokio::spawn(self.filter_from_upstream(tx));
|
let filter = tokio::spawn(self.filter_from_upstream(tx));
|
||||||
let upload = tokio::spawn(self.upload(rx));
|
let upload = tokio::spawn(self.upload(rx));
|
||||||
|
|
||||||
|
@ -155,19 +153,25 @@ impl Push {
|
||||||
|
|
||||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||||
let mut uploads = Vec::new();
|
let mut uploads = Vec::new();
|
||||||
let permits = Arc::new(Semaphore::new(UPLOAD_CONCURRENCY));
|
let permits = Arc::new(Semaphore::new(10));
|
||||||
|
let big_permits = Arc::new(Semaphore::new(2));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let permits = permits.clone();
|
let permits = permits.clone();
|
||||||
debug!("upload permits available: {}", permits.available_permits());
|
let big_permits = big_permits.clone();
|
||||||
let permit = permits.acquire_owned().await.unwrap();
|
|
||||||
|
|
||||||
if let Some(path_to_upload) = rx.recv().await {
|
if let Some(path_to_upload) = rx.recv().await {
|
||||||
println!("uploading: {}", path_to_upload.absolute_path());
|
let mut permit = permits.acquire_owned().await.unwrap();
|
||||||
|
|
||||||
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
|
|
||||||
|
|
||||||
uploads.push(tokio::spawn({
|
uploads.push(tokio::spawn({
|
||||||
|
// directory may have many files and end up causing "too many open files"
|
||||||
|
if PathBuf::from(path_to_upload.absolute_path()).is_dir() {
|
||||||
|
// drop regular permit and take the big one
|
||||||
|
permit = big_permits.acquire_owned().await.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("uploading: {}", path_to_upload.absolute_path());
|
||||||
|
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
|
||||||
let s3 = self.s3.clone();
|
let s3 = self.s3.clone();
|
||||||
let store = self.store.clone();
|
let store = self.store.clone();
|
||||||
async move {
|
async move {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue