Compare commits
32 commits
39792cdd40
...
3c40776981
Author | SHA1 | Date | |
---|---|---|---|
3c40776981 | |||
9b0c6aece6 | |||
14d6e9d29e | |||
0e97d11745 | |||
a0794b0356 | |||
05589641cf | |||
76cbc85032 | |||
09181ae785 | |||
54d4c714af | |||
878e096494 | |||
7285c29e88 | |||
9d2e9e38bd | |||
01443d0d99 | |||
b49be95d09 | |||
e5336d304d | |||
ca97aebd7a | |||
85fefe9e77 | |||
0fedae9334 | |||
0fae7ac3b0 | |||
7dec14fc1a | |||
d524222a86 | |||
5a3e6089b4 | |||
fc304df35e | |||
b8877f33a3 | |||
846c465ea0 | |||
81ce855dae | |||
b1e59d0a6c | |||
6806b96892 | |||
84bbe5dcb4 | |||
a771785352 | |||
8ac9253ea3 | |||
c956d6741a |
20 changed files with 1936 additions and 1511 deletions
2
.cargo/config.toml
Normal file
2
.cargo/config.toml
Normal file
|
@ -0,0 +1,2 @@
|
|||
[build]
|
||||
rustflags = ["--cfg", "tokio_unstable"]
|
3
.editorconfig
Normal file
3
.editorconfig
Normal file
|
@ -0,0 +1,3 @@
|
|||
[*.nix]
|
||||
indent_size = 2
|
||||
indent_stype = space
|
1834
Cargo.lock
generated
1834
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
28
Cargo.toml
28
Cargo.toml
|
@ -3,21 +3,37 @@ name = "nixcp"
|
|||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.97"
|
||||
async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
|
||||
aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
|
||||
aws-sdk-s3 = "1.82.0"
|
||||
clap = { version = "4.5.34", features = ["derive"] }
|
||||
ed25519-dalek = "2.1.1"
|
||||
futures = "0.3.31"
|
||||
nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
|
||||
regex = "1.11.1"
|
||||
reqwest = "0.12.15"
|
||||
serde = { version = "1.0.219", features = [ "derive" ]}
|
||||
serde = { version = "1.0.219", features = ["derive"] }
|
||||
serde_json = "1.0.140"
|
||||
sha2 = "0.10.8"
|
||||
tokio = { version = "1.44.1", features = [ "full" ]}
|
||||
tokio = { version = "1.44.1", features = ["full", "tracing", "parking_lot"] }
|
||||
tracing = "0.1.41"
|
||||
tracing-subscriber = { version = "0.3.19", features = ["env-filter"]}
|
||||
url = { version = "2.5.4", features = [ "serde" ]}
|
||||
url = { version = "2.5.4", features = ["serde"] }
|
||||
cxx = "1.0"
|
||||
console-subscriber = "0.4.1"
|
||||
tokio-util = { version = "0.7.15", features = ["io"] }
|
||||
bytes = "1.10.1"
|
||||
object_store = { version = "0.12.0", features = ["aws"] }
|
||||
ulid = "1.2.1"
|
||||
tracing-subscriber = "0.3.19"
|
||||
humansize = "2.1.3"
|
||||
|
||||
[build-dependencies]
|
||||
cxx-build = "1.0"
|
||||
pkg-config = "0.3.32"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3.19.1"
|
||||
|
|
19
README.md
19
README.md
|
@ -11,14 +11,13 @@ The signing key is generated with:
|
|||
nix-store --generate-binary-cache-key nixcache.cy7.sh cache-priv-key.pem cache-pub-key.pem
|
||||
```
|
||||
|
||||
`AWS_ACCESS_KEY_ID` and `AWS_ENDPOINT_URL` environment variables should be set with your s3 credentials.
|
||||
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables should be set with your s3 credentials.
|
||||
|
||||
```
|
||||
Usage: nixcp [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> <COMMAND>
|
||||
Usage: nixcp push [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> [PATH]...
|
||||
|
||||
Commands:
|
||||
push
|
||||
help Print this message or the help of the given subcommand(s)
|
||||
Arguments:
|
||||
[PATH]... Path to upload e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
|
||||
|
||||
Options:
|
||||
--bucket <bucket name>
|
||||
|
@ -28,15 +27,13 @@ Options:
|
|||
--signing-key <SIGNING_KEY>
|
||||
Path to the file containing signing key e.g. ~/cache-priv-key.pem
|
||||
--region <REGION>
|
||||
If unspecified, will get it form AWS_DEFAULT_REGION envar or the AWS default
|
||||
If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
|
||||
--endpoint <ENDPOINT>
|
||||
If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default e.g. https://s3.example.com
|
||||
--profile <PROFILE>
|
||||
AWS profile to use
|
||||
If unspecifed, will get it from AWS_ENDPOINT envar e.g. https://s3.example.com
|
||||
--skip-signature-check
|
||||
|
||||
-h, --help
|
||||
Print help
|
||||
-V, --version
|
||||
Print version
|
||||
```
|
||||
|
||||
## Install with nix
|
||||
|
|
21
build.rs
Normal file
21
build.rs
Normal file
|
@ -0,0 +1,21 @@
|
|||
fn main() {
|
||||
cxx_build::bridge("src/bindings/mod.rs")
|
||||
.file("src/bindings/nix.cpp")
|
||||
.flag("-std=c++2a")
|
||||
.flag("-O2")
|
||||
.flag("-include")
|
||||
.flag("nix/config.h")
|
||||
.flag("-I")
|
||||
.flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix"))
|
||||
.compile("nixbinding");
|
||||
println!("cargo:rerun-if-changed=src/bindings");
|
||||
|
||||
pkg_config::Config::new()
|
||||
.atleast_version("2.4")
|
||||
.probe("nix-store")
|
||||
.unwrap();
|
||||
pkg_config::Config::new()
|
||||
.atleast_version("2.4")
|
||||
.probe("nix-main")
|
||||
.unwrap();
|
||||
}
|
52
flake.nix
52
flake.nix
|
@ -22,28 +22,56 @@
|
|||
};
|
||||
toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
|
||||
craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
|
||||
in
|
||||
{
|
||||
devShells.default = pkgs.mkShell {
|
||||
nativeBuildInputs = with pkgs; [
|
||||
pkg-config
|
||||
];
|
||||
buildInputs = with pkgs; [
|
||||
openssl
|
||||
toolchain
|
||||
];
|
||||
lib = pkgs.lib;
|
||||
|
||||
# don't clean cpp files
|
||||
cppFilter = path: _type: builtins.match ".*(cpp|hpp)$" path != null;
|
||||
cppOrCargo = path: type:
|
||||
(cppFilter path type) || (craneLib.filterCargoSources path type);
|
||||
src = lib.cleanSourceWith {
|
||||
src = ./.;
|
||||
filter = cppOrCargo;
|
||||
name = "source";
|
||||
};
|
||||
|
||||
packages.default = craneLib.buildPackage {
|
||||
src = craneLib.cleanCargoSource ./.;
|
||||
commonArgs = {
|
||||
inherit src;
|
||||
strictDeps = true;
|
||||
nativeBuildInputs = with pkgs; [
|
||||
pkg-config
|
||||
];
|
||||
buildInputs = with pkgs; [
|
||||
toolchain
|
||||
openssl
|
||||
nix
|
||||
boost
|
||||
];
|
||||
# for cpp bindings to work
|
||||
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
|
||||
# skip integration tests (they need a connection to the nix store)
|
||||
cargoTestExtraArgs = "--bins";
|
||||
};
|
||||
|
||||
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
|
||||
nixcp = craneLib.buildPackage (commonArgs // {
|
||||
inherit cargoArtifacts;
|
||||
});
|
||||
in
|
||||
{
|
||||
devShells.default = craneLib.devShell {
|
||||
inputsFrom = [ nixcp ];
|
||||
|
||||
RUST_BACKGRACE = 1;
|
||||
# for cpp bindings to work
|
||||
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
|
||||
|
||||
packages = with pkgs; [
|
||||
tokio-console
|
||||
cargo-udeps
|
||||
];
|
||||
};
|
||||
|
||||
packages.default = nixcp;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
|
237
src/bindings/mod.rs
Normal file
237
src/bindings/mod.rs
Normal file
|
@ -0,0 +1,237 @@
|
|||
/*
|
||||
Copyright 2022 Zhaofeng Li and the Attic contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
//! `libnixstore` Bindings
|
||||
#![allow(dead_code)]
|
||||
|
||||
use std::cell::UnsafeCell;
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use tokio::io::{AsyncWrite, AsyncWriteExt};
|
||||
|
||||
// The C++ implementation takes care of concurrency
|
||||
#[repr(transparent)]
|
||||
pub struct FfiNixStore(UnsafeCell<cxx::UniquePtr<ffi::CNixStore>>);
|
||||
|
||||
unsafe impl Send for FfiNixStore {}
|
||||
unsafe impl Sync for FfiNixStore {}
|
||||
|
||||
impl FfiNixStore {
|
||||
pub fn store(&self) -> Pin<&mut ffi::CNixStore> {
|
||||
unsafe {
|
||||
let ptr = self.0.get().as_mut().unwrap();
|
||||
ptr.pin_mut()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Obtain a handle to the Nix store.
|
||||
pub unsafe fn open_nix_store() -> Result<FfiNixStore> {
|
||||
match ffi::open_nix_store() {
|
||||
Ok(ptr) => {
|
||||
let cell = UnsafeCell::new(ptr);
|
||||
Ok(FfiNixStore(cell))
|
||||
}
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Benchmark different implementations
|
||||
// (tokio, crossbeam, flume)
|
||||
mod mpsc {
|
||||
// Tokio
|
||||
pub use tokio::sync::mpsc::{
|
||||
UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel,
|
||||
};
|
||||
}
|
||||
|
||||
/// Async write request.
|
||||
#[derive(Debug)]
|
||||
enum AsyncWriteMessage {
|
||||
Data(Vec<u8>),
|
||||
Error(String),
|
||||
Eof,
|
||||
}
|
||||
|
||||
/// Async write request sender.
|
||||
#[derive(Clone)]
|
||||
pub struct AsyncWriteSender {
|
||||
sender: mpsc::UnboundedSender<AsyncWriteMessage>,
|
||||
}
|
||||
|
||||
impl AsyncWriteSender {
|
||||
fn send(&mut self, data: &[u8]) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
|
||||
let message = AsyncWriteMessage::Data(Vec::from(data));
|
||||
self.sender.send(message)
|
||||
}
|
||||
|
||||
fn eof(&mut self) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
|
||||
let message = AsyncWriteMessage::Eof;
|
||||
self.sender.send(message)
|
||||
}
|
||||
|
||||
pub(crate) fn rust_error(
|
||||
&mut self,
|
||||
error: impl std::error::Error,
|
||||
) -> Result<(), impl std::error::Error> {
|
||||
let message = AsyncWriteMessage::Error(error.to_string());
|
||||
self.sender.send(message)
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper of the `AsyncWrite` trait for the synchronous Nix C++ land.
|
||||
pub struct AsyncWriteAdapter {
|
||||
receiver: mpsc::UnboundedReceiver<AsyncWriteMessage>,
|
||||
eof: bool,
|
||||
}
|
||||
|
||||
impl AsyncWriteAdapter {
|
||||
pub fn new() -> (Self, Box<AsyncWriteSender>) {
|
||||
let (sender, receiver) = mpsc::unbounded_channel();
|
||||
|
||||
let r = Self {
|
||||
receiver,
|
||||
eof: false,
|
||||
};
|
||||
let sender = Box::new(AsyncWriteSender { sender });
|
||||
|
||||
(r, sender)
|
||||
}
|
||||
|
||||
/// Write everything the sender sends to us.
|
||||
pub async fn write_all(mut self, mut writer: Box<dyn AsyncWrite + Unpin>) -> Result<()> {
|
||||
let writer = writer.as_mut();
|
||||
|
||||
while let Some(data) = self.next().await {
|
||||
match data {
|
||||
Ok(v) => {
|
||||
writer.write_all(&v).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !self.eof {
|
||||
Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for AsyncWriteAdapter {
|
||||
type Item = std::io::Result<Bytes>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match self.receiver.poll_recv(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Some(message)) => {
|
||||
use AsyncWriteMessage::*;
|
||||
match message {
|
||||
Data(v) => Poll::Ready(Some(Ok(v.into()))),
|
||||
Error(exception) => {
|
||||
let error = std::io::Error::other(format!("cxx error: {exception}"));
|
||||
Poll::Ready(Some(Err(error)))
|
||||
}
|
||||
Eof => {
|
||||
self.eof = true;
|
||||
Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Ready(None) => {
|
||||
if !self.eof {
|
||||
Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe))))
|
||||
} else {
|
||||
Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cxx::bridge]
|
||||
/// Generated by `cxx.rs`.
|
||||
///
|
||||
/// Mid-level wrapper of `libnixstore` implemented in C++.
|
||||
mod ffi {
|
||||
extern "Rust" {
|
||||
type AsyncWriteSender;
|
||||
fn send(self: &mut AsyncWriteSender, data: &[u8]) -> Result<()>;
|
||||
fn eof(self: &mut AsyncWriteSender) -> Result<()>;
|
||||
}
|
||||
|
||||
unsafe extern "C++" {
|
||||
include!("nixcp/src/bindings/nix.hpp");
|
||||
|
||||
// =========
|
||||
// CNixStore
|
||||
// =========
|
||||
|
||||
/// Mid-level wrapper for the Unix Domain Socket Nix Store.
|
||||
type CNixStore;
|
||||
|
||||
/// Queries information about a valid path.
|
||||
fn query_path_info(
|
||||
self: Pin<&mut CNixStore>,
|
||||
store_path: &[u8],
|
||||
) -> Result<UniquePtr<CPathInfo>>;
|
||||
|
||||
/// Computes the closure of a valid path.
|
||||
///
|
||||
/// If `flip_directions` is true, the set of paths that can reach `store_path` is
|
||||
/// returned.
|
||||
fn compute_fs_closure(
|
||||
self: Pin<&mut CNixStore>,
|
||||
store_path: &[u8],
|
||||
flip_direction: bool,
|
||||
include_outputs: bool,
|
||||
include_derivers: bool,
|
||||
) -> Result<UniquePtr<CxxVector<CxxString>>>;
|
||||
|
||||
/// Creates a NAR dump from a path.
|
||||
fn nar_from_path(
|
||||
self: Pin<&mut CNixStore>,
|
||||
base_name: Vec<u8>,
|
||||
sender: Box<AsyncWriteSender>,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Obtains a handle to the Nix store.
|
||||
fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
|
||||
|
||||
// =========
|
||||
// CPathInfo
|
||||
// =========
|
||||
|
||||
/// Mid-level wrapper for the `nix::ValidPathInfo` struct.
|
||||
type CPathInfo;
|
||||
/// Returns the size of the NAR.
|
||||
fn nar_size(self: Pin<&mut CPathInfo>) -> u64;
|
||||
|
||||
/// Returns the references of the store path.
|
||||
fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
|
||||
|
||||
/// Returns the possibly invalid signatures attached to the store path.
|
||||
fn sigs(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
|
||||
}
|
||||
}
|
128
src/bindings/nix.cpp
Normal file
128
src/bindings/nix.cpp
Normal file
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
Copyright 2022 Zhaofeng Li and the Attic contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// C++ side of the libnixstore glue.
|
||||
//
|
||||
// We implement a mid-level wrapper of the Nix Store interface,
|
||||
// which is then wrapped again in the Rust side to enable full
|
||||
// async-await operation.
|
||||
//
|
||||
// Here we stick with the naming conventions of Rust and handle
|
||||
// Rust types directly where possible, so that the interfaces are
|
||||
// satisfying to use from the Rust side via cxx.rs.
|
||||
|
||||
#include "nixcp/src/bindings/nix.hpp"
|
||||
|
||||
static std::mutex g_init_nix_mutex;
|
||||
static bool g_init_nix_done = false;
|
||||
|
||||
static nix::StorePath store_path_from_rust(RBasePathSlice base_name) {
|
||||
std::string_view sv((const char *)base_name.data(), base_name.size());
|
||||
return nix::StorePath(sv);
|
||||
}
|
||||
|
||||
// ========
|
||||
// RustSink
|
||||
// ========
|
||||
|
||||
RustSink::RustSink(RBox<AsyncWriteSender> sender) : sender(std::move(sender)) {}
|
||||
|
||||
void RustSink::operator () (std::string_view data) {
|
||||
RBasePathSlice s((const unsigned char *)data.data(), data.size());
|
||||
|
||||
this->sender->send(s);
|
||||
}
|
||||
|
||||
void RustSink::eof() {
|
||||
this->sender->eof();
|
||||
}
|
||||
|
||||
|
||||
// =========
|
||||
// CPathInfo
|
||||
// =========
|
||||
|
||||
CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
|
||||
|
||||
uint64_t CPathInfo::nar_size() {
|
||||
return this->pi->narSize;
|
||||
}
|
||||
|
||||
std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() {
|
||||
std::vector<std::string> result;
|
||||
for (auto&& elem : this->pi->sigs) {
|
||||
result.push_back(std::string(elem));
|
||||
}
|
||||
return std::make_unique<std::vector<std::string>>(result);
|
||||
}
|
||||
|
||||
std::unique_ptr<std::vector<std::string>> CPathInfo::references() {
|
||||
std::vector<std::string> result;
|
||||
for (auto&& elem : this->pi->references) {
|
||||
result.push_back(std::string(elem.to_string()));
|
||||
}
|
||||
return std::make_unique<std::vector<std::string>>(result);
|
||||
}
|
||||
|
||||
// =========
|
||||
// CNixStore
|
||||
// =========
|
||||
|
||||
CNixStore::CNixStore() {
|
||||
std::map<std::string, std::string> params;
|
||||
std::lock_guard<std::mutex> lock(g_init_nix_mutex);
|
||||
|
||||
if (!g_init_nix_done) {
|
||||
nix::initNix();
|
||||
g_init_nix_done = true;
|
||||
}
|
||||
|
||||
this->store = nix::openStore(nix::settings.storeUri.get(), params);
|
||||
}
|
||||
|
||||
std::unique_ptr<CPathInfo> CNixStore::query_path_info(RBasePathSlice base_name) {
|
||||
auto store_path = store_path_from_rust(base_name);
|
||||
|
||||
auto r = this->store->queryPathInfo(store_path);
|
||||
return std::make_unique<CPathInfo>(r);
|
||||
}
|
||||
|
||||
std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePathSlice base_name, bool flip_direction, bool include_outputs, bool include_derivers) {
|
||||
std::set<nix::StorePath> out;
|
||||
|
||||
this->store->computeFSClosure(store_path_from_rust(base_name), out, flip_direction, include_outputs, include_derivers);
|
||||
|
||||
std::vector<std::string> result;
|
||||
for (auto&& elem : out) {
|
||||
result.push_back(std::string(elem.to_string()));
|
||||
}
|
||||
return std::make_unique<std::vector<std::string>>(result);
|
||||
}
|
||||
|
||||
void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
|
||||
RustSink sink(std::move(sender));
|
||||
|
||||
std::string_view sv((const char *)base_name.data(), base_name.size());
|
||||
nix::StorePath store_path(sv);
|
||||
|
||||
// exceptions will be thrown into Rust
|
||||
this->store->narFromPath(store_path, sink);
|
||||
sink.eof();
|
||||
}
|
||||
|
||||
std::unique_ptr<CNixStore> open_nix_store() {
|
||||
return std::make_unique<CNixStore>();
|
||||
}
|
89
src/bindings/nix.hpp
Normal file
89
src/bindings/nix.hpp
Normal file
|
@ -0,0 +1,89 @@
|
|||
/*
|
||||
Copyright 2022 Zhaofeng Li and the Attic contributors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// C++ side of the libnixstore glue.
|
||||
//
|
||||
// We implement a mid-level wrapper of the Nix Store interface,
|
||||
// which is then wrapped again in the Rust side to enable full
|
||||
// async-await operation.
|
||||
//
|
||||
// Here we stick with the naming conventions of Rust and handle
|
||||
// Rust types directly where possible, so that the interfaces are
|
||||
// satisfying to use from the Rust side via cxx.rs.
|
||||
|
||||
#pragma once
|
||||
#include <iostream>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <nix/store-api.hh>
|
||||
#include <nix/local-store.hh>
|
||||
#include <nix/remote-store.hh>
|
||||
#include <nix/uds-remote-store.hh>
|
||||
#include <nix/hash.hh>
|
||||
#include <nix/path.hh>
|
||||
#include <nix/serialise.hh>
|
||||
#include <nix/shared.hh>
|
||||
#include <rust/cxx.h>
|
||||
|
||||
template<class T> using RVec = rust::Vec<T>;
|
||||
template<class T> using RBox = rust::Box<T>;
|
||||
template<class T> using RSlice = rust::Slice<T>;
|
||||
using RString = rust::String;
|
||||
using RStr = rust::Str;
|
||||
using RBasePathSlice = RSlice<const unsigned char>;
|
||||
using RHashSlice = RSlice<const unsigned char>;
|
||||
|
||||
struct AsyncWriteSender;
|
||||
|
||||
struct RustSink : nix::Sink
|
||||
{
|
||||
RBox<AsyncWriteSender> sender;
|
||||
public:
|
||||
RustSink(RBox<AsyncWriteSender> sender);
|
||||
void operator () (std::string_view data) override;
|
||||
void eof();
|
||||
};
|
||||
|
||||
// Opaque wrapper for nix::ValidPathInfo
|
||||
class CPathInfo {
|
||||
nix::ref<const nix::ValidPathInfo> pi;
|
||||
public:
|
||||
CPathInfo(nix::ref<const nix::ValidPathInfo> pi);
|
||||
std::unique_ptr<std::vector<std::string>> sigs();
|
||||
std::unique_ptr<std::vector<std::string>> references();
|
||||
uint64_t nar_size();
|
||||
};
|
||||
|
||||
class CNixStore {
|
||||
std::shared_ptr<nix::Store> store;
|
||||
public:
|
||||
CNixStore();
|
||||
|
||||
RString store_dir();
|
||||
std::unique_ptr<CPathInfo> query_path_info(RBasePathSlice base_name);
|
||||
std::unique_ptr<std::vector<std::string>> compute_fs_closure(
|
||||
RBasePathSlice base_name,
|
||||
bool flip_direction,
|
||||
bool include_outputs,
|
||||
bool include_derivers);
|
||||
void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
|
||||
};
|
||||
|
||||
std::unique_ptr<CNixStore> open_nix_store();
|
||||
|
||||
// Relies on our definitions
|
||||
#include "nixcp/src/bindings/mod.rs.h"
|
65
src/lib.rs
Normal file
65
src/lib.rs
Normal file
|
@ -0,0 +1,65 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
|
||||
mod bindings;
|
||||
mod cli;
|
||||
pub mod make_nar;
|
||||
pub mod path_info;
|
||||
pub mod push;
|
||||
pub mod store;
|
||||
mod uploader;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version)]
|
||||
#[command(name = "nixcp")]
|
||||
#[command(about = "Upload store paths to a s3 binary cache")]
|
||||
#[command(long_about = None)]
|
||||
pub struct Cli {
|
||||
#[command(subcommand)]
|
||||
pub command: Commands,
|
||||
|
||||
/// Whether to enable tokio-console
|
||||
#[arg(long)]
|
||||
pub tokio_console: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
pub enum Commands {
|
||||
#[command(arg_required_else_help = true)]
|
||||
Push(PushArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub struct PushArgs {
|
||||
/// The s3 bucket to upload to
|
||||
#[arg(long, value_name = "bucket name")]
|
||||
bucket: String,
|
||||
|
||||
/// Upstream cache to check against. Can be specified multiple times.
|
||||
/// cache.nixos.org is always included.
|
||||
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
|
||||
upstreams: Vec<String>,
|
||||
|
||||
/// Path to the file containing signing key
|
||||
/// e.g. ~/cache-priv-key.pem
|
||||
#[arg(long)]
|
||||
signing_key: String,
|
||||
|
||||
/// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
|
||||
#[arg(long)]
|
||||
region: Option<String>,
|
||||
|
||||
/// If unspecifed, will get it from AWS_ENDPOINT envar
|
||||
/// e.g. https://s3.example.com
|
||||
#[arg(long)]
|
||||
endpoint: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
skip_signature_check: bool,
|
||||
|
||||
/// Path to upload
|
||||
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
|
||||
#[arg(value_name = "PATH")]
|
||||
pub paths: Vec<PathBuf>,
|
||||
}
|
102
src/main.rs
102
src/main.rs
|
@ -1,88 +1,46 @@
|
|||
#![feature(let_chains)]
|
||||
#![feature(extend_one)]
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use tracing_subscriber::{EnvFilter, FmtSubscriber};
|
||||
use clap::Parser;
|
||||
use tracing_subscriber::{EnvFilter, prelude::*};
|
||||
|
||||
use push::Push;
|
||||
|
||||
mod cli;
|
||||
mod path_info;
|
||||
mod push;
|
||||
mod uploader;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(version)]
|
||||
#[command(name = "nixcp")]
|
||||
#[command(about = "Upload store paths to a s3 binary cache")]
|
||||
#[command(long_about = None)]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
#[derive(Debug, Subcommand)]
|
||||
enum Commands {
|
||||
#[command(arg_required_else_help = true)]
|
||||
Push(PushArgs),
|
||||
}
|
||||
|
||||
#[derive(Debug, Args)]
|
||||
pub struct PushArgs {
|
||||
/// The s3 bucket to upload to
|
||||
#[arg(long, value_name = "bucket name")]
|
||||
bucket: String,
|
||||
|
||||
/// Upstream cache to check against. Can be specified multiple times.
|
||||
/// cache.nixos.org is always included.
|
||||
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
|
||||
upstreams: Vec<String>,
|
||||
|
||||
/// Path to the file containing signing key
|
||||
/// e.g. ~/cache-priv-key.pem
|
||||
#[arg(long)]
|
||||
signing_key: String,
|
||||
|
||||
/// If unspecified, will get it form AWS_DEFAULT_REGION envar
|
||||
#[arg(long)]
|
||||
region: Option<String>,
|
||||
|
||||
/// If unspecifed, will get it from AWS_ENDPOINT_URL envar
|
||||
/// e.g. https://s3.example.com
|
||||
#[arg(long)]
|
||||
endpoint: Option<String>,
|
||||
|
||||
/// AWS profile to use
|
||||
#[arg(long)]
|
||||
profile: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
skip_signature_check: bool,
|
||||
|
||||
/// Package or store path to upload
|
||||
/// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
|
||||
#[arg(value_name = "package or store path")]
|
||||
package: String,
|
||||
}
|
||||
use nixcp::push::Push;
|
||||
use nixcp::store::Store;
|
||||
use nixcp::{Cli, Commands};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let filter = EnvFilter::from_default_env();
|
||||
let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
|
||||
tracing::subscriber::set_global_default(subscriber)?;
|
||||
|
||||
let cli = Cli::parse();
|
||||
init_logging(cli.tokio_console);
|
||||
|
||||
match &cli.command {
|
||||
Commands::Push(cli) => {
|
||||
let push = Box::leak(Box::new(Push::new(cli).await?));
|
||||
push.paths_from_package(&cli.package)
|
||||
let store = Store::connect()?;
|
||||
let push = Box::leak(Box::new(Push::new(cli, store).await?));
|
||||
push.add_paths(cli.paths.clone())
|
||||
.await
|
||||
.context("nixcp get paths from package")?;
|
||||
.context("add paths to push")?;
|
||||
push.run().await.context("nixcp run")?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init_logging(tokio_console: bool) {
|
||||
let env_filter = EnvFilter::from_default_env();
|
||||
let fmt_layer = tracing_subscriber::fmt::layer().with_filter(env_filter);
|
||||
|
||||
let console_layer = if tokio_console {
|
||||
Some(console_subscriber::spawn())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(fmt_layer)
|
||||
.with(console_layer)
|
||||
.init();
|
||||
|
||||
if tokio_console {
|
||||
println!("tokio-console is enabled");
|
||||
}
|
||||
}
|
||||
|
|
81
src/make_nar.rs
Normal file
81
src/make_nar.rs
Normal file
|
@ -0,0 +1,81 @@
|
|||
use anyhow::Result;
|
||||
use async_compression::{Level, tokio::bufread::ZstdEncoder};
|
||||
use nix_compat::{
|
||||
narinfo::{self, NarInfo},
|
||||
store_path::StorePath,
|
||||
};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{mem::take, sync::Arc};
|
||||
use tokio::io::{AsyncRead, BufReader};
|
||||
use tokio_util::io::InspectReader;
|
||||
|
||||
use crate::path_info::PathInfo;
|
||||
use crate::store::Store;
|
||||
|
||||
pub struct MakeNar<'a> {
|
||||
path_info: &'a PathInfo,
|
||||
store: Arc<Store>,
|
||||
pub nar_hasher: Sha256,
|
||||
/// hash of compressed nar file
|
||||
file_hasher: Sha256,
|
||||
pub nar_size: u64,
|
||||
file_size: u64,
|
||||
}
|
||||
|
||||
impl<'a> MakeNar<'a> {
|
||||
pub fn new(path_info: &'a PathInfo, store: Arc<Store>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
path_info,
|
||||
store,
|
||||
nar_hasher: Sha256::new(),
|
||||
file_hasher: Sha256::new(),
|
||||
nar_size: 0,
|
||||
file_size: 0,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a compressed nar reader which can be uploaded. File hash will be available when
|
||||
/// everything is read
|
||||
pub fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
|
||||
let nar_reader = self.store.nar_from_path(self.path_info.path.clone());
|
||||
// reader that hashes as nar is read
|
||||
let nar_reader = InspectReader::new(nar_reader, |x| {
|
||||
self.nar_size += x.len() as u64;
|
||||
self.nar_hasher.update(x);
|
||||
});
|
||||
|
||||
let encoder = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
|
||||
// reader that updates file_hash as the compressed nar is read
|
||||
Ok(InspectReader::new(encoder, |x| {
|
||||
self.file_size += x.len() as u64;
|
||||
self.file_hasher.update(x);
|
||||
}))
|
||||
}
|
||||
|
||||
/// Returns *unsigned* narinfo. `url` must be updated before uploading
|
||||
pub fn get_narinfo(&mut self) -> Result<NarInfo> {
|
||||
let file_hash = take(&mut self.file_hasher).finalize().into();
|
||||
let nar_hash = take(&mut self.nar_hasher).finalize().into();
|
||||
|
||||
Ok(NarInfo {
|
||||
flags: narinfo::Flags::empty(),
|
||||
store_path: self.path_info.path.as_ref(),
|
||||
nar_hash,
|
||||
nar_size: self.nar_size,
|
||||
references: self
|
||||
.path_info
|
||||
.references
|
||||
.iter()
|
||||
.map(StorePath::as_ref)
|
||||
.collect(),
|
||||
signatures: Vec::new(),
|
||||
ca: None,
|
||||
system: None,
|
||||
deriver: None,
|
||||
compression: Some("zstd"),
|
||||
file_hash: Some(file_hash),
|
||||
file_size: Some(self.file_size),
|
||||
url: "",
|
||||
})
|
||||
}
|
||||
}
|
218
src/path_info.rs
218
src/path_info.rs
|
@ -1,85 +1,80 @@
|
|||
use std::collections::HashSet;
|
||||
|
||||
use anyhow::{Context, Error, Result};
|
||||
use aws_sdk_s3 as s3;
|
||||
use anyhow::{Context, Result, anyhow};
|
||||
use futures::future::join_all;
|
||||
use nix_compat::nixbase32;
|
||||
use nix_compat::store_path::StorePath;
|
||||
use object_store::{ObjectStore, aws::AmazonS3, path::Path as ObjectPath};
|
||||
use regex::Regex;
|
||||
use serde::Deserialize;
|
||||
use std::path::Path;
|
||||
use tokio::process::Command;
|
||||
use tracing::{debug, error, trace};
|
||||
use tracing::{debug, trace};
|
||||
use url::Url;
|
||||
|
||||
// nix path-info --derivation --json
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
use crate::store::Store;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct PathInfo {
|
||||
pub deriver: Option<StorePath<String>>,
|
||||
pub path: StorePath<String>,
|
||||
signatures: Option<Vec<String>>,
|
||||
pub signatures: Vec<String>,
|
||||
pub references: Vec<StorePath<String>>,
|
||||
pub nar_size: u64,
|
||||
}
|
||||
|
||||
impl PathInfo {
|
||||
// get PathInfo for a package or a store path
|
||||
// we deserialize this as an array of `PathInfo` below
|
||||
pub async fn from_path(path: &str) -> Result<Self> {
|
||||
debug!("query nix path-info for {path}");
|
||||
// use lix cause nix would return a json map instead of an array
|
||||
// json output is not stable and could break in future
|
||||
// TODO figure out a better way
|
||||
let nix_cmd = Command::new("nix")
|
||||
.arg("run")
|
||||
.arg("--experimental-features")
|
||||
.arg("nix-command flakes")
|
||||
.arg("github:nixos/nixpkgs/nixos-unstable#lix")
|
||||
.arg("--")
|
||||
.arg("path-info")
|
||||
.arg("--json")
|
||||
.arg(path)
|
||||
.output()
|
||||
.await
|
||||
.context("`nix path-info` failed for {package}")?;
|
||||
pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
|
||||
debug!("query path info for {:?}", drv);
|
||||
|
||||
trace!(
|
||||
"nix path-info output: {}",
|
||||
String::from_utf8_lossy(&nix_cmd.stdout)
|
||||
);
|
||||
|
||||
// nix path-info returns an array with one element
|
||||
match serde_json::from_slice::<Vec<_>>(&nix_cmd.stdout)
|
||||
.context("parse path info from stdout")
|
||||
{
|
||||
Ok(path_info) => path_info
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or_else(|| Error::msg("nix path-info returned empty")),
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to parse data from `nix path-info`. The path may not exist on your system."
|
||||
);
|
||||
Err(e)
|
||||
let derivation = match drv.extension() {
|
||||
Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
|
||||
_ => {
|
||||
let drv = {
|
||||
// resolve symlink
|
||||
if drv.is_symlink() {
|
||||
&drv.canonicalize()?
|
||||
} else {
|
||||
drv
|
||||
}
|
||||
};
|
||||
&Command::new("nix")
|
||||
.arg("path-info")
|
||||
.arg("--derivation")
|
||||
.arg(drv)
|
||||
.output()
|
||||
.await
|
||||
.context(format!("run command: nix path-info --derivaiton {drv:?}"))?
|
||||
.stdout
|
||||
}
|
||||
};
|
||||
let derivation = String::from_utf8_lossy(derivation);
|
||||
debug!("derivation: {derivation}");
|
||||
|
||||
if derivation.is_empty() {
|
||||
return Err(anyhow!(
|
||||
"nix path-info did not return a derivation for {drv:#?}"
|
||||
));
|
||||
}
|
||||
|
||||
Self::from_path(derivation.trim(), store).await
|
||||
}
|
||||
|
||||
pub async fn get_closure(&self) -> Result<Vec<Self>> {
|
||||
debug!("query nix-store for {}", self.absolute_path());
|
||||
let nix_store_cmd = Command::new("nix-store")
|
||||
.arg("--query")
|
||||
.arg("--requisites")
|
||||
.arg("--include-outputs")
|
||||
.arg(self.absolute_path())
|
||||
.output()
|
||||
pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
|
||||
let store_path =
|
||||
StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
|
||||
store
|
||||
.query_path_info(store_path)
|
||||
.await
|
||||
.expect("nix-store cmd failed");
|
||||
.context("query pathinfo for path")
|
||||
}
|
||||
|
||||
let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
|
||||
let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
|
||||
let mut closure = Vec::with_capacity(nix_store_paths.len());
|
||||
for path in nix_store_paths {
|
||||
closure.push(Self::from_path(path).await?);
|
||||
}
|
||||
Ok(closure)
|
||||
// TODO: skip call to query_path_info and return Vec<Path>?
|
||||
pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
|
||||
let futs = store
|
||||
.compute_fs_closure(self.path.clone())
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|x| store.query_path_info(x));
|
||||
join_all(futs).await.into_iter().collect()
|
||||
}
|
||||
|
||||
/// checks if the path is signed by any upstream. if it is, we assume a cache hit.
|
||||
|
@ -101,23 +96,21 @@ impl PathInfo {
|
|||
}
|
||||
|
||||
fn signees(&self) -> Vec<&str> {
|
||||
if let Some(signatures) = self.signatures.as_ref() {
|
||||
let signees: Vec<_> = signatures
|
||||
.iter()
|
||||
.filter_map(|signature| Some(signature.split_once(":")?.0))
|
||||
.collect();
|
||||
trace!("signees for {}: {:?}", self.path, signees);
|
||||
return signees;
|
||||
}
|
||||
Vec::new()
|
||||
let signers: Vec<_> = self
|
||||
.signatures
|
||||
.iter()
|
||||
.filter_map(|signature| Some(signature.split_once(":")?.0))
|
||||
.collect();
|
||||
trace!("signers for {}: {:?}", self.path, signers);
|
||||
signers
|
||||
}
|
||||
|
||||
pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
|
||||
for upstream in upstreams {
|
||||
let upstream = upstream
|
||||
.join(format!("{}.narinfo", self.digest()).as_str())
|
||||
.join(self.narinfo_path().as_ref())
|
||||
.expect("adding <hash>.narinfo should make a valid url");
|
||||
debug!("querying {}", upstream);
|
||||
trace!("querying {}", upstream);
|
||||
let res_status = reqwest::Client::new()
|
||||
.head(upstream.as_str())
|
||||
.send()
|
||||
|
@ -135,83 +128,12 @@ impl PathInfo {
|
|||
self.path.to_absolute_path()
|
||||
}
|
||||
|
||||
pub fn digest(&self) -> String {
|
||||
nixbase32::encode(self.path.digest())
|
||||
pub fn narinfo_path(&self) -> ObjectPath {
|
||||
ObjectPath::parse(format!("{}.narinfo", nixbase32::encode(self.path.digest())))
|
||||
.expect("must parse to a valid object_store path")
|
||||
}
|
||||
|
||||
pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
|
||||
s3_client
|
||||
.head_object()
|
||||
.bucket(bucket)
|
||||
.key(format!("{}.narinfo", self.digest()))
|
||||
.send()
|
||||
.await
|
||||
.is_ok()
|
||||
pub async fn check_if_already_exists(&self, s3: &AmazonS3) -> bool {
|
||||
s3.head(&self.narinfo_path()).await.is_ok()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn get_signees_from_path_info() {
|
||||
let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
|
||||
let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
|
||||
|
||||
path_info.signatures = Some(vec![
|
||||
"cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
|
||||
"nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
|
||||
]);
|
||||
let signees = path_info.signees();
|
||||
assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn match_upstream_cache_from_signature() {
|
||||
let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
|
||||
let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
|
||||
|
||||
path_info.signatures = Some(vec![
|
||||
"cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
|
||||
"nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
|
||||
"nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
|
||||
]);
|
||||
assert!(
|
||||
path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
|
||||
);
|
||||
assert!(
|
||||
path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()])
|
||||
);
|
||||
assert!(
|
||||
path_info.check_upstream_signature(&[
|
||||
Url::parse("https://nix-community.cachix.org").unwrap()
|
||||
])
|
||||
);
|
||||
assert!(
|
||||
!path_info
|
||||
.check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn path_info_without_signature() {
|
||||
let path_info_json = r#"{"ca":"fixed:r:sha256:1q10p04pgx9sk6xbvrkn4nvh0ys2lzplgcni5368f4z3cr8ikbmz","narHash":"sha256-v64ZUWbjE4fMKNGyR++nQnsAtyV25r26mTr1dwm4IOA=","narSize":5520,"path":"/nix/store/gj6hz9mj23v01yvq1nn5f655jrcky1qq-nixos-option.nix","references":[],"registrationTime":1744740942,"valid":true}"#;
|
||||
let path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
|
||||
|
||||
assert!(
|
||||
!path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
|
||||
);
|
||||
}
|
||||
|
||||
/*
|
||||
#[test]
|
||||
fn path_info_deserialize_nix_map() {
|
||||
let path_info_json = r#"{"/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation":{"ca":null,"deriver":"/nix/store/h8z25s6arcrns5nmrq1yhgbamywjivpn-home-manager-generation.drv","narHash":"sha256-o4qwqyJ5UVm9cyC/nBNcNYVnIM14Pewgw7fou+wUVSY=","narSize":13608,"references":["/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/4d0ix5djms3n2jnjdc58l916cwack1rp-empty-directory","/nix/store/56zmgla8443qfrkrh2ch0vz0mh8ywrw1-home-manager-files","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/80l1sb3vcmrkcdd7ihlizkcnv19rq9fj-ncurses-6.5","/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation","/nix/store/92as847i10kl6s19fi910ddyk9l83835-check-link-targets.sh","/nix/store/9c90iz95yynyh3vsc67zndch6j01vgz3-home-manager-path","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bm5fi6wj0w4r2wjll2448k307bzfcjwx-cleanup","/nix/store/c244fsb3a7i5837lzn94m4bmav9i5p9b-link","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/gpxsdrrd4x93fs75395vr2dfys1ki9mq-jq-1.7.1-bin","/nix/store/jlf743lqxbvad6dbgndsgqfg20m2np5i-sd-switch-0.5.3","/nix/store/mhmgm739aagj4x7hr6ag2wjmxhmpy8mf-gettext-0.22.5","/nix/store/w9db12j05yv5hl31s6jndd9cfm1g1gw4-hm-modules-messages","/nix/store/wj1c3gsiajabnq50ifxqnlv60i5rhqj7-diffutils-3.10","/nix/store/xhql0ilzbiqwnmz4z8y0phk611wynxf2-gnused-4.9","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zhrjg6wxrxmdlpn6iapzpp2z2vylpvw5-home-manager.sh"],"registrationTime":1744742989,"signatures":["nixcache.cy7.sh:Vq4X95kSzum7BwrBhjmmM2yVipfBI3AE3jgZ3b3RoYrP4/ghotbDdlwCvwK3qx4BQdEOLSgrC1tDwiMNb6oRBw=="],"ultimate":false}}"#;
|
||||
serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");
|
||||
|
||||
let path_info_json = r#"{"/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b":{"ca":null,"deriver":"/nix/store/12ssi931481jlkizgfk1c1jnawvwjbhh-nixos-system-chunk-25.05.20250412.2631b0b.drv","narHash":"sha256-CHhBIzMD4v/FKqKgGroq0UC1k3GrK5lcNwQPMpv2xLc=","narSize":20704,"references":["/nix/store/0yjiyixxsr137iw93hnaacdsssy1li9h-switch-to-configuration-0.1.0","/nix/store/14rby7cpwrzjsjym44cl5h6nj6qpn1gs-etc","/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b","/nix/store/3wjljpj30fvv2cdb60apr4126pa5bm87-shadow-4.17.2","/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/5dyh8l59kfvf89zjkbmjfnx7fix93n4f-net-tools-2.10","/nix/store/aq9wdsz12bg9252790l9awiry2bml4ls-sops-install-secrets-0.0.1","/nix/store/b00kq6fjhgisdrykg621vml8505nnmb3-users-groups.json","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bfr68wi6k8icb3j9fy3fzchva56djfhd-mounts.sh","/nix/store/cjnihsds5hhnji9r85hglph07q9y9hgc-system-path","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/f9jll96j74f5ykvs062718b98lfjbn9g-util-linux-2.40.4-bin","/nix/store/h7zih134d3n5yk8pnhv1fa38n6qkyrn2-pre-switch-checks","/nix/store/idn5n51246piyxcr3v6gxnj5a5l9mzpn-linux-6.14.2","/nix/store/ipn5793y61x2904xqnkgbjnp91svjjzx-perl-5.40.0-env","/nix/store/j1rikvl25pz0b5ham1ijq0nbg1q2fqfy-initrd-linux-6.14.2","/nix/store/jgawnqyh6piwcl79gxpmq5czx9rfr9xh-glibc-locales-2.40-66","/nix/store/jqgmcv8j4gj59218hcbiyn8z951rycdj-install-grub.sh","/nix/store/kpmybhxy3gz6k1znbdirwsp3c6wvsgg9-manifest.json","/nix/store/lgainx4gl6q7mhiwmls81d3n51p5jz7z-linux-6.14.2-modules","/nix/store/mhxn5kwnri3z9hdzi3x0980id65p0icn-lib.sh","/nix/store/n8n0faszqlnf3mdg0fj6abnknrhjsw5j-perl-5.40.0-env","/nix/store/nq61v7a601gjndijq5nndprkzpwz4q9g-glibc-2.40-66-bin","/nix/store/nx27idxpvi3fk3p7admvhipny73nr25n-kmod-31","/nix/store/pggww1d2pg24fcg5v36xn63n53vanyyi-gnupg-2.4.7","/nix/store/rg5rf512szdxmnj9qal3wfdnpfsx38qi-setup-etc.pl","/nix/store/vvlfaafnz3pdhw7lx5kc5gb9pl4zhz5l-local-cmds","/nix/store/w142vx7ij1fz6qwhp5dprkf59cizvv1v-update-users-groups.pl","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zlsmh0ccgvncg30qb4y0mp5pahnk1wnw-append-initrd-secrets","/nix/store/zs07icpv5ykf8m36xcv717hh26bp09fa-firmware","/nix/store/zy2n4id5gcxcbx2x8jbblkmcpdlpsypk-getent-glibc-2.40-66"],"registrationTime":1744743136,"signatures":["nixcache.cy7.sh:dZ1XiKQNe0fRX48gBj03PIABYJGV6BPwb72YpMqEBONZMF+JrkVKhRCF0ur/4Bf5prHxg6Qfg1ytP/4csRC9DQ=="],"ultimate":false}}"#;
|
||||
serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
|
137
src/push.rs
137
src/push.rs
|
@ -1,6 +1,8 @@
|
|||
use std::{
|
||||
collections::HashSet,
|
||||
fs,
|
||||
iter::once,
|
||||
path::PathBuf,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
|
@ -8,22 +10,22 @@ use std::{
|
|||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use aws_config::Region;
|
||||
use aws_sdk_s3 as s3;
|
||||
use futures::future::join_all;
|
||||
use humansize::{DECIMAL, format_size};
|
||||
use nix_compat::narinfo::{self, SigningKey};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tracing::{debug, info, trace};
|
||||
use object_store::aws::{AmazonS3, AmazonS3Builder};
|
||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
||||
use tracing::debug;
|
||||
use url::Url;
|
||||
|
||||
use crate::{PushArgs, path_info::PathInfo, uploader::Uploader};
|
||||
use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
|
||||
|
||||
pub struct Push {
|
||||
upstream_caches: Vec<Url>,
|
||||
store_paths: Arc<RwLock<Vec<PathInfo>>>,
|
||||
s3_client: s3::Client,
|
||||
store_paths: Arc<RwLock<HashSet<PathInfo>>>,
|
||||
signing_key: SigningKey<ed25519_dalek::SigningKey>,
|
||||
bucket: String,
|
||||
store: Arc<Store>,
|
||||
s3: Arc<AmazonS3>,
|
||||
// paths that we skipped cause of a signature match
|
||||
signature_hit_count: AtomicUsize,
|
||||
// paths that we skipped cause we found it on an upstream
|
||||
|
@ -35,7 +37,7 @@ pub struct Push {
|
|||
}
|
||||
|
||||
impl Push {
|
||||
pub async fn new(cli: &PushArgs) -> Result<Self> {
|
||||
pub async fn new(cli: &PushArgs, store: Store) -> Result<Self> {
|
||||
let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
|
||||
for upstream in cli
|
||||
.upstreams
|
||||
|
@ -49,24 +51,21 @@ impl Push {
|
|||
let key = fs::read_to_string(&cli.signing_key)?;
|
||||
let signing_key = narinfo::parse_keypair(key.as_str())?.0;
|
||||
|
||||
let mut s3_config = aws_config::from_env();
|
||||
let mut s3_builder = AmazonS3Builder::from_env().with_bucket_name(&cli.bucket);
|
||||
|
||||
if let Some(region) = &cli.region {
|
||||
s3_config = s3_config.region(Region::new(region.clone()));
|
||||
s3_builder = s3_builder.with_region(region);
|
||||
}
|
||||
if let Some(endpoint) = &cli.endpoint {
|
||||
s3_config = s3_config.endpoint_url(endpoint);
|
||||
}
|
||||
if let Some(profile) = &cli.profile {
|
||||
s3_config = s3_config.profile_name(profile);
|
||||
s3_builder = s3_builder.with_endpoint(endpoint);
|
||||
}
|
||||
|
||||
let s3_client = s3::Client::new(&s3_config.load().await);
|
||||
Ok(Self {
|
||||
upstream_caches: upstreams,
|
||||
store_paths: Arc::new(RwLock::new(Vec::new())),
|
||||
s3_client,
|
||||
store_paths: Arc::new(RwLock::new(HashSet::new())),
|
||||
signing_key,
|
||||
bucket: cli.bucket.clone(),
|
||||
store: Arc::new(store),
|
||||
s3: Arc::new(s3_builder.build()?),
|
||||
signature_hit_count: AtomicUsize::new(0),
|
||||
upstream_hit_count: AtomicUsize::new(0),
|
||||
already_exists_count: AtomicUsize::new(0),
|
||||
|
@ -74,26 +73,42 @@ impl Push {
|
|||
})
|
||||
}
|
||||
|
||||
pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
|
||||
let path_info = PathInfo::from_path(package)
|
||||
pub async fn add_paths(&'static self, paths: Vec<PathBuf>) -> Result<()> {
|
||||
let mut futs = Vec::with_capacity(paths.len());
|
||||
for path in paths {
|
||||
let store_paths = self.store_paths.clone();
|
||||
let store = self.store.clone();
|
||||
|
||||
futs.push(tokio::spawn(async move {
|
||||
let path_info = PathInfo::from_derivation(path.as_path(), &store)
|
||||
.await
|
||||
.context("get path info for path")?;
|
||||
debug!("path-info for {path:?}: {path_info:?}");
|
||||
|
||||
store_paths.write().await.extend(
|
||||
path_info
|
||||
.get_closure(&store)
|
||||
.await
|
||||
.context("closure from path info")?,
|
||||
);
|
||||
Ok(())
|
||||
}));
|
||||
}
|
||||
join_all(futs)
|
||||
.await
|
||||
.context("get path info for package")?;
|
||||
debug!("path-info for {package}: {:?}", path_info);
|
||||
self.store_paths.write().await.extend(
|
||||
path_info
|
||||
.get_closure()
|
||||
.await
|
||||
.context("closure from path info")?,
|
||||
);
|
||||
info!("found {} store paths", self.store_paths.read().await.len());
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
println!("found {} store paths", self.store_paths.read().await.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run(&'static self) -> Result<()> {
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
let filter = tokio::spawn(self.filter_from_upstream(tx));
|
||||
let upload = tokio::spawn(self.upload(rx));
|
||||
|
||||
filter.await?;
|
||||
upload.await??;
|
||||
Ok(())
|
||||
|
@ -101,33 +116,34 @@ impl Push {
|
|||
|
||||
/// filter paths that are on upstream and send to `tx`
|
||||
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
|
||||
let mut handles = Vec::with_capacity(10);
|
||||
let mut handles = Vec::new();
|
||||
let store_paths = self.store_paths.read().await.clone();
|
||||
// limit number of inflight requests
|
||||
let inflight_permits = Arc::new(Semaphore::new(32));
|
||||
|
||||
for path in store_paths.into_iter() {
|
||||
if path.check_upstream_signature(&self.upstream_caches) {
|
||||
trace!("skip {} (signature match)", path.absolute_path());
|
||||
self.signature_hit_count.fetch_add(1, Ordering::Release);
|
||||
debug!("skip {} (signature match)", path.absolute_path());
|
||||
self.signature_hit_count.fetch_add(1, Ordering::Relaxed);
|
||||
continue;
|
||||
}
|
||||
handles.push({
|
||||
let tx = tx.clone();
|
||||
let inflight_permits = inflight_permits.clone();
|
||||
tokio::spawn(async move {
|
||||
let _permit = inflight_permits.acquire().await.unwrap();
|
||||
if !path
|
||||
.check_upstream_hit(self.upstream_caches.as_slice())
|
||||
.await
|
||||
{
|
||||
if path
|
||||
.check_if_already_exists(&self.s3_client, self.bucket.clone())
|
||||
.await
|
||||
{
|
||||
trace!("skip {} (already exists)", path.absolute_path());
|
||||
if path.check_if_already_exists(&self.s3).await {
|
||||
debug!("skip {} (already exists)", path.absolute_path());
|
||||
self.already_exists_count.fetch_add(1, Ordering::Relaxed);
|
||||
} else {
|
||||
tx.send(path).await.unwrap();
|
||||
}
|
||||
} else {
|
||||
trace!("skip {} (upstream hit)", path.absolute_path());
|
||||
debug!("skip {} (upstream hit)", path.absolute_path());
|
||||
self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
})
|
||||
|
@ -142,24 +158,35 @@ impl Push {
|
|||
}
|
||||
|
||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||
let mut uploads = Vec::with_capacity(10);
|
||||
let mut uploads = Vec::new();
|
||||
let permits = Arc::new(Semaphore::new(10));
|
||||
|
||||
loop {
|
||||
let permits = permits.clone();
|
||||
|
||||
if let Some(path_to_upload) = rx.recv().await {
|
||||
let absolute_path = path_to_upload.absolute_path();
|
||||
|
||||
println!("uploading: {}", absolute_path);
|
||||
let uploader = Uploader::new(
|
||||
&self.signing_key,
|
||||
path_to_upload,
|
||||
&self.s3_client,
|
||||
self.bucket.clone(),
|
||||
)?;
|
||||
|
||||
uploads.push(tokio::spawn(async move {
|
||||
let res = uploader.upload().await;
|
||||
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
||||
res
|
||||
uploads.push(tokio::spawn({
|
||||
// large uploads will be concurrently uploaded with multipart anyway so don't spawn
|
||||
// too much of them
|
||||
let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
|
||||
Some(permits.acquire_owned().await.unwrap())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
println!(
|
||||
"uploading: {} (size: {})",
|
||||
path_to_upload.absolute_path(),
|
||||
format_size(path_to_upload.nar_size, DECIMAL)
|
||||
);
|
||||
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
|
||||
let s3 = self.s3.clone();
|
||||
let store = self.store.clone();
|
||||
async move {
|
||||
let res = uploader.upload(s3, store).await;
|
||||
drop(permit);
|
||||
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
||||
res
|
||||
}
|
||||
}));
|
||||
} else {
|
||||
join_all(uploads)
|
||||
|
|
100
src/store.rs
Normal file
100
src/store.rs
Normal file
|
@ -0,0 +1,100 @@
|
|||
use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use nix_compat::store_path::StorePath;
|
||||
use tokio::{io::AsyncRead, task};
|
||||
use tokio_util::io::StreamReader;
|
||||
|
||||
use crate::{
|
||||
bindings::{self, AsyncWriteAdapter},
|
||||
path_info::PathInfo,
|
||||
};
|
||||
|
||||
pub struct Store {
|
||||
inner: Arc<bindings::FfiNixStore>,
|
||||
}
|
||||
|
||||
impl Store {
|
||||
pub fn connect() -> Result<Self> {
|
||||
let inner = unsafe { bindings::open_nix_store()? };
|
||||
Ok(Self {
|
||||
inner: Arc::new(inner),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn compute_fs_closure(
|
||||
&self,
|
||||
path: StorePath<String>,
|
||||
) -> Result<Vec<StorePath<String>>> {
|
||||
let inner = self.inner.clone();
|
||||
task::spawn_blocking(move || {
|
||||
let cxx_vector =
|
||||
inner
|
||||
.store()
|
||||
.compute_fs_closure(path.to_string().as_bytes(), false, true, true)?;
|
||||
cxx_vector
|
||||
.iter()
|
||||
.map(|x| {
|
||||
StorePath::from_bytes(x.as_bytes())
|
||||
.context("make StorePath from vector returned by compute_fs_closure")
|
||||
})
|
||||
.collect::<Result<_, _>>()
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn query_path_info(&self, path: StorePath<String>) -> Result<PathInfo> {
|
||||
let inner = self.inner.clone();
|
||||
|
||||
task::spawn_blocking(move || {
|
||||
let mut c_path_info = inner
|
||||
.store()
|
||||
.query_path_info(path.to_string().as_bytes())
|
||||
.context("query cpp for path info")?;
|
||||
|
||||
let signatures = c_path_info
|
||||
.pin_mut()
|
||||
.sigs()
|
||||
.into_iter()
|
||||
.map(|x| {
|
||||
let osstr = OsStr::from_bytes(x.as_bytes());
|
||||
osstr.to_str().unwrap().to_string()
|
||||
})
|
||||
.collect();
|
||||
let references = c_path_info
|
||||
.pin_mut()
|
||||
.references()
|
||||
.into_iter()
|
||||
.map(|x| StorePath::from_bytes(x.as_bytes()))
|
||||
.collect::<Result<_, _>>()
|
||||
.context("get references from pathinfo")?;
|
||||
let nar_size = c_path_info.pin_mut().nar_size();
|
||||
|
||||
Ok(PathInfo {
|
||||
path,
|
||||
signatures,
|
||||
references,
|
||||
nar_size,
|
||||
})
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn nar_from_path(&self, store_path: StorePath<String>) -> impl AsyncRead {
|
||||
let inner = self.inner.clone();
|
||||
let (adapter, mut sender) = AsyncWriteAdapter::new();
|
||||
let base_name = store_path.to_string().as_bytes().to_vec();
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
// Send all exceptions through the channel, and ignore errors
|
||||
// during sending (the channel may have been closed).
|
||||
if let Err(e) = inner.store().nar_from_path(base_name, sender.clone()) {
|
||||
let _ = sender.rust_error(e);
|
||||
}
|
||||
});
|
||||
|
||||
StreamReader::new(adapter)
|
||||
}
|
||||
}
|
217
src/uploader.rs
217
src/uploader.rs
|
@ -1,187 +1,80 @@
|
|||
use anyhow::Result;
|
||||
use async_compression::{Level, tokio::bufread::ZstdEncoder};
|
||||
use aws_sdk_s3::{
|
||||
self as s3,
|
||||
types::{CompletedMultipartUpload, CompletedPart},
|
||||
};
|
||||
use futures::future::join_all;
|
||||
use nix_compat::{
|
||||
narinfo::{self, NarInfo, SigningKey},
|
||||
nixbase32,
|
||||
store_path::StorePath,
|
||||
};
|
||||
use sha2::{Digest, Sha256};
|
||||
use tokio::{io::AsyncReadExt, process::Command};
|
||||
use tracing::debug;
|
||||
use bytes::BytesMut;
|
||||
use nix_compat::{narinfo::SigningKey, nixbase32};
|
||||
use object_store::{ObjectStore, aws::AmazonS3, buffered::BufWriter, path::Path};
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::{debug, trace};
|
||||
use ulid::Ulid;
|
||||
|
||||
use crate::path_info::PathInfo;
|
||||
use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store};
|
||||
|
||||
const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
|
||||
const CHUNK_SIZE: usize = 1024 * 1024 * 5;
|
||||
|
||||
pub struct Uploader<'a> {
|
||||
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
|
||||
path: PathInfo,
|
||||
s3_client: &'a s3::Client,
|
||||
bucket: String,
|
||||
}
|
||||
|
||||
impl<'a> Uploader<'a> {
|
||||
pub fn new(
|
||||
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
|
||||
path: PathInfo,
|
||||
s3_client: &'a s3::Client,
|
||||
bucket: String,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
signing_key,
|
||||
path,
|
||||
s3_client,
|
||||
bucket,
|
||||
})
|
||||
Ok(Self { signing_key, path })
|
||||
}
|
||||
|
||||
pub async fn upload(&self) -> Result<()> {
|
||||
let nar = self.make_nar().await?;
|
||||
let mut nar_info = self.narinfo_from_nar(&nar)?;
|
||||
let nar = self.compress_nar(&nar).await;
|
||||
pub async fn upload(&self, s3: Arc<AmazonS3>, store: Arc<Store>) -> Result<()> {
|
||||
let mut nar = MakeNar::new(&self.path, store)?;
|
||||
|
||||
// update fields that we know after compression
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(&nar);
|
||||
let hash: [u8; 32] = hasher.finalize().into();
|
||||
let nar_url = self.nar_url(&hash);
|
||||
nar_info.file_hash = Some(hash);
|
||||
nar_info.file_size = Some(nar.len() as u64);
|
||||
nar_info.url = nar_url.as_str();
|
||||
debug!("uploading nar with key: {nar_url}");
|
||||
// we don't know what the hash of the compressed file will be so upload to a
|
||||
// temp location for now
|
||||
let temp_path = Path::parse(Ulid::new().to_string())?;
|
||||
let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone());
|
||||
debug!("uploading to temp path: {}", temp_path);
|
||||
|
||||
if nar.len() < MULTIPART_CUTOFF {
|
||||
let put_object = self
|
||||
.s3_client
|
||||
.put_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(&nar_url)
|
||||
.body(nar.into())
|
||||
.send()
|
||||
.await?;
|
||||
debug!("put object: {:#?}", put_object);
|
||||
} else {
|
||||
let multipart = self
|
||||
.s3_client
|
||||
.create_multipart_upload()
|
||||
.bucket(&self.bucket)
|
||||
.key(&nar_url)
|
||||
.send()
|
||||
.await?;
|
||||
let upload_id = multipart.upload_id().unwrap();
|
||||
|
||||
let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
|
||||
let chunks = nar.chunks(MULTIPART_CUTOFF);
|
||||
for (i, chunk) in chunks.enumerate() {
|
||||
parts.push(tokio::task::spawn(
|
||||
self.s3_client
|
||||
.upload_part()
|
||||
.bucket(&self.bucket)
|
||||
.key(&nar_url)
|
||||
.upload_id(upload_id)
|
||||
.part_number(i as i32 + 1)
|
||||
.body(chunk.to_vec().into())
|
||||
.send(),
|
||||
));
|
||||
// compress and upload nar
|
||||
let mut file_reader = nar.compress_and_hash()?;
|
||||
loop {
|
||||
let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
|
||||
let n = file_reader.read_buf(&mut buf).await?;
|
||||
s3_writer.put(buf.freeze()).await?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let completed_parts = join_all(parts)
|
||||
.await
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
.map(|(i, part)| {
|
||||
CompletedPart::builder()
|
||||
.set_e_tag(part.e_tag().map(ToString::to_string))
|
||||
.set_part_number(Some(i as i32 + 1))
|
||||
.set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
|
||||
.build()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let completed_mp_upload = CompletedMultipartUpload::builder()
|
||||
.set_parts(Some(completed_parts))
|
||||
.build();
|
||||
|
||||
let complete_mp_upload = self
|
||||
.s3_client
|
||||
.complete_multipart_upload()
|
||||
.bucket(&self.bucket)
|
||||
.key(&nar_url)
|
||||
.upload_id(upload_id)
|
||||
.multipart_upload(completed_mp_upload)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
debug!("complete multipart upload: {:#?}", complete_mp_upload);
|
||||
}
|
||||
drop(file_reader);
|
||||
|
||||
let narinfo_url = format!("{}.narinfo", self.path.digest());
|
||||
debug!("uploading narinfo with key {narinfo_url}");
|
||||
self.s3_client
|
||||
.put_object()
|
||||
.bucket(&self.bucket)
|
||||
.key(narinfo_url)
|
||||
.body(nar_info.to_string().as_bytes().to_vec().into())
|
||||
.send()
|
||||
.await?;
|
||||
let mut nar_info = nar.get_narinfo()?;
|
||||
nar_info.add_signature(self.signing_key);
|
||||
|
||||
// now that we can calculate the file_hash move the nar to where it should be
|
||||
let real_path = nar_url(
|
||||
&nar_info
|
||||
.file_hash
|
||||
.expect("file hash must be known at this point"),
|
||||
);
|
||||
debug!("moving {} to {}", temp_path, real_path);
|
||||
// the temp object must be done uploading
|
||||
s3_writer.shutdown().await?;
|
||||
// this is implemented as a copy-and-delete
|
||||
s3.rename(&temp_path, &real_path).await?;
|
||||
// set nar url in narinfo
|
||||
nar_info.url = real_path.as_ref();
|
||||
|
||||
// upload narinfo
|
||||
let narinfo_path = self.path.narinfo_path();
|
||||
debug!("uploading narinfo: {}", narinfo_path);
|
||||
trace!("narinfo: {:#}", nar_info);
|
||||
s3.put(&narinfo_path, nar_info.to_string().into()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn make_nar(&self) -> Result<Vec<u8>> {
|
||||
Ok(Command::new("nix")
|
||||
.arg("nar")
|
||||
.arg("dump-path")
|
||||
.arg(self.path.absolute_path())
|
||||
.output()
|
||||
.await?
|
||||
.stdout)
|
||||
}
|
||||
|
||||
fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(nar);
|
||||
let nar_hash: [u8; 32] = hasher.finalize().into();
|
||||
let mut nar_info = NarInfo {
|
||||
flags: narinfo::Flags::empty(),
|
||||
store_path: self.path.path.as_ref(),
|
||||
nar_hash,
|
||||
nar_size: nar.len() as u64,
|
||||
references: self.path.references.iter().map(StorePath::as_ref).collect(),
|
||||
signatures: Vec::new(),
|
||||
ca: None,
|
||||
system: None,
|
||||
deriver: self.path.deriver.as_ref().map(|x| x.as_ref()),
|
||||
compression: Some("zstd"),
|
||||
file_hash: None,
|
||||
file_size: None,
|
||||
url: "",
|
||||
};
|
||||
// signature consists of: store_path, nar_hash, nar_size, and references
|
||||
nar_info.add_signature(self.signing_key);
|
||||
Ok(nar_info)
|
||||
}
|
||||
|
||||
fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
|
||||
let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
|
||||
format!("nar/{compressed_nar_hash}.nar.zst")
|
||||
}
|
||||
|
||||
async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
|
||||
let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
|
||||
let mut compressed = Vec::with_capacity(nar.len());
|
||||
encoder
|
||||
.read_to_end(&mut compressed)
|
||||
.await
|
||||
.expect("should compress just fine");
|
||||
compressed
|
||||
}
|
||||
}
|
||||
|
||||
/// calculate url where the compressed nar should be uploaded
|
||||
fn nar_url(file_hash: &[u8]) -> Path {
|
||||
let compressed_nar_hash = nixbase32::encode(file_hash);
|
||||
Path::parse(format!("nar/{compressed_nar_hash}.nar.zst"))
|
||||
.expect("should parse to a valid object_store::path::Path")
|
||||
}
|
||||
|
|
32
tests/common/mod.rs
Normal file
32
tests/common/mod.rs
Normal file
|
@ -0,0 +1,32 @@
|
|||
#![allow(dead_code)]
|
||||
|
||||
use std::process::Command;
|
||||
use std::sync::Arc;
|
||||
|
||||
use nixcp::store::Store;
|
||||
|
||||
pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
|
||||
pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
|
||||
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
|
||||
|
||||
pub struct Context {
|
||||
pub store: Arc<Store>,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
fn new() -> Self {
|
||||
// hello must be in the store
|
||||
Command::new("nix")
|
||||
.arg("build")
|
||||
.arg("--no-link")
|
||||
.arg(HELLO)
|
||||
.status()
|
||||
.unwrap();
|
||||
let store = Arc::new(Store::connect().expect("connect to nix store"));
|
||||
Self { store }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn context() -> Context {
|
||||
Context::new()
|
||||
}
|
26
tests/nar.rs
Normal file
26
tests/nar.rs
Normal file
|
@ -0,0 +1,26 @@
|
|||
use crate::common::HELLO_PATH;
|
||||
use nix_compat::nixbase32;
|
||||
use nixcp::make_nar::MakeNar;
|
||||
use nixcp::path_info::PathInfo;
|
||||
use sha2::Digest;
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
mod common;
|
||||
|
||||
#[tokio::test]
|
||||
async fn nar_size_and_hash() {
|
||||
let ctx = common::context();
|
||||
let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();
|
||||
|
||||
let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
|
||||
let mut reader = nar.compress_and_hash().unwrap();
|
||||
let mut buf = Vec::new();
|
||||
reader.read_to_end(&mut buf).await.unwrap();
|
||||
drop(reader);
|
||||
|
||||
assert_eq!(nar.nar_size, 234680);
|
||||
|
||||
let nar_hash = nar.nar_hasher.finalize();
|
||||
let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
|
||||
assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
|
||||
}
|
56
tests/path_info.rs
Normal file
56
tests/path_info.rs
Normal file
|
@ -0,0 +1,56 @@
|
|||
use nixcp::path_info::PathInfo;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use tempfile::TempDir;
|
||||
|
||||
use crate::common::{HELLO, HELLO_DRV, HELLO_PATH};
|
||||
|
||||
mod common;
|
||||
|
||||
#[tokio::test]
|
||||
async fn path_info_from_package() {
|
||||
let ctx = common::context();
|
||||
let path = PathBuf::from(HELLO);
|
||||
let path_info = PathInfo::from_derivation(&path, &ctx.store)
|
||||
.await
|
||||
.expect("get pathinfo from package");
|
||||
assert_eq!(path_info.path.to_string(), HELLO_DRV);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn path_info_from_path() {
|
||||
let ctx = common::context();
|
||||
let path = PathBuf::from(HELLO_PATH);
|
||||
let path_info = PathInfo::from_derivation(&path, &ctx.store)
|
||||
.await
|
||||
.expect("get pathinfo from package");
|
||||
assert_eq!(path_info.path.to_string(), HELLO_DRV);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn path_info_symlink() {
|
||||
let ctx = common::context();
|
||||
|
||||
let temp_path = TempDir::new().unwrap();
|
||||
let link_path = temp_path.path().join("result");
|
||||
|
||||
// symlink at ./result (like `nix build`)
|
||||
std::os::unix::fs::symlink(HELLO_PATH, &link_path).unwrap();
|
||||
|
||||
// should resolve symlink
|
||||
let path_info = PathInfo::from_derivation(&link_path, &ctx.store)
|
||||
.await
|
||||
.expect("get pathinfo from package");
|
||||
assert_eq!(path_info.path.to_string(), HELLO_DRV);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn closure() {
|
||||
let ctx = common::context();
|
||||
let path = PathBuf::from(HELLO);
|
||||
let path_info = PathInfo::from_derivation(&path, &ctx.store)
|
||||
.await
|
||||
.expect("get pathinfo from package");
|
||||
let closure = path_info.get_closure(&ctx.store).await.unwrap();
|
||||
assert_eq!(closure.len(), 472);
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue