add check for paths that already exist
This commit is contained in:
parent
57a7ab944b
commit
9a4d237235
2 changed files with 36 additions and 8 deletions
28
src/nixcp.rs
28
src/nixcp.rs
|
@ -19,7 +19,7 @@ use url::Url;
|
||||||
use crate::{Cli, path_info::PathInfo, uploader::Uploader};
|
use crate::{Cli, path_info::PathInfo, uploader::Uploader};
|
||||||
|
|
||||||
pub struct NixCp {
|
pub struct NixCp {
|
||||||
upstream_caches: Arc<Vec<Url>>,
|
upstream_caches: Vec<Url>,
|
||||||
store_paths: Arc<RwLock<Vec<PathInfo>>>,
|
store_paths: Arc<RwLock<Vec<PathInfo>>>,
|
||||||
s3_client: s3::Client,
|
s3_client: s3::Client,
|
||||||
signing_key: SigningKey<ed25519_dalek::SigningKey>,
|
signing_key: SigningKey<ed25519_dalek::SigningKey>,
|
||||||
|
@ -28,6 +28,8 @@ pub struct NixCp {
|
||||||
signature_hit_count: AtomicUsize,
|
signature_hit_count: AtomicUsize,
|
||||||
// paths that we skipped cause we found it on an upstream
|
// paths that we skipped cause we found it on an upstream
|
||||||
upstream_hit_count: AtomicUsize,
|
upstream_hit_count: AtomicUsize,
|
||||||
|
// paths that we skipped cause they are already on our cache
|
||||||
|
already_exists_count: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NixCp {
|
impl NixCp {
|
||||||
|
@ -58,13 +60,14 @@ impl NixCp {
|
||||||
|
|
||||||
let s3_client = s3::Client::new(&s3_config.load().await);
|
let s3_client = s3::Client::new(&s3_config.load().await);
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
upstream_caches: Arc::new(upstreams),
|
upstream_caches: upstreams,
|
||||||
store_paths: Arc::new(RwLock::new(Vec::new())),
|
store_paths: Arc::new(RwLock::new(Vec::new())),
|
||||||
s3_client,
|
s3_client,
|
||||||
signing_key,
|
signing_key,
|
||||||
bucket: cli.bucket.clone(),
|
bucket: cli.bucket.clone(),
|
||||||
signature_hit_count: AtomicUsize::new(0),
|
signature_hit_count: AtomicUsize::new(0),
|
||||||
upstream_hit_count: AtomicUsize::new(0),
|
upstream_hit_count: AtomicUsize::new(0),
|
||||||
|
already_exists_count: AtomicUsize::new(0),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -86,7 +89,6 @@ impl NixCp {
|
||||||
|
|
||||||
pub async fn run(&'static self) -> Result<()> {
|
pub async fn run(&'static self) -> Result<()> {
|
||||||
let (tx, rx) = mpsc::channel(10);
|
let (tx, rx) = mpsc::channel(10);
|
||||||
let tx = Arc::new(tx);
|
|
||||||
let filter = tokio::spawn(self.filter_from_upstream(tx));
|
let filter = tokio::spawn(self.filter_from_upstream(tx));
|
||||||
let upload = tokio::spawn(self.upload(rx));
|
let upload = tokio::spawn(self.upload(rx));
|
||||||
filter.await?;
|
filter.await?;
|
||||||
|
@ -95,7 +97,7 @@ impl NixCp {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// filter paths that are on upstream and send to `tx`
|
/// filter paths that are on upstream and send to `tx`
|
||||||
async fn filter_from_upstream(&'static self, tx: Arc<mpsc::Sender<PathInfo>>) {
|
async fn filter_from_upstream(&'static self, tx: mpsc::Sender<PathInfo>) {
|
||||||
let permits = Arc::new(Semaphore::new(10));
|
let permits = Arc::new(Semaphore::new(10));
|
||||||
let mut handles = Vec::with_capacity(10);
|
let mut handles = Vec::with_capacity(10);
|
||||||
let store_paths = self.store_paths.read().await.clone();
|
let store_paths = self.store_paths.read().await.clone();
|
||||||
|
@ -109,12 +111,22 @@ impl NixCp {
|
||||||
handles.push({
|
handles.push({
|
||||||
let permits = permits.clone();
|
let permits = permits.clone();
|
||||||
let tx = tx.clone();
|
let tx = tx.clone();
|
||||||
let upstream_caches = self.upstream_caches.clone();
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let _permit = permits.acquire().await.unwrap();
|
let _permit = permits.acquire().await.unwrap();
|
||||||
|
|
||||||
if !path.check_upstream_hit(upstream_caches.as_slice()).await {
|
if !path
|
||||||
|
.check_upstream_hit(self.upstream_caches.as_slice())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
if path
|
||||||
|
.check_if_already_exists(&self.s3_client, self.bucket.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
trace!("skip {} (already exists)", path.absolute_path());
|
||||||
|
self.already_exists_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
} else {
|
||||||
tx.send(path).await.unwrap();
|
tx.send(path).await.unwrap();
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
trace!("skip {} (upstream hit)", path.absolute_path());
|
trace!("skip {} (upstream hit)", path.absolute_path());
|
||||||
self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
|
self.upstream_hit_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
@ -169,6 +181,10 @@ impl NixCp {
|
||||||
"skipped because of upstream hit: {}",
|
"skipped because of upstream hit: {}",
|
||||||
self.upstream_hit_count.load(Ordering::Relaxed)
|
self.upstream_hit_count.load(Ordering::Relaxed)
|
||||||
);
|
);
|
||||||
|
println!(
|
||||||
|
"skipped because already exist: {}",
|
||||||
|
self.already_exists_count.load(Ordering::Relaxed)
|
||||||
|
);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
use anyhow::{Context, Error, Result};
|
use anyhow::{Context, Error, Result};
|
||||||
|
use aws_sdk_s3 as s3;
|
||||||
use nix_compat::store_path::StorePath;
|
use nix_compat::store_path::StorePath;
|
||||||
use nix_compat::{nixbase32, nixhash::CAHash};
|
use nix_compat::{nixbase32, nixhash::CAHash};
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
|
@ -99,8 +100,9 @@ impl PathInfo {
|
||||||
pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
|
pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool {
|
||||||
for upstream in upstreams {
|
for upstream in upstreams {
|
||||||
let upstream = upstream
|
let upstream = upstream
|
||||||
.join(format!("{}/.narinfo", self.digest()).as_str())
|
.join(format!("{}.narinfo", self.digest()).as_str())
|
||||||
.expect("adding <hash>.narinfo should make a valid url");
|
.expect("adding <hash>.narinfo should make a valid url");
|
||||||
|
debug!("querying {}", upstream);
|
||||||
let res_status = reqwest::Client::new()
|
let res_status = reqwest::Client::new()
|
||||||
.head(upstream.as_str())
|
.head(upstream.as_str())
|
||||||
.send()
|
.send()
|
||||||
|
@ -121,6 +123,16 @@ impl PathInfo {
|
||||||
pub fn digest(&self) -> String {
|
pub fn digest(&self) -> String {
|
||||||
nixbase32::encode(self.path.digest())
|
nixbase32::encode(self.path.digest())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reports whether this path's `<digest>.narinfo` object is already in `bucket`.
///
/// Issues a `head_object` request through `s3_client` for the key
/// `<digest>.narinfo` and returns `true` only when that request succeeds.
///
/// Any failed request yields `false`; the error is not inspected here, so a
/// missing object is indistinguishable from any other request failure.
pub async fn check_if_already_exists(&self, s3_client: &s3::Client, bucket: String) -> bool {
    s3_client
        .head_object()
        .bucket(bucket)
        .key(format!("{}.narinfo", self.digest()))
        .send()
        .await
        // `is_ok()` states the intent directly instead of negating `is_err()`.
        .is_ok()
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue