diff --git a/Cargo.lock b/Cargo.lock index 2608cf9..aee2282 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,39 +102,6 @@ dependencies = [ "zstd-safe", ] -[[package]] -name = "async-stream" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "async-trait" -version = "0.1.88" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "atomic-waker" version = "1.1.2" @@ -462,7 +429,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tower 0.5.2", + "tower", "tracing", ] @@ -586,53 +553,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum" -version = "0.7.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" -dependencies = [ - "async-trait", - "axum-core", - "bytes", - "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "sync_wrapper", - "tower 0.5.2", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "mime", - "pin-project-lite", - "rustversion", - "sync_wrapper", - "tower-layer", - "tower-service", -] - [[package]] name = "backtrace" version = "0.3.74" @@ -737,12 +657,6 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "bytes" version = "1.10.1" @@ -845,62 +759,12 @@ dependencies = [ "cc", ] -[[package]] -name = "codespan-reporting" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe6d2e5af09e8c8ad56c969f2157a3d4238cebc7c55f0a517728c38f7b200f81" -dependencies = [ - "serde", - "termcolor", - "unicode-width", -] - [[package]] name = "colorchoice" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" -[[package]] -name = "console-api" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857" -dependencies = [ - "futures-core", - "prost", - "prost-types", - "tonic", - "tracing-core", -] - -[[package]] -name = "console-subscriber" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01" -dependencies = [ - "console-api", - "crossbeam-channel", - "crossbeam-utils", - "futures-task", - "hdrhistogram", - "humantime", - "hyper-util", - "prost", - "prost-types", - "serde", - "serde_json", - "thread_local", - "tokio", - "tokio-stream", - "tonic", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "const-oid" version = "0.9.6" @@ -984,21 +848,6 @@ dependencies = [ "crc", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" -dependencies = [ - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" - [[package]] name = "crypto-bigint" version = "0.4.9" @@ -1058,65 +907,6 @@ dependencies = [ "syn", ] -[[package]] -name = "cxx" -version = "1.0.157" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6354e975ea4ec28033ec3a36fa9baa1a02e3eb22ad740eeb4929370d4f5ba8" -dependencies = [ - "cc", - "cxxbridge-cmd", - "cxxbridge-flags", - "cxxbridge-macro", - "foldhash", - "link-cplusplus", -] - -[[package]] -name = "cxx-build" -version = "1.0.157" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b4400e26ea4b99417e4263b1ce2d8452404d750ba0809a7bd043072593d430d" -dependencies = [ - "cc", - "codespan-reporting", - "proc-macro2", - "quote", - "scratch", - "syn", -] - -[[package]] -name = "cxxbridge-cmd" -version = "1.0.157" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31860c98f69fc14da5742c5deaf78983e846c7b27804ca8c8319e32eef421bde" -dependencies = [ - "clap", - "codespan-reporting", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "cxxbridge-flags" -version = "1.0.157" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0402a66013f3b8d3d9f2d7c9994656cc81e671054822b0728d7454d9231892f" - -[[package]] -name = "cxxbridge-macro" -version = "1.0.157" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64c0b38f32d68f3324a981645ee39b2d686af36d03c98a386df3716108c9feae" -dependencies = [ - "proc-macro2", - "quote", - "rustversion", - "syn", -] - [[package]] name = "data-encoding" version = "2.9.0" @@ -1300,16 +1090,6 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" -[[package]] -name = "flate2" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1509,7 +1289,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.9.0", + "indexmap", "slab", "tokio", "tokio-util", @@ -1528,19 +1308,13 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap 2.9.0", + "indexmap", "slab", "tokio", "tokio-util", "tracing", ] -[[package]] -name = "hashbrown" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" - [[package]] name = "hashbrown" version = "0.15.2" @@ -1552,19 +1326,6 @@ dependencies = [ "foldhash", ] -[[package]] -name = "hdrhistogram" -version = "7.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" -dependencies = [ - "base64 0.21.7", - "byteorder", - "flate2", - "nom 7.1.3", - "num-traits", -] - [[package]] name = "heck" version = "0.5.0" @@ -1663,12 +1424,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "humantime" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" - [[package]] name = "hyper" version = "0.14.32" @@ -1706,7 +1461,6 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", - "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1748,19 +1502,6 @@ dependencies = [ "tower-service", ] -[[package]] -name = "hyper-timeout" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" -dependencies = [ - "hyper 1.6.0", - "hyper-util", - "pin-project-lite", - "tokio", - "tower-service", -] - [[package]] name = "hyper-tls" version = "0.6.0" @@ -1936,16 +1677,6 @@ dependencies = [ "icu_properties", ] -[[package]] -name = "indexmap" -version = "1.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" -dependencies = [ - "autocfg", - "hashbrown 0.12.3", -] - [[package]] name = "indexmap" version = "2.9.0" @@ -1953,7 +1684,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" dependencies = [ "equivalent", - "hashbrown 0.15.2", + "hashbrown", ] [[package]] @@ -2041,15 +1772,6 @@ dependencies = [ "libc", ] -[[package]] -name = "link-cplusplus" -version = "1.0.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a6f6da007f968f9def0d65a05b187e2960183de70c160204ecfccf0ee330212" -dependencies = [ - "cc", -] - [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -2090,7 +1812,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown 0.15.2", + "hashbrown", ] [[package]] @@ -2102,12 +1824,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - [[package]] name = "md-5" version = "0.10.6" @@ -2228,25 +1944,18 @@ dependencies = [ "async-compression", "aws-config", "aws-sdk-s3", - "bytes", "clap", - "console-subscriber", - "cxx", - "cxx-build", "ed25519-dalek", "futures", "nix-compat", - "pkg-config", "regex", "reqwest", "serde", "serde_json", "sha2", - "tempfile", "tokio", - "tokio-stream", - "tokio-util", "tracing", + "tracing-subscriber", "url", ] @@ -2269,6 +1978,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -2379,6 +2098,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.11.1" @@ -2419,26 +2144,6 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" -[[package]] -name = "pin-project" -version = "1.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" -dependencies = [ - "pin-project-internal", -] - -[[package]] -name = "pin-project-internal" -version = "1.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2483,15 +2188,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" -[[package]] -name = "ppv-lite86" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" -dependencies = [ - "zerocopy", -] - [[package]] name = "prettyplease" version = "0.2.32" @@ -2520,38 +2216,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prost" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" -dependencies = [ - "bytes", - "prost-derive", -] - -[[package]] -name = "prost-derive" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" -dependencies = [ - "anyhow", - "itertools", - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "prost-types" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" -dependencies = [ - "prost", -] - [[package]] name = "quote" version = "1.0.40" @@ -2567,27 +2231,6 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" -[[package]] -name = "rand" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" -dependencies = [ - "libc", - "rand_chacha", - "rand_core", -] - -[[package]] -name = "rand_chacha" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" -dependencies = [ - "ppv-lite86", - "rand_core", -] - [[package]] name = "rand_core" version = "0.6.4" @@ -2691,7 +2334,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tower 0.5.2", + "tower", "tower-service", "url", "wasm-bindgen", @@ -2895,12 +2538,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "scratch" -version = "1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f6280af86e5f559536da57a45ebc84948833b3bee313a7dd25232e09c878a52" - [[package]] name = "sct" version = "0.7.1" @@ -3204,15 +2841,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "termcolor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" -dependencies = [ - "winapi-util", -] - [[package]] name = "thiserror" version = "2.0.12" @@ -3298,7 +2926,6 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "tracing", "windows-sys 0.52.0", ] @@ -3343,17 +2970,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.14" @@ -3379,61 +2995,11 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap 2.9.0", + "indexmap", "toml_datetime", "winnow", ] -[[package]] -name = "tonic" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.22.1", - "bytes", - "h2 0.4.9", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-timeout", - "hyper-util", - "percent-encoding", - "pin-project", - "prost", - "socket2", - "tokio", - "tokio-stream", - "tower 0.4.13", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "indexmap 1.9.3", - "pin-project", - "pin-project-lite", - "rand", - "slab", - "tokio", - "tokio-util", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tower" version = "0.5.2" @@ -3493,6 +3059,17 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -3500,12 +3077,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "sharded-slab", + "smallvec", "thread_local", "tracing", "tracing-core", + "tracing-log", ] [[package]] @@ -3526,12 +3106,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" -[[package]] -name = "unicode-width" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" - [[package]] name = "untrusted" version = "0.9.0" @@ -3722,14 +3296,27 @@ dependencies = [ ] [[package]] -name = "winapi-util" -version = "0.1.9" +name = "winapi" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" dependencies = [ - "windows-sys 0.59.0", + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.1.1" @@ -3971,26 +3558,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "zerocopy" -version = "0.8.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" -dependencies = [ - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "zerofrom" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index 2430fdd..f7bc3f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,16 +17,7 @@ reqwest = "0.12.15" serde = { version = "1.0.219", features = [ "derive" ]} serde_json = "1.0.140" sha2 = "0.10.8" -tokio = { version = "1.44.1", features = [ "full", "tracing", "parking_lot" ]} +tokio = { version = "1.44.1", features = [ "full" ]} tracing = "0.1.41" +tracing-subscriber = { version = "0.3.19", features = ["env-filter"]} url = { version = "2.5.4", features = [ "serde" ]} -cxx = "1.0" -console-subscriber = "0.4.1" -bytes = "1.10.1" -tokio-stream = { version = "0.1.17", features = ["fs"] } -tempfile = "3.19.1" -tokio-util = { version = "0.7.14", features = ["io"] } - -[build-dependencies] -cxx-build = "1.0" -pkg-config = "0.3.32" diff --git a/build.rs b/build.rs deleted file mode 100644 index 2bbe451..0000000 --- a/build.rs +++ /dev/null @@ -1,21 +0,0 @@ -fn main() { - cxx_build::bridge("src/bindings/mod.rs") - .file("src/bindings/nix.cpp") - .flag("-std=c++2a") - .flag("-O2") - .flag("-include") - .flag("nix/config.h") - .flag("-I") - .flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix")) - .compile("nixbinding"); - println!("cargo:rerun-if-changed=src/bindings"); - - pkg_config::Config::new() - .atleast_version("2.4") - .probe("nix-store") - .unwrap(); - pkg_config::Config::new() - .atleast_version("2.4") - .probe("nix-main") - .unwrap(); -} diff --git a/flake.nix b/flake.nix index 46caa41..c263f49 100644 --- a/flake.nix +++ b/flake.nix @@ -22,7 +22,6 @@ }; toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain); - lib = pkgs.lib; in { devShells.default = pkgs.mkShell { @@ -30,15 +29,9 @@ pkg-config ]; buildInputs = with pkgs; [ - toolchain openssl - nix - boost - tokio-console + toolchain ]; - NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include"; - RUST_LOG = "nixcp=debug"; - RUST_BACKGRACE = 1; }; packages.default = craneLib.buildPackage { diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs deleted file mode 100644 index 701a15e..0000000 --- a/src/bindings/mod.rs +++ /dev/null @@ -1,234 +0,0 @@ -/* -Copyright 2022 Zhaofeng Li and the Attic contributors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -//! `libnixstore` Bindings -#![allow(dead_code)] - -use std::cell::UnsafeCell; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use anyhow::Result; -use futures::stream::{Stream, StreamExt}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; - -// The C++ implementation takes care of concurrency -#[repr(transparent)] -pub struct FfiNixStore(UnsafeCell>); - -unsafe impl Send for FfiNixStore {} -unsafe impl Sync for FfiNixStore {} - -impl FfiNixStore { - pub fn store(&self) -> Pin<&mut ffi::CNixStore> { - unsafe { - let ptr = self.0.get().as_mut().unwrap(); - ptr.pin_mut() - } - } -} - -/// Obtain a handle to the Nix store. -pub unsafe fn open_nix_store() -> Result { - match ffi::open_nix_store() { - Ok(ptr) => { - let cell = UnsafeCell::new(ptr); - Ok(FfiNixStore(cell)) - } - Err(e) => Err(e.into()), - } -} - -// TODO: Benchmark different implementations -// (tokio, crossbeam, flume) -mod mpsc { - // Tokio - pub use tokio::sync::mpsc::{ - UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel, - }; -} - -/// Async write request. -#[derive(Debug)] -enum AsyncWriteMessage { - Data(Vec), - Error(String), - Eof, -} - -/// Async write request sender. -#[derive(Clone)] -pub struct AsyncWriteSender { - sender: mpsc::UnboundedSender, -} - -impl AsyncWriteSender { - fn send(&mut self, data: &[u8]) -> Result<(), mpsc::SendError> { - let message = AsyncWriteMessage::Data(Vec::from(data)); - self.sender.send(message) - } - - fn eof(&mut self) -> Result<(), mpsc::SendError> { - let message = AsyncWriteMessage::Eof; - self.sender.send(message) - } - - pub(crate) fn rust_error( - &mut self, - error: impl std::error::Error, - ) -> Result<(), impl std::error::Error> { - let message = AsyncWriteMessage::Error(error.to_string()); - self.sender.send(message) - } -} - -/// A wrapper of the `AsyncWrite` trait for the synchronous Nix C++ land. -pub struct AsyncWriteAdapter { - receiver: mpsc::UnboundedReceiver, - eof: bool, -} - -impl AsyncWriteAdapter { - pub fn new() -> (Self, Box) { - let (sender, receiver) = mpsc::unbounded_channel(); - - let r = Self { - receiver, - eof: false, - }; - let sender = Box::new(AsyncWriteSender { sender }); - - (r, sender) - } - - /// Write everything the sender sends to us. - pub async fn write_all(mut self, mut writer: Box) -> Result<()> { - let writer = writer.as_mut(); - - while let Some(data) = self.next().await { - match data { - Ok(v) => { - writer.write_all(&v).await?; - } - Err(e) => { - return Err(e); - } - } - } - - if !self.eof { - Err(io::Error::from(io::ErrorKind::BrokenPipe).into()) - } else { - Ok(()) - } - } -} - -impl Stream for AsyncWriteAdapter { - type Item = Result>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.receiver.poll_recv(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(message)) => { - use AsyncWriteMessage::*; - match message { - Data(v) => Poll::Ready(Some(Ok(v))), - Error(exception) => { - let error = anyhow::Error::msg(format!("cxx error: {exception}")); - Poll::Ready(Some(Err(error))) - } - Eof => { - self.eof = true; - Poll::Ready(None) - } - } - } - Poll::Ready(None) => { - if !self.eof { - Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe).into()))) - } else { - Poll::Ready(None) - } - } - } - } -} - -#[cxx::bridge] -/// Generated by `cxx.rs`. -/// -/// Mid-level wrapper of `libnixstore` implemented in C++. -mod ffi { - extern "Rust" { - type AsyncWriteSender; - fn send(self: &mut AsyncWriteSender, data: &[u8]) -> Result<()>; - fn eof(self: &mut AsyncWriteSender) -> Result<()>; - } - - unsafe extern "C++" { - include!("nixcp/src/bindings/nix.hpp"); - - // ========= - // CNixStore - // ========= - - /// Mid-level wrapper for the Unix Domain Socket Nix Store. - type CNixStore; - - /// Queries information about a valid path. - fn query_path_info( - self: Pin<&mut CNixStore>, - store_path: &[u8], - ) -> Result>; - - /// Computes the closure of a valid path. - /// - /// If `flip_directions` is true, the set of paths that can reach `store_path` is - /// returned. - fn compute_fs_closure( - self: Pin<&mut CNixStore>, - store_path: &[u8], - flip_direction: bool, - include_outputs: bool, - include_derivers: bool, - ) -> Result>>; - - /// Obtains a handle to the Nix store. - fn open_nix_store() -> Result>; - - /// Creates a NAR dump from a path. - fn nar_from_path( - self: Pin<&mut CNixStore>, - base_name: Vec, - sender: Box, - ) -> Result<()>; - - // ========= - // CPathInfo - // ========= - - /// Mid-level wrapper for the `nix::ValidPathInfo` struct. - type CPathInfo; - - /// Returns the references of the store path. - fn references(self: Pin<&mut CPathInfo>) -> UniquePtr>; - - /// Returns the possibly invalid signatures attached to the store path. - fn sigs(self: Pin<&mut CPathInfo>) -> UniquePtr>; - } -} diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp deleted file mode 100644 index 326e878..0000000 --- a/src/bindings/nix.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* -Copyright 2022 Zhaofeng Li and the Attic contributors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// C++ side of the libnixstore glue. -// -// We implement a mid-level wrapper of the Nix Store interface, -// which is then wrapped again in the Rust side to enable full -// async-await operation. -// -// Here we stick with the naming conventions of Rust and handle -// Rust types directly where possible, so that the interfaces are -// satisfying to use from the Rust side via cxx.rs. - -#include "nixcp/src/bindings/nix.hpp" - -static std::mutex g_init_nix_mutex; -static bool g_init_nix_done = false; - -static nix::StorePath store_path_from_rust(RBasePathSlice base_name) { - std::string_view sv((const char *)base_name.data(), base_name.size()); - return nix::StorePath(sv); -} - -// ======== -// RustSink -// ======== - -RustSink::RustSink(RBox sender) : sender(std::move(sender)) {} - -void RustSink::operator () (std::string_view data) { - RBasePathSlice s((const unsigned char *)data.data(), data.size()); - - this->sender->send(s); -} - -void RustSink::eof() { - this->sender->eof(); -} - - -// ========= -// CPathInfo -// ========= - -CPathInfo::CPathInfo(nix::ref pi) : pi(pi) {} - -std::unique_ptr> CPathInfo::sigs() { - std::vector result; - for (auto&& elem : this->pi->sigs) { - result.push_back(std::string(elem)); - } - return std::make_unique>(result); -} - -std::unique_ptr> CPathInfo::references() { - std::vector result; - for (auto&& elem : this->pi->references) { - result.push_back(std::string(elem.to_string())); - } - return std::make_unique>(result); -} - -// ========= -// CNixStore -// ========= - -CNixStore::CNixStore() { - std::map params; - std::lock_guard lock(g_init_nix_mutex); - - if (!g_init_nix_done) { - nix::initNix(); - g_init_nix_done = true; - } - - this->store = nix::openStore(nix::settings.storeUri.get(), params); -} - -std::unique_ptr CNixStore::query_path_info(RBasePathSlice base_name) { - auto store_path = store_path_from_rust(base_name); - - auto r = this->store->queryPathInfo(store_path); - return std::make_unique(r); -} - -std::unique_ptr> CNixStore::compute_fs_closure(RBasePathSlice base_name, bool flip_direction, bool include_outputs, bool include_derivers) { - std::set out; - - this->store->computeFSClosure(store_path_from_rust(base_name), out, flip_direction, include_outputs, include_derivers); - - std::vector result; - for (auto&& elem : out) { - result.push_back(std::string(elem.to_string())); - } - return std::make_unique>(result); -} - -void CNixStore::nar_from_path(RVec base_name, RBox sender) { - RustSink sink(std::move(sender)); - - std::string_view sv((const char *)base_name.data(), base_name.size()); - nix::StorePath store_path(sv); - - // exceptions will be thrown into Rust - this->store->narFromPath(store_path, sink); - sink.eof(); -} - -std::unique_ptr open_nix_store() { - return std::make_unique(); -} diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp deleted file mode 100644 index 5c79a33..0000000 --- a/src/bindings/nix.hpp +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright 2022 Zhaofeng Li and the Attic contributors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// C++ side of the libnixstore glue. -// -// We implement a mid-level wrapper of the Nix Store interface, -// which is then wrapped again in the Rust side to enable full -// async-await operation. -// -// Here we stick with the naming conventions of Rust and handle -// Rust types directly where possible, so that the interfaces are -// satisfying to use from the Rust side via cxx.rs. - -#pragma once -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -template using RVec = rust::Vec; -template using RBox = rust::Box; -template using RSlice = rust::Slice; -using RString = rust::String; -using RStr = rust::Str; -using RBasePathSlice = RSlice; -using RHashSlice = RSlice; - -struct AsyncWriteSender; - -struct RustSink : nix::Sink -{ - RBox sender; -public: - RustSink(RBox sender); - void operator () (std::string_view data) override; - void eof(); -}; - -// Opaque wrapper for nix::ValidPathInfo -class CPathInfo { - nix::ref pi; -public: - CPathInfo(nix::ref pi); - std::unique_ptr> sigs(); - std::unique_ptr> references(); -}; - -class CNixStore { - std::shared_ptr store; -public: - CNixStore(); - - RString store_dir(); - std::unique_ptr query_path_info(RBasePathSlice base_name); - std::unique_ptr> compute_fs_closure( - RBasePathSlice base_name, - bool flip_direction, - bool include_outputs, - bool include_derivers); - void nar_from_path(RVec base_name, RBox sender); -}; - -std::unique_ptr open_nix_store(); - -// Relies on our definitions -#include "nixcp/src/bindings/mod.rs.h" diff --git a/src/main.rs b/src/main.rs index f4c0f4f..57d3340 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,15 @@ #![feature(let_chains)] #![feature(extend_one)] -use std::path::PathBuf; - use anyhow::{Context, Result}; use clap::{Args, Parser, Subcommand}; +use tracing_subscriber::{EnvFilter, FmtSubscriber}; use push::Push; -use store::Store; -mod bindings; mod cli; mod path_info; mod push; -mod store; mod uploader; #[derive(Parser, Debug)] @@ -64,25 +60,26 @@ pub struct PushArgs { #[arg(long)] skip_signature_check: bool, - /// Path to upload - /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 - #[arg(value_name = "PATH")] - paths: Vec, + /// Package or store path to upload + /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 + #[arg(value_name = "package or store path")] + package: String, } #[tokio::main] async fn main() -> Result<()> { - console_subscriber::init(); + let filter = EnvFilter::from_default_env(); + let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); + tracing::subscriber::set_global_default(subscriber)?; let cli = Cli::parse(); match &cli.command { Commands::Push(cli) => { - let store = Store::connect()?; - let push = Box::leak(Box::new(Push::new(cli, store).await?)); - push.add_paths(cli.paths.clone()) + let push = Box::leak(Box::new(Push::new(cli).await?)); + push.paths_from_package(&cli.package) .await - .context("add paths to push")?; + .context("nixcp get paths from package")?; push.run().await.context("nixcp run")?; } } diff --git a/src/path_info.rs b/src/path_info.rs index e62b68a..6dcbb53 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -1,60 +1,85 @@ use std::collections::HashSet; -use anyhow::{Context, Result}; +use anyhow::{Context, Error, Result}; use aws_sdk_s3 as s3; -use futures::future::join_all; use nix_compat::nixbase32; use nix_compat::store_path::StorePath; use regex::Regex; -use std::path::Path; +use serde::Deserialize; use tokio::process::Command; -use tracing::{debug, trace}; +use tracing::{debug, error, trace}; use url::Url; -use crate::store::Store; - -#[derive(Debug, Clone)] +// nix path-info --derivation --json +#[derive(Debug, Clone, Deserialize)] pub struct PathInfo { + pub deriver: Option>, pub path: StorePath, - pub signatures: Vec, + signatures: Option>, pub references: Vec>, } impl PathInfo { - pub async fn from_path(path: &Path, store: &Store) -> Result { - debug!("query path info for {:?}", path); - - let derivation = match path.extension() { - Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(), - _ => { - &Command::new("nix") - .arg("path-info") - .arg("--derivation") - .arg(path) - .output() - .await - .context(format!("run command: nix path-info --derivaiton {path:?}"))? - .stdout - } - }; - let derivation = String::from_utf8_lossy(derivation); - debug!("derivation: {derivation}"); - - let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes()) - .context("storepath from derivation")?; - store - .query_path_info(store_path) + // get PathInfo for a package or a store path + // we deserialize this as an array of `PathInfo` below + pub async fn from_path(path: &str) -> Result { + debug!("query nix path-info for {path}"); + // use lix cause nix would return a json map instead of an array + // json output is not stable and could break in future + // TODO figure out a better way + let nix_cmd = Command::new("nix") + .arg("run") + .arg("--experimental-features") + .arg("nix-command flakes") + .arg("github:nixos/nixpkgs/nixos-unstable#lix") + .arg("--") + .arg("path-info") + .arg("--json") + .arg(path) + .output() .await - .context("query pathinfo for derivation") + .context("`nix path-info` failed for {package}")?; + + trace!( + "nix path-info output: {}", + String::from_utf8_lossy(&nix_cmd.stdout) + ); + + // nix path-info returns an array with one element + match serde_json::from_slice::>(&nix_cmd.stdout) + .context("parse path info from stdout") + { + Ok(path_info) => path_info + .into_iter() + .next() + .ok_or_else(|| Error::msg("nix path-info returned empty")), + Err(e) => { + error!( + "Failed to parse data from `nix path-info`. The path may not exist on your system." + ); + Err(e) + } + } } - pub async fn get_closure(&self, store: &Store) -> Result> { - let futs = store - .compute_fs_closure(self.path.clone()) - .await? - .into_iter() - .map(|x| store.query_path_info(x)); - join_all(futs).await.into_iter().collect() + pub async fn get_closure(&self) -> Result> { + debug!("query nix-store for {}", self.absolute_path()); + let nix_store_cmd = Command::new("nix-store") + .arg("--query") + .arg("--requisites") + .arg("--include-outputs") + .arg(self.absolute_path()) + .output() + .await + .expect("nix-store cmd failed"); + + let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?; + let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect(); + let mut closure = Vec::with_capacity(nix_store_paths.len()); + for path in nix_store_paths { + closure.push(Self::from_path(path).await?); + } + Ok(closure) } /// checks if the path is signed by any upstream. if it is, we assume a cache hit. @@ -76,13 +101,15 @@ impl PathInfo { } fn signees(&self) -> Vec<&str> { - let signers: Vec<_> = self - .signatures - .iter() - .filter_map(|signature| Some(signature.split_once(":")?.0)) - .collect(); - trace!("signers for {}: {:?}", self.path, signers); - return signers; + if let Some(signatures) = self.signatures.as_ref() { + let signees: Vec<_> = signatures + .iter() + .filter_map(|signature| Some(signature.split_once(":")?.0)) + .collect(); + trace!("signees for {}: {:?}", self.path, signees); + return signees; + } + Vec::new() } pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool { @@ -122,3 +149,69 @@ impl PathInfo { .is_ok() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn get_signees_from_path_info() { + let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#; + let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); + + path_info.signatures = Some(vec![ + "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), + "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), + ]); + let signees = path_info.signees(); + assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]); + } + + #[test] + fn match_upstream_cache_from_signature() { + let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#; + let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); + + path_info.signatures = Some(vec![ + "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), + "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), + "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(), + ]); + assert!( + path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]) + ); + assert!( + path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]) + ); + assert!( + path_info.check_upstream_signature(&[ + Url::parse("https://nix-community.cachix.org").unwrap() + ]) + ); + assert!( + !path_info + .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]), + ); + } + + #[test] + fn path_info_without_signature() { + let path_info_json = r#"{"ca":"fixed:r:sha256:1q10p04pgx9sk6xbvrkn4nvh0ys2lzplgcni5368f4z3cr8ikbmz","narHash":"sha256-v64ZUWbjE4fMKNGyR++nQnsAtyV25r26mTr1dwm4IOA=","narSize":5520,"path":"/nix/store/gj6hz9mj23v01yvq1nn5f655jrcky1qq-nixos-option.nix","references":[],"registrationTime":1744740942,"valid":true}"#; + let path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); + + assert!( + !path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]) + ); + } + + /* + #[test] + fn path_info_deserialize_nix_map() { + let path_info_json = r#"{"/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation":{"ca":null,"deriver":"/nix/store/h8z25s6arcrns5nmrq1yhgbamywjivpn-home-manager-generation.drv","narHash":"sha256-o4qwqyJ5UVm9cyC/nBNcNYVnIM14Pewgw7fou+wUVSY=","narSize":13608,"references":["/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/4d0ix5djms3n2jnjdc58l916cwack1rp-empty-directory","/nix/store/56zmgla8443qfrkrh2ch0vz0mh8ywrw1-home-manager-files","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/80l1sb3vcmrkcdd7ihlizkcnv19rq9fj-ncurses-6.5","/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation","/nix/store/92as847i10kl6s19fi910ddyk9l83835-check-link-targets.sh","/nix/store/9c90iz95yynyh3vsc67zndch6j01vgz3-home-manager-path","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bm5fi6wj0w4r2wjll2448k307bzfcjwx-cleanup","/nix/store/c244fsb3a7i5837lzn94m4bmav9i5p9b-link","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/gpxsdrrd4x93fs75395vr2dfys1ki9mq-jq-1.7.1-bin","/nix/store/jlf743lqxbvad6dbgndsgqfg20m2np5i-sd-switch-0.5.3","/nix/store/mhmgm739aagj4x7hr6ag2wjmxhmpy8mf-gettext-0.22.5","/nix/store/w9db12j05yv5hl31s6jndd9cfm1g1gw4-hm-modules-messages","/nix/store/wj1c3gsiajabnq50ifxqnlv60i5rhqj7-diffutils-3.10","/nix/store/xhql0ilzbiqwnmz4z8y0phk611wynxf2-gnused-4.9","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zhrjg6wxrxmdlpn6iapzpp2z2vylpvw5-home-manager.sh"],"registrationTime":1744742989,"signatures":["nixcache.cy7.sh:Vq4X95kSzum7BwrBhjmmM2yVipfBI3AE3jgZ3b3RoYrP4/ghotbDdlwCvwK3qx4BQdEOLSgrC1tDwiMNb6oRBw=="],"ultimate":false}}"#; + serde_json::from_str::>(path_info_json).expect("must serialize"); + + let path_info_json = r#"{"/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b":{"ca":null,"deriver":"/nix/store/12ssi931481jlkizgfk1c1jnawvwjbhh-nixos-system-chunk-25.05.20250412.2631b0b.drv","narHash":"sha256-CHhBIzMD4v/FKqKgGroq0UC1k3GrK5lcNwQPMpv2xLc=","narSize":20704,"references":["/nix/store/0yjiyixxsr137iw93hnaacdsssy1li9h-switch-to-configuration-0.1.0","/nix/store/14rby7cpwrzjsjym44cl5h6nj6qpn1gs-etc","/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b","/nix/store/3wjljpj30fvv2cdb60apr4126pa5bm87-shadow-4.17.2","/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/5dyh8l59kfvf89zjkbmjfnx7fix93n4f-net-tools-2.10","/nix/store/aq9wdsz12bg9252790l9awiry2bml4ls-sops-install-secrets-0.0.1","/nix/store/b00kq6fjhgisdrykg621vml8505nnmb3-users-groups.json","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bfr68wi6k8icb3j9fy3fzchva56djfhd-mounts.sh","/nix/store/cjnihsds5hhnji9r85hglph07q9y9hgc-system-path","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/f9jll96j74f5ykvs062718b98lfjbn9g-util-linux-2.40.4-bin","/nix/store/h7zih134d3n5yk8pnhv1fa38n6qkyrn2-pre-switch-checks","/nix/store/idn5n51246piyxcr3v6gxnj5a5l9mzpn-linux-6.14.2","/nix/store/ipn5793y61x2904xqnkgbjnp91svjjzx-perl-5.40.0-env","/nix/store/j1rikvl25pz0b5ham1ijq0nbg1q2fqfy-initrd-linux-6.14.2","/nix/store/jgawnqyh6piwcl79gxpmq5czx9rfr9xh-glibc-locales-2.40-66","/nix/store/jqgmcv8j4gj59218hcbiyn8z951rycdj-install-grub.sh","/nix/store/kpmybhxy3gz6k1znbdirwsp3c6wvsgg9-manifest.json","/nix/store/lgainx4gl6q7mhiwmls81d3n51p5jz7z-linux-6.14.2-modules","/nix/store/mhxn5kwnri3z9hdzi3x0980id65p0icn-lib.sh","/nix/store/n8n0faszqlnf3mdg0fj6abnknrhjsw5j-perl-5.40.0-env","/nix/store/nq61v7a601gjndijq5nndprkzpwz4q9g-glibc-2.40-66-bin","/nix/store/nx27idxpvi3fk3p7admvhipny73nr25n-kmod-31","/nix/store/pggww1d2pg24fcg5v36xn63n53vanyyi-gnupg-2.4.7","/nix/store/rg5rf512szdxmnj9qal3wfdnpfsx38qi-setup-etc.pl","/nix/store/vvlfaafnz3pdhw7lx5kc5gb9pl4zhz5l-local-cmds","/nix/store/w142vx7ij1fz6qwhp5dprkf59cizvv1v-update-users-groups.pl","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zlsmh0ccgvncg30qb4y0mp5pahnk1wnw-append-initrd-secrets","/nix/store/zs07icpv5ykf8m36xcv717hh26bp09fa-firmware","/nix/store/zy2n4id5gcxcbx2x8jbblkmcpdlpsypk-getent-glibc-2.40-66"],"registrationTime":1744743136,"signatures":["nixcache.cy7.sh:dZ1XiKQNe0fRX48gBj03PIABYJGV6BPwb72YpMqEBONZMF+JrkVKhRCF0ur/4Bf5prHxg6Qfg1ytP/4csRC9DQ=="],"ultimate":false}}"#; + serde_json::from_str::>(path_info_json).expect("must serialize"); + } + */ +} diff --git a/src/push.rs b/src/push.rs index 18f74b9..719d3a8 100644 --- a/src/push.rs +++ b/src/push.rs @@ -1,7 +1,6 @@ use std::{ fs, iter::once, - path::PathBuf, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -14,10 +13,10 @@ use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; use tokio::sync::{RwLock, mpsc}; -use tracing::debug; +use tracing::{debug, info, trace}; use url::Url; -use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader}; +use crate::{PushArgs, path_info::PathInfo, uploader::Uploader}; pub struct Push { upstream_caches: Vec, @@ -25,7 +24,6 @@ pub struct Push { s3_client: s3::Client, signing_key: SigningKey, bucket: String, - store: Arc, // paths that we skipped cause of a signature match signature_hit_count: AtomicUsize, // paths that we skipped cause we found it on an upstream @@ -37,7 +35,7 @@ pub struct Push { } impl Push { - pub async fn new(cli: &PushArgs, store: Store) -> Result { + pub async fn new(cli: &PushArgs) -> Result { let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1); for upstream in cli .upstreams @@ -69,7 +67,6 @@ impl Push { s3_client, signing_key, bucket: cli.bucket.clone(), - store: Arc::new(store), signature_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0), @@ -77,33 +74,18 @@ impl Push { }) } - pub async fn add_paths(&'static self, paths: Vec) -> Result<()> { - let mut futs = Vec::with_capacity(paths.len()); - for path in paths { - let store_paths = self.store_paths.clone(); - let store = self.store.clone(); - - futs.push(tokio::spawn(async move { - let path_info = PathInfo::from_path(path.as_path(), &store) - .await - .context("get path info for path")?; - debug!("path-info for {path:?}: {path_info:?}"); - - store_paths.write().await.extend( - path_info - .get_closure(&store) - .await - .context("closure from path info")?, - ); - Ok(()) - })); - } - join_all(futs) + pub async fn paths_from_package(&mut self, package: &str) -> Result<()> { + let path_info = PathInfo::from_path(package) .await - .into_iter() - .flatten() - .collect::>>()?; - println!("found {} store paths", self.store_paths.read().await.len()); + .context("get path info for package")?; + debug!("path-info for {package}: {:?}", path_info); + self.store_paths.write().await.extend( + path_info + .get_closure() + .await + .context("closure from path info")?, + ); + info!("found {} store paths", self.store_paths.read().await.len()); Ok(()) } @@ -124,7 +106,7 @@ impl Push { for path in store_paths.into_iter() { if path.check_upstream_signature(&self.upstream_caches) { - debug!("skip {} (signature match)", path.absolute_path()); + trace!("skip {} (signature match)", path.absolute_path()); self.signature_hit_count.fetch_add(1, Ordering::Release); continue; } @@ -139,13 +121,13 @@ impl Push { .check_if_already_exists(&self.s3_client, self.bucket.clone()) .await { - debug!("skip {} (already exists)", path.absolute_path()); + trace!("skip {} (already exists)", path.absolute_path()); self.already_exists_count.fetch_add(1, Ordering::Relaxed); } else { tx.send(path).await.unwrap(); } } else { - debug!("skip {} (upstream hit)", path.absolute_path()); + trace!("skip {} (upstream hit)", path.absolute_path()); self.upstream_hit_count.fetch_add(1, Ordering::Relaxed); } }) @@ -160,12 +142,13 @@ impl Push { } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { - let mut uploads = Vec::new(); + let mut uploads = Vec::with_capacity(10); loop { if let Some(path_to_upload) = rx.recv().await { - println!("uploading: {}", path_to_upload.absolute_path()); + let absolute_path = path_to_upload.absolute_path(); + println!("uploading: {}", absolute_path); let uploader = Uploader::new( &self.signing_key, path_to_upload, @@ -173,12 +156,10 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn({ - async move { - let res = uploader.upload().await; - self.upload_count.fetch_add(1, Ordering::Relaxed); - res - } + uploads.push(tokio::spawn(async move { + let res = uploader.upload().await; + self.upload_count.fetch_add(1, Ordering::Relaxed); + res })); } else { join_all(uploads) diff --git a/src/store.rs b/src/store.rs deleted file mode 100644 index 0433362..0000000 --- a/src/store.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc}; - -use anyhow::{Context, Result}; -use nix_compat::store_path::StorePath; -use tokio::task; - -use crate::{ - bindings::{self, AsyncWriteAdapter}, - path_info::PathInfo, -}; - -pub struct Store { - inner: Arc, -} - -impl Store { - pub fn connect() -> Result { - let inner = unsafe { bindings::open_nix_store()? }; - Ok(Self { - inner: Arc::new(inner), - }) - } - - pub async fn compute_fs_closure( - &self, - path: StorePath, - ) -> Result>> { - let inner = self.inner.clone(); - task::spawn_blocking(move || { - let cxx_vector = - inner - .store() - .compute_fs_closure(path.to_string().as_bytes(), false, true, true)?; - Ok(cxx_vector - .iter() - .map(|x| { - StorePath::from_bytes(x.as_bytes()) - .context("make StorePath from vector returned by compute_fs_closure") - }) - .collect::>()?) - }) - .await - .unwrap() - } - - pub async fn query_path_info(&self, path: StorePath) -> Result { - let inner = self.inner.clone(); - - task::spawn_blocking(move || { - let mut c_path_info = inner - .store() - .query_path_info(path.to_string().as_bytes()) - .context("query cpp for path info")?; - - let signatures = c_path_info - .pin_mut() - .sigs() - .into_iter() - .map(|x| { - let osstr = OsStr::from_bytes(x.as_bytes()); - osstr.to_str().unwrap().to_string() - }) - .collect(); - let references = c_path_info - .pin_mut() - .references() - .into_iter() - .map(|x| StorePath::from_bytes(x.as_bytes())) - .collect::>() - .context("get references from pathinfo")?; - - Ok(PathInfo { - path, - signatures, - references, - }) - }) - .await - .unwrap() - } - - pub fn stream_nar(&self, path: StorePath) -> AsyncWriteAdapter { - let inner = self.inner.clone(); - let (adapter, mut sender) = AsyncWriteAdapter::new(); - - task::spawn_blocking(move || { - if let Err(e) = inner - .store() - .nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone()) - { - let _ = sender.rust_error(e); - } - }); - adapter - } -} diff --git a/src/uploader.rs b/src/uploader.rs index 8f4efaa..b0520ac 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -1,30 +1,20 @@ -use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf}; - use anyhow::Result; use async_compression::{Level, tokio::bufread::ZstdEncoder}; use aws_sdk_s3::{ self as s3, types::{CompletedMultipartUpload, CompletedPart}, }; -use bytes::{BufMut, Bytes, BytesMut}; -use futures::{future::join_all, stream::TryStreamExt}; +use futures::future::join_all; use nix_compat::{ - nar::writer::r#async as nar, narinfo::{self, NarInfo, SigningKey}, nixbase32, store_path::StorePath, }; use sha2::{Digest, Sha256}; -use tokio::{ - fs::{File, read_dir, read_link}, - io::{AsyncRead, BufReader}, - pin, -}; -use tokio_stream::wrappers::ReadDirStream; -use tokio_util::io::{InspectReader, read_buf}; +use tokio::{io::AsyncReadExt, process::Command}; use tracing::debug; -use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store}; +use crate::path_info::PathInfo; const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5; @@ -33,7 +23,6 @@ pub struct Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, - store: &'a Store, } impl<'a> Uploader<'a> { @@ -42,37 +31,37 @@ impl<'a> Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, - store: &'a Store, ) -> Result { Ok(Self { signing_key, path, s3_client, bucket, - store, }) } pub async fn upload(&self) -> Result<()> { - let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?; - self.make_nar(&mut nar_temp).await; + let nar = self.make_nar().await?; + let mut nar_info = self.narinfo_from_nar(&nar)?; + let nar = self.compress_nar(&nar).await; - // this goes to the .narinfo file - let mut nar_hasher = Sha256::new(); - // this is the URL for file .narinfo points to - let mut file_hasher = Sha256::new(); - let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher); + // update fields that we know after compression + let mut hasher = Sha256::new(); + hasher.update(&nar); + let hash: [u8; 32] = hasher.finalize().into(); + let nar_url = self.nar_url(&hash); + nar_info.file_hash = Some(hash); + nar_info.file_size = Some(nar.len() as u64); + nar_info.url = nar_url.as_str(); + debug!("uploading nar with key: {nar_url}"); - let buf = BytesMut::with_capacity(MULTIPART_CUTOFF); - let - - if first_chunk.len() < MULTIPART_CUTOFF { + if nar.len() < MULTIPART_CUTOFF { let put_object = self .s3_client .put_object() .bucket(&self.bucket) .key(&nar_url) - .body(first_chunk.into()) + .body(nar.into()) .send() .await?; debug!("put object: {:#?}", put_object); @@ -143,11 +132,20 @@ impl<'a> Uploader<'a> { .body(nar_info.to_string().as_bytes().to_vec().into()) .send() .await?; - debug!("done uploading narinfo"); Ok(()) } + async fn make_nar(&self) -> Result> { + Ok(Command::new("nix") + .arg("nar") + .arg("dump-path") + .arg(self.path.absolute_path()) + .output() + .await? + .stdout) + } + fn narinfo_from_nar(&self, nar: &[u8]) -> Result { let mut hasher = Sha256::new(); hasher.update(nar); @@ -161,7 +159,7 @@ impl<'a> Uploader<'a> { signatures: Vec::new(), ca: None, system: None, - deriver: None, + deriver: self.path.deriver.as_ref().map(|x| x.as_ref()), compression: Some("zstd"), file_hash: None, file_size: None, @@ -172,89 +170,18 @@ impl<'a> Uploader<'a> { Ok(nar_info) } - async fn make_nar(&self, sink: &mut File) -> Result<()> { - let nar = nar::open(sink).await?; - let path = self.path.absolute_path(); - let metadata = File::open(&path).await?.metadata().await?; + fn nar_url(&self, compressed_nar_hash: &[u8]) -> String { + let compressed_nar_hash = nixbase32::encode(compressed_nar_hash); + format!("nar/{compressed_nar_hash}.nar.zst") + } - if metadata.is_symlink() { - let target = read_link(&path).await?; - nar.symlink(target.as_os_str().as_encoded_bytes()).await; - } else if metadata.is_dir() { - let mut nar = nar.directory().await?; - nar_from_dir(path.into(), &mut nar).await; - nar.close().await; - } else if metadata.is_file() { - let perms = metadata.permissions().mode(); - let mut executable = false; - if (perms & 0o700) == 0o700 { - executable = true; - } - - let mut file = BufReader::new(File::open(&path).await?); - nar.file(executable, metadata.len(), &mut file).await; - } - - Ok(()) + async fn compress_nar(&self, nar: &[u8]) -> Vec { + let mut encoder = ZstdEncoder::with_quality(nar, Level::Default); + let mut compressed = Vec::with_capacity(nar.len()); + encoder + .read_to_end(&mut compressed) + .await + .expect("should compress just fine"); + compressed } } - -async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> { - let root = ReadDirStream::new(read_dir(&path).await?); - let entries = root - .map_ok(|x| (x.file_name(), x)) - .try_collect::>() - .await?; - - // directory entries must be written in ascending order of name - for (name, entry) in entries.iter() { - let node = node.entry(name.as_encoded_bytes()).await?; - let metadata = entry.metadata().await?; - - if metadata.is_symlink() { - let target = read_link(entry.path()).await?; - node.symlink(target.as_os_str().as_encoded_bytes()).await; - } else if metadata.is_dir() { - let mut node = node.directory().await?; - Box::pin(nar_from_dir(entry.path(), &mut node)).await; - node.close().await; - } else if metadata.is_file() { - let perms = metadata.permissions().mode(); - let mut executable = false; - if (perms & 0o700) == 0o700 { - executable = true; - } - - let mut file = BufReader::new(File::open(entry.path()).await?); - node.file(executable, metadata.len(), &mut file).await; - } - } - Ok(()) -} - -fn compress_and_hash_nar( - nar_file: File, - nar_hasher: &mut Sha256, - compressed_nar_hasher: &mut Sha256, -) -> impl AsyncRead { - let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x)); - let nar_buf_reader = BufReader::new(nar_reader); - - let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default); - InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x)) -} - -fn nar_url(compressed_nar_hash: &[u8]) -> String { - format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash)) -} - -async fn read_buf_nar(stream: &mut S, mut buf: BytesMut) -> Result { - while buf.len() < buf.capacity() { - let n = read_buf(stream, &mut buf).await?; - - if n == 0 { - break; - } - } - Ok(buf.freeze()) -}