Compare commits
No commits in common. "a0794b03564c6f045c25cc96c93922a43afea367" and "e5336d304d41d80cb9b07a4d9ffe3d9d480e6cf4" have entirely different histories.
a0794b0356 ... e5336d304d
12 changed files with 110 additions and 220 deletions
Cargo.lock (generated): 16 changed lines
```diff
@@ -968,15 +968,6 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
 
-[[package]]
-name = "humansize"
-version = "2.1.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6cb51c9a029ddc91b07a787f1d86b53ccfa49b0e86688c946ebe8d3555685dd7"
-dependencies = [
- "libm",
-]
-
 [[package]]
 name = "humantime"
 version = "2.2.0"
@@ -1313,12 +1304,6 @@ version = "0.2.172"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa"
 
-[[package]]
-name = "libm"
-version = "0.2.13"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72"
-
 [[package]]
 name = "libmimalloc-sys"
 version = "0.1.42"
@@ -1506,7 +1491,6 @@ dependencies = [
  "cxx-build",
  "ed25519-dalek",
  "futures",
- "humansize",
  "nix-compat",
  "object_store",
  "pkg-config",
```
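The humansize crate disappears from the lock file here (and from Cargo.toml just below) because the e5336d side prints raw byte counts instead. For reference, a minimal sketch of what the dropped dependency provided; the sample value is the NAR size asserted in the removed tests/nar.rs:

```rust
use humansize::{DECIMAL, format_size};

fn main() {
    // 234_680 bytes renders as a human-readable size, roughly "234.68 kB".
    println!("{}", format_size(234_680u64, DECIMAL));
}
```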
Cargo.toml

```diff
@@ -30,7 +30,6 @@ bytes = "1.10.1"
 object_store = { version = "0.12.0", features = ["aws"] }
 ulid = "1.2.1"
 tracing-subscriber = "0.3.19"
-humansize = "2.1.3"
 
 [build-dependencies]
 cxx-build = "1.0"
```
flake.nix

```diff
@@ -59,6 +59,7 @@
       devShells.default = craneLib.devShell {
         inputsFrom = [ nixcp ];
 
         RUST_LOG = "nixcp=debug";
+        RUST_BACKGRACE = 1;
         # for cpp bindings to work
         NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
```
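The RUST_LOG exported by this dev shell is consumed on the Rust side by tracing-subscriber's EnvFilter, which src/main.rs imports below. A minimal sketch of that wiring, assuming a plain fmt layer (not this repo's exact initialization):

```rust
use tracing_subscriber::{EnvFilter, prelude::*};

fn main() {
    // EnvFilter::from_default_env() reads RUST_LOG, so the dev shell's
    // RUST_LOG = "nixcp=debug" enables debug-level events for nixcp only.
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer())
        .with(EnvFilter::from_default_env())
        .init();
    tracing::debug!("visible when RUST_LOG enables debug for this crate");
}
```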
src/bindings.rs (file name inferred from `mod bindings;`; could equally be src/bindings/mod.rs)

```diff
@@ -150,7 +150,10 @@ impl Stream for AsyncWriteAdapter {
             match message {
                 Data(v) => Poll::Ready(Some(Ok(v.into()))),
                 Error(exception) => {
-                    let error = std::io::Error::other(format!("cxx error: {exception}"));
+                    let error = std::io::Error::new(
+                        io::ErrorKind::Other,
+                        format!("cxx error: {exception}"),
+                    );
                     Poll::Ready(Some(Err(error)))
                 }
                 Eof => {
```
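The two constructions are equivalent: io::Error::other, stabilized in Rust 1.74, is shorthand for io::Error::new(io::ErrorKind::Other, ...), so this hunk trades the newer convenience constructor for the spelled-out form that older toolchains accept. A standalone check:

```rust
use std::io;

fn main() {
    let exception = "demo";
    let a = io::Error::other(format!("cxx error: {exception}"));
    let b = io::Error::new(io::ErrorKind::Other, format!("cxx error: {exception}"));
    // Both produce ErrorKind::Other carrying the same message.
    assert_eq!(a.kind(), b.kind());
    assert_eq!(a.to_string(), b.to_string());
}
```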
src/lib.rs: file removed (65 lines)
```diff
@@ -1,65 +0,0 @@
-use std::path::PathBuf;
-
-use clap::{Args, Parser, Subcommand};
-
-mod bindings;
-mod cli;
-pub mod make_nar;
-pub mod path_info;
-pub mod push;
-pub mod store;
-mod uploader;
-
-#[derive(Parser, Debug)]
-#[command(version)]
-#[command(name = "nixcp")]
-#[command(about = "Upload store paths to a s3 binary cache")]
-#[command(long_about = None)]
-pub struct Cli {
-    #[command(subcommand)]
-    pub command: Commands,
-
-    /// Whether to enable tokio-console
-    #[arg(long)]
-    pub tokio_console: bool,
-}
-
-#[derive(Debug, Subcommand)]
-pub enum Commands {
-    #[command(arg_required_else_help = true)]
-    Push(PushArgs),
-}
-
-#[derive(Debug, Args)]
-pub struct PushArgs {
-    /// The s3 bucket to upload to
-    #[arg(long, value_name = "bucket name")]
-    bucket: String,
-
-    /// Upstream cache to check against. Can be specified multiple times.
-    /// cache.nixos.org is always included.
-    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
-    upstreams: Vec<String>,
-
-    /// Path to the file containing signing key
-    /// e.g. ~/cache-priv-key.pem
-    #[arg(long)]
-    signing_key: String,
-
-    /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
-    #[arg(long)]
-    region: Option<String>,
-
-    /// If unspecifed, will get it from AWS_ENDPOINT envar
-    /// e.g. https://s3.example.com
-    #[arg(long)]
-    endpoint: Option<String>,
-
-    #[arg(long)]
-    skip_signature_check: bool,
-
-    /// Path to upload
-    /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
-    #[arg(value_name = "PATH")]
-    pub paths: Vec<PathBuf>,
-}
```
src/main.rs: 74 changed lines
```diff
@@ -1,10 +1,76 @@
 #![feature(let_chains)]
 #![feature(exit_status_error)]
 
+use std::path::PathBuf;
+
 use anyhow::{Context, Result};
-use clap::Parser;
+use clap::{Args, Parser, Subcommand};
 use tracing_subscriber::{EnvFilter, prelude::*};
 
-use nixcp::push::Push;
-use nixcp::store::Store;
-use nixcp::{Cli, Commands};
+use push::Push;
+use store::Store;
+
+mod bindings;
+mod cli;
+mod make_nar;
+mod path_info;
+mod push;
+mod store;
+mod uploader;
+
+#[derive(Parser, Debug)]
+#[command(version)]
+#[command(name = "nixcp")]
+#[command(about = "Upload store paths to a s3 binary cache")]
+#[command(long_about = None)]
+struct Cli {
+    #[command(subcommand)]
+    command: Commands,
+
+    /// Whether to enable tokio-console
+    #[arg(long)]
+    tokio_console: bool,
+}
+
+#[derive(Debug, Subcommand)]
+enum Commands {
+    #[command(arg_required_else_help = true)]
+    Push(PushArgs),
+}
+
+#[derive(Debug, Args)]
+pub struct PushArgs {
+    /// The s3 bucket to upload to
+    #[arg(long, value_name = "bucket name")]
+    bucket: String,
+
+    /// Upstream cache to check against. Can be specified multiple times.
+    /// cache.nixos.org is always included.
+    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
+    upstreams: Vec<String>,
+
+    /// Path to the file containing signing key
+    /// e.g. ~/cache-priv-key.pem
+    #[arg(long)]
+    signing_key: String,
+
+    /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
+    #[arg(long)]
+    region: Option<String>,
+
+    /// If unspecifed, will get it from AWS_ENDPOINT envar
+    /// e.g. https://s3.example.com
+    #[arg(long)]
+    endpoint: Option<String>,
+
+    #[arg(long)]
+    skip_signature_check: bool,
+
+    /// Path to upload
+    /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
+    #[arg(value_name = "PATH")]
+    paths: Vec<PathBuf>,
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
```
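main()'s body is cut off by the hunk, so the following is a cut-down, self-contained sketch of how a Cli like the one added above is typically parsed and dispatched with clap's derive API; the println! body is a placeholder, not this project's logic:

```rust
use clap::{Args, Parser, Subcommand};

#[derive(Parser)]
#[command(name = "nixcp")]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Subcommand)]
enum Commands {
    Push(PushArgs),
}

#[derive(Args)]
struct PushArgs {
    #[arg(long)]
    bucket: String,
    #[arg(value_name = "PATH")]
    paths: Vec<std::path::PathBuf>,
}

fn main() {
    let cli = Cli::parse();
    match cli.command {
        Commands::Push(args) => {
            println!("would push {} path(s) to {}", args.paths.len(), args.bucket);
        }
    }
}
```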
src/make_nar.rs

```diff
@@ -15,10 +15,10 @@ use crate::store::Store;
 pub struct MakeNar<'a> {
     path_info: &'a PathInfo,
     store: Arc<Store>,
-    pub nar_hasher: Sha256,
+    nar_hasher: Sha256,
     /// hash of compressed nar file
     file_hasher: Sha256,
-    pub nar_size: u64,
+    nar_size: u64,
     file_size: u64,
 }
```
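These two fields lose their pub on the e5336d side; with the test files gone from this history (see tests/nar.rs below), nothing outside the module reads nar_hasher or nar_size. For reference, a sketch of how such a NAR hash is rendered in Nix's base32 alphabet, using the sha2 and nix-compat crates already in the dependency tree:

```rust
use nix_compat::nixbase32;
use sha2::{Digest, Sha256};

// Hash a (hypothetical) uncompressed NAR stream and encode the digest the
// way narinfo files expect, as the removed tests/nar.rs did with nar_hasher.
fn nar_hash_string(nar_bytes: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(nar_bytes);
    nixbase32::encode(hasher.finalize().as_slice())
}

fn main() {
    println!("{}", nar_hash_string(b"nar bytes would go here"));
}
```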
src/path_info.rs

```diff
@@ -1,6 +1,6 @@
-use std::collections::HashSet;
 
-use anyhow::{Context, Result, anyhow};
+use anyhow::{Context, Result};
 use futures::future::join_all;
+use nix_compat::nixbase32;
 use nix_compat::store_path::StorePath;
@@ -13,7 +13,7 @@ use url::Url;
 
 use crate::store::Store;
 
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone)]
 pub struct PathInfo {
     pub path: StorePath<String>,
     pub signatures: Vec<String>,
@@ -22,44 +22,33 @@ pub struct PathInfo {
 }
 
 impl PathInfo {
-    pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
-        debug!("query path info for {:?}", drv);
+    pub async fn from_path(path: &Path, store: &Store) -> Result<Self> {
+        debug!("query path info for {:?}", path);
 
-        let derivation = match drv.extension() {
-            Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
+        let derivation = match path.extension() {
+            Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(),
             _ => {
                 &Command::new("nix")
                     .arg("path-info")
                     .arg("--derivation")
-                    .arg(drv)
+                    .arg(path)
                     .output()
                     .await
-                    .context(format!("run command: nix path-info --derivaiton {drv:?}"))?
+                    .context(format!("run command: nix path-info --derivaiton {path:?}"))?
                     .stdout
             }
         };
         let derivation = String::from_utf8_lossy(derivation);
         debug!("derivation: {derivation}");
 
-        if derivation.is_empty() {
-            return Err(anyhow!(
-                "nix path-info did not return a derivation for {drv:#?}"
-            ));
-        }
-
-        Self::from_path(derivation.trim(), store).await
-    }
-
-    pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
-        let store_path =
-            StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
+        let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes())
+            .context("storepath from derivation")?;
         store
             .query_path_info(store_path)
             .await
-            .context("query pathinfo for path")
+            .context("query pathinfo for derivation")
     }
 
-    // TODO: skip call to query_path_info and return Vec<Path>?
     pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
         let futs = store
             .compute_fs_closure(self.path.clone())
```
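Both versions funnel into the same parsing step: StorePath::from_absolute_path over an absolute /nix/store path. A standalone sketch of that call; the sample path is the hello output pinned in the removed test helpers:

```rust
use nix_compat::store_path::StorePath;

fn main() -> anyhow::Result<()> {
    let p = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
    let store_path: StorePath<String> = StorePath::from_absolute_path(p.as_bytes())?;
    // Display prints "<hash>-hello-2.12.1", the form the removed tests
    // asserted via path_info.path.to_string().
    println!("{store_path}");
    Ok(())
}
```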
src/push.rs: 36 changed lines
```diff
@@ -1,5 +1,4 @@
 use std::{
-    collections::HashSet,
     fs,
     iter::once,
     path::PathBuf,
@@ -11,7 +10,6 @@ use std::{
 
 use anyhow::{Context, Result};
 use futures::future::join_all;
-use humansize::{DECIMAL, format_size};
 use nix_compat::narinfo::{self, SigningKey};
 use object_store::aws::{AmazonS3, AmazonS3Builder};
 use tokio::sync::{RwLock, Semaphore, mpsc};
@@ -22,7 +20,7 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
 
 pub struct Push {
     upstream_caches: Vec<Url>,
-    store_paths: Arc<RwLock<HashSet<PathInfo>>>,
+    store_paths: Arc<RwLock<Vec<PathInfo>>>,
     signing_key: SigningKey<ed25519_dalek::SigningKey>,
     store: Arc<Store>,
     s3: Arc<AmazonS3>,
@@ -62,7 +60,7 @@ impl Push {
 
         Ok(Self {
             upstream_caches: upstreams,
-            store_paths: Arc::new(RwLock::new(HashSet::new())),
+            store_paths: Arc::new(RwLock::new(Vec::new())),
             signing_key,
             store: Arc::new(store),
             s3: Arc::new(s3_builder.build()?),
@@ -80,7 +78,7 @@ impl Push {
             let store = self.store.clone();
 
             futs.push(tokio::spawn(async move {
-                let path_info = PathInfo::from_derivation(path.as_path(), &store)
+                let path_info = PathInfo::from_path(path.as_path(), &store)
                     .await
                     .context("get path info for path")?;
                 debug!("path-info for {path:?}: {path_info:?}");
@@ -159,24 +157,34 @@ impl Push {
 
     async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
         let mut uploads = Vec::new();
-        let permits = Arc::new(Semaphore::new(10));
+        let permits = Arc::new(Semaphore::new(16));
+        let big_permits = Arc::new(Semaphore::new(5));
 
         loop {
             let permits = permits.clone();
+            let big_permits = big_permits.clone();
 
             if let Some(path_to_upload) = rx.recv().await {
+                debug!("upload permits available: {}", permits.available_permits());
+                let mut permit = permits.acquire_owned().await.unwrap();
+
                 uploads.push(tokio::spawn({
-                    // large uploads will be concurrently uploaded with multipart anyway so don't spawn
-                    // too much of them
-                    let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
-                        Some(permits.acquire_owned().await.unwrap())
-                    } else {
-                        None
-                    };
+                    // a large directory may have many files and end up causing "too many open files"
+                    if PathBuf::from(path_to_upload.absolute_path()).is_dir()
+                        && path_to_upload.nar_size > 5 * 1024 * 1024
+                    {
+                        debug!(
+                            "upload big permits available: {}",
+                            big_permits.available_permits()
+                        );
+                        // drop regular permit and take the big one
+                        permit = big_permits.acquire_owned().await.unwrap();
+                    }
 
                     println!(
                         "uploading: {} (size: {})",
                         path_to_upload.absolute_path(),
-                        format_size(path_to_upload.nar_size, DECIMAL)
+                        path_to_upload.nar_size
                    );
                     let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
                     let s3 = self.s3.clone();
```
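The reworked upload loop is easier to read in isolation. Below is a self-contained sketch of the two-tier permit scheme on the e5336d side, with the 16/5 permit counts copied from the hunk and the upload body replaced by a comment: a wide semaphore bounds overall concurrency, and a scarcer one throttles large directory uploads so they cannot exhaust open file descriptors.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let permits = Arc::new(Semaphore::new(16)); // all uploads
    let big_permits = Arc::new(Semaphore::new(5)); // large directory uploads

    let mut tasks = Vec::new();
    for i in 0..32u64 {
        let permits = permits.clone();
        let big_permits = big_permits.clone();
        tasks.push(tokio::spawn(async move {
            let mut _permit = permits.acquire_owned().await.unwrap();
            // Stand-in for the real check: is_dir() && nar_size > 5 * 1024 * 1024.
            if i % 4 == 0 {
                // Drop the regular permit and hold the scarcer one instead.
                _permit = big_permits.acquire_owned().await.unwrap();
            }
            // ... upload would run here while the permit is held ...
        }));
    }
    for t in tasks {
        t.await.unwrap();
    }
}
```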
tests/common/mod.rs (path inferred from `mod common;` in the test files): file removed (32 lines)

```diff
@@ -1,32 +0,0 @@
-#![allow(dead_code)]
-
-use std::process::Command;
-use std::sync::Arc;
-
-use nixcp::store::Store;
-
-pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
-pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
-pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
-
-pub struct Context {
-    pub store: Arc<Store>,
-}
-
-impl Context {
-    fn new() -> Self {
-        // hello must be in the store
-        Command::new("nix")
-            .arg("build")
-            .arg("--no-link")
-            .arg(HELLO)
-            .status()
-            .unwrap();
-        let store = Arc::new(Store::connect().expect("connect to nix store"));
-        Self { store }
-    }
-}
-
-pub fn context() -> Context {
-    Context::new()
-}
```
tests/nar.rs: file removed (26 lines)
```diff
@@ -1,26 +0,0 @@
-use crate::common::HELLO_PATH;
-use nix_compat::nixbase32;
-use nixcp::make_nar::MakeNar;
-use nixcp::path_info::PathInfo;
-use sha2::Digest;
-use tokio::io::AsyncReadExt;
-
-mod common;
-
-#[tokio::test]
-async fn nar_size_and_hash() {
-    let ctx = common::context();
-    let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();
-
-    let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
-    let mut reader = nar.compress_and_hash().unwrap();
-    let mut buf = Vec::new();
-    reader.read_to_end(&mut buf).await.unwrap();
-    drop(reader);
-
-    assert_eq!(nar.nar_size, 234680);
-
-    let nar_hash = nar.nar_hasher.finalize();
-    let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
-    assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
-}
```
tests/path_info.rs: file removed (37 lines)

```diff
@@ -1,37 +0,0 @@
-use nixcp::path_info::PathInfo;
-use std::path::PathBuf;
-
-use crate::common::{HELLO, HELLO_DRV, HELLO_PATH};
-
-mod common;
-
-#[tokio::test]
-async fn path_info_from_package() {
-    let ctx = common::context();
-    let path = PathBuf::from(HELLO);
-    let path_info = PathInfo::from_derivation(&path, &ctx.store)
-        .await
-        .expect("get pathinfo from package");
-    assert_eq!(path_info.path.to_string(), HELLO_DRV);
-}
-
-#[tokio::test]
-async fn path_info_from_path() {
-    let ctx = common::context();
-    let path = PathBuf::from(HELLO_PATH);
-    let path_info = PathInfo::from_derivation(&path, &ctx.store)
-        .await
-        .expect("get pathinfo from package");
-    assert_eq!(path_info.path.to_string(), HELLO_DRV);
-}
-
-#[tokio::test]
-async fn closure() {
-    let ctx = common::context();
-    let path = PathBuf::from(HELLO);
-    let path_info = PathInfo::from_derivation(&path, &ctx.store)
-        .await
-        .expect("get pathinfo from package");
-    let closure = path_info.get_closure(&ctx.store).await.unwrap();
-    assert_eq!(closure.len(), 472);
-}
```