make things faster
This commit is contained in:
parent 00a69b810e
commit c77e76014f

1 changed file with 67 additions and 45 deletions

src/main.rs: 112 lines changed (+67, -45)
@@ -1,3 +1,5 @@
+#![feature(let_chains)]
+
 use std::process::{Command, Stdio};
 use std::sync::mpsc;
 use std::{env, path::Path};
@@ -5,7 +7,7 @@ use std::{env, path::Path};
 use log::{debug, trace};
 use serde::{Deserialize, Serialize};
 use serde_json;
-use tokio;
+use tokio::sync::Semaphore;

 const UPSTREAM_CACHES: &'static [&'static str] = &[
     "https://cache.nixos.org",
@@ -75,71 +77,91 @@ async fn main() {
     let store_paths = path_infos[0].get_store_paths();
     let (cacheable_tx, cacheable_rx) = mpsc::channel();

+    let mut handles = Vec::new();
+
     println!("spawning check_upstream");
-    tokio::spawn(async move {
+    handles.push(tokio::spawn(async move {
         check_upstream(store_paths, cacheable_tx).await;
-    });
+    }));

     println!("spawning uploader");
-    tokio::spawn(async move {
+    handles.push(tokio::spawn(async move {
         uploader(cacheable_rx).await;
-    }).await.unwrap();
+    }));
+
+    // make sure all threads are done
+    for handle in handles {
+        handle.await.unwrap();
+    }
 }

 // filter out store paths that exist in upstream caches
 async fn check_upstream(store_paths: Vec<String>, cacheable_tx: mpsc::Sender<String>) {
+    let concurrent = Semaphore::new(50);
     for store_path in store_paths {
-        let basename = Path::new(&store_path)
-            .file_name()
-            .unwrap()
-            .to_str()
-            .unwrap()
-            .to_string();
-        let hash = basename.split("-").nth(0).unwrap();
+        let _ = concurrent.acquire().await.unwrap();
+        let tx = cacheable_tx.clone();
+        tokio::spawn(async move {
+            let basename = Path::new(&store_path)
+                .file_name()
+                .unwrap()
+                .to_str()
+                .unwrap()
+                .to_string();
+            let hash = basename.split("-").nth(0).unwrap();

-        let mut hit = false;
-        for upstream in UPSTREAM_CACHES {
-            let mut uri = String::from(*upstream);
-            uri.push_str(format!("/{}.narinfo", hash).as_str());
+            let mut hit = false;
+            for upstream in UPSTREAM_CACHES {
+                let mut uri = String::from(*upstream);
+                uri.push_str(format!("/{}.narinfo", hash).as_str());

-            let res_status = reqwest::Client::new()
-                .head(uri)
-                .send()
-                .await
-                .unwrap()
-                .status();
+                let res_status = reqwest::Client::new()
+                    .head(uri)
+                    .send()
+                    .await
+                    .map(|x| x.status());

-            if res_status.is_success() {
-                debug!("{} was a hit upstream: {}", store_path, upstream);
-                hit = true;
-                break;
-            }
-        }
-        if !hit {
-            trace!("sending {}", store_path);
-            cacheable_tx.send(store_path).unwrap();
-        }
+                if let Ok(res_status) = res_status && res_status.is_success() {
+                    println!("{} was a hit upstream: {}", store_path, upstream);
+                    hit = true;
+                    break;
+                }
+            }
+            if !hit {
+                trace!("sending {}", store_path);
+                tx.send(store_path).unwrap();
+            }
+        });
     }
 }

 async fn uploader(cacheable_rx: mpsc::Receiver<String>) {
     let mut count = 0;
+    let concurrent = Semaphore::new(10);
+    let mut handles = Vec::new();
     loop {
         if let Ok(path_to_upload) = cacheable_rx.recv() {
-            trace!("to upload: {}", path_to_upload);
-            if Command::new("nix")
-                .arg("copy")
-                .arg("--to")
-                .arg("s3://nixcache?endpoint=s3.cy7.sh&secret-key=/home/yt/cache-priv-key.pem")
-                .arg(&path_to_upload)
-                .output()
-                .is_err()
-            {
-                println!("WARN: upload failed: {}", path_to_upload);
-            } else {
-                count += 1;
-            }
+            let _ = concurrent.acquire().await.unwrap();
+            handles.push(tokio::spawn(async move {
+                println!("uploading: {}", path_to_upload);
+                if Command::new("nix")
+                    .arg("copy")
+                    .arg("--to")
+                    .arg("s3://nixcache?endpoint=s3.cy7.sh&secret-key=/home/yt/cache-priv-key.pem")
+                    .arg(&path_to_upload)
+                    .output()
+                    .is_err()
+                {
+                    println!("WARN: upload failed: {}", path_to_upload);
+                } else {
+                    count += 1;
+                }
+            }));
         } else {
+            // make sure all threads are done
+            for handle in handles {
+                handle.await.unwrap();
+            }
             println!("uploaded {} paths", count);
             break;
         }
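A note on the new semaphore in `check_upstream`: `let _ = concurrent.acquire().await.unwrap();` binds the `SemaphorePermit` to `_`, so the permit is dropped at the end of that statement and handed back before the spawned task even starts; as written, the semaphore never actually caps the number of in-flight checks at 50. The usual tokio idiom is an `Arc<Semaphore>` with `acquire_owned()`, moving the permit into the task. A minimal sketch, assuming tokio 1.x (`check_bounded` and `do_check` are hypothetical names, not part of this commit):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

async fn do_check(_store_path: String) {
    // stand-in for the narinfo HEAD probe in the committed code
}

async fn check_bounded(store_paths: Vec<String>) {
    let concurrent = Arc::new(Semaphore::new(50));
    for store_path in store_paths {
        // acquire_owned() yields an OwnedSemaphorePermit that can move into the task
        let permit = concurrent.clone().acquire_owned().await.unwrap();
        tokio::spawn(async move {
            do_check(store_path).await;
            drop(permit); // released only once this task is actually done
        });
    }
}
```

The same applies to the `Semaphore::new(10)` added to `uploader`.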
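Another performance note on `check_upstream`: `reqwest::Client::new()` is called once per probe inside the hot loop. reqwest's `Client` keeps its connection pool behind an `Arc`, so it is cheap to clone and intended to be built once and reused; a fresh client per request discards the pool and opens a new TLS connection for every `.narinfo` check. A sketch of the reuse pattern (`probe` is a hypothetical helper):

```rust
// One shared client; clones are cheap handles onto the same connection pool.
async fn probe(client: reqwest::Client, uri: String) -> bool {
    client
        .head(uri)
        .send()
        .await
        .map(|res| res.status().is_success())
        .unwrap_or(false)
}
```

The client would be built once before the `for store_path` loop and a `client.clone()` moved into each spawned task.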
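Two caveats in `uploader`, offered as review notes rather than fixes. First, `std::sync::mpsc::Receiver::recv()` is a blocking call and will park a tokio worker thread while it waits for paths; `tokio::sync::mpsc` offers an async `recv().await`, and likewise `std::process::Command::output()` blocks where `tokio::process::Command` has an async equivalent. Second, since `i32` is `Copy`, each `async move` task captures its own copy of `count`, so the increments never reach the final `println!` and the tally would stay at 0. A sketch combining an async channel with a shared atomic counter (a hypothetical shape, not part of this commit):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::mpsc;

async fn uploader(mut cacheable_rx: mpsc::UnboundedReceiver<String>) {
    let count = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();
    // recv().await yields None once every sender has been dropped,
    // replacing the `} else { ... break; }` arm of the committed loop
    while let Some(path_to_upload) = cacheable_rx.recv().await {
        let count = count.clone();
        handles.push(tokio::spawn(async move {
            println!("uploading: {}", path_to_upload);
            // ... run `nix copy` here as the committed code does ...
            count.fetch_add(1, Ordering::Relaxed);
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    println!("uploaded {} paths", count.load(Ordering::Relaxed));
}
```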