Compare commits

..

3 commits

Author SHA1 Message Date
cy
ac4b2ba136
fancy nar cooking 2025-04-26 00:39:22 -04:00
cy
4808671071
some progress on using nix-compat for nar creation 2025-04-18 21:23:52 -04:00
cy
a17fa92c78
some progress at an attempt 2025-04-18 00:50:21 -04:00
25 changed files with 1529 additions and 1160 deletions

View file

@ -1,2 +0,0 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

View file

@ -1,3 +0,0 @@
[*.nix]
indent_size = 2
indent_stype = space

View file

@ -1,73 +0,0 @@
name: build
on:
workflow_dispatch:
push:
pull_request:
env:
TERM: ansi
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets. AWS_SECRET_ACCESS_KEY }}
AWS_ENDPOINT: https://s3.cy7.sh
jobs:
build-packages:
strategy:
fail-fast: false
matrix:
os:
- ubuntu-latest
- ubuntu-24.04-arm
- macos-latest # arm64
- macos-13 # x86
runs-on: ${{ matrix.os }}
steps:
- name: setup binary cache key
run: echo -n "${{ secrets.NIX_CACHE_SECRET_KEY }}" | xxd -p -r > ${{ runner.temp }}/cache-priv-key.pem
- name: Install Nix
uses: cachix/install-nix-action@526118121621777ccd86f79b04685a9319637641
with:
enable_kvm: true
extra_nix_config: |
show-trace = true
experimental-features = nix-command flakes
secret-key-files = ${{ runner.temp }}/cache-priv-key.pem
extra-substituters = https://nixcache.cy7.sh
extra-trusted-public-keys = nixcache.cy7.sh:DN3d1dt0wnXfTH03oVmTee4KgmdNdB0NY3SuzA8Fwx8=
- uses: actions/checkout@85e6279cec87321a52edac9c87bce653a07cf6c2
with:
persist-credentials: false
- name: cache devshell
run: |
nix build .#devShells.$(nix eval --impure --raw --expr 'builtins.currentSystem').default
nix run \
github:cything/nixcp -- push \
--bucket nixcache \
--signing-key ${{ runner.temp }}/cache-priv-key.pem \
result
- name: build
run: nix build -L .
- name: cache
run: |
nix run \
github:cything/nixcp -- push \
--bucket nixcache \
--signing-key ${{ runner.temp }}/cache-priv-key.pem \
result
- name: prepare tarball to upload
run: nix run github:nixos/nixpkgs#gnutar hcvf result.tar result
- name: upload result
uses: actions/upload-artifact@6027e3dd177782cd8ab9af838c04fd81a07f1d47
with:
name: ${{ matrix.os }}.tar
path: result.tar
if-no-files-found: error

View file

@ -1,27 +0,0 @@
name: check
on:
workflow_dispatch:
push:
pull_request:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install Nix
uses: cachix/install-nix-action@526118121621777ccd86f79b04685a9319637641
with:
enable_kvm: true
extra_nix_config: |
show-trace = true
experimental-features = nix-command flakes
extra-substituters = https://nixcache.cy7.sh
extra-trusted-public-keys = nixcache.cy7.sh:DN3d1dt0wnXfTH03oVmTee4KgmdNdB0NY3SuzA8Fwx8=
- uses: actions/checkout@85e6279cec87321a52edac9c87bce653a07cf6c2
with:
persist-credentials: false
- name: Run checks
run: nix flake check -L

View file

@ -1,30 +0,0 @@
name: test
on:
workflow_dispatch:
push:
pull_request:
env:
CARGO_TERM_COLOR: always
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Install Nix
uses: cachix/install-nix-action@526118121621777ccd86f79b04685a9319637641
with:
enable_kvm: true
extra_nix_config: |
show-trace = true
experimental-features = nix-command flakes
extra-substituters = https://nixcache.cy7.sh
extra-trusted-public-keys = nixcache.cy7.sh:DN3d1dt0wnXfTH03oVmTee4KgmdNdB0NY3SuzA8Fwx8=
- uses: actions/checkout@85e6279cec87321a52edac9c87bce653a07cf6c2
with:
persist-credentials: false
- name: Run tests
run: nix develop -c cargo test --verbose

1499
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -3,37 +3,30 @@ name = "nixcp"
version = "0.1.0" version = "0.1.0"
edition = "2024" edition = "2024"
[profile.release]
lto = true
codegen-units = 1
[dependencies] [dependencies]
anyhow = "1.0.97" anyhow = "1.0.97"
async-compression = { version = "0.4.22", features = ["tokio", "zstd"] } async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.82.0"
clap = { version = "4.5.34", features = ["derive"] } clap = { version = "4.5.34", features = ["derive"] }
ed25519-dalek = "2.1.1" ed25519-dalek = "2.1.1"
futures = "0.3.31" futures = "0.3.31"
nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" } nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
regex = "1.11.1" regex = "1.11.1"
reqwest = "0.12.15" reqwest = "0.12.15"
serde = { version = "1.0.219", features = ["derive"] } serde = { version = "1.0.219", features = [ "derive" ]}
serde_json = "1.0.140" serde_json = "1.0.140"
sha2 = "0.10.8" sha2 = "0.10.8"
tokio = { version = "1.44.1", features = ["full", "tracing", "parking_lot"] } tokio = { version = "1.44.1", features = [ "full", "tracing", "parking_lot" ]}
tracing = "0.1.41" tracing = "0.1.41"
url = { version = "2.5.4", features = ["serde"] } url = { version = "2.5.4", features = [ "serde" ]}
cxx = "1.0" cxx = "1.0"
console-subscriber = "0.4.1" console-subscriber = "0.4.1"
tokio-util = { version = "0.7.15", features = ["io"] }
bytes = "1.10.1" bytes = "1.10.1"
object_store = { version = "0.12.0", features = ["aws"] } tokio-stream = { version = "0.1.17", features = ["fs"] }
ulid = "1.2.1" tempfile = "3.19.1"
tracing-subscriber = "0.3.19" tokio-util = { version = "0.7.14", features = ["io"] }
humansize = "2.1.3"
[build-dependencies] [build-dependencies]
cxx-build = "1.0" cxx-build = "1.0"
pkg-config = "0.3.32" pkg-config = "0.3.32"
[dev-dependencies]
tempfile = "3.19.1"

21
LICENSE
View file

@ -1,21 +0,0 @@
MIT License
Copyright (c) 2025 Cy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -11,13 +11,14 @@ The signing key is generated with:
nix-store --generate-binary-cache-key nixcache.cy7.sh cache-priv-key.pem cache-pub-key.pem nix-store --generate-binary-cache-key nixcache.cy7.sh cache-priv-key.pem cache-pub-key.pem
``` ```
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables should be set with your s3 credentials. `AWS_ACCESS_KEY_ID` and `AWS_ENDPOINT_URL` environment variables should be set with your s3 credentials.
``` ```
Usage: nixcp push [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> [PATH]... Usage: nixcp [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> <COMMAND>
Arguments: Commands:
[PATH]... Path to upload e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 push
help Print this message or the help of the given subcommand(s)
Options: Options:
--bucket <bucket name> --bucket <bucket name>
@ -27,13 +28,15 @@ Options:
--signing-key <SIGNING_KEY> --signing-key <SIGNING_KEY>
Path to the file containing signing key e.g. ~/cache-priv-key.pem Path to the file containing signing key e.g. ~/cache-priv-key.pem
--region <REGION> --region <REGION>
If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1 If unspecified, will get it form AWS_DEFAULT_REGION envar or the AWS default
--endpoint <ENDPOINT> --endpoint <ENDPOINT>
If unspecifed, will get it from AWS_ENDPOINT envar e.g. https://s3.example.com If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default e.g. https://s3.example.com
--no-default-upstream --profile <PROFILE>
Do not include cache.nixos.org as upstream AWS profile to use
-h, --help -h, --help
Print help Print help
-V, --version
Print version
``` ```
## Install with nix ## Install with nix

View file

@ -11,15 +11,8 @@
}; };
}; };
outputs = outputs = inputs@{ nixpkgs, flake-utils, crane, ... }:
inputs@{ flake-utils.lib.eachDefaultSystem (system:
nixpkgs,
flake-utils,
crane,
...
}:
flake-utils.lib.eachDefaultSystem (
system:
let let
pkgs = import nixpkgs { pkgs = import nixpkgs {
inherit system; inherit system;
@ -28,21 +21,11 @@
]; ];
}; };
toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
craneLib = (crane.mkLib pkgs).overrideToolchain (_: toolchain); craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
lib = pkgs.lib; lib = pkgs.lib;
in
# don't clean cpp files {
cppFilter = path: _type: builtins.match ".*(cpp|hpp)$" path != null; devShells.default = pkgs.mkShell {
cppOrCargo = path: type: (cppFilter path type) || (craneLib.filterCargoSources path type);
src = lib.cleanSourceWith {
src = ./.;
filter = cppOrCargo;
name = "source";
};
commonArgs = {
inherit src;
strictDeps = true;
nativeBuildInputs = with pkgs; [ nativeBuildInputs = with pkgs; [
pkg-config pkg-config
]; ];
@ -51,58 +34,23 @@
openssl openssl
nix nix
boost boost
];
# for cpp bindings to work
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
# skip integration tests (they need a connection to the nix store)
cargoTestExtraArgs = "--bins";
};
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
nixcp = craneLib.buildPackage (
commonArgs
// {
inherit cargoArtifacts;
}
);
in
{
checks = {
# clippy with all warnings denied
clippy = craneLib.cargoClippy (
commonArgs
// {
inherit cargoArtifacts;
cargoClippyExtraArgs = "--all-targets -- --deny warnings";
}
);
# check formatting
cargoFmt = craneLib.cargoFmt {
inherit src;
};
tomlFmt = craneLib.taploFmt {
src = lib.sources.sourceFilesBySuffices src [ ".toml" ];
};
};
devShells.default = craneLib.devShell {
inputsFrom = [ nixcp ];
RUST_BACKGRACE = 1;
# for cpp bindings to work
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
packages = with pkgs; [
tokio-console tokio-console
cargo-udeps
cargo-audit
]; ];
NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
RUST_LOG = "nixcp=debug";
RUST_BACKGRACE = 1;
}; };
formatter = pkgs.nixfmt-rfc-style; packages.default = craneLib.buildPackage {
src = craneLib.cleanCargoSource ./.;
packages.default = nixcp; strictDeps = true;
nativeBuildInputs = with pkgs; [
pkg-config
];
buildInputs = with pkgs; [
openssl
];
};
} }
); );
} }

View file

@ -1,4 +1,9 @@
[toolchain] [toolchain]
channel = "nightly" channel = "nightly"
profile = "minimal" profile = "minimal"
components = ["rust-src", "rust-analyzer", "rustfmt", "clippy"] components = [
"rust-src",
"rust-analyzer",
"rustfmt",
"clippy",
]

View file

@ -23,7 +23,6 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::Result; use anyhow::Result;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt}; use futures::stream::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt}; use tokio::io::{AsyncWrite, AsyncWriteExt};
@ -126,7 +125,7 @@ impl AsyncWriteAdapter {
writer.write_all(&v).await?; writer.write_all(&v).await?;
} }
Err(e) => { Err(e) => {
return Err(e.into()); return Err(e);
} }
} }
} }
@ -140,7 +139,7 @@ impl AsyncWriteAdapter {
} }
impl Stream for AsyncWriteAdapter { impl Stream for AsyncWriteAdapter {
type Item = std::io::Result<Bytes>; type Item = Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.receiver.poll_recv(cx) { match self.receiver.poll_recv(cx) {
@ -148,9 +147,9 @@ impl Stream for AsyncWriteAdapter {
Poll::Ready(Some(message)) => { Poll::Ready(Some(message)) => {
use AsyncWriteMessage::*; use AsyncWriteMessage::*;
match message { match message {
Data(v) => Poll::Ready(Some(Ok(v.into()))), Data(v) => Poll::Ready(Some(Ok(v))),
Error(exception) => { Error(exception) => {
let error = std::io::Error::other(format!("cxx error: {exception}")); let error = anyhow::Error::msg(format!("cxx error: {exception}"));
Poll::Ready(Some(Err(error))) Poll::Ready(Some(Err(error)))
} }
Eof => { Eof => {
@ -161,7 +160,7 @@ impl Stream for AsyncWriteAdapter {
} }
Poll::Ready(None) => { Poll::Ready(None) => {
if !self.eof { if !self.eof {
Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe)))) Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe).into())))
} else { } else {
Poll::Ready(None) Poll::Ready(None)
} }
@ -209,6 +208,9 @@ mod ffi {
include_derivers: bool, include_derivers: bool,
) -> Result<UniquePtr<CxxVector<CxxString>>>; ) -> Result<UniquePtr<CxxVector<CxxString>>>;
/// Obtains a handle to the Nix store.
fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
/// Creates a NAR dump from a path. /// Creates a NAR dump from a path.
fn nar_from_path( fn nar_from_path(
self: Pin<&mut CNixStore>, self: Pin<&mut CNixStore>,
@ -216,17 +218,12 @@ mod ffi {
sender: Box<AsyncWriteSender>, sender: Box<AsyncWriteSender>,
) -> Result<()>; ) -> Result<()>;
/// Obtains a handle to the Nix store.
fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
// ========= // =========
// CPathInfo // CPathInfo
// ========= // =========
/// Mid-level wrapper for the `nix::ValidPathInfo` struct. /// Mid-level wrapper for the `nix::ValidPathInfo` struct.
type CPathInfo; type CPathInfo;
/// Returns the size of the NAR.
fn nar_size(self: Pin<&mut CPathInfo>) -> u64;
/// Returns the references of the store path. /// Returns the references of the store path.
fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>; fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;

View file

@ -57,10 +57,6 @@ void RustSink::eof() {
CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {} CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
uint64_t CPathInfo::nar_size() {
return this->pi->narSize;
}
std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() { std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() {
std::vector<std::string> result; std::vector<std::string> result;
for (auto&& elem : this->pi->sigs) { for (auto&& elem : this->pi->sigs) {

View file

@ -65,7 +65,6 @@ public:
CPathInfo(nix::ref<const nix::ValidPathInfo> pi); CPathInfo(nix::ref<const nix::ValidPathInfo> pi);
std::unique_ptr<std::vector<std::string>> sigs(); std::unique_ptr<std::vector<std::string>> sigs();
std::unique_ptr<std::vector<std::string>> references(); std::unique_ptr<std::vector<std::string>> references();
uint64_t nar_size();
}; };
class CNixStore { class CNixStore {

2
src/cli.rs Normal file
View file

@ -0,0 +1,2 @@

View file

@ -1,65 +0,0 @@
use std::path::PathBuf;
use clap::{Args, Parser, Subcommand};
mod bindings;
pub mod make_nar;
pub mod path_info;
pub mod push;
pub mod store;
mod uploader;
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
/// Whether to enable tokio-console
#[arg(long)]
pub tokio_console: bool,
}
#[derive(Debug, Subcommand)]
pub enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")]
bucket: String,
/// Upstream cache to check against. Can be specified multiple times.
/// cache.nixos.org is always included.
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
upstreams: Vec<String>,
/// Path to the file containing signing key
/// e.g. ~/cache-priv-key.pem
#[arg(long)]
signing_key: String,
/// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
#[arg(long)]
region: Option<String>,
/// If unspecifed, will get it from AWS_ENDPOINT envar
/// e.g. https://s3.example.com
#[arg(long)]
endpoint: Option<String>,
/// Do not include cache.nixos.org as upstream
#[arg(long)]
no_default_upstream: bool,
/// Path to upload
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "PATH")]
pub paths: Vec<PathBuf>,
}

View file

@ -1,15 +1,80 @@
use anyhow::{Context, Result}; #![feature(let_chains)]
use clap::Parser; #![feature(extend_one)]
use tracing_subscriber::{EnvFilter, prelude::*};
use nixcp::push::Push; use std::path::PathBuf;
use nixcp::store::Store;
use nixcp::{Cli, Commands}; use anyhow::{Context, Result};
use clap::{Args, Parser, Subcommand};
use push::Push;
use store::Store;
mod bindings;
mod cli;
mod path_info;
mod push;
mod store;
mod uploader;
#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Debug, Subcommand)]
enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")]
bucket: String,
/// Upstream cache to check against. Can be specified multiple times.
/// cache.nixos.org is always included.
#[arg(long = "upstream", short, value_name = "nixcache.example.com")]
upstreams: Vec<String>,
/// Path to the file containing signing key
/// e.g. ~/cache-priv-key.pem
#[arg(long)]
signing_key: String,
/// If unspecified, will get it form AWS_DEFAULT_REGION envar
#[arg(long)]
region: Option<String>,
/// If unspecifed, will get it from AWS_ENDPOINT_URL envar
/// e.g. https://s3.example.com
#[arg(long)]
endpoint: Option<String>,
/// AWS profile to use
#[arg(long)]
profile: Option<String>,
#[arg(long)]
skip_signature_check: bool,
/// Path to upload
/// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "PATH")]
paths: Vec<PathBuf>,
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
console_subscriber::init();
let cli = Cli::parse(); let cli = Cli::parse();
init_logging(cli.tokio_console);
match &cli.command { match &cli.command {
Commands::Push(cli) => { Commands::Push(cli) => {
@ -24,23 +89,3 @@ async fn main() -> Result<()> {
Ok(()) Ok(())
} }
fn init_logging(tokio_console: bool) {
let env_filter = EnvFilter::from_default_env();
let fmt_layer = tracing_subscriber::fmt::layer().with_filter(env_filter);
let console_layer = if tokio_console {
Some(console_subscriber::spawn())
} else {
None
};
tracing_subscriber::registry()
.with(fmt_layer)
.with(console_layer)
.init();
if tokio_console {
println!("tokio-console is enabled");
}
}

View file

@ -1,81 +0,0 @@
use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use nix_compat::{
narinfo::{self, NarInfo},
store_path::StorePath,
};
use sha2::{Digest, Sha256};
use std::{mem::take, sync::Arc};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;
use crate::path_info::PathInfo;
use crate::store::Store;
pub struct MakeNar<'a> {
path_info: &'a PathInfo,
store: Arc<Store>,
pub nar_hasher: Sha256,
/// hash of compressed nar file
file_hasher: Sha256,
pub nar_size: u64,
file_size: u64,
}
impl<'a> MakeNar<'a> {
pub fn new(path_info: &'a PathInfo, store: Arc<Store>) -> Result<Self> {
Ok(Self {
path_info,
store,
nar_hasher: Sha256::new(),
file_hasher: Sha256::new(),
nar_size: 0,
file_size: 0,
})
}
/// Returns a compressed nar reader which can be uploaded. File hash will be available when
/// everything is read
pub fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
let nar_reader = self.store.nar_from_path(self.path_info.path.clone());
// reader that hashes as nar is read
let nar_reader = InspectReader::new(nar_reader, |x| {
self.nar_size += x.len() as u64;
self.nar_hasher.update(x);
});
let encoder = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
// reader that updates file_hash as the compressed nar is read
Ok(InspectReader::new(encoder, |x| {
self.file_size += x.len() as u64;
self.file_hasher.update(x);
}))
}
/// Returns *unsigned* narinfo. `url` must be updated before uploading
pub fn get_narinfo(&mut self) -> Result<NarInfo> {
let file_hash = take(&mut self.file_hasher).finalize().into();
let nar_hash = take(&mut self.nar_hasher).finalize().into();
Ok(NarInfo {
flags: narinfo::Flags::empty(),
store_path: self.path_info.path.as_ref(),
nar_hash,
nar_size: self.nar_size,
references: self
.path_info
.references
.iter()
.map(StorePath::as_ref)
.collect(),
signatures: Vec::new(),
ca: None,
system: None,
deriver: None,
compression: Some("zstd"),
file_hash: Some(file_hash),
file_size: Some(self.file_size),
url: "",
})
}
}

View file

@ -1,10 +1,10 @@
use std::collections::HashSet; use std::collections::HashSet;
use anyhow::{Context, Result, anyhow}; use anyhow::{Context, Result};
use aws_sdk_s3 as s3;
use futures::future::join_all; use futures::future::join_all;
use nix_compat::nixbase32; use nix_compat::nixbase32;
use nix_compat::store_path::StorePath; use nix_compat::store_path::StorePath;
use object_store::{ObjectStore, aws::AmazonS3, path::Path as ObjectPath};
use regex::Regex; use regex::Regex;
use std::path::Path; use std::path::Path;
use tokio::process::Command; use tokio::process::Command;
@ -13,61 +13,41 @@ use url::Url;
use crate::store::Store; use crate::store::Store;
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone)]
pub struct PathInfo { pub struct PathInfo {
pub path: StorePath<String>, pub path: StorePath<String>,
pub signatures: Vec<String>, pub signatures: Vec<String>,
pub references: Vec<StorePath<String>>, pub references: Vec<StorePath<String>>,
pub nar_size: u64,
} }
impl PathInfo { impl PathInfo {
pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> { pub async fn from_path(path: &Path, store: &Store) -> Result<Self> {
debug!("query path info for {:?}", drv); debug!("query path info for {:?}", path);
let derivation = match drv.extension() { let derivation = match path.extension() {
Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(), Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(),
_ => { _ => {
let drv = {
// resolve symlink
if drv.is_symlink() {
&drv.canonicalize()?
} else {
drv
}
};
&Command::new("nix") &Command::new("nix")
.arg("path-info") .arg("path-info")
.arg("--derivation") .arg("--derivation")
.arg(drv) .arg(path)
.output() .output()
.await .await
.context(format!("run command: nix path-info --derivaiton {drv:?}"))? .context(format!("run command: nix path-info --derivaiton {path:?}"))?
.stdout .stdout
} }
}; };
let derivation = String::from_utf8_lossy(derivation); let derivation = String::from_utf8_lossy(derivation);
debug!("derivation: {derivation}"); debug!("derivation: {derivation}");
if derivation.is_empty() { let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes())
return Err(anyhow!( .context("storepath from derivation")?;
"nix path-info did not return a derivation for {drv:#?}"
));
}
Self::from_path(derivation.trim(), store).await
}
pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
let store_path =
StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
store store
.query_path_info(store_path) .query_path_info(store_path)
.await .await
.context("query pathinfo for path") .context("query pathinfo for derivation")
} }
// TODO: skip call to query_path_info and return Vec<Path>?
pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> { pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
let futs = store let futs = store
.compute_fs_closure(self.path.clone()) .compute_fs_closure(self.path.clone())
@ -102,15 +82,15 @@ impl PathInfo {
.filter_map(|signature| Some(signature.split_once(":")?.0)) .filter_map(|signature| Some(signature.split_once(":")?.0))
.collect(); .collect();
trace!("signers for {}: {:?}", self.path, signers); trace!("signers for {}: {:?}", self.path, signers);
signers return signers;
} }
pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool { pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
for upstream in upstreams { for upstream in upstreams {
let upstream = upstream let upstream = upstream
.join(self.narinfo_path().as_ref()) .join(format!("{}.narinfo", self.digest()).as_str())
.expect("adding <hash>.narinfo should make a valid url"); .expect("adding <hash>.narinfo should make a valid url");
trace!("querying {}", upstream); debug!("querying {}", upstream);
let res_status = reqwest::Client::new() let res_status = reqwest::Client::new()
.head(upstream.as_str()) .head(upstream.as_str())
.send() .send()
@ -128,12 +108,17 @@ impl PathInfo {
self.path.to_absolute_path() self.path.to_absolute_path()
} }
pub fn narinfo_path(&self) -> ObjectPath { pub fn digest(&self) -> String {
ObjectPath::parse(format!("{}.narinfo", nixbase32::encode(self.path.digest()))) nixbase32::encode(self.path.digest())
.expect("must parse to a valid object_store path")
} }
pub async fn check_if_already_exists(&self, s3: &AmazonS3) -> bool { pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
s3.head(&self.narinfo_path()).await.is_ok() s3_client
.head_object()
.bucket(bucket)
.key(format!("{}.narinfo", self.digest()))
.send()
.await
.is_ok()
} }
} }

View file

@ -1,6 +1,6 @@
use std::{ use std::{
collections::HashSet,
fs, fs,
iter::once,
path::PathBuf, path::PathBuf,
sync::{ sync::{
Arc, Arc,
@ -9,11 +9,11 @@ use std::{
}; };
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use aws_config::Region;
use aws_sdk_s3 as s3;
use futures::future::join_all; use futures::future::join_all;
use humansize::{DECIMAL, format_size};
use nix_compat::narinfo::{self, SigningKey}; use nix_compat::narinfo::{self, SigningKey};
use object_store::aws::{AmazonS3, AmazonS3Builder}; use tokio::sync::{RwLock, mpsc};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::debug; use tracing::debug;
use url::Url; use url::Url;
@ -21,10 +21,11 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
pub struct Push { pub struct Push {
upstream_caches: Vec<Url>, upstream_caches: Vec<Url>,
store_paths: Arc<RwLock<HashSet<PathInfo>>>, store_paths: Arc<RwLock<Vec<PathInfo>>>,
s3_client: s3::Client,
signing_key: SigningKey<ed25519_dalek::SigningKey>, signing_key: SigningKey<ed25519_dalek::SigningKey>,
bucket: String,
store: Arc<Store>, store: Arc<Store>,
s3: Arc<AmazonS3>,
// paths that we skipped cause of a signature match // paths that we skipped cause of a signature match
signature_hit_count: AtomicUsize, signature_hit_count: AtomicUsize,
// paths that we skipped cause we found it on an upstream // paths that we skipped cause we found it on an upstream
@ -38,13 +39,11 @@ pub struct Push {
impl Push { impl Push {
pub async fn new(cli: &PushArgs, store: Store) -> Result<Self> { pub async fn new(cli: &PushArgs, store: Store) -> Result<Self> {
let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1); let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
if !cli.no_default_upstream { for upstream in cli
upstreams.push( .upstreams
Url::parse("https://cache.nixos.org") .iter()
.expect("default upstream must be a valid url"), .chain(once(&"https://cache.nixos.org".to_string()))
); {
}
for upstream in &cli.upstreams {
upstreams upstreams
.push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?); .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
} }
@ -52,21 +51,25 @@ impl Push {
let key = fs::read_to_string(&cli.signing_key)?; let key = fs::read_to_string(&cli.signing_key)?;
let signing_key = narinfo::parse_keypair(key.as_str())?.0; let signing_key = narinfo::parse_keypair(key.as_str())?.0;
let mut s3_builder = AmazonS3Builder::from_env().with_bucket_name(&cli.bucket); let mut s3_config = aws_config::from_env();
if let Some(region) = &cli.region { if let Some(region) = &cli.region {
s3_builder = s3_builder.with_region(region); s3_config = s3_config.region(Region::new(region.clone()));
} }
if let Some(endpoint) = &cli.endpoint { if let Some(endpoint) = &cli.endpoint {
s3_builder = s3_builder.with_endpoint(endpoint); s3_config = s3_config.endpoint_url(endpoint);
}
if let Some(profile) = &cli.profile {
s3_config = s3_config.profile_name(profile);
} }
let s3_client = s3::Client::new(&s3_config.load().await);
Ok(Self { Ok(Self {
upstream_caches: upstreams, upstream_caches: upstreams,
store_paths: Arc::new(RwLock::new(HashSet::new())), store_paths: Arc::new(RwLock::new(Vec::new())),
s3_client,
signing_key, signing_key,
bucket: cli.bucket.clone(),
store: Arc::new(store), store: Arc::new(store),
s3: Arc::new(s3_builder.build()?),
signature_hit_count: AtomicUsize::new(0), signature_hit_count: AtomicUsize::new(0),
upstream_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0),
already_exists_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0),
@ -81,7 +84,7 @@ impl Push {
let store = self.store.clone(); let store = self.store.clone();
futs.push(tokio::spawn(async move { futs.push(tokio::spawn(async move {
let path_info = PathInfo::from_derivation(path.as_path(), &store) let path_info = PathInfo::from_path(path.as_path(), &store)
.await .await
.context("get path info for path")?; .context("get path info for path")?;
debug!("path-info for {path:?}: {path_info:?}"); debug!("path-info for {path:?}: {path_info:?}");
@ -106,10 +109,9 @@ impl Push {
} }
pub async fn run(&'static self) -> Result<()> { pub async fn run(&'static self) -> Result<()> {
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(10);
let filter = tokio::spawn(self.filter_from_upstream(tx)); let filter = tokio::spawn(self.filter_from_upstream(tx));
let upload = tokio::spawn(self.upload(rx)); let upload = tokio::spawn(self.upload(rx));
filter.await?; filter.await?;
upload.await??; upload.await??;
Ok(()) Ok(())
@ -117,24 +119,26 @@ impl Push {
/// filter paths that are on upstream and send to `tx` /// filter paths that are on upstream and send to `tx`
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) { async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
let mut handles = Vec::new(); let mut handles = Vec::with_capacity(10);
let store_paths = self.store_paths.read().await.clone(); let store_paths = self.store_paths.read().await.clone();
// limit number of inflight requests
let inflight_permits = Arc::new(Semaphore::new(32));
for path in store_paths.into_iter() { for path in store_paths.into_iter() {
if path.check_upstream_signature(&self.upstream_caches) { if path.check_upstream_signature(&self.upstream_caches) {
debug!("skip {} (signature match)", path.absolute_path()); debug!("skip {} (signature match)", path.absolute_path());
self.signature_hit_count.fetch_add(1, Ordering::Relaxed); self.signature_hit_count.fetch_add(1, Ordering::Release);
continue; continue;
} }
handles.push({ handles.push({
let tx = tx.clone(); let tx = tx.clone();
let inflight_permits = inflight_permits.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _permit = inflight_permits.acquire().await.unwrap(); if !path
if !path.check_upstream_hit(&self.upstream_caches).await { .check_upstream_hit(self.upstream_caches.as_slice())
if path.check_if_already_exists(&self.s3).await { .await
{
if path
.check_if_already_exists(&self.s3_client, self.bucket.clone())
.await
{
debug!("skip {} (already exists)", path.absolute_path()); debug!("skip {} (already exists)", path.absolute_path());
self.already_exists_count.fetch_add(1, Ordering::Relaxed); self.already_exists_count.fetch_add(1, Ordering::Relaxed);
} else { } else {
@ -157,31 +161,21 @@ impl Push {
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> { async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
let mut uploads = Vec::new(); let mut uploads = Vec::new();
let permits = Arc::new(Semaphore::new(10));
loop { loop {
let permits = permits.clone();
if let Some(path_to_upload) = rx.recv().await { if let Some(path_to_upload) = rx.recv().await {
println!("uploading: {}", path_to_upload.absolute_path());
let uploader = Uploader::new(
&self.signing_key,
path_to_upload,
&self.s3_client,
self.bucket.clone(),
)?;
uploads.push(tokio::spawn({ uploads.push(tokio::spawn({
// large uploads will be concurrently uploaded with multipart anyway so don't spawn
// too much of them
let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
Some(permits.acquire_owned().await.unwrap())
} else {
None
};
println!(
"uploading: {} (size: {})",
path_to_upload.absolute_path(),
format_size(path_to_upload.nar_size, DECIMAL)
);
let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
let s3 = self.s3.clone();
let store = self.store.clone();
async move { async move {
let res = uploader.upload(s3, store).await; let res = uploader.upload().await;
drop(permit);
self.upload_count.fetch_add(1, Ordering::Relaxed); self.upload_count.fetch_add(1, Ordering::Relaxed);
res res
} }

View file

@ -2,8 +2,7 @@ use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc};
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use nix_compat::store_path::StorePath; use nix_compat::store_path::StorePath;
use tokio::{io::AsyncRead, task}; use tokio::task;
use tokio_util::io::StreamReader;
use crate::{ use crate::{
bindings::{self, AsyncWriteAdapter}, bindings::{self, AsyncWriteAdapter},
@ -32,13 +31,13 @@ impl Store {
inner inner
.store() .store()
.compute_fs_closure(path.to_string().as_bytes(), false, true, true)?; .compute_fs_closure(path.to_string().as_bytes(), false, true, true)?;
cxx_vector Ok(cxx_vector
.iter() .iter()
.map(|x| { .map(|x| {
StorePath::from_bytes(x.as_bytes()) StorePath::from_bytes(x.as_bytes())
.context("make StorePath from vector returned by compute_fs_closure") .context("make StorePath from vector returned by compute_fs_closure")
}) })
.collect::<Result<_, _>>() .collect::<Result<_, _>>()?)
}) })
.await .await
.unwrap() .unwrap()
@ -69,32 +68,29 @@ impl Store {
.map(|x| StorePath::from_bytes(x.as_bytes())) .map(|x| StorePath::from_bytes(x.as_bytes()))
.collect::<Result<_, _>>() .collect::<Result<_, _>>()
.context("get references from pathinfo")?; .context("get references from pathinfo")?;
let nar_size = c_path_info.pin_mut().nar_size();
Ok(PathInfo { Ok(PathInfo {
path, path,
signatures, signatures,
references, references,
nar_size,
}) })
}) })
.await .await
.unwrap() .unwrap()
} }
pub fn nar_from_path(&self, store_path: StorePath<String>) -> impl AsyncRead { pub fn stream_nar(&self, path: StorePath<String>) -> AsyncWriteAdapter {
let inner = self.inner.clone(); let inner = self.inner.clone();
let (adapter, mut sender) = AsyncWriteAdapter::new(); let (adapter, mut sender) = AsyncWriteAdapter::new();
let base_name = store_path.to_string().as_bytes().to_vec();
tokio::task::spawn_blocking(move || { task::spawn_blocking(move || {
// Send all exceptions through the channel, and ignore errors if let Err(e) = inner
// during sending (the channel may have been closed). .store()
if let Err(e) = inner.store().nar_from_path(base_name, sender.clone()) { .nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone())
{
let _ = sender.rust_error(e); let _ = sender.rust_error(e);
} }
}); });
adapter
StreamReader::new(adapter)
} }
} }

View file

@ -1,80 +1,260 @@
use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf};
use anyhow::Result; use anyhow::Result;
use bytes::BytesMut; use async_compression::{Level, tokio::bufread::ZstdEncoder};
use nix_compat::{narinfo::SigningKey, nixbase32}; use aws_sdk_s3::{
use object_store::{ObjectStore, aws::AmazonS3, buffered::BufWriter, path::Path}; self as s3,
use std::sync::Arc; types::{CompletedMultipartUpload, CompletedPart},
use tokio::io::{AsyncReadExt, AsyncWriteExt}; };
use tracing::{debug, trace}; use bytes::{BufMut, Bytes, BytesMut};
use ulid::Ulid; use futures::{future::join_all, stream::TryStreamExt};
use nix_compat::{
nar::writer::r#async as nar,
narinfo::{self, NarInfo, SigningKey},
nixbase32,
store_path::StorePath,
};
use sha2::{Digest, Sha256};
use tokio::{
fs::{File, read_dir, read_link},
io::{AsyncRead, BufReader},
pin,
};
use tokio_stream::wrappers::ReadDirStream;
use tokio_util::io::{InspectReader, read_buf};
use tracing::debug;
use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store}; use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};
const CHUNK_SIZE: usize = 1024 * 1024 * 5; const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;
pub struct Uploader<'a> { pub struct Uploader<'a> {
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>, signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
path: PathInfo, path: PathInfo,
s3_client: &'a s3::Client,
bucket: String,
store: &'a Store,
} }
impl<'a> Uploader<'a> { impl<'a> Uploader<'a> {
pub fn new( pub fn new(
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>, signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
path: PathInfo, path: PathInfo,
s3_client: &'a s3::Client,
bucket: String,
store: &'a Store,
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { signing_key, path }) Ok(Self {
signing_key,
path,
s3_client,
bucket,
store,
})
} }
pub async fn upload(&self, s3: Arc<AmazonS3>, store: Arc<Store>) -> Result<()> { pub async fn upload(&self) -> Result<()> {
let mut nar = MakeNar::new(&self.path, store)?; let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?;
self.make_nar(&mut nar_temp).await;
// we don't know what the hash of the compressed file will be so upload to a // this goes to the .narinfo file
// temp location for now let mut nar_hasher = Sha256::new();
let temp_path = Path::parse(Ulid::new().to_string())?; // this is the URL for file .narinfo points to
let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone()); let mut file_hasher = Sha256::new();
debug!("uploading to temp path: {}", temp_path); let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher);
// compress and upload nar let buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
let mut file_reader = nar.compress_and_hash()?; let
loop {
let mut buf = BytesMut::with_capacity(CHUNK_SIZE); if first_chunk.len() < MULTIPART_CUTOFF {
let n = file_reader.read_buf(&mut buf).await?; let put_object = self
s3_writer.put(buf.freeze()).await?; .s3_client
if n == 0 { .put_object()
break; .bucket(&self.bucket)
.key(&nar_url)
.body(first_chunk.into())
.send()
.await?;
debug!("put object: {:#?}", put_object);
} else {
let multipart = self
.s3_client
.create_multipart_upload()
.bucket(&self.bucket)
.key(&nar_url)
.send()
.await?;
let upload_id = multipart.upload_id().unwrap();
let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
let chunks = nar.chunks(MULTIPART_CUTOFF);
for (i, chunk) in chunks.enumerate() {
parts.push(tokio::task::spawn(
self.s3_client
.upload_part()
.bucket(&self.bucket)
.key(&nar_url)
.upload_id(upload_id)
.part_number(i as i32 + 1)
.body(chunk.to_vec().into())
.send(),
));
} }
let completed_parts = join_all(parts)
.await
.into_iter()
.flatten()
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.enumerate()
.map(|(i, part)| {
CompletedPart::builder()
.set_e_tag(part.e_tag().map(ToString::to_string))
.set_part_number(Some(i as i32 + 1))
.set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
.build()
})
.collect::<Vec<_>>();
let completed_mp_upload = CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
let complete_mp_upload = self
.s3_client
.complete_multipart_upload()
.bucket(&self.bucket)
.key(&nar_url)
.upload_id(upload_id)
.multipart_upload(completed_mp_upload)
.send()
.await?;
debug!("complete multipart upload: {:#?}", complete_mp_upload);
} }
drop(file_reader);
let mut nar_info = nar.get_narinfo()?; let narinfo_url = format!("{}.narinfo", self.path.digest());
debug!("uploading narinfo with key {narinfo_url}");
self.s3_client
.put_object()
.bucket(&self.bucket)
.key(narinfo_url)
.body(nar_info.to_string().as_bytes().to_vec().into())
.send()
.await?;
debug!("done uploading narinfo");
Ok(())
}
fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
let mut hasher = Sha256::new();
hasher.update(nar);
let nar_hash: [u8; 32] = hasher.finalize().into();
let mut nar_info = NarInfo {
flags: narinfo::Flags::empty(),
store_path: self.path.path.as_ref(),
nar_hash,
nar_size: nar.len() as u64,
references: self.path.references.iter().map(StorePath::as_ref).collect(),
signatures: Vec::new(),
ca: None,
system: None,
deriver: None,
compression: Some("zstd"),
file_hash: None,
file_size: None,
url: "",
};
// signature consists of: store_path, nar_hash, nar_size, and references
nar_info.add_signature(self.signing_key); nar_info.add_signature(self.signing_key);
Ok(nar_info)
}
// now that we can calculate the file_hash move the nar to where it should be async fn make_nar(&self, sink: &mut File) -> Result<()> {
let real_path = nar_url( let nar = nar::open(sink).await?;
&nar_info let path = self.path.absolute_path();
.file_hash let metadata = File::open(&path).await?.metadata().await?;
.expect("file hash must be known at this point"),
);
debug!("moving {} to {}", temp_path, real_path);
// the temp object must be done uploading
s3_writer.shutdown().await?;
// this is implemented as a copy-and-delete
s3.rename(&temp_path, &real_path).await?;
// set nar url in narinfo
nar_info.url = real_path.as_ref();
// upload narinfo if metadata.is_symlink() {
let narinfo_path = self.path.narinfo_path(); let target = read_link(&path).await?;
debug!("uploading narinfo: {}", narinfo_path); nar.symlink(target.as_os_str().as_encoded_bytes()).await;
trace!("narinfo: {:#}", nar_info); } else if metadata.is_dir() {
s3.put(&narinfo_path, nar_info.to_string().into()).await?; let mut nar = nar.directory().await?;
nar_from_dir(path.into(), &mut nar).await;
nar.close().await;
} else if metadata.is_file() {
let perms = metadata.permissions().mode();
let mut executable = false;
if (perms & 0o700) == 0o700 {
executable = true;
}
let mut file = BufReader::new(File::open(&path).await?);
nar.file(executable, metadata.len(), &mut file).await;
}
Ok(()) Ok(())
} }
} }
/// calculate url where the compressed nar should be uploaded async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> {
fn nar_url(file_hash: &[u8]) -> Path { let root = ReadDirStream::new(read_dir(&path).await?);
let compressed_nar_hash = nixbase32::encode(file_hash); let entries = root
Path::parse(format!("nar/{compressed_nar_hash}.nar.zst")) .map_ok(|x| (x.file_name(), x))
.expect("should parse to a valid object_store::path::Path") .try_collect::<BTreeMap<_, _>>()
.await?;
// directory entries must be written in ascending order of name
for (name, entry) in entries.iter() {
let node = node.entry(name.as_encoded_bytes()).await?;
let metadata = entry.metadata().await?;
if metadata.is_symlink() {
let target = read_link(entry.path()).await?;
node.symlink(target.as_os_str().as_encoded_bytes()).await;
} else if metadata.is_dir() {
let mut node = node.directory().await?;
Box::pin(nar_from_dir(entry.path(), &mut node)).await;
node.close().await;
} else if metadata.is_file() {
let perms = metadata.permissions().mode();
let mut executable = false;
if (perms & 0o700) == 0o700 {
executable = true;
}
let mut file = BufReader::new(File::open(entry.path()).await?);
node.file(executable, metadata.len(), &mut file).await;
}
}
Ok(())
}
fn compress_and_hash_nar(
nar_file: File,
nar_hasher: &mut Sha256,
compressed_nar_hasher: &mut Sha256,
) -> impl AsyncRead {
let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x));
let nar_buf_reader = BufReader::new(nar_reader);
let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default);
InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x))
}
fn nar_url(compressed_nar_hash: &[u8]) -> String {
format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
}
async fn read_buf_nar<S: AsyncRead + Unpin>(stream: &mut S, mut buf: BytesMut) -> Result<Bytes> {
while buf.len() < buf.capacity() {
let n = read_buf(stream, &mut buf).await?;
if n == 0 {
break;
}
}
Ok(buf.freeze())
} }

View file

@ -1,38 +0,0 @@
#![allow(dead_code)]
use std::process::Command;
use std::sync::Arc;
use nixcp::store::Store;
pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
pub const HELLO_DRV: &str = "/nix/store/iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
pub const NIXCP_PKG: &str = "github:cything/nixcp?ref=6cfe67af0e8da502702b31f34a941753e64d9561";
pub const NIXCP_DRV: &str = "/nix/store/ldjvf9qjp980dyvka2hj99q4c0w6901x-nixcp-0.1.0.drv";
pub struct Context {
pub store: Arc<Store>,
}
impl Context {
fn new() -> Self {
// hello must be in the store
ensure_exists(HELLO);
let store = Arc::new(Store::connect().expect("connect to nix store"));
Self { store }
}
}
pub fn context() -> Context {
Context::new()
}
pub fn ensure_exists(pkg: &str) {
Command::new("nix")
.arg("build")
.arg("--no-link")
.arg(pkg)
.status()
.unwrap();
}

View file

@ -1,26 +0,0 @@
use crate::common::HELLO_PATH;
use nix_compat::nixbase32;
use nixcp::make_nar::MakeNar;
use nixcp::path_info::PathInfo;
use sha2::Digest;
use tokio::io::AsyncReadExt;
mod common;
#[tokio::test]
async fn nar_size_and_hash() {
let ctx = common::context();
let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();
let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
let mut reader = nar.compress_and_hash().unwrap();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await.unwrap();
drop(reader);
assert_eq!(nar.nar_size, 234680);
let nar_hash = nar.nar_hasher.finalize();
let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
}

View file

@ -1,97 +0,0 @@
use nixcp::path_info::PathInfo;
use std::{collections::HashSet, path::PathBuf, process::Command};
use tempfile::TempDir;
use crate::common::{HELLO, HELLO_DRV, HELLO_PATH, NIXCP_DRV, NIXCP_PKG};
mod common;
#[tokio::test]
async fn path_info_from_package() {
let ctx = common::context();
let path = PathBuf::from(HELLO);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_absolute_path(), HELLO_DRV);
}
#[tokio::test]
async fn path_info_from_path() {
let ctx = common::context();
let path = PathBuf::from(HELLO_PATH);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_absolute_path(), HELLO_DRV);
}
#[tokio::test]
async fn path_info_symlink() {
let ctx = common::context();
let temp_path = TempDir::new().unwrap();
let link_path = temp_path.path().join("result");
// symlink at ./result (like `nix build`)
std::os::unix::fs::symlink(HELLO_PATH, &link_path).unwrap();
// should resolve symlink
let path_info = PathInfo::from_derivation(&link_path, &ctx.store)
.await
.expect("get pathinfo from package");
assert_eq!(path_info.path.to_absolute_path(), HELLO_DRV);
}
#[tokio::test]
async fn closure_includes_nix_store_requisites() {
let ctx = common::context();
let path = PathBuf::from(HELLO);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
// get what we think is the closure
let mut closure: HashSet<String> = path_info
.get_closure(&ctx.store)
.await
.unwrap()
.iter()
.map(|x| x.path.to_absolute_path())
.collect();
// for a somewhat more complicated case
common::ensure_exists(NIXCP_PKG);
let path = PathBuf::from(NIXCP_PKG);
let path_info = PathInfo::from_derivation(&path, &ctx.store)
.await
.expect("get pathinfo from package");
closure.extend(
path_info
.get_closure(&ctx.store)
.await
.unwrap()
.iter()
.map(|x| x.path.to_absolute_path()),
);
// get output of `nix-store --query --requisites --include-outputs`
let nix_store_out = Command::new("nix-store")
.arg("--query")
.arg("--requisites")
.arg("--include-outputs")
.arg(HELLO_DRV)
.arg(NIXCP_DRV)
.output()
.unwrap()
.stdout;
assert!(!nix_store_out.is_empty());
let ref_closure = String::from_utf8_lossy(&nix_store_out);
let ref_closure = ref_closure.split_whitespace();
// check that we didn't miss anything nix-store would catch
for path in ref_closure {
assert!(closure.contains(path));
}
}