Compare commits
10 commits
e5336d304d
...
a0794b0356
Author | SHA1 | Date | |
---|---|---|---|
a0794b0356 | |||
05589641cf | |||
76cbc85032 | |||
09181ae785 | |||
54d4c714af | |||
878e096494 | |||
7285c29e88 | |||
9d2e9e38bd | |||
01443d0d99 | |||
b49be95d09 |
12 changed files with 220 additions and 110 deletions
16
Cargo.lock
generated
16
Cargo.lock
generated
|
@ -968,6 +968,15 @@ version = "1.0.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humansize"
|
||||
version = "2.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7"
|
||||
dependencies = [
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.2.0"
|
||||
|
@ -1304,6 +1313,12 @@ version = "0.2.172"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
|
||||
|
||||
[[package]]
|
||||
name = "libm"
|
||||
version = "0.2.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
|
||||
|
||||
[[package]]
|
||||
name = "libmimalloc-sys"
|
||||
version = "0.1.42"
|
||||
|
@ -1491,6 +1506,7 @@ dependencies = [
|
|||
"cxx-build",
|
||||
"ed25519-dalek",
|
||||
"futures",
|
||||
"humansize",
|
||||
"nix-compat",
|
||||
"object_store",
|
||||
"pkg-config",
|
||||
|
|
|
@ -30,6 +30,7 @@ bytes = "1.10.1"
|
|||
object_store = { version = "0.12.0", features = ["aws"] }
|
||||
ulid = "1.2.1"
|
||||
tracing-subscriber = "0.3.19"
|
||||
humansize = "2.1.3"
|
||||
|
||||
[build-dependencies]
|
||||
cxx-build = "1.0"
|
||||
|
|
|
@ -59,7 +59,6 @@
|
|||
devShells.default = craneLib.devShell {
|
||||
inputsFrom = [ nixcp ];
|
||||
|
||||
RUST_LOG = "nixcp=debug";
|
||||
RUST_BACKGRACE = 1;
|
||||
# for cpp bindings to work
|
||||
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
|
||||
|
|
|
@ -150,10 +150,7 @@ impl Stream for AsyncWriteAdapter {
|
|||
match message {
|
||||
Data(v) => Poll::Ready(Some(Ok(v.into()))),
|
||||
Error(exception) => {
|
||||
let error = std::io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!("cxx error: {exception}"),
|
||||
);
|
||||
let error = std::io::Error::other(format!("cxx error: {exception}"));
|
||||
Poll::Ready(Some(Err(error)))
|
||||
}
|
||||
Eof => {
|
||||
|
|
65
src/lib.rs
Normal file
65
src/lib.rs
Normal file
|
@ -0,0 +1,65 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
|
||||
mod bindings;
|
||||
mod cli;
|
||||
pub mod make_nar;
|
||||
pub mod path_info;
|
||||
pub mod push;
|
||||
pub mod store;
|
||||
mod uploader;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version)]
|
||||
#[command(name = "nixcp")]
|
||||
#[command(about = "Upload store paths to a s3 binary cache")]
|
||||
#[command(long_about = None)]
|
||||
pub struct Cli {
|
||||
#[command(subcommand)]
|
||||
pub command: Commands,
|
||||
|
||||
/// Whether to enable tokio-console
|
||||
#[arg(long)]
|
||||
pub tokio_console: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum Commands {
|
||||
#[command(arg_required_else_help = true)]
|
||||
Push(PushArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub struct PushArgs {
|
||||
/// The s3 bucket to upload to
|
||||
#[arg(long, value_name = "bucket name")]
|
||||
bucket: String,
|
||||
|
||||
/// Upstream cache to check against. Can be specified multiple times.
|
||||
/// cache.nixos.org is always included.
|
||||
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
|
||||
upstreams: Vec<String>,
|
||||
|
||||
/// Path to the file containing signing key
|
||||
/// e.g. ~/cache-priv-key.pem
|
||||
#[arg(long)]
|
||||
signing_key: String,
|
||||
|
||||
/// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
|
||||
#[arg(long)]
|
||||
region: Option<String>,
|
||||
|
||||
/// If unspecifed, will get it from AWS_ENDPOINT envar
|
||||
/// e.g. https://s3.example.com
|
||||
#[arg(long)]
|
||||
endpoint: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
skip_signature_check: bool,
|
||||
|
||||
/// Path to upload
|
||||
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
|
||||
#[arg(value_name = "PATH")]
|
||||
pub paths: Vec<PathBuf>,
|
||||
}
|
74
src/main.rs
74
src/main.rs
|
@ -1,76 +1,10 @@
|
|||
#![feature(let_chains)]
|
||||
#![feature(exit_status_error)]
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use clap::Parser;
|
||||
use tracing_subscriber::{EnvFilter, prelude::*};
|
||||
|
||||
use push::Push;
|
||||
use store::Store;
|
||||
|
||||
mod bindings;
|
||||
mod cli;
|
||||
mod make_nar;
|
||||
mod path_info;
|
||||
mod push;
|
||||
mod store;
|
||||
mod uploader;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version)]
|
||||
#[command(name = "nixcp")]
|
||||
#[command(about = "Upload store paths to a s3 binary cache")]
|
||||
#[command(long_about = None)]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
|
||||
/// Whether to enable tokio-console
|
||||
#[arg(long)]
|
||||
tokio_console: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
enum Commands {
|
||||
#[command(arg_required_else_help = true)]
|
||||
Push(PushArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub struct PushArgs {
|
||||
/// The s3 bucket to upload to
|
||||
#[arg(long, value_name = "bucket name")]
|
||||
bucket: String,
|
||||
|
||||
/// Upstream cache to check against. Can be specified multiple times.
|
||||
/// cache.nixos.org is always included.
|
||||
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
|
||||
upstreams: Vec<String>,
|
||||
|
||||
/// Path to the file containing signing key
|
||||
/// e.g. ~/cache-priv-key.pem
|
||||
#[arg(long)]
|
||||
signing_key: String,
|
||||
|
||||
/// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
|
||||
#[arg(long)]
|
||||
region: Option<String>,
|
||||
|
||||
/// If unspecifed, will get it from AWS_ENDPOINT envar
|
||||
/// e.g. https://s3.example.com
|
||||
#[arg(long)]
|
||||
endpoint: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
skip_signature_check: bool,
|
||||
|
||||
/// Path to upload
|
||||
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
|
||||
#[arg(value_name = "PATH")]
|
||||
paths: Vec<PathBuf>,
|
||||
}
|
||||
use nixcp::push::Push;
|
||||
use nixcp::store::Store;
|
||||
use nixcp::{Cli, Commands};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
|
|
|
@ -15,10 +15,10 @@ use crate::store::Store;
|
|||
pub struct MakeNar<'a> {
|
||||
path_info: &'a PathInfo,
|
||||
store: Arc<Store>,
|
||||
nar_hasher: Sha256,
|
||||
pub nar_hasher: Sha256,
|
||||
/// hash of compressed nar file
|
||||
file_hasher: Sha256,
|
||||
nar_size: u64,
|
||||
pub nar_size: u64,
|
||||
file_size: u64,
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::collections::HashSet;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use futures::future::join_all;
|
||||
use nix_compat::nixbase32;
|
||||
use nix_compat::store_path::StorePath;
|
||||
|
@ -13,7 +13,7 @@ use url::Url;
|
|||
|
||||
use crate::store::Store;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct PathInfo {
|
||||
pub path: StorePath<String>,
|
||||
pub signatures: Vec<String>,
|
||||
|
@ -22,33 +22,44 @@ pub struct PathInfo {
|
|||
}
|
||||
|
||||
impl PathInfo {
|
||||
pub async fn from_path(path: &Path, store: &Store) -> Result<Self> {
|
||||
debug!("query path info for {:?}", path);
|
||||
pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
|
||||
debug!("query path info for {:?}", drv);
|
||||
|
||||
let derivation = match path.extension() {
|
||||
Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(),
|
||||
let derivation = match drv.extension() {
|
||||
Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
|
||||
_ => {
|
||||
&Command::new("nix")
|
||||
.arg("path-info")
|
||||
.arg("--derivation")
|
||||
.arg(path)
|
||||
.arg(drv)
|
||||
.output()
|
||||
.await
|
||||
.context(format!("run command: nix path-info --derivaiton {path:?}"))?
|
||||
.context(format!("run command: nix path-info --derivaiton {drv:?}"))?
|
||||
.stdout
|
||||
}
|
||||
};
|
||||
let derivation = String::from_utf8_lossy(derivation);
|
||||
debug!("derivation: {derivation}");
|
||||
|
||||
let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes())
|
||||
.context("storepath from derivation")?;
|
||||
if derivation.is_empty() {
|
||||
return Err(anyhow!(
|
||||
"nix path-info did not return a derivation for {drv:#?}"
|
||||
));
|
||||
}
|
||||
|
||||
Self::from_path(derivation.trim(), store).await
|
||||
}
|
||||
|
||||
pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
|
||||
let store_path =
|
||||
StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
|
||||
store
|
||||
.query_path_info(store_path)
|
||||
.await
|
||||
.context("query pathinfo for derivation")
|
||||
.context("query pathinfo for path")
|
||||
}
|
||||
|
||||
// TODO: skip call to query_path_info and return Vec<Path>?
|
||||
pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
|
||||
let futs = store
|
||||
.compute_fs_closure(self.path.clone())
|
||||
|
|
36
src/push.rs
36
src/push.rs
|
@ -1,4 +1,5 @@
|
|||
use std::{
|
||||
collections::HashSet,
|
||||
fs,
|
||||
iter::once,
|
||||
path::PathBuf,
|
||||
|
@ -10,6 +11,7 @@ use std::{
|
|||
|
||||
use anyhow::{Context, Result};
|
||||
use futures::future::join_all;
|
||||
use humansize::{DECIMAL, format_size};
|
||||
use nix_compat::narinfo::{self, SigningKey};
|
||||
use object_store::aws::{AmazonS3, AmazonS3Builder};
|
||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
||||
|
@ -20,7 +22,7 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
|
|||
|
||||
pub struct Push {
|
||||
upstream_caches: Vec<Url>,
|
||||
store_paths: Arc<RwLock<Vec<PathInfo>>>,
|
||||
store_paths: Arc<RwLock<HashSet<PathInfo>>>,
|
||||
signing_key: SigningKey<ed25519_dalek::SigningKey>,
|
||||
store: Arc<Store>,
|
||||
s3: Arc<AmazonS3>,
|
||||
|
@ -60,7 +62,7 @@ impl Push {
|
|||
|
||||
Ok(Self {
|
||||
upstream_caches: upstreams,
|
||||
store_paths: Arc::new(RwLock::new(Vec::new())),
|
||||
store_paths: Arc::new(RwLock::new(HashSet::new())),
|
||||
signing_key,
|
||||
store: Arc::new(store),
|
||||
s3: Arc::new(s3_builder.build()?),
|
||||
|
@ -78,7 +80,7 @@ impl Push {
|
|||
let store = self.store.clone();
|
||||
|
||||
futs.push(tokio::spawn(async move {
|
||||
let path_info = PathInfo::from_path(path.as_path(), &store)
|
||||
let path_info = PathInfo::from_derivation(path.as_path(), &store)
|
||||
.await
|
||||
.context("get path info for path")?;
|
||||
debug!("path-info for {path:?}: {path_info:?}");
|
||||
|
@ -157,34 +159,24 @@ impl Push {
|
|||
|
||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||
let mut uploads = Vec::new();
|
||||
let permits = Arc::new(Semaphore::new(16));
|
||||
let big_permits = Arc::new(Semaphore::new(5));
|
||||
let permits = Arc::new(Semaphore::new(10));
|
||||
|
||||
loop {
|
||||
let permits = permits.clone();
|
||||
let big_permits = big_permits.clone();
|
||||
|
||||
if let Some(path_to_upload) = rx.recv().await {
|
||||
debug!("upload permits available: {}", permits.available_permits());
|
||||
let mut permit = permits.acquire_owned().await.unwrap();
|
||||
|
||||
uploads.push(tokio::spawn({
|
||||
// a large directory may have many files and end up causing "too many open files"
|
||||
if PathBuf::from(path_to_upload.absolute_path()).is_dir()
|
||||
&& path_to_upload.nar_size > 5 * 1024 * 1024
|
||||
{
|
||||
debug!(
|
||||
"upload big permits available: {}",
|
||||
big_permits.available_permits()
|
||||
);
|
||||
// drop regular permit and take the big one
|
||||
permit = big_permits.acquire_owned().await.unwrap();
|
||||
}
|
||||
|
||||
// large uploads will be concurrently uploaded with multipart anyway so don't spawn
|
||||
// too much of them
|
||||
let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
|
||||
Some(permits.acquire_owned().await.unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
println!(
|
||||
"uploading: {} (size: {})",
|
||||
path_to_upload.absolute_path(),
|
||||
path_to_upload.nar_size
|
||||
format_size(path_to_upload.nar_size, DECIMAL)
|
||||
);
|
||||
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
|
||||
let s3 = self.s3.clone();
|
||||
|
|
32
tests/common/mod.rs
Normal file
32
tests/common/mod.rs
Normal file
|
@ -0,0 +1,32 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
|
||||
use nixcp::store::Store;
|
||||
|
||||
pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
|
||||
pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
|
||||
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
|
||||
|
||||
pub struct Context {
|
||||
pub store: Arc<Store>,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
fn new() -> Self {
|
||||
// hello must be in the store
|
||||
Command::new("nix")
|
||||
.arg("build")
|
||||
.arg("--no-link")
|
||||
.arg(HELLO)
|
||||
.status()
|
||||
.unwrap();
|
||||
let store = Arc::new(Store::connect().expect("connect to nix store"));
|
||||
Self { store }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn context() -> Context {
|
||||
Context::new()
|
||||
}
|
26
tests/nar.rs
Normal file
26
tests/nar.rs
Normal file
|
@ -0,0 +1,26 @@
|
|||
use crate::common::HELLO_PATH;
|
||||
use nix_compat::nixbase32;
|
||||
use nixcp::make_nar::MakeNar;
|
||||
use nixcp::path_info::PathInfo;
|
||||
use sha2::Digest;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
mod common;
|
||||
|
||||
#[tokio::test]
|
||||
async fn nar_size_and_hash() {
|
||||
let ctx = common::context();
|
||||
let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();
|
||||
|
||||
let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
|
||||
let mut reader = nar.compress_and_hash().unwrap();
|
||||
let mut buf = Vec::new();
|
||||
reader.read_to_end(&mut buf).await.unwrap();
|
||||
drop(reader);
|
||||
|
||||
assert_eq!(nar.nar_size, 234680);
|
||||
|
||||
let nar_hash = nar.nar_hasher.finalize();
|
||||
let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
|
||||
assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
|
||||
}
|
37
tests/path_info.rs
Normal file
37
tests/path_info.rs
Normal file
|
@ -0,0 +1,37 @@
|
|||
use nixcp::path_info::PathInfo;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use crate::common::{HELLO, HELLO_DRV, HELLO_PATH};
|
||||
|
||||
mod common;
|
||||
|
||||
#[tokio::test]
|
||||
async fn path_info_from_package() {
|
||||
let ctx = common::context();
|
||||
let path = PathBuf::from(HELLO);
|
||||
let path_info = PathInfo::from_derivation(&path, &ctx.store)
|
||||
.await
|
||||
.expect("get pathinfo from package");
|
||||
assert_eq!(path_info.path.to_string(), HELLO_DRV);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn path_info_from_path() {
|
||||
let ctx = common::context();
|
||||
let path = PathBuf::from(HELLO_PATH);
|
||||
let path_info = PathInfo::from_derivation(&path, &ctx.store)
|
||||
.await
|
||||
.expect("get pathinfo from package");
|
||||
assert_eq!(path_info.path.to_string(), HELLO_DRV);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn closure() {
|
||||
let ctx = common::context();
|
||||
let path = PathBuf::from(HELLO);
|
||||
let path_info = PathInfo::from_derivation(&path, &ctx.store)
|
||||
.await
|
||||
.expect("get pathinfo from package");
|
||||
let closure = path_info.get_closure(&ctx.store).await.unwrap();
|
||||
assert_eq!(closure.len(), 472);
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue