simplify here since the problem was somewhere else
This commit is contained in:
parent
e5336d304d
commit
b49be95d09
3 changed files with 21 additions and 18 deletions
16
Cargo.lock
generated
16
Cargo.lock
generated
|
@ -968,6 +968,15 @@ version = "1.0.3"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "humansize"
|
||||||
|
version = "2.1.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7"
|
||||||
|
dependencies = [
|
||||||
|
"libm",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "humantime"
|
name = "humantime"
|
||||||
version = "2.2.0"
|
version = "2.2.0"
|
||||||
|
@ -1304,6 +1313,12 @@ version = "0.2.172"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
|
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "libm"
|
||||||
|
version = "0.2.13"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libmimalloc-sys"
|
name = "libmimalloc-sys"
|
||||||
version = "0.1.42"
|
version = "0.1.42"
|
||||||
|
@ -1491,6 +1506,7 @@ dependencies = [
|
||||||
"cxx-build",
|
"cxx-build",
|
||||||
"ed25519-dalek",
|
"ed25519-dalek",
|
||||||
"futures",
|
"futures",
|
||||||
|
"humansize",
|
||||||
"nix-compat",
|
"nix-compat",
|
||||||
"object_store",
|
"object_store",
|
||||||
"pkg-config",
|
"pkg-config",
|
||||||
|
|
|
@ -30,6 +30,7 @@ bytes = "1.10.1"
|
||||||
object_store = { version = "0.12.0", features = ["aws"] }
|
object_store = { version = "0.12.0", features = ["aws"] }
|
||||||
ulid = "1.2.1"
|
ulid = "1.2.1"
|
||||||
tracing-subscriber = "0.3.19"
|
tracing-subscriber = "0.3.19"
|
||||||
|
humansize = "2.1.3"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
cxx-build = "1.0"
|
cxx-build = "1.0"
|
||||||
|
|
22
src/push.rs
22
src/push.rs
|
@ -10,6 +10,7 @@ use std::{
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
|
use humansize::{DECIMAL, format_size};
|
||||||
use nix_compat::narinfo::{self, SigningKey};
|
use nix_compat::narinfo::{self, SigningKey};
|
||||||
use object_store::aws::{AmazonS3, AmazonS3Builder};
|
use object_store::aws::{AmazonS3, AmazonS3Builder};
|
||||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
use tokio::sync::{RwLock, Semaphore, mpsc};
|
||||||
|
@ -157,34 +158,19 @@ impl Push {
|
||||||
|
|
||||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||||
let mut uploads = Vec::new();
|
let mut uploads = Vec::new();
|
||||||
let permits = Arc::new(Semaphore::new(16));
|
let permits = Arc::new(Semaphore::new(32));
|
||||||
let big_permits = Arc::new(Semaphore::new(5));
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let permits = permits.clone();
|
let permits = permits.clone();
|
||||||
let big_permits = big_permits.clone();
|
|
||||||
|
|
||||||
if let Some(path_to_upload) = rx.recv().await {
|
if let Some(path_to_upload) = rx.recv().await {
|
||||||
debug!("upload permits available: {}", permits.available_permits());
|
let permit = permits.acquire_owned().await.unwrap();
|
||||||
let mut permit = permits.acquire_owned().await.unwrap();
|
|
||||||
|
|
||||||
uploads.push(tokio::spawn({
|
uploads.push(tokio::spawn({
|
||||||
// a large directory may have many files and end up causing "too many open files"
|
|
||||||
if PathBuf::from(path_to_upload.absolute_path()).is_dir()
|
|
||||||
&& path_to_upload.nar_size > 5 * 1024 * 1024
|
|
||||||
{
|
|
||||||
debug!(
|
|
||||||
"upload big permits available: {}",
|
|
||||||
big_permits.available_permits()
|
|
||||||
);
|
|
||||||
// drop regular permit and take the big one
|
|
||||||
permit = big_permits.acquire_owned().await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
"uploading: {} (size: {})",
|
"uploading: {} (size: {})",
|
||||||
path_to_upload.absolute_path(),
|
path_to_upload.absolute_path(),
|
||||||
path_to_upload.nar_size
|
format_size(path_to_upload.nar_size, DECIMAL)
|
||||||
);
|
);
|
||||||
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
|
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
|
||||||
let s3 = self.s3.clone();
|
let s3 = self.s3.clone();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue