80 lines
2.7 KiB
Rust
80 lines
2.7 KiB
Rust
use anyhow::Result;
|
|
use bytes::BytesMut;
|
|
use nix_compat::{narinfo::SigningKey, nixbase32};
|
|
use object_store::{ObjectStore, aws::AmazonS3, buffered::BufWriter, path::Path};
|
|
use std::sync::Arc;
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
use tracing::{debug, trace};
|
|
use ulid::Ulid;
|
|
|
|
use crate::{make_nar::MakeNar, path_info::PathInfo, store::Store};
|
|
|
|
const CHUNK_SIZE: usize = 1024 * 1024 * 5;
|
|
|
|
pub struct Uploader<'a> {
|
|
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
|
|
path: PathInfo,
|
|
}
|
|
|
|
impl<'a> Uploader<'a> {
|
|
pub fn new(
|
|
signing_key: &'a SigningKey<ed25519_dalek::SigningKey>,
|
|
path: PathInfo,
|
|
) -> Result<Self> {
|
|
Ok(Self { signing_key, path })
|
|
}
|
|
|
|
pub async fn upload(&self, s3: Arc<AmazonS3>, store: Arc<Store>) -> Result<()> {
|
|
let mut nar = MakeNar::new(&self.path, store)?;
|
|
|
|
// we don't know what the hash of the compressed file will be so upload to a
|
|
// temp location for now
|
|
let temp_path = Path::parse(Ulid::new().to_string())?;
|
|
let mut s3_writer = BufWriter::new(s3.clone(), temp_path.clone());
|
|
debug!("uploading to temp path: {}", temp_path);
|
|
|
|
// compress and upload nar
|
|
let mut file_reader = nar.compress_and_hash()?;
|
|
loop {
|
|
let mut buf = BytesMut::with_capacity(CHUNK_SIZE);
|
|
let n = file_reader.read_buf(&mut buf).await?;
|
|
s3_writer.put(buf.freeze()).await?;
|
|
if n == 0 {
|
|
break;
|
|
}
|
|
}
|
|
drop(file_reader);
|
|
|
|
let mut nar_info = nar.get_narinfo()?;
|
|
nar_info.add_signature(self.signing_key);
|
|
|
|
// now that we can calculate the file_hash move the nar to where it should be
|
|
let real_path = nar_url(
|
|
&nar_info
|
|
.file_hash
|
|
.expect("file hash must be known at this point"),
|
|
);
|
|
debug!("moving {} to {}", temp_path, real_path);
|
|
// the temp object must be done uploading
|
|
s3_writer.shutdown().await?;
|
|
// this is implemented as a copy-and-delete
|
|
s3.rename(&temp_path, &real_path).await?;
|
|
// set nar url in narinfo
|
|
nar_info.url = real_path.as_ref();
|
|
|
|
// upload narinfo
|
|
let narinfo_path = self.path.narinfo_path();
|
|
debug!("uploading narinfo: {}", narinfo_path);
|
|
trace!("narinfo: {:#}", nar_info);
|
|
s3.put(&narinfo_path, nar_info.to_string().into()).await?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
/// calculate url where the compressed nar should be uploaded
|
|
fn nar_url(file_hash: &[u8]) -> Path {
|
|
let compressed_nar_hash = nixbase32::encode(file_hash);
|
|
Path::parse(format!("nar/{compressed_nar_hash}.nar.zst"))
|
|
.expect("should parse to a valid object_store::path::Path")
|
|
}
|