Compare commits

...

10 commits

12 changed files with 220 additions and 110 deletions

16
Cargo.lock generated
View file

@ -968,6 +968,15 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humansize"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7"
dependencies = [
"libm",
]
[[package]]
name = "humantime"
version = "2.2.0"
@ -1304,6 +1313,12 @@ version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]]
name = "libm"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
[[package]]
name = "libmimalloc-sys"
version = "0.1.42"
@ -1491,6 +1506,7 @@ dependencies = [
"cxx-build",
"ed25519-dalek",
"futures",
"humansize",
"nix-compat",
"object_store",
"pkg-config",

View file

@ -30,6 +30,7 @@ bytes = "1.10.1"
object_store = { version = "0.12.0", features = ["aws"] }
ulid = "1.2.1"
tracing-subscriber = "0.3.19"
humansize = "2.1.3"
[build-dependencies]
cxx-build = "1.0"

View file

@ -59,7 +59,6 @@
devShells.default = craneLib.devShell {
inputsFrom = [ nixcp ];
RUST_LOG = "nixcp=debug";
RUST_BACKGRACE = 1;
# for cpp bindings to work
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";

View file

@ -150,10 +150,7 @@ impl Stream for AsyncWriteAdapter {
match message {
Data(v) => Poll::Ready(Some(Ok(v.into()))),
Error(exception) => {
let error = std::io::Error::new(
io::ErrorKind::Other,
format!("cxx error: {exception}"),
);
let error = std::io::Error::other(format!("cxx error: {exception}"));
Poll::Ready(Some(Err(error)))
}
Eof => {

65
src/lib.rs Normal file
View file

@ -0,0 +1,65 @@
use std::path::PathBuf;
use clap::{Args, Parser, Subcommand};
mod bindings;
mod cli;
pub mod make_nar;
pub mod path_info;
pub mod push;
pub mod store;
mod uploader;
/// Top-level command-line interface for nixcp, parsed with clap's derive API.
///
/// `version` is sourced from Cargo.toml; the `about` string is shown in `--help`.
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
pub struct Cli {
    // The selected subcommand (currently only `push`).
    #[command(subcommand)]
    pub command: Commands,

    /// Whether to enable tokio-console
    #[arg(long)]
    pub tokio_console: bool,
}
/// Subcommands accepted by the CLI.
#[derive(Debug, Subcommand)]
pub enum Commands {
    // `arg_required_else_help`: running `nixcp push` with no arguments prints
    // the help text instead of proceeding with an empty path list.
    #[command(arg_required_else_help = true)]
    Push(PushArgs),
}
/// Arguments for the `push` subcommand.
///
/// Doc comments on fields become the clap help text for each flag.
#[derive(Debug, Args)]
pub struct PushArgs {
    /// The s3 bucket to upload to
    #[arg(long, value_name = "bucket name")]
    bucket: String,

    /// Upstream cache to check against. Can be specified multiple times.
    /// cache.nixos.org is always included.
    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
    upstreams: Vec<String>,

    /// Path to the file containing signing key
    /// e.g. ~/cache-priv-key.pem
    #[arg(long)]
    signing_key: String,

    /// If unspecified, will get it from AWS_DEFAULT_REGION envar or default to us-east-1
    #[arg(long)]
    region: Option<String>,

    /// If unspecified, will get it from AWS_ENDPOINT envar
    /// e.g. https://s3.example.com
    #[arg(long)]
    endpoint: Option<String>,

    // When set, upstream narinfo signatures are not verified before skipping
    // an already-cached path.
    #[arg(long)]
    skip_signature_check: bool,

    /// Path to upload
    /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
    #[arg(value_name = "PATH")]
    pub paths: Vec<PathBuf>,
}

View file

@ -1,76 +1,10 @@
#![feature(let_chains)]
#![feature(exit_status_error)]
use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::{Args, Parser, Subcommand};
use clap::Parser;
use tracing_subscriber::{EnvFilter, prelude::*};
use push::Push;
use store::Store;
mod bindings;
mod cli;
mod make_nar;
mod path_info;
mod push;
mod store;
mod uploader;
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
/// Whether to enable tokio-console
#[arg(long)]
tokio_console: bool,
}
#[derive(Debug, Subcommand)]
enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")]
bucket: String,
/// Upstream cache to check against. Can be specified multiple times.
/// cache.nixos.org is always included.
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
upstreams: Vec<String>,
/// Path to the file containing signing key
/// e.g. ~/cache-priv-key.pem
#[arg(long)]
signing_key: String,
/// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
#[arg(long)]
region: Option<String>,
/// If unspecifed, will get it from AWS_ENDPOINT envar
/// e.g. https://s3.example.com
#[arg(long)]
endpoint: Option<String>,
#[arg(long)]
skip_signature_check: bool,
/// Path to upload
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "PATH")]
paths: Vec<PathBuf>,
}
use nixcp::push::Push;
use nixcp::store::Store;
use nixcp::{Cli, Commands};
#[tokio::main]
async fn main() -> Result<()> {

View file

@ -15,10 +15,10 @@ use crate::store::Store;
pub struct MakeNar<'a> {
path_info: &'a PathInfo,
store: Arc<Store>,
nar_hasher: Sha256,
pub nar_hasher: Sha256,
/// hash of compressed nar file
file_hasher: Sha256,
nar_size: u64,
pub nar_size: u64,
file_size: u64,
}

View file

@ -1,6 +1,6 @@
use std::collections::HashSet;
use anyhow::{Context, Result};
use anyhow::{Context, Result, anyhow};
use futures::future::join_all;
use nix_compat::nixbase32;
use nix_compat::store_path::StorePath;
@ -13,7 +13,7 @@ use url::Url;
use crate::store::Store;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PathInfo {
pub path: StorePath<String>,
pub signatures: Vec<String>,
@ -22,33 +22,44 @@ pub struct PathInfo {
}
impl PathInfo {
pub async fn from_path(path: &Path, store: &Store) -> Result<Self> {
debug!("query path info for {:?}", path);
pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
debug!("query path info for {:?}", drv);
let derivation = match path.extension() {
Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(),
let derivation = match drv.extension() {
Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
_ => {
&Command::new("nix")
.arg("path-info")
.arg("--derivation")
.arg(path)
.arg(drv)
.output()
.await
.context(format!("run command: nix path-info --derivaiton {path:?}"))?
.context(format!("run command: nix path-info --derivaiton {drv:?}"))?
.stdout
}
};
let derivation = String::from_utf8_lossy(derivation);
debug!("derivation: {derivation}");
let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes())
.context("storepath from derivation")?;
if derivation.is_empty() {
return Err(anyhow!(
"nix path-info did not return a derivation for {drv:#?}"
));
}
Self::from_path(derivation.trim(), store).await
}
pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
let store_path =
StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
store
.query_path_info(store_path)
.await
.context("query pathinfo for derivation")
.context("query pathinfo for path")
}
// TODO: skip call to query_path_info and return Vec<Path>?
pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
let futs = store
.compute_fs_closure(self.path.clone())

View file

@ -1,4 +1,5 @@
use std::{
collections::HashSet,
fs,
iter::once,
path::PathBuf,
@ -10,6 +11,7 @@ use std::{
use anyhow::{Context, Result};
use futures::future::join_all;
use humansize::{DECIMAL, format_size};
use nix_compat::narinfo::{self, SigningKey};
use object_store::aws::{AmazonS3, AmazonS3Builder};
use tokio::sync::{RwLock, Semaphore, mpsc};
@ -20,7 +22,7 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
pub struct Push {
upstream_caches: Vec<Url>,
store_paths: Arc<RwLock<Vec<PathInfo>>>,
store_paths: Arc<RwLock<HashSet<PathInfo>>>,
signing_key: SigningKey<ed25519_dalek::SigningKey>,
store: Arc<Store>,
s3: Arc<AmazonS3>,
@ -60,7 +62,7 @@ impl Push {
Ok(Self {
upstream_caches: upstreams,
store_paths: Arc::new(RwLock::new(Vec::new())),
store_paths: Arc::new(RwLock::new(HashSet::new())),
signing_key,
store: Arc::new(store),
s3: Arc::new(s3_builder.build()?),
@ -78,7 +80,7 @@ impl Push {
let store = self.store.clone();
futs.push(tokio::spawn(async move {
let path_info = PathInfo::from_path(path.as_path(), &store)
let path_info = PathInfo::from_derivation(path.as_path(), &store)
.await
.context("get path info for path")?;
debug!("path-info for {path:?}: {path_info:?}");
@ -157,34 +159,24 @@ impl Push {
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let mut uploads = Vec::new();
let permits = Arc::new(Semaphore::new(16));
let big_permits = Arc::new(Semaphore::new(5));
let permits = Arc::new(Semaphore::new(10));
loop {
let permits = permits.clone();
let big_permits = big_permits.clone();
if let Some(path_to_upload) = rx.recv().await {
debug!("upload permits available: {}", permits.available_permits());
let mut permit = permits.acquire_owned().await.unwrap();
uploads.push(tokio::spawn({
// a large directory may have many files and end up causing "too many open files"
if PathBuf::from(path_to_upload.absolute_path()).is_dir()
&& path_to_upload.nar_size > 5 * 1024 * 1024
{
debug!(
"upload big permits available: {}",
big_permits.available_permits()
);
// drop regular permit and take the big one
permit = big_permits.acquire_owned().await.unwrap();
}
// large uploads will be concurrently uploaded with multipart anyway so don't spawn
// too much of them
let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
Some(permits.acquire_owned().await.unwrap())
} else {
None
};
println!(
"uploading: {} (size: {})",
path_to_upload.absolute_path(),
path_to_upload.nar_size
format_size(path_to_upload.nar_size, DECIMAL)
);
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
let s3 = self.s3.clone();

32
tests/common/mod.rs Normal file
View file

@ -0,0 +1,32 @@
#![allow(dead_code)]
use std::process::Command;
use std::sync::Arc;
use nixcp::store::Store;
// Flake reference used to realise the test package (nixpkgs pinned to a fixed
// revision so the store path and derivation below stay stable).
pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
// Basename of the derivation that builds HELLO.
pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
// Output store path produced by building HELLO.
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";

/// Shared test fixture: a live connection to the local nix store.
pub struct Context {
    pub store: Arc<Store>,
}
impl Context {
fn new() -> Self {
// hello must be in the store
Command::new("nix")
.arg("build")
.arg("--no-link")
.arg(HELLO)
.status()
.unwrap();
let store = Arc::new(Store::connect().expect("connect to nix store"));
Self { store }
}
}
/// Entry point used by the test files to obtain a fresh fixture.
pub fn context() -> Context {
    Context::new()
}

26
tests/nar.rs Normal file
View file

@ -0,0 +1,26 @@
use crate::common::HELLO_PATH;
use nix_compat::nixbase32;
use nixcp::make_nar::MakeNar;
use nixcp::path_info::PathInfo;
use sha2::Digest;
use tokio::io::AsyncReadExt;
mod common;
// Verifies that MakeNar reports the expected uncompressed NAR size and
// sha256 (nixbase32-encoded) for the pinned hello-2.12.1 store path.
#[tokio::test]
async fn nar_size_and_hash() {
    let ctx = common::context();
    let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();

    let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
    let mut reader = nar.compress_and_hash().unwrap();
    let mut buf = Vec::new();
    // Drain the stream fully so the hashers see the whole NAR.
    reader.read_to_end(&mut buf).await.unwrap();
    // NOTE(review): reader appears to borrow `nar`; it must be dropped before
    // the hasher is finalized below — confirm against MakeNar's API.
    drop(reader);

    // Known-good values for the pinned nixpkgs revision in common::HELLO.
    assert_eq!(nar.nar_size, 234680);

    let nar_hash = nar.nar_hasher.finalize();
    let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
    assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
}

37
tests/path_info.rs Normal file
View file

@ -0,0 +1,37 @@
use nixcp::path_info::PathInfo;
use std::path::PathBuf;
use crate::common::{HELLO, HELLO_DRV, HELLO_PATH};
mod common;
// A flake reference (no .drv extension) should resolve to its derivation path.
#[tokio::test]
async fn path_info_from_package() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    assert_eq!(path_info.path.to_string(), HELLO_DRV);
}
// A realised store path should also resolve back to its derivation.
#[tokio::test]
async fn path_info_from_path() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO_PATH);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        // Fixed: the expect message was copy-pasted from the package test
        // ("from package") and misidentified this case on failure.
        .expect("get pathinfo from path");
    assert_eq!(path_info.path.to_string(), HELLO_DRV);
}
// The runtime closure of the pinned hello derivation has a fixed size;
// a change here means the pinned nixpkgs revision (or closure logic) changed.
#[tokio::test]
async fn closure() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    let closure = path_info.get_closure(&ctx.store).await.unwrap();
    assert_eq!(closure.len(), 472);
}