Compare commits

..

No commits in common. "a0794b03564c6f045c25cc96c93922a43afea367" and "e5336d304d41d80cb9b07a4d9ffe3d9d480e6cf4" have entirely different histories.

12 changed files with 110 additions and 220 deletions

16
Cargo.lock generated
View file

@ -968,15 +968,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]]
name = "humansize"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7"
dependencies = [
"libm",
]
[[package]]
name = "humantime"
version = "2.2.0"
@ -1313,12 +1304,6 @@ version = "0.2.172"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
[[package]]
name = "libm"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
[[package]]
name = "libmimalloc-sys"
version = "0.1.42"
@ -1506,7 +1491,6 @@ dependencies = [
"cxx-build",
"ed25519-dalek",
"futures",
"humansize",
"nix-compat",
"object_store",
"pkg-config",

View file

@ -30,7 +30,6 @@ bytes = "1.10.1"
object_store = { version = "0.12.0", features = ["aws"] }
ulid = "1.2.1"
tracing-subscriber = "0.3.19"
humansize = "2.1.3"
[build-dependencies]
cxx-build = "1.0"

View file

@ -59,6 +59,7 @@
devShells.default = craneLib.devShell {
inputsFrom = [ nixcp ];
RUST_LOG = "nixcp=debug";
RUST_BACKTRACE = 1;
# for cpp bindings to work
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";

View file

@ -150,7 +150,10 @@ impl Stream for AsyncWriteAdapter {
match message {
Data(v) => Poll::Ready(Some(Ok(v.into()))),
Error(exception) => {
let error = std::io::Error::other(format!("cxx error: {exception}"));
let error = std::io::Error::new(
io::ErrorKind::Other,
format!("cxx error: {exception}"),
);
Poll::Ready(Some(Err(error)))
}
Eof => {

View file

@ -1,65 +0,0 @@
use std::path::PathBuf;
use clap::{Args, Parser, Subcommand};
mod bindings;
mod cli;
pub mod make_nar;
pub mod path_info;
pub mod push;
pub mod store;
mod uploader;
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
/// Whether to enable tokio-console
#[arg(long)]
pub tokio_console: bool,
}
#[derive(Debug, Subcommand)]
pub enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")]
bucket: String,
/// Upstream cache to check against. Can be specified multiple times.
/// cache.nixos.org is always included.
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
upstreams: Vec<String>,
/// Path to the file containing signing key
/// e.g. ~/cache-priv-key.pem
#[arg(long)]
signing_key: String,
/// If unspecified, will get it from AWS_DEFAULT_REGION envar or default to us-east-1
#[arg(long)]
region: Option<String>,
/// If unspecified, will get it from AWS_ENDPOINT envar
/// e.g. https://s3.example.com
#[arg(long)]
endpoint: Option<String>,
#[arg(long)]
skip_signature_check: bool,
/// Path to upload
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "PATH")]
pub paths: Vec<PathBuf>,
}

View file

@ -1,10 +1,76 @@
#![feature(let_chains)]
#![feature(exit_status_error)]
use std::path::PathBuf;
use anyhow::{Context, Result};
use clap::Parser;
use clap::{Args, Parser, Subcommand};
use tracing_subscriber::{EnvFilter, prelude::*};
use nixcp::push::Push;
use nixcp::store::Store;
use nixcp::{Cli, Commands};
use push::Push;
use store::Store;
mod bindings;
mod cli;
mod make_nar;
mod path_info;
mod push;
mod store;
mod uploader;
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
/// Whether to enable tokio-console
#[arg(long)]
tokio_console: bool,
}
#[derive(Debug, Subcommand)]
enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")]
bucket: String,
/// Upstream cache to check against. Can be specified multiple times.
/// cache.nixos.org is always included.
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
upstreams: Vec<String>,
/// Path to the file containing signing key
/// e.g. ~/cache-priv-key.pem
#[arg(long)]
signing_key: String,
/// If unspecified, will get it from AWS_DEFAULT_REGION envar or default to us-east-1
#[arg(long)]
region: Option<String>,
/// If unspecified, will get it from AWS_ENDPOINT envar
/// e.g. https://s3.example.com
#[arg(long)]
endpoint: Option<String>,
#[arg(long)]
skip_signature_check: bool,
/// Path to upload
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "PATH")]
paths: Vec<PathBuf>,
}
#[tokio::main]
async fn main() -> Result<()> {

View file

@ -15,10 +15,10 @@ use crate::store::Store;
pub struct MakeNar<'a> {
path_info: &'a PathInfo,
store: Arc<Store>,
pub nar_hasher: Sha256,
nar_hasher: Sha256,
/// hash of compressed nar file
file_hasher: Sha256,
pub nar_size: u64,
nar_size: u64,
file_size: u64,
}

View file

@ -1,6 +1,6 @@
use std::collections::HashSet;
use anyhow::{Context, Result, anyhow};
use anyhow::{Context, Result};
use futures::future::join_all;
use nix_compat::nixbase32;
use nix_compat::store_path::StorePath;
@ -13,7 +13,7 @@ use url::Url;
use crate::store::Store;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone)]
pub struct PathInfo {
pub path: StorePath<String>,
pub signatures: Vec<String>,
@ -22,44 +22,33 @@ pub struct PathInfo {
}
impl PathInfo {
pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
debug!("query path info for {:?}", drv);
pub async fn from_path(path: &Path, store: &Store) -> Result<Self> {
debug!("query path info for {:?}", path);
let derivation = match drv.extension() {
Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
let derivation = match path.extension() {
Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(),
_ => {
&Command::new("nix")
.arg("path-info")
.arg("--derivation")
.arg(drv)
.arg(path)
.output()
.await
.context(format!("run command: nix path-info --derivation {drv:?}"))?
.context(format!("run command: nix path-info --derivation {path:?}"))?
.stdout
}
};
let derivation = String::from_utf8_lossy(derivation);
debug!("derivation: {derivation}");
if derivation.is_empty() {
return Err(anyhow!(
"nix path-info did not return a derivation for {drv:#?}"
));
}
Self::from_path(derivation.trim(), store).await
}
pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
let store_path =
StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes())
.context("storepath from derivation")?;
store
.query_path_info(store_path)
.await
.context("query pathinfo for path")
.context("query pathinfo for derivation")
}
// TODO: skip call to query_path_info and return Vec<Path>?
pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
let futs = store
.compute_fs_closure(self.path.clone())

View file

@ -1,5 +1,4 @@
use std::{
collections::HashSet,
fs,
iter::once,
path::PathBuf,
@ -11,7 +10,6 @@ use std::{
use anyhow::{Context, Result};
use futures::future::join_all;
use humansize::{DECIMAL, format_size};
use nix_compat::narinfo::{self, SigningKey};
use object_store::aws::{AmazonS3, AmazonS3Builder};
use tokio::sync::{RwLock, Semaphore, mpsc};
@ -22,7 +20,7 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
pub struct Push {
upstream_caches: Vec<Url>,
store_paths: Arc<RwLock<HashSet<PathInfo>>>,
store_paths: Arc<RwLock<Vec<PathInfo>>>,
signing_key: SigningKey<ed25519_dalek::SigningKey>,
store: Arc<Store>,
s3: Arc<AmazonS3>,
@ -62,7 +60,7 @@ impl Push {
Ok(Self {
upstream_caches: upstreams,
store_paths: Arc::new(RwLock::new(HashSet::new())),
store_paths: Arc::new(RwLock::new(Vec::new())),
signing_key,
store: Arc::new(store),
s3: Arc::new(s3_builder.build()?),
@ -80,7 +78,7 @@ impl Push {
let store = self.store.clone();
futs.push(tokio::spawn(async move {
let path_info = PathInfo::from_derivation(path.as_path(), &store)
let path_info = PathInfo::from_path(path.as_path(), &store)
.await
.context("get path info for path")?;
debug!("path-info for {path:?}: {path_info:?}");
@ -159,24 +157,34 @@ impl Push {
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let mut uploads = Vec::new();
let permits = Arc::new(Semaphore::new(10));
let permits = Arc::new(Semaphore::new(16));
let big_permits = Arc::new(Semaphore::new(5));
loop {
let permits = permits.clone();
let big_permits = big_permits.clone();
if let Some(path_to_upload) = rx.recv().await {
debug!("upload permits available: {}", permits.available_permits());
let mut permit = permits.acquire_owned().await.unwrap();
uploads.push(tokio::spawn({
// large uploads will be concurrently uploaded with multipart anyway so don't spawn
// too many of them
let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
Some(permits.acquire_owned().await.unwrap())
} else {
None
};
// a large directory may have many files and end up causing "too many open files"
if PathBuf::from(path_to_upload.absolute_path()).is_dir()
&& path_to_upload.nar_size > 5 * 1024 * 1024
{
debug!(
"upload big permits available: {}",
big_permits.available_permits()
);
// drop regular permit and take the big one
permit = big_permits.acquire_owned().await.unwrap();
}
println!(
"uploading: {} (size: {})",
path_to_upload.absolute_path(),
format_size(path_to_upload.nar_size, DECIMAL)
path_to_upload.nar_size
);
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
let s3 = self.s3.clone();

View file

@ -1,32 +0,0 @@
#![allow(dead_code)]
use std::process::Command;
use std::sync::Arc;
use nixcp::store::Store;
pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
pub struct Context {
pub store: Arc<Store>,
}
impl Context {
fn new() -> Self {
// hello must be in the store
Command::new("nix")
.arg("build")
.arg("--no-link")
.arg(HELLO)
.status()
.unwrap();
let store = Arc::new(Store::connect().expect("connect to nix store"));
Self { store }
}
}
pub fn context() -> Context {
Context::new()
}

View file

@ -1,26 +0,0 @@
use crate::common::HELLO_PATH;
use nix_compat::nixbase32;
use nixcp::make_nar::MakeNar;
use nixcp::path_info::PathInfo;
use sha2::Digest;
use tokio::io::AsyncReadExt;
mod common;
#[tokio::test]
async fn nar_size_and_hash() {
let ctx = common::context();
let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();
let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
let mut reader = nar.compress_and_hash().unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
drop(reader);
assert_eq!(nar.nar_size, 234680);
let nar_hash = nar.nar_hasher.finalize();
let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
}

View file

@ -1,37 +0,0 @@
use nixcp::path_info::PathInfo;
use std::path::PathBuf;
use crate::common::{HELLO, HELLO_DRV, HELLO_PATH};
mod common;
#[tokio::test]
async fn path_info_from_package() {
let ctx = common::context();
let path = PathBuf::from(HELLO);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_string(), HELLO_DRV);
}
#[tokio::test]
async fn path_info_from_path() {
let ctx = common::context();
let path = PathBuf::from(HELLO_PATH);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_string(), HELLO_DRV);
}
#[tokio::test]
async fn closure() {
let ctx = common::context();
let path = PathBuf::from(HELLO);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
let closure = path_info.get_closure(&ctx.store).await.unwrap();
assert_eq!(closure.len(), 472);
}