Merge branch '2025-04-12'

Commit 2f7ae745f5

9 changed files with 2556 additions and 360 deletions
.gitignore (vendored): 1 change

@@ -1,2 +1,3 @@
 /target
 .direnv
+result
Cargo.lock (generated): 1938 changes

File diff suppressed because it is too large.
Cargo.toml: 14 changes

@@ -4,10 +4,20 @@ version = "0.1.0"
 edition = "2024"
 
 [dependencies]
+anyhow = "1.0.97"
+async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
+aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
+aws-sdk-s3 = "1.82.0"
 clap = { version = "4.5.34", features = ["derive"] }
-env_logger = "0.11.7"
-log = "0.4.27"
+ed25519-dalek = "2.1.1"
+futures = "0.3.31"
+nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
+regex = "1.11.1"
 reqwest = "0.12.15"
 serde = { version = "1.0.219", features = [ "derive" ]}
 serde_json = "1.0.140"
+sha2 = "0.10.8"
 tokio = { version = "1.44.1", features = [ "full" ]}
+tracing = "0.1.41"
+tracing-subscriber = { version = "0.3.19", features = ["env-filter"]}
+url = { version = "2.5.4", features = [ "serde" ]}
README.md: 50 changes

@@ -1,24 +1,38 @@
-Runs `nix copy` under the hood but only uploads paths that don't exist in upstream caches. It's async so may also be somewhat faster. Unlike `nix copy`, we also upload build dependencies. You may also pass the `--recursive` flag to absolutely not miss anything (be warned though, it queues up a lot of paths to check against upstream caches (also idk why you'd ever want to use this honestly)). Specify upstream caches to check against with `--upstream-cache` (can be specified multiple times, `cache.nixos.org` is always included).
+Uploads store paths to your s3 binary cache, but skips anything that already exists on upstream caches, to save you space and time. Unlike `nix copy`, we also upload build dependencies: you just name the package or store path and we figure out the rest. Specify upstream caches to check against with `-u` (can be specified multiple times, `cache.nixos.org` is always included).
 
+## Usage
+
+Example:
+```
+nixcp --bucket nixcache --signing-key ~/cache-priv-key.pem --endpoint https://s3.cy7.sh -u https://nix-community.cachix.org push github:cything/nixcp/2025-04-12
+```
+
+The signing key is generated with:
+```
+nix-store --generate-binary-cache-key nixcache.cy7.sh cache-priv-key.pem cache-pub-key.pem
+```
+
+`AWS_ACCESS_KEY_ID` and `AWS_ENDPOINT_URL` environment variables should be set with your s3 credentials.
+
 ```
-Usage: nixcp [OPTIONS] --to <BINARY CACHE> <PACKAGE>
+Usage: nixcp [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> <COMMAND>
 
-Arguments:
-  <PACKAGE>  Package to upload to the binary cache
+Commands:
+  push
+  help  Print this message or the help of the given subcommand(s)
 
 Options:
-      --to <BINARY CACHE>
-          Address of the binary cache (passed to nix copy --to)
-  -u, --upstream-cache <UPSTREAM_CACHE>
+      --bucket <bucket name>
+          The s3 bucket to upload to
+  -u, --upstream <nixcache.example.com>
           Upstream cache to check against. Can be specified multiple times. cache.nixos.org is always included
-  -r, --recursive
-          Whether to pass --recursive to nix path-info. Can queue a huge number of paths to upload
-      --upstream-checker-concurrency <UPSTREAM_CHECKER_CONCURRENCY>
-          Concurrent upstream cache checkers [default: 32]
-      --uploader-concurrency <UPLOADER_CONCURRENCY>
-          Concurrent uploaders [default: 16]
-      --nix-store-concurrency <NIX_STORE_CONCURRENCY>
-          Concurrent nix-store commands to run [default: 32]
+      --signing-key <SIGNING_KEY>
+          Path to the file containing signing key e.g. ~/cache-priv-key.pem
+      --region <REGION>
+          If unspecified, will get it from AWS_DEFAULT_REGION envar or the AWS default
+      --endpoint <ENDPOINT>
+          If unspecified, will get it from AWS_ENDPOINT_URL envar or the AWS default e.g. https://s3.example.com
+      --profile <PROFILE>
+          AWS profile to use
   -h, --help
           Print help
   -V, --version
@@ -27,13 +41,13 @@ Options:
 
 ## Install with nix
 ```
-nix profile install git+https://git.cy7.sh/cy/nixcp.git
+nix profile install github:cything/nixcp
 ```
 Or run without installing:
 ```
-nix run git+https://git.cy7.sh/cy/nixcp.git
+nix run github:cything/nixcp
 ```
 Separate arguments with `--` to pass them through to `nixcp` like so:
 ```
-nix run git+https://git.cy7.sh/cy/nixcp.git -- --help
+nix run github:cything/nixcp -- --help
 ```
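Since the S3 client is built from the environment (via aws-config), a minimal credential setup could look like the following. This is an illustrative sketch: the values are placeholders, and `AWS_SECRET_ACCESS_KEY` is the SDK's usual companion variable even though the README above only names `AWS_ACCESS_KEY_ID` and `AWS_ENDPOINT_URL`.

```
# illustrative values; nixcp picks these up through the AWS SDK's env config
export AWS_ACCESS_KEY_ID=AKIAEXAMPLE
export AWS_SECRET_ACCESS_KEY=examplesecret
export AWS_ENDPOINT_URL=https://s3.example.com
```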
src/cli.rs (new file): 2 lines

@@ -0,0 +1,2 @@
src/main.rs: 313 changes (271 lines before, 88 after)

Removed (the old single-file implementation):

#![feature(let_chains)]

use std::path::Path;
use std::sync::{
    Arc, Mutex, RwLock,
    atomic::{AtomicUsize, Ordering},
};

use clap::Parser;
use log::{debug, trace};
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tokio::sync::{Semaphore, mpsc};

const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];

// nix path-info --derivation --json
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PathInfo {
    ca: String,
    nar_hash: String,
    nar_size: u32,
    path: String,
    references: Vec<String>,
    registration_time: u32,
    valid: bool,
}

impl PathInfo {
    // find derivations related to package
    async fn from_package(package: &str, recursive: bool) -> Vec<Self> {
        let mut args = vec!["path-info", "--derivation", "--json"];
        if recursive {
            args.push("--recursive");
        }
        let path_infos = Command::new("nix")
            .args(args)
            .arg(package)
            .output()
            .await
            .expect("path-info failed");

        let path_infos: Vec<PathInfo> = serde_json::from_slice(&path_infos.stdout)
            .expect("no derivations found for this package");
        debug!("PathInfo's from nix path-info: {:#?}", path_infos);
        path_infos
    }

    // find store paths related to derivation
    async fn get_store_paths(&self) -> Vec<String> {
        let mut store_paths: Vec<String> = Vec::new();
        let nix_store_cmd = Command::new("nix-store")
            .arg("--query")
            .arg("--requisites")
            .arg("--include-outputs")
            .arg(&self.path)
            .output()
            .await
            .expect("nix-store cmd failed");

        let nix_store_out = String::from_utf8(nix_store_cmd.stdout).unwrap();
        for store_path in nix_store_out.split_whitespace().map(ToString::to_string) {
            store_paths.push(store_path);
        }
        store_paths
    }
}

#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
    /// Package to upload to the binary cache
    package: String,

    /// Address of the binary cache (passed to nix copy --to)
    #[arg(long, value_name = "BINARY CACHE")]
    to: String,

    /// Upstream cache to check against. Can be specified multiple times.
    /// cache.nixos.org is always included
    #[arg(long, short)]
    upstream_cache: Vec<String>,

    /// Whether to pass --recursive to nix path-info. Can queue a huge number of paths to upload
    #[arg(long, short, default_value_t = false)]
    recursive: bool,

    /// Concurrent upstream cache checkers
    #[arg(long, default_value_t = 32)]
    upstream_checker_concurrency: u8,

    /// Concurrent uploaders
    #[arg(long, default_value_t = 16)]
    uploader_concurrency: u8,

    /// Concurrent nix-store commands to run
    #[arg(long, default_value_t = 32)]
    nix_store_concurrency: u8,
}

#[tokio::main]
async fn main() {
    env_logger::init();

    let cli = Cli::parse();
    let package = &cli.package;
    let binary_cache = cli.to;
    let mut upstream_caches = cli.upstream_cache;
    for upstream in UPSTREAM_CACHES {
        upstream_caches.push(upstream.to_string());
    }
    debug!("package: {}", package);
    debug!("binary cache: {}", binary_cache);
    debug!("upstream caches: {:#?}", upstream_caches);

    println!("querying nix path-info");
    let derivations = PathInfo::from_package(package, cli.recursive).await;
    println!("got {} derivations", derivations.len());

    println!("querying nix-store");
    let mut handles = Vec::new();
    let concurrency = Arc::new(Semaphore::new(cli.nix_store_concurrency.into()));
    let store_paths = Arc::new(RwLock::new(Vec::new()));

    for derivation in derivations {
        let store_paths = Arc::clone(&store_paths);
        let permit = Arc::clone(&concurrency);
        handles.push(tokio::spawn(async move {
            let _permit = permit.acquire_owned().await.unwrap();
            let paths = derivation.get_store_paths().await;
            store_paths.write().unwrap().extend(paths);
        }));
    }
    // resolve store paths for all derivations before we move on
    for handle in handles {
        handle.await.unwrap();
    }
    println!("got {} store paths", store_paths.read().unwrap().len());

    let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());

    println!("spawning check_upstream");
    handles = Vec::new();
    handles.push(tokio::spawn(async move {
        check_upstream(
            store_paths,
            cacheable_tx,
            cli.upstream_checker_concurrency,
            Arc::new(upstream_caches),
        )
        .await;
    }));

    println!("spawning uploader");
    handles.push(tokio::spawn(async move {
        uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await;
    }));

    // make sure all threads are done
    for handle in handles {
        handle.await.unwrap();
    }
}

// filter out store paths that exist in upstream caches
async fn check_upstream(
    store_paths: Arc<RwLock<Vec<String>>>,
    cacheable_tx: mpsc::Sender<String>,
    concurrency: u8,
    upstream_caches: Arc<Vec<String>>,
) {
    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
    let c_store_paths = Arc::clone(&store_paths);
    let store_paths = c_store_paths.read().unwrap().clone();

    for store_path in store_paths {
        let tx = cacheable_tx.clone();
        let upstream_caches = Arc::clone(&upstream_caches);
        let concurrency = Arc::clone(&concurrency);

        tokio::spawn(async move {
            let _permit = concurrency.acquire().await.unwrap();
            let basename = Path::new(&store_path)
                .file_name()
                .unwrap()
                .to_str()
                .unwrap()
                .to_string();
            let hash = basename.split("-").next().unwrap();

            let mut hit = false;
            for upstream in upstream_caches.as_ref() {
                let mut uri = upstream.clone();
                uri.push_str(format!("/{}.narinfo", hash).as_str());

                let res_status = reqwest::Client::new()
                    .head(uri)
                    .send()
                    .await
                    .map(|x| x.status());

                if let Ok(res_status) = res_status
                    && res_status.is_success()
                {
                    debug!("{} was a hit upstream: {}", store_path, upstream);
                    hit = true;
                    break;
                }
            }
            if !hit {
                trace!("sending {}", store_path);
                tx.send(store_path).await.unwrap();
            }
        });
    }
}

async fn uploader(
    cacheable_rx: &mut mpsc::Receiver<String>,
    binary_cache: String,
    concurrency: u8,
) {
    let upload_count = Arc::new(AtomicUsize::new(0));
    let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let concurrency = Arc::new(Semaphore::new(concurrency.into()));
    let mut handles = Vec::new();

    loop {
        if let Some(path_to_upload) = cacheable_rx.recv().await {
            let concurrency = Arc::clone(&concurrency);
            let failures = Arc::clone(&failures);
            let binary_cache = binary_cache.clone();
            let upload_count = Arc::clone(&upload_count);

            handles.push(tokio::spawn(async move {
                let _permit = concurrency.acquire().await.unwrap();
                println!("uploading: {}", path_to_upload);
                if Command::new("nix")
                    .arg("copy")
                    .arg("--to")
                    .arg(&binary_cache)
                    .arg(&path_to_upload)
                    .output()
                    .await
                    .is_err()
                {
                    println!("WARN: upload failed: {}", path_to_upload);
                    failures.lock().unwrap().push(path_to_upload);
                } else {
                    upload_count.fetch_add(1, Ordering::Relaxed);
                }
            }));
        } else {
            // make sure all threads are done
            for handle in handles {
                handle.await.unwrap();
            }
            println!("uploaded {} paths", upload_count.load(Ordering::Relaxed));

            let failures = failures.lock().unwrap();
            if !failures.is_empty() {
                println!("failed to upload these paths: ");
                for failure in failures.iter() {
                    print!("{}", failure);
                }
                println!();
            }
            break;
        }
    }
}

Added (main.rs is now a thin CLI entry point that dispatches to the new modules):

#![feature(let_chains)]
#![feature(extend_one)]

use anyhow::{Context, Result};
use clap::{Args, Parser, Subcommand};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use push::Push;

mod cli;
mod path_info;
mod push;
mod uploader;

#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Debug, Subcommand)]
enum Commands {
    #[command(arg_required_else_help = true)]
    Push(PushArgs),
}

#[derive(Debug, Args)]
pub struct PushArgs {
    /// The s3 bucket to upload to
    #[arg(long, value_name = "bucket name")]
    bucket: String,

    /// Upstream cache to check against. Can be specified multiple times.
    /// cache.nixos.org is always included.
    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
    upstreams: Vec<String>,

    /// Path to the file containing signing key
    /// e.g. ~/cache-priv-key.pem
    #[arg(long)]
    signing_key: String,

    /// If unspecified, will get it from AWS_DEFAULT_REGION envar
    #[arg(long)]
    region: Option<String>,

    /// If unspecified, will get it from AWS_ENDPOINT_URL envar
    /// e.g. https://s3.example.com
    #[arg(long)]
    endpoint: Option<String>,

    /// AWS profile to use
    #[arg(long)]
    profile: Option<String>,

    #[arg(long)]
    skip_signature_check: bool,

    /// Package or store path to upload
    /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
    #[arg(value_name = "package or store path")]
    package: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    let filter = EnvFilter::from_default_env();
    let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
    tracing::subscriber::set_global_default(subscriber)?;

    let cli = Cli::parse();

    match &cli.command {
        Commands::Push(cli) => {
            let push = Box::leak(Box::new(Push::new(cli).await?));
            push.paths_from_package(&cli.package)
                .await
                .context("nixcp get paths from package")?;
            push.run().await.context("nixcp run")?;
        }
    }
    Ok(())
}
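The logging setup moves from env_logger to tracing-subscriber, and `EnvFilter::from_default_env()` reads the standard `RUST_LOG` variable, so the `debug!`/`trace!` output added throughout the new modules can be surfaced with an invocation along these lines (illustrative, mirroring the README example):

```
RUST_LOG=nixcp=debug nixcp --bucket nixcache --signing-key ~/cache-priv-key.pem push nixpkgs#hello
```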
src/path_info.rs (new file): 213 lines

use std::collections::HashSet;

use anyhow::{Context, Error, Result};
use aws_sdk_s3 as s3;
use nix_compat::nixbase32;
use nix_compat::store_path::StorePath;
use regex::Regex;
use serde::Deserialize;
use tokio::process::Command;
use tracing::{debug, error, trace};
use url::Url;

// nix path-info --derivation --json
#[derive(Debug, Clone, Deserialize)]
pub struct PathInfo {
    pub deriver: Option<StorePath<String>>,
    pub path: StorePath<String>,
    signatures: Option<Vec<String>>,
    pub references: Vec<StorePath<String>>,
}

impl PathInfo {
    // get PathInfo for a package or a store path
    // we deserialize this as an array of `PathInfo` below
    pub async fn from_path(path: &str) -> Result<Self> {
        debug!("query nix path-info for {path}");
        // use lix cause nix would return a json map instead of an array
        let nix_cmd = Command::new("nix")
            .arg("run")
            .arg("github:nixos/nixpkgs/nixos-unstable#lix")
            .arg("--")
            .arg("path-info")
            .arg("--json")
            .arg(path)
            .output()
            .await
            .context("`nix path-info` failed for {package}")?;

        trace!(
            "nix path-info output: {}",
            String::from_utf8_lossy(&nix_cmd.stdout)
        );

        // nix path-info returns an array with one element
        match serde_json::from_slice::<Vec<_>>(&nix_cmd.stdout)
            .context("parse path info from stdout")
        {
            Ok(path_info) => path_info
                .into_iter()
                .next()
                .ok_or_else(|| Error::msg("nix path-info returned empty")),
            Err(e) => {
                error!(
                    "Failed to parse data from `nix path-info`. The path may not exist on your system."
                );
                Err(e)
            }
        }
    }

    pub async fn get_closure(&self) -> Result<Vec<Self>> {
        debug!("query nix-store for {}", self.absolute_path());
        let nix_store_cmd = Command::new("nix-store")
            .arg("--query")
            .arg("--requisites")
            .arg("--include-outputs")
            .arg(self.absolute_path())
            .output()
            .await
            .expect("nix-store cmd failed");

        let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
        let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
        let mut closure = Vec::with_capacity(nix_store_paths.len());
        for path in nix_store_paths {
            closure.push(Self::from_path(path).await?);
        }
        Ok(closure)
    }

    /// checks if the path is signed by any upstream. if it is, we assume a cache hit.
    /// the name of the cache in the signature does not have to be the domain of the cache.
    /// in fact, it can be any random string. but, most often it is, and this saves us
    /// a request.
    pub fn check_upstream_signature(&self, upstreams: &[Url]) -> bool {
        let upstreams: HashSet<_> = upstreams.iter().filter_map(|x| x.domain()).collect();

        // some caches use names prefixed with -<some number>
        // e.g. cache.nixos.org-1, nix-community.cachix.org-1
        let re = Regex::new(r"-\d+$").expect("regex should be valid");
        for signee in self.signees().iter().map(|&x| re.replace(x, "")) {
            if upstreams.contains(signee.as_ref()) {
                return true;
            }
        }
        false
    }

    fn signees(&self) -> Vec<&str> {
        if let Some(signatures) = self.signatures.as_ref() {
            let signees: Vec<_> = signatures
                .iter()
                .filter_map(|signature| Some(signature.split_once(":")?.0))
                .collect();
            trace!("signees for {}: {:?}", self.path, signees);
            return signees;
        }
        Vec::new()
    }

    pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
        for upstream in upstreams {
            let upstream = upstream
                .join(format!("{}.narinfo", self.digest()).as_str())
                .expect("adding <hash>.narinfo should make a valid url");
            debug!("querying {}", upstream);
            let res_status = reqwest::Client::new()
                .head(upstream.as_str())
                .send()
                .await
                .map(|x| x.status());

            if res_status.map(|code| code.is_success()).unwrap_or_default() {
                return true;
            }
        }
        false
    }

    pub fn absolute_path(&self) -> String {
        self.path.to_absolute_path()
    }

    pub fn digest(&self) -> String {
        nixbase32::encode(self.path.digest())
    }

    pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
        s3_client
            .head_object()
            .bucket(bucket)
            .key(format!("{}.narinfo", self.digest()))
            .send()
            .await
            .is_ok()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn get_signees_from_path_info() {
        let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
        let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");

        path_info.signatures = Some(vec![
            "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
            "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
        ]);
        let signees = path_info.signees();
        assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
    }

    #[test]
    fn match_upstream_cache_from_signature() {
        let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
        let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");

        path_info.signatures = Some(vec![
            "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
            "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
            "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
        ]);
        assert!(
            path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
        );
        assert!(
            path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()])
        );
        assert!(
            path_info.check_upstream_signature(&[
                Url::parse("https://nix-community.cachix.org").unwrap()
            ])
        );
        assert!(
            !path_info
                .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
        );
    }

    #[test]
    fn path_info_without_signature() {
        let path_info_json = r#"{"ca":"fixed:r:sha256:1q10p04pgx9sk6xbvrkn4nvh0ys2lzplgcni5368f4z3cr8ikbmz","narHash":"sha256-v64ZUWbjE4fMKNGyR++nQnsAtyV25r26mTr1dwm4IOA=","narSize":5520,"path":"/nix/store/gj6hz9mj23v01yvq1nn5f655jrcky1qq-nixos-option.nix","references":[],"registrationTime":1744740942,"valid":true}"#;
        let path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");

        assert!(
            !path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
        );
    }

    /*
    #[test]
    fn path_info_deserialize_nix_map() {
        let path_info_json = r#"{"/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation":{"ca":null,"deriver":"/nix/store/h8z25s6arcrns5nmrq1yhgbamywjivpn-home-manager-generation.drv","narHash":"sha256-o4qwqyJ5UVm9cyC/nBNcNYVnIM14Pewgw7fou+wUVSY=","narSize":13608,"references":["/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/4d0ix5djms3n2jnjdc58l916cwack1rp-empty-directory","/nix/store/56zmgla8443qfrkrh2ch0vz0mh8ywrw1-home-manager-files","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/80l1sb3vcmrkcdd7ihlizkcnv19rq9fj-ncurses-6.5","/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation","/nix/store/92as847i10kl6s19fi910ddyk9l83835-check-link-targets.sh","/nix/store/9c90iz95yynyh3vsc67zndch6j01vgz3-home-manager-path","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bm5fi6wj0w4r2wjll2448k307bzfcjwx-cleanup","/nix/store/c244fsb3a7i5837lzn94m4bmav9i5p9b-link","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/gpxsdrrd4x93fs75395vr2dfys1ki9mq-jq-1.7.1-bin","/nix/store/jlf743lqxbvad6dbgndsgqfg20m2np5i-sd-switch-0.5.3","/nix/store/mhmgm739aagj4x7hr6ag2wjmxhmpy8mf-gettext-0.22.5","/nix/store/w9db12j05yv5hl31s6jndd9cfm1g1gw4-hm-modules-messages","/nix/store/wj1c3gsiajabnq50ifxqnlv60i5rhqj7-diffutils-3.10","/nix/store/xhql0ilzbiqwnmz4z8y0phk611wynxf2-gnused-4.9","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zhrjg6wxrxmdlpn6iapzpp2z2vylpvw5-home-manager.sh"],"registrationTime":1744742989,"signatures":["nixcache.cy7.sh:Vq4X95kSzum7BwrBhjmmM2yVipfBI3AE3jgZ3b3RoYrP4/ghotbDdlwCvwK3qx4BQdEOLSgrC1tDwiMNb6oRBw=="],"ultimate":false}}"#;
        serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");

        let path_info_json = r#"{"/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b":{"ca":null,"deriver":"/nix/store/12ssi931481jlkizgfk1c1jnawvwjbhh-nixos-system-chunk-25.05.20250412.2631b0b.drv","narHash":"sha256-CHhBIzMD4v/FKqKgGroq0UC1k3GrK5lcNwQPMpv2xLc=","narSize":20704,"references":["/nix/store/0yjiyixxsr137iw93hnaacdsssy1li9h-switch-to-configuration-0.1.0","/nix/store/14rby7cpwrzjsjym44cl5h6nj6qpn1gs-etc","/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b","/nix/store/3wjljpj30fvv2cdb60apr4126pa5bm87-shadow-4.17.2","/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/5dyh8l59kfvf89zjkbmjfnx7fix93n4f-net-tools-2.10","/nix/store/aq9wdsz12bg9252790l9awiry2bml4ls-sops-install-secrets-0.0.1","/nix/store/b00kq6fjhgisdrykg621vml8505nnmb3-users-groups.json","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bfr68wi6k8icb3j9fy3fzchva56djfhd-mounts.sh","/nix/store/cjnihsds5hhnji9r85hglph07q9y9hgc-system-path","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/f9jll96j74f5ykvs062718b98lfjbn9g-util-linux-2.40.4-bin","/nix/store/h7zih134d3n5yk8pnhv1fa38n6qkyrn2-pre-switch-checks","/nix/store/idn5n51246piyxcr3v6gxnj5a5l9mzpn-linux-6.14.2","/nix/store/ipn5793y61x2904xqnkgbjnp91svjjzx-perl-5.40.0-env","/nix/store/j1rikvl25pz0b5ham1ijq0nbg1q2fqfy-initrd-linux-6.14.2","/nix/store/jgawnqyh6piwcl79gxpmq5czx9rfr9xh-glibc-locales-2.40-66","/nix/store/jqgmcv8j4gj59218hcbiyn8z951rycdj-install-grub.sh","/nix/store/kpmybhxy3gz6k1znbdirwsp3c6wvsgg9-manifest.json","/nix/store/lgainx4gl6q7mhiwmls81d3n51p5jz7z-linux-6.14.2-modules","/nix/store/mhxn5kwnri3z9hdzi3x0980id65p0icn-lib.sh","/nix/store/n8n0faszqlnf3mdg0fj6abnknrhjsw5j-perl-5.40.0-env","/nix/store/nq61v7a601gjndijq5nndprkzpwz4q9g-glibc-2.40-66-bin","/nix/store/nx27idxpvi3fk3p7admvhipny73nr25n-kmod-31","/nix/store/pggww1d2pg24fcg5v36xn63n53vanyyi-gnupg-2.4.7","/nix/store/rg5rf512szdxmnj9qal3wfdnpfsx38qi-setup-etc.pl","/nix/store/vvlfaafnz3pdhw7lx5kc5gb9pl4zhz5l-local-cmds","/nix/store/w142vx7ij1fz6qwhp5dprkf59cizvv1v-update-users-groups.pl","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zlsmh0ccgvncg30qb4y0mp5pahnk1wnw-append-initrd-secrets","/nix/store/zs07icpv5ykf8m36xcv717hh26bp09fa-firmware","/nix/store/zy2n4id5gcxcbx2x8jbblkmcpdlpsypk-getent-glibc-2.40-66"],"registrationTime":1744743136,"signatures":["nixcache.cy7.sh:dZ1XiKQNe0fRX48gBj03PIABYJGV6BPwb72YpMqEBONZMF+JrkVKhRCF0ur/4Bf5prHxg6Qfg1ytP/4csRC9DQ=="],"ultimate":false}}"#;
        serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");
    }
    */
}
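`check_upstream_hit` builds the same URL a substituter would, the nixbase32 digest from the store path's basename plus `.narinfo` joined onto the upstream URL, and issues a HEAD request. For the store path used in the tests above, the check is equivalent to something like this (illustrative):

```
# /nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0
#            ^ digest (nixbase32)               ^ name
curl -I https://cache.nixos.org/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0.narinfo
# 2xx -> upstream hit, the path is skipped; 404 -> candidate for upload
```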
src/push.rs (new file): 192 lines

use std::{
    fs,
    iter::once,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use anyhow::{Context, Result};
use aws_config::Region;
use aws_sdk_s3 as s3;
use futures::future::join_all;
use nix_compat::narinfo::{self, SigningKey};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::{debug, info, trace};
use url::Url;

use crate::{PushArgs, path_info::PathInfo, uploader::Uploader};

pub struct Push {
    upstream_caches: Vec<Url>,
    store_paths: Arc<RwLock<Vec<PathInfo>>>,
    s3_client: s3::Client,
    signing_key: SigningKey<ed25519_dalek::SigningKey>,
    bucket: String,
    // paths that we skipped cause of a signature match
    signature_hit_count: AtomicUsize,
    // paths that we skipped cause we found it on an upstream
    upstream_hit_count: AtomicUsize,
    // paths that we skipped cause they are already on our cache
    already_exists_count: AtomicUsize,
}

impl Push {
    pub async fn new(cli: &PushArgs) -> Result<Self> {
        let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
        for upstream in cli
            .upstreams
            .iter()
            .chain(once(&"https://cache.nixos.org".to_string()))
        {
            upstreams
                .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
        }

        let key = fs::read_to_string(&cli.signing_key)?;
        let signing_key = narinfo::parse_keypair(key.as_str())?.0;

        let mut s3_config = aws_config::from_env();
        if let Some(region) = &cli.region {
            s3_config = s3_config.region(Region::new(region.clone()));
        }
        if let Some(endpoint) = &cli.endpoint {
            s3_config = s3_config.endpoint_url(endpoint);
        }
        if let Some(profile) = &cli.profile {
            s3_config = s3_config.profile_name(profile);
        }

        let s3_client = s3::Client::new(&s3_config.load().await);
        Ok(Self {
            upstream_caches: upstreams,
            store_paths: Arc::new(RwLock::new(Vec::new())),
            s3_client,
            signing_key,
            bucket: cli.bucket.clone(),
            signature_hit_count: AtomicUsize::new(0),
            upstream_hit_count: AtomicUsize::new(0),
            already_exists_count: AtomicUsize::new(0),
        })
    }

    pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
        let path_info = PathInfo::from_path(package)
            .await
            .context("get path info for package")?;
        debug!("path-info for {package}: {:?}", path_info);
        self.store_paths.write().await.extend(
            path_info
                .get_closure()
                .await
                .context("closure from path info")?,
        );
        info!("found {} store paths", self.store_paths.read().await.len());

        Ok(())
    }

    pub async fn run(&'static self) -> Result<()> {
        let (tx, rx) = mpsc::channel(10);
        let filter = tokio::spawn(self.filter_from_upstream(tx));
        let upload = tokio::spawn(self.upload(rx));
        filter.await?;
        upload.await??;
        Ok(())
    }

    /// filter paths that are on upstream and send to `tx`
    async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
        let permits = Arc::new(Semaphore::new(10));
        let mut handles = Vec::with_capacity(10);
        let store_paths = self.store_paths.read().await.clone();

        for path in store_paths.into_iter() {
            if path.check_upstream_signature(&self.upstream_caches) {
                trace!("skip {} (signature match)", path.absolute_path());
                self.signature_hit_count.fetch_add(1, Ordering::Release);
                continue;
            }
            handles.push({
                let permits = permits.clone();
                let tx = tx.clone();
                tokio::spawn(async move {
                    let _permit = permits.acquire().await.unwrap();

                    if !path
                        .check_upstream_hit(self.upstream_caches.as_slice())
                        .await
                    {
                        if path
                            .check_if_already_exists(&self.s3_client, self.bucket.clone())
                            .await
                        {
                            trace!("skip {} (already exists)", path.absolute_path());
                            self.already_exists_count.fetch_add(1, Ordering::Relaxed);
                        } else {
                            tx.send(path).await.unwrap();
                        }
                    } else {
                        trace!("skip {} (upstream hit)", path.absolute_path());
                        self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
                    }
                })
            });
        }

        join_all(handles)
            .await
            .into_iter()
            .collect::<std::result::Result<(), _>>()
            .unwrap();
    }

    async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
        let upload_count = AtomicUsize::new(0);
        let permits = Arc::new(Semaphore::new(10));
        let mut uploads = Vec::with_capacity(10);

        loop {
            if let Some(path_to_upload) = rx.recv().await {
                let permits = Arc::clone(&permits);
                let absolute_path = path_to_upload.absolute_path();

                info!("uploading: {}", absolute_path);
                let uploader = Uploader::new(
                    &self.signing_key,
                    path_to_upload,
                    &self.s3_client,
                    self.bucket.clone(),
                )?;

                uploads.push(tokio::spawn(async move {
                    let _permit = permits.acquire().await.unwrap();
                    uploader.upload().await
                }));
            } else {
                join_all(uploads)
                    .await
                    .into_iter()
                    .flatten()
                    .collect::<Result<Vec<_>>>()?;

                println!("uploaded: {}", upload_count.load(Ordering::Relaxed));
                println!(
                    "skipped because of signature match: {}",
                    self.signature_hit_count.load(Ordering::Relaxed)
                );
                println!(
                    "skipped because of upstream hit: {}",
                    self.upstream_hit_count.load(Ordering::Relaxed)
                );
                println!(
                    "skipped because already exist: {}",
                    self.already_exists_count.load(Ordering::Relaxed)
                );
                break;
            }
        }
        Ok(())
    }
}
src/uploader.rs (new file): 187 lines

use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use aws_sdk_s3::{
    self as s3,
    types::{CompletedMultipartUpload, CompletedPart},
};
use futures::future::join_all;
use nix_compat::{
    narinfo::{self, NarInfo, SigningKey},
    nixbase32,
    store_path::StorePath,
};
use sha2::{Digest, Sha256};
use tokio::{io::AsyncReadExt, process::Command};
use tracing::debug;

use crate::path_info::PathInfo;

const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;

pub struct Uploader<'a> {
    signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
    path: PathInfo,
    s3_client: &'a s3::Client,
    bucket: String,
}

impl<'a> Uploader<'a> {
    pub fn new(
        signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
        path: PathInfo,
        s3_client: &'a s3::Client,
        bucket: String,
    ) -> Result<Self> {
        Ok(Self {
            signing_key,
            path,
            s3_client,
            bucket,
        })
    }

    pub async fn upload(&self) -> Result<()> {
        let nar = self.make_nar().await?;
        let mut nar_info = self.narinfo_from_nar(&nar)?;
        let nar = self.compress_nar(&nar).await;

        // update fields that we know after compression
        let mut hasher = Sha256::new();
        hasher.update(&nar);
        let hash: [u8; 32] = hasher.finalize().into();
        let nar_url = self.nar_url(&hash);
        nar_info.file_hash = Some(hash);
        nar_info.file_size = Some(nar.len() as u64);
        nar_info.url = nar_url.as_str();
        debug!("uploading nar with key: {nar_url}");

        if nar.len() < MULTIPART_CUTOFF {
            let put_object = self
                .s3_client
                .put_object()
                .bucket(&self.bucket)
                .key(&nar_url)
                .body(nar.into())
                .send()
                .await?;
            debug!("put object: {:#?}", put_object);
        } else {
            let multipart = self
                .s3_client
                .create_multipart_upload()
                .bucket(&self.bucket)
                .key(&nar_url)
                .send()
                .await?;
            let upload_id = multipart.upload_id().unwrap();

            let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
            let chunks = nar.chunks(MULTIPART_CUTOFF);
            for (i, chunk) in chunks.enumerate() {
                parts.push(tokio::task::spawn(
                    self.s3_client
                        .upload_part()
                        .bucket(&self.bucket)
                        .key(&nar_url)
                        .upload_id(upload_id)
                        .part_number(i as i32 + 1)
                        .body(chunk.to_vec().into())
                        .send(),
                ));
            }

            let completed_parts = join_all(parts)
                .await
                .into_iter()
                .flatten()
                .collect::<Result<Vec<_>, _>>()?
                .into_iter()
                .enumerate()
                .map(|(i, part)| {
                    CompletedPart::builder()
                        .set_e_tag(part.e_tag().map(ToString::to_string))
                        .set_part_number(Some(i as i32 + 1))
                        .set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
                        .build()
                })
                .collect::<Vec<_>>();

            let completed_mp_upload = CompletedMultipartUpload::builder()
                .set_parts(Some(completed_parts))
                .build();

            let complete_mp_upload = self
                .s3_client
                .complete_multipart_upload()
                .bucket(&self.bucket)
                .key(&nar_url)
                .upload_id(upload_id)
                .multipart_upload(completed_mp_upload)
                .send()
                .await?;

            debug!("complete multipart upload: {:#?}", complete_mp_upload);
        }

        let narinfo_url = format!("{}.narinfo", self.path.digest());
        debug!("uploading narinfo with key {narinfo_url}");
        self.s3_client
            .put_object()
            .bucket(&self.bucket)
            .key(narinfo_url)
            .body(nar_info.to_string().as_bytes().to_vec().into())
            .send()
            .await?;

        Ok(())
    }

    async fn make_nar(&self) -> Result<Vec<u8>> {
        Ok(Command::new("nix")
            .arg("nar")
            .arg("dump-path")
            .arg(self.path.absolute_path())
            .output()
            .await?
            .stdout)
    }

    fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
        let mut hasher = Sha256::new();
        hasher.update(nar);
        let nar_hash: [u8; 32] = hasher.finalize().into();
        let mut nar_info = NarInfo {
            flags: narinfo::Flags::empty(),
            store_path: self.path.path.as_ref(),
            nar_hash,
            nar_size: nar.len() as u64,
            references: self.path.references.iter().map(StorePath::as_ref).collect(),
            signatures: Vec::new(),
            ca: None,
            system: None,
            deriver: self.path.deriver.as_ref().map(|x| x.as_ref()),
            compression: Some("zstd"),
            file_hash: None,
            file_size: None,
            url: "",
        };
        // signature consists of: store_path, nar_hash, nar_size, and references
        nar_info.add_signature(self.signing_key);
        Ok(nar_info)
    }

    fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
        let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
        format!("nar/{compressed_nar_hash}.nar.zst")
    }

    async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
        let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
        let mut compressed = Vec::with_capacity(nar.len());
        encoder
            .read_to_end(&mut compressed)
            .await
            .expect("should compress just fine");
        compressed
    }
}
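For reference, the object written to `<digest>.narinfo` is the textual form of the NarInfo built in `narinfo_from_nar`. With the fields set above and the values taken from the test fixture in src/path_info.rs where possible, it looks roughly like this (an illustrative sketch of the standard narinfo format, not captured output):

```
StorePath: /nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0
URL: nar/<nixbase32 sha256 of the zstd-compressed nar>.nar.zst
Compression: zstd
FileHash: sha256:<nixbase32 hash of the compressed nar>
FileSize: <compressed size in bytes>
NarHash: sha256:<nixbase32 hash of the uncompressed nar>
NarSize: 27851376
References: 954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1 ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66
Sig: nixcache.cy7.sh:<base64 ed25519 signature>
```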