From c956d6741a802b10f0dff1620716da05ac881e8a Mon Sep 17 00:00:00 2001
From: cy
Date: Tue, 15 Apr 2025 23:49:34 -0400
Subject: [PATCH 1/9] bring code from attic

---
 src/bindings/mod.rs  | 270 +++++++++++++++++++++++++++++++++++++++++++
 src/bindings/nix.cpp | 173 +++++++++++++++++++++++++++
 src/bindings/nix.hpp |  96 +++++++++++++++
 3 files changed, 539 insertions(+)
 create mode 100644 src/bindings/mod.rs
 create mode 100644 src/bindings/nix.cpp
 create mode 100644 src/bindings/nix.hpp

diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs
new file mode 100644
index 0000000..521df2e
--- /dev/null
+++ b/src/bindings/mod.rs
@@ -0,0 +1,270 @@
+/*
+Copyright 2022 Zhaofeng Li and the Attic contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+//! `libnixstore` Bindings
+
+use std::cell::UnsafeCell;
+use std::io;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use futures::stream::{Stream, StreamExt};
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use crate::{AtticError, AtticResult};
+
+// The C++ implementation takes care of concurrency
+#[repr(transparent)]
+pub struct FfiNixStore(UnsafeCell<cxx::UniquePtr<ffi::CNixStore>>);
+
+unsafe impl Send for FfiNixStore {}
+unsafe impl Sync for FfiNixStore {}
+
+impl FfiNixStore {
+    pub fn store(&self) -> Pin<&mut ffi::CNixStore> {
+        unsafe {
+            let ptr = self.0.get().as_mut().unwrap();
+            ptr.pin_mut()
+        }
+    }
+}
+
+/// Obtain a handle to the Nix store.
+pub unsafe fn open_nix_store() -> AtticResult<FfiNixStore> {
+    match ffi::open_nix_store() {
+        Ok(ptr) => {
+            let cell = UnsafeCell::new(ptr);
+            Ok(FfiNixStore(cell))
+        }
+        Err(e) => Err(e.into()),
+    }
+}
+
+// TODO: Benchmark different implementations
+// (tokio, crossbeam, flume)
+mod mpsc {
+    // Tokio
+    pub use tokio::sync::mpsc::{
+        UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel,
+    };
+}
+
+/// Async write request.
+#[derive(Debug)]
+enum AsyncWriteMessage {
+    Data(Vec<u8>),
+    Error(String),
+    Eof,
+}
+
+/// Async write request sender.
+#[derive(Clone)]
+pub struct AsyncWriteSender {
+    sender: mpsc::UnboundedSender<AsyncWriteMessage>,
+}
+
+impl AsyncWriteSender {
+    fn send(&mut self, data: &[u8]) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
+        let message = AsyncWriteMessage::Data(Vec::from(data));
+        self.sender.send(message)
+    }
+
+    fn eof(&mut self) -> Result<(), mpsc::SendError<AsyncWriteMessage>> {
+        let message = AsyncWriteMessage::Eof;
+        self.sender.send(message)
+    }
+
+    pub(crate) fn rust_error(
+        &mut self,
+        error: impl std::error::Error,
+    ) -> Result<(), impl std::error::Error> {
+        let message = AsyncWriteMessage::Error(error.to_string());
+        self.sender.send(message)
+    }
+}
+
+/// A wrapper of the `AsyncWrite` trait for the synchronous Nix C++ land.
+pub struct AsyncWriteAdapter {
+    receiver: mpsc::UnboundedReceiver<AsyncWriteMessage>,
+    eof: bool,
+}
+
+impl AsyncWriteAdapter {
+    pub fn new() -> (Self, Box<AsyncWriteSender>) {
+        let (sender, receiver) = mpsc::unbounded_channel();
+
+        let r = Self {
+            receiver,
+            eof: false,
+        };
+        let sender = Box::new(AsyncWriteSender { sender });
+
+        (r, sender)
+    }
+
+    /// Write everything the sender sends to us.
+    pub async fn write_all(mut self, mut writer: Box<dyn AsyncWrite + Unpin + Send>) -> AtticResult<()> {
+        let writer = writer.as_mut();
+
+        while let Some(data) = self.next().await {
+            match data {
+                Ok(v) => {
+                    writer.write_all(&v).await?;
+                }
+                Err(e) => {
+                    return Err(e);
+                }
+            }
+        }
+
+        if !self.eof {
+            Err(io::Error::from(io::ErrorKind::BrokenPipe).into())
+        } else {
+            Ok(())
+        }
+    }
+}
+
+impl Stream for AsyncWriteAdapter {
+    type Item = AtticResult<Vec<u8>>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        match self.receiver.poll_recv(cx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(Some(message)) => {
+                use AsyncWriteMessage::*;
+                match message {
+                    Data(v) => Poll::Ready(Some(Ok(v))),
+                    Error(exception) => {
+                        let error = AtticError::CxxError { exception };
+                        Poll::Ready(Some(Err(error)))
+                    }
+                    Eof => {
+                        self.eof = true;
+                        Poll::Ready(None)
+                    }
+                }
+            }
+            Poll::Ready(None) => {
+                if !self.eof {
+                    Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe).into())))
+                } else {
+                    Poll::Ready(None)
+                }
+            }
+        }
+    }
+}
+
+#[cxx::bridge]
+/// Generated by `cxx.rs`.
+///
+/// Mid-level wrapper of `libnixstore` implemented in C++.
+mod ffi {
+    extern "Rust" {
+        type AsyncWriteSender;
+        fn send(self: &mut AsyncWriteSender, data: &[u8]) -> Result<()>;
+        fn eof(self: &mut AsyncWriteSender) -> Result<()>;
+    }
+
+    unsafe extern "C++" {
+        include!("attic/src/nix_store/bindings/nix.hpp");
+
+        // =========
+        // CNixStore
+        // =========
+
+        /// Mid-level wrapper for the Unix Domain Socket Nix Store.
+        type CNixStore;
+
+        /// Returns the path of the Nix store itself.
+        fn store_dir(self: Pin<&mut CNixStore>) -> String;
+
+        /*
+        /// Verifies that a path is indeed in the Nix store, then return the base store path.
+        ///
+        /// Use parse_store_path instead.
+        fn to_store_path(self: Pin<&mut CNixStore>, path: &str) -> Result<String>;
+        */
+
+        /// Queries information about a valid path.
+        fn query_path_info(
+            self: Pin<&mut CNixStore>,
+            store_path: &[u8],
+        ) -> Result<UniquePtr<CPathInfo>>;
+
+        /// Computes the closure of a valid path.
+        ///
+        /// If `flip_directions` is true, the set of paths that can reach `store_path` is
+        /// returned.
+        fn compute_fs_closure(
+            self: Pin<&mut CNixStore>,
+            store_path: &[u8],
+            flip_direction: bool,
+            include_outputs: bool,
+            include_derivers: bool,
+        ) -> Result<UniquePtr<CxxVector<CxxString>>>;
+
+        /// Computes the closure of a list of valid paths.
+        ///
+        /// This is the multi-path variant of `compute_fs_closure`.
+        /// If `flip_directions` is true, the set of paths that can reach `store_path` is
+        /// returned.
+        ///
+        /// It's easier and more efficient to just pass a vector of slices
+        /// instead of wrangling with concrete "extern rust" / "extern C++"
+        /// types.
+        fn compute_fs_closure_multi(
+            self: Pin<&mut CNixStore>,
+            base_names: &[&[u8]],
+            flip_direction: bool,
+            include_outputs: bool,
+            include_derivers: bool,
+        ) -> Result<UniquePtr<CxxVector<CxxString>>>;
+
+        /// Creates a NAR dump from a path.
+        fn nar_from_path(
+            self: Pin<&mut CNixStore>,
+            base_name: Vec<u8>,
+            sender: Box<AsyncWriteSender>,
+        ) -> Result<()>;
+
+        /// Obtains a handle to the Nix store.
+        fn open_nix_store() -> Result<UniquePtr<CNixStore>>;
+
+        // =========
+        // CPathInfo
+        // =========
+
+        /// Mid-level wrapper for the `nix::ValidPathInfo` struct.
+        type CPathInfo;
+
+        /// Returns the SHA-256 hash of the store path.
+        fn nar_sha256_hash(self: Pin<&mut CPathInfo>) -> &[u8];
+
+        /// Returns the size of the NAR.
+        fn nar_size(self: Pin<&mut CPathInfo>) -> u64;
+
+        /// Returns the references of the store path.
+        fn references(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
+
+        /// Returns the possibly invalid signatures attached to the store path.
+        fn sigs(self: Pin<&mut CPathInfo>) -> UniquePtr<CxxVector<CxxString>>;
+
+        /// Returns the CA field of the store path.
+        fn ca(self: Pin<&mut CPathInfo>) -> String;
+    }
+}
diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp
new file mode 100644
index 0000000..167244c
--- /dev/null
+++ b/src/bindings/nix.cpp
@@ -0,0 +1,173 @@
+/*
+Copyright 2022 Zhaofeng Li and the Attic contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// C++ side of the libnixstore glue.
+//
+// We implement a mid-level wrapper of the Nix Store interface,
+// which is then wrapped again in the Rust side to enable full
+// async-await operation.
+//
+// Here we stick with the naming conventions of Rust and handle
+// Rust types directly where possible, so that the interfaces are
+// satisfying to use from the Rust side via cxx.rs.
+
+#include "attic/src/nix_store/bindings/nix.hpp"
+
+static std::mutex g_init_nix_mutex;
+static bool g_init_nix_done = false;
+
+static nix::StorePath store_path_from_rust(RBasePathSlice base_name) {
+    std::string_view sv((const char *)base_name.data(), base_name.size());
+    return nix::StorePath(sv);
+}
+
+static bool hash_is_sha256(const nix::Hash &hash) {
+#ifdef ATTIC_NIX_2_20
+    return hash.algo == nix::HashAlgorithm::SHA256;
+#else
+    return hash.type == nix::htSHA256;
+#endif
+}
+
+// ========
+// RustSink
+// ========
+
+RustSink::RustSink(RBox<AsyncWriteSender> sender) : sender(std::move(sender)) {}
+
+void RustSink::operator () (std::string_view data) {
+    RBasePathSlice s((const unsigned char *)data.data(), data.size());
+
+    this->sender->send(s);
+}
+
+void RustSink::eof() {
+    this->sender->eof();
+}
+
+
+// =========
+// CPathInfo
+// =========
+
+CPathInfo::CPathInfo(nix::ref<const nix::ValidPathInfo> pi) : pi(pi) {}
+
+RHashSlice CPathInfo::nar_sha256_hash() {
+    auto &hash = this->pi->narHash;
+
+    if (!hash_is_sha256(hash)) {
+        throw nix::Error("Only SHA-256 hashes are supported at the moment");
+    }
+
+    return RHashSlice(hash.hash, hash.hashSize);
+}
+
+uint64_t CPathInfo::nar_size() {
+    return this->pi->narSize;
+}
+
+std::unique_ptr<std::vector<std::string>> CPathInfo::sigs() {
+    std::vector<std::string> result;
+    for (auto&& elem : this->pi->sigs) {
+        result.push_back(std::string(elem));
+    }
+    return std::make_unique<std::vector<std::string>>(result);
+}
+
+std::unique_ptr<std::vector<std::string>> CPathInfo::references() {
+    std::vector<std::string> result;
+    for (auto&& elem : this->pi->references) {
+        result.push_back(std::string(elem.to_string()));
+    }
+    return std::make_unique<std::vector<std::string>>(result);
+}
+
+RString CPathInfo::ca() {
+    if (this->pi->ca) {
+        return RString(nix::renderContentAddress(this->pi->ca));
+    } else {
+        return RString("");
+    }
+}
+
+// =========
+// CNixStore
+// =========
+
+CNixStore::CNixStore() {
+    std::map<std::string, std::string> params;
+    std::lock_guard<std::mutex> lock(g_init_nix_mutex);
+
+    if (!g_init_nix_done) {
+        nix::initNix();
+        g_init_nix_done = true;
+    }
+
+    this->store = nix::openStore(nix::settings.storeUri.get(), params);
+}
+
+RString CNixStore::store_dir() {
+    return RString(this->store->storeDir);
+}
+
+std::unique_ptr<CPathInfo> CNixStore::query_path_info(RBasePathSlice base_name) {
+    auto store_path = store_path_from_rust(base_name);
+
+    auto r = this->store->queryPathInfo(store_path);
+    return std::make_unique<CPathInfo>(r);
+}
+
+std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure(RBasePathSlice base_name, bool flip_direction, bool include_outputs, bool include_derivers) {
+    std::set<nix::StorePath> out;
+
+    this->store->computeFSClosure(store_path_from_rust(base_name), out, flip_direction, include_outputs, include_derivers);
+
+    std::vector<std::string> result;
+    for (auto&& elem : out) {
+        result.push_back(std::string(elem.to_string()));
+    }
+    return std::make_unique<std::vector<std::string>>(result);
+}
+
+std::unique_ptr<std::vector<std::string>> CNixStore::compute_fs_closure_multi(RSlice<RBasePathSlice> base_names, bool flip_direction, bool include_outputs, bool include_derivers) {
+    std::set<nix::StorePath> path_set, out;
+    for (auto&& base_name : base_names) {
+        path_set.insert(store_path_from_rust(base_name));
+    }
+
+    this->store->computeFSClosure(path_set, out, flip_direction, include_outputs, include_derivers);
+
+    std::vector<std::string> result;
+    for (auto&& elem : out) {
+        result.push_back(std::string(elem.to_string()));
+    }
+    return std::make_unique<std::vector<std::string>>(result);
+}
+
+void CNixStore::nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender) {
+    RustSink sink(std::move(sender));
+
+    std::string_view sv((const char *)base_name.data(), base_name.size());
+    nix::StorePath store_path(sv);
+
+    // exceptions will be thrown into Rust
+    this->store->narFromPath(store_path, sink);
+    sink.eof();
+}
+
+std::unique_ptr<CNixStore> open_nix_store() {
+    return std::make_unique<CNixStore>();
+}
diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp
new file mode 100644
index 0000000..bbab4c5
--- /dev/null
+++ b/src/bindings/nix.hpp
@@ -0,0 +1,96 @@
+/*
+Copyright 2022 Zhaofeng Li and the Attic contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// C++ side of the libnixstore glue.
+//
+// We implement a mid-level wrapper of the Nix Store interface,
+// which is then wrapped again in the Rust side to enable full
+// async-await operation.
+//
+// Here we stick with the naming conventions of Rust and handle
+// Rust types directly where possible, so that the interfaces are
+// satisfying to use from the Rust side via cxx.rs.
+
+#pragma once
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+template<class T> using RVec = rust::Vec<T>;
+template<class T> using RBox = rust::Box<T>;
+template<class T> using RSlice = rust::Slice<T>;
+using RString = rust::String;
+using RStr = rust::Str;
+using RBasePathSlice = RSlice<const unsigned char>;
+using RHashSlice = RSlice<const unsigned char>;
+
+struct AsyncWriteSender;
+
+struct RustSink : nix::Sink
+{
+    RBox<AsyncWriteSender> sender;
+public:
+    RustSink(RBox<AsyncWriteSender> sender);
+    void operator () (std::string_view data) override;
+    void eof();
+};
+
+// Opaque wrapper for nix::ValidPathInfo
+class CPathInfo {
+    nix::ref<const nix::ValidPathInfo> pi;
+public:
+    CPathInfo(nix::ref<const nix::ValidPathInfo> pi);
+    RHashSlice nar_sha256_hash();
+    uint64_t nar_size();
+    std::unique_ptr<std::vector<std::string>> sigs();
+    std::unique_ptr<std::vector<std::string>> references();
+    RString ca();
+};
+
+class CNixStore {
+    std::shared_ptr<nix::Store> store;
+public:
+    CNixStore();
+
+    RString store_dir();
+    std::unique_ptr<CPathInfo> query_path_info(RBasePathSlice base_name);
+    std::unique_ptr<std::vector<std::string>> compute_fs_closure(
+        RBasePathSlice base_name,
+        bool flip_direction,
+        bool include_outputs,
+        bool include_derivers);
+    std::unique_ptr<std::vector<std::string>> compute_fs_closure_multi(
+        RSlice<RBasePathSlice> base_names,
+        bool flip_direction,
+        bool include_outputs,
+        bool include_derivers);
+    void nar_from_path(RVec<unsigned char> base_name, RBox<AsyncWriteSender> sender);
+};
+
+std::unique_ptr<CNixStore> open_nix_store();
+
+// Relies on our definitions
+#include "attic/src/nix_store/bindings/mod.rs.h"

From 8ac9253ea3dff87a2a1cbe28619c5536ca0d662e Mon Sep 17 00:00:00 2001
From: cy
Date: Wed, 16 Apr 2025 03:46:56 -0400
Subject: [PATCH 2/9] change stuff to our way

---
 src/bindings/mod.rs  | 17 ++++++++++-------
 src/bindings/nix.cpp |  8 ++++++++
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs
index 521df2e..f636f4c 100644
--- a/src/bindings/mod.rs
+++ b/src/bindings/mod.rs
@@ -15,17 +15,17 @@ limitations under the License.
 */
 
 //! `libnixstore` Bindings
+#![allow(dead_code)]
 
 use std::cell::UnsafeCell;
 use std::io;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+use anyhow::Result;
 use futures::stream::{Stream, StreamExt};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 
-use crate::{AtticError, AtticResult};
-
 // The C++ implementation takes care of concurrency
 #[repr(transparent)]
 pub struct FfiNixStore(UnsafeCell<cxx::UniquePtr<ffi::CNixStore>>);
@@ -43,7 +43,7 @@ impl FfiNixStore {
 }
 
 /// Obtain a handle to the Nix store.
-pub unsafe fn open_nix_store() -> AtticResult<FfiNixStore> {
+pub unsafe fn open_nix_store() -> Result<FfiNixStore> {
     match ffi::open_nix_store() {
         Ok(ptr) => {
             let cell = UnsafeCell::new(ptr);
@@ -116,7 +116,7 @@ impl AsyncWriteAdapter {
     }
 
     /// Write everything the sender sends to us.
- pub async fn write_all(mut self, mut writer: Box) -> AtticResult<()> { + pub async fn write_all(mut self, mut writer: Box) -> Result<()> { let writer = writer.as_mut(); while let Some(data) = self.next().await { @@ -139,7 +139,7 @@ impl AsyncWriteAdapter { } impl Stream for AsyncWriteAdapter { - type Item = AtticResult>; + type Item = Result>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.receiver.poll_recv(cx) { @@ -149,7 +149,7 @@ impl Stream for AsyncWriteAdapter { match message { Data(v) => Poll::Ready(Some(Ok(v))), Error(exception) => { - let error = AtticError::CxxError { exception }; + let error = anyhow::Error::msg(format!("cxx error: {exception}")); Poll::Ready(Some(Err(error))) } Eof => { @@ -181,7 +181,7 @@ mod ffi { } unsafe extern "C++" { - include!("attic/src/nix_store/bindings/nix.hpp"); + include!("nix.hpp"); // ========= // CNixStore @@ -266,5 +266,8 @@ mod ffi { /// Returns the CA field of the store path. fn ca(self: Pin<&mut CPathInfo>) -> String; + + /// Returns the derivation that built this path + fn deriver(self: Pin<&mut CPathInfo>) -> String; } } diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp index 167244c..7783b4b 100644 --- a/src/bindings/nix.cpp +++ b/src/bindings/nix.cpp @@ -103,6 +103,14 @@ RString CPathInfo::ca() { } } +RString CPathInfo::deriver() { + if (this->pi->deriver) { + return RString((this->pi->deriver).to_string()); + } else { + return RString(""); + } +} + // ========= // CNixStore // ========= From a771785352a72ae6e3a18bbe4b5f46e1bda7196f Mon Sep 17 00:00:00 2001 From: cy Date: Wed, 16 Apr 2025 03:47:42 -0400 Subject: [PATCH 3/9] use libstore cxx bindings --- src/main.rs | 20 ++++-- src/path_info.rs | 167 ++++++++--------------------------------------- src/push.rs | 46 +++++++++---- src/store.rs | 80 +++++++++++++++++++++++ 4 files changed, 152 insertions(+), 161 deletions(-) create mode 100644 src/store.rs diff --git a/src/main.rs b/src/main.rs index 57d3340..5a7e91f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,20 @@ #![feature(let_chains)] #![feature(extend_one)] +use std::path::PathBuf; + use anyhow::{Context, Result}; use clap::{Args, Parser, Subcommand}; use tracing_subscriber::{EnvFilter, FmtSubscriber}; use push::Push; +use store::Store; +mod bindings; mod cli; mod path_info; mod push; +mod store; mod uploader; #[derive(Parser, Debug)] @@ -60,10 +65,10 @@ pub struct PushArgs { #[arg(long)] skip_signature_check: bool, - /// Package or store path to upload - /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 - #[arg(value_name = "package or store path")] - package: String, + /// Path to upload + /// e.g. 
./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 + #[arg(value_name = "PATH")] + paths: Vec, } #[tokio::main] @@ -76,10 +81,11 @@ async fn main() -> Result<()> { match &cli.command { Commands::Push(cli) => { - let push = Box::leak(Box::new(Push::new(cli).await?)); - push.paths_from_package(&cli.package) + let store = Store::connect()?; + let push = Box::leak(Box::new(Push::new(cli, store).await?)); + push.add_paths(cli.paths.clone()) .await - .context("nixcp get paths from package")?; + .context("add paths to push")?; push.run().await.context("nixcp run")?; } } diff --git a/src/path_info.rs b/src/path_info.rs index 6dcbb53..3e69ad9 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -1,85 +1,40 @@ use std::collections::HashSet; -use anyhow::{Context, Error, Result}; +use anyhow::{Context, Result}; use aws_sdk_s3 as s3; +use futures::future::join_all; use nix_compat::nixbase32; use nix_compat::store_path::StorePath; use regex::Regex; -use serde::Deserialize; -use tokio::process::Command; -use tracing::{debug, error, trace}; +use std::path::Path; +use tracing::{debug, trace}; use url::Url; -// nix path-info --derivation --json -#[derive(Debug, Clone, Deserialize)] +use crate::store::Store; + +#[derive(Debug, Clone)] pub struct PathInfo { pub deriver: Option>, pub path: StorePath, - signatures: Option>, + pub signatures: Vec, pub references: Vec>, } impl PathInfo { - // get PathInfo for a package or a store path - // we deserialize this as an array of `PathInfo` below - pub async fn from_path(path: &str) -> Result { - debug!("query nix path-info for {path}"); - // use lix cause nix would return a json map instead of an array - // json output is not stable and could break in future - // TODO figure out a better way - let nix_cmd = Command::new("nix") - .arg("run") - .arg("--experimental-features") - .arg("nix-command flakes") - .arg("github:nixos/nixpkgs/nixos-unstable#lix") - .arg("--") - .arg("path-info") - .arg("--json") - .arg(path) - .output() - .await - .context("`nix path-info` failed for {package}")?; - - trace!( - "nix path-info output: {}", - String::from_utf8_lossy(&nix_cmd.stdout) - ); - - // nix path-info returns an array with one element - match serde_json::from_slice::>(&nix_cmd.stdout) - .context("parse path info from stdout") - { - Ok(path_info) => path_info - .into_iter() - .next() - .ok_or_else(|| Error::msg("nix path-info returned empty")), - Err(e) => { - error!( - "Failed to parse data from `nix path-info`. The path may not exist on your system." 
- ); - Err(e) - } - } + pub async fn from_path(path: &Path, store: &Store) -> Result { + debug!("query path info for {:?}", path); + let canon = path.canonicalize().context("canonicalize path")?; + let store_path = StorePath::from_absolute_path(canon.into_os_string().as_encoded_bytes())?; + store.query_path_info(store_path).await } - pub async fn get_closure(&self) -> Result> { - debug!("query nix-store for {}", self.absolute_path()); - let nix_store_cmd = Command::new("nix-store") - .arg("--query") - .arg("--requisites") - .arg("--include-outputs") - .arg(self.absolute_path()) - .output() - .await - .expect("nix-store cmd failed"); - - let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?; - let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect(); - let mut closure = Vec::with_capacity(nix_store_paths.len()); - for path in nix_store_paths { - closure.push(Self::from_path(path).await?); - } - Ok(closure) + pub async fn get_closure(&self, store: &Store) -> Result> { + let futs = store + .compute_fs_closure(self.path.clone()) + .await? + .into_iter() + .map(|x| store.query_path_info(x)); + join_all(futs).await.into_iter().collect() } /// checks if the path is signed by any upstream. if it is, we assume a cache hit. @@ -101,15 +56,13 @@ impl PathInfo { } fn signees(&self) -> Vec<&str> { - if let Some(signatures) = self.signatures.as_ref() { - let signees: Vec<_> = signatures - .iter() - .filter_map(|signature| Some(signature.split_once(":")?.0)) - .collect(); - trace!("signees for {}: {:?}", self.path, signees); - return signees; - } - Vec::new() + let signers: Vec<_> = self + .signatures + .iter() + .filter_map(|signature| Some(signature.split_once(":")?.0)) + .collect(); + trace!("signers for {}: {:?}", self.path, signers); + return signers; } pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool { @@ -149,69 +102,3 @@ impl PathInfo { .is_ok() } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn get_signees_from_path_info() { - let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#; - let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); - - path_info.signatures = Some(vec![ - "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), - "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), - ]); - let signees = path_info.signees(); - assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]); - } - - #[test] - fn match_upstream_cache_from_signature() { - let path_info_json = 
r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#; - let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); - - path_info.signatures = Some(vec![ - "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), - "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), - "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(), - ]); - assert!( - path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]) - ); - assert!( - path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]) - ); - assert!( - path_info.check_upstream_signature(&[ - Url::parse("https://nix-community.cachix.org").unwrap() - ]) - ); - assert!( - !path_info - .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]), - ); - } - - #[test] - fn path_info_without_signature() { - let path_info_json = r#"{"ca":"fixed:r:sha256:1q10p04pgx9sk6xbvrkn4nvh0ys2lzplgcni5368f4z3cr8ikbmz","narHash":"sha256-v64ZUWbjE4fMKNGyR++nQnsAtyV25r26mTr1dwm4IOA=","narSize":5520,"path":"/nix/store/gj6hz9mj23v01yvq1nn5f655jrcky1qq-nixos-option.nix","references":[],"registrationTime":1744740942,"valid":true}"#; - let path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); - - assert!( - !path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]) - ); - } - - /* - #[test] - fn path_info_deserialize_nix_map() { - let path_info_json = 
r#"{"/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation":{"ca":null,"deriver":"/nix/store/h8z25s6arcrns5nmrq1yhgbamywjivpn-home-manager-generation.drv","narHash":"sha256-o4qwqyJ5UVm9cyC/nBNcNYVnIM14Pewgw7fou+wUVSY=","narSize":13608,"references":["/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/4d0ix5djms3n2jnjdc58l916cwack1rp-empty-directory","/nix/store/56zmgla8443qfrkrh2ch0vz0mh8ywrw1-home-manager-files","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/80l1sb3vcmrkcdd7ihlizkcnv19rq9fj-ncurses-6.5","/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation","/nix/store/92as847i10kl6s19fi910ddyk9l83835-check-link-targets.sh","/nix/store/9c90iz95yynyh3vsc67zndch6j01vgz3-home-manager-path","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bm5fi6wj0w4r2wjll2448k307bzfcjwx-cleanup","/nix/store/c244fsb3a7i5837lzn94m4bmav9i5p9b-link","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/gpxsdrrd4x93fs75395vr2dfys1ki9mq-jq-1.7.1-bin","/nix/store/jlf743lqxbvad6dbgndsgqfg20m2np5i-sd-switch-0.5.3","/nix/store/mhmgm739aagj4x7hr6ag2wjmxhmpy8mf-gettext-0.22.5","/nix/store/w9db12j05yv5hl31s6jndd9cfm1g1gw4-hm-modules-messages","/nix/store/wj1c3gsiajabnq50ifxqnlv60i5rhqj7-diffutils-3.10","/nix/store/xhql0ilzbiqwnmz4z8y0phk611wynxf2-gnused-4.9","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zhrjg6wxrxmdlpn6iapzpp2z2vylpvw5-home-manager.sh"],"registrationTime":1744742989,"signatures":["nixcache.cy7.sh:Vq4X95kSzum7BwrBhjmmM2yVipfBI3AE3jgZ3b3RoYrP4/ghotbDdlwCvwK3qx4BQdEOLSgrC1tDwiMNb6oRBw=="],"ultimate":false}}"#; - serde_json::from_str::>(path_info_json).expect("must serialize"); - - let path_info_json = 
r#"{"/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b":{"ca":null,"deriver":"/nix/store/12ssi931481jlkizgfk1c1jnawvwjbhh-nixos-system-chunk-25.05.20250412.2631b0b.drv","narHash":"sha256-CHhBIzMD4v/FKqKgGroq0UC1k3GrK5lcNwQPMpv2xLc=","narSize":20704,"references":["/nix/store/0yjiyixxsr137iw93hnaacdsssy1li9h-switch-to-configuration-0.1.0","/nix/store/14rby7cpwrzjsjym44cl5h6nj6qpn1gs-etc","/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b","/nix/store/3wjljpj30fvv2cdb60apr4126pa5bm87-shadow-4.17.2","/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/5dyh8l59kfvf89zjkbmjfnx7fix93n4f-net-tools-2.10","/nix/store/aq9wdsz12bg9252790l9awiry2bml4ls-sops-install-secrets-0.0.1","/nix/store/b00kq6fjhgisdrykg621vml8505nnmb3-users-groups.json","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bfr68wi6k8icb3j9fy3fzchva56djfhd-mounts.sh","/nix/store/cjnihsds5hhnji9r85hglph07q9y9hgc-system-path","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/f9jll96j74f5ykvs062718b98lfjbn9g-util-linux-2.40.4-bin","/nix/store/h7zih134d3n5yk8pnhv1fa38n6qkyrn2-pre-switch-checks","/nix/store/idn5n51246piyxcr3v6gxnj5a5l9mzpn-linux-6.14.2","/nix/store/ipn5793y61x2904xqnkgbjnp91svjjzx-perl-5.40.0-env","/nix/store/j1rikvl25pz0b5ham1ijq0nbg1q2fqfy-initrd-linux-6.14.2","/nix/store/jgawnqyh6piwcl79gxpmq5czx9rfr9xh-glibc-locales-2.40-66","/nix/store/jqgmcv8j4gj59218hcbiyn8z951rycdj-install-grub.sh","/nix/store/kpmybhxy3gz6k1znbdirwsp3c6wvsgg9-manifest.json","/nix/store/lgainx4gl6q7mhiwmls81d3n51p5jz7z-linux-6.14.2-modules","/nix/store/mhxn5kwnri3z9hdzi3x0980id65p0icn-lib.sh","/nix/store/n8n0faszqlnf3mdg0fj6abnknrhjsw5j-perl-5.40.0-env","/nix/store/nq61v7a601gjndijq5nndprkzpwz4q9g-glibc-2.40-66-bin","/nix/store/nx27idxpvi3fk3p7admvhipny73nr25n-kmod-31","/nix/store/pggww1d2pg24fcg5v36xn63n53vanyyi-gnupg-2.4.7","/nix/store/rg5rf512szdxmnj9qal3wfdnpfsx38qi-setup-etc.pl","/nix/store/vvlfaafnz3pdhw7lx5kc5gb9pl4zhz5l-local-cmds","/nix/store/w142vx7ij1fz6qwhp5dprkf59cizvv1v-update-users-groups.pl","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zlsmh0ccgvncg30qb4y0mp5pahnk1wnw-append-initrd-secrets","/nix/store/zs07icpv5ykf8m36xcv717hh26bp09fa-firmware","/nix/store/zy2n4id5gcxcbx2x8jbblkmcpdlpsypk-getent-glibc-2.40-66"],"registrationTime":1744743136,"signatures":["nixcache.cy7.sh:dZ1XiKQNe0fRX48gBj03PIABYJGV6BPwb72YpMqEBONZMF+JrkVKhRCF0ur/4Bf5prHxg6Qfg1ytP/4csRC9DQ=="],"ultimate":false}}"#; - serde_json::from_str::>(path_info_json).expect("must serialize"); - } - */ -} diff --git a/src/push.rs b/src/push.rs index 719d3a8..df8304d 100644 --- a/src/push.rs +++ b/src/push.rs @@ -1,6 +1,7 @@ use std::{ fs, iter::once, + path::PathBuf, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -13,10 +14,10 @@ use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; use tokio::sync::{RwLock, mpsc}; -use tracing::{debug, info, trace}; +use tracing::{debug, trace}; use url::Url; -use crate::{PushArgs, path_info::PathInfo, uploader::Uploader}; +use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader}; pub struct Push { upstream_caches: Vec, @@ -24,6 +25,7 @@ pub struct Push { s3_client: s3::Client, signing_key: SigningKey, bucket: String, + store: Arc, // paths that we skipped cause of a signature match 
signature_hit_count: AtomicUsize, // paths that we skipped cause we found it on an upstream @@ -35,7 +37,7 @@ pub struct Push { } impl Push { - pub async fn new(cli: &PushArgs) -> Result { + pub async fn new(cli: &PushArgs, store: Store) -> Result { let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1); for upstream in cli .upstreams @@ -67,6 +69,7 @@ impl Push { s3_client, signing_key, bucket: cli.bucket.clone(), + store: Arc::new(store), signature_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0), @@ -74,18 +77,33 @@ impl Push { }) } - pub async fn paths_from_package(&mut self, package: &str) -> Result<()> { - let path_info = PathInfo::from_path(package) + pub async fn add_paths(&'static self, paths: Vec) -> Result<()> { + let mut futs = Vec::with_capacity(paths.len()); + for path in paths { + let store_paths = self.store_paths.clone(); + let store = self.store.clone(); + + futs.push(tokio::spawn(async move { + let path_info = PathInfo::from_path(path.as_path(), &store) + .await + .context("get path info for path")?; + debug!("path-info for {path:?}: {path_info:?}"); + + store_paths.write().await.extend( + path_info + .get_closure(&store) + .await + .context("closure from path info")?, + ); + Ok(()) + })); + } + join_all(futs) .await - .context("get path info for package")?; - debug!("path-info for {package}: {:?}", path_info); - self.store_paths.write().await.extend( - path_info - .get_closure() - .await - .context("closure from path info")?, - ); - info!("found {} store paths", self.store_paths.read().await.len()); + .into_iter() + .flatten() + .collect::>>()?; + println!("found {} store paths", self.store_paths.read().await.len()); Ok(()) } diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 0000000..e988535 --- /dev/null +++ b/src/store.rs @@ -0,0 +1,80 @@ +use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc}; + +use anyhow::{Context, Result}; +use nix_compat::store_path::StorePath; +use tokio::task; + +use crate::{bindings, path_info::PathInfo}; + +pub struct Store { + inner: Arc, +} + +impl Store { + pub fn connect() -> Result { + let inner = unsafe { bindings::open_nix_store()? }; + Ok(Self { + inner: Arc::new(inner), + }) + } + + pub async fn compute_fs_closure( + &self, + path: StorePath, + ) -> Result>> { + let inner = self.inner.clone(); + task::spawn_blocking(move || { + let cxx_vector = + inner + .store() + .compute_fs_closure(path.to_string().as_bytes(), false, true, true)?; + Ok(cxx_vector + .iter() + .map(|x| { + StorePath::from_bytes(x.as_bytes()) + .context("make StorePath from vector returned by compute_fs_closure") + }) + .collect::>()?) + }) + .await + .unwrap() + } + + pub async fn query_path_info(&self, path: StorePath) -> Result { + let inner = self.inner.clone(); + + task::spawn_blocking(move || { + let mut c_path_info = inner.store().query_path_info(path.to_string().as_bytes())?; + + let deriver = c_path_info.pin_mut().deriver(); + let signatures = c_path_info + .pin_mut() + .sigs() + .into_iter() + .map(|x| { + let osstr = OsStr::from_bytes(x.as_bytes()); + osstr.to_str().unwrap().to_string() + }) + .collect(); + let references = c_path_info + .pin_mut() + .references() + .into_iter() + .map(|x| StorePath::from_bytes(x.as_bytes())) + .collect::>()?; + + Ok(PathInfo { + path, + deriver: if deriver.is_empty() { + None + } else { + Some(StorePath::from_bytes(deriver.as_bytes())?) 
+ }, + signatures, + references, + }) + }) + .await + .unwrap() + } +} From 84bbe5dcb49ae8e39cebed5386006d749823ea94 Mon Sep 17 00:00:00 2001 From: cy Date: Wed, 16 Apr 2025 12:42:44 -0400 Subject: [PATCH 4/9] fix build --- Cargo.lock | 112 +++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 5 ++ build.rs | 21 ++++++++ flake.nix | 6 ++- src/bindings/mod.rs | 48 +------------------ src/bindings/nix.cpp | 70 +-------------------------- src/bindings/nix.hpp | 11 +---- src/path_info.rs | 1 - src/store.rs | 6 --- src/uploader.rs | 2 +- 10 files changed, 147 insertions(+), 135 deletions(-) create mode 100644 build.rs diff --git a/Cargo.lock b/Cargo.lock index aee2282..4d5600c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -759,6 +759,17 @@ dependencies = [ "cc", ] +[[package]] +name = "codespan-reporting" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6d2e5af09e8c8ad56c969f2157a3d4238cebc7c55f0a517728c38f7b200f81" +dependencies = [ + "serde", + "termcolor", + "unicode-width", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -907,6 +918,65 @@ dependencies = [ "syn", ] +[[package]] +name = "cxx" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6354e975ea4ec28033ec3a36fa9baa1a02e3eb22ad740eeb4929370d4f5ba8" +dependencies = [ + "cc", + "cxxbridge-cmd", + "cxxbridge-flags", + "cxxbridge-macro", + "foldhash", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b4400e26ea4b99417e4263b1ce2d8452404d750ba0809a7bd043072593d430d" +dependencies = [ + "cc", + "codespan-reporting", + "proc-macro2", + "quote", + "scratch", + "syn", +] + +[[package]] +name = "cxxbridge-cmd" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31860c98f69fc14da5742c5deaf78983e846c7b27804ca8c8319e32eef421bde" +dependencies = [ + "clap", + "codespan-reporting", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0402a66013f3b8d3d9f2d7c9994656cc81e671054822b0728d7454d9231892f" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64c0b38f32d68f3324a981645ee39b2d686af36d03c98a386df3716108c9feae" +dependencies = [ + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "data-encoding" version = "2.9.0" @@ -1772,6 +1842,15 @@ dependencies = [ "libc", ] +[[package]] +name = "link-cplusplus" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a6f6da007f968f9def0d65a05b187e2960183de70c160204ecfccf0ee330212" +dependencies = [ + "cc", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -1945,9 +2024,12 @@ dependencies = [ "aws-config", "aws-sdk-s3", "clap", + "cxx", + "cxx-build", "ed25519-dalek", "futures", "nix-compat", + "pkg-config", "regex", "reqwest", "serde", @@ -2538,6 +2620,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scratch" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f6280af86e5f559536da57a45ebc84948833b3bee313a7dd25232e09c878a52" + [[package]] name = "sct" version = "0.7.1" 
@@ -2841,6 +2929,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "2.0.12" @@ -3106,6 +3203,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "untrusted" version = "0.9.0" @@ -3311,6 +3414,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index f7bc3f0..8adf37e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,3 +21,8 @@ tokio = { version = "1.44.1", features = [ "full" ]} tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"]} url = { version = "2.5.4", features = [ "serde" ]} +cxx = "1.0" + +[build-dependencies] +cxx-build = "1.0" +pkg-config = "0.3.32" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..2bbe451 --- /dev/null +++ b/build.rs @@ -0,0 +1,21 @@ +fn main() { + cxx_build::bridge("src/bindings/mod.rs") + .file("src/bindings/nix.cpp") + .flag("-std=c++2a") + .flag("-O2") + .flag("-include") + .flag("nix/config.h") + .flag("-I") + .flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix")) + .compile("nixbinding"); + println!("cargo:rerun-if-changed=src/bindings"); + + pkg_config::Config::new() + .atleast_version("2.4") + .probe("nix-store") + .unwrap(); + pkg_config::Config::new() + .atleast_version("2.4") + .probe("nix-main") + .unwrap(); +} diff --git a/flake.nix b/flake.nix index c263f49..e9165c2 100644 --- a/flake.nix +++ b/flake.nix @@ -22,6 +22,7 @@ }; toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain); + lib = pkgs.lib; in { devShells.default = pkgs.mkShell { @@ -29,9 +30,12 @@ pkg-config ]; buildInputs = with pkgs; [ - openssl toolchain + openssl + nix + boost ]; + NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include"; }; packages.default = craneLib.buildPackage { diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs index f636f4c..61a32af 100644 --- a/src/bindings/mod.rs +++ b/src/bindings/mod.rs @@ -181,7 +181,7 @@ mod ffi { } unsafe extern "C++" { - include!("nix.hpp"); + include!("nixcp/src/bindings/nix.hpp"); // ========= // CNixStore @@ -190,16 +190,6 @@ mod ffi { /// Mid-level wrapper for the Unix Domain Socket Nix Store. type CNixStore; - /// Returns the path of the Nix store itself. - fn store_dir(self: Pin<&mut CNixStore>) -> String; - - /* - /// Verifies that a path is indeed in the Nix store, then return the base store path. - /// - /// Use parse_store_path instead. 
- fn to_store_path(self: Pin<&mut CNixStore>, path: &str) -> Result; - */ - /// Queries information about a valid path. fn query_path_info( self: Pin<&mut CNixStore>, @@ -218,30 +208,6 @@ mod ffi { include_derivers: bool, ) -> Result>>; - /// Computes the closure of a list of valid paths. - /// - /// This is the multi-path variant of `compute_fs_closure`. - /// If `flip_directions` is true, the set of paths that can reach `store_path` is - /// returned. - /// - /// It's easier and more efficient to just pass a vector of slices - /// instead of wrangling with concrete "extern rust" / "extern C++" - /// types. - fn compute_fs_closure_multi( - self: Pin<&mut CNixStore>, - base_names: &[&[u8]], - flip_direction: bool, - include_outputs: bool, - include_derivers: bool, - ) -> Result>>; - - /// Creates a NAR dump from a path. - fn nar_from_path( - self: Pin<&mut CNixStore>, - base_name: Vec, - sender: Box, - ) -> Result<()>; - /// Obtains a handle to the Nix store. fn open_nix_store() -> Result>; @@ -252,22 +218,10 @@ mod ffi { /// Mid-level wrapper for the `nix::ValidPathInfo` struct. type CPathInfo; - /// Returns the SHA-256 hash of the store path. - fn nar_sha256_hash(self: Pin<&mut CPathInfo>) -> &[u8]; - - /// Returns the size of the NAR. - fn nar_size(self: Pin<&mut CPathInfo>) -> u64; - /// Returns the references of the store path. fn references(self: Pin<&mut CPathInfo>) -> UniquePtr>; /// Returns the possibly invalid signatures attached to the store path. fn sigs(self: Pin<&mut CPathInfo>) -> UniquePtr>; - - /// Returns the CA field of the store path. - fn ca(self: Pin<&mut CPathInfo>) -> String; - - /// Returns the derivation that built this path - fn deriver(self: Pin<&mut CPathInfo>) -> String; } } diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp index 7783b4b..3914de1 100644 --- a/src/bindings/nix.cpp +++ b/src/bindings/nix.cpp @@ -24,7 +24,7 @@ limitations under the License. // Rust types directly where possible, so that the interfaces are // satisfying to use from the Rust side via cxx.rs. 
-#include "attic/src/nix_store/bindings/nix.hpp" +#include "nixcp/src/bindings/nix.hpp" static std::mutex g_init_nix_mutex; static bool g_init_nix_done = false; @@ -34,14 +34,6 @@ static nix::StorePath store_path_from_rust(RBasePathSlice base_name) { return nix::StorePath(sv); } -static bool hash_is_sha256(const nix::Hash &hash) { -#ifdef ATTIC_NIX_2_20 - return hash.algo == nix::HashAlgorithm::SHA256; -#else - return hash.type == nix::htSHA256; -#endif -} - // ======== // RustSink // ======== @@ -65,20 +57,6 @@ void RustSink::eof() { CPathInfo::CPathInfo(nix::ref pi) : pi(pi) {} -RHashSlice CPathInfo::nar_sha256_hash() { - auto &hash = this->pi->narHash; - - if (!hash_is_sha256(hash)) { - throw nix::Error("Only SHA-256 hashes are supported at the moment"); - } - - return RHashSlice(hash.hash, hash.hashSize); -} - -uint64_t CPathInfo::nar_size() { - return this->pi->narSize; -} - std::unique_ptr> CPathInfo::sigs() { std::vector result; for (auto&& elem : this->pi->sigs) { @@ -95,22 +73,6 @@ std::unique_ptr> CPathInfo::references() { return std::make_unique>(result); } -RString CPathInfo::ca() { - if (this->pi->ca) { - return RString(nix::renderContentAddress(this->pi->ca)); - } else { - return RString(""); - } -} - -RString CPathInfo::deriver() { - if (this->pi->deriver) { - return RString((this->pi->deriver).to_string()); - } else { - return RString(""); - } -} - // ========= // CNixStore // ========= @@ -127,10 +89,6 @@ CNixStore::CNixStore() { this->store = nix::openStore(nix::settings.storeUri.get(), params); } -RString CNixStore::store_dir() { - return RString(this->store->storeDir); -} - std::unique_ptr CNixStore::query_path_info(RBasePathSlice base_name) { auto store_path = store_path_from_rust(base_name); @@ -150,32 +108,6 @@ std::unique_ptr> CNixStore::compute_fs_closure(RBasePat return std::make_unique>(result); } -std::unique_ptr> CNixStore::compute_fs_closure_multi(RSlice base_names, bool flip_direction, bool include_outputs, bool include_derivers) { - std::set path_set, out; - for (auto&& base_name : base_names) { - path_set.insert(store_path_from_rust(base_name)); - } - - this->store->computeFSClosure(path_set, out, flip_direction, include_outputs, include_derivers); - - std::vector result; - for (auto&& elem : out) { - result.push_back(std::string(elem.to_string())); - } - return std::make_unique>(result); -} - -void CNixStore::nar_from_path(RVec base_name, RBox sender) { - RustSink sink(std::move(sender)); - - std::string_view sv((const char *)base_name.data(), base_name.size()); - nix::StorePath store_path(sv); - - // exceptions will be thrown into Rust - this->store->narFromPath(store_path, sink); - sink.eof(); -} - std::unique_ptr open_nix_store() { return std::make_unique(); } diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp index bbab4c5..9f4964a 100644 --- a/src/bindings/nix.hpp +++ b/src/bindings/nix.hpp @@ -63,11 +63,8 @@ class CPathInfo { nix::ref pi; public: CPathInfo(nix::ref pi); - RHashSlice nar_sha256_hash(); - uint64_t nar_size(); std::unique_ptr> sigs(); std::unique_ptr> references(); - RString ca(); }; class CNixStore { @@ -82,15 +79,9 @@ public: bool flip_direction, bool include_outputs, bool include_derivers); - std::unique_ptr> compute_fs_closure_multi( - RSlice base_names, - bool flip_direction, - bool include_outputs, - bool include_derivers); - void nar_from_path(RVec base_name, RBox sender); }; std::unique_ptr open_nix_store(); // Relies on our definitions -#include "attic/src/nix_store/bindings/mod.rs.h" +#include 
"nixcp/src/bindings/mod.rs.h" diff --git a/src/path_info.rs b/src/path_info.rs index 3e69ad9..3d54b17 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -14,7 +14,6 @@ use crate::store::Store; #[derive(Debug, Clone)] pub struct PathInfo { - pub deriver: Option>, pub path: StorePath, pub signatures: Vec, pub references: Vec>, diff --git a/src/store.rs b/src/store.rs index e988535..54f185c 100644 --- a/src/store.rs +++ b/src/store.rs @@ -46,7 +46,6 @@ impl Store { task::spawn_blocking(move || { let mut c_path_info = inner.store().query_path_info(path.to_string().as_bytes())?; - let deriver = c_path_info.pin_mut().deriver(); let signatures = c_path_info .pin_mut() .sigs() @@ -65,11 +64,6 @@ impl Store { Ok(PathInfo { path, - deriver: if deriver.is_empty() { - None - } else { - Some(StorePath::from_bytes(deriver.as_bytes())?) - }, signatures, references, }) diff --git a/src/uploader.rs b/src/uploader.rs index b0520ac..95e03df 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -159,7 +159,7 @@ impl<'a> Uploader<'a> { signatures: Vec::new(), ca: None, system: None, - deriver: self.path.deriver.as_ref().map(|x| x.as_ref()), + deriver: None, compression: Some("zstd"), file_hash: None, file_size: None, From 6806b968924089ee2a8b4c62d4aed15da3f10d91 Mon Sep 17 00:00:00 2001 From: cy Date: Wed, 16 Apr 2025 15:49:01 -0400 Subject: [PATCH 5/9] limit uploads with semaphore --- src/push.rs | 16 +++++++++++----- src/uploader.rs | 1 + 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/push.rs b/src/push.rs index df8304d..dca2a9f 100644 --- a/src/push.rs +++ b/src/push.rs @@ -13,7 +13,7 @@ use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; -use tokio::sync::{RwLock, mpsc}; +use tokio::sync::{RwLock, Semaphore, mpsc}; use tracing::{debug, trace}; use url::Url; @@ -161,6 +161,7 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let mut uploads = Vec::with_capacity(10); + let permits = Arc::new(Semaphore::new(10)); loop { if let Some(path_to_upload) = rx.recv().await { @@ -174,10 +175,15 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { - let res = uploader.upload().await; - self.upload_count.fetch_add(1, Ordering::Relaxed); - res + uploads.push(tokio::spawn({ + let permits = permits.clone(); + + async move { + let _permit = permits.acquire().await; + let res = uploader.upload().await; + self.upload_count.fetch_add(1, Ordering::Relaxed); + res + } })); } else { join_all(uploads) diff --git a/src/uploader.rs b/src/uploader.rs index 95e03df..cacae2b 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -132,6 +132,7 @@ impl<'a> Uploader<'a> { .body(nar_info.to_string().as_bytes().to_vec().into()) .send() .await?; + debug!("done uploading narinfo"); Ok(()) } From b1e59d0a6c302805475d2f4c5644bdf8a24c6334 Mon Sep 17 00:00:00 2001 From: cy Date: Fri, 18 Apr 2025 00:50:11 -0400 Subject: [PATCH 6/9] use nix path-info cmd for derivation; console_subscriber --- Cargo.lock | 437 ++++++++++++++++++++++++++++++++++++++++------- Cargo.toml | 4 +- flake.nix | 3 + src/main.rs | 5 +- src/path_info.rs | 27 ++- src/push.rs | 17 +- src/store.rs | 8 +- src/uploader.rs | 2 + 8 files changed, 421 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d5600c..9fa7a11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,39 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -429,7 +462,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tower", + "tower 0.5.2", "tracing", ] @@ -553,6 +586,53 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -657,6 +737,12 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -776,6 +862,45 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "console-api" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "hyper-util", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -859,6 +984,21 @@ dependencies = [ "crc", ] +[[package]] +name = "crossbeam-channel" 
+version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crypto-bigint" version = "0.4.9" @@ -1160,6 +1300,16 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "flate2" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1359,7 +1509,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.9.0", "slab", "tokio", "tokio-util", @@ -1378,13 +1528,19 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap", + "indexmap 2.9.0", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1396,6 +1552,19 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom 7.1.3", + "num-traits", +] + [[package]] name = "heck" version = "0.5.0" @@ -1494,6 +1663,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" + [[package]] name = "hyper" version = "0.14.32" @@ -1531,6 +1706,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1572,6 +1748,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -1747,6 +1936,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.9.0" @@ -1754,7 +1953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1891,7 +2090,7 @@ version = "0.12.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1903,6 +2102,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -2024,6 +2229,7 @@ dependencies = [ "aws-config", "aws-sdk-s3", "clap", + "console-subscriber", "cxx", "cxx-build", "ed25519-dalek", @@ -2037,7 +2243,6 @@ dependencies = [ "sha2", "tokio", "tracing", - "tracing-subscriber", "url", ] @@ -2060,16 +2265,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "num-conv" version = "0.1.0" @@ -2180,12 +2375,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "p256" version = "0.11.1" @@ -2226,6 +2415,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2270,6 +2479,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "prettyplease" version = "0.2.32" @@ -2298,6 +2516,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" 
+dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.40" @@ -2313,6 +2563,27 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + [[package]] name = "rand_core" version = "0.6.4" @@ -2416,7 +2687,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tower", + "tower 0.5.2", "tower-service", "url", "wasm-bindgen", @@ -3023,6 +3294,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.52.0", ] @@ -3067,6 +3339,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.14" @@ -3092,11 +3375,61 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap", + "indexmap 2.9.0", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.9", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -3156,17 +3489,6 @@ dependencies = [ "valuable", ] -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -3174,15 +3496,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", - "nu-ansi-term", "once_cell", "regex", "sharded-slab", - "smallvec", "thread_local", "tracing", "tracing-core", - "tracing-log", ] [[package]] @@ -3398,22 +3717,6 @@ dependencies = [ "rustix 
0.38.44", ] -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - [[package]] name = "winapi-util" version = "0.1.9" @@ -3423,12 +3726,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - [[package]] name = "windows-link" version = "0.1.1" @@ -3670,6 +3967,26 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerofrom" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index 8adf37e..ee646cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,11 +17,11 @@ reqwest = "0.12.15" serde = { version = "1.0.219", features = [ "derive" ]} serde_json = "1.0.140" sha2 = "0.10.8" -tokio = { version = "1.44.1", features = [ "full" ]} +tokio = { version = "1.44.1", features = [ "full", "tracing", "parking_lot" ]} tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19", features = ["env-filter"]} url = { version = "2.5.4", features = [ "serde" ]} cxx = "1.0" +console-subscriber = "0.4.1" [build-dependencies] cxx-build = "1.0" diff --git a/flake.nix b/flake.nix index e9165c2..46caa41 100644 --- a/flake.nix +++ b/flake.nix @@ -34,8 +34,11 @@ openssl nix boost + tokio-console ]; NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include"; + RUST_LOG = "nixcp=debug"; + RUST_BACKGRACE = 1; }; packages.default = craneLib.buildPackage { diff --git a/src/main.rs b/src/main.rs index 5a7e91f..f4c0f4f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,6 @@ use std::path::PathBuf; use anyhow::{Context, Result}; use clap::{Args, Parser, Subcommand}; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; use push::Push; use store::Store; @@ -73,9 +72,7 @@ pub struct PushArgs { #[tokio::main] async fn main() -> Result<()> { - let filter = EnvFilter::from_default_env(); - let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); - tracing::subscriber::set_global_default(subscriber)?; + console_subscriber::init(); let cli = Cli::parse(); diff --git a/src/path_info.rs b/src/path_info.rs index 3d54b17..e62b68a 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -7,6 +7,7 @@ use nix_compat::nixbase32; use nix_compat::store_path::StorePath; use regex::Regex; use std::path::Path; +use tokio::process::Command; use tracing::{debug, trace}; use url::Url; @@ -22,9 +23,29 @@ pub struct PathInfo { impl PathInfo { pub async fn from_path(path: &Path, store: &Store) -> Result { debug!("query path info for {:?}", path); - let canon = 
path.canonicalize().context("canonicalize path")?; - let store_path = StorePath::from_absolute_path(canon.into_os_string().as_encoded_bytes())?; - store.query_path_info(store_path).await + + let derivation = match path.extension() { + Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(), + _ => { + &Command::new("nix") + .arg("path-info") + .arg("--derivation") + .arg(path) + .output() + .await + .context(format!("run command: nix path-info --derivaiton {path:?}"))? + .stdout + } + }; + let derivation = String::from_utf8_lossy(derivation); + debug!("derivation: {derivation}"); + + let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes()) + .context("storepath from derivation")?; + store + .query_path_info(store_path) + .await + .context("query pathinfo for derivation") } pub async fn get_closure(&self, store: &Store) -> Result> { diff --git a/src/push.rs b/src/push.rs index dca2a9f..8177864 100644 --- a/src/push.rs +++ b/src/push.rs @@ -13,8 +13,8 @@ use aws_config::Region; use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; -use tokio::sync::{RwLock, Semaphore, mpsc}; -use tracing::{debug, trace}; +use tokio::sync::{RwLock, mpsc}; +use tracing::debug; use url::Url; use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader}; @@ -124,7 +124,7 @@ impl Push { for path in store_paths.into_iter() { if path.check_upstream_signature(&self.upstream_caches) { - trace!("skip {} (signature match)", path.absolute_path()); + debug!("skip {} (signature match)", path.absolute_path()); self.signature_hit_count.fetch_add(1, Ordering::Release); continue; } @@ -139,13 +139,13 @@ impl Push { .check_if_already_exists(&self.s3_client, self.bucket.clone()) .await { - trace!("skip {} (already exists)", path.absolute_path()); + debug!("skip {} (already exists)", path.absolute_path()); self.already_exists_count.fetch_add(1, Ordering::Relaxed); } else { tx.send(path).await.unwrap(); } } else { - trace!("skip {} (upstream hit)", path.absolute_path()); + debug!("skip {} (upstream hit)", path.absolute_path()); self.upstream_hit_count.fetch_add(1, Ordering::Relaxed); } }) @@ -161,13 +161,11 @@ impl Push { async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { let mut uploads = Vec::with_capacity(10); - let permits = Arc::new(Semaphore::new(10)); loop { if let Some(path_to_upload) = rx.recv().await { - let absolute_path = path_to_upload.absolute_path(); + println!("uploading: {}", path_to_upload.absolute_path()); - println!("uploading: {}", absolute_path); let uploader = Uploader::new( &self.signing_key, path_to_upload, @@ -176,10 +174,7 @@ impl Push { )?; uploads.push(tokio::spawn({ - let permits = permits.clone(); - async move { - let _permit = permits.acquire().await; let res = uploader.upload().await; self.upload_count.fetch_add(1, Ordering::Relaxed); res diff --git a/src/store.rs b/src/store.rs index 54f185c..4499243 100644 --- a/src/store.rs +++ b/src/store.rs @@ -44,7 +44,10 @@ impl Store { let inner = self.inner.clone(); task::spawn_blocking(move || { - let mut c_path_info = inner.store().query_path_info(path.to_string().as_bytes())?; + let mut c_path_info = inner + .store() + .query_path_info(path.to_string().as_bytes()) + .context("query cpp for path info")?; let signatures = c_path_info .pin_mut() @@ -60,7 +63,8 @@ impl Store { .references() .into_iter() .map(|x| StorePath::from_bytes(x.as_bytes())) - .collect::>()?; + .collect::>() + .context("get references from pathinfo")?; Ok(PathInfo { path, diff 
--git a/src/uploader.rs b/src/uploader.rs index cacae2b..eb955a2 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -23,6 +23,7 @@ pub struct Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, + hash: Sha256, } impl<'a> Uploader<'a> { @@ -37,6 +38,7 @@ impl<'a> Uploader<'a> { path, s3_client, bucket, + hash: Sha256::new(), }) } From a17fa92c78a6355adb70c9645627d9f42dc1a7c4 Mon Sep 17 00:00:00 2001 From: cy Date: Thu, 17 Apr 2025 22:25:36 -0400 Subject: [PATCH 7/9] some progress at an attempt --- src/bindings/mod.rs | 7 +++++++ src/bindings/nix.cpp | 11 +++++++++++ src/bindings/nix.hpp | 1 + src/store.rs | 20 +++++++++++++++++++- src/uploader.rs | 10 ---------- 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs index 61a32af..701a15e 100644 --- a/src/bindings/mod.rs +++ b/src/bindings/mod.rs @@ -211,6 +211,13 @@ mod ffi { /// Obtains a handle to the Nix store. fn open_nix_store() -> Result>; + /// Creates a NAR dump from a path. + fn nar_from_path( + self: Pin<&mut CNixStore>, + base_name: Vec, + sender: Box, + ) -> Result<()>; + // ========= // CPathInfo // ========= diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp index 3914de1..326e878 100644 --- a/src/bindings/nix.cpp +++ b/src/bindings/nix.cpp @@ -108,6 +108,17 @@ std::unique_ptr> CNixStore::compute_fs_closure(RBasePat return std::make_unique>(result); } +void CNixStore::nar_from_path(RVec base_name, RBox sender) { + RustSink sink(std::move(sender)); + + std::string_view sv((const char *)base_name.data(), base_name.size()); + nix::StorePath store_path(sv); + + // exceptions will be thrown into Rust + this->store->narFromPath(store_path, sink); + sink.eof(); +} + std::unique_ptr open_nix_store() { return std::make_unique(); } diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp index 9f4964a..5c79a33 100644 --- a/src/bindings/nix.hpp +++ b/src/bindings/nix.hpp @@ -79,6 +79,7 @@ public: bool flip_direction, bool include_outputs, bool include_derivers); + void nar_from_path(RVec base_name, RBox sender); }; std::unique_ptr open_nix_store(); diff --git a/src/store.rs b/src/store.rs index 4499243..9aa5b44 100644 --- a/src/store.rs +++ b/src/store.rs @@ -4,7 +4,10 @@ use anyhow::{Context, Result}; use nix_compat::store_path::StorePath; use tokio::task; -use crate::{bindings, path_info::PathInfo}; +use crate::{ + bindings::{self, AsyncWriteAdapter}, + path_info::PathInfo, +}; pub struct Store { inner: Arc, @@ -75,4 +78,19 @@ impl Store { .await .unwrap() } + + pub fn make_nar(&self, path: StorePath) -> AsyncWriteAdapter { + let inner = self.inner.clone(); + let (adapter, mut sender) = AsyncWriteAdapter::new(); + + task::spawn_blocking(move || { + if let Err(e) = inner + .store() + .nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone()) + { + let _ = sender.rust_error(e); + } + }); + adapter + } } diff --git a/src/uploader.rs b/src/uploader.rs index eb955a2..f77a3d3 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -139,16 +139,6 @@ impl<'a> Uploader<'a> { Ok(()) } - async fn make_nar(&self) -> Result> { - Ok(Command::new("nix") - .arg("nar") - .arg("dump-path") - .arg(self.path.absolute_path()) - .output() - .await? 
- .stdout) - } - fn narinfo_from_nar(&self, nar: &[u8]) -> Result { let mut hasher = Sha256::new(); hasher.update(nar); From 4808671071555f50e979e2cbf35295b16cd259ca Mon Sep 17 00:00:00 2001 From: cy Date: Fri, 18 Apr 2025 21:23:52 -0400 Subject: [PATCH 8/9] some progress on using nix-compat for nar creation --- Cargo.lock | 4 ++ Cargo.toml | 4 ++ src/store.rs | 2 +- src/uploader.rs | 115 +++++++++++++++++++++++++++++++++++------------- 4 files changed, 94 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9fa7a11..2608cf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2228,6 +2228,7 @@ dependencies = [ "async-compression", "aws-config", "aws-sdk-s3", + "bytes", "clap", "console-subscriber", "cxx", @@ -2241,7 +2242,10 @@ dependencies = [ "serde", "serde_json", "sha2", + "tempfile", "tokio", + "tokio-stream", + "tokio-util", "tracing", "url", ] diff --git a/Cargo.toml b/Cargo.toml index ee646cf..2430fdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,10 @@ tracing = "0.1.41" url = { version = "2.5.4", features = [ "serde" ]} cxx = "1.0" console-subscriber = "0.4.1" +bytes = "1.10.1" +tokio-stream = { version = "0.1.17", features = ["fs"] } +tempfile = "3.19.1" +tokio-util = { version = "0.7.14", features = ["io"] } [build-dependencies] cxx-build = "1.0" diff --git a/src/store.rs b/src/store.rs index 9aa5b44..0433362 100644 --- a/src/store.rs +++ b/src/store.rs @@ -79,7 +79,7 @@ impl Store { .unwrap() } - pub fn make_nar(&self, path: StorePath) -> AsyncWriteAdapter { + pub fn stream_nar(&self, path: StorePath) -> AsyncWriteAdapter { let inner = self.inner.clone(); let (adapter, mut sender) = AsyncWriteAdapter::new(); diff --git a/src/uploader.rs b/src/uploader.rs index f77a3d3..390c3ac 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -1,20 +1,30 @@ +use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf}; + use anyhow::Result; use async_compression::{Level, tokio::bufread::ZstdEncoder}; use aws_sdk_s3::{ self as s3, types::{CompletedMultipartUpload, CompletedPart}, }; -use futures::future::join_all; +use bytes::{BufMut, Bytes, BytesMut}; +use futures::{future::join_all, stream::TryStreamExt}; use nix_compat::{ + nar::writer::r#async as nar, narinfo::{self, NarInfo, SigningKey}, nixbase32, store_path::StorePath, }; use sha2::{Digest, Sha256}; -use tokio::{io::AsyncReadExt, process::Command}; +use tokio::{ + fs::{File, read_dir, read_link}, + io::{AsyncRead, BufReader}, + pin, +}; +use tokio_stream::wrappers::ReadDirStream; +use tokio_util::io::InspectReader; use tracing::debug; -use crate::path_info::PathInfo; +use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store}; const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5; @@ -23,7 +33,7 @@ pub struct Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, - hash: Sha256, + store: &'a Store, } impl<'a> Uploader<'a> { @@ -32,38 +42,28 @@ impl<'a> Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, + store: &'a Store, ) -> Result { Ok(Self { signing_key, path, s3_client, bucket, - hash: Sha256::new(), + store, }) } pub async fn upload(&self) -> Result<()> { - let nar = self.make_nar().await?; - let mut nar_info = self.narinfo_from_nar(&nar)?; - let nar = self.compress_nar(&nar).await; + let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?; + self.make_nar(&mut nar_temp); - // update fields that we know after compression - let mut hasher = Sha256::new(); - hasher.update(&nar); - let hash: [u8; 32] = 
hasher.finalize().into(); - let nar_url = self.nar_url(&hash); - nar_info.file_hash = Some(hash); - nar_info.file_size = Some(nar.len() as u64); - nar_info.url = nar_url.as_str(); - debug!("uploading nar with key: {nar_url}"); - - if nar.len() < MULTIPART_CUTOFF { + if first_chunk.len() < MULTIPART_CUTOFF { let put_object = self .s3_client .put_object() .bucket(&self.bucket) .key(&nar_url) - .body(nar.into()) + .body(first_chunk.into()) .send() .await?; debug!("put object: {:#?}", put_object); @@ -164,17 +164,72 @@ impl<'a> Uploader<'a> { } fn nar_url(&self, compressed_nar_hash: &[u8]) -> String { - let compressed_nar_hash = nixbase32::encode(compressed_nar_hash); - format!("nar/{compressed_nar_hash}.nar.zst") + format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash)) } - async fn compress_nar(&self, nar: &[u8]) -> Vec { - let mut encoder = ZstdEncoder::with_quality(nar, Level::Default); - let mut compressed = Vec::with_capacity(nar.len()); - encoder - .read_to_end(&mut compressed) - .await - .expect("should compress just fine"); - compressed + async fn make_nar(&self, sink: &mut File) -> Result<()> { + let nar = nar::open(sink).await?; + let path = self.path.absolute_path(); + let metadata = File::open(&path).await?.metadata().await?; + + if metadata.is_symlink() { + let target = read_link(&path).await?; + nar.symlink(target.as_os_str().as_encoded_bytes()).await; + } else if metadata.is_dir() { + let mut nar = nar.directory().await?; + nar_from_dir(path.into(), &mut nar).await; + nar.close().await; + } else if metadata.is_file() { + let perms = metadata.permissions().mode(); + let mut executable = false; + if (perms & 0o700) == 0o700 { + executable = true; + } + + let mut file = BufReader::new(File::open(&path).await?); + nar.file(executable, metadata.len(), &mut file).await; + } + + Ok(()) } } + +async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> { + let root = ReadDirStream::new(read_dir(&path).await?); + let entries = root + .map_ok(|x| (x.file_name(), x)) + .try_collect::>() + .await?; + + // directory entries must be written in ascending order of name + for (name, entry) in entries.iter() { + let node = node.entry(name.as_encoded_bytes()).await?; + let metadata = entry.metadata().await?; + + if metadata.is_symlink() { + let target = read_link(entry.path()).await?; + node.symlink(target.as_os_str().as_encoded_bytes()).await; + } else if metadata.is_dir() { + let mut node = node.directory().await?; + Box::pin(nar_from_dir(entry.path(), &mut node)).await; + node.close().await; + } else if metadata.is_file() { + let perms = metadata.permissions().mode(); + let mut executable = false; + if (perms & 0o700) == 0o700 { + executable = true; + } + + let mut file = BufReader::new(File::open(entry.path()).await?); + node.file(executable, metadata.len(), &mut file).await; + } + } + Ok(()) +} + +async fn compress_and_hash_nar(nar: File, nar_hasher: &mut Sha256) -> impl AsyncRead { + let nar_reader = InspectReader::new(nar, |x| nar_hasher.update(x)); + let nar_buf_reader = BufReader::new(nar_reader); + + ZstdEncoder::with_quality(nar_buf_reader, Level::Default) +} From ac4b2ba136e7fa82336aa91c2088ff407e49e603 Mon Sep 17 00:00:00 2001 From: cy Date: Sat, 26 Apr 2025 00:39:22 -0400 Subject: [PATCH 9/9] fancy nar cooking --- src/push.rs | 2 +- src/uploader.rs | 43 ++++++++++++++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/push.rs b/src/push.rs index 8177864..18f74b9 100644 --- a/src/push.rs +++ 
b/src/push.rs
@@ -160,7 +160,7 @@ impl Push {
     }
 
     async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
-        let mut uploads = Vec::with_capacity(10);
+        let mut uploads = Vec::new();
 
         loop {
             if let Some(path_to_upload) = rx.recv().await {
diff --git a/src/uploader.rs b/src/uploader.rs
index 390c3ac..8f4efaa 100644
--- a/src/uploader.rs
+++ b/src/uploader.rs
@@ -21,7 +21,7 @@ use tokio::{
     pin,
 };
 use tokio_stream::wrappers::ReadDirStream;
-use tokio_util::io::InspectReader;
+use tokio_util::io::{InspectReader, read_buf};
 use tracing::debug;
 
 use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};
@@ -55,7 +55,16 @@ impl<'a> Uploader<'a> {
 
     pub async fn upload(&self) -> Result<()> {
         let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?;
-        self.make_nar(&mut nar_temp);
+        self.make_nar(&mut nar_temp).await;
+
+        // this goes to the .narinfo file
+        let mut nar_hasher = Sha256::new();
+        // this is the URL for file .narinfo points to
+        let mut file_hasher = Sha256::new();
+        let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher);
+
+        let buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
+        let
 
         if first_chunk.len() < MULTIPART_CUTOFF {
             let put_object = self
@@ -163,10 +172,6 @@ impl<'a> Uploader<'a> {
         Ok(nar_info)
     }
 
-    fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
-        format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
-    }
-
     async fn make_nar(&self, sink: &mut File) -> Result<()> {
         let nar = nar::open(sink).await?;
         let path = self.path.absolute_path();
@@ -227,9 +232,29 @@ async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Resul
     Ok(())
 }
 
-async fn compress_and_hash_nar(nar: File, nar_hasher: &mut Sha256) -> impl AsyncRead {
-    let nar_reader = InspectReader::new(nar, |x| nar_hasher.update(x));
+fn compress_and_hash_nar(
+    nar_file: File,
+    nar_hasher: &mut Sha256,
+    compressed_nar_hasher: &mut Sha256,
+) -> impl AsyncRead {
+    let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x));
     let nar_buf_reader = BufReader::new(nar_reader);
 
-    ZstdEncoder::with_quality(nar_buf_reader, Level::Default)
+    let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default);
+    InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x))
+}
+
+fn nar_url(compressed_nar_hash: &[u8]) -> String {
+    format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
+}
+
+async fn read_buf_nar<S: AsyncRead + Unpin>(stream: &mut S, mut buf: BytesMut) -> Result<Bytes> {
+    while buf.len() < buf.capacity() {
+        let n = read_buf(stream, &mut buf).await?;
+
+        if n == 0 {
+            break;
+        }
+    }
+    Ok(buf.freeze())
 }
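The hunk above still leaves upload() dangling after `let`, so the following is only a sketch of how the new helpers appear intended to fit together, not code from this series: the name sketch_upload_body is made up, the S3 calls are reduced to comments, it assumes the NAR temp file has already been written by make_nar and reopened (or rewound) for reading, and it assumes tokio-util's io-util feature for InspectReader/read_buf. The flow mirrors compress_and_hash_nar and read_buf_nar: hash the uncompressed and compressed bytes while streaming through zstd, read up to MULTIPART_CUTOFF bytes, then branch on the size of that first chunk.

use anyhow::Result;
use async_compression::{Level, tokio::bufread::ZstdEncoder};
use bytes::BytesMut;
use nix_compat::nixbase32;
use sha2::{Digest, Sha256};
use tokio::{fs::File, io::BufReader};
use tokio_util::io::{InspectReader, read_buf};

const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5;

// Sketch only: stream a finished NAR temp file through zstd, hashing both the
// uncompressed and the compressed bytes on the way, then look at the first
// chunk to choose between a single PutObject and a multipart upload.
async fn sketch_upload_body(nar_temp: File) -> Result<()> {
    // goes into the .narinfo as NarHash
    let mut nar_hasher = Sha256::new();
    // hash of the compressed object; also determines the object key
    let mut file_hasher = Sha256::new();

    // File -> InspectReader (NarHash) -> BufReader -> ZstdEncoder -> InspectReader (FileHash)
    let nar_tap = InspectReader::new(nar_temp, |b| nar_hasher.update(b));
    let zstd = ZstdEncoder::with_quality(BufReader::new(nar_tap), Level::Default);
    let mut compressed = InspectReader::new(zstd, |b| file_hasher.update(b));

    // Same idea as read_buf_nar: fill one buffer up to the multipart cutoff.
    let mut buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
    while buf.len() < buf.capacity() {
        if read_buf(&mut compressed, &mut buf).await? == 0 {
            break;
        }
    }
    let first_chunk = buf.freeze();

    if first_chunk.len() < MULTIPART_CUTOFF {
        // The whole compressed NAR fit in one chunk, so both hashes are final.
        // Drop the reader first so the &mut borrows on the hashers end.
        drop(compressed);
        let file_hash: [u8; 32] = file_hasher.finalize().into();
        let nar_hash: [u8; 32] = nar_hasher.finalize().into();
        let key = format!("nar/{}.nar.zst", nixbase32::encode(&file_hash));
        // a single put_object with `first_chunk` as the body would go here,
        // followed by writing the .narinfo that references `key` and `nar_hash`
        tracing::debug!(
            "single upload: {} bytes to {}, NarHash {}",
            first_chunk.len(),
            key,
            nixbase32::encode(&nar_hash)
        );
    } else {
        // More data remains: keep calling read_buf into fresh buffers and send
        // each one as an UploadPart, finalizing the hashes only after EOF.
    }

    Ok(())
}

Compared with the earlier `nix nar dump-path` plus Vec<u8> approach that this series removes, nothing here ever holds the whole NAR (or its compressed form) in memory; the trade-off is that FileHash and FileSize are only known once the stream has been fully consumed, which is why the single-chunk and multipart paths have to finalize the hashers at different points.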