use nix path-info cmd for derivation; console_subscriber
This commit is contained in:
parent
6806b96892
commit
b1e59d0a6c
8 changed files with 421 additions and 82 deletions
|
@ -5,7 +5,6 @@ use std::path::PathBuf;
|
|||
|
||||
use anyhow::{Context, Result};
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use tracing_subscriber::{EnvFilter, FmtSubscriber};
|
||||
|
||||
use push::Push;
|
||||
use store::Store;
|
||||
|
@ -73,9 +72,7 @@ pub struct PushArgs {
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let filter = EnvFilter::from_default_env();
|
||||
let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish();
|
||||
tracing::subscriber::set_global_default(subscriber)?;
|
||||
console_subscriber::init();
|
||||
|
||||
let cli = Cli::parse();
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ use nix_compat::nixbase32;
|
|||
use nix_compat::store_path::StorePath;
|
||||
use regex::Regex;
|
||||
use std::path::Path;
|
||||
use tokio::process::Command;
|
||||
use tracing::{debug, trace};
|
||||
use url::Url;
|
||||
|
||||
|
@ -22,9 +23,29 @@ pub struct PathInfo {
|
|||
impl PathInfo {
|
||||
pub async fn from_path(path: &Path, store: &Store) -> Result<Self> {
|
||||
debug!("query path info for {:?}", path);
|
||||
let canon = path.canonicalize().context("canonicalize path")?;
|
||||
let store_path = StorePath::from_absolute_path(canon.into_os_string().as_encoded_bytes())?;
|
||||
store.query_path_info(store_path).await
|
||||
|
||||
let derivation = match path.extension() {
|
||||
Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(),
|
||||
_ => {
|
||||
&Command::new("nix")
|
||||
.arg("path-info")
|
||||
.arg("--derivation")
|
||||
.arg(path)
|
||||
.output()
|
||||
.await
|
||||
.context(format!("run command: nix path-info --derivaiton {path:?}"))?
|
||||
.stdout
|
||||
}
|
||||
};
|
||||
let derivation = String::from_utf8_lossy(derivation);
|
||||
debug!("derivation: {derivation}");
|
||||
|
||||
let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes())
|
||||
.context("storepath from derivation")?;
|
||||
store
|
||||
.query_path_info(store_path)
|
||||
.await
|
||||
.context("query pathinfo for derivation")
|
||||
}
|
||||
|
||||
pub async fn get_closure(&self, store: &Store) -> Result<Vec<Self>> {
|
||||
|
|
17
src/push.rs
17
src/push.rs
|
@ -13,8 +13,8 @@ use aws_config::Region;
|
|||
use aws_sdk_s3 as s3;
|
||||
use futures::future::join_all;
|
||||
use nix_compat::narinfo::{self, SigningKey};
|
||||
use tokio::sync::{RwLock, Semaphore, mpsc};
|
||||
use tracing::{debug, trace};
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tracing::debug;
|
||||
use url::Url;
|
||||
|
||||
use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
|
||||
|
@ -124,7 +124,7 @@ impl Push {
|
|||
|
||||
for path in store_paths.into_iter() {
|
||||
if path.check_upstream_signature(&self.upstream_caches) {
|
||||
trace!("skip {} (signature match)", path.absolute_path());
|
||||
debug!("skip {} (signature match)", path.absolute_path());
|
||||
self.signature_hit_count.fetch_add(1, Ordering::Release);
|
||||
continue;
|
||||
}
|
||||
|
@ -139,13 +139,13 @@ impl Push {
|
|||
.check_if_already_exists(&self.s3_client, self.bucket.clone())
|
||||
.await
|
||||
{
|
||||
trace!("skip {} (already exists)", path.absolute_path());
|
||||
debug!("skip {} (already exists)", path.absolute_path());
|
||||
self.already_exists_count.fetch_add(1, Ordering::Relaxed);
|
||||
} else {
|
||||
tx.send(path).await.unwrap();
|
||||
}
|
||||
} else {
|
||||
trace!("skip {} (upstream hit)", path.absolute_path());
|
||||
debug!("skip {} (upstream hit)", path.absolute_path());
|
||||
self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
})
|
||||
|
@ -161,13 +161,11 @@ impl Push {
|
|||
|
||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||
let mut uploads = Vec::with_capacity(10);
|
||||
let permits = Arc::new(Semaphore::new(10));
|
||||
|
||||
loop {
|
||||
if let Some(path_to_upload) = rx.recv().await {
|
||||
let absolute_path = path_to_upload.absolute_path();
|
||||
println!("uploading: {}", path_to_upload.absolute_path());
|
||||
|
||||
println!("uploading: {}", absolute_path);
|
||||
let uploader = Uploader::new(
|
||||
&self.signing_key,
|
||||
path_to_upload,
|
||||
|
@ -176,10 +174,7 @@ impl Push {
|
|||
)?;
|
||||
|
||||
uploads.push(tokio::spawn({
|
||||
let permits = permits.clone();
|
||||
|
||||
async move {
|
||||
let _permit = permits.acquire().await;
|
||||
let res = uploader.upload().await;
|
||||
self.upload_count.fetch_add(1, Ordering::Relaxed);
|
||||
res
|
||||
|
|
|
@ -44,7 +44,10 @@ impl Store {
|
|||
let inner = self.inner.clone();
|
||||
|
||||
task::spawn_blocking(move || {
|
||||
let mut c_path_info = inner.store().query_path_info(path.to_string().as_bytes())?;
|
||||
let mut c_path_info = inner
|
||||
.store()
|
||||
.query_path_info(path.to_string().as_bytes())
|
||||
.context("query cpp for path info")?;
|
||||
|
||||
let signatures = c_path_info
|
||||
.pin_mut()
|
||||
|
@ -60,7 +63,8 @@ impl Store {
|
|||
.references()
|
||||
.into_iter()
|
||||
.map(|x| StorePath::from_bytes(x.as_bytes()))
|
||||
.collect::<Result<_, _>>()?;
|
||||
.collect::<Result<_, _>>()
|
||||
.context("get references from pathinfo")?;
|
||||
|
||||
Ok(PathInfo {
|
||||
path,
|
||||
|
|
|
@ -23,6 +23,7 @@ pub struct Uploader<'a> {
|
|||
path: PathInfo,
|
||||
s3_client: &'a s3::Client,
|
||||
bucket: String,
|
||||
hash: Sha256,
|
||||
}
|
||||
|
||||
impl<'a> Uploader<'a> {
|
||||
|
@ -37,6 +38,7 @@ impl<'a> Uploader<'a> {
|
|||
path,
|
||||
s3_client,
|
||||
bucket,
|
||||
hash: Sha256::new(),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue