Compare commits

...

32 commits

Author SHA1 Message Date
cy
3c40776981 update readme 2025-05-04 03:06:55 -04:00
cy
9b0c6aece6 add .editorconfig 2025-05-04 02:37:54 -04:00
cy
14d6e9d29e path_info: check for and resolve symlink 2025-05-04 02:36:59 -04:00
cy
0e97d11745 skip integration tests in nix package
These tests need a connection to the nix store and we can't have that in
the nix build environment.
2025-05-04 01:49:35 -04:00
cy
a0794b0356 fix closure test check; rm RUST_LOG envar in devshell 2025-05-04 01:17:12 -04:00
cy
05589641cf move "nix build" to tests/common 2025-05-04 01:13:02 -04:00
cy
76cbc85032 use hashset for closure 2025-05-04 01:13:02 -04:00
cy
09181ae785 add nar tests 2025-05-04 01:13:02 -04:00
cy
54d4c714af rename from_path to from_derivation 2025-04-28 21:29:52 -04:00
cy
878e096494 add path_info tests 2025-04-28 13:35:49 -04:00
cy
7285c29e88 use io::Error::other 2025-04-28 13:30:48 -04:00
cy
9d2e9e38bd split lib.rs and main.rs 2025-04-28 03:01:24 -04:00
cy
01443d0d99 use 10 permits but skip nars bigger than 15mb 2025-04-28 02:46:11 -04:00
cy
b49be95d09 simplify here since the problem was somewhere else 2025-04-27 02:33:19 -04:00
cy
e5336d304d improve concurrency control; use nar_size from cpathinfo 2025-04-27 01:23:45 -04:00
cy
ca97aebd7a limit directories even more 2025-04-26 23:07:56 -04:00
cy
85fefe9e77 use cpp bindings to make nar 2025-04-26 21:08:01 -04:00
cy
0fedae9334 try to fix fd issues 2025-04-26 18:21:49 -04:00
cy
0fae7ac3b0 fix build with tokio unstable 2025-04-26 18:21:27 -04:00
cy
7dec14fc1a set path to nar in narinfo 2025-04-26 15:46:23 -04:00
cy
d524222a86 make tokio console optional and make it actually work 2025-04-26 15:37:54 -04:00
cy
5a3e6089b4 don't clean cpp files 2025-04-26 14:40:38 -04:00
cy
fc304df35e refactor flake 2025-04-26 14:21:18 -04:00
cy
b8877f33a3 enable lto 2025-04-26 14:10:20 -04:00
cy
846c465ea0 fix flake 2025-04-26 13:11:54 -04:00
cy
81ce855dae refactor and bunch more improvements; use object_store for s3 2025-04-26 12:37:07 -04:00
cy
b1e59d0a6c use nix path-info cmd for derivation; console_subscriber 2025-04-18 00:50:11 -04:00
cy
6806b96892 limit uploads with semaphore 2025-04-16 15:49:01 -04:00
cy
84bbe5dcb4 fix build 2025-04-16 12:42:44 -04:00
cy
a771785352 use libstore cxx bindings 2025-04-16 03:47:42 -04:00
cy
8ac9253ea3 change stuff to our way 2025-04-16 03:46:56 -04:00
cy
c956d6741a bring code from attic 2025-04-15 23:50:43 -04:00
20 changed files with 1936 additions and 1511 deletions

2
.cargo/config.toml Normal file

@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

3
.editorconfig Normal file

@ -0,0 +1,3 @@
[*.nix]
indent_size = 2
indent_style = space

1834
Cargo.lock generated

File diff suppressed because it is too large

Cargo.toml

@ -3,21 +3,37 @@ name = "nixcp"
version = "0.1.0"
edition = "2024"
[profile.release]
lto = true
codegen-units = 1
[dependencies]
anyhow = "1.0.97"
async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.82.0"
clap = { version = "4.5.34", features = ["derive"] }
ed25519-dalek = "2.1.1"
futures = "0.3.31"
nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
regex = "1.11.1"
reqwest = "0.12.15"
serde = { version = "1.0.219", features = [ "derive" ]}
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
sha2 = "0.10.8"
tokio = { version = "1.44.1", features = [ "full" ]}
tokio = { version = "1.44.1", features = ["full", "tracing", "parking_lot"] }
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"]}
url = { version = "2.5.4", features = [ "serde" ]}
url = { version = "2.5.4", features = ["serde"] }
cxx = "1.0"
console-subscriber = "0.4.1"
tokio-util = { version = "0.7.15", features = ["io"] }
bytes = "1.10.1"
object_store = { version = "0.12.0", features = ["aws"] }
ulid = "1.2.1"
tracing-subscriber = "0.3.19"
humansize = "2.1.3"
[build-dependencies]
cxx-build = "1.0"
pkg-config = "0.3.32"
[dev-dependencies]
tempfile = "3.19.1"

README.md

@ -11,14 +11,13 @@ The signing key is generated with:
nix-store --generate-binary-cache-key nixcache.cy7.sh cache-priv-key.pem cache-pub-key.pem
```
`AWS_ACCESS_KEY_ID` and `AWS_ENDPOINT_URL` environment variables should be set with your s3 credentials.
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables should be set with your s3 credentials.
```
Usage: nixcp [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> <COMMAND>
Usage: nixcp push [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> [PATH]...
Commands:
push
help Print this message or the help of the given subcommand(s)
Arguments:
[PATH]... Path to upload e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
Options:
--bucket <bucket name>
@ -28,15 +27,13 @@ Options:
--signing-key <SIGNING_KEY>
Path to the file containing signing key e.g. ~/cache-priv-key.pem
--region <REGION>
If unspecified, will get it from AWS_DEFAULT_REGION envar or the AWS default
If unspecified, will get it from AWS_DEFAULT_REGION envar or default to us-east-1
--endpoint <ENDPOINT>
If unspecified, will get it from AWS_ENDPOINT_URL envar or the AWS default e.g. https://s3.example.com
--profile <PROFILE>
AWS profile to use
If unspecified, will get it from AWS_ENDPOINT envar e.g. https://s3.example.com
--skip-signature-check
-h, --help
Print help
-V, --version
Print version
```
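For reference, a hypothetical invocation consistent with the usage above (the bucket name, endpoint, and key location are made up):
```
# illustrative only: substitute your own bucket, endpoint, and signing key
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
nixcp push \
  --bucket mycache \
  --endpoint https://s3.example.com \
  --signing-key ~/cache-priv-key.pem \
  ./result
```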
## Install with nix

21
build.rs Normal file

@ -0,0 +1,21 @@
fn main() {
cxx_build::bridge("src/bindings/mod.rs")
.file("src/bindings/nix.cpp")
.flag("-std=c++2a")
.flag("-O2")
.flag("-include")
.flag("nix/config.h")
.flag("-I")
.flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix"))
.compile("nixbinding");
println!("cargo:rerun-if-changed=src/bindings");
pkg_config::Config::new()
.atleast_version("2.4")
.probe("nix-store")
.unwrap();
pkg_config::Config::new()
.atleast_version("2.4")
.probe("nix-main")
.unwrap();
}

flake.nix

@ -22,28 +22,56 @@
};
toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
in
{
devShells.default = pkgs.mkShell {
nativeBuildInputs = with pkgs; [
pkg-config
];
buildInputs = with pkgs; [
openssl
toolchain
];
lib = pkgs.lib;
# don't clean cpp files
cppFilter = path: _type: builtins.match ".*(cpp|hpp)$" path != null;
cppOrCargo = path: type:
(cppFilter path type) || (craneLib.filterCargoSources path type);
src = lib.cleanSourceWith {
src = ./.;
filter = cppOrCargo;
name = "source";
};
packages.default = craneLib.buildPackage {
src = craneLib.cleanCargoSource ./.;
commonArgs = {
inherit src;
strictDeps = true;
nativeBuildInputs = with pkgs; [
pkg-config
];
buildInputs = with pkgs; [
toolchain
openssl
nix
boost
];
# for cpp bindings to work
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
# skip integration tests (they need a connection to the nix store)
cargoTestExtraArgs = "--bins";
};
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
nixcp = craneLib.buildPackage (commonArgs // {
inherit cargoArtifacts;
});
in
{
devShells.default = craneLib.devShell {
inputsFrom = [ nixcp ];
RUST_BACKTRACE = 1;
# for cpp bindings to work
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
packages = with pkgs; [
tokio-console
cargo-udeps
];
};
packages.default = nixcp;
}
);
}

237
src/bindings/mod.rs Normal file

@ -0,0 +1,237 @@
/*
Copyright 2022 Zhaofeng Li and the Attic contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
//! `libnixstore` Bindings
#![allow(dead_code)]
use std::cell::UnsafeCell;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::Result;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};
// The C++ implementation takes care of concurrency
#[repr(transparent)]
pub struct FfiNixStore(UnsafeCell<cxx::UniquePtr<ffi::CNixStore>>);
unsafe impl Send for FfiNixStore {}
unsafe impl Sync for FfiNixStore {}
impl FfiNixStore {
pub fn store(&self) -> Pin<&mut ffi::CNixStore> {
unsafe {
let ptr = self.0.get().as_mut().unwrap();
ptr.pin_mut()
}
}
}
/// Obtain a handle to the Nix store.
pub unsafe fn open_nix_store() -> Result<FfiNixStore> {
match ffi::open_nix_store() {
Ok(ptr) => {
let cell = UnsafeCell::new(ptr);
Ok(FfiNixStore(cell))
}
Err(e) => Err(e.into()),
}
}
// TODO: Benchmark different implementations
// (tokio, crossbeam, flume)
mod mpsc {
// Tokio
pub use tokio::sync::mpsc::{
UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel,
};
}
/// Async write request.
#[derive(Debug)]
enum AsyncWriteMessage {
Data(Vec<u8>),
Error(String),
Eof,
}
/// Async write request sender.
#[derive(Clone)]
pub struct AsyncWriteSender {
sender: mpsc::UnboundedSender<AsyncWriteMessage>,
}
impl AsyncWriteSender {
fn send(&mut self, data: &[u8]) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
let message = AsyncWriteMessage::Data(Vec::from(data));
self.sender.send(message)
}
fn eof(&mut self) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
let message = AsyncWriteMessage::Eof;
self.sender.send(message)
}
pub(crate) fn rust_error(
&mut self,
error: impl std::error::Error,
) -> Result<(), impl std::error::Error> {
let message = AsyncWriteMessage::Error(error.to_string());
self.sender.send(message)
}
}
/// A wrapper of the `AsyncWrite` trait for the synchronous Nix C++ land.
pub struct AsyncWriteAdapter {
receiver: mpsc::UnboundedReceiver<AsyncWriteMessage>,
eof: bool,
}
impl AsyncWriteAdapter {
pub fn new() -> (Self, Box<AsyncWriteSender>) {
let (sender, receiver) = mpsc::unbounded_channel();
let r = Self {
receiver,
eof: false,
};
let sender = Box::new(AsyncWriteSender { sender });
(r, sender)
}
/// Write everything the sender sends to us.
pub async fn write_all(mut self, mut writer: Box<dyn AsyncWrite + Unpin>) -> Result<()> {
let writer = writer.as_mut();
while let Some(data) = self.next().await {
match data {
Ok(v) => {
writer.write_all(&v).await?;
}
Err(e) => {
return Err(e.into());
}
}
}
if !self.eof {
Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
} else {
Ok(())
}
}
}
impl Stream for AsyncWriteAdapter {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.receiver.poll_recv(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(message)) => {
use AsyncWriteMessage::*;
match message {
Data(v) => Poll::Ready(Some(Ok(v.into()))),
Error(exception) => {
let error = std::io::Error::other(format!("cxx error: {exception}"));
Poll::Ready(Some(Err(error)))
}
Eof => {
self.eof = true;
Poll::Ready(None)
}
}
}
Poll::Ready(None) => {
if !self.eof {
Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe))))
} else {
Poll::Ready(None)
}
}
}
}
}
#[cxx::bridge]
/// Generated by `cxx.rs`.
///
/// Mid-level wrapper of `libnixstore` implemented in C++.
mod ffi {
extern "Rust" {
type AsyncWriteSender;
fn send(self: &mut AsyncWriteSender, data: &[u8]) -> Result<()>;
fn eof(self: &mut AsyncWriteSender) -> Result<()>;
}
unsafe extern "C++" {
include!("nixcp/src/bindings/nix.hpp");
// =========
// CNixStore
// =========
/// Mid-level wrapper for the Unix Domain Socket Nix Store.
type CNixStore;
/// Queries information about a valid path.
fn query_path_info(
self: Pin<&mut CNixStore>,
store_path: &[u8],
) -> Result<UniquePtr<CPathInfo>>;
/// Computes the closure of a valid path.
///
/// If `flip_directions` is true, the set of paths that can reach `store_path` is
/// returned.
fn compute_fs_closure(
self: Pin<&mut CNixStore>,
store_path: &[u8],
flip_direction: bool,
include_outputs: bool,
include_derivers: bool,
) -> Result<UniquePtr<CxxVector<CxxString>>>;
/// Creates a NAR dump from a path.
fn nar_from_path(
self: Pin<&mut CNixStore>,
base_name: Vec<u8>,
sender: Box<AsyncWriteSender>,
) -> Result<()>;
/// Obtains a handle to the Nix store.
fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
// =========
// CPathInfo
// =========
/// Mid-level wrapper for the `nix::ValidPathInfo` struct.
type CPathInfo;
/// Returns the size of the NAR.
fn nar_size(self: Pin<&mut CPathInfo>) -> u64;
/// Returns the references of the store path.
fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
/// Returns the possibly invalid signatures attached to the store path.
fn sigs(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
}
}

128
src/bindings/nix.cpp Normal file

@ -0,0 +1,128 @@
/*
Copyright 2022 Zhaofeng Li and the Attic contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// C++ side of the libnixstore glue.
//
// We implement a mid-level wrapper of the Nix Store interface,
// which is then wrapped again in the Rust side to enable full
// async-await operation.
//
// Here we stick with the naming conventions of Rust and handle
// Rust types directly where possible, so that the interfaces are
// satisfying to use from the Rust side via cxx.rs.
#include "nixcp/src/bindings/nix.hpp"
static std::mutex g_init_nix_mutex;
static bool g_init_nix_done = false;
static nix::StorePath store_path_from_rust(RBasePathSlice base_name) {
std::string_view sv((const char *)base_name.data(), base_name.size());
return nix::StorePath(sv);
}
// ========
// RustSink
// ========
RustSink::RustSink(RBox<AsyncWriteSender> sender) : sender(std::move(sender)) {}
void RustSink::operator () (std::string_view data) {
RBasePathSlice s((const unsigned char *)data.data(), data.size());
this->sender->send(s);
}
void RustSink::eof() {
this->sender->eof();
}
// =========
// CPathInfo
// =========
CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
uint64_t CPathInfo::nar_size() {
return this->pi->narSize;
}
std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() {
std::vector<std::string> result;
for (auto&& elem : this->pi->sigs) {
result.push_back(std::string(elem));
}
return std::make_unique<std::vector<std::string>>(result);
}
std::unique_ptr<std::vector<std::string>> CPathInfo::references() {
std::vector<std::string> result;
for (auto&& elem : this->pi->references) {
result.push_back(std::string(elem.to_string()));
}
return std::make_unique<std::vector<std::string>>(result);
}
// =========
// CNixStore
// =========
CNixStore::CNixStore() {
std::map<std::string, std::string> params;
std::lock_guard<std::mutex> lock(g_init_nix_mutex);
if (!g_init_nix_done) {
nix::initNix();
g_init_nix_done = true;
}
this->store = nix::openStore(nix::settings.storeUri.get(), params);
}
std::unique_ptr<CPathInfo> CNixStore::query_path_info(RBasePathSlice base_name) {
auto store_path = store_path_from_rust(base_name);
auto r = this->store->queryPathInfo(store_path);
return std::make_unique<CPathInfo>(r);
}
std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePathSlice base_name, bool flip_direction, bool include_outputs, bool include_derivers) {
std::set<nix::StorePath> out;
this->store->computeFSClosure(store_path_from_rust(base_name), out, flip_direction, include_outputs, include_derivers);
std::vector<std::string> result;
for (auto&& elem : out) {
result.push_back(std::string(elem.to_string()));
}
return std::make_unique<std::vector<std::string>>(result);
}
void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
RustSink sink(std::move(sender));
std::string_view sv((const char *)base_name.data(), base_name.size());
nix::StorePath store_path(sv);
// exceptions will be thrown into Rust
this->store->narFromPath(store_path, sink);
sink.eof();
}
std::unique_ptr<CNixStore> open_nix_store() {
return std::make_unique<CNixStore>();
}

89
src/bindings/nix.hpp Normal file

@ -0,0 +1,89 @@
/*
Copyright 2022 Zhaofeng Li and the Attic contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// C++ side of the libnixstore glue.
//
// We implement a mid-level wrapper of the Nix Store interface,
// which is then wrapped again in the Rust side to enable full
// async-await operation.
//
// Here we stick with the naming conventions of Rust and handle
// Rust types directly where possible, so that the interfaces are
// satisfying to use from the Rust side via cxx.rs.
#pragma once
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <nix/store-api.hh>
#include <nix/local-store.hh>
#include <nix/remote-store.hh>
#include <nix/uds-remote-store.hh>
#include <nix/hash.hh>
#include <nix/path.hh>
#include <nix/serialise.hh>
#include <nix/shared.hh>
#include <rust/cxx.h>
template<class T> using RVec = rust::Vec<T>;
template<class T> using RBox = rust::Box<T>;
template<class T> using RSlice = rust::Slice<T>;
using RString = rust::String;
using RStr = rust::Str;
using RBasePathSlice = RSlice<const unsigned char>;
using RHashSlice = RSlice<const unsigned char>;
struct AsyncWriteSender;
struct RustSink : nix::Sink
{
RBox<AsyncWriteSender> sender;
public:
RustSink(RBox<AsyncWriteSender> sender);
void operator () (std::string_view data) override;
void eof();
};
// Opaque wrapper for nix::ValidPathInfo
class CPathInfo {
nix::ref<const nix::ValidPathInfo> pi;
public:
CPathInfo(nix::ref<const nix::ValidPathInfo> pi);
std::unique_ptr<std::vector<std::string>> sigs();
std::unique_ptr<std::vector<std::string>> references();
uint64_t nar_size();
};
class CNixStore {
std::shared_ptr<nix::Store> store;
public:
CNixStore();
RString store_dir();
std::unique_ptr<CPathInfo> query_path_info(RBasePathSlice base_name);
std::unique_ptr<std::vector<std::string>> compute_fs_closure(
RBasePathSlice base_name,
bool flip_direction,
bool include_outputs,
bool include_derivers);
void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
};
std::unique_ptr<CNixStore> open_nix_store();
// Relies on our definitions
#include "nixcp/src/bindings/mod.rs.h"

65
src/lib.rs Normal file

@ -0,0 +1,65 @@
use std::path::PathBuf;
use clap::{Args, Parser, Subcommand};
mod bindings;
mod cli;
pub mod make_nar;
pub mod path_info;
pub mod push;
pub mod store;
mod uploader;
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
/// Whether to enable tokio-console
#[arg(long)]
pub tokio_console: bool,
}
#[derive(Debug, Subcommand)]
pub enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")]
bucket: String,
/// Upstream cache to check against. Can be specified multiple times.
/// cache.nixos.org is always included.
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
upstreams: Vec<String>,
/// Path to the file containing signing key
/// e.g. ~/cache-priv-key.pem
#[arg(long)]
signing_key: String,
/// If unspecified, will get it from AWS_DEFAULT_REGION envar or default to us-east-1
#[arg(long)]
region: Option<String>,
/// If unspecified, will get it from AWS_ENDPOINT envar
/// e.g. https://s3.example.com
#[arg(long)]
endpoint: Option<String>,
#[arg(long)]
skip_signature_check: bool,
/// Path to upload
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "PATH")]
pub paths: Vec<PathBuf>,
}

src/main.rs

@ -1,88 +1,46 @@
#![feature(let_chains)]
#![feature(extend_one)]
use anyhow::{Context, Result};
use clap::{Args, Parser, Subcommand};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use clap::Parser;
use tracing_subscriber::{EnvFilter, prelude::*};
use push::Push;
mod cli;
mod path_info;
mod push;
mod uploader;
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Debug, Subcommand)]
enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")]
bucket: String,
/// Upstream cache to check against. Can be specified multiple times.
/// cache.nixos.org is always included.
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
upstreams: Vec<String>,
/// Path to the file containing signing key
/// e.g. ~/cache-priv-key.pem
#[arg(long)]
signing_key: String,
/// If unspecified, will get it from AWS_DEFAULT_REGION envar
#[arg(long)]
region: Option<String>,
/// If unspecified, will get it from AWS_ENDPOINT_URL envar
/// e.g. https://s3.example.com
#[arg(long)]
endpoint: Option<String>,
/// AWS profile to use
#[arg(long)]
profile: Option<String>,
#[arg(long)]
skip_signature_check: bool,
/// Package or store path to upload
/// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "package or store path")]
package: String,
}
use nixcp::push::Push;
use nixcp::store::Store;
use nixcp::{Cli, Commands};
#[tokio::main]
async fn main() -> Result<()> {
let filter = EnvFilter::from_default_env();
let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
tracing::subscriber::set_global_default(subscriber)?;
let cli = Cli::parse();
init_logging(cli.tokio_console);
match &cli.command {
Commands::Push(cli) => {
let push = Box::leak(Box::new(Push::new(cli).await?));
push.paths_from_package(&cli.package)
let store = Store::connect()?;
let push = Box::leak(Box::new(Push::new(cli, store).await?));
push.add_paths(cli.paths.clone())
.await
.context("nixcp get paths from package")?;
.context("add paths to push")?;
push.run().await.context("nixcp run")?;
}
}
Ok(())
}
fn init_logging(tokio_console: bool) {
let env_filter = EnvFilter::from_default_env();
let fmt_layer = tracing_subscriber::fmt::layer().with_filter(env_filter);
let console_layer = if tokio_console {
Some(console_subscriber::spawn())
} else {
None
};
tracing_subscriber::registry()
.with(fmt_layer)
.with(console_layer)
.init();
if tokio_console {
println!("tokio-console is enabled");
}
}

81
src/make_nar.rs Normal file

@ -0,0 +1,81 @@
use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use nix_compat::{
narinfo::{self, NarInfo},
store_path::StorePath,
};
use sha2::{Digest, Sha256};
use std::{mem::take, sync::Arc};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;
use crate::path_info::PathInfo;
use crate::store::Store;
pub struct MakeNar<'a> {
path_info: &'a PathInfo,
store: Arc<Store>,
pub nar_hasher: Sha256,
/// hash of compressed nar file
file_hasher: Sha256,
pub nar_size: u64,
file_size: u64,
}
impl<'a> MakeNar<'a> {
pub fn new(path_info: &'a PathInfo, store: Arc<Store>) -> Result<Self> {
Ok(Self {
path_info,
store,
nar_hasher: Sha256::new(),
file_hasher: Sha256::new(),
nar_size: 0,
file_size: 0,
})
}
/// Returns a compressed nar reader which can be uploaded. File hash will be available when
/// everything is read
pub fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
let nar_reader = self.store.nar_from_path(self.path_info.path.clone());
// reader that hashes as nar is read
let nar_reader = InspectReader::new(nar_reader, |x| {
self.nar_size += x.len() as u64;
self.nar_hasher.update(x);
});
let encoder = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
// reader that updates file_hash as the compressed nar is read
Ok(InspectReader::new(encoder, |x| {
self.file_size += x.len() as u64;
self.file_hasher.update(x);
}))
}
/// Returns *unsigned* narinfo. `url` must be updated before uploading
pub fn get_narinfo(&mut self) -> Result<NarInfo> {
let file_hash = take(&mut self.file_hasher).finalize().into();
let nar_hash = take(&mut self.nar_hasher).finalize().into();
Ok(NarInfo {
flags: narinfo::Flags::empty(),
store_path: self.path_info.path.as_ref(),
nar_hash,
nar_size: self.nar_size,
references: self
.path_info
.references
.iter()
.map(StorePath::as_ref)
.collect(),
signatures: Vec::new(),
ca: None,
system: None,
deriver: None,
compression: Some("zstd"),
file_hash: Some(file_hash),
file_size: Some(self.file_size),
url: "",
})
}
}

src/path_info.rs

@ -1,85 +1,80 @@
use std::collections::HashSet;
use anyhow::{Context, Error, Result};
use aws_sdk_s3 as s3;
use anyhow::{Context, Result, anyhow};
use futures::future::join_all;
use nix_compat::nixbase32;
use nix_compat::store_path::StorePath;
use object_store::{ObjectStore, aws::AmazonS3, path::Path as ObjectPath};
use regex::Regex;
use serde::Deserialize;
use std::path::Path;
use tokio::process::Command;
use tracing::{debug, error, trace};
use tracing::{debug, trace};
use url::Url;
// nix path-info --derivation --json
#[derive(Debug, Clone, Deserialize)]
use crate::store::Store;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PathInfo {
pub deriver: Option<StorePath<String>>,
pub path: StorePath<String>,
signatures: Option<Vec<String>>,
pub signatures: Vec<String>,
pub references: Vec<StorePath<String>>,
pub nar_size: u64,
}
impl PathInfo {
// get PathInfo for a package or a store path
// we deserialize this as an array of `PathInfo` below
pub async fn from_path(path: &str) -> Result<Self> {
debug!("query nix path-info for {path}");
// use lix cause nix would return a json map instead of an array
// json output is not stable and could break in future
// TODO figure out a better way
let nix_cmd = Command::new("nix")
.arg("run")
.arg("--experimental-features")
.arg("nix-command flakes")
.arg("github:nixos/nixpkgs/nixos-unstable#lix")
.arg("--")
.arg("path-info")
.arg("--json")
.arg(path)
.output()
.await
.context("`nix path-info` failed for {package}")?;
pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
debug!("query path info for {:?}", drv);
trace!(
"nix path-info output: {}",
String::from_utf8_lossy(&nix_cmd.stdout)
);
// nix path-info returns an array with one element
match serde_json::from_slice::<Vec<_>>(&nix_cmd.stdout)
.context("parse path info from stdout")
{
Ok(path_info) => path_info
.into_iter()
.next()
.ok_or_else(|| Error::msg("nix path-info returned empty")),
Err(e) => {
error!(
"Failed to parse data from `nix path-info`. The path may not exist on your system."
);
Err(e)
let derivation = match drv.extension() {
Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
_ => {
let drv = {
// resolve symlink
if drv.is_symlink() {
&drv.canonicalize()?
} else {
drv
}
};
&Command::new("nix")
.arg("path-info")
.arg("--derivation")
.arg(drv)
.output()
.await
.context(format!("run command: nix path-info --derivaiton {drv:?}"))?
.stdout
}
};
let derivation = String::from_utf8_lossy(derivation);
debug!("derivation: {derivation}");
if derivation.is_empty() {
return Err(anyhow!(
"nix path-info did not return a derivation for {drv:#?}"
));
}
Self::from_path(derivation.trim(), store).await
}
pub async fn get_closure(&self) -> Result<Vec<Self>> {
debug!("query nix-store for {}", self.absolute_path());
let nix_store_cmd = Command::new("nix-store")
.arg("--query")
.arg("--requisites")
.arg("--include-outputs")
.arg(self.absolute_path())
.output()
pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
let store_path =
StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
store
.query_path_info(store_path)
.await
.expect("nix-store cmd failed");
.context("query pathinfo for path")
}
let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
let mut closure = Vec::with_capacity(nix_store_paths.len());
for path in nix_store_paths {
closure.push(Self::from_path(path).await?);
}
Ok(closure)
// TODO: skip call to query_path_info and return Vec<Path>?
pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
let futs = store
.compute_fs_closure(self.path.clone())
.await?
.into_iter()
.map(|x| store.query_path_info(x));
join_all(futs).await.into_iter().collect()
}
/// checks if the path is signed by any upstream. if it is, we assume a cache hit.
@ -101,23 +96,21 @@ impl PathInfo {
}
fn signees(&self) -> Vec<&str> {
if let Some(signatures) = self.signatures.as_ref() {
let signees: Vec<_> = signatures
.iter()
.filter_map(|signature| Some(signature.split_once(":")?.0))
.collect();
trace!("signees for {}: {:?}", self.path, signees);
return signees;
}
Vec::new()
let signers: Vec<_> = self
.signatures
.iter()
.filter_map(|signature| Some(signature.split_once(":")?.0))
.collect();
trace!("signers for {}: {:?}", self.path, signers);
signers
}
pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
for upstream in upstreams {
let upstream = upstream
.join(format!("{}.narinfo", self.digest()).as_str())
.join(self.narinfo_path().as_ref())
.expect("adding <hash>.narinfo should make a valid url");
debug!("querying {}", upstream);
trace!("querying {}", upstream);
let res_status = reqwest::Client::new()
.head(upstream.as_str())
.send()
@ -135,83 +128,12 @@ impl PathInfo {
self.path.to_absolute_path()
}
pub fn digest(&self) -> String {
nixbase32::encode(self.path.digest())
pub fn narinfo_path(&self) -> ObjectPath {
ObjectPath::parse(format!("{}.narinfo", nixbase32::encode(self.path.digest())))
.expect("must parse to a valid object_store path")
}
pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
s3_client
.head_object()
.bucket(bucket)
.key(format!("{}.narinfo", self.digest()))
.send()
.await
.is_ok()
pub async fn check_if_already_exists(&self, s3: &AmazonS3) -> bool {
s3.head(&self.narinfo_path()).await.is_ok()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn get_signees_from_path_info() {
let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
path_info.signatures = Some(vec![
"cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
"nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
]);
let signees = path_info.signees();
assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
}
#[test]
fn match_upstream_cache_from_signature() {
let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
path_info.signatures = Some(vec![
"cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
"nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
"nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
]);
assert!(
path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
);
assert!(
path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()])
);
assert!(
path_info.check_upstream_signature(&[
Url::parse("https://nix-community.cachix.org").unwrap()
])
);
assert!(
!path_info
.check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
);
}
#[test]
fn path_info_without_signature() {
let path_info_json = r#"{"ca":"fixed:r:sha256:1q10p04pgx9sk6xbvrkn4nvh0ys2lzplgcni5368f4z3cr8ikbmz","narHash":"sha256-v64ZUWbjE4fMKNGyR++nQnsAtyV25r26mTr1dwm4IOA=","narSize":5520,"path":"/nix/store/gj6hz9mj23v01yvq1nn5f655jrcky1qq-nixos-option.nix","references":[],"registrationTime":1744740942,"valid":true}"#;
let path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
assert!(
!path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
);
}
/*
#[test]
fn path_info_deserialize_nix_map() {
let path_info_json = r#"{"/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation":{"ca":null,"deriver":"/nix/store/h8z25s6arcrns5nmrq1yhgbamywjivpn-home-manager-generation.drv","narHash":"sha256-o4qwqyJ5UVm9cyC/nBNcNYVnIM14Pewgw7fou+wUVSY=","narSize":13608,"references":["/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/4d0ix5djms3n2jnjdc58l916cwack1rp-empty-directory","/nix/store/56zmgla8443qfrkrh2ch0vz0mh8ywrw1-home-manager-files","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/80l1sb3vcmrkcdd7ihlizkcnv19rq9fj-ncurses-6.5","/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation","/nix/store/92as847i10kl6s19fi910ddyk9l83835-check-link-targets.sh","/nix/store/9c90iz95yynyh3vsc67zndch6j01vgz3-home-manager-path","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bm5fi6wj0w4r2wjll2448k307bzfcjwx-cleanup","/nix/store/c244fsb3a7i5837lzn94m4bmav9i5p9b-link","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/gpxsdrrd4x93fs75395vr2dfys1ki9mq-jq-1.7.1-bin","/nix/store/jlf743lqxbvad6dbgndsgqfg20m2np5i-sd-switch-0.5.3","/nix/store/mhmgm739aagj4x7hr6ag2wjmxhmpy8mf-gettext-0.22.5","/nix/store/w9db12j05yv5hl31s6jndd9cfm1g1gw4-hm-modules-messages","/nix/store/wj1c3gsiajabnq50ifxqnlv60i5rhqj7-diffutils-3.10","/nix/store/xhql0ilzbiqwnmz4z8y0phk611wynxf2-gnused-4.9","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zhrjg6wxrxmdlpn6iapzpp2z2vylpvw5-home-manager.sh"],"registrationTime":1744742989,"signatures":["nixcache.cy7.sh:Vq4X95kSzum7BwrBhjmmM2yVipfBI3AE3jgZ3b3RoYrP4/ghotbDdlwCvwK3qx4BQdEOLSgrC1tDwiMNb6oRBw=="],"ultimate":false}}"#;
serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");
let path_info_json = r#"{"/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b":{"ca":null,"deriver":"/nix/store/12ssi931481jlkizgfk1c1jnawvwjbhh-nixos-system-chunk-25.05.20250412.2631b0b.drv","narHash":"sha256-CHhBIzMD4v/FKqKgGroq0UC1k3GrK5lcNwQPMpv2xLc=","narSize":20704,"references":["/nix/store/0yjiyixxsr137iw93hnaacdsssy1li9h-switch-to-configuration-0.1.0","/nix/store/14rby7cpwrzjsjym44cl5h6nj6qpn1gs-etc","/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b","/nix/store/3wjljpj30fvv2cdb60apr4126pa5bm87-shadow-4.17.2","/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/5dyh8l59kfvf89zjkbmjfnx7fix93n4f-net-tools-2.10","/nix/store/aq9wdsz12bg9252790l9awiry2bml4ls-sops-install-secrets-0.0.1","/nix/store/b00kq6fjhgisdrykg621vml8505nnmb3-users-groups.json","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bfr68wi6k8icb3j9fy3fzchva56djfhd-mounts.sh","/nix/store/cjnihsds5hhnji9r85hglph07q9y9hgc-system-path","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/f9jll96j74f5ykvs062718b98lfjbn9g-util-linux-2.40.4-bin","/nix/store/h7zih134d3n5yk8pnhv1fa38n6qkyrn2-pre-switch-checks","/nix/store/idn5n51246piyxcr3v6gxnj5a5l9mzpn-linux-6.14.2","/nix/store/ipn5793y61x2904xqnkgbjnp91svjjzx-perl-5.40.0-env","/nix/store/j1rikvl25pz0b5ham1ijq0nbg1q2fqfy-initrd-linux-6.14.2","/nix/store/jgawnqyh6piwcl79gxpmq5czx9rfr9xh-glibc-locales-2.40-66","/nix/store/jqgmcv8j4gj59218hcbiyn8z951rycdj-install-grub.sh","/nix/store/kpmybhxy3gz6k1znbdirwsp3c6wvsgg9-manifest.json","/nix/store/lgainx4gl6q7mhiwmls81d3n51p5jz7z-linux-6.14.2-modules","/nix/store/mhxn5kwnri3z9hdzi3x0980id65p0icn-lib.sh","/nix/store/n8n0faszqlnf3mdg0fj6abnknrhjsw5j-perl-5.40.0-env","/nix/store/nq61v7a601gjndijq5nndprkzpwz4q9g-glibc-2.40-66-bin","/nix/store/nx27idxpvi3fk3p7admvhipny73nr25n-kmod-31","/nix/store/pggww1d2pg24fcg5v36xn63n53vanyyi-gnupg-2.4.7","/nix/store/rg5rf512szdxmnj9qal3wfdnpfsx38qi-setup-etc.pl","/nix/store/vvlfaafnz3pdhw7lx5kc5gb9pl4zhz5l-local-cmds","/nix/store/w142vx7ij1fz6qwhp5dprkf59cizvv1v-update-users-groups.pl","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zlsmh0ccgvncg30qb4y0mp5pahnk1wnw-append-initrd-secrets","/nix/store/zs07icpv5ykf8m36xcv717hh26bp09fa-firmware","/nix/store/zy2n4id5gcxcbx2x8jbblkmcpdlpsypk-getent-glibc-2.40-66"],"registrationTime":1744743136,"signatures":["nixcache.cy7.sh:dZ1XiKQNe0fRX48gBj03PIABYJGV6BPwb72YpMqEBONZMF+JrkVKhRCF0ur/4Bf5prHxg6Qfg1ytP/4csRC9DQ=="],"ultimate":false}}"#;
serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");
}
*/
}

src/push.rs

@ -1,6 +1,8 @@
use std::{
collections::HashSet,
fs,
iter::once,
path::PathBuf,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
@ -8,22 +10,22 @@ use std::{
};
use anyhow::{Context, Result};
use aws_config::Region;
use aws_sdk_s3 as s3;
use futures::future::join_all;
use humansize::{DECIMAL, format_size};
use nix_compat::narinfo::{self, SigningKey};
use tokio::sync::{RwLock, mpsc};
use tracing::{debug, info, trace};
use object_store::aws::{AmazonS3, AmazonS3Builder};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::debug;
use url::Url;
use crate::{PushArgs, path_info::PathInfo, uploader::Uploader};
use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
pub struct Push {
upstream_caches: Vec<Url>,
store_paths: Arc<RwLock<Vec<PathInfo>>>,
s3_client: s3::Client,
store_paths: Arc<RwLock<HashSet<PathInfo>>>,
signing_key: SigningKey<ed25519_dalek::SigningKey>,
bucket: String,
store: Arc<Store>,
s3: Arc<AmazonS3>,
// paths that we skipped cause of a signature match
signature_hit_count: AtomicUsize,
// paths that we skipped cause we found it on an upstream
@ -35,7 +37,7 @@ pub struct Push {
}
impl Push {
pub async fn new(cli: &PushArgs) -> Result<Self> {
pub async fn new(cli: &PushArgs, store: Store) -> Result<Self> {
let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
for upstream in cli
.upstreams
@ -49,24 +51,21 @@ impl Push {
let key = fs::read_to_string(&cli.signing_key)?;
let signing_key = narinfo::parse_keypair(key.as_str())?.0;
let mut s3_config = aws_config::from_env();
let mut s3_builder = AmazonS3Builder::from_env().with_bucket_name(&cli.bucket);
if let Some(region) = &cli.region {
s3_config = s3_config.region(Region::new(region.clone()));
s3_builder = s3_builder.with_region(region);
}
if let Some(endpoint) = &cli.endpoint {
s3_config = s3_config.endpoint_url(endpoint);
}
if let Some(profile) = &cli.profile {
s3_config = s3_config.profile_name(profile);
s3_builder = s3_builder.with_endpoint(endpoint);
}
let s3_client = s3::Client::new(&s3_config.load().await);
Ok(Self {
upstream_caches: upstreams,
store_paths: Arc::new(RwLock::new(Vec::new())),
s3_client,
store_paths: Arc::new(RwLock::new(HashSet::new())),
signing_key,
bucket: cli.bucket.clone(),
store: Arc::new(store),
s3: Arc::new(s3_builder.build()?),
signature_hit_count: AtomicUsize::new(0),
upstream_hit_count: AtomicUsize::new(0),
already_exists_count: AtomicUsize::new(0),
@ -74,26 +73,42 @@ impl Push {
})
}
pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
let path_info = PathInfo::from_path(package)
pub async fn add_paths(&'static self, paths: Vec<PathBuf>) -> Result<()> {
let mut futs = Vec::with_capacity(paths.len());
for path in paths {
let store_paths = self.store_paths.clone();
let store = self.store.clone();
futs.push(tokio::spawn(async move {
let path_info = PathInfo::from_derivation(path.as_path(), &store)
.await
.context("get path info for path")?;
debug!("path-info for {path:?}: {path_info:?}");
store_paths.write().await.extend(
path_info
.get_closure(&store)
.await
.context("closure from path info")?,
);
Ok(())
}));
}
join_all(futs)
.await
.context("get path info for package")?;
debug!("path-info for {package}: {:?}", path_info);
self.store_paths.write().await.extend(
path_info
.get_closure()
.await
.context("closure from path info")?,
);
info!("found {} store paths", self.store_paths.read().await.len());
.into_iter()
.flatten()
.collect::<Result<Vec<_>>>()?;
println!("found {} store paths", self.store_paths.read().await.len());
Ok(())
}
pub async fn run(&'static self) -> Result<()> {
let (tx, rx) = mpsc::channel(10);
let (tx, rx) = mpsc::channel(1);
let filter = tokio::spawn(self.filter_from_upstream(tx));
let upload = tokio::spawn(self.upload(rx));
filter.await?;
upload.await??;
Ok(())
@ -101,33 +116,34 @@ impl Push {
/// filter paths that are on upstream and send to `tx`
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
let mut handles = Vec::with_capacity(10);
let mut handles = Vec::new();
let store_paths = self.store_paths.read().await.clone();
// limit number of inflight requests
let inflight_permits = Arc::new(Semaphore::new(32));
for path in store_paths.into_iter() {
if path.check_upstream_signature(&self.upstream_caches) {
trace!("skip {} (signature match)", path.absolute_path());
self.signature_hit_count.fetch_add(1, Ordering::Release);
debug!("skip {} (signature match)", path.absolute_path());
self.signature_hit_count.fetch_add(1, Ordering::Relaxed);
continue;
}
handles.push({
let tx = tx.clone();
let inflight_permits = inflight_permits.clone();
tokio::spawn(async move {
let _permit = inflight_permits.acquire().await.unwrap();
if !path
.check_upstream_hit(self.upstream_caches.as_slice())
.await
{
if path
.check_if_already_exists(&self.s3_client, self.bucket.clone())
.await
{
trace!("skip {} (already exists)", path.absolute_path());
if path.check_if_already_exists(&self.s3).await {
debug!("skip {} (already exists)", path.absolute_path());
self.already_exists_count.fetch_add(1, Ordering::Relaxed);
} else {
tx.send(path).await.unwrap();
}
} else {
trace!("skip {} (upstream hit)", path.absolute_path());
debug!("skip {} (upstream hit)", path.absolute_path());
self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
}
})
@ -142,24 +158,35 @@ impl Push {
}
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let mut uploads = Vec::with_capacity(10);
let mut uploads = Vec::new();
let permits = Arc::new(Semaphore::new(10));
loop {
let permits = permits.clone();
if let Some(path_to_upload) = rx.recv().await {
let absolute_path = path_to_upload.absolute_path();
println!("uploading: {}", absolute_path);
let uploader = Uploader::new(
&self.signing_key,
path_to_upload,
&self.s3_client,
self.bucket.clone(),
)?;
uploads.push(tokio::spawn(async move {
let res = uploader.upload().await;
self.upload_count.fetch_add(1, Ordering::Relaxed);
res
uploads.push(tokio::spawn({
// large uploads will be concurrently uploaded with multipart anyway so don't spawn
// too much of them
let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
Some(permits.acquire_owned().await.unwrap())
} else {
None
};
println!(
"uploading: {} (size: {})",
path_to_upload.absolute_path(),
format_size(path_to_upload.nar_size, DECIMAL)
);
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
let s3 = self.s3.clone();
let store = self.store.clone();
async move {
let res = uploader.upload(s3, store).await;
drop(permit);
self.upload_count.fetch_add(1, Ordering::Relaxed);
res
}
}));
} else {
join_all(uploads)

100
src/store.rs Normal file

@ -0,0 +1,100 @@
use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc};
use anyhow::{Context, Result};
use nix_compat::store_path::StorePath;
use tokio::{io::AsyncRead, task};
use tokio_util::io::StreamReader;
use crate::{
bindings::{self, AsyncWriteAdapter},
path_info::PathInfo,
};
pub struct Store {
inner: Arc<bindings::FfiNixStore>,
}
impl Store {
pub fn connect() -> Result<Self> {
let inner = unsafe { bindings::open_nix_store()? };
Ok(Self {
inner: Arc::new(inner),
})
}
pub async fn compute_fs_closure(
&self,
path: StorePath<String>,
) -> Result<Vec<StorePath<String>>> {
let inner = self.inner.clone();
task::spawn_blocking(move || {
let cxx_vector =
inner
.store()
.compute_fs_closure(path.to_string().as_bytes(), false, true, true)?;
cxx_vector
.iter()
.map(|x| {
StorePath::from_bytes(x.as_bytes())
.context("make StorePath from vector returned by compute_fs_closure")
})
.collect::<Result<_, _>>()
})
.await
.unwrap()
}
pub async fn query_path_info(&self, path: StorePath<String>) -> Result<PathInfo> {
let inner = self.inner.clone();
task::spawn_blocking(move || {
let mut c_path_info = inner
.store()
.query_path_info(path.to_string().as_bytes())
.context("query cpp for path info")?;
let signatures = c_path_info
.pin_mut()
.sigs()
.into_iter()
.map(|x| {
let osstr = OsStr::from_bytes(x.as_bytes());
osstr.to_str().unwrap().to_string()
})
.collect();
let references = c_path_info
.pin_mut()
.references()
.into_iter()
.map(|x| StorePath::from_bytes(x.as_bytes()))
.collect::<Result<_, _>>()
.context("get references from pathinfo")?;
let nar_size = c_path_info.pin_mut().nar_size();
Ok(PathInfo {
path,
signatures,
references,
nar_size,
})
})
.await
.unwrap()
}
pub fn nar_from_path(&self, store_path: StorePath<String>) -> impl AsyncRead {
let inner = self.inner.clone();
let (adapter, mut sender) = AsyncWriteAdapter::new();
let base_name = store_path.to_string().as_bytes().to_vec();
tokio::task::spawn_blocking(move || {
// Send all exceptions through the channel, and ignore errors
// during sending (the channel may have been closed).
if let Err(e) = inner.store().nar_from_path(base_name, sender.clone()) {
let _ = sender.rust_error(e);
}
});
StreamReader::new(adapter)
}
}

src/uploader.rs

@ -1,187 +1,80 @@
use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use aws_sdk_s3::{
self as s3,
types::{CompletedMultipartUpload, CompletedPart},
};
use futures::future::join_all;
use nix_compat::{
narinfo::{self, NarInfo, SigningKey},
nixbase32,
store_path::StorePath,
};
use sha2::{Digest, Sha256};
use tokio::{io::AsyncReadExt, process::Command};
use tracing::debug;
use bytes::BytesMut;
use nix_compat::{narinfo::SigningKey, nixbase32};
use object_store::{ObjectStore, aws::AmazonS3, buffered::BufWriter, path::Path};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, trace};
use ulid::Ulid;
use crate::path_info::PathInfo;
use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store};
const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
const CHUNK_SIZE: usize = 1024 * 1024 * 5;
pub struct Uploader<'a> {
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
path: PathInfo,
s3_client: &'a s3::Client,
bucket: String,
}
impl<'a> Uploader<'a> {
pub fn new(
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
path: PathInfo,
s3_client: &'a s3::Client,
bucket: String,
) -> Result<Self> {
Ok(Self {
signing_key,
path,
s3_client,
bucket,
})
Ok(Self { signing_key, path })
}
pub async fn upload(&self) -> Result<()> {
let nar = self.make_nar().await?;
let mut nar_info = self.narinfo_from_nar(&nar)?;
let nar = self.compress_nar(&nar).await;
pub async fn upload(&self, s3: Arc<AmazonS3>, store: Arc<Store>) -> Result<()> {
let mut nar = MakeNar::new(&self.path, store)?;
// update fields that we know after compression
let mut hasher = Sha256::new();
hasher.update(&nar);
let hash: [u8; 32] = hasher.finalize().into();
let nar_url = self.nar_url(&hash);
nar_info.file_hash = Some(hash);
nar_info.file_size = Some(nar.len() as u64);
nar_info.url = nar_url.as_str();
debug!("uploading nar with key: {nar_url}");
// we don't know what the hash of the compressed file will be so upload to a
// temp location for now
let temp_path = Path::parse(Ulid::new().to_string())?;
let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone());
debug!("uploading to temp path: {}", temp_path);
if nar.len() < MULTIPART_CUTOFF {
let put_object = self
.s3_client
.put_object()
.bucket(&self.bucket)
.key(&nar_url)
.body(nar.into())
.send()
.await?;
debug!("put object: {:#?}", put_object);
} else {
let multipart = self
.s3_client
.create_multipart_upload()
.bucket(&self.bucket)
.key(&nar_url)
.send()
.await?;
let upload_id = multipart.upload_id().unwrap();
let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
let chunks = nar.chunks(MULTIPART_CUTOFF);
for (i, chunk) in chunks.enumerate() {
parts.push(tokio::task::spawn(
self.s3_client
.upload_part()
.bucket(&self.bucket)
.key(&nar_url)
.upload_id(upload_id)
.part_number(i as i32 + 1)
.body(chunk.to_vec().into())
.send(),
));
// compress and upload nar
let mut file_reader = nar.compress_and_hash()?;
loop {
let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
let n = file_reader.read_buf(&mut buf).await?;
s3_writer.put(buf.freeze()).await?;
if n == 0 {
break;
}
let completed_parts = join_all(parts)
.await
.into_iter()
.flatten()
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.enumerate()
.map(|(i, part)| {
CompletedPart::builder()
.set_e_tag(part.e_tag().map(ToString::to_string))
.set_part_number(Some(i as i32 + 1))
.set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
.build()
})
.collect::<Vec<_>>();
let completed_mp_upload = CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
let complete_mp_upload = self
.s3_client
.complete_multipart_upload()
.bucket(&self.bucket)
.key(&nar_url)
.upload_id(upload_id)
.multipart_upload(completed_mp_upload)
.send()
.await?;
debug!("complete multipart upload: {:#?}", complete_mp_upload);
}
drop(file_reader);
let narinfo_url = format!("{}.narinfo", self.path.digest());
debug!("uploading narinfo with key {narinfo_url}");
self.s3_client
.put_object()
.bucket(&self.bucket)
.key(narinfo_url)
.body(nar_info.to_string().as_bytes().to_vec().into())
.send()
.await?;
let mut nar_info = nar.get_narinfo()?;
nar_info.add_signature(self.signing_key);
// now that we can calculate the file_hash move the nar to where it should be
let real_path = nar_url(
&nar_info
.file_hash
.expect("file hash must be known at this point"),
);
debug!("moving {} to {}", temp_path, real_path);
// the temp object must be done uploading
s3_writer.shutdown().await?;
// this is implemented as a copy-and-delete
s3.rename(&temp_path, &real_path).await?;
// set nar url in narinfo
nar_info.url = real_path.as_ref();
// upload narinfo
let narinfo_path = self.path.narinfo_path();
debug!("uploading narinfo: {}", narinfo_path);
trace!("narinfo: {:#}", nar_info);
s3.put(&narinfo_path, nar_info.to_string().into()).await?;
Ok(())
}
async fn make_nar(&self) -> Result<Vec<u8>> {
Ok(Command::new("nix")
.arg("nar")
.arg("dump-path")
.arg(self.path.absolute_path())
.output()
.await?
.stdout)
}
fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
let mut hasher = Sha256::new();
hasher.update(nar);
let nar_hash: [u8; 32] = hasher.finalize().into();
let mut nar_info = NarInfo {
flags: narinfo::Flags::empty(),
store_path: self.path.path.as_ref(),
nar_hash,
nar_size: nar.len() as u64,
references: self.path.references.iter().map(StorePath::as_ref).collect(),
signatures: Vec::new(),
ca: None,
system: None,
deriver: self.path.deriver.as_ref().map(|x| x.as_ref()),
compression: Some("zstd"),
file_hash: None,
file_size: None,
url: "",
};
// signature consists of: store_path, nar_hash, nar_size, and references
nar_info.add_signature(self.signing_key);
Ok(nar_info)
}
fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
format!("nar/{compressed_nar_hash}.nar.zst")
}
async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
let mut compressed = Vec::with_capacity(nar.len());
encoder
.read_to_end(&mut compressed)
.await
.expect("should compress just fine");
compressed
}
}
/// calculate url where the compressed nar should be uploaded
fn nar_url(file_hash: &[u8]) -> Path {
let compressed_nar_hash = nixbase32::encode(file_hash);
Path::parse(format!("nar/{compressed_nar_hash}.nar.zst"))
.expect("should parse to a valid object_store::path::Path")
}

32
tests/common/mod.rs Normal file

@ -0,0 +1,32 @@
#![allow(dead_code)]
use std::process::Command;
use std::sync::Arc;
use nixcp::store::Store;
pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
pub struct Context {
pub store: Arc<Store>,
}
impl Context {
fn new() -> Self {
// hello must be in the store
Command::new("nix")
.arg("build")
.arg("--no-link")
.arg(HELLO)
.status()
.unwrap();
let store = Arc::new(Store::connect().expect("connect to nix store"));
Self { store }
}
}
pub fn context() -> Context {
Context::new()
}

26
tests/nar.rs Normal file

@ -0,0 +1,26 @@
use crate::common::HELLO_PATH;
use nix_compat::nixbase32;
use nixcp::make_nar::MakeNar;
use nixcp::path_info::PathInfo;
use sha2::Digest;
use tokio::io::AsyncReadExt;
mod common;
#[tokio::test]
async fn nar_size_and_hash() {
let ctx = common::context();
let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();
let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
let mut reader = nar.compress_and_hash().unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
drop(reader);
assert_eq!(nar.nar_size, 234680);
let nar_hash = nar.nar_hasher.finalize();
let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
}

56
tests/path_info.rs Normal file

@ -0,0 +1,56 @@
use nixcp::path_info::PathInfo;
use std::path::PathBuf;
use tempfile::TempDir;
use crate::common::{HELLO, HELLO_DRV, HELLO_PATH};
mod common;
#[tokio::test]
async fn path_info_from_package() {
let ctx = common::context();
let path = PathBuf::from(HELLO);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_string(), HELLO_DRV);
}
#[tokio::test]
async fn path_info_from_path() {
let ctx = common::context();
let path = PathBuf::from(HELLO_PATH);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_string(), HELLO_DRV);
}
#[tokio::test]
async fn path_info_symlink() {
let ctx = common::context();
let temp_path = TempDir::new().unwrap();
let link_path = temp_path.path().join("result");
// symlink at ./result (like `nix build`)
std::os::unix::fs::symlink(HELLO_PATH, &link_path).unwrap();
// should resolve symlink
let path_info = PathInfo::from_derivation(&link_path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_string(), HELLO_DRV);
}
#[tokio::test]
async fn closure() {
let ctx = common::context();
let path = PathBuf::from(HELLO);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
let closure = path_info.get_closure(&ctx.store).await.unwrap();
assert_eq!(closure.len(), 472);
}