fancy nar cooking
This commit is contained in:
parent
4808671071
commit
ac4b2ba136
2 changed files with 35 additions and 10 deletions
|
@ -160,7 +160,7 @@ impl Push {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
async fn upload(&'static self, mut rx: mpsc::Receiver<PathInfo>) -> Result<()> {
|
||||||
let mut uploads = Vec::with_capacity(10);
|
let mut uploads = Vec::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Some(path_to_upload) = rx.recv().await {
|
if let Some(path_to_upload) = rx.recv().await {
|
||||||
|
|
|
@ -21,7 +21,7 @@ use tokio::{
|
||||||
pin,
|
pin,
|
||||||
};
|
};
|
||||||
use tokio_stream::wrappers::ReadDirStream;
|
use tokio_stream::wrappers::ReadDirStream;
|
||||||
use tokio_util::io::InspectReader;
|
use tokio_util::io::{InspectReader, read_buf};
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};
|
use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store};
|
||||||
|
@ -55,7 +55,16 @@ impl<'a> Uploader<'a> {
|
||||||
|
|
||||||
pub async fn upload(&self) -> Result<()> {
|
pub async fn upload(&self) -> Result<()> {
|
||||||
let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?;
|
let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?;
|
||||||
self.make_nar(&mut nar_temp);
|
self.make_nar(&mut nar_temp).await;
|
||||||
|
|
||||||
|
// this goes to the .narinfo file
|
||||||
|
let mut nar_hasher = Sha256::new();
|
||||||
|
// this is the URL for file .narinfo points to
|
||||||
|
let mut file_hasher = Sha256::new();
|
||||||
|
let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher);
|
||||||
|
|
||||||
|
let buf = BytesMut::with_capacity(MULTIPART_CUTOFF);
|
||||||
|
let
|
||||||
|
|
||||||
if first_chunk.len() < MULTIPART_CUTOFF {
|
if first_chunk.len() < MULTIPART_CUTOFF {
|
||||||
let put_object = self
|
let put_object = self
|
||||||
|
@ -163,10 +172,6 @@ impl<'a> Uploader<'a> {
|
||||||
Ok(nar_info)
|
Ok(nar_info)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nar_url(&self, compressed_nar_hash: &[u8]) -> String {
|
|
||||||
format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn make_nar(&self, sink: &mut File) -> Result<()> {
|
async fn make_nar(&self, sink: &mut File) -> Result<()> {
|
||||||
let nar = nar::open(sink).await?;
|
let nar = nar::open(sink).await?;
|
||||||
let path = self.path.absolute_path();
|
let path = self.path.absolute_path();
|
||||||
|
@ -227,9 +232,29 @@ async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Resul
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn compress_and_hash_nar(nar: File, nar_hasher: &mut Sha256) -> impl AsyncRead {
|
fn compress_and_hash_nar(
|
||||||
let nar_reader = InspectReader::new(nar, |x| nar_hasher.update(x));
|
nar_file: File,
|
||||||
|
nar_hasher: &mut Sha256,
|
||||||
|
compressed_nar_hasher: &mut Sha256,
|
||||||
|
) -> impl AsyncRead {
|
||||||
|
let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x));
|
||||||
let nar_buf_reader = BufReader::new(nar_reader);
|
let nar_buf_reader = BufReader::new(nar_reader);
|
||||||
|
|
||||||
ZstdEncoder::with_quality(nar_buf_reader, Level::Default)
|
let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default);
|
||||||
|
InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn nar_url(compressed_nar_hash: &[u8]) -> String {
|
||||||
|
format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_buf_nar<S: AsyncRead + Unpin>(stream: &mut S, mut buf: BytesMut) -> Result<Bytes> {
|
||||||
|
while buf.len() < buf.capacity() {
|
||||||
|
let n = read_buf(stream, &mut buf).await?;
|
||||||
|
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(buf.freeze())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue