Compare commits
32 commits (39792cdd40 ... 3c40776981)

Commits:
3c40776981
9b0c6aece6
14d6e9d29e
0e97d11745
a0794b0356
05589641cf
76cbc85032
09181ae785
54d4c714af
878e096494
7285c29e88
9d2e9e38bd
01443d0d99
b49be95d09
e5336d304d
ca97aebd7a
85fefe9e77
0fedae9334
0fae7ac3b0
7dec14fc1a
d524222a86
5a3e6089b4
fc304df35e
b8877f33a3
846c465ea0
81ce855dae
b1e59d0a6c
6806b96892
84bbe5dcb4
a771785352
8ac9253ea3
c956d6741a
20 changed files with 1936 additions and 1511 deletions
.cargo/config.toml (new file, +2)
@@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
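
The `tokio_unstable` cfg above is what lets tokio emit the per-task instrumentation that the `tokio-console` tool reads; the `console-subscriber` dependency added in Cargo.toml below is the other half of that pairing. A minimal illustrative sketch of such a setup (not code from this change):

```
// Assumes RUSTFLAGS="--cfg tokio_unstable" (as configured above) plus the
// console-subscriber crate; `tokio-console` can then attach to the running binary.
#[tokio::main]
async fn main() {
    console_subscriber::init(); // serves task telemetry on 127.0.0.1:6669 by default
    tokio::spawn(async {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    })
    .await
    .unwrap();
}
```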
.editorconfig (new file, +3)
@@ -0,0 +1,3 @@
[*.nix]
indent_size = 2
indent_stype = space
Cargo.lock (generated, 1834 lines changed)
File diff suppressed because it is too large.
Cargo.toml (28 lines changed)
@@ -3,21 +3,37 @@ name = "nixcp"
 version = "0.1.0"
 edition = "2024"
 
+[profile.release]
+lto = true
+codegen-units = 1
+
 [dependencies]
 anyhow = "1.0.97"
 async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
-aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
-aws-sdk-s3 = "1.82.0"
 clap = { version = "4.5.34", features = ["derive"] }
 ed25519-dalek = "2.1.1"
 futures = "0.3.31"
 nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
 regex = "1.11.1"
 reqwest = "0.12.15"
-serde = { version = "1.0.219", features = [ "derive" ]}
+serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.140"
 sha2 = "0.10.8"
-tokio = { version = "1.44.1", features = [ "full" ]}
+tokio = { version = "1.44.1", features = ["full", "tracing", "parking_lot"] }
 tracing = "0.1.41"
-tracing-subscriber = { version = "0.3.19", features = ["env-filter"]}
-url = { version = "2.5.4", features = [ "serde" ]}
+url = { version = "2.5.4", features = ["serde"] }
+cxx = "1.0"
+console-subscriber = "0.4.1"
+tokio-util = { version = "0.7.15", features = ["io"] }
+bytes = "1.10.1"
+object_store = { version = "0.12.0", features = ["aws"] }
+ulid = "1.2.1"
+tracing-subscriber = "0.3.19"
+humansize = "2.1.3"
+
+[build-dependencies]
+cxx-build = "1.0"
+pkg-config = "0.3.32"
+
+[dev-dependencies]
+tempfile = "3.19.1"
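
Note the storage-client swap above: `aws-config`/`aws-sdk-s3` are dropped in favor of `object_store` with its `aws` feature. A hedged sketch of the kind of call the rest of the diff builds on (bucket name and key are placeholders):

```
use object_store::{ObjectStore, aws::AmazonS3Builder, path::Path};

// Reads the AWS_* environment variables, then issues a HEAD request for one object.
async fn narinfo_exists() -> anyhow::Result<bool> {
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("example-bucket")
        .build()?;
    Ok(s3.head(&Path::from("example.narinfo")).await.is_ok())
}
```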
README.md (19 lines changed)
@@ -11,14 +11,13 @@ The signing key is generated with:
 nix-store --generate-binary-cache-key nixcache.cy7.sh cache-priv-key.pem cache-pub-key.pem
 ```
 
-`AWS_ACCESS_KEY_ID` and `AWS_ENDPOINT_URL` environment variables should be set with your s3 credentials.
+`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables should be set with your s3 credentials.
 
 ```
-Usage: nixcp [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> <COMMAND>
+Usage: nixcp push [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> [PATH]...
 
-Commands:
-  push
-  help   Print this message or the help of the given subcommand(s)
+Arguments:
+  [PATH]...  Path to upload e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
 
 Options:
       --bucket <bucket name>
@@ -28,15 +27,13 @@ Options:
       --signing-key <SIGNING_KEY>
           Path to the file containing signing key e.g. ~/cache-priv-key.pem
       --region <REGION>
-          If unspecified, will get it form AWS_DEFAULT_REGION envar or the AWS default
+          If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
       --endpoint <ENDPOINT>
-          If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default e.g. https://s3.example.com
-      --profile <PROFILE>
-          AWS profile to use
+          If unspecifed, will get it from AWS_ENDPOINT envar e.g. https://s3.example.com
+      --skip-signature-check
   -h, --help
           Print help
-  -V, --version
-          Print version
 ```
 
 ## Install with nix
 
build.rs (new file, +21)
@@ -0,0 +1,21 @@
fn main() {
    cxx_build::bridge("src/bindings/mod.rs")
        .file("src/bindings/nix.cpp")
        .flag("-std=c++2a")
        .flag("-O2")
        .flag("-include")
        .flag("nix/config.h")
        .flag("-I")
        .flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix"))
        .compile("nixbinding");
    println!("cargo:rerun-if-changed=src/bindings");

    pkg_config::Config::new()
        .atleast_version("2.4")
        .probe("nix-store")
        .unwrap();
    pkg_config::Config::new()
        .atleast_version("2.4")
        .probe("nix-main")
        .unwrap();
}
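
Because `concat!(env!("NIX_INCLUDE_PATH"), "/nix")` is expanded when build.rs itself is compiled, the variable must already be exported (the flake below sets it for both the dev shell and the package). A hypothetical variation, shown only to illustrate that trade-off and not part of this change, reads it at build-script runtime instead:

```
// Not the project's build.rs: resolves NIX_INCLUDE_PATH when the build script runs,
// so a missing variable yields this error message instead of a compile failure.
fn main() {
    let nix_include = std::env::var("NIX_INCLUDE_PATH")
        .expect("NIX_INCLUDE_PATH must be set (exported by the dev shell)");
    cxx_build::bridge("src/bindings/mod.rs")
        .file("src/bindings/nix.cpp")
        .flag("-std=c++2a")
        .flag("-I")
        .flag(&format!("{nix_include}/nix"))
        .compile("nixbinding");
    println!("cargo:rerun-if-changed=src/bindings");
}
```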
flake.nix (52 lines changed)
@@ -22,28 +22,56 @@
       };
       toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
       craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
-    in
-    {
-      devShells.default = pkgs.mkShell {
-        nativeBuildInputs = with pkgs; [
-          pkg-config
-        ];
-        buildInputs = with pkgs; [
-          openssl
-          toolchain
-        ];
+      lib = pkgs.lib;
+
+      # don't clean cpp files
+      cppFilter = path: _type: builtins.match ".*(cpp|hpp)$" path != null;
+      cppOrCargo = path: type:
+        (cppFilter path type) || (craneLib.filterCargoSources path type);
+      src = lib.cleanSourceWith {
+        src = ./.;
+        filter = cppOrCargo;
+        name = "source";
       };
 
-      packages.default = craneLib.buildPackage {
-        src = craneLib.cleanCargoSource ./.;
+      commonArgs = {
+        inherit src;
         strictDeps = true;
         nativeBuildInputs = with pkgs; [
           pkg-config
         ];
         buildInputs = with pkgs; [
+          toolchain
           openssl
+          nix
+          boost
+        ];
+        # for cpp bindings to work
+        NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
+        # skip integration tests (they need a connection to the nix store)
+        cargoTestExtraArgs = "--bins";
+      };
+
+      cargoArtifacts = craneLib.buildDepsOnly commonArgs;
+      nixcp = craneLib.buildPackage (commonArgs // {
+        inherit cargoArtifacts;
+      });
+    in
+    {
+      devShells.default = craneLib.devShell {
+        inputsFrom = [ nixcp ];
+
+        RUST_BACKGRACE = 1;
+        # for cpp bindings to work
+        NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
+
+        packages = with pkgs; [
+          tokio-console
+          cargo-udeps
         ];
       };
+
+      packages.default = nixcp;
     }
   );
 }
src/bindings/mod.rs (new file, +237)
@@ -0,0 +1,237 @@
/*
Copyright 2022 Zhaofeng Li and the Attic contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

//! `libnixstore` Bindings
#![allow(dead_code)]

use std::cell::UnsafeCell;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Result;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};

// The C++ implementation takes care of concurrency
#[repr(transparent)]
pub struct FfiNixStore(UnsafeCell<cxx::UniquePtr<ffi::CNixStore>>);

unsafe impl Send for FfiNixStore {}
unsafe impl Sync for FfiNixStore {}

impl FfiNixStore {
    pub fn store(&self) -> Pin<&mut ffi::CNixStore> {
        unsafe {
            let ptr = self.0.get().as_mut().unwrap();
            ptr.pin_mut()
        }
    }
}

/// Obtain a handle to the Nix store.
pub unsafe fn open_nix_store() -> Result<FfiNixStore> {
    match ffi::open_nix_store() {
        Ok(ptr) => {
            let cell = UnsafeCell::new(ptr);
            Ok(FfiNixStore(cell))
        }
        Err(e) => Err(e.into()),
    }
}

// TODO: Benchmark different implementations
// (tokio, crossbeam, flume)
mod mpsc {
    // Tokio
    pub use tokio::sync::mpsc::{
        UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel,
    };
}

/// Async write request.
#[derive(Debug)]
enum AsyncWriteMessage {
    Data(Vec<u8>),
    Error(String),
    Eof,
}

/// Async write request sender.
#[derive(Clone)]
pub struct AsyncWriteSender {
    sender: mpsc::UnboundedSender<AsyncWriteMessage>,
}

impl AsyncWriteSender {
    fn send(&mut self, data: &[u8]) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
        let message = AsyncWriteMessage::Data(Vec::from(data));
        self.sender.send(message)
    }

    fn eof(&mut self) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
        let message = AsyncWriteMessage::Eof;
        self.sender.send(message)
    }

    pub(crate) fn rust_error(
        &mut self,
        error: impl std::error::Error,
    ) -> Result<(), impl std::error::Error> {
        let message = AsyncWriteMessage::Error(error.to_string());
        self.sender.send(message)
    }
}

/// A wrapper of the `AsyncWrite` trait for the synchronous Nix C++ land.
pub struct AsyncWriteAdapter {
    receiver: mpsc::UnboundedReceiver<AsyncWriteMessage>,
    eof: bool,
}

impl AsyncWriteAdapter {
    pub fn new() -> (Self, Box<AsyncWriteSender>) {
        let (sender, receiver) = mpsc::unbounded_channel();

        let r = Self {
            receiver,
            eof: false,
        };
        let sender = Box::new(AsyncWriteSender { sender });

        (r, sender)
    }

    /// Write everything the sender sends to us.
    pub async fn write_all(mut self, mut writer: Box<dyn AsyncWrite + Unpin>) -> Result<()> {
        let writer = writer.as_mut();

        while let Some(data) = self.next().await {
            match data {
                Ok(v) => {
                    writer.write_all(&v).await?;
                }
                Err(e) => {
                    return Err(e.into());
                }
            }
        }

        if !self.eof {
            Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
        } else {
            Ok(())
        }
    }
}

impl Stream for AsyncWriteAdapter {
    type Item = std::io::Result<Bytes>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.receiver.poll_recv(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(message)) => {
                use AsyncWriteMessage::*;
                match message {
                    Data(v) => Poll::Ready(Some(Ok(v.into()))),
                    Error(exception) => {
                        let error = std::io::Error::other(format!("cxx error: {exception}"));
                        Poll::Ready(Some(Err(error)))
                    }
                    Eof => {
                        self.eof = true;
                        Poll::Ready(None)
                    }
                }
            }
            Poll::Ready(None) => {
                if !self.eof {
                    Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe))))
                } else {
                    Poll::Ready(None)
                }
            }
        }
    }
}

#[cxx::bridge]
/// Generated by `cxx.rs`.
///
/// Mid-level wrapper of `libnixstore` implemented in C++.
mod ffi {
    extern "Rust" {
        type AsyncWriteSender;
        fn send(self: &mut AsyncWriteSender, data: &[u8]) -> Result<()>;
        fn eof(self: &mut AsyncWriteSender) -> Result<()>;
    }

    unsafe extern "C++" {
        include!("nixcp/src/bindings/nix.hpp");

        // =========
        // CNixStore
        // =========

        /// Mid-level wrapper for the Unix Domain Socket Nix Store.
        type CNixStore;

        /// Queries information about a valid path.
        fn query_path_info(
            self: Pin<&mut CNixStore>,
            store_path: &[u8],
        ) -> Result<UniquePtr<CPathInfo>>;

        /// Computes the closure of a valid path.
        ///
        /// If `flip_directions` is true, the set of paths that can reach `store_path` is
        /// returned.
        fn compute_fs_closure(
            self: Pin<&mut CNixStore>,
            store_path: &[u8],
            flip_direction: bool,
            include_outputs: bool,
            include_derivers: bool,
        ) -> Result<UniquePtr<CxxVector<CxxString>>>;

        /// Creates a NAR dump from a path.
        fn nar_from_path(
            self: Pin<&mut CNixStore>,
            base_name: Vec<u8>,
            sender: Box<AsyncWriteSender>,
        ) -> Result<()>;

        /// Obtains a handle to the Nix store.
        fn open_nix_store() -> Result<UniquePtr<CNixStore>>;

        // =========
        // CPathInfo
        // =========

        /// Mid-level wrapper for the `nix::ValidPathInfo` struct.
        type CPathInfo;
        /// Returns the size of the NAR.
        fn nar_size(self: Pin<&mut CPathInfo>) -> u64;

        /// Returns the references of the store path.
        fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;

        /// Returns the possibly invalid signatures attached to the store path.
        fn sigs(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
    }
}
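
For orientation, a hedged sketch of how these bindings are typically driven (the store wrapper this PR uses for that lives outside the files shown here): the blocking `nar_from_path` call runs on a blocking thread and streams into async code through `AsyncWriteAdapter`.

```
use futures::stream::StreamExt;

// Illustrative only; error handling is simplified and the &'static borrow mirrors
// the Box::leak pattern used elsewhere in this PR.
async fn dump_nar(store: &'static FfiNixStore, base_name: Vec<u8>) -> anyhow::Result<Vec<u8>> {
    let (mut adapter, sender) = AsyncWriteAdapter::new();

    tokio::task::spawn_blocking(move || {
        // A C++ exception surfaces as Err here; dropping `sender` without an Eof
        // makes the adapter report a BrokenPipe to the reader below.
        if let Err(e) = store.store().nar_from_path(base_name, sender) {
            eprintln!("nar_from_path failed: {e}");
        }
    });

    let mut nar = Vec::new();
    while let Some(chunk) = adapter.next().await {
        nar.extend_from_slice(&chunk?);
    }
    Ok(nar)
}
```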
src/bindings/nix.cpp (new file, +128)
@@ -0,0 +1,128 @@
/*
Copyright 2022 Zhaofeng Li and the Attic contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// C++ side of the libnixstore glue.
//
// We implement a mid-level wrapper of the Nix Store interface,
// which is then wrapped again in the Rust side to enable full
// async-await operation.
//
// Here we stick with the naming conventions of Rust and handle
// Rust types directly where possible, so that the interfaces are
// satisfying to use from the Rust side via cxx.rs.

#include "nixcp/src/bindings/nix.hpp"

static std::mutex g_init_nix_mutex;
static bool g_init_nix_done = false;

static nix::StorePath store_path_from_rust(RBasePathSlice base_name) {
    std::string_view sv((const char *)base_name.data(), base_name.size());
    return nix::StorePath(sv);
}

// ========
// RustSink
// ========

RustSink::RustSink(RBox<AsyncWriteSender> sender) : sender(std::move(sender)) {}

void RustSink::operator () (std::string_view data) {
    RBasePathSlice s((const unsigned char *)data.data(), data.size());

    this->sender->send(s);
}

void RustSink::eof() {
    this->sender->eof();
}


// =========
// CPathInfo
// =========

CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}

uint64_t CPathInfo::nar_size() {
    return this->pi->narSize;
}

std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() {
    std::vector<std::string> result;
    for (auto&& elem : this->pi->sigs) {
        result.push_back(std::string(elem));
    }
    return std::make_unique<std::vector<std::string>>(result);
}

std::unique_ptr<std::vector<std::string>> CPathInfo::references() {
    std::vector<std::string> result;
    for (auto&& elem : this->pi->references) {
        result.push_back(std::string(elem.to_string()));
    }
    return std::make_unique<std::vector<std::string>>(result);
}

// =========
// CNixStore
// =========

CNixStore::CNixStore() {
    std::map<std::string, std::string> params;
    std::lock_guard<std::mutex> lock(g_init_nix_mutex);

    if (!g_init_nix_done) {
        nix::initNix();
        g_init_nix_done = true;
    }

    this->store = nix::openStore(nix::settings.storeUri.get(), params);
}

std::unique_ptr<CPathInfo> CNixStore::query_path_info(RBasePathSlice base_name) {
    auto store_path = store_path_from_rust(base_name);

    auto r = this->store->queryPathInfo(store_path);
    return std::make_unique<CPathInfo>(r);
}

std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePathSlice base_name, bool flip_direction, bool include_outputs, bool include_derivers) {
    std::set<nix::StorePath> out;

    this->store->computeFSClosure(store_path_from_rust(base_name), out, flip_direction, include_outputs, include_derivers);

    std::vector<std::string> result;
    for (auto&& elem : out) {
        result.push_back(std::string(elem.to_string()));
    }
    return std::make_unique<std::vector<std::string>>(result);
}

void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
    RustSink sink(std::move(sender));

    std::string_view sv((const char *)base_name.data(), base_name.size());
    nix::StorePath store_path(sv);

    // exceptions will be thrown into Rust
    this->store->narFromPath(store_path, sink);
    sink.eof();
}

std::unique_ptr<CNixStore> open_nix_store() {
    return std::make_unique<CNixStore>();
}
src/bindings/nix.hpp (new file, +89)
@@ -0,0 +1,89 @@
/*
Copyright 2022 Zhaofeng Li and the Attic contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// C++ side of the libnixstore glue.
//
// We implement a mid-level wrapper of the Nix Store interface,
// which is then wrapped again in the Rust side to enable full
// async-await operation.
//
// Here we stick with the naming conventions of Rust and handle
// Rust types directly where possible, so that the interfaces are
// satisfying to use from the Rust side via cxx.rs.

#pragma once
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <nix/store-api.hh>
#include <nix/local-store.hh>
#include <nix/remote-store.hh>
#include <nix/uds-remote-store.hh>
#include <nix/hash.hh>
#include <nix/path.hh>
#include <nix/serialise.hh>
#include <nix/shared.hh>
#include <rust/cxx.h>

template<class T> using RVec = rust::Vec<T>;
template<class T> using RBox = rust::Box<T>;
template<class T> using RSlice = rust::Slice<T>;
using RString = rust::String;
using RStr = rust::Str;
using RBasePathSlice = RSlice<const unsigned char>;
using RHashSlice = RSlice<const unsigned char>;

struct AsyncWriteSender;

struct RustSink : nix::Sink
{
    RBox<AsyncWriteSender> sender;
public:
    RustSink(RBox<AsyncWriteSender> sender);
    void operator () (std::string_view data) override;
    void eof();
};

// Opaque wrapper for nix::ValidPathInfo
class CPathInfo {
    nix::ref<const nix::ValidPathInfo> pi;
public:
    CPathInfo(nix::ref<const nix::ValidPathInfo> pi);
    std::unique_ptr<std::vector<std::string>> sigs();
    std::unique_ptr<std::vector<std::string>> references();
    uint64_t nar_size();
};

class CNixStore {
    std::shared_ptr<nix::Store> store;
public:
    CNixStore();

    RString store_dir();
    std::unique_ptr<CPathInfo> query_path_info(RBasePathSlice base_name);
    std::unique_ptr<std::vector<std::string>> compute_fs_closure(
        RBasePathSlice base_name,
        bool flip_direction,
        bool include_outputs,
        bool include_derivers);
    void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
};

std::unique_ptr<CNixStore> open_nix_store();

// Relies on our definitions
#include "nixcp/src/bindings/mod.rs.h"
src/lib.rs (new file, +65)
@@ -0,0 +1,65 @@
use std::path::PathBuf;

use clap::{Args, Parser, Subcommand};

mod bindings;
mod cli;
pub mod make_nar;
pub mod path_info;
pub mod push;
pub mod store;
mod uploader;

#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
pub struct Cli {
    #[command(subcommand)]
    pub command: Commands,

    /// Whether to enable tokio-console
    #[arg(long)]
    pub tokio_console: bool,
}

#[derive(Debug, Subcommand)]
pub enum Commands {
    #[command(arg_required_else_help = true)]
    Push(PushArgs),
}

#[derive(Debug, Args)]
pub struct PushArgs {
    /// The s3 bucket to upload to
    #[arg(long, value_name = "bucket name")]
    bucket: String,

    /// Upstream cache to check against. Can be specified multiple times.
    /// cache.nixos.org is always included.
    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
    upstreams: Vec<String>,

    /// Path to the file containing signing key
    /// e.g. ~/cache-priv-key.pem
    #[arg(long)]
    signing_key: String,

    /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
    #[arg(long)]
    region: Option<String>,

    /// If unspecifed, will get it from AWS_ENDPOINT envar
    /// e.g. https://s3.example.com
    #[arg(long)]
    endpoint: Option<String>,

    #[arg(long)]
    skip_signature_check: bool,

    /// Path to upload
    /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
    #[arg(value_name = "PATH")]
    pub paths: Vec<PathBuf>,
}
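
The push interface now takes one or more positional paths instead of a single package argument. A quick, hypothetical invocation of the parser defined above (values are placeholders):

```
use clap::Parser;

fn main() {
    let cli = nixcp::Cli::parse_from([
        "nixcp",
        "push",
        "--bucket", "nixcache",
        "--signing-key", "/home/user/cache-priv-key.pem",
        "./result",
        "/nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1",
    ]);
    match cli.command {
        nixcp::Commands::Push(args) => println!("{} path(s) to push", args.paths.len()),
    }
}
```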
src/main.rs (102 lines changed)
@@ -1,88 +1,46 @@
-#![feature(let_chains)]
-#![feature(extend_one)]
-
 use anyhow::{Context, Result};
-use clap::{Args, Parser, Subcommand};
-use tracing_subscriber::{EnvFilter, FmtSubscriber};
+use clap::Parser;
+use tracing_subscriber::{EnvFilter, prelude::*};
 
-use push::Push;
-
-mod cli;
-mod path_info;
-mod push;
-mod uploader;
-
-#[derive(Parser, Debug)]
-#[command(version)]
-#[command(name = "nixcp")]
-#[command(about = "Upload store paths to a s3 binary cache")]
-#[command(long_about = None)]
-struct Cli {
-    #[command(subcommand)]
-    command: Commands,
-}
-
-#[derive(Debug, Subcommand)]
-enum Commands {
-    #[command(arg_required_else_help = true)]
-    Push(PushArgs),
-}
-
-#[derive(Debug, Args)]
-pub struct PushArgs {
-    /// The s3 bucket to upload to
-    #[arg(long, value_name = "bucket name")]
-    bucket: String,
-
-    /// Upstream cache to check against. Can be specified multiple times.
-    /// cache.nixos.org is always included.
-    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
-    upstreams: Vec<String>,
-
-    /// Path to the file containing signing key
-    /// e.g. ~/cache-priv-key.pem
-    #[arg(long)]
-    signing_key: String,
-
-    /// If unspecified, will get it form AWS_DEFAULT_REGION envar
-    #[arg(long)]
-    region: Option<String>,
-
-    /// If unspecifed, will get it from AWS_ENDPOINT_URL envar
-    /// e.g. https://s3.example.com
-    #[arg(long)]
-    endpoint: Option<String>,
-
-    /// AWS profile to use
-    #[arg(long)]
-    profile: Option<String>,
-
-    #[arg(long)]
-    skip_signature_check: bool,
-
-    /// Package or store path to upload
-    /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
-    #[arg(value_name = "package or store path")]
-    package: String,
-}
+use nixcp::push::Push;
+use nixcp::store::Store;
+use nixcp::{Cli, Commands};
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    let filter = EnvFilter::from_default_env();
-    let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
-    tracing::subscriber::set_global_default(subscriber)?;
-
     let cli = Cli::parse();
+    init_logging(cli.tokio_console);
 
     match &cli.command {
         Commands::Push(cli) => {
-            let push = Box::leak(Box::new(Push::new(cli).await?));
-            push.paths_from_package(&cli.package)
+            let store = Store::connect()?;
+            let push = Box::leak(Box::new(Push::new(cli, store).await?));
+            push.add_paths(cli.paths.clone())
                 .await
-                .context("nixcp get paths from package")?;
+                .context("add paths to push")?;
             push.run().await.context("nixcp run")?;
         }
     }
 
     Ok(())
 }
+
+fn init_logging(tokio_console: bool) {
+    let env_filter = EnvFilter::from_default_env();
+    let fmt_layer = tracing_subscriber::fmt::layer().with_filter(env_filter);
+
+    let console_layer = if tokio_console {
+        Some(console_subscriber::spawn())
+    } else {
+        None
+    };
+
+    tracing_subscriber::registry()
+        .with(fmt_layer)
+        .with(console_layer)
+        .init();
+
+    if tokio_console {
+        println!("tokio-console is enabled");
+    }
+}
src/make_nar.rs (new file, +81)
@@ -0,0 +1,81 @@
use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use nix_compat::{
    narinfo::{self, NarInfo},
    store_path::StorePath,
};
use sha2::{Digest, Sha256};
use std::{mem::take, sync::Arc};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;

use crate::path_info::PathInfo;
use crate::store::Store;

pub struct MakeNar<'a> {
    path_info: &'a PathInfo,
    store: Arc<Store>,
    pub nar_hasher: Sha256,
    /// hash of compressed nar file
    file_hasher: Sha256,
    pub nar_size: u64,
    file_size: u64,
}

impl<'a> MakeNar<'a> {
    pub fn new(path_info: &'a PathInfo, store: Arc<Store>) -> Result<Self> {
        Ok(Self {
            path_info,
            store,
            nar_hasher: Sha256::new(),
            file_hasher: Sha256::new(),
            nar_size: 0,
            file_size: 0,
        })
    }

    /// Returns a compressed nar reader which can be uploaded. File hash will be available when
    /// everything is read
    pub fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
        let nar_reader = self.store.nar_from_path(self.path_info.path.clone());
        // reader that hashes as nar is read
        let nar_reader = InspectReader::new(nar_reader, |x| {
            self.nar_size += x.len() as u64;
            self.nar_hasher.update(x);
        });

        let encoder = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
        // reader that updates file_hash as the compressed nar is read
        Ok(InspectReader::new(encoder, |x| {
            self.file_size += x.len() as u64;
            self.file_hasher.update(x);
        }))
    }

    /// Returns *unsigned* narinfo. `url` must be updated before uploading
    pub fn get_narinfo(&mut self) -> Result<NarInfo> {
        let file_hash = take(&mut self.file_hasher).finalize().into();
        let nar_hash = take(&mut self.nar_hasher).finalize().into();

        Ok(NarInfo {
            flags: narinfo::Flags::empty(),
            store_path: self.path_info.path.as_ref(),
            nar_hash,
            nar_size: self.nar_size,
            references: self
                .path_info
                .references
                .iter()
                .map(StorePath::as_ref)
                .collect(),
            signatures: Vec::new(),
            ca: None,
            system: None,
            deriver: None,
            compression: Some("zstd"),
            file_hash: Some(file_hash),
            file_size: Some(self.file_size),
            url: "",
        })
    }
}
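
A hedged sketch of how `MakeNar` is meant to be consumed (the PR's uploader does something along these lines; names and keys here are illustrative): drain the compressed reader, then finalize the narinfo once the hashes are complete.

```
use anyhow::Result;
use std::sync::Arc;

async fn pack_one(path_info: &PathInfo, store: Arc<Store>) -> Result<()> {
    let mut nar = MakeNar::new(path_info, store)?;

    // Drive the reader to completion; a real uploader would stream these bytes
    // to object storage (e.g. a multipart upload) instead of discarding them.
    {
        let reader = nar.compress_and_hash()?;
        tokio::pin!(reader);
        tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
    }

    // Hashes and sizes are final once the reader is exhausted and dropped.
    let mut narinfo = nar.get_narinfo()?;
    narinfo.url = "nar/placeholder.nar.zst"; // hypothetical object key
    // Signing with the cache's SigningKey would happen here before uploading
    // both the NAR and the `<digest>.narinfo` object.
    Ok(())
}
```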
src/path_info.rs (218 lines changed)
@@ -1,85 +1,80 @@
 use std::collections::HashSet;
 
-use anyhow::{Context, Error, Result};
-use aws_sdk_s3 as s3;
+use anyhow::{Context, Result, anyhow};
+use futures::future::join_all;
 use nix_compat::nixbase32;
 use nix_compat::store_path::StorePath;
+use object_store::{ObjectStore, aws::AmazonS3, path::Path as ObjectPath};
 use regex::Regex;
-use serde::Deserialize;
+use std::path::Path;
 use tokio::process::Command;
-use tracing::{debug, error, trace};
+use tracing::{debug, trace};
 use url::Url;
 
-// nix path-info --derivation --json
-#[derive(Debug, Clone, Deserialize)]
+use crate::store::Store;
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct PathInfo {
-    pub deriver: Option<StorePath<String>>,
     pub path: StorePath<String>,
-    signatures: Option<Vec<String>>,
+    pub signatures: Vec<String>,
     pub references: Vec<StorePath<String>>,
+    pub nar_size: u64,
 }
 
 impl PathInfo {
-    // get PathInfo for a package or a store path
-    // we deserialize this as an array of `PathInfo` below
-    pub async fn from_path(path: &str) -> Result<Self> {
-        debug!("query nix path-info for {path}");
-        // use lix cause nix would return a json map instead of an array
-        // json output is not stable and could break in future
-        // TODO figure out a better way
-        let nix_cmd = Command::new("nix")
-            .arg("run")
-            .arg("--experimental-features")
-            .arg("nix-command flakes")
-            .arg("github:nixos/nixpkgs/nixos-unstable#lix")
-            .arg("--")
-            .arg("path-info")
-            .arg("--json")
-            .arg(path)
-            .output()
-            .await
-            .context("`nix path-info` failed for {package}")?;
-
-        trace!(
-            "nix path-info output: {}",
-            String::from_utf8_lossy(&nix_cmd.stdout)
-        );
-
-        // nix path-info returns an array with one element
-        match serde_json::from_slice::<Vec<_>>(&nix_cmd.stdout)
-            .context("parse path info from stdout")
-        {
-            Ok(path_info) => path_info
-                .into_iter()
-                .next()
-                .ok_or_else(|| Error::msg("nix path-info returned empty")),
-            Err(e) => {
-                error!(
-                    "Failed to parse data from `nix path-info`. The path may not exist on your system."
-                );
-                Err(e)
-            }
-        }
-    }
-
-    pub async fn get_closure(&self) -> Result<Vec<Self>> {
-        debug!("query nix-store for {}", self.absolute_path());
-        let nix_store_cmd = Command::new("nix-store")
-            .arg("--query")
-            .arg("--requisites")
-            .arg("--include-outputs")
-            .arg(self.absolute_path())
-            .output()
-            .await
-            .expect("nix-store cmd failed");
-
-        let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?;
-        let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect();
-        let mut closure = Vec::with_capacity(nix_store_paths.len());
-        for path in nix_store_paths {
-            closure.push(Self::from_path(path).await?);
-        }
-        Ok(closure)
+    pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
+        debug!("query path info for {:?}", drv);
+
+        let derivation = match drv.extension() {
+            Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
+            _ => {
+                let drv = {
+                    // resolve symlink
+                    if drv.is_symlink() {
+                        &drv.canonicalize()?
+                    } else {
+                        drv
+                    }
+                };
+                &Command::new("nix")
+                    .arg("path-info")
+                    .arg("--derivation")
+                    .arg(drv)
+                    .output()
+                    .await
+                    .context(format!("run command: nix path-info --derivaiton {drv:?}"))?
+                    .stdout
+            }
+        };
+        let derivation = String::from_utf8_lossy(derivation);
+        debug!("derivation: {derivation}");
+
+        if derivation.is_empty() {
+            return Err(anyhow!(
+                "nix path-info did not return a derivation for {drv:#?}"
+            ));
+        }
+
+        Self::from_path(derivation.trim(), store).await
+    }
+
+    pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
+        let store_path =
+            StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
+        store
+            .query_path_info(store_path)
+            .await
+            .context("query pathinfo for path")
+    }
+
+    // TODO: skip call to query_path_info and return Vec<Path>?
+    pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
+        let futs = store
+            .compute_fs_closure(self.path.clone())
+            .await?
+            .into_iter()
+            .map(|x| store.query_path_info(x));
+        join_all(futs).await.into_iter().collect()
     }
 
     /// checks if the path is signed by any upstream. if it is, we assume a cache hit.
@@ -101,23 +96,21 @@ impl PathInfo {
     }
 
     fn signees(&self) -> Vec<&str> {
-        if let Some(signatures) = self.signatures.as_ref() {
-            let signees: Vec<_> = signatures
-                .iter()
-                .filter_map(|signature| Some(signature.split_once(":")?.0))
-                .collect();
-            trace!("signees for {}: {:?}", self.path, signees);
-            return signees;
-        }
-        Vec::new()
+        let signers: Vec<_> = self
+            .signatures
+            .iter()
+            .filter_map(|signature| Some(signature.split_once(":")?.0))
+            .collect();
+        trace!("signers for {}: {:?}", self.path, signers);
+        signers
     }
 
     pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
         for upstream in upstreams {
             let upstream = upstream
-                .join(format!("{}.narinfo", self.digest()).as_str())
+                .join(self.narinfo_path().as_ref())
                 .expect("adding <hash>.narinfo should make a valid url");
-            debug!("querying {}", upstream);
+            trace!("querying {}", upstream);
             let res_status = reqwest::Client::new()
                 .head(upstream.as_str())
                 .send()
@@ -135,83 +128,12 @@ impl PathInfo {
         self.path.to_absolute_path()
     }
 
-    pub fn digest(&self) -> String {
-        nixbase32::encode(self.path.digest())
+    pub fn narinfo_path(&self) -> ObjectPath {
+        ObjectPath::parse(format!("{}.narinfo", nixbase32::encode(self.path.digest())))
+            .expect("must parse to a valid object_store path")
     }
 
-    pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
-        s3_client
-            .head_object()
-            .bucket(bucket)
-            .key(format!("{}.narinfo", self.digest()))
-            .send()
-            .await
-            .is_ok()
+    pub async fn check_if_already_exists(&self, s3: &AmazonS3) -> bool {
+        s3.head(&self.narinfo_path()).await.is_ok()
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn get_signees_from_path_info() {
let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
-        let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
-
-        path_info.signatures = Some(vec![
-            "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
-            "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
-        ]);
-        let signees = path_info.signees();
-        assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]);
-    }
-
-    #[test]
-    fn match_upstream_cache_from_signature() {
let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#;
-        let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
-
-        path_info.signatures = Some(vec![
-            "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(),
-            "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(),
-            "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(),
-        ]);
-        assert!(
-            path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
-        );
-        assert!(
-            path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()])
-        );
-        assert!(
-            path_info.check_upstream_signature(&[
-                Url::parse("https://nix-community.cachix.org").unwrap()
-            ])
-        );
-        assert!(
-            !path_info
-                .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]),
-        );
-    }
-
-    #[test]
-    fn path_info_without_signature() {
let path_info_json = r#"{"ca":"fixed:r:sha256:1q10p04pgx9sk6xbvrkn4nvh0ys2lzplgcni5368f4z3cr8ikbmz","narHash":"sha256-v64ZUWbjE4fMKNGyR++nQnsAtyV25r26mTr1dwm4IOA=","narSize":5520,"path":"/nix/store/gj6hz9mj23v01yvq1nn5f655jrcky1qq-nixos-option.nix","references":[],"registrationTime":1744740942,"valid":true}"#;
-        let path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize");
-
-        assert!(
-            !path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()])
-        );
-    }
-
-    /*
-    #[test]
-    fn path_info_deserialize_nix_map() {
let path_info_json = r#"{"/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation":{"ca":null,"deriver":"/nix/store/h8z25s6arcrns5nmrq1yhgbamywjivpn-home-manager-generation.drv","narHash":"sha256-o4qwqyJ5UVm9cyC/nBNcNYVnIM14Pewgw7fou+wUVSY=","narSize":13608,"references":["/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/4d0ix5djms3n2jnjdc58l916cwack1rp-empty-directory","/nix/store/56zmgla8443qfrkrh2ch0vz0mh8ywrw1-home-manager-files","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/80l1sb3vcmrkcdd7ihlizkcnv19rq9fj-ncurses-6.5","/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation","/nix/store/92as847i10kl6s19fi910ddyk9l83835-check-link-targets.sh","/nix/store/9c90iz95yynyh3vsc67zndch6j01vgz3-home-manager-path","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bm5fi6wj0w4r2wjll2448k307bzfcjwx-cleanup","/nix/store/c244fsb3a7i5837lzn94m4bmav9i5p9b-link","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/gpxsdrrd4x93fs75395vr2dfys1ki9mq-jq-1.7.1-bin","/nix/store/jlf743lqxbvad6dbgndsgqfg20m2np5i-sd-switch-0.5.3","/nix/store/mhmgm739aagj4x7hr6ag2wjmxhmpy8mf-gettext-0.22.5","/nix/store/w9db12j05yv5hl31s6jndd9cfm1g1gw4-hm-modules-messages","/nix/store/wj1c3gsiajabnq50ifxqnlv60i5rhqj7-diffutils-3.10","/nix/store/xhql0ilzbiqwnmz4z8y0phk611wynxf2-gnused-4.9","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zhrjg6wxrxmdlpn6iapzpp2z2vylpvw5-home-manager.sh"],"registrationTime":1744742989,"signatures":["nixcache.cy7.sh:Vq4X95kSzum7BwrBhjmmM2yVipfBI3AE3jgZ3b3RoYrP4/ghotbDdlwCvwK3qx4BQdEOLSgrC1tDwiMNb6oRBw=="],"ultimate":false}}"#;
-        serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");
-
let path_info_json = r#"{"/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b":{"ca":null,"deriver":"/nix/store/12ssi931481jlkizgfk1c1jnawvwjbhh-nixos-system-chunk-25.05.20250412.2631b0b.drv","narHash":"sha256-CHhBIzMD4v/FKqKgGroq0UC1k3GrK5lcNwQPMpv2xLc=","narSize":20704,"references":["/nix/store/0yjiyixxsr137iw93hnaacdsssy1li9h-switch-to-configuration-0.1.0","/nix/store/14rby7cpwrzjsjym44cl5h6nj6qpn1gs-etc","/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b","/nix/store/3wjljpj30fvv2cdb60apr4126pa5bm87-shadow-4.17.2","/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/5dyh8l59kfvf89zjkbmjfnx7fix93n4f-net-tools-2.10","/nix/store/aq9wdsz12bg9252790l9awiry2bml4ls-sops-install-secrets-0.0.1","/nix/store/b00kq6fjhgisdrykg621vml8505nnmb3-users-groups.json","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bfr68wi6k8icb3j9fy3fzchva56djfhd-mounts.sh","/nix/store/cjnihsds5hhnji9r85hglph07q9y9hgc-system-path","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/f9jll96j74f5ykvs062718b98lfjbn9g-util-linux-2.40.4-bin","/nix/store/h7zih134d3n5yk8pnhv1fa38n6qkyrn2-pre-switch-checks","/nix/store/idn5n51246piyxcr3v6gxnj5a5l9mzpn-linux-6.14.2","/nix/store/ipn5793y61x2904xqnkgbjnp91svjjzx-perl-5.40.0-env","/nix/store/j1rikvl25pz0b5ham1ijq0nbg1q2fqfy-initrd-linux-6.14.2","/nix/store/jgawnqyh6piwcl79gxpmq5czx9rfr9xh-glibc-locales-2.40-66","/nix/store/jqgmcv8j4gj59218hcbiyn8z951rycdj-install-grub.sh","/nix/store/kpmybhxy3gz6k1znbdirwsp3c6wvsgg9-manifest.json","/nix/store/lgainx4gl6q7mhiwmls81d3n51p5jz7z-linux-6.14.2-modules","/nix/store/mhxn5kwnri3z9hdzi3x0980id65p0icn-lib.sh","/nix/store/n8n0faszqlnf3mdg0fj6abnknrhjsw5j-perl-5.40.0-env","/nix/store/nq61v7a601gjndijq5nndprkzpwz4q9g-glibc-2.40-66-bin","/nix/store/nx27idxpvi3fk3p7admvhipny73nr25n-kmod-31","/nix/store/pggww1d2pg24fcg5v36xn63n53vanyyi-gnupg-2.4.7","/nix/store/rg5rf512szdxmnj9qal3wfdnpfsx38qi-setup-etc.pl","/nix/store/vvlfaafnz3pdhw7lx5kc5gb9pl4zhz5l-local-cmds","/nix/store/w142vx7ij1fz6qwhp5dprkf59cizvv1v-update-users-groups.pl","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zlsmh0ccgvncg30qb4y0mp5pahnk1wnw-append-initrd-secrets","/nix/store/zs07icpv5ykf8m36xcv717hh26bp09fa-firmware","/nix/store/zy2n4id5gcxcbx2x8jbblkmcpdlpsypk-getent-glibc-2.40-66"],"registrationTime":1744743136,"signatures":["nixcache.cy7.sh:dZ1XiKQNe0fRX48gBj03PIABYJGV6BPwb72YpMqEBONZMF+JrkVKhRCF0ur/4Bf5prHxg6Qfg1ytP/4csRC9DQ=="],"ultimate":false}}"#;
-        serde_json::from_str::<HashMap<String, PathInfo>>(path_info_json).expect("must serialize");
-    }
-    */
-}
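
For reference, the object key used above is just the nixbase32-encoded store-path digest plus `.narinfo`, the same key probed on upstream caches and in the bucket. A small illustration (values are hypothetical):

```
use nix_compat::{nixbase32, store_path::StorePath};

fn narinfo_key(path: &StorePath<String>) -> String {
    // The digest is the 20-byte hash part of the store path; nixbase32-encoding it
    // reproduces the 32-character prefix seen in /nix/store entries.
    format!("{}.narinfo", nixbase32::encode(path.digest()))
}

fn example() {
    let p: StorePath<String> =
        StorePath::from_bytes(b"y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1").unwrap();
    assert_eq!(narinfo_key(&p), "y4qpcibkj767szhjb58i2sidmz8m24hb.narinfo");
}
```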
src/push.rs (137 lines changed)
@@ -1,6 +1,8 @@
 use std::{
+    collections::HashSet,
     fs,
     iter::once,
+    path::PathBuf,
     sync::{
         Arc,
         atomic::{AtomicUsize, Ordering},
@@ -8,22 +10,22 @@ use std::{
 };
 
 use anyhow::{Context, Result};
-use aws_config::Region;
-use aws_sdk_s3 as s3;
 use futures::future::join_all;
+use humansize::{DECIMAL, format_size};
 use nix_compat::narinfo::{self, SigningKey};
-use tokio::sync::{RwLock, mpsc};
-use tracing::{debug, info, trace};
+use object_store::aws::{AmazonS3, AmazonS3Builder};
+use tokio::sync::{RwLock, Semaphore, mpsc};
+use tracing::debug;
 use url::Url;
 
-use crate::{PushArgs, path_info::PathInfo, uploader::Uploader};
+use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
 
 pub struct Push {
     upstream_caches: Vec<Url>,
-    store_paths: Arc<RwLock<Vec<PathInfo>>>,
-    s3_client: s3::Client,
+    store_paths: Arc<RwLock<HashSet<PathInfo>>>,
     signing_key: SigningKey<ed25519_dalek::SigningKey>,
-    bucket: String,
+    store: Arc<Store>,
+    s3: Arc<AmazonS3>,
     // paths that we skipped cause of a signature match
     signature_hit_count: AtomicUsize,
     // paths that we skipped cause we found it on an upstream
@@ -35,7 +37,7 @@ pub struct Push {
 }
 
 impl Push {
-    pub async fn new(cli: &PushArgs) -> Result<Self> {
+    pub async fn new(cli: &PushArgs, store: Store) -> Result<Self> {
         let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
         for upstream in cli
             .upstreams
@@ -49,24 +51,21 @@ impl Push {
         let key = fs::read_to_string(&cli.signing_key)?;
         let signing_key = narinfo::parse_keypair(key.as_str())?.0;
 
-        let mut s3_config = aws_config::from_env();
+        let mut s3_builder = AmazonS3Builder::from_env().with_bucket_name(&cli.bucket);
 
         if let Some(region) = &cli.region {
-            s3_config = s3_config.region(Region::new(region.clone()));
+            s3_builder = s3_builder.with_region(region);
        }
         if let Some(endpoint) = &cli.endpoint {
-            s3_config = s3_config.endpoint_url(endpoint);
-        }
-        if let Some(profile) = &cli.profile {
-            s3_config = s3_config.profile_name(profile);
+            s3_builder = s3_builder.with_endpoint(endpoint);
         }
 
-        let s3_client = s3::Client::new(&s3_config.load().await);
         Ok(Self {
             upstream_caches: upstreams,
-            store_paths: Arc::new(RwLock::new(Vec::new())),
-            s3_client,
+            store_paths: Arc::new(RwLock::new(HashSet::new())),
             signing_key,
-            bucket: cli.bucket.clone(),
+            store: Arc::new(store),
+            s3: Arc::new(s3_builder.build()?),
             signature_hit_count: AtomicUsize::new(0),
             upstream_hit_count: AtomicUsize::new(0),
             already_exists_count: AtomicUsize::new(0),
@@ -74,26 +73,42 @@ impl Push {
         })
     }
 
-    pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
-        let path_info = PathInfo::from_path(package)
+    pub async fn add_paths(&'static self, paths: Vec<PathBuf>) -> Result<()> {
+        let mut futs = Vec::with_capacity(paths.len());
+        for path in paths {
+            let store_paths = self.store_paths.clone();
+            let store = self.store.clone();
+
+            futs.push(tokio::spawn(async move {
+                let path_info = PathInfo::from_derivation(path.as_path(), &store)
+                    .await
+                    .context("get path info for path")?;
+                debug!("path-info for {path:?}: {path_info:?}");
+
+                store_paths.write().await.extend(
+                    path_info
+                        .get_closure(&store)
+                        .await
+                        .context("closure from path info")?,
+                );
+                Ok(())
+            }));
+        }
+        join_all(futs)
             .await
-            .context("get path info for package")?;
-        debug!("path-info for {package}: {:?}", path_info);
-        self.store_paths.write().await.extend(
-            path_info
-                .get_closure()
-                .await
-                .context("closure from path info")?,
-        );
-        info!("found {} store paths", self.store_paths.read().await.len());
+            .into_iter()
+            .flatten()
+            .collect::<Result<Vec<_>>>()?;
+        println!("found {} store paths", self.store_paths.read().await.len());
 
         Ok(())
     }
 
     pub async fn run(&'static self) -> Result<()> {
-        let (tx, rx) = mpsc::channel(10);
+        let (tx, rx) = mpsc::channel(1);
         let filter = tokio::spawn(self.filter_from_upstream(tx));
         let upload = tokio::spawn(self.upload(rx));
 
         filter.await?;
         upload.await??;
         Ok(())
@@ -101,33 +116,34 @@ impl Push {
 
     /// filter paths that are on upstream and send to `tx`
     async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
-        let mut handles = Vec::with_capacity(10);
+        let mut handles = Vec::new();
         let store_paths = self.store_paths.read().await.clone();
+        // limit number of inflight requests
+        let inflight_permits = Arc::new(Semaphore::new(32));
 
         for path in store_paths.into_iter() {
             if path.check_upstream_signature(&self.upstream_caches) {
-                trace!("skip {} (signature match)", path.absolute_path());
-                self.signature_hit_count.fetch_add(1, Ordering::Release);
+                debug!("skip {} (signature match)", path.absolute_path());
+                self.signature_hit_count.fetch_add(1, Ordering::Relaxed);
                 continue;
             }
             handles.push({
                 let tx = tx.clone();
+                let inflight_permits = inflight_permits.clone();
                 tokio::spawn(async move {
+                    let _permit = inflight_permits.acquire().await.unwrap();
                     if !path
                         .check_upstream_hit(self.upstream_caches.as_slice())
                         .await
                     {
-                        if path
-                            .check_if_already_exists(&self.s3_client, self.bucket.clone())
-                            .await
-                        {
-                            trace!("skip {} (already exists)", path.absolute_path());
+                        if path.check_if_already_exists(&self.s3).await {
+                            debug!("skip {} (already exists)", path.absolute_path());
                             self.already_exists_count.fetch_add(1, Ordering::Relaxed);
                         } else {
                             tx.send(path).await.unwrap();
                         }
                     } else {
-                        trace!("skip {} (upstream hit)", path.absolute_path());
+                        debug!("skip {} (upstream hit)", path.absolute_path());
                        self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
                     }
                 })
@@ -142,24 +158,35 @@ impl Push {
     }
 
     async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
-        let mut uploads = Vec::with_capacity(10);
+        let mut uploads = Vec::new();
+        let permits = Arc::new(Semaphore::new(10));
 
         loop {
+            let permits = permits.clone();
|
||||||
|
|
||||||
if let Some(path_to_upload) = rx.recv().await {
|
if let Some(path_to_upload) = rx.recv().await {
|
||||||
let absolute_path = path_to_upload.absolute_path();
|
uploads.push(tokio::spawn({
|
||||||
|
// large uploads will be concurrently uploaded with multipart anyway so don't spawn
|
||||||
println!("uploading: {}", absolute_path);
|
// too much of them
|
||||||
let uploader = Uploader::new(
|
let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
|
||||||
&self.signing_key,
|
Some(permits.acquire_owned().await.unwrap())
|
||||||
path_to_upload,
|
} else {
|
||||||
&self.s3_client,
|
None
|
||||||
self.bucket.clone(),
|
};
|
||||||
)?;
|
println!(
|
||||||
|
"uploading: {} (size: {})",
|
||||||
uploads.push(tokio::spawn(async move {
|
path_to_upload.absolute_path(),
|
||||||
let res = uploader.upload().await;
|
format_size(path_to_upload.nar_size, DECIMAL)
|
||||||
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
);
|
||||||
res
|
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
|
||||||
|
let s3 = self.s3.clone();
|
||||||
|
let store = self.store.clone();
|
||||||
|
async move {
|
||||||
|
let res = uploader.upload(s3, store).await;
|
||||||
|
drop(permit);
|
||||||
|
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
res
|
||||||
|
}
|
||||||
}));
|
}));
|
||||||
} else {
|
} else {
|
||||||
join_all(uploads)
|
join_all(uploads)
|
||||||
|
|
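A note on the pattern above: both `filter_from_upstream` and `upload` now cap concurrency with a `tokio::sync::Semaphore` instead of spawning an unbounded number of tasks. A minimal, self-contained sketch of that pattern (the task body, the loop bound, and the limit of 32 here are illustrative, not taken from the diff):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // allow at most 32 task bodies to run at once, like `inflight_permits` above
    let permits = Arc::new(Semaphore::new(32));
    let mut handles = Vec::new();

    for i in 0..100 {
        let permits = permits.clone();
        handles.push(tokio::spawn(async move {
            // spawning stays cheap; the work only starts once a permit is free
            let _permit = permits.acquire().await.unwrap();
            // stand-in for the real work (e.g. an upstream cache lookup)
            println!("task {i} running");
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}
```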
100
src/store.rs
Normal file
@@ -0,0 +1,100 @@
use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc};

use anyhow::{Context, Result};
use nix_compat::store_path::StorePath;
use tokio::{io::AsyncRead, task};
use tokio_util::io::StreamReader;

use crate::{
    bindings::{self, AsyncWriteAdapter},
    path_info::PathInfo,
};

pub struct Store {
    inner: Arc<bindings::FfiNixStore>,
}

impl Store {
    pub fn connect() -> Result<Self> {
        let inner = unsafe { bindings::open_nix_store()? };
        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    pub async fn compute_fs_closure(
        &self,
        path: StorePath<String>,
    ) -> Result<Vec<StorePath<String>>> {
        let inner = self.inner.clone();
        task::spawn_blocking(move || {
            let cxx_vector =
                inner
                    .store()
                    .compute_fs_closure(path.to_string().as_bytes(), false, true, true)?;
            cxx_vector
                .iter()
                .map(|x| {
                    StorePath::from_bytes(x.as_bytes())
                        .context("make StorePath from vector returned by compute_fs_closure")
                })
                .collect::<Result<_, _>>()
        })
        .await
        .unwrap()
    }

    pub async fn query_path_info(&self, path: StorePath<String>) -> Result<PathInfo> {
        let inner = self.inner.clone();

        task::spawn_blocking(move || {
            let mut c_path_info = inner
                .store()
                .query_path_info(path.to_string().as_bytes())
                .context("query cpp for path info")?;

            let signatures = c_path_info
                .pin_mut()
                .sigs()
                .into_iter()
                .map(|x| {
                    let osstr = OsStr::from_bytes(x.as_bytes());
                    osstr.to_str().unwrap().to_string()
                })
                .collect();
            let references = c_path_info
                .pin_mut()
                .references()
                .into_iter()
                .map(|x| StorePath::from_bytes(x.as_bytes()))
                .collect::<Result<_, _>>()
                .context("get references from pathinfo")?;
            let nar_size = c_path_info.pin_mut().nar_size();

            Ok(PathInfo {
                path,
                signatures,
                references,
                nar_size,
            })
        })
        .await
        .unwrap()
    }

    pub fn nar_from_path(&self, store_path: StorePath<String>) -> impl AsyncRead {
        let inner = self.inner.clone();
        let (adapter, mut sender) = AsyncWriteAdapter::new();
        let base_name = store_path.to_string().as_bytes().to_vec();

        tokio::task::spawn_blocking(move || {
            // Send all exceptions through the channel, and ignore errors
            // during sending (the channel may have been closed).
            if let Err(e) = inner.store().nar_from_path(base_name, sender.clone()) {
                let _ = sender.rust_error(e);
            }
        });

        StreamReader::new(adapter)
    }
}
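`Store` above bridges the blocking C++ Nix store API into async Rust with `task::spawn_blocking`, so FFI calls never stall the executor threads. A rough sketch of that bridging idea, with a placeholder blocking type standing in for the real `bindings::FfiNixStore`:

```rust
use std::sync::Arc;
use tokio::task;

// placeholder for the blocking FFI handle; the real code talks to the C++ nix store
struct BlockingStore;

impl BlockingStore {
    fn query(&self, path: &str) -> String {
        // pretend this blocks on C++ / disk
        format!("info for {path}")
    }
}

async fn query_path_info(store: Arc<BlockingStore>, path: String) -> String {
    // run the blocking call on tokio's blocking thread pool and await the result
    task::spawn_blocking(move || store.query(&path))
        .await
        .expect("blocking task panicked")
}

#[tokio::main]
async fn main() {
    let store = Arc::new(BlockingStore);
    println!("{}", query_path_info(store, "/nix/store/example".into()).await);
}
```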
217
src/uploader.rs
@@ -1,187 +1,80 @@
 use anyhow::Result;
-use async_compression::{Level, tokio::bufread::ZstdEncoder};
-use aws_sdk_s3::{
-    self as s3,
-    types::{CompletedMultipartUpload, CompletedPart},
-};
-use futures::future::join_all;
-use nix_compat::{
-    narinfo::{self, NarInfo, SigningKey},
-    nixbase32,
-    store_path::StorePath,
-};
-use sha2::{Digest, Sha256};
-use tokio::{io::AsyncReadExt, process::Command};
-use tracing::debug;
+use bytes::BytesMut;
+use nix_compat::{narinfo::SigningKey, nixbase32};
+use object_store::{ObjectStore, aws::AmazonS3, buffered::BufWriter, path::Path};
+use std::sync::Arc;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tracing::{debug, trace};
+use ulid::Ulid;

-use crate::path_info::PathInfo;
+use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store};

-const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
+const CHUNK_SIZE: usize = 1024 * 1024 * 5;

 pub struct Uploader<'a> {
     signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
     path: PathInfo,
-    s3_client: &'a s3::Client,
-    bucket: String,
 }

 impl<'a> Uploader<'a> {
     pub fn new(
         signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
         path: PathInfo,
-        s3_client: &'a s3::Client,
-        bucket: String,
     ) -> Result<Self> {
-        Ok(Self {
-            signing_key,
-            path,
-            s3_client,
-            bucket,
-        })
+        Ok(Self { signing_key, path })
     }

-    pub async fn upload(&self) -> Result<()> {
-        let nar = self.make_nar().await?;
-        let mut nar_info = self.narinfo_from_nar(&nar)?;
-        let nar = self.compress_nar(&nar).await;
-
-        // update fields that we know after compression
-        let mut hasher = Sha256::new();
-        hasher.update(&nar);
-        let hash: [u8; 32] = hasher.finalize().into();
-        let nar_url = self.nar_url(&hash);
-        nar_info.file_hash = Some(hash);
-        nar_info.file_size = Some(nar.len() as u64);
-        nar_info.url = nar_url.as_str();
-        debug!("uploading nar with key: {nar_url}");
-
-        if nar.len() < MULTIPART_CUTOFF {
-            let put_object = self
-                .s3_client
-                .put_object()
-                .bucket(&self.bucket)
-                .key(&nar_url)
-                .body(nar.into())
-                .send()
-                .await?;
-            debug!("put object: {:#?}", put_object);
-        } else {
-            let multipart = self
-                .s3_client
-                .create_multipart_upload()
-                .bucket(&self.bucket)
-                .key(&nar_url)
-                .send()
-                .await?;
-            let upload_id = multipart.upload_id().unwrap();
-
-            let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
-            let chunks = nar.chunks(MULTIPART_CUTOFF);
-            for (i, chunk) in chunks.enumerate() {
-                parts.push(tokio::task::spawn(
-                    self.s3_client
-                        .upload_part()
-                        .bucket(&self.bucket)
-                        .key(&nar_url)
-                        .upload_id(upload_id)
-                        .part_number(i as i32 + 1)
-                        .body(chunk.to_vec().into())
-                        .send(),
-                ));
-            }
-
-            let completed_parts = join_all(parts)
-                .await
-                .into_iter()
-                .flatten()
-                .collect::<Result<Vec<_>, _>>()?
-                .into_iter()
-                .enumerate()
-                .map(|(i, part)| {
-                    CompletedPart::builder()
-                        .set_e_tag(part.e_tag().map(ToString::to_string))
-                        .set_part_number(Some(i as i32 + 1))
-                        .set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
-                        .build()
-                })
-                .collect::<Vec<_>>();
-
-            let completed_mp_upload = CompletedMultipartUpload::builder()
-                .set_parts(Some(completed_parts))
-                .build();
-
-            let complete_mp_upload = self
-                .s3_client
-                .complete_multipart_upload()
-                .bucket(&self.bucket)
-                .key(&nar_url)
-                .upload_id(upload_id)
-                .multipart_upload(completed_mp_upload)
-                .send()
-                .await?;
-
-            debug!("complete multipart upload: {:#?}", complete_mp_upload);
-        }
-
-        let narinfo_url = format!("{}.narinfo", self.path.digest());
-        debug!("uploading narinfo with key {narinfo_url}");
-        self.s3_client
-            .put_object()
-            .bucket(&self.bucket)
-            .key(narinfo_url)
-            .body(nar_info.to_string().as_bytes().to_vec().into())
-            .send()
-            .await?;
+    pub async fn upload(&self, s3: Arc<AmazonS3>, store: Arc<Store>) -> Result<()> {
+        let mut nar = MakeNar::new(&self.path, store)?;
+
+        // we don't know what the hash of the compressed file will be so upload to a
+        // temp location for now
+        let temp_path = Path::parse(Ulid::new().to_string())?;
+        let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone());
+        debug!("uploading to temp path: {}", temp_path);
+
+        // compress and upload nar
+        let mut file_reader = nar.compress_and_hash()?;
+        loop {
+            let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
+            let n = file_reader.read_buf(&mut buf).await?;
+            s3_writer.put(buf.freeze()).await?;
+            if n == 0 {
+                break;
+            }
+        }
+        drop(file_reader);
+
+        let mut nar_info = nar.get_narinfo()?;
+        nar_info.add_signature(self.signing_key);
+
+        // now that we can calculate the file_hash move the nar to where it should be
+        let real_path = nar_url(
+            &nar_info
+                .file_hash
+                .expect("file hash must be known at this point"),
+        );
+        debug!("moving {} to {}", temp_path, real_path);
+        // the temp object must be done uploading
+        s3_writer.shutdown().await?;
+        // this is implemented as a copy-and-delete
+        s3.rename(&temp_path, &real_path).await?;
+        // set nar url in narinfo
+        nar_info.url = real_path.as_ref();
+
+        // upload narinfo
+        let narinfo_path = self.path.narinfo_path();
+        debug!("uploading narinfo: {}", narinfo_path);
+        trace!("narinfo: {:#}", nar_info);
+        s3.put(&narinfo_path, nar_info.to_string().into()).await?;

         Ok(())
     }
-
-    async fn make_nar(&self) -> Result<Vec<u8>> {
-        Ok(Command::new("nix")
-            .arg("nar")
-            .arg("dump-path")
-            .arg(self.path.absolute_path())
-            .output()
-            .await?
-            .stdout)
-    }
-
-    fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
-        let mut hasher = Sha256::new();
-        hasher.update(nar);
-        let nar_hash: [u8; 32] = hasher.finalize().into();
-        let mut nar_info = NarInfo {
-            flags: narinfo::Flags::empty(),
-            store_path: self.path.path.as_ref(),
-            nar_hash,
-            nar_size: nar.len() as u64,
-            references: self.path.references.iter().map(StorePath::as_ref).collect(),
-            signatures: Vec::new(),
-            ca: None,
-            system: None,
-            deriver: self.path.deriver.as_ref().map(|x| x.as_ref()),
-            compression: Some("zstd"),
-            file_hash: None,
-            file_size: None,
-            url: "",
-        };
-        // signature consists of: store_path, nar_hash, nar_size, and references
-        nar_info.add_signature(self.signing_key);
-        Ok(nar_info)
-    }
-
-    fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
-        let compressed_nar_hash = nixbase32::encode(compressed_nar_hash);
-        format!("nar/{compressed_nar_hash}.nar.zst")
-    }
-
-    async fn compress_nar(&self, nar: &[u8]) -> Vec<u8> {
-        let mut encoder = ZstdEncoder::with_quality(nar, Level::Default);
-        let mut compressed = Vec::with_capacity(nar.len());
-        encoder
-            .read_to_end(&mut compressed)
-            .await
-            .expect("should compress just fine");
-        compressed
-    }
 }
+
+/// calculate url where the compressed nar should be uploaded
+fn nar_url(file_hash: &[u8]) -> Path {
+    let compressed_nar_hash = nixbase32::encode(file_hash);
+    Path::parse(format!("nar/{compressed_nar_hash}.nar.zst"))
+        .expect("should parse to a valid object_store::path::Path")
+}
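The rewritten uploader streams the NAR to a temporary object named by a ULID, and only moves it to its content-addressed `nar/<hash>.nar.zst` key once the compressed hash is known. A small sketch of that upload-then-rename flow against `object_store`'s in-memory backend (the payload and the final key name are made up for illustration):

```rust
use bytes::Bytes;
use object_store::{ObjectStore, memory::InMemory, path::Path};
use ulid::Ulid;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // the in-memory store stands in for the AmazonS3 instance built in push.rs
    let store = InMemory::new();

    // 1. upload under a random temporary key while the content hash is still unknown
    let temp_path = Path::parse(Ulid::new().to_string())?;
    let data = Bytes::from_static(b"compressed nar bytes");
    store.put(&temp_path, data.clone().into()).await?;

    // 2. once the hash is known, move the object to its content-addressed key
    //    (object_store implements rename as copy-then-delete)
    let real_path = Path::parse("nar/example-hash.nar.zst")?;
    store.rename(&temp_path, &real_path).await?;

    assert_eq!(store.get(&real_path).await?.bytes().await?, data);
    Ok(())
}
```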
32
tests/common/mod.rs
Normal file
@@ -0,0 +1,32 @@
#![allow(dead_code)]

use std::process::Command;
use std::sync::Arc;

use nixcp::store::Store;

pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
pub const HELLO_DRV: &str = "iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";

pub struct Context {
    pub store: Arc<Store>,
}

impl Context {
    fn new() -> Self {
        // hello must be in the store
        Command::new("nix")
            .arg("build")
            .arg("--no-link")
            .arg(HELLO)
            .status()
            .unwrap();
        let store = Arc::new(Store::connect().expect("connect to nix store"));
        Self { store }
    }
}

pub fn context() -> Context {
    Context::new()
}
26
tests/nar.rs
Normal file
@@ -0,0 +1,26 @@
use crate::common::HELLO_PATH;
use nix_compat::nixbase32;
use nixcp::make_nar::MakeNar;
use nixcp::path_info::PathInfo;
use sha2::Digest;
use tokio::io::AsyncReadExt;

mod common;

#[tokio::test]
async fn nar_size_and_hash() {
    let ctx = common::context();
    let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();

    let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
    let mut reader = nar.compress_and_hash().unwrap();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await.unwrap();
    drop(reader);

    assert_eq!(nar.nar_size, 234680);

    let nar_hash = nar.nar_hasher.finalize();
    let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
    assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
}
56
tests/path_info.rs
Normal file
@@ -0,0 +1,56 @@
use nixcp::path_info::PathInfo;
use std::path::PathBuf;

use tempfile::TempDir;

use crate::common::{HELLO, HELLO_DRV, HELLO_PATH};

mod common;

#[tokio::test]
async fn path_info_from_package() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    assert_eq!(path_info.path.to_string(), HELLO_DRV);
}

#[tokio::test]
async fn path_info_from_path() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO_PATH);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    assert_eq!(path_info.path.to_string(), HELLO_DRV);
}

#[tokio::test]
async fn path_info_symlink() {
    let ctx = common::context();

    let temp_path = TempDir::new().unwrap();
    let link_path = temp_path.path().join("result");

    // symlink at ./result (like `nix build`)
    std::os::unix::fs::symlink(HELLO_PATH, &link_path).unwrap();

    // should resolve symlink
    let path_info = PathInfo::from_derivation(&link_path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    assert_eq!(path_info.path.to_string(), HELLO_DRV);
}

#[tokio::test]
async fn closure() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    let closure = path_info.get_closure(&ctx.store).await.unwrap();
    assert_eq!(closure.len(), 472);
}