use libstore cxx bindings

This commit is contained in:
cy 2025-04-16 03:47:42 -04:00
parent 8ac9253ea3
commit a771785352
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts
4 changed files with 152 additions and 161 deletions

View file

@ -1,6 +1,7 @@
use std::{
fs,
iter::once,
path::PathBuf,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
@ -13,10 +14,10 @@ use aws_sdk_s3 as s3;
use futures::future::join_all;
use nix_compat::narinfo::{self, SigningKey};
use tokio::sync::{RwLock, mpsc};
use tracing::{debug, info, trace};
use tracing::{debug, trace};
use url::Url;
use crate::{PushArgs, path_info::PathInfo, uploader::Uploader};
use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader};
pub struct Push {
upstream_caches: Vec<Url>,
@ -24,6 +25,7 @@ pub struct Push {
s3_client: s3::Client,
signing_key: SigningKey<ed25519_dalek::SigningKey>,
bucket: String,
store: Arc<Store>,
// paths that we skipped cause of a signature match
signature_hit_count: AtomicUsize,
// paths that we skipped cause we found it on an upstream
@ -35,7 +37,7 @@ pub struct Push {
}
impl Push {
pub async fn new(cli: &PushArgs) -> Result<Self> {
pub async fn new(cli: &PushArgs, store: Store) -> Result<Self> {
let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1);
for upstream in cli
.upstreams
@ -67,6 +69,7 @@ impl Push {
s3_client,
signing_key,
bucket: cli.bucket.clone(),
store: Arc::new(store),
signature_hit_count: AtomicUsize::new(0),
upstream_hit_count: AtomicUsize::new(0),
already_exists_count: AtomicUsize::new(0),
@ -74,18 +77,33 @@ impl Push {
})
}
pub async fn paths_from_package(&mut self, package: &str) -> Result<()> {
let path_info = PathInfo::from_path(package)
pub async fn add_paths(&'static self, paths: Vec<PathBuf>) -> Result<()> {
let mut futs = Vec::with_capacity(paths.len());
for path in paths {
let store_paths = self.store_paths.clone();
let store = self.store.clone();
futs.push(tokio::spawn(async move {
let path_info = PathInfo::from_path(path.as_path(), &store)
.await
.context("get path info for path")?;
debug!("path-info for {path:?}: {path_info:?}");
store_paths.write().await.extend(
path_info
.get_closure(&store)
.await
.context("closure from path info")?,
);
Ok(())
}));
}
join_all(futs)
.await
.context("get path info for package")?;
debug!("path-info for {package}: {:?}", path_info);
self.store_paths.write().await.extend(
path_info
.get_closure()
.await
.context("closure from path info")?,
);
info!("found {} store paths", self.store_paths.read().await.len());
.into_iter()
.flatten()
.collect::<Result<Vec<_>>>()?;
println!("found {} store paths", self.store_paths.read().await.len());
Ok(())
}