Compare commits
main ... 2025-04-15 (3 commits)

Author | SHA1 | Date
---|---|---
 | ac4b2ba136 |
 | 4808671071 |
 | a17fa92c78 |

25 changed files with 1529 additions and 1160 deletions
@@ -1,2 +0,0 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

@@ -1,3 +0,0 @@
[*.nix]
indent_size = 2
indent_stype = space
.github/workflows/build.yml (vendored, 73 changed lines)
@@ -1,73 +0,0 @@
name: build
on:
  workflow_dispatch:
  push:
  pull_request:

env:
  TERM: ansi
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets. AWS_SECRET_ACCESS_KEY }}
  AWS_ENDPOINT: https://s3.cy7.sh

jobs:
  build-packages:
    strategy:
      fail-fast: false
      matrix:
        os:
          - ubuntu-latest
          - ubuntu-24.04-arm
          - macos-latest # arm64
          - macos-13 # x86

    runs-on: ${{ matrix.os }}

    steps:
      - name: setup binary cache key
        run: echo -n "${{ secrets.NIX_CACHE_SECRET_KEY }}" | xxd -p -r > ${{ runner.temp }}/cache-priv-key.pem

      - name: Install Nix
        uses: cachix/install-nix-action@526118121621777ccd86f79b04685a9319637641
        with:
          enable_kvm: true
          extra_nix_config: |
            show-trace = true
            experimental-features = nix-command flakes
            secret-key-files = ${{ runner.temp }}/cache-priv-key.pem
            extra-substituters = https://nixcache.cy7.sh
            extra-trusted-public-keys = nixcache.cy7.sh:DN3d1dt0wnXfTH03oVmTee4KgmdNdB0NY3SuzA8Fwx8=

      - uses: actions/checkout@85e6279cec87321a52edac9c87bce653a07cf6c2
        with:
          persist-credentials: false

      - name: cache devshell
        run: |
          nix build .#devShells.$(nix eval --impure --raw --expr 'builtins.currentSystem').default
          nix run \
            github:cything/nixcp -- push \
            --bucket nixcache \
            --signing-key ${{ runner.temp }}/cache-priv-key.pem \
            result

      - name: build
        run: nix build -L .

      - name: cache
        run: |
          nix run \
            github:cything/nixcp -- push \
            --bucket nixcache \
            --signing-key ${{ runner.temp }}/cache-priv-key.pem \
            result

      - name: prepare tarball to upload
        run: nix run github:nixos/nixpkgs#gnutar hcvf result.tar result

      - name: upload result
        uses: actions/upload-artifact@6027e3dd177782cd8ab9af838c04fd81a07f1d47
        with:
          name: ${{ matrix.os }}.tar
          path: result.tar
          if-no-files-found: error
.github/workflows/check.yml (vendored, 27 changed lines)
@@ -1,27 +0,0 @@
name: check
on:
  workflow_dispatch:
  push:
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Install Nix
        uses: cachix/install-nix-action@526118121621777ccd86f79b04685a9319637641
        with:
          enable_kvm: true
          extra_nix_config: |
            show-trace = true
            experimental-features = nix-command flakes
            extra-substituters = https://nixcache.cy7.sh
            extra-trusted-public-keys = nixcache.cy7.sh:DN3d1dt0wnXfTH03oVmTee4KgmdNdB0NY3SuzA8Fwx8=

      - uses: actions/checkout@85e6279cec87321a52edac9c87bce653a07cf6c2
        with:
          persist-credentials: false

      - name: Run checks
        run: nix flake check -L
.github/workflows/test.yml (vendored, 30 changed lines)
@@ -1,30 +0,0 @@
name: test
on:
  workflow_dispatch:
  push:
  pull_request:

env:
  CARGO_TERM_COLOR: always

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Install Nix
        uses: cachix/install-nix-action@526118121621777ccd86f79b04685a9319637641
        with:
          enable_kvm: true
          extra_nix_config: |
            show-trace = true
            experimental-features = nix-command flakes
            extra-substituters = https://nixcache.cy7.sh
            extra-trusted-public-keys = nixcache.cy7.sh:DN3d1dt0wnXfTH03oVmTee4KgmdNdB0NY3SuzA8Fwx8=

      - uses: actions/checkout@85e6279cec87321a52edac9c87bce653a07cf6c2
        with:
          persist-credentials: false

      - name: Run tests
        run: nix develop -c cargo test --verbose
Cargo.lock (generated, 1499 changed lines)
File diff suppressed because it is too large.
Cargo.toml (23 changed lines)
@@ -3,37 +3,30 @@ name = "nixcp"
version = "0.1.0"
edition = "2024"

[profile.release]
lto = true
codegen-units = 1

[dependencies]
anyhow = "1.0.97"
async-compression = { version = "0.4.22", features = ["tokio", "zstd"] }
aws-config = { version = "1.6.1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.82.0"
clap = { version = "4.5.34", features = ["derive"] }
ed25519-dalek = "2.1.1"
futures = "0.3.31"
nix-compat = { git = "https://github.com/tvlfyi/tvix.git", version = "0.1.0" }
regex = "1.11.1"
reqwest = "0.12.15"
serde = { version = "1.0.219", features = ["derive"] }
serde = { version = "1.0.219", features = [ "derive" ]}
serde_json = "1.0.140"
sha2 = "0.10.8"
tokio = { version = "1.44.1", features = ["full", "tracing", "parking_lot"] }
tokio = { version = "1.44.1", features = [ "full", "tracing", "parking_lot" ]}
tracing = "0.1.41"
url = { version = "2.5.4", features = ["serde"] }
url = { version = "2.5.4", features = [ "serde" ]}
cxx = "1.0"
console-subscriber = "0.4.1"
tokio-util = { version = "0.7.15", features = ["io"] }
bytes = "1.10.1"
object_store = { version = "0.12.0", features = ["aws"] }
ulid = "1.2.1"
tracing-subscriber = "0.3.19"
humansize = "2.1.3"
tokio-stream = { version = "0.1.17", features = ["fs"] }
tempfile = "3.19.1"
tokio-util = { version = "0.7.14", features = ["io"] }

[build-dependencies]
cxx-build = "1.0"
pkg-config = "0.3.32"

[dev-dependencies]
tempfile = "3.19.1"
LICENSE (21 changed lines)
@@ -1,21 +0,0 @@
MIT License

Copyright (c) 2025 Cy

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
README.md (19 changed lines)
@@ -11,13 +11,14 @@ The signing key is generated with:
nix-store --generate-binary-cache-key nixcache.cy7.sh cache-priv-key.pem cache-pub-key.pem
```

`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables should be set with your s3 credentials.
`AWS_ACCESS_KEY_ID` and `AWS_ENDPOINT_URL` environment variables should be set with your s3 credentials.

```
Usage: nixcp push [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> [PATH]...
Usage: nixcp [OPTIONS] --bucket <bucket name> --signing-key <SIGNING_KEY> <COMMAND>

Arguments:
  [PATH]...  Path to upload e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
Commands:
  push
  help  Print this message or the help of the given subcommand(s)

Options:
      --bucket <bucket name>

@@ -27,13 +28,15 @@ Options:
      --signing-key <SIGNING_KEY>
          Path to the file containing signing key e.g. ~/cache-priv-key.pem
      --region <REGION>
          If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
          If unspecified, will get it form AWS_DEFAULT_REGION envar or the AWS default
      --endpoint <ENDPOINT>
          If unspecifed, will get it from AWS_ENDPOINT envar e.g. https://s3.example.com
      --no-default-upstream
          Do not include cache.nixos.org as upstream
          If unspecifed, will get it from AWS_ENDPOINT_URL envar or the AWS default e.g. https://s3.example.com
      --profile <PROFILE>
          AWS profile to use
  -h, --help
          Print help
  -V, --version
          Print version
```

## Install with nix
flake.nix (90 changed lines)
@@ -11,15 +11,8 @@
    };
  };

  outputs =
    inputs@{
      nixpkgs,
      flake-utils,
      crane,
      ...
    }:
    flake-utils.lib.eachDefaultSystem (
      system:
  outputs = inputs@{ nixpkgs, flake-utils, crane, ... }:
    flake-utils.lib.eachDefaultSystem (system:
      let
        pkgs = import nixpkgs {
          inherit system;

@@ -28,21 +21,11 @@
          ];
        };
        toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
        craneLib = (crane.mkLib pkgs).overrideToolchain (_: toolchain);
        craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain);
        lib = pkgs.lib;

        # don't clean cpp files
        cppFilter = path: _type: builtins.match ".*(cpp|hpp)$" path != null;
        cppOrCargo = path: type: (cppFilter path type) || (craneLib.filterCargoSources path type);
        src = lib.cleanSourceWith {
          src = ./.;
          filter = cppOrCargo;
          name = "source";
        };

        commonArgs = {
          inherit src;
          strictDeps = true;
      in
      {
        devShells.default = pkgs.mkShell {
          nativeBuildInputs = with pkgs; [
            pkg-config
          ];

@@ -51,58 +34,23 @@
            openssl
            nix
            boost
          ];
          # for cpp bindings to work
          NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
          # skip integration tests (they need a connection to the nix store)
          cargoTestExtraArgs = "--bins";
        };

        cargoArtifacts = craneLib.buildDepsOnly commonArgs;
        nixcp = craneLib.buildPackage (
          commonArgs
          // {
            inherit cargoArtifacts;
          }
        );
      in
      {
        checks = {
          # clippy with all warnings denied
          clippy = craneLib.cargoClippy (
            commonArgs
            // {
              inherit cargoArtifacts;
              cargoClippyExtraArgs = "--all-targets -- --deny warnings";
            }
          );

          # check formatting
          cargoFmt = craneLib.cargoFmt {
            inherit src;
          };
          tomlFmt = craneLib.taploFmt {
            src = lib.sources.sourceFilesBySuffices src [ ".toml" ];
          };
        };

        devShells.default = craneLib.devShell {
          inputsFrom = [ nixcp ];

          RUST_BACKGRACE = 1;
          # for cpp bindings to work
          NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";

          packages = with pkgs; [
            tokio-console
            cargo-udeps
            cargo-audit
          ];
          NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include";
          RUST_LOG = "nixcp=debug";
          RUST_BACKGRACE = 1;
        };

        formatter = pkgs.nixfmt-rfc-style;

        packages.default = nixcp;
        packages.default = craneLib.buildPackage {
          src = craneLib.cleanCargoSource ./.;
          strictDeps = true;
          nativeBuildInputs = with pkgs; [
            pkg-config
          ];
          buildInputs = with pkgs; [
            openssl
          ];
        };
      }
    );
}
@@ -1,4 +1,9 @@
[toolchain]
channel = "nightly"
profile = "minimal"
components = ["rust-src", "rust-analyzer", "rustfmt", "clippy"]
components = [
  "rust-src",
  "rust-analyzer",
  "rustfmt",
  "clippy",
]
@@ -23,7 +23,6 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Result;
use bytes::Bytes;
use futures::stream::{Stream, StreamExt};
use tokio::io::{AsyncWrite, AsyncWriteExt};

@@ -126,7 +125,7 @@ impl AsyncWriteAdapter {
                    writer.write_all(&v).await?;
                }
                Err(e) => {
                    return Err(e.into());
                    return Err(e);
                }
            }
        }

@@ -140,7 +139,7 @@ impl AsyncWriteAdapter {
}

impl Stream for AsyncWriteAdapter {
    type Item = std::io::Result<Bytes>;
    type Item = Result<Vec<u8>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.receiver.poll_recv(cx) {

@@ -148,9 +147,9 @@ impl Stream for AsyncWriteAdapter {
            Poll::Ready(Some(message)) => {
                use AsyncWriteMessage::*;
                match message {
                    Data(v) => Poll::Ready(Some(Ok(v.into()))),
                    Data(v) => Poll::Ready(Some(Ok(v))),
                    Error(exception) => {
                        let error = std::io::Error::other(format!("cxx error: {exception}"));
                        let error = anyhow::Error::msg(format!("cxx error: {exception}"));
                        Poll::Ready(Some(Err(error)))
                    }
                    Eof => {

@@ -161,7 +160,7 @@ impl Stream for AsyncWriteAdapter {
            }
            Poll::Ready(None) => {
                if !self.eof {
                    Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe))))
                    Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe).into())))
                } else {
                    Poll::Ready(None)
                }

@@ -209,6 +208,9 @@ mod ffi {
        include_derivers: bool,
    ) -> Result<UniquePtr<CxxVector<CxxString>>>;

    /// Obtains a handle to the Nix store.
    fn open_nix_store() -> Result<UniquePtr<CNixStore>>;

    /// Creates a NAR dump from a path.
    fn nar_from_path(
        self: Pin<&mut CNixStore>,

@@ -216,17 +218,12 @@
        sender: Box<AsyncWriteSender>,
    ) -> Result<()>;

    /// Obtains a handle to the Nix store.
    fn open_nix_store() -> Result<UniquePtr<CNixStore>>;

    // =========
    // CPathInfo
    // =========

    /// Mid-level wrapper for the `nix::ValidPathInfo` struct.
    type CPathInfo;
    /// Returns the size of the NAR.
    fn nar_size(self: Pin<&mut CPathInfo>) -> u64;

    /// Returns the references of the store path.
    fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
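The `Item` type of the `AsyncWriteAdapter` stream above decides how the NAR bytes produced on the C++ side can later be consumed as an `AsyncRead`. As a minimal, self-contained sketch (not the project's own code): `tokio_util::io::StreamReader` accepts any stream of `io::Result<Bytes>`, which is why that `Item` shape pairs naturally with `StreamReader::new` elsewhere in the diff.

```rust
use bytes::Bytes;
use futures::stream;
use tokio::io::AsyncReadExt;
use tokio_util::io::StreamReader;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Any Stream<Item = io::Result<Bytes>> can be viewed as an AsyncRead.
    // A channel-backed adapter like the one above fits this shape when its
    // Item is io::Result<Bytes>; with Item = anyhow::Result<Vec<u8>> the
    // caller must convert items before wrapping.
    let chunks = stream::iter(vec![
        Ok::<Bytes, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"nar")),
    ]);
    let mut reader = StreamReader::new(chunks);
    let mut out = String::new();
    reader.read_to_string(&mut out).await?;
    assert_eq!(out, "hello nar");
    Ok(())
}
```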
@@ -57,10 +57,6 @@ void RustSink::eof() {

CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}

uint64_t CPathInfo::nar_size() {
    return this->pi->narSize;
}

std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() {
    std::vector<std::string> result;
    for (auto&& elem : this->pi->sigs) {
@@ -65,7 +65,6 @@ public:
    CPathInfo(nix::ref<const nix::ValidPathInfo> pi);
    std::unique_ptr<std::vector<std::string>> sigs();
    std::unique_ptr<std::vector<std::string>> references();
    uint64_t nar_size();
};

class CNixStore {
src/cli.rs (new file, 2 changed lines)
@@ -0,0 +1,2 @@


src/lib.rs (65 changed lines)
@@ -1,65 +0,0 @@
use std::path::PathBuf;

use clap::{Args, Parser, Subcommand};

mod bindings;
pub mod make_nar;
pub mod path_info;
pub mod push;
pub mod store;
mod uploader;

#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
pub struct Cli {
    #[command(subcommand)]
    pub command: Commands,

    /// Whether to enable tokio-console
    #[arg(long)]
    pub tokio_console: bool,
}

#[derive(Debug, Subcommand)]
pub enum Commands {
    #[command(arg_required_else_help = true)]
    Push(PushArgs),
}

#[derive(Debug, Args)]
pub struct PushArgs {
    /// The s3 bucket to upload to
    #[arg(long, value_name = "bucket name")]
    bucket: String,

    /// Upstream cache to check against. Can be specified multiple times.
    /// cache.nixos.org is always included.
    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
    upstreams: Vec<String>,

    /// Path to the file containing signing key
    /// e.g. ~/cache-priv-key.pem
    #[arg(long)]
    signing_key: String,

    /// If unspecified, will get it form AWS_DEFAULT_REGION envar or default to us-east-1
    #[arg(long)]
    region: Option<String>,

    /// If unspecifed, will get it from AWS_ENDPOINT envar
    /// e.g. https://s3.example.com
    #[arg(long)]
    endpoint: Option<String>,

    /// Do not include cache.nixos.org as upstream
    #[arg(long)]
    no_default_upstream: bool,

    /// Path to upload
    /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
    #[arg(value_name = "PATH")]
    pub paths: Vec<PathBuf>,
}
src/main.rs (99 changed lines)
@@ -1,15 +1,80 @@
use anyhow::{Context, Result};
use clap::Parser;
use tracing_subscriber::{EnvFilter, prelude::*};
#![feature(let_chains)]
#![feature(extend_one)]

use nixcp::push::Push;
use nixcp::store::Store;
use nixcp::{Cli, Commands};
use std::path::PathBuf;

use anyhow::{Context, Result};
use clap::{Args, Parser, Subcommand};

use push::Push;
use store::Store;

mod bindings;
mod cli;
mod path_info;
mod push;
mod store;
mod uploader;

#[derive(Parser, Debug)]
#[command(version)]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
}

#[derive(Debug, Subcommand)]
enum Commands {
    #[command(arg_required_else_help = true)]
    Push(PushArgs),
}

#[derive(Debug, Args)]
pub struct PushArgs {
    /// The s3 bucket to upload to
    #[arg(long, value_name = "bucket name")]
    bucket: String,

    /// Upstream cache to check against. Can be specified multiple times.
    /// cache.nixos.org is always included.
    #[arg(long = "upstream", short, value_name = "nixcache.example.com")]
    upstreams: Vec<String>,

    /// Path to the file containing signing key
    /// e.g. ~/cache-priv-key.pem
    #[arg(long)]
    signing_key: String,

    /// If unspecified, will get it form AWS_DEFAULT_REGION envar
    #[arg(long)]
    region: Option<String>,

    /// If unspecifed, will get it from AWS_ENDPOINT_URL envar
    /// e.g. https://s3.example.com
    #[arg(long)]
    endpoint: Option<String>,

    /// AWS profile to use
    #[arg(long)]
    profile: Option<String>,

    #[arg(long)]
    skip_signature_check: bool,

    /// Path to upload
    /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
    #[arg(value_name = "PATH")]
    paths: Vec<PathBuf>,
}

#[tokio::main]
async fn main() -> Result<()> {
    console_subscriber::init();

    let cli = Cli::parse();
    init_logging(cli.tokio_console);

    match &cli.command {
        Commands::Push(cli) => {

@@ -24,23 +89,3 @@ async fn main() -> Result<()> {

    Ok(())
}

fn init_logging(tokio_console: bool) {
    let env_filter = EnvFilter::from_default_env();
    let fmt_layer = tracing_subscriber::fmt::layer().with_filter(env_filter);

    let console_layer = if tokio_console {
        Some(console_subscriber::spawn())
    } else {
        None
    };

    tracing_subscriber::registry()
        .with(fmt_layer)
        .with(console_layer)
        .init();

    if tokio_console {
        println!("tokio-console is enabled");
    }
}
@@ -1,81 +0,0 @@
use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use nix_compat::{
    narinfo::{self, NarInfo},
    store_path::StorePath,
};
use sha2::{Digest, Sha256};
use std::{mem::take, sync::Arc};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::InspectReader;

use crate::path_info::PathInfo;
use crate::store::Store;

pub struct MakeNar<'a> {
    path_info: &'a PathInfo,
    store: Arc<Store>,
    pub nar_hasher: Sha256,
    /// hash of compressed nar file
    file_hasher: Sha256,
    pub nar_size: u64,
    file_size: u64,
}

impl<'a> MakeNar<'a> {
    pub fn new(path_info: &'a PathInfo, store: Arc<Store>) -> Result<Self> {
        Ok(Self {
            path_info,
            store,
            nar_hasher: Sha256::new(),
            file_hasher: Sha256::new(),
            nar_size: 0,
            file_size: 0,
        })
    }

    /// Returns a compressed nar reader which can be uploaded. File hash will be available when
    /// everything is read
    pub fn compress_and_hash(&mut self) -> Result<impl AsyncRead> {
        let nar_reader = self.store.nar_from_path(self.path_info.path.clone());
        // reader that hashes as nar is read
        let nar_reader = InspectReader::new(nar_reader, |x| {
            self.nar_size += x.len() as u64;
            self.nar_hasher.update(x);
        });

        let encoder = ZstdEncoder::with_quality(BufReader::new(nar_reader), Level::Default);
        // reader that updates file_hash as the compressed nar is read
        Ok(InspectReader::new(encoder, |x| {
            self.file_size += x.len() as u64;
            self.file_hasher.update(x);
        }))
    }

    /// Returns *unsigned* narinfo. `url` must be updated before uploading
    pub fn get_narinfo(&mut self) -> Result<NarInfo> {
        let file_hash = take(&mut self.file_hasher).finalize().into();
        let nar_hash = take(&mut self.nar_hasher).finalize().into();

        Ok(NarInfo {
            flags: narinfo::Flags::empty(),
            store_path: self.path_info.path.as_ref(),
            nar_hash,
            nar_size: self.nar_size,
            references: self
                .path_info
                .references
                .iter()
                .map(StorePath::as_ref)
                .collect(),
            signatures: Vec::new(),
            ca: None,
            system: None,
            deriver: None,
            compression: Some("zstd"),
            file_hash: Some(file_hash),
            file_size: Some(self.file_size),
            url: "",
        })
    }
}
@@ -1,10 +1,10 @@
use std::collections::HashSet;

use anyhow::{Context, Result, anyhow};
use anyhow::{Context, Result};
use aws_sdk_s3 as s3;
use futures::future::join_all;
use nix_compat::nixbase32;
use nix_compat::store_path::StorePath;
use object_store::{ObjectStore, aws::AmazonS3, path::Path as ObjectPath};
use regex::Regex;
use std::path::Path;
use tokio::process::Command;

@@ -13,61 +13,41 @@ use url::Url;

use crate::store::Store;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Clone)]
pub struct PathInfo {
    pub path: StorePath<String>,
    pub signatures: Vec<String>,
    pub references: Vec<StorePath<String>>,
    pub nar_size: u64,
}

impl PathInfo {
    pub async fn from_derivation(drv: &Path, store: &Store) -> Result<Self> {
        debug!("query path info for {:?}", drv);
    pub async fn from_path(path: &Path, store: &Store) -> Result<Self> {
        debug!("query path info for {:?}", path);

        let derivation = match drv.extension() {
            Some(ext) if ext == "drv" => drv.as_os_str().as_encoded_bytes(),
        let derivation = match path.extension() {
            Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(),
            _ => {
                let drv = {
                    // resolve symlink
                    if drv.is_symlink() {
                        &drv.canonicalize()?
                    } else {
                        drv
                    }
                };
                &Command::new("nix")
                    .arg("path-info")
                    .arg("--derivation")
                    .arg(drv)
                    .arg(path)
                    .output()
                    .await
                    .context(format!("run command: nix path-info --derivaiton {drv:?}"))?
                    .context(format!("run command: nix path-info --derivaiton {path:?}"))?
                    .stdout
            }
        };
        let derivation = String::from_utf8_lossy(derivation);
        debug!("derivation: {derivation}");

        if derivation.is_empty() {
            return Err(anyhow!(
                "nix path-info did not return a derivation for {drv:#?}"
            ));
        }

        Self::from_path(derivation.trim(), store).await
    }

    pub async fn from_path(path: &str, store: &Store) -> Result<Self> {
        let store_path =
            StorePath::from_absolute_path(path.as_bytes()).context("storepath from path")?;
        let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes())
            .context("storepath from derivation")?;
        store
            .query_path_info(store_path)
            .await
            .context("query pathinfo for path")
            .context("query pathinfo for derivation")
    }

    // TODO: skip call to query_path_info and return Vec<Path>?
    pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
        let futs = store
            .compute_fs_closure(self.path.clone())

@@ -102,15 +82,15 @@ impl PathInfo {
            .filter_map(|signature| Some(signature.split_once(":")?.0))
            .collect();
        trace!("signers for {}: {:?}", self.path, signers);
        signers
        return signers;
    }

    pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
        for upstream in upstreams {
            let upstream = upstream
                .join(self.narinfo_path().as_ref())
                .join(format!("{}.narinfo", self.digest()).as_str())
                .expect("adding <hash>.narinfo should make a valid url");
            trace!("querying {}", upstream);
            debug!("querying {}", upstream);
            let res_status = reqwest::Client::new()
                .head(upstream.as_str())
                .send()

@@ -128,12 +108,17 @@ impl PathInfo {
        self.path.to_absolute_path()
    }

    pub fn narinfo_path(&self) -> ObjectPath {
        ObjectPath::parse(format!("{}.narinfo", nixbase32::encode(self.path.digest())))
            .expect("must parse to a valid object_store path")
    pub fn digest(&self) -> String {
        nixbase32::encode(self.path.digest())
    }

    pub async fn check_if_already_exists(&self, s3: &AmazonS3) -> bool {
        s3.head(&self.narinfo_path()).await.is_ok()
    pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
        s3_client
            .head_object()
            .bucket(bucket)
            .key(format!("{}.narinfo", self.digest()))
            .send()
            .await
            .is_ok()
    }
}
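Both versions of `check_upstream_hit` above rely on the binary-cache convention that a store path's metadata lives at `<cache>/<nixbase32 digest>.narinfo`. A minimal, self-contained sketch of that check, using the same `nix_compat`, `url`, and `reqwest` crates from Cargo.toml (illustrative only; the store path and upstream below are just examples):

```rust
use nix_compat::{nixbase32, store_path::StorePath};
use url::Url;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Example store path; any /nix/store path works here.
    let path = "/nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1";
    let store_path =
        StorePath::<String>::from_absolute_path(path.as_bytes()).expect("valid store path");

    // The narinfo key is the nixbase32-encoded digest plus ".narinfo".
    let digest = nixbase32::encode(store_path.digest());
    let upstream = Url::parse("https://cache.nixos.org")?;
    let narinfo_url = upstream.join(&format!("{digest}.narinfo"))?;

    // A HEAD request is enough to learn whether the upstream already has it.
    let hit = reqwest::Client::new()
        .head(narinfo_url.as_str())
        .send()
        .await?
        .status()
        .is_success();
    println!("{narinfo_url} -> hit: {hit}");
    Ok(())
}
```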
src/push.rs (94 changed lines)
@@ -1,6 +1,6 @@
use std::{
    collections::HashSet,
    fs,
    iter::once,
    path::PathBuf,
    sync::{
        Arc,

@@ -9,11 +9,11 @@ use std::{
};

use anyhow::{Context, Result};
use aws_config::Region;
use aws_sdk_s3 as s3;
use futures::future::join_all;
use humansize::{DECIMAL, format_size};
use nix_compat::narinfo::{self, SigningKey};
use object_store::aws::{AmazonS3, AmazonS3Builder};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tokio::sync::{RwLock, mpsc};
use tracing::debug;
use url::Url;

@@ -21,10 +21,11 @@ use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};

pub struct Push {
    upstream_caches: Vec<Url>,
    store_paths: Arc<RwLock<HashSet<PathInfo>>>,
    store_paths: Arc<RwLock<Vec<PathInfo>>>,
    s3_client: s3::Client,
    signing_key: SigningKey<ed25519_dalek::SigningKey>,
    bucket: String,
    store: Arc<Store>,
    s3: Arc<AmazonS3>,
    // paths that we skipped cause of a signature match
    signature_hit_count: AtomicUsize,
    // paths that we skipped cause we found it on an upstream

@@ -38,13 +39,11 @@ pub struct Push {
impl Push {
    pub async fn new(cli: &PushArgs, store: Store) -> Result<Self> {
        let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
        if !cli.no_default_upstream {
            upstreams.push(
                Url::parse("https://cache.nixos.org")
                    .expect("default upstream must be a valid url"),
            );
        }
        for upstream in &cli.upstreams {
        for upstream in cli
            .upstreams
            .iter()
            .chain(once(&"https://cache.nixos.org".to_string()))
        {
            upstreams
                .push(Url::parse(upstream).context(format!("failed to parse {upstream} as url"))?);
        }

@@ -52,21 +51,25 @@ impl Push {
        let key = fs::read_to_string(&cli.signing_key)?;
        let signing_key = narinfo::parse_keypair(key.as_str())?.0;

        let mut s3_builder = AmazonS3Builder::from_env().with_bucket_name(&cli.bucket);

        let mut s3_config = aws_config::from_env();
        if let Some(region) = &cli.region {
            s3_builder = s3_builder.with_region(region);
            s3_config = s3_config.region(Region::new(region.clone()));
        }
        if let Some(endpoint) = &cli.endpoint {
            s3_builder = s3_builder.with_endpoint(endpoint);
            s3_config = s3_config.endpoint_url(endpoint);
        }
        if let Some(profile) = &cli.profile {
            s3_config = s3_config.profile_name(profile);
        }

        let s3_client = s3::Client::new(&s3_config.load().await);
        Ok(Self {
            upstream_caches: upstreams,
            store_paths: Arc::new(RwLock::new(HashSet::new())),
            store_paths: Arc::new(RwLock::new(Vec::new())),
            s3_client,
            signing_key,
            bucket: cli.bucket.clone(),
            store: Arc::new(store),
            s3: Arc::new(s3_builder.build()?),
            signature_hit_count: AtomicUsize::new(0),
            upstream_hit_count: AtomicUsize::new(0),
            already_exists_count: AtomicUsize::new(0),

@@ -81,7 +84,7 @@ impl Push {
            let store = self.store.clone();

            futs.push(tokio::spawn(async move {
                let path_info = PathInfo::from_derivation(path.as_path(), &store)
                let path_info = PathInfo::from_path(path.as_path(), &store)
                    .await
                    .context("get path info for path")?;
                debug!("path-info for {path:?}: {path_info:?}");

@@ -106,10 +109,9 @@ impl Push {
    }

    pub async fn run(&'static self) -> Result<()> {
        let (tx, rx) = mpsc::channel(1);
        let (tx, rx) = mpsc::channel(10);
        let filter = tokio::spawn(self.filter_from_upstream(tx));
        let upload = tokio::spawn(self.upload(rx));

        filter.await?;
        upload.await??;
        Ok(())

@@ -117,24 +119,26 @@ impl Push {

    /// filter paths that are on upstream and send to `tx`
    async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
        let mut handles = Vec::new();
        let mut handles = Vec::with_capacity(10);
        let store_paths = self.store_paths.read().await.clone();
        // limit number of inflight requests
        let inflight_permits = Arc::new(Semaphore::new(32));

        for path in store_paths.into_iter() {
            if path.check_upstream_signature(&self.upstream_caches) {
                debug!("skip {} (signature match)", path.absolute_path());
                self.signature_hit_count.fetch_add(1, Ordering::Relaxed);
                self.signature_hit_count.fetch_add(1, Ordering::Release);
                continue;
            }
            handles.push({
                let tx = tx.clone();
                let inflight_permits = inflight_permits.clone();
                tokio::spawn(async move {
                    let _permit = inflight_permits.acquire().await.unwrap();
                    if !path.check_upstream_hit(&self.upstream_caches).await {
                        if path.check_if_already_exists(&self.s3).await {
                    if !path
                        .check_upstream_hit(self.upstream_caches.as_slice())
                        .await
                    {
                        if path
                            .check_if_already_exists(&self.s3_client, self.bucket.clone())
                            .await
                        {
                            debug!("skip {} (already exists)", path.absolute_path());
                            self.already_exists_count.fetch_add(1, Ordering::Relaxed);
                        } else {

@@ -157,31 +161,21 @@ impl Push {

    async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
        let mut uploads = Vec::new();
        let permits = Arc::new(Semaphore::new(10));

        loop {
            let permits = permits.clone();

            if let Some(path_to_upload) = rx.recv().await {
                println!("uploading: {}", path_to_upload.absolute_path());

                let uploader = Uploader::new(
                    &self.signing_key,
                    path_to_upload,
                    &self.s3_client,
                    self.bucket.clone(),
                )?;

                uploads.push(tokio::spawn({
                    // large uploads will be concurrently uploaded with multipart anyway so don't spawn
                    // too much of them
                    let permit = if path_to_upload.nar_size > 15 * 1024 * 1024 {
                        Some(permits.acquire_owned().await.unwrap())
                    } else {
                        None
                    };
                    println!(
                        "uploading: {} (size: {})",
                        path_to_upload.absolute_path(),
                        format_size(path_to_upload.nar_size, DECIMAL)
                    );
                    let uploader = Uploader::new(&self.signing_key, path_to_upload)?;
                    let s3 = self.s3.clone();
                    let store = self.store.clone();
                    async move {
                        let res = uploader.upload(s3, store).await;
                        drop(permit);
                        let res = uploader.upload().await;
                        self.upload_count.fetch_add(1, Ordering::Relaxed);
                        res
                    }
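One side of the push.rs hunks above bounds concurrency with a `tokio::sync::Semaphore` (32 in-flight upstream checks, 10 concurrent large uploads). A minimal, self-contained sketch of that pattern, with made-up task bodies and limits (illustrative only, not the project's code):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // At most 4 tasks run their critical section at once; the rest wait on
    // the semaphore instead of all hitting the network simultaneously.
    let permits = Arc::new(Semaphore::new(4));
    let mut handles = Vec::new();

    for i in 0..16 {
        let permits = permits.clone();
        handles.push(tokio::spawn(async move {
            // acquire_owned works on an Arc<Semaphore> and ties the permit's
            // lifetime to the guard, so it is released when the task finishes.
            let _permit = permits.acquire_owned().await.unwrap();
            // stand-in for "check upstream" or "upload one path"
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            i
        }));
    }

    for handle in handles {
        handle.await.unwrap();
    }
}
```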
src/store.rs (24 changed lines)
@@ -2,8 +2,7 @@ use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc};

use anyhow::{Context, Result};
use nix_compat::store_path::StorePath;
use tokio::{io::AsyncRead, task};
use tokio_util::io::StreamReader;
use tokio::task;

use crate::{
    bindings::{self, AsyncWriteAdapter},

@@ -32,13 +31,13 @@ impl Store {
            inner
                .store()
                .compute_fs_closure(path.to_string().as_bytes(), false, true, true)?;
            cxx_vector
            Ok(cxx_vector
                .iter()
                .map(|x| {
                    StorePath::from_bytes(x.as_bytes())
                        .context("make StorePath from vector returned by compute_fs_closure")
                })
                .collect::<Result<_, _>>()
                .collect::<Result<_, _>>()?)
        })
        .await
        .unwrap()

@@ -69,32 +68,29 @@ impl Store {
            .map(|x| StorePath::from_bytes(x.as_bytes()))
            .collect::<Result<_, _>>()
            .context("get references from pathinfo")?;
            let nar_size = c_path_info.pin_mut().nar_size();

            Ok(PathInfo {
                path,
                signatures,
                references,
                nar_size,
            })
        })
        .await
        .unwrap()
    }

    pub fn nar_from_path(&self, store_path: StorePath<String>) -> impl AsyncRead {
    pub fn stream_nar(&self, path: StorePath<String>) -> AsyncWriteAdapter {
        let inner = self.inner.clone();
        let (adapter, mut sender) = AsyncWriteAdapter::new();
        let base_name = store_path.to_string().as_bytes().to_vec();

        tokio::task::spawn_blocking(move || {
            // Send all exceptions through the channel, and ignore errors
            // during sending (the channel may have been closed).
            if let Err(e) = inner.store().nar_from_path(base_name, sender.clone()) {
        task::spawn_blocking(move || {
            if let Err(e) = inner
                .store()
                .nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone())
            {
                let _ = sender.rust_error(e);
            }
        });

        StreamReader::new(adapter)
        adapter
    }
}
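Both variants of the NAR-streaming method above share one shape: the blocking C++ `nar_from_path` call runs inside `tokio::task::spawn_blocking`, pushes chunks through a channel, and the async side consumes them. A minimal, self-contained sketch of that bridge, with a plain mpsc channel standing in for the FFI sender (illustrative only):

```rust
use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(16);

    // The blocking producer (here a loop, in nixcp the C++ nar_from_path call)
    // must not run on the async worker threads, so it goes to spawn_blocking.
    let producer = task::spawn_blocking(move || {
        for chunk in [b"first".to_vec(), b"second".to_vec()] {
            // blocking_send is the right call from non-async code.
            if tx.blocking_send(chunk).is_err() {
                break; // receiver dropped; stop producing
            }
        }
    });

    // The async consumer reads chunks as they arrive.
    let mut total = 0;
    while let Some(chunk) = rx.recv().await {
        total += chunk.len();
    }
    producer.await.unwrap();
    assert_eq!(total, "firstsecond".len());
}
```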
src/uploader.rs (280 changed lines)
@@ -1,80 +1,260 @@
use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf};

use anyhow::Result;
use bytes::BytesMut;
use nix_compat::{narinfo::SigningKey, nixbase32};
use object_store::{ObjectStore, aws::AmazonS3, buffered::BufWriter, path::Path};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, trace};
use ulid::Ulid;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use aws_sdk_s3::{
    self as s3,
    types::{CompletedMultipartUpload, CompletedPart},
};
use bytes::{BufMut, Bytes, BytesMut};
use futures::{future::join_all, stream::TryStreamExt};
use nix_compat::{
    nar::writer::r#async as nar,
    narinfo::{self, NarInfo, SigningKey},
    nixbase32,
    store_path::StorePath,
};
use sha2::{Digest, Sha256};
use tokio::{
    fs::{File, read_dir, read_link},
    io::{AsyncRead, BufReader},
    pin,
};
use tokio_stream::wrappers::ReadDirStream;
use tokio_util::io::{InspectReader, read_buf};
use tracing::debug;

use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store};
use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};

const CHUNK_SIZE: usize = 1024 * 1024 * 5;
const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;

pub struct Uploader<'a> {
    signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
    path: PathInfo,
    s3_client: &'a s3::Client,
    bucket: String,
    store: &'a Store,
}

impl<'a> Uploader<'a> {
    pub fn new(
        signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
        path: PathInfo,
        s3_client: &'a s3::Client,
        bucket: String,
        store: &'a Store,
    ) -> Result<Self> {
        Ok(Self { signing_key, path })
        Ok(Self {
            signing_key,
            path,
            s3_client,
            bucket,
            store,
        })
    }

    pub async fn upload(&self, s3: Arc<AmazonS3>, store: Arc<Store>) -> Result<()> {
        let mut nar = MakeNar::new(&self.path, store)?;
    pub async fn upload(&self) -> Result<()> {
        let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?;
        self.make_nar(&mut nar_temp).await;

        // we don't know what the hash of the compressed file will be so upload to a
        // temp location for now
        let temp_path = Path::parse(Ulid::new().to_string())?;
        let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone());
        debug!("uploading to temp path: {}", temp_path);
        // this goes to the .narinfo file
        let mut nar_hasher = Sha256::new();
        // this is the URL for file .narinfo points to
        let mut file_hasher = Sha256::new();
        let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher);

        // compress and upload nar
        let mut file_reader = nar.compress_and_hash()?;
        loop {
            let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
            let n = file_reader.read_buf(&mut buf).await?;
            s3_writer.put(buf.freeze()).await?;
            if n == 0 {
                break;
        let buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
        let

        if first_chunk.len() < MULTIPART_CUTOFF {
            let put_object = self
                .s3_client
                .put_object()
                .bucket(&self.bucket)
                .key(&nar_url)
                .body(first_chunk.into())
                .send()
                .await?;
            debug!("put object: {:#?}", put_object);
        } else {
            let multipart = self
                .s3_client
                .create_multipart_upload()
                .bucket(&self.bucket)
                .key(&nar_url)
                .send()
                .await?;
            let upload_id = multipart.upload_id().unwrap();

            let mut parts = Vec::with_capacity(nar.len() / MULTIPART_CUTOFF);
            let chunks = nar.chunks(MULTIPART_CUTOFF);
            for (i, chunk) in chunks.enumerate() {
                parts.push(tokio::task::spawn(
                    self.s3_client
                        .upload_part()
                        .bucket(&self.bucket)
                        .key(&nar_url)
                        .upload_id(upload_id)
                        .part_number(i as i32 + 1)
                        .body(chunk.to_vec().into())
                        .send(),
                ));
            }

            let completed_parts = join_all(parts)
                .await
                .into_iter()
                .flatten()
                .collect::<Result<Vec<_>, _>>()?
                .into_iter()
                .enumerate()
                .map(|(i, part)| {
                    CompletedPart::builder()
                        .set_e_tag(part.e_tag().map(ToString::to_string))
                        .set_part_number(Some(i as i32 + 1))
                        .set_checksum_sha256(part.checksum_sha256().map(ToString::to_string))
                        .build()
                })
                .collect::<Vec<_>>();

            let completed_mp_upload = CompletedMultipartUpload::builder()
                .set_parts(Some(completed_parts))
                .build();

            let complete_mp_upload = self
                .s3_client
                .complete_multipart_upload()
                .bucket(&self.bucket)
                .key(&nar_url)
                .upload_id(upload_id)
                .multipart_upload(completed_mp_upload)
                .send()
                .await?;

            debug!("complete multipart upload: {:#?}", complete_mp_upload);
        }
        drop(file_reader);

        let mut nar_info = nar.get_narinfo()?;
        let narinfo_url = format!("{}.narinfo", self.path.digest());
        debug!("uploading narinfo with key {narinfo_url}");
        self.s3_client
            .put_object()
            .bucket(&self.bucket)
            .key(narinfo_url)
            .body(nar_info.to_string().as_bytes().to_vec().into())
            .send()
            .await?;
        debug!("done uploading narinfo");

        Ok(())
    }

    fn narinfo_from_nar(&self, nar: &[u8]) -> Result<NarInfo> {
        let mut hasher = Sha256::new();
        hasher.update(nar);
        let nar_hash: [u8; 32] = hasher.finalize().into();
        let mut nar_info = NarInfo {
            flags: narinfo::Flags::empty(),
            store_path: self.path.path.as_ref(),
            nar_hash,
            nar_size: nar.len() as u64,
            references: self.path.references.iter().map(StorePath::as_ref).collect(),
            signatures: Vec::new(),
            ca: None,
            system: None,
            deriver: None,
            compression: Some("zstd"),
            file_hash: None,
            file_size: None,
            url: "",
        };
        // signature consists of: store_path, nar_hash, nar_size, and references
        nar_info.add_signature(self.signing_key);
        Ok(nar_info)
    }

        // now that we can calculate the file_hash move the nar to where it should be
        let real_path = nar_url(
            &nar_info
                .file_hash
                .expect("file hash must be known at this point"),
        );
        debug!("moving {} to {}", temp_path, real_path);
        // the temp object must be done uploading
        s3_writer.shutdown().await?;
        // this is implemented as a copy-and-delete
        s3.rename(&temp_path, &real_path).await?;
        // set nar url in narinfo
        nar_info.url = real_path.as_ref();
    async fn make_nar(&self, sink: &mut File) -> Result<()> {
        let nar = nar::open(sink).await?;
        let path = self.path.absolute_path();
        let metadata = File::open(&path).await?.metadata().await?;

        // upload narinfo
        let narinfo_path = self.path.narinfo_path();
        debug!("uploading narinfo: {}", narinfo_path);
        trace!("narinfo: {:#}", nar_info);
        s3.put(&narinfo_path, nar_info.to_string().into()).await?;
        if metadata.is_symlink() {
            let target = read_link(&path).await?;
            nar.symlink(target.as_os_str().as_encoded_bytes()).await;
        } else if metadata.is_dir() {
            let mut nar = nar.directory().await?;
            nar_from_dir(path.into(), &mut nar).await;
            nar.close().await;
        } else if metadata.is_file() {
            let perms = metadata.permissions().mode();
            let mut executable = false;
            if (perms & 0o700) == 0o700 {
                executable = true;
            }

            let mut file = BufReader::new(File::open(&path).await?);
            nar.file(executable, metadata.len(), &mut file).await;
        }

        Ok(())
    }
}

/// calculate url where the compressed nar should be uploaded
fn nar_url(file_hash: &[u8]) -> Path {
    let compressed_nar_hash = nixbase32::encode(file_hash);
    Path::parse(format!("nar/{compressed_nar_hash}.nar.zst"))
        .expect("should parse to a valid object_store::path::Path")
async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> {
    let root = ReadDirStream::new(read_dir(&path).await?);
    let entries = root
        .map_ok(|x| (x.file_name(), x))
        .try_collect::<BTreeMap<_, _>>()
        .await?;

    // directory entries must be written in ascending order of name
    for (name, entry) in entries.iter() {
        let node = node.entry(name.as_encoded_bytes()).await?;
        let metadata = entry.metadata().await?;

        if metadata.is_symlink() {
            let target = read_link(entry.path()).await?;
            node.symlink(target.as_os_str().as_encoded_bytes()).await;
        } else if metadata.is_dir() {
            let mut node = node.directory().await?;
            Box::pin(nar_from_dir(entry.path(), &mut node)).await;
            node.close().await;
        } else if metadata.is_file() {
            let perms = metadata.permissions().mode();
            let mut executable = false;
            if (perms & 0o700) == 0o700 {
                executable = true;
            }

            let mut file = BufReader::new(File::open(entry.path()).await?);
            node.file(executable, metadata.len(), &mut file).await;
        }
    }
    Ok(())
}

fn compress_and_hash_nar(
    nar_file: File,
    nar_hasher: &mut Sha256,
    compressed_nar_hasher: &mut Sha256,
) -> impl AsyncRead {
    let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x));
    let nar_buf_reader = BufReader::new(nar_reader);

    let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default);
    InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x))
}

fn nar_url(compressed_nar_hash: &[u8]) -> String {
    format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
}

async fn read_buf_nar<S: AsyncRead + Unpin>(stream: &mut S, mut buf: BytesMut) -> Result<Bytes> {
    while buf.len() < buf.capacity() {
        let n = read_buf(stream, &mut buf).await?;

        if n == 0 {
            break;
        }
    }
    Ok(buf.freeze())
}
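The uploader hunk above interleaves two revisions and is truncated in one spot, but the underlying decision is simply: a payload under the 5 MiB cutoff goes up in one `put_object`, anything larger goes through an S3 multipart upload. A minimal, self-contained sketch of that decision against the `aws_sdk_s3` API (illustrative only; bucket and key names are made up, parts are uploaded sequentially rather than spawned concurrently as in the diff, and error handling is reduced to `?`):

```rust
use aws_sdk_s3::{
    primitives::ByteStream,
    types::{CompletedMultipartUpload, CompletedPart},
    Client,
};

const MULTIPART_CUTOFF: usize = 5 * 1024 * 1024; // S3's minimum part size

async fn upload_blob(client: &Client, bucket: &str, key: &str, data: Vec<u8>) -> anyhow::Result<()> {
    if data.len() < MULTIPART_CUTOFF {
        // Small payloads go up in a single request.
        client
            .put_object()
            .bucket(bucket)
            .key(key)
            .body(ByteStream::from(data))
            .send()
            .await?;
        return Ok(());
    }

    // Larger payloads: create a multipart upload, push 5 MiB parts, then complete it.
    let mp = client.create_multipart_upload().bucket(bucket).key(key).send().await?;
    let upload_id = mp.upload_id().expect("upload id is present").to_string();

    let mut completed = Vec::new();
    for (i, chunk) in data.chunks(MULTIPART_CUTOFF).enumerate() {
        let part_number = i as i32 + 1; // part numbers start at 1
        let part = client
            .upload_part()
            .bucket(bucket)
            .key(key)
            .upload_id(upload_id.as_str())
            .part_number(part_number)
            .body(ByteStream::from(chunk.to_vec()))
            .send()
            .await?;
        completed.push(
            CompletedPart::builder()
                .set_e_tag(part.e_tag().map(ToString::to_string))
                .part_number(part_number)
                .build(),
        );
    }

    client
        .complete_multipart_upload()
        .bucket(bucket)
        .key(key)
        .upload_id(upload_id.as_str())
        .multipart_upload(
            CompletedMultipartUpload::builder()
                .set_parts(Some(completed))
                .build(),
        )
        .send()
        .await?;
    Ok(())
}
```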
@@ -1,38 +0,0 @@
#![allow(dead_code)]

use std::process::Command;
use std::sync::Arc;

use nixcp::store::Store;

pub const HELLO: &str = "github:nixos/nixpkgs?ref=f771eb401a46846c1aebd20552521b233dd7e18b#hello";
pub const HELLO_DRV: &str = "/nix/store/iqbwkm8mjjjlmw6x6ry9rhzin2cp9372-hello-2.12.1.drv";
pub const HELLO_PATH: &str = "/nix/store/9bwryidal9q3g91cjm6xschfn4ikd82q-hello-2.12.1";
pub const NIXCP_PKG: &str = "github:cything/nixcp?ref=6cfe67af0e8da502702b31f34a941753e64d9561";
pub const NIXCP_DRV: &str = "/nix/store/ldjvf9qjp980dyvka2hj99q4c0w6901x-nixcp-0.1.0.drv";

pub struct Context {
    pub store: Arc<Store>,
}

impl Context {
    fn new() -> Self {
        // hello must be in the store
        ensure_exists(HELLO);
        let store = Arc::new(Store::connect().expect("connect to nix store"));
        Self { store }
    }
}

pub fn context() -> Context {
    Context::new()
}

pub fn ensure_exists(pkg: &str) {
    Command::new("nix")
        .arg("build")
        .arg("--no-link")
        .arg(pkg)
        .status()
        .unwrap();
}
tests/nar.rs (26 changed lines)
@@ -1,26 +0,0 @@
use crate::common::HELLO_PATH;
use nix_compat::nixbase32;
use nixcp::make_nar::MakeNar;
use nixcp::path_info::PathInfo;
use sha2::Digest;
use tokio::io::AsyncReadExt;

mod common;

#[tokio::test]
async fn nar_size_and_hash() {
    let ctx = common::context();
    let path_info = PathInfo::from_path(HELLO_PATH, &ctx.store).await.unwrap();

    let mut nar = MakeNar::new(&path_info, ctx.store).unwrap();
    let mut reader = nar.compress_and_hash().unwrap();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await.unwrap();
    drop(reader);

    assert_eq!(nar.nar_size, 234680);

    let nar_hash = nar.nar_hasher.finalize();
    let real_nar_hash = "08za7nnjda8kpdsd73v3mhykjvp0rsmskwsr37winhmzgm6iw79w";
    assert_eq!(nixbase32::encode(nar_hash.as_slice()), real_nar_hash);
}
@@ -1,97 +0,0 @@
use nixcp::path_info::PathInfo;
use std::{collections::HashSet, path::PathBuf, process::Command};

use tempfile::TempDir;

use crate::common::{HELLO, HELLO_DRV, HELLO_PATH, NIXCP_DRV, NIXCP_PKG};

mod common;

#[tokio::test]
async fn path_info_from_package() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    assert_eq!(path_info.path.to_absolute_path(), HELLO_DRV);
}

#[tokio::test]
async fn path_info_from_path() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO_PATH);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    assert_eq!(path_info.path.to_absolute_path(), HELLO_DRV);
}

#[tokio::test]
async fn path_info_symlink() {
    let ctx = common::context();

    let temp_path = TempDir::new().unwrap();
    let link_path = temp_path.path().join("result");

    // symlink at ./result (like `nix build`)
    std::os::unix::fs::symlink(HELLO_PATH, &link_path).unwrap();

    // should resolve symlink
    let path_info = PathInfo::from_derivation(&link_path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    assert_eq!(path_info.path.to_absolute_path(), HELLO_DRV);
}

#[tokio::test]
async fn closure_includes_nix_store_requisites() {
    let ctx = common::context();
    let path = PathBuf::from(HELLO);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");

    // get what we think is the closure
    let mut closure: HashSet<String> = path_info
        .get_closure(&ctx.store)
        .await
        .unwrap()
        .iter()
        .map(|x| x.path.to_absolute_path())
        .collect();

    // for a somewhat more complicated case
    common::ensure_exists(NIXCP_PKG);
    let path = PathBuf::from(NIXCP_PKG);
    let path_info = PathInfo::from_derivation(&path, &ctx.store)
        .await
        .expect("get pathinfo from package");
    closure.extend(
        path_info
            .get_closure(&ctx.store)
            .await
            .unwrap()
            .iter()
            .map(|x| x.path.to_absolute_path()),
    );

    // get output of `nix-store --query --requisites --include-outputs`
    let nix_store_out = Command::new("nix-store")
        .arg("--query")
        .arg("--requisites")
        .arg("--include-outputs")
        .arg(HELLO_DRV)
        .arg(NIXCP_DRV)
        .output()
        .unwrap()
        .stdout;
    assert!(!nix_store_out.is_empty());
    let ref_closure = String::from_utf8_lossy(&nix_store_out);
    let ref_closure = ref_closure.split_whitespace();

    // check that we didn't miss anything nix-store would catch
    for path in ref_closure {
        assert!(closure.contains(path));
    }
}