Compare commits
5 commits
2f7ae745f5
...
39792cdd40
Author | SHA1 | Date | |
---|---|---|---|
39792cdd40 | |||
55a097d45c | |||
20c13a86be | |||
4895720831 | |||
03530a2afa |
3 changed files with 33 additions and 32 deletions
40
Cargo.lock
generated
40
Cargo.lock
generated
|
@ -84,9 +84,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anyhow"
|
name = "anyhow"
|
||||||
version = "1.0.97"
|
version = "1.0.98"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "dcfed56ad506cb2c684a14971b8861fdc3baaaae314b9e5f9bb532cbe3ba7a4f"
|
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-compression"
|
name = "async-compression"
|
||||||
|
@ -414,7 +414,7 @@ dependencies = [
|
||||||
"aws-smithy-async",
|
"aws-smithy-async",
|
||||||
"aws-smithy-runtime-api",
|
"aws-smithy-runtime-api",
|
||||||
"aws-smithy-types",
|
"aws-smithy-types",
|
||||||
"h2 0.4.8",
|
"h2 0.4.9",
|
||||||
"http 0.2.12",
|
"http 0.2.12",
|
||||||
"http 1.3.1",
|
"http 1.3.1",
|
||||||
"http-body 0.4.6",
|
"http-body 0.4.6",
|
||||||
|
@ -425,7 +425,7 @@ dependencies = [
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rustls 0.21.12",
|
"rustls 0.21.12",
|
||||||
"rustls 0.23.25",
|
"rustls 0.23.26",
|
||||||
"rustls-native-certs 0.8.1",
|
"rustls-native-certs 0.8.1",
|
||||||
"rustls-pki-types",
|
"rustls-pki-types",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
@ -1298,9 +1298,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "h2"
|
name = "h2"
|
||||||
version = "0.4.8"
|
version = "0.4.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5017294ff4bb30944501348f6f8e42e6ad28f42c8bbef7a74029aff064a4e3c2"
|
checksum = "75249d144030531f8dee69fe9cea04d3edf809a017ae445e2abdff6629e86633"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atomic-waker",
|
"atomic-waker",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -1457,7 +1457,7 @@ dependencies = [
|
||||||
"bytes",
|
"bytes",
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"h2 0.4.8",
|
"h2 0.4.9",
|
||||||
"http 1.3.1",
|
"http 1.3.1",
|
||||||
"http-body 1.0.1",
|
"http-body 1.0.1",
|
||||||
"httparse",
|
"httparse",
|
||||||
|
@ -1494,7 +1494,7 @@ dependencies = [
|
||||||
"http 1.3.1",
|
"http 1.3.1",
|
||||||
"hyper 1.6.0",
|
"hyper 1.6.0",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"rustls 0.23.25",
|
"rustls 0.23.26",
|
||||||
"rustls-native-certs 0.8.1",
|
"rustls-native-certs 0.8.1",
|
||||||
"rustls-pki-types",
|
"rustls-pki-types",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
@ -1748,9 +1748,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.171"
|
version = "0.2.172"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
|
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libloading"
|
name = "libloading"
|
||||||
|
@ -1780,9 +1780,9 @@ checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "linux-raw-sys"
|
name = "linux-raw-sys"
|
||||||
version = "0.9.3"
|
version = "0.9.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413"
|
checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "litemap"
|
name = "litemap"
|
||||||
|
@ -2310,7 +2310,7 @@ dependencies = [
|
||||||
"encoding_rs",
|
"encoding_rs",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"h2 0.4.8",
|
"h2 0.4.9",
|
||||||
"http 1.3.1",
|
"http 1.3.1",
|
||||||
"http-body 1.0.1",
|
"http-body 1.0.1",
|
||||||
"http-body-util",
|
"http-body-util",
|
||||||
|
@ -2404,14 +2404,14 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustix"
|
name = "rustix"
|
||||||
version = "1.0.3"
|
version = "1.0.5"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96"
|
checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"errno",
|
"errno",
|
||||||
"libc",
|
"libc",
|
||||||
"linux-raw-sys 0.9.3",
|
"linux-raw-sys 0.9.4",
|
||||||
"windows-sys 0.59.0",
|
"windows-sys 0.59.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2429,9 +2429,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustls"
|
name = "rustls"
|
||||||
version = "0.23.25"
|
version = "0.23.26"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "822ee9188ac4ec04a2f0531e55d035fb2de73f18b41a63c70c2712503b6fb13c"
|
checksum = "df51b5869f3a441595eac5e8ff14d486ff285f7b8c0df8770e49c3b56351f0f0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aws-lc-rs",
|
"aws-lc-rs",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
|
@ -2837,7 +2837,7 @@ dependencies = [
|
||||||
"fastrand",
|
"fastrand",
|
||||||
"getrandom 0.3.2",
|
"getrandom 0.3.2",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"rustix 1.0.3",
|
"rustix 1.0.5",
|
||||||
"windows-sys 0.59.0",
|
"windows-sys 0.59.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2966,7 +2966,7 @@ version = "0.26.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
|
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"rustls 0.23.25",
|
"rustls 0.23.26",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,12 @@ impl PathInfo {
|
||||||
pub async fn from_path(path: &str) -> Result<Self> {
|
pub async fn from_path(path: &str) -> Result<Self> {
|
||||||
debug!("query nix path-info for {path}");
|
debug!("query nix path-info for {path}");
|
||||||
// use lix cause nix would return a json map instead of an array
|
// use lix cause nix would return a json map instead of an array
|
||||||
|
// json output is not stable and could break in future
|
||||||
|
// TODO figure out a better way
|
||||||
let nix_cmd = Command::new("nix")
|
let nix_cmd = Command::new("nix")
|
||||||
.arg("run")
|
.arg("run")
|
||||||
|
.arg("--experimental-features")
|
||||||
|
.arg("nix-command flakes")
|
||||||
.arg("github:nixos/nixpkgs/nixos-unstable#lix")
|
.arg("github:nixos/nixpkgs/nixos-unstable#lix")
|
||||||
.arg("--")
|
.arg("--")
|
||||||
.arg("path-info")
|
.arg("path-info")
|
||||||
|
|
21
src/push.rs
21
src/push.rs
|
@ -12,7 +12,7 @@ use aws_config::Region;
|
||||||
use aws_sdk_s3 as s3;
|
use aws_sdk_s3 as s3;
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use nix_compat::narinfo::{self, SigningKey};
|
use nix_compat::narinfo::{self, SigningKey};
|
||||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
use tokio::sync::{RwLock, mpsc};
|
||||||
use tracing::{debug, info, trace};
|
use tracing::{debug, info, trace};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
|
@ -30,6 +30,8 @@ pub struct Push {
|
||||||
upstream_hit_count: AtomicUsize,
|
upstream_hit_count: AtomicUsize,
|
||||||
// paths that we skipped cause they are already on our cache
|
// paths that we skipped cause they are already on our cache
|
||||||
already_exists_count: AtomicUsize,
|
already_exists_count: AtomicUsize,
|
||||||
|
// paths that we uploaded
|
||||||
|
upload_count: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Push {
|
impl Push {
|
||||||
|
@ -68,6 +70,7 @@ impl Push {
|
||||||
signature_hit_count: AtomicUsize::new(0),
|
signature_hit_count: AtomicUsize::new(0),
|
||||||
upstream_hit_count: AtomicUsize::new(0),
|
upstream_hit_count: AtomicUsize::new(0),
|
||||||
already_exists_count: AtomicUsize::new(0),
|
already_exists_count: AtomicUsize::new(0),
|
||||||
|
upload_count: AtomicUsize::new(0),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,7 +101,6 @@ impl Push {
|
||||||
|
|
||||||
/// filter paths that are on upstream and send to `tx`
|
/// filter paths that are on upstream and send to `tx`
|
||||||
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
|
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
|
||||||
let permits = Arc::new(Semaphore::new(10));
|
|
||||||
let mut handles = Vec::with_capacity(10);
|
let mut handles = Vec::with_capacity(10);
|
||||||
let store_paths = self.store_paths.read().await.clone();
|
let store_paths = self.store_paths.read().await.clone();
|
||||||
|
|
||||||
|
@ -109,11 +111,8 @@ impl Push {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
handles.push({
|
handles.push({
|
||||||
let permits = permits.clone();
|
|
||||||
let tx = tx.clone();
|
let tx = tx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _permit = permits.acquire().await.unwrap();
|
|
||||||
|
|
||||||
if !path
|
if !path
|
||||||
.check_upstream_hit(self.upstream_caches.as_slice())
|
.check_upstream_hit(self.upstream_caches.as_slice())
|
||||||
.await
|
.await
|
||||||
|
@ -143,16 +142,13 @@ impl Push {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||||
let upload_count = AtomicUsize::new(0);
|
|
||||||
let permits = Arc::new(Semaphore::new(10));
|
|
||||||
let mut uploads = Vec::with_capacity(10);
|
let mut uploads = Vec::with_capacity(10);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Some(path_to_upload) = rx.recv().await {
|
if let Some(path_to_upload) = rx.recv().await {
|
||||||
let permits = Arc::clone(&permits);
|
|
||||||
let absolute_path = path_to_upload.absolute_path();
|
let absolute_path = path_to_upload.absolute_path();
|
||||||
|
|
||||||
info!("uploading: {}", absolute_path);
|
println!("uploading: {}", absolute_path);
|
||||||
let uploader = Uploader::new(
|
let uploader = Uploader::new(
|
||||||
&self.signing_key,
|
&self.signing_key,
|
||||||
path_to_upload,
|
path_to_upload,
|
||||||
|
@ -161,8 +157,9 @@ impl Push {
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
uploads.push(tokio::spawn(async move {
|
uploads.push(tokio::spawn(async move {
|
||||||
let _permit = permits.acquire().await.unwrap();
|
let res = uploader.upload().await;
|
||||||
uploader.upload().await
|
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
res
|
||||||
}));
|
}));
|
||||||
} else {
|
} else {
|
||||||
join_all(uploads)
|
join_all(uploads)
|
||||||
|
@ -171,7 +168,7 @@ impl Push {
|
||||||
.flatten()
|
.flatten()
|
||||||
.collect::<Result<Vec<_>>>()?;
|
.collect::<Result<Vec<_>>>()?;
|
||||||
|
|
||||||
println!("uploaded: {}", upload_count.load(Ordering::Relaxed));
|
println!("uploaded: {}", self.upload_count.load(Ordering::Relaxed));
|
||||||
println!(
|
println!(
|
||||||
"skipped because of signature match: {}",
|
"skipped because of signature match: {}",
|
||||||
self.signature_hit_count.load(Ordering::Relaxed)
|
self.signature_hit_count.load(Ordering::Relaxed)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue