Compare commits

..

No commits in common. "39792cdd403ab785830d65b767af0f4d3417e5f1" and "2f7ae745f5d88a6071952f889d3fb6d6656239d7" have entirely different histories.

3 changed files with 32 additions and 33 deletions

40
Cargo.lock generated
View file

@ -84,9 +84,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.98"
version = "1.0.97"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
[[package]]
name = "async-compression"
@ -414,7 +414,7 @@ dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
"aws-smithy-types",
"h2 0.4.9",
"h2 0.4.8",
"http 0.2.12",
"http 1.3.1",
"http-body 0.4.6",
@ -425,7 +425,7 @@ dependencies = [
"hyper-util",
"pin-project-lite",
"rustls 0.21.12",
"rustls 0.23.26",
"rustls 0.23.25",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
@ -1298,9 +1298,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.4.9"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633"
checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
dependencies = [
"atomic-waker",
"bytes",
@ -1457,7 +1457,7 @@ dependencies = [
"bytes",
"futures-channel",
"futures-util",
"h2 0.4.9",
"h2 0.4.8",
"http 1.3.1",
"http-body 1.0.1",
"httparse",
@ -1494,7 +1494,7 @@ dependencies = [
"http 1.3.1",
"hyper 1.6.0",
"hyper-util",
"rustls 0.23.26",
"rustls 0.23.25",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
@ -1748,9 +1748,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.172"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "libloading"
@ -1780,9 +1780,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]]
name = "linux-raw-sys"
version = "0.9.4"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413"
[[package]]
name = "litemap"
@ -2310,7 +2310,7 @@ dependencies = [
"encoding_rs",
"futures-core",
"futures-util",
"h2 0.4.9",
"h2 0.4.8",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
@ -2404,14 +2404,14 @@ dependencies = [
[[package]]
name = "rustix"
version = "1.0.5"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf"
checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96"
dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys 0.9.4",
"linux-raw-sys 0.9.3",
"windows-sys 0.59.0",
]
@ -2429,9 +2429,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.26"
version = "0.23.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0"
checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c"
dependencies = [
"aws-lc-rs",
"once_cell",
@ -2837,7 +2837,7 @@ dependencies = [
"fastrand",
"getrandom 0.3.2",
"once_cell",
"rustix 1.0.5",
"rustix 1.0.3",
"windows-sys 0.59.0",
]
@ -2966,7 +2966,7 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls 0.23.26",
"rustls 0.23.25",
"tokio",
]

View file

@ -25,12 +25,8 @@ impl PathInfo {
pub async fn from_path(path: &str) -> Result<Self> {
debug!("query nix path-info for {path}");
// use lix cause nix would return a json map instead of an array
// json output is not stable and could break in future
// TODO figure out a better way
let nix_cmd = Command::new("nix")
.arg("run")
.arg("--experimental-features")
.arg("nix-command flakes")
.arg("github:nixos/nixpkgs/nixos-unstable#lix")
.arg("--")
.arg("path-info")

View file

@ -12,7 +12,7 @@ use aws_config::Region;
use aws_sdk_s3 as s3;
use futures::future::join_all;
use nix_compat::narinfo::{self, SigningKey};
use tokio::sync::{RwLock, mpsc};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::{debug, info, trace};
use url::Url;
@ -30,8 +30,6 @@ pub struct Push {
upstream_hit_count: AtomicUsize,
// paths that we skipped cause they are already on our cache
already_exists_count: AtomicUsize,
// paths that we uploaded
upload_count: AtomicUsize,
}
impl Push {
@ -70,7 +68,6 @@ impl Push {
signature_hit_count: AtomicUsize::new(0),
upstream_hit_count: AtomicUsize::new(0),
already_exists_count: AtomicUsize::new(0),
upload_count: AtomicUsize::new(0),
})
}
@ -101,6 +98,7 @@ impl Push {
/// filter paths that are on upstream and send to `tx`
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
let permits = Arc::new(Semaphore::new(10));
let mut handles = Vec::with_capacity(10);
let store_paths = self.store_paths.read().await.clone();
@ -111,8 +109,11 @@ impl Push {
continue;
}
handles.push({
let permits = permits.clone();
let tx = tx.clone();
tokio::spawn(async move {
let _permit = permits.acquire().await.unwrap();
if !path
.check_upstream_hit(self.upstream_caches.as_slice())
.await
@ -142,13 +143,16 @@ impl Push {
}
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let upload_count = AtomicUsize::new(0);
let permits = Arc::new(Semaphore::new(10));
let mut uploads = Vec::with_capacity(10);
loop {
if let Some(path_to_upload) = rx.recv().await {
let permits = Arc::clone(&permits);
let absolute_path = path_to_upload.absolute_path();
println!("uploading: {}", absolute_path);
info!("uploading: {}", absolute_path);
let uploader = Uploader::new(
&self.signing_key,
path_to_upload,
@ -157,9 +161,8 @@ impl Push {
)?;
uploads.push(tokio::spawn(async move {
let res = uploader.upload().await;
self.upload_count.fetch_add(1, Ordering::Relaxed);
res
let _permit = permits.acquire().await.unwrap();
uploader.upload().await
}));
} else {
join_all(uploads)
@ -168,7 +171,7 @@ impl Push {
.flatten()
.collect::<Result<Vec<_>>>()?;
println!("uploaded: {}", self.upload_count.load(Ordering::Relaxed));
println!("uploaded: {}", upload_count.load(Ordering::Relaxed));
println!(
"skipped because of signature match: {}",
self.signature_hit_count.load(Ordering::Relaxed)