Compare commits
3 commits
2c252a42c5
...
57a7ab944b
Author | SHA1 | Date | |
---|---|---|---|
57a7ab944b | |||
b1134d5d6e | |||
202b222b83 |
6 changed files with 188 additions and 108 deletions
191
Cargo.lock
generated
191
Cargo.lock
generated
|
@ -647,7 +647,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4"
|
checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
"regex-automata",
|
"regex-automata 0.4.9",
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1052,29 +1052,6 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "env_filter"
|
|
||||||
version = "0.1.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
|
|
||||||
dependencies = [
|
|
||||||
"log",
|
|
||||||
"regex",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "env_logger"
|
|
||||||
version = "0.11.7"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c3716d7a920fb4fac5d84e9d4bce8ceb321e9414b4409da61b07b75c1e3d0697"
|
|
||||||
dependencies = [
|
|
||||||
"anstream",
|
|
||||||
"anstyle",
|
|
||||||
"env_filter",
|
|
||||||
"jiff",
|
|
||||||
"log",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "equivalent"
|
name = "equivalent"
|
||||||
version = "1.0.2"
|
version = "1.0.2"
|
||||||
|
@ -1737,30 +1714,6 @@ version = "1.0.15"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
|
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "jiff"
|
|
||||||
version = "0.2.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c102670231191d07d37a35af3eb77f1f0dbf7a71be51a962dcd57ea607be7260"
|
|
||||||
dependencies = [
|
|
||||||
"jiff-static",
|
|
||||||
"log",
|
|
||||||
"portable-atomic",
|
|
||||||
"portable-atomic-util",
|
|
||||||
"serde",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "jiff-static"
|
|
||||||
version = "0.2.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "4cdde31a9d349f1b1f51a0b3714a5940ac022976f4b49485fc04be052b183b4c"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "jobserver"
|
name = "jobserver"
|
||||||
version = "0.1.33"
|
version = "0.1.33"
|
||||||
|
@ -1862,6 +1815,15 @@ dependencies = [
|
||||||
"hashbrown",
|
"hashbrown",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "matchers"
|
||||||
|
version = "0.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
|
||||||
|
dependencies = [
|
||||||
|
"regex-automata 0.1.10",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "md-5"
|
name = "md-5"
|
||||||
version = "0.10.6"
|
version = "0.10.6"
|
||||||
|
@ -1984,9 +1946,7 @@ dependencies = [
|
||||||
"aws-sdk-s3",
|
"aws-sdk-s3",
|
||||||
"clap",
|
"clap",
|
||||||
"ed25519-dalek",
|
"ed25519-dalek",
|
||||||
"env_logger",
|
|
||||||
"futures",
|
"futures",
|
||||||
"log",
|
|
||||||
"nix-compat",
|
"nix-compat",
|
||||||
"regex",
|
"regex",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
@ -1994,6 +1954,8 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha2",
|
"sha2",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"tracing-subscriber",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -2016,6 +1978,16 @@ dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nu-ansi-term"
|
||||||
|
version = "0.46.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
|
||||||
|
dependencies = [
|
||||||
|
"overload",
|
||||||
|
"winapi",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num-conv"
|
name = "num-conv"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
@ -2126,6 +2098,12 @@ version = "0.5.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
|
checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "overload"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "p256"
|
name = "p256"
|
||||||
version = "0.11.1"
|
version = "0.11.1"
|
||||||
|
@ -2204,21 +2182,6 @@ version = "0.3.32"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
|
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "portable-atomic"
|
|
||||||
version = "1.11.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "portable-atomic-util"
|
|
||||||
version = "0.2.4"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
|
|
||||||
dependencies = [
|
|
||||||
"portable-atomic",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "powerfmt"
|
name = "powerfmt"
|
||||||
version = "0.2.0"
|
version = "0.2.0"
|
||||||
|
@ -2294,8 +2257,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aho-corasick",
|
"aho-corasick",
|
||||||
"memchr",
|
"memchr",
|
||||||
"regex-automata",
|
"regex-automata 0.4.9",
|
||||||
"regex-syntax",
|
"regex-syntax 0.8.5",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "regex-automata"
|
||||||
|
version = "0.1.10"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
|
||||||
|
dependencies = [
|
||||||
|
"regex-syntax 0.6.29",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -2306,7 +2278,7 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aho-corasick",
|
"aho-corasick",
|
||||||
"memchr",
|
"memchr",
|
||||||
"regex-syntax",
|
"regex-syntax 0.8.5",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -2315,6 +2287,12 @@ version = "0.1.6"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
|
checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "regex-syntax"
|
||||||
|
version = "0.6.29"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "regex-syntax"
|
name = "regex-syntax"
|
||||||
version = "0.8.5"
|
version = "0.8.5"
|
||||||
|
@ -2692,6 +2670,15 @@ dependencies = [
|
||||||
"digest",
|
"digest",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sharded-slab"
|
||||||
|
version = "0.1.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
|
||||||
|
dependencies = [
|
||||||
|
"lazy_static",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "shlex"
|
name = "shlex"
|
||||||
version = "1.3.0"
|
version = "1.3.0"
|
||||||
|
@ -2874,6 +2861,16 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "thread_local"
|
||||||
|
version = "1.1.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"once_cell",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.3.41"
|
version = "0.3.41"
|
||||||
|
@ -3059,6 +3056,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
|
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"once_cell",
|
"once_cell",
|
||||||
|
"valuable",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-log"
|
||||||
|
version = "0.2.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
|
||||||
|
dependencies = [
|
||||||
|
"log",
|
||||||
|
"once_cell",
|
||||||
|
"tracing-core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tracing-subscriber"
|
||||||
|
version = "0.3.19"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
|
||||||
|
dependencies = [
|
||||||
|
"matchers",
|
||||||
|
"nu-ansi-term",
|
||||||
|
"once_cell",
|
||||||
|
"regex",
|
||||||
|
"sharded-slab",
|
||||||
|
"smallvec",
|
||||||
|
"thread_local",
|
||||||
|
"tracing",
|
||||||
|
"tracing-core",
|
||||||
|
"tracing-log",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -3127,6 +3154,12 @@ version = "1.16.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
|
checksum = "458f7a779bf54acc9f347480ac654f68407d3aab21269a6e3c9f922acd9e2da9"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "valuable"
|
||||||
|
version = "0.1.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "vcpkg"
|
name = "vcpkg"
|
||||||
version = "0.2.15"
|
version = "0.2.15"
|
||||||
|
@ -3262,6 +3295,28 @@ dependencies = [
|
||||||
"rustix 0.38.44",
|
"rustix 0.38.44",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "winapi"
|
||||||
|
version = "0.3.9"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
|
||||||
|
dependencies = [
|
||||||
|
"winapi-i686-pc-windows-gnu",
|
||||||
|
"winapi-x86_64-pc-windows-gnu",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "winapi-i686-pc-windows-gnu"
|
||||||
|
version = "0.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "winapi-x86_64-pc-windows-gnu"
|
||||||
|
version = "0.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows-link"
|
name = "windows-link"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
|
|
|
@ -10,9 +10,7 @@ aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
|
||||||
aws-sdk-s3 = "1.82.0"
|
aws-sdk-s3 = "1.82.0"
|
||||||
clap = { version = "4.5.34", features = ["derive"] }
|
clap = { version = "4.5.34", features = ["derive"] }
|
||||||
ed25519-dalek = "2.1.1"
|
ed25519-dalek = "2.1.1"
|
||||||
env_logger = "0.11.7"
|
|
||||||
futures = "0.3.31"
|
futures = "0.3.31"
|
||||||
log = "0.4.27"
|
|
||||||
nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
|
nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
|
||||||
regex = "1.11.1"
|
regex = "1.11.1"
|
||||||
reqwest = "0.12.15"
|
reqwest = "0.12.15"
|
||||||
|
@ -20,4 +18,6 @@ serde = { version = "1.0.219", features = [ "derive" ]}
|
||||||
serde_json = "1.0.140"
|
serde_json = "1.0.140"
|
||||||
sha2 = "0.10.8"
|
sha2 = "0.10.8"
|
||||||
tokio = { version = "1.44.1", features = [ "full" ]}
|
tokio = { version = "1.44.1", features = [ "full" ]}
|
||||||
|
tracing = "0.1.41"
|
||||||
|
tracing-subscriber = { version = "0.3.19", features = ["env-filter"]}
|
||||||
url = { version = "2.5.4", features = [ "serde" ]}
|
url = { version = "2.5.4", features = [ "serde" ]}
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
#![feature(let_chains)]
|
#![feature(let_chains)]
|
||||||
#![feature(extend_one)]
|
#![feature(extend_one)]
|
||||||
#![feature(array_chunks)]
|
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
|
use tracing_subscriber::{EnvFilter, FmtSubscriber};
|
||||||
|
|
||||||
use nixcp::NixCp;
|
use nixcp::NixCp;
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ struct Cli {
|
||||||
region: Option<String>,
|
region: Option<String>,
|
||||||
|
|
||||||
/// If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default
|
/// If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default
|
||||||
/// e.g. s3.example.com
|
/// e.g. https://s3.example.com
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
endpoint: Option<String>,
|
endpoint: Option<String>,
|
||||||
|
|
||||||
|
@ -58,7 +58,10 @@ enum Commands {
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
env_logger::init();
|
let filter = EnvFilter::from_default_env();
|
||||||
|
let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
|
||||||
|
tracing::subscriber::set_global_default(subscriber)?;
|
||||||
|
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?));
|
let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?));
|
||||||
|
|
||||||
|
|
56
src/nixcp.rs
56
src/nixcp.rs
|
@ -2,7 +2,7 @@ use std::{
|
||||||
fs,
|
fs,
|
||||||
iter::once,
|
iter::once,
|
||||||
sync::{
|
sync::{
|
||||||
Arc, Mutex,
|
Arc,
|
||||||
atomic::{AtomicUsize, Ordering},
|
atomic::{AtomicUsize, Ordering},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
@ -11,9 +11,9 @@ use anyhow::{Context, Result};
|
||||||
use aws_config::Region;
|
use aws_config::Region;
|
||||||
use aws_sdk_s3 as s3;
|
use aws_sdk_s3 as s3;
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use log::{debug, info, warn};
|
|
||||||
use nix_compat::narinfo::{self, SigningKey};
|
use nix_compat::narinfo::{self, SigningKey};
|
||||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
use tokio::sync::{RwLock, Semaphore, mpsc};
|
||||||
|
use tracing::{debug, info, trace};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use crate::{Cli, path_info::PathInfo, uploader::Uploader};
|
use crate::{Cli, path_info::PathInfo, uploader::Uploader};
|
||||||
|
@ -24,6 +24,10 @@ pub struct NixCp {
|
||||||
s3_client: s3::Client,
|
s3_client: s3::Client,
|
||||||
signing_key: SigningKey<ed25519_dalek::SigningKey>,
|
signing_key: SigningKey<ed25519_dalek::SigningKey>,
|
||||||
bucket: String,
|
bucket: String,
|
||||||
|
// paths that we skipped cause of a signature match
|
||||||
|
signature_hit_count: AtomicUsize,
|
||||||
|
// paths that we skipped cause we found it on an upstream
|
||||||
|
upstream_hit_count: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NixCp {
|
impl NixCp {
|
||||||
|
@ -59,6 +63,8 @@ impl NixCp {
|
||||||
s3_client,
|
s3_client,
|
||||||
signing_key,
|
signing_key,
|
||||||
bucket: cli.bucket.clone(),
|
bucket: cli.bucket.clone(),
|
||||||
|
signature_hit_count: AtomicUsize::new(0),
|
||||||
|
upstream_hit_count: AtomicUsize::new(0),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,18 +87,23 @@ impl NixCp {
|
||||||
pub async fn run(&'static self) -> Result<()> {
|
pub async fn run(&'static self) -> Result<()> {
|
||||||
let (tx, rx) = mpsc::channel(10);
|
let (tx, rx) = mpsc::channel(10);
|
||||||
let tx = Arc::new(tx);
|
let tx = Arc::new(tx);
|
||||||
tokio::spawn(self.filter_from_upstream(tx));
|
let filter = tokio::spawn(self.filter_from_upstream(tx));
|
||||||
self.upload(rx).await
|
let upload = tokio::spawn(self.upload(rx));
|
||||||
|
filter.await?;
|
||||||
|
upload.await??;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// filter paths that are on upstream and send to `tx`
|
/// filter paths that are on upstream and send to `tx`
|
||||||
async fn filter_from_upstream(&self, tx: Arc<mpsc::Sender<PathInfo>>) {
|
async fn filter_from_upstream(&'static self, tx: Arc<mpsc::Sender<PathInfo>>) {
|
||||||
let permits = Arc::new(Semaphore::new(10));
|
let permits = Arc::new(Semaphore::new(10));
|
||||||
let mut handles = Vec::with_capacity(10);
|
let mut handles = Vec::with_capacity(10);
|
||||||
let store_paths = self.store_paths.read().await.clone();
|
let store_paths = self.store_paths.read().await.clone();
|
||||||
|
|
||||||
for path in store_paths.into_iter() {
|
for path in store_paths.into_iter() {
|
||||||
if path.check_upstream_signature(&self.upstream_caches) {
|
if path.check_upstream_signature(&self.upstream_caches) {
|
||||||
|
trace!("skip {} (signature match)", path.absolute_path());
|
||||||
|
self.signature_hit_count.fetch_add(1, Ordering::Release);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
handles.push({
|
handles.push({
|
||||||
|
@ -104,19 +115,23 @@ impl NixCp {
|
||||||
|
|
||||||
if !path.check_upstream_hit(upstream_caches.as_slice()).await {
|
if !path.check_upstream_hit(upstream_caches.as_slice()).await {
|
||||||
tx.send(path).await.unwrap();
|
tx.send(path).await.unwrap();
|
||||||
|
} else {
|
||||||
|
trace!("skip {} (upstream hit)", path.absolute_path());
|
||||||
|
self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
for handle in handles {
|
join_all(handles)
|
||||||
handle.await.unwrap();
|
.await
|
||||||
}
|
.into_iter()
|
||||||
|
.collect::<std::result::Result<(), _>>()
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||||
let upload_count = AtomicUsize::new(0);
|
let upload_count = AtomicUsize::new(0);
|
||||||
let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
|
|
||||||
let permits = Arc::new(Semaphore::new(10));
|
let permits = Arc::new(Semaphore::new(10));
|
||||||
let mut uploads = Vec::with_capacity(10);
|
let mut uploads = Vec::with_capacity(10);
|
||||||
|
|
||||||
|
@ -139,16 +154,21 @@ impl NixCp {
|
||||||
});
|
});
|
||||||
uploads.push(fut);
|
uploads.push(fut);
|
||||||
} else {
|
} else {
|
||||||
join_all(uploads).await;
|
join_all(uploads)
|
||||||
println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));
|
.await
|
||||||
|
.into_iter()
|
||||||
|
.flatten()
|
||||||
|
.collect::<Result<Vec<_>>>()?;
|
||||||
|
|
||||||
let failures = failures.lock().unwrap();
|
println!("uploaded: {}", upload_count.load(Ordering::Relaxed));
|
||||||
if !failures.is_empty() {
|
println!(
|
||||||
warn!("failed to upload these paths: ");
|
"skipped because of signature match: {}",
|
||||||
for failure in failures.iter() {
|
self.signature_hit_count.load(Ordering::Relaxed)
|
||||||
warn!("{}", failure);
|
);
|
||||||
}
|
println!(
|
||||||
}
|
"skipped because of upstream hit: {}",
|
||||||
|
self.upstream_hit_count.load(Ordering::Relaxed)
|
||||||
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,12 @@
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use anyhow::{Context, Error, Result};
|
use anyhow::{Context, Error, Result};
|
||||||
use log::{debug, error, trace};
|
|
||||||
use nix_compat::nixhash::CAHash;
|
|
||||||
use nix_compat::store_path::StorePath;
|
use nix_compat::store_path::StorePath;
|
||||||
|
use nix_compat::{nixbase32, nixhash::CAHash};
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
|
use tracing::{debug, error, trace};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
// nix path-info --derivation --json
|
// nix path-info --derivation --json
|
||||||
|
@ -97,12 +97,9 @@ impl PathInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
|
pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
|
||||||
let hash =
|
|
||||||
String::from_utf8(self.path.digest().to_vec()).expect("should be a valid string");
|
|
||||||
|
|
||||||
for upstream in upstreams {
|
for upstream in upstreams {
|
||||||
let upstream = upstream
|
let upstream = upstream
|
||||||
.join(format!("{hash}/.narinfo").as_str())
|
.join(format!("{}/.narinfo", self.digest()).as_str())
|
||||||
.expect("adding <hash>.narinfo should make a valid url");
|
.expect("adding <hash>.narinfo should make a valid url");
|
||||||
let res_status = reqwest::Client::new()
|
let res_status = reqwest::Client::new()
|
||||||
.head(upstream.as_str())
|
.head(upstream.as_str())
|
||||||
|
@ -121,8 +118,8 @@ impl PathInfo {
|
||||||
self.path.to_absolute_path()
|
self.path.to_absolute_path()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn digest(&self) -> &str {
|
pub fn digest(&self) -> String {
|
||||||
str::from_utf8(self.path.digest()).expect("digest should be valid string")
|
nixbase32::encode(self.path.digest())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ use aws_sdk_s3::{
|
||||||
types::{CompletedMultipartUpload, CompletedPart},
|
types::{CompletedMultipartUpload, CompletedPart},
|
||||||
};
|
};
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use log::debug;
|
|
||||||
use nix_compat::{
|
use nix_compat::{
|
||||||
narinfo::{self, NarInfo, SigningKey},
|
narinfo::{self, NarInfo, SigningKey},
|
||||||
nixbase32,
|
nixbase32,
|
||||||
|
@ -13,6 +12,7 @@ use nix_compat::{
|
||||||
};
|
};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
use tokio::{io::AsyncReadExt, process::Command};
|
use tokio::{io::AsyncReadExt, process::Command};
|
||||||
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::path_info::PathInfo;
|
use crate::path_info::PathInfo;
|
||||||
|
|
||||||
|
@ -46,11 +46,14 @@ impl<'a> Uploader<'a> {
|
||||||
let nar = self.compress_nar(&nar).await;
|
let nar = self.compress_nar(&nar).await;
|
||||||
|
|
||||||
// update fields that we know after compression
|
// update fields that we know after compression
|
||||||
nar_info.file_size = Some(nar.len() as u64);
|
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
hasher.update(&nar);
|
hasher.update(&nar);
|
||||||
nar_info.file_hash = Some(hasher.finalize().into());
|
let hash: [u8; 32] = hasher.finalize().into();
|
||||||
let nar_url = self.nar_url(&nar);
|
let nar_url = self.nar_url(&hash);
|
||||||
|
nar_info.file_hash = Some(hash);
|
||||||
|
nar_info.file_size = Some(nar.len() as u64);
|
||||||
|
nar_info.url = nar_url.as_str();
|
||||||
|
debug!("uploading nar with key: {nar_url}");
|
||||||
|
|
||||||
if nar.len() < MULTIPART_CUTOFF {
|
if nar.len() < MULTIPART_CUTOFF {
|
||||||
let put_object = self
|
let put_object = self
|
||||||
|
@ -73,7 +76,7 @@ impl<'a> Uploader<'a> {
|
||||||
let upload_id = multipart.upload_id().unwrap();
|
let upload_id = multipart.upload_id().unwrap();
|
||||||
|
|
||||||
let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
|
let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
|
||||||
let chunks = nar.array_chunks::<MULTIPART_CUTOFF>();
|
let chunks = nar.chunks(MULTIPART_CUTOFF);
|
||||||
for (i, chunk) in chunks.enumerate() {
|
for (i, chunk) in chunks.enumerate() {
|
||||||
parts.push(tokio::task::spawn(
|
parts.push(tokio::task::spawn(
|
||||||
self.s3_client
|
self.s3_client
|
||||||
|
@ -120,12 +123,12 @@ impl<'a> Uploader<'a> {
|
||||||
debug!("complete multipart upload: {:#?}", complete_mp_upload);
|
debug!("complete multipart upload: {:#?}", complete_mp_upload);
|
||||||
}
|
}
|
||||||
|
|
||||||
nar_info.add_signature(self.signing_key);
|
let narinfo_url = format!("{}.narinfo", self.path.digest());
|
||||||
|
debug!("uploading narinfo with key {narinfo_url}");
|
||||||
self.s3_client
|
self.s3_client
|
||||||
.put_object()
|
.put_object()
|
||||||
.bucket(&self.bucket)
|
.bucket(&self.bucket)
|
||||||
.key(format!("{}.narinfo", self.path.digest()))
|
.key(narinfo_url)
|
||||||
.body(nar_info.to_string().as_bytes().to_vec().into())
|
.body(nar_info.to_string().as_bytes().to_vec().into())
|
||||||
.send()
|
.send()
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -147,7 +150,7 @@ impl<'a> Uploader<'a> {
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
hasher.update(nar);
|
hasher.update(nar);
|
||||||
let nar_hash: [u8; 32] = hasher.finalize().into();
|
let nar_hash: [u8; 32] = hasher.finalize().into();
|
||||||
let nar_info = NarInfo {
|
let mut nar_info = NarInfo {
|
||||||
flags: narinfo::Flags::empty(),
|
flags: narinfo::Flags::empty(),
|
||||||
store_path: self.path.path.as_ref(),
|
store_path: self.path.path.as_ref(),
|
||||||
nar_hash,
|
nar_hash,
|
||||||
|
@ -162,6 +165,8 @@ impl<'a> Uploader<'a> {
|
||||||
file_size: None,
|
file_size: None,
|
||||||
url: "",
|
url: "",
|
||||||
};
|
};
|
||||||
|
// signature consists of: store_path, nar_hash, nar_size, and references
|
||||||
|
nar_info.add_signature(self.signing_key);
|
||||||
Ok(nar_info)
|
Ok(nar_info)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue