Compare commits

..

No commits in common. "b47a778b9eeee968c53f103e67344548ed2d032d" and "48e44628fb3246cf06947295604d041791a00597" have entirely different histories.

3 changed files with 28 additions and 36 deletions

View file

@ -2,34 +2,22 @@
#![feature(extend_one)] #![feature(extend_one)]
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use clap::{Args, Parser, Subcommand}; use clap::{Parser, Subcommand};
use tracing_subscriber::{EnvFilter, FmtSubscriber}; use tracing_subscriber::{EnvFilter, FmtSubscriber};
use push::Push; use nixcp::NixCp;
mod cli; mod cli;
mod nixcp;
mod path_info; mod path_info;
mod push;
mod uploader; mod uploader;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version)] #[command(version, name = "nixcp")]
#[command(name = "nixcp")]
#[command(about = "Upload store paths to a s3 binary cache")]
#[command(long_about = None)]
struct Cli { struct Cli {
#[command(subcommand)] #[command(subcommand)]
command: Commands, command: Commands,
}
#[derive(Debug, Subcommand)]
enum Commands {
#[command(arg_required_else_help = true)]
Push(PushArgs),
}
#[derive(Debug, Args)]
pub struct PushArgs {
/// The s3 bucket to upload to /// The s3 bucket to upload to
#[arg(long, value_name = "bucket name")] #[arg(long, value_name = "bucket name")]
bucket: String, bucket: String,
@ -56,14 +44,16 @@ pub struct PushArgs {
/// AWS profile to use /// AWS profile to use
#[arg(long)] #[arg(long)]
profile: Option<String>, profile: Option<String>,
}
#[arg(long)] #[derive(Debug, Subcommand)]
skip_signature_check: bool, enum Commands {
Push {
/// Package or store path to upload /// Package or store path to upload
/// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1
#[arg(value_name = "package or store path")] #[arg(value_name = "package or store path")]
package: String, package: String,
},
} }
#[tokio::main] #[tokio::main]
@ -73,14 +63,15 @@ async fn main() -> Result<()> {
tracing::subscriber::set_global_default(subscriber)?; tracing::subscriber::set_global_default(subscriber)?;
let cli = Cli::parse(); let cli = Cli::parse();
let nixcp = Box::leak(Box::new(NixCp::new(&cli).await?));
match &cli.command { match &cli.command {
Commands::Push(cli) => { Commands::Push { package } => {
let push = Box::leak(Box::new(Push::new(cli).await?)); nixcp
push.paths_from_package(&cli.package) .paths_from_package(package)
.await .await
.context("nixcp get paths from package")?; .context("nixcp get paths from package")?;
push.run().await.context("nixcp run")?; nixcp.run().await.context("nixcp run")?;
} }
} }

View file

@ -16,9 +16,9 @@ use tokio::sync::{RwLock, Semaphore, mpsc};
use tracing::{debug, info, trace}; use tracing::{debug, info, trace};
use url::Url; use url::Url;
use crate::{PushArgs, path_info::PathInfo, uploader::Uploader}; use crate::{Cli, path_info::PathInfo, uploader::Uploader};
pub struct Push { pub struct NixCp {
upstream_caches: Vec<Url>, upstream_caches: Vec<Url>,
store_paths: Arc<RwLock<Vec<PathInfo>>>, store_paths: Arc<RwLock<Vec<PathInfo>>>,
s3_client: s3::Client, s3_client: s3::Client,
@ -32,8 +32,8 @@ pub struct Push {
already_exists_count: AtomicUsize, already_exists_count: AtomicUsize,
} }
impl Push { impl NixCp {
pub async fn new(cli: &PushArgs) -> Result<Self> { pub async fn new(cli: &Cli) -> Result<Self> {
let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1); let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
for upstream in cli for upstream in cli
.upstreams .upstreams
@ -160,10 +160,11 @@ impl Push {
self.bucket.clone(), self.bucket.clone(),
)?; )?;
uploads.push(tokio::spawn(async move { let fut = tokio::spawn({
let _permit = permits.acquire().await.unwrap(); let _permit = permits.acquire().await.unwrap();
uploader.upload().await async move { uploader.upload().await }
})); });
uploads.push(fut);
} else { } else {
join_all(uploads) join_all(uploads)
.await .await

View file

@ -125,13 +125,13 @@ impl PathInfo {
} }
pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool { pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
s3_client !s3_client
.head_object() .head_object()
.bucket(bucket) .bucket(bucket)
.key(format!("{}.narinfo", self.digest())) .key(format!("{}.narinfo", self.digest()))
.send() .send()
.await .await
.is_ok() .is_err()
} }
} }