use tokio::sync::mpsc

This commit is contained in:
cy 2025-04-03 15:40:57 -04:00
parent 5fa60e6bfc
commit 722609b993
Signed by: cy
SSH key fingerprint: SHA256:o/geVWV4om1QhUSkKvDQeW/eAihwnjyXkqMwrVdbuts

View file

@ -1,7 +1,6 @@
#![feature(let_chains)] #![feature(let_chains)]
use std::path::Path; use std::path::Path;
use std::sync::mpsc;
use std::sync::{ use std::sync::{
Arc, Mutex, RwLock, Arc, Mutex, RwLock,
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@ -11,7 +10,7 @@ use clap::Parser;
use log::{debug, trace}; use log::{debug, trace};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::Semaphore; use tokio::sync::{Semaphore, mpsc};
const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"]; const UPSTREAM_CACHES: &[&str] = &["https://cache.nixos.org"];
@ -138,7 +137,7 @@ async fn main() {
} }
println!("got {} store paths", store_paths.read().unwrap().len()); println!("got {} store paths", store_paths.read().unwrap().len());
let (cacheable_tx, cacheable_rx) = mpsc::channel(); let (cacheable_tx, mut cacheable_rx) = mpsc::channel(cli.uploader_concurrency.into());
println!("spawning check_upstream"); println!("spawning check_upstream");
handles = Vec::new(); handles = Vec::new();
@ -154,7 +153,7 @@ async fn main() {
println!("spawning uploader"); println!("spawning uploader");
handles.push(tokio::spawn(async move { handles.push(tokio::spawn(async move {
uploader(cacheable_rx, binary_cache, cli.uploader_concurrency).await; uploader(&mut cacheable_rx, binary_cache, cli.uploader_concurrency).await;
})); }));
// make sure all threads are done // make sure all threads are done
@ -210,20 +209,24 @@ async fn check_upstream(
} }
if !hit { if !hit {
trace!("sending {}", store_path); trace!("sending {}", store_path);
tx.send(store_path).unwrap(); tx.send(store_path).await.unwrap();
} }
}); });
} }
} }
async fn uploader(cacheable_rx: mpsc::Receiver<String>, binary_cache: String, concurrency: u8) { async fn uploader(
cacheable_rx: &mut mpsc::Receiver<String>,
binary_cache: String,
concurrency: u8,
) {
let upload_count = Arc::new(AtomicUsize::new(0)); let upload_count = Arc::new(AtomicUsize::new(0));
let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new())); let failures: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let concurrency = Arc::new(Semaphore::new(concurrency.into())); let concurrency = Arc::new(Semaphore::new(concurrency.into()));
let mut handles = Vec::new(); let mut handles = Vec::new();
loop { loop {
if let Ok(path_to_upload) = cacheable_rx.recv() { if let Some(path_to_upload) = cacheable_rx.recv().await {
let concurrency = Arc::clone(&concurrency); let concurrency = Arc::clone(&concurrency);
let failures = Arc::clone(&failures); let failures = Arc::clone(&failures);
let binary_cache = binary_cache.clone(); let binary_cache = binary_cache.clone();