diff --git a/Cargo.lock b/Cargo.lock index aee2282..2608cf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,39 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "async-trait" +version = "0.1.88" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -429,7 +462,7 @@ dependencies = [ "rustls-native-certs 0.8.1", "rustls-pki-types", "tokio", - "tower", + "tower 0.5.2", "tracing", ] @@ -553,6 +586,53 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -657,6 +737,12 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -759,12 +845,62 @@ dependencies = [ "cc", ] +[[package]] +name = "codespan-reporting" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6d2e5af09e8c8ad56c969f2157a3d4238cebc7c55f0a517728c38f7b200f81" +dependencies = [ + "serde", + "termcolor", + "unicode-width", +] + [[package]] name = "colorchoice" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "console-api" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "hyper-util", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -848,6 +984,21 @@ dependencies = [ "crc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crypto-bigint" version = "0.4.9" @@ -907,6 +1058,65 @@ dependencies = [ "syn", ] +[[package]] +name = "cxx" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6354e975ea4ec28033ec3a36fa9baa1a02e3eb22ad740eeb4929370d4f5ba8" +dependencies = [ + "cc", + "cxxbridge-cmd", + "cxxbridge-flags", + "cxxbridge-macro", + "foldhash", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b4400e26ea4b99417e4263b1ce2d8452404d750ba0809a7bd043072593d430d" +dependencies = [ + "cc", + "codespan-reporting", + "proc-macro2", + "quote", + "scratch", + "syn", +] + +[[package]] +name = "cxxbridge-cmd" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31860c98f69fc14da5742c5deaf78983e846c7b27804ca8c8319e32eef421bde" +dependencies = [ + "clap", + "codespan-reporting", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0402a66013f3b8d3d9f2d7c9994656cc81e671054822b0728d7454d9231892f" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64c0b38f32d68f3324a981645ee39b2d686af36d03c98a386df3716108c9feae" +dependencies = [ + "proc-macro2", + "quote", + "rustversion", + "syn", +] + [[package]] name = "data-encoding" version = "2.9.0" @@ -1090,6 +1300,16 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "flate2" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ced92e76e966ca2fd84c8f7aa01a4aea65b0eb6648d72f7c8f3e2764a67fece" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1289,7 +1509,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap", + "indexmap 2.9.0", "slab", "tokio", "tokio-util", @@ -1308,13 +1528,19 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap", + "indexmap 2.9.0", "slab", "tokio", "tokio-util", "tracing", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1326,6 +1552,19 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom 7.1.3", + "num-traits", +] + [[package]] name = "heck" version = "0.5.0" @@ -1424,6 +1663,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "humantime" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" + [[package]] name = "hyper" version = "0.14.32" @@ -1461,6 +1706,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1502,6 +1748,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -1677,6 +1936,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.9.0" @@ -1684,7 +1953,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1772,6 +2041,15 @@ dependencies = [ "libc", ] +[[package]] +name = "link-cplusplus" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a6f6da007f968f9def0d65a05b187e2960183de70c160204ecfccf0ee330212" +dependencies = [ + "cc", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -1812,7 +2090,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown", + "hashbrown 0.15.2", ] [[package]] @@ -1824,6 +2102,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1944,18 +2228,25 @@ dependencies = [ "async-compression", "aws-config", "aws-sdk-s3", + "bytes", "clap", + "console-subscriber", + "cxx", + "cxx-build", "ed25519-dalek", "futures", "nix-compat", + "pkg-config", "regex", "reqwest", "serde", "serde_json", "sha2", + "tempfile", "tokio", + "tokio-stream", + "tokio-util", "tracing", - "tracing-subscriber", "url", ] @@ -1978,16 +2269,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "nu-ansi-term" -version = "0.46.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" -dependencies = [ - "overload", - "winapi", -] - [[package]] name = "num-conv" version = "0.1.0" @@ -2098,12 +2379,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "p256" version = "0.11.1" @@ -2144,6 +2419,26 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pin-project" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677f1add503faace112b9f1373e43e9e054bfdd22ff1a63c1bc485eaec6a6a8a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -2188,6 +2483,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "prettyplease" version = "0.2.32" @@ -2216,6 +2520,38 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "prost-types" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16" +dependencies = [ + "prost", +] + [[package]] name = "quote" version = "1.0.40" @@ -2231,6 +2567,27 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + [[package]] name = "rand_core" version = "0.6.4" @@ -2334,7 +2691,7 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", - "tower", + "tower 0.5.2", "tower-service", "url", "wasm-bindgen", @@ -2538,6 +2895,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scratch" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f6280af86e5f559536da57a45ebc84948833b3bee313a7dd25232e09c878a52" + [[package]] name = "sct" version = "0.7.1" @@ -2841,6 +3204,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "2.0.12" @@ -2926,6 +3298,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.52.0", ] @@ -2970,6 +3343,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.14" @@ -2995,11 +3379,61 @@ version = "0.22.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" dependencies = [ - "indexmap", + "indexmap 2.9.0", "toml_datetime", "winnow", ] +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.22.1", + "bytes", + "h2 0.4.9", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "socket2", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.5.2" @@ -3059,17 +3493,6 @@ dependencies = [ "valuable", ] -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - [[package]] name = "tracing-subscriber" version = "0.3.19" @@ -3077,15 +3500,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", - "nu-ansi-term", "once_cell", "regex", "sharded-slab", - "smallvec", "thread_local", "tracing", "tracing-core", - "tracing-log", ] [[package]] @@ -3106,6 +3526,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "untrusted" version = "0.9.0" @@ -3296,27 +3722,14 @@ dependencies = [ ] [[package]] -name = "winapi" -version = "0.3.9" +name = "winapi-util" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", + "windows-sys 0.59.0", ] -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - [[package]] name = "windows-link" version = "0.1.1" @@ -3558,6 +3971,26 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zerocopy" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zerofrom" version = "0.1.6" diff --git a/Cargo.toml b/Cargo.toml index f7bc3f0..2430fdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,16 @@ reqwest = "0.12.15" serde = { version = "1.0.219", features = [ "derive" ]} serde_json = "1.0.140" sha2 = "0.10.8" -tokio = { version = "1.44.1", features = [ "full" ]} +tokio = { version = "1.44.1", features = [ "full", "tracing", "parking_lot" ]} tracing = "0.1.41" -tracing-subscriber = { version = "0.3.19", features = ["env-filter"]} url = { version = "2.5.4", features = [ "serde" ]} +cxx = "1.0" +console-subscriber = "0.4.1" +bytes = "1.10.1" +tokio-stream = { version = "0.1.17", features = ["fs"] } +tempfile = "3.19.1" +tokio-util = { version = "0.7.14", features = ["io"] } + +[build-dependencies] +cxx-build = "1.0" +pkg-config = "0.3.32" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..2bbe451 --- /dev/null +++ b/build.rs @@ -0,0 +1,21 @@ +fn main() { + cxx_build::bridge("src/bindings/mod.rs") + .file("src/bindings/nix.cpp") + .flag("-std=c++2a") + .flag("-O2") + .flag("-include") + .flag("nix/config.h") + .flag("-I") + .flag(concat!(env!("NIX_INCLUDE_PATH"), "/nix")) + .compile("nixbinding"); + println!("cargo:rerun-if-changed=src/bindings"); + + pkg_config::Config::new() + .atleast_version("2.4") + .probe("nix-store") + .unwrap(); + pkg_config::Config::new() + .atleast_version("2.4") + .probe("nix-main") + .unwrap(); +} diff --git a/flake.nix b/flake.nix index c263f49..46caa41 100644 --- a/flake.nix +++ b/flake.nix @@ -22,6 +22,7 @@ }; toolchain = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; craneLib = (crane.mkLib pkgs).overrideToolchain(_: toolchain); + lib = pkgs.lib; in { devShells.default = pkgs.mkShell { @@ -29,9 +30,15 @@ pkg-config ]; buildInputs = with pkgs; [ - openssl toolchain + openssl + nix + boost + tokio-console ]; + NIX_INCLUDE_PATH = "${lib.getDev pkgs.nix}/include"; + RUST_LOG = "nixcp=debug"; + RUST_BACKGRACE = 1; }; packages.default = craneLib.buildPackage { diff --git a/src/bindings/mod.rs b/src/bindings/mod.rs new file mode 100644 index 0000000..701a15e --- /dev/null +++ b/src/bindings/mod.rs @@ -0,0 +1,234 @@ +/* +Copyright 2022 Zhaofeng Li and the Attic contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +//! `libnixstore` Bindings +#![allow(dead_code)] + +use std::cell::UnsafeCell; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::Result; +use futures::stream::{Stream, StreamExt}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +// The C++ implementation takes care of concurrency +#[repr(transparent)] +pub struct FfiNixStore(UnsafeCell>); + +unsafe impl Send for FfiNixStore {} +unsafe impl Sync for FfiNixStore {} + +impl FfiNixStore { + pub fn store(&self) -> Pin<&mut ffi::CNixStore> { + unsafe { + let ptr = self.0.get().as_mut().unwrap(); + ptr.pin_mut() + } + } +} + +/// Obtain a handle to the Nix store. +pub unsafe fn open_nix_store() -> Result { + match ffi::open_nix_store() { + Ok(ptr) => { + let cell = UnsafeCell::new(ptr); + Ok(FfiNixStore(cell)) + } + Err(e) => Err(e.into()), + } +} + +// TODO: Benchmark different implementations +// (tokio, crossbeam, flume) +mod mpsc { + // Tokio + pub use tokio::sync::mpsc::{ + UnboundedReceiver, UnboundedSender, error::SendError, unbounded_channel, + }; +} + +/// Async write request. +#[derive(Debug)] +enum AsyncWriteMessage { + Data(Vec), + Error(String), + Eof, +} + +/// Async write request sender. +#[derive(Clone)] +pub struct AsyncWriteSender { + sender: mpsc::UnboundedSender, +} + +impl AsyncWriteSender { + fn send(&mut self, data: &[u8]) -> Result<(), mpsc::SendError> { + let message = AsyncWriteMessage::Data(Vec::from(data)); + self.sender.send(message) + } + + fn eof(&mut self) -> Result<(), mpsc::SendError> { + let message = AsyncWriteMessage::Eof; + self.sender.send(message) + } + + pub(crate) fn rust_error( + &mut self, + error: impl std::error::Error, + ) -> Result<(), impl std::error::Error> { + let message = AsyncWriteMessage::Error(error.to_string()); + self.sender.send(message) + } +} + +/// A wrapper of the `AsyncWrite` trait for the synchronous Nix C++ land. +pub struct AsyncWriteAdapter { + receiver: mpsc::UnboundedReceiver, + eof: bool, +} + +impl AsyncWriteAdapter { + pub fn new() -> (Self, Box) { + let (sender, receiver) = mpsc::unbounded_channel(); + + let r = Self { + receiver, + eof: false, + }; + let sender = Box::new(AsyncWriteSender { sender }); + + (r, sender) + } + + /// Write everything the sender sends to us. + pub async fn write_all(mut self, mut writer: Box) -> Result<()> { + let writer = writer.as_mut(); + + while let Some(data) = self.next().await { + match data { + Ok(v) => { + writer.write_all(&v).await?; + } + Err(e) => { + return Err(e); + } + } + } + + if !self.eof { + Err(io::Error::from(io::ErrorKind::BrokenPipe).into()) + } else { + Ok(()) + } + } +} + +impl Stream for AsyncWriteAdapter { + type Item = Result>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.receiver.poll_recv(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(message)) => { + use AsyncWriteMessage::*; + match message { + Data(v) => Poll::Ready(Some(Ok(v))), + Error(exception) => { + let error = anyhow::Error::msg(format!("cxx error: {exception}")); + Poll::Ready(Some(Err(error))) + } + Eof => { + self.eof = true; + Poll::Ready(None) + } + } + } + Poll::Ready(None) => { + if !self.eof { + Poll::Ready(Some(Err(io::Error::from(io::ErrorKind::BrokenPipe).into()))) + } else { + Poll::Ready(None) + } + } + } + } +} + +#[cxx::bridge] +/// Generated by `cxx.rs`. +/// +/// Mid-level wrapper of `libnixstore` implemented in C++. +mod ffi { + extern "Rust" { + type AsyncWriteSender; + fn send(self: &mut AsyncWriteSender, data: &[u8]) -> Result<()>; + fn eof(self: &mut AsyncWriteSender) -> Result<()>; + } + + unsafe extern "C++" { + include!("nixcp/src/bindings/nix.hpp"); + + // ========= + // CNixStore + // ========= + + /// Mid-level wrapper for the Unix Domain Socket Nix Store. + type CNixStore; + + /// Queries information about a valid path. + fn query_path_info( + self: Pin<&mut CNixStore>, + store_path: &[u8], + ) -> Result>; + + /// Computes the closure of a valid path. + /// + /// If `flip_directions` is true, the set of paths that can reach `store_path` is + /// returned. + fn compute_fs_closure( + self: Pin<&mut CNixStore>, + store_path: &[u8], + flip_direction: bool, + include_outputs: bool, + include_derivers: bool, + ) -> Result>>; + + /// Obtains a handle to the Nix store. + fn open_nix_store() -> Result>; + + /// Creates a NAR dump from a path. + fn nar_from_path( + self: Pin<&mut CNixStore>, + base_name: Vec, + sender: Box, + ) -> Result<()>; + + // ========= + // CPathInfo + // ========= + + /// Mid-level wrapper for the `nix::ValidPathInfo` struct. + type CPathInfo; + + /// Returns the references of the store path. + fn references(self: Pin<&mut CPathInfo>) -> UniquePtr>; + + /// Returns the possibly invalid signatures attached to the store path. + fn sigs(self: Pin<&mut CPathInfo>) -> UniquePtr>; + } +} diff --git a/src/bindings/nix.cpp b/src/bindings/nix.cpp new file mode 100644 index 0000000..326e878 --- /dev/null +++ b/src/bindings/nix.cpp @@ -0,0 +1,124 @@ +/* +Copyright 2022 Zhaofeng Li and the Attic contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// C++ side of the libnixstore glue. +// +// We implement a mid-level wrapper of the Nix Store interface, +// which is then wrapped again in the Rust side to enable full +// async-await operation. +// +// Here we stick with the naming conventions of Rust and handle +// Rust types directly where possible, so that the interfaces are +// satisfying to use from the Rust side via cxx.rs. + +#include "nixcp/src/bindings/nix.hpp" + +static std::mutex g_init_nix_mutex; +static bool g_init_nix_done = false; + +static nix::StorePath store_path_from_rust(RBasePathSlice base_name) { + std::string_view sv((const char *)base_name.data(), base_name.size()); + return nix::StorePath(sv); +} + +// ======== +// RustSink +// ======== + +RustSink::RustSink(RBox sender) : sender(std::move(sender)) {} + +void RustSink::operator () (std::string_view data) { + RBasePathSlice s((const unsigned char *)data.data(), data.size()); + + this->sender->send(s); +} + +void RustSink::eof() { + this->sender->eof(); +} + + +// ========= +// CPathInfo +// ========= + +CPathInfo::CPathInfo(nix::ref pi) : pi(pi) {} + +std::unique_ptr> CPathInfo::sigs() { + std::vector result; + for (auto&& elem : this->pi->sigs) { + result.push_back(std::string(elem)); + } + return std::make_unique>(result); +} + +std::unique_ptr> CPathInfo::references() { + std::vector result; + for (auto&& elem : this->pi->references) { + result.push_back(std::string(elem.to_string())); + } + return std::make_unique>(result); +} + +// ========= +// CNixStore +// ========= + +CNixStore::CNixStore() { + std::map params; + std::lock_guard lock(g_init_nix_mutex); + + if (!g_init_nix_done) { + nix::initNix(); + g_init_nix_done = true; + } + + this->store = nix::openStore(nix::settings.storeUri.get(), params); +} + +std::unique_ptr CNixStore::query_path_info(RBasePathSlice base_name) { + auto store_path = store_path_from_rust(base_name); + + auto r = this->store->queryPathInfo(store_path); + return std::make_unique(r); +} + +std::unique_ptr> CNixStore::compute_fs_closure(RBasePathSlice base_name, bool flip_direction, bool include_outputs, bool include_derivers) { + std::set out; + + this->store->computeFSClosure(store_path_from_rust(base_name), out, flip_direction, include_outputs, include_derivers); + + std::vector result; + for (auto&& elem : out) { + result.push_back(std::string(elem.to_string())); + } + return std::make_unique>(result); +} + +void CNixStore::nar_from_path(RVec base_name, RBox sender) { + RustSink sink(std::move(sender)); + + std::string_view sv((const char *)base_name.data(), base_name.size()); + nix::StorePath store_path(sv); + + // exceptions will be thrown into Rust + this->store->narFromPath(store_path, sink); + sink.eof(); +} + +std::unique_ptr open_nix_store() { + return std::make_unique(); +} diff --git a/src/bindings/nix.hpp b/src/bindings/nix.hpp new file mode 100644 index 0000000..5c79a33 --- /dev/null +++ b/src/bindings/nix.hpp @@ -0,0 +1,88 @@ +/* +Copyright 2022 Zhaofeng Li and the Attic contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// C++ side of the libnixstore glue. +// +// We implement a mid-level wrapper of the Nix Store interface, +// which is then wrapped again in the Rust side to enable full +// async-await operation. +// +// Here we stick with the naming conventions of Rust and handle +// Rust types directly where possible, so that the interfaces are +// satisfying to use from the Rust side via cxx.rs. + +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +template using RVec = rust::Vec; +template using RBox = rust::Box; +template using RSlice = rust::Slice; +using RString = rust::String; +using RStr = rust::Str; +using RBasePathSlice = RSlice; +using RHashSlice = RSlice; + +struct AsyncWriteSender; + +struct RustSink : nix::Sink +{ + RBox sender; +public: + RustSink(RBox sender); + void operator () (std::string_view data) override; + void eof(); +}; + +// Opaque wrapper for nix::ValidPathInfo +class CPathInfo { + nix::ref pi; +public: + CPathInfo(nix::ref pi); + std::unique_ptr> sigs(); + std::unique_ptr> references(); +}; + +class CNixStore { + std::shared_ptr store; +public: + CNixStore(); + + RString store_dir(); + std::unique_ptr query_path_info(RBasePathSlice base_name); + std::unique_ptr> compute_fs_closure( + RBasePathSlice base_name, + bool flip_direction, + bool include_outputs, + bool include_derivers); + void nar_from_path(RVec base_name, RBox sender); +}; + +std::unique_ptr open_nix_store(); + +// Relies on our definitions +#include "nixcp/src/bindings/mod.rs.h" diff --git a/src/main.rs b/src/main.rs index 57d3340..f4c0f4f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,19 @@ #![feature(let_chains)] #![feature(extend_one)] +use std::path::PathBuf; + use anyhow::{Context, Result}; use clap::{Args, Parser, Subcommand}; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; use push::Push; +use store::Store; +mod bindings; mod cli; mod path_info; mod push; +mod store; mod uploader; #[derive(Parser, Debug)] @@ -60,26 +64,25 @@ pub struct PushArgs { #[arg(long)] skip_signature_check: bool, - /// Package or store path to upload - /// e.g. nixpkgs#hello or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 - #[arg(value_name = "package or store path")] - package: String, + /// Path to upload + /// e.g. ./result or /nix/store/y4qpcibkj767szhjb58i2sidmz8m24hb-hello-2.12.1 + #[arg(value_name = "PATH")] + paths: Vec, } #[tokio::main] async fn main() -> Result<()> { - let filter = EnvFilter::from_default_env(); - let subscriber = FmtSubscriber::builder().with_env_filter(filter).finish(); - tracing::subscriber::set_global_default(subscriber)?; + console_subscriber::init(); let cli = Cli::parse(); match &cli.command { Commands::Push(cli) => { - let push = Box::leak(Box::new(Push::new(cli).await?)); - push.paths_from_package(&cli.package) + let store = Store::connect()?; + let push = Box::leak(Box::new(Push::new(cli, store).await?)); + push.add_paths(cli.paths.clone()) .await - .context("nixcp get paths from package")?; + .context("add paths to push")?; push.run().await.context("nixcp run")?; } } diff --git a/src/path_info.rs b/src/path_info.rs index 6dcbb53..e62b68a 100644 --- a/src/path_info.rs +++ b/src/path_info.rs @@ -1,85 +1,60 @@ use std::collections::HashSet; -use anyhow::{Context, Error, Result}; +use anyhow::{Context, Result}; use aws_sdk_s3 as s3; +use futures::future::join_all; use nix_compat::nixbase32; use nix_compat::store_path::StorePath; use regex::Regex; -use serde::Deserialize; +use std::path::Path; use tokio::process::Command; -use tracing::{debug, error, trace}; +use tracing::{debug, trace}; use url::Url; -// nix path-info --derivation --json -#[derive(Debug, Clone, Deserialize)] +use crate::store::Store; + +#[derive(Debug, Clone)] pub struct PathInfo { - pub deriver: Option>, pub path: StorePath, - signatures: Option>, + pub signatures: Vec, pub references: Vec>, } impl PathInfo { - // get PathInfo for a package or a store path - // we deserialize this as an array of `PathInfo` below - pub async fn from_path(path: &str) -> Result { - debug!("query nix path-info for {path}"); - // use lix cause nix would return a json map instead of an array - // json output is not stable and could break in future - // TODO figure out a better way - let nix_cmd = Command::new("nix") - .arg("run") - .arg("--experimental-features") - .arg("nix-command flakes") - .arg("github:nixos/nixpkgs/nixos-unstable#lix") - .arg("--") - .arg("path-info") - .arg("--json") - .arg(path) - .output() - .await - .context("`nix path-info` failed for {package}")?; + pub async fn from_path(path: &Path, store: &Store) -> Result { + debug!("query path info for {:?}", path); - trace!( - "nix path-info output: {}", - String::from_utf8_lossy(&nix_cmd.stdout) - ); - - // nix path-info returns an array with one element - match serde_json::from_slice::>(&nix_cmd.stdout) - .context("parse path info from stdout") - { - Ok(path_info) => path_info - .into_iter() - .next() - .ok_or_else(|| Error::msg("nix path-info returned empty")), - Err(e) => { - error!( - "Failed to parse data from `nix path-info`. The path may not exist on your system." - ); - Err(e) + let derivation = match path.extension() { + Some(ext) if ext == "drv" => path.as_os_str().as_encoded_bytes(), + _ => { + &Command::new("nix") + .arg("path-info") + .arg("--derivation") + .arg(path) + .output() + .await + .context(format!("run command: nix path-info --derivaiton {path:?}"))? + .stdout } - } + }; + let derivation = String::from_utf8_lossy(derivation); + debug!("derivation: {derivation}"); + + let store_path = StorePath::from_absolute_path(derivation.trim().as_bytes()) + .context("storepath from derivation")?; + store + .query_path_info(store_path) + .await + .context("query pathinfo for derivation") } - pub async fn get_closure(&self) -> Result> { - debug!("query nix-store for {}", self.absolute_path()); - let nix_store_cmd = Command::new("nix-store") - .arg("--query") - .arg("--requisites") - .arg("--include-outputs") - .arg(self.absolute_path()) - .output() - .await - .expect("nix-store cmd failed"); - - let nix_store_paths = String::from_utf8(nix_store_cmd.stdout)?; - let nix_store_paths: Vec<&str> = nix_store_paths.lines().collect(); - let mut closure = Vec::with_capacity(nix_store_paths.len()); - for path in nix_store_paths { - closure.push(Self::from_path(path).await?); - } - Ok(closure) + pub async fn get_closure(&self, store: &Store) -> Result> { + let futs = store + .compute_fs_closure(self.path.clone()) + .await? + .into_iter() + .map(|x| store.query_path_info(x)); + join_all(futs).await.into_iter().collect() } /// checks if the path is signed by any upstream. if it is, we assume a cache hit. @@ -101,15 +76,13 @@ impl PathInfo { } fn signees(&self) -> Vec<&str> { - if let Some(signatures) = self.signatures.as_ref() { - let signees: Vec<_> = signatures - .iter() - .filter_map(|signature| Some(signature.split_once(":")?.0)) - .collect(); - trace!("signees for {}: {:?}", self.path, signees); - return signees; - } - Vec::new() + let signers: Vec<_> = self + .signatures + .iter() + .filter_map(|signature| Some(signature.split_once(":")?.0)) + .collect(); + trace!("signers for {}: {:?}", self.path, signers); + return signers; } pub async fn check_upstream_hit(&self, upstreams: &[Url]) -> bool { @@ -149,69 +122,3 @@ impl PathInfo { .is_ok() } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn get_signees_from_path_info() { - let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#; - let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); - - path_info.signatures = Some(vec![ - "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), - "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), - ]); - let signees = path_info.signees(); - assert_eq!(signees, vec!["cache.nixos.org-1", "nixcache.cy7.sh"]); - } - - #[test] - fn match_upstream_cache_from_signature() { - let path_info_json = r#"{"deriver":"/nix/store/idy9slp6835nm6x2i41vzm4g1kai1m2p-nixcp-0.1.0.drv.drv","narHash":"sha256-BG5iQEKKOM7d4199942ReE+bZxQDGDuOZqQ5jkTp45o=","narSize":27851376,"path":"/nix/store/giv6gcnv0ymqgi60dx0fsk2l1pxdd1n0-nixcp-0.1.0","references":["/nix/store/954l60hahqvr0hbs7ww6lmgkxvk8akdf-openssl-3.4.1","/nix/store/ik84lbv5jvjm1xxvdl8mhg52ry3xycvm-gcc-14-20241116-lib","/nix/store/rmy663w9p7xb202rcln4jjzmvivznmz8-glibc-2.40-66"],"registrationTime":1744643248,"signatures":["nixcache.cy7.sh:n1lnCoT16xHcuV+tc+/TbZ2m+UKuI15ok+3cg2i5yFHO8+QVUn0x+tOSy6bZ+KxWl4PvmIjUQN1Kus0efn46Cw=="],"valid":true}"#; - let mut path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); - - path_info.signatures = Some(vec![ - "cache.nixos.org-1:sRAGxSFkQ6PGzPGs9caX6y81tqfevIemSSWZjeD7/v1X0J9kEeafaFgz+zBD/0k8imHSWi/leCoIXSCG6/MrCw==".to_string(), - "nixcache.cy7.sh:hV1VQvztp8UY7hq/G22uzC3vQp4syBtnpJh21I1CRJykqweohb4mdS3enyi+9xXqAUZMfNrZuRFSySqa5WK1Dg==".to_string(), - "nix-community.cachix.org-1:mB9FSh9qf2dCimDSUo8Zy7bkq5CX+/rkCWyvRCYg3Fs=".to_string(), - ]); - assert!( - path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]) - ); - assert!( - path_info.check_upstream_signature(&[Url::parse("https://nixcache.cy7.sh").unwrap()]) - ); - assert!( - path_info.check_upstream_signature(&[ - Url::parse("https://nix-community.cachix.org").unwrap() - ]) - ); - assert!( - !path_info - .check_upstream_signature(&[Url::parse("https://fake-cache.cachix.org").unwrap()]), - ); - } - - #[test] - fn path_info_without_signature() { - let path_info_json = r#"{"ca":"fixed:r:sha256:1q10p04pgx9sk6xbvrkn4nvh0ys2lzplgcni5368f4z3cr8ikbmz","narHash":"sha256-v64ZUWbjE4fMKNGyR++nQnsAtyV25r26mTr1dwm4IOA=","narSize":5520,"path":"/nix/store/gj6hz9mj23v01yvq1nn5f655jrcky1qq-nixos-option.nix","references":[],"registrationTime":1744740942,"valid":true}"#; - let path_info: PathInfo = serde_json::from_str(path_info_json).expect("must serialize"); - - assert!( - !path_info.check_upstream_signature(&[Url::parse("https://cache.nixos.org").unwrap()]) - ); - } - - /* - #[test] - fn path_info_deserialize_nix_map() { - let path_info_json = r#"{"/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation":{"ca":null,"deriver":"/nix/store/h8z25s6arcrns5nmrq1yhgbamywjivpn-home-manager-generation.drv","narHash":"sha256-o4qwqyJ5UVm9cyC/nBNcNYVnIM14Pewgw7fou+wUVSY=","narSize":13608,"references":["/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/4d0ix5djms3n2jnjdc58l916cwack1rp-empty-directory","/nix/store/56zmgla8443qfrkrh2ch0vz0mh8ywrw1-home-manager-files","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/80l1sb3vcmrkcdd7ihlizkcnv19rq9fj-ncurses-6.5","/nix/store/8vm1jxsc0jphd65vb7r6g5ysgqw0yh9f-home-manager-generation","/nix/store/92as847i10kl6s19fi910ddyk9l83835-check-link-targets.sh","/nix/store/9c90iz95yynyh3vsc67zndch6j01vgz3-home-manager-path","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bm5fi6wj0w4r2wjll2448k307bzfcjwx-cleanup","/nix/store/c244fsb3a7i5837lzn94m4bmav9i5p9b-link","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/gpxsdrrd4x93fs75395vr2dfys1ki9mq-jq-1.7.1-bin","/nix/store/jlf743lqxbvad6dbgndsgqfg20m2np5i-sd-switch-0.5.3","/nix/store/mhmgm739aagj4x7hr6ag2wjmxhmpy8mf-gettext-0.22.5","/nix/store/w9db12j05yv5hl31s6jndd9cfm1g1gw4-hm-modules-messages","/nix/store/wj1c3gsiajabnq50ifxqnlv60i5rhqj7-diffutils-3.10","/nix/store/xhql0ilzbiqwnmz4z8y0phk611wynxf2-gnused-4.9","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zhrjg6wxrxmdlpn6iapzpp2z2vylpvw5-home-manager.sh"],"registrationTime":1744742989,"signatures":["nixcache.cy7.sh:Vq4X95kSzum7BwrBhjmmM2yVipfBI3AE3jgZ3b3RoYrP4/ghotbDdlwCvwK3qx4BQdEOLSgrC1tDwiMNb6oRBw=="],"ultimate":false}}"#; - serde_json::from_str::>(path_info_json).expect("must serialize"); - - let path_info_json = r#"{"/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b":{"ca":null,"deriver":"/nix/store/12ssi931481jlkizgfk1c1jnawvwjbhh-nixos-system-chunk-25.05.20250412.2631b0b.drv","narHash":"sha256-CHhBIzMD4v/FKqKgGroq0UC1k3GrK5lcNwQPMpv2xLc=","narSize":20704,"references":["/nix/store/0yjiyixxsr137iw93hnaacdsssy1li9h-switch-to-configuration-0.1.0","/nix/store/14rby7cpwrzjsjym44cl5h6nj6qpn1gs-etc","/nix/store/3a2ahdaprw6df0lml1pj9jhbi038dsjh-nixos-system-chunk-25.05.20250412.2631b0b","/nix/store/3wjljpj30fvv2cdb60apr4126pa5bm87-shadow-4.17.2","/nix/store/40yifhx34v4g4llrdn3v2ag8w02j10fv-gnugrep-3.11","/nix/store/58br4vk3q5akf4g8lx0pqzfhn47k3j8d-bash-5.2p37","/nix/store/5dyh8l59kfvf89zjkbmjfnx7fix93n4f-net-tools-2.10","/nix/store/aq9wdsz12bg9252790l9awiry2bml4ls-sops-install-secrets-0.0.1","/nix/store/b00kq6fjhgisdrykg621vml8505nnmb3-users-groups.json","/nix/store/b2cfj7yk3wfg1jdwjzim7306hvsc5gnl-systemd-257.3","/nix/store/bfr68wi6k8icb3j9fy3fzchva56djfhd-mounts.sh","/nix/store/cjnihsds5hhnji9r85hglph07q9y9hgc-system-path","/nix/store/cvlbhhrvzfkjl2hrrzhq3vr5gzan1r60-bash-interactive-5.2p37","/nix/store/f9jll96j74f5ykvs062718b98lfjbn9g-util-linux-2.40.4-bin","/nix/store/h7zih134d3n5yk8pnhv1fa38n6qkyrn2-pre-switch-checks","/nix/store/idn5n51246piyxcr3v6gxnj5a5l9mzpn-linux-6.14.2","/nix/store/ipn5793y61x2904xqnkgbjnp91svjjzx-perl-5.40.0-env","/nix/store/j1rikvl25pz0b5ham1ijq0nbg1q2fqfy-initrd-linux-6.14.2","/nix/store/jgawnqyh6piwcl79gxpmq5czx9rfr9xh-glibc-locales-2.40-66","/nix/store/jqgmcv8j4gj59218hcbiyn8z951rycdj-install-grub.sh","/nix/store/kpmybhxy3gz6k1znbdirwsp3c6wvsgg9-manifest.json","/nix/store/lgainx4gl6q7mhiwmls81d3n51p5jz7z-linux-6.14.2-modules","/nix/store/mhxn5kwnri3z9hdzi3x0980id65p0icn-lib.sh","/nix/store/n8n0faszqlnf3mdg0fj6abnknrhjsw5j-perl-5.40.0-env","/nix/store/nq61v7a601gjndijq5nndprkzpwz4q9g-glibc-2.40-66-bin","/nix/store/nx27idxpvi3fk3p7admvhipny73nr25n-kmod-31","/nix/store/pggww1d2pg24fcg5v36xn63n53vanyyi-gnupg-2.4.7","/nix/store/rg5rf512szdxmnj9qal3wfdnpfsx38qi-setup-etc.pl","/nix/store/vvlfaafnz3pdhw7lx5kc5gb9pl4zhz5l-local-cmds","/nix/store/w142vx7ij1fz6qwhp5dprkf59cizvv1v-update-users-groups.pl","/nix/store/xq5f95pp297afc2xjgrmhmf9w631qp7m-findutils-4.10.0","/nix/store/yh6qg1nsi5h2xblcr67030pz58fsaxx3-coreutils-9.6","/nix/store/zlsmh0ccgvncg30qb4y0mp5pahnk1wnw-append-initrd-secrets","/nix/store/zs07icpv5ykf8m36xcv717hh26bp09fa-firmware","/nix/store/zy2n4id5gcxcbx2x8jbblkmcpdlpsypk-getent-glibc-2.40-66"],"registrationTime":1744743136,"signatures":["nixcache.cy7.sh:dZ1XiKQNe0fRX48gBj03PIABYJGV6BPwb72YpMqEBONZMF+JrkVKhRCF0ur/4Bf5prHxg6Qfg1ytP/4csRC9DQ=="],"ultimate":false}}"#; - serde_json::from_str::>(path_info_json).expect("must serialize"); - } - */ -} diff --git a/src/push.rs b/src/push.rs index 719d3a8..18f74b9 100644 --- a/src/push.rs +++ b/src/push.rs @@ -1,6 +1,7 @@ use std::{ fs, iter::once, + path::PathBuf, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -13,10 +14,10 @@ use aws_sdk_s3 as s3; use futures::future::join_all; use nix_compat::narinfo::{self, SigningKey}; use tokio::sync::{RwLock, mpsc}; -use tracing::{debug, info, trace}; +use tracing::debug; use url::Url; -use crate::{PushArgs, path_info::PathInfo, uploader::Uploader}; +use crate::{PushArgs, path_info::PathInfo, store::Store, uploader::Uploader}; pub struct Push { upstream_caches: Vec, @@ -24,6 +25,7 @@ pub struct Push { s3_client: s3::Client, signing_key: SigningKey, bucket: String, + store: Arc, // paths that we skipped cause of a signature match signature_hit_count: AtomicUsize, // paths that we skipped cause we found it on an upstream @@ -35,7 +37,7 @@ pub struct Push { } impl Push { - pub async fn new(cli: &PushArgs) -> Result { + pub async fn new(cli: &PushArgs, store: Store) -> Result { let mut upstreams = Vec::with_capacity(cli.upstreams.len() + 1); for upstream in cli .upstreams @@ -67,6 +69,7 @@ impl Push { s3_client, signing_key, bucket: cli.bucket.clone(), + store: Arc::new(store), signature_hit_count: AtomicUsize::new(0), upstream_hit_count: AtomicUsize::new(0), already_exists_count: AtomicUsize::new(0), @@ -74,18 +77,33 @@ impl Push { }) } - pub async fn paths_from_package(&mut self, package: &str) -> Result<()> { - let path_info = PathInfo::from_path(package) + pub async fn add_paths(&'static self, paths: Vec) -> Result<()> { + let mut futs = Vec::with_capacity(paths.len()); + for path in paths { + let store_paths = self.store_paths.clone(); + let store = self.store.clone(); + + futs.push(tokio::spawn(async move { + let path_info = PathInfo::from_path(path.as_path(), &store) + .await + .context("get path info for path")?; + debug!("path-info for {path:?}: {path_info:?}"); + + store_paths.write().await.extend( + path_info + .get_closure(&store) + .await + .context("closure from path info")?, + ); + Ok(()) + })); + } + join_all(futs) .await - .context("get path info for package")?; - debug!("path-info for {package}: {:?}", path_info); - self.store_paths.write().await.extend( - path_info - .get_closure() - .await - .context("closure from path info")?, - ); - info!("found {} store paths", self.store_paths.read().await.len()); + .into_iter() + .flatten() + .collect::>>()?; + println!("found {} store paths", self.store_paths.read().await.len()); Ok(()) } @@ -106,7 +124,7 @@ impl Push { for path in store_paths.into_iter() { if path.check_upstream_signature(&self.upstream_caches) { - trace!("skip {} (signature match)", path.absolute_path()); + debug!("skip {} (signature match)", path.absolute_path()); self.signature_hit_count.fetch_add(1, Ordering::Release); continue; } @@ -121,13 +139,13 @@ impl Push { .check_if_already_exists(&self.s3_client, self.bucket.clone()) .await { - trace!("skip {} (already exists)", path.absolute_path()); + debug!("skip {} (already exists)", path.absolute_path()); self.already_exists_count.fetch_add(1, Ordering::Relaxed); } else { tx.send(path).await.unwrap(); } } else { - trace!("skip {} (upstream hit)", path.absolute_path()); + debug!("skip {} (upstream hit)", path.absolute_path()); self.upstream_hit_count.fetch_add(1, Ordering::Relaxed); } }) @@ -142,13 +160,12 @@ impl Push { } async fn upload(&'static self, mut rx: mpsc::Receiver) -> Result<()> { - let mut uploads = Vec::with_capacity(10); + let mut uploads = Vec::new(); loop { if let Some(path_to_upload) = rx.recv().await { - let absolute_path = path_to_upload.absolute_path(); + println!("uploading: {}", path_to_upload.absolute_path()); - println!("uploading: {}", absolute_path); let uploader = Uploader::new( &self.signing_key, path_to_upload, @@ -156,10 +173,12 @@ impl Push { self.bucket.clone(), )?; - uploads.push(tokio::spawn(async move { - let res = uploader.upload().await; - self.upload_count.fetch_add(1, Ordering::Relaxed); - res + uploads.push(tokio::spawn({ + async move { + let res = uploader.upload().await; + self.upload_count.fetch_add(1, Ordering::Relaxed); + res + } })); } else { join_all(uploads) diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 0000000..0433362 --- /dev/null +++ b/src/store.rs @@ -0,0 +1,96 @@ +use std::{ffi::OsStr, os::unix::ffi::OsStrExt, sync::Arc}; + +use anyhow::{Context, Result}; +use nix_compat::store_path::StorePath; +use tokio::task; + +use crate::{ + bindings::{self, AsyncWriteAdapter}, + path_info::PathInfo, +}; + +pub struct Store { + inner: Arc, +} + +impl Store { + pub fn connect() -> Result { + let inner = unsafe { bindings::open_nix_store()? }; + Ok(Self { + inner: Arc::new(inner), + }) + } + + pub async fn compute_fs_closure( + &self, + path: StorePath, + ) -> Result>> { + let inner = self.inner.clone(); + task::spawn_blocking(move || { + let cxx_vector = + inner + .store() + .compute_fs_closure(path.to_string().as_bytes(), false, true, true)?; + Ok(cxx_vector + .iter() + .map(|x| { + StorePath::from_bytes(x.as_bytes()) + .context("make StorePath from vector returned by compute_fs_closure") + }) + .collect::>()?) + }) + .await + .unwrap() + } + + pub async fn query_path_info(&self, path: StorePath) -> Result { + let inner = self.inner.clone(); + + task::spawn_blocking(move || { + let mut c_path_info = inner + .store() + .query_path_info(path.to_string().as_bytes()) + .context("query cpp for path info")?; + + let signatures = c_path_info + .pin_mut() + .sigs() + .into_iter() + .map(|x| { + let osstr = OsStr::from_bytes(x.as_bytes()); + osstr.to_str().unwrap().to_string() + }) + .collect(); + let references = c_path_info + .pin_mut() + .references() + .into_iter() + .map(|x| StorePath::from_bytes(x.as_bytes())) + .collect::>() + .context("get references from pathinfo")?; + + Ok(PathInfo { + path, + signatures, + references, + }) + }) + .await + .unwrap() + } + + pub fn stream_nar(&self, path: StorePath) -> AsyncWriteAdapter { + let inner = self.inner.clone(); + let (adapter, mut sender) = AsyncWriteAdapter::new(); + + task::spawn_blocking(move || { + if let Err(e) = inner + .store() + .nar_from_path(path.to_string().as_bytes().to_vec(), sender.clone()) + { + let _ = sender.rust_error(e); + } + }); + adapter + } +} diff --git a/src/uploader.rs b/src/uploader.rs index b0520ac..8f4efaa 100644 --- a/src/uploader.rs +++ b/src/uploader.rs @@ -1,20 +1,30 @@ +use std::{collections::BTreeMap, os::unix::fs::PermissionsExt, path::PathBuf}; + use anyhow::Result; use async_compression::{Level, tokio::bufread::ZstdEncoder}; use aws_sdk_s3::{ self as s3, types::{CompletedMultipartUpload, CompletedPart}, }; -use futures::future::join_all; +use bytes::{BufMut, Bytes, BytesMut}; +use futures::{future::join_all, stream::TryStreamExt}; use nix_compat::{ + nar::writer::r#async as nar, narinfo::{self, NarInfo, SigningKey}, nixbase32, store_path::StorePath, }; use sha2::{Digest, Sha256}; -use tokio::{io::AsyncReadExt, process::Command}; +use tokio::{ + fs::{File, read_dir, read_link}, + io::{AsyncRead, BufReader}, + pin, +}; +use tokio_stream::wrappers::ReadDirStream; +use tokio_util::io::{InspectReader, read_buf}; use tracing::debug; -use crate::path_info::PathInfo; +use crate::{bindings::AsyncWriteAdapter, path_info::PathInfo, store::Store}; const MULTIPART_CUTOFF: usize = 1024 * 1024 * 5; @@ -23,6 +33,7 @@ pub struct Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, + store: &'a Store, } impl<'a> Uploader<'a> { @@ -31,37 +42,37 @@ impl<'a> Uploader<'a> { path: PathInfo, s3_client: &'a s3::Client, bucket: String, + store: &'a Store, ) -> Result { Ok(Self { signing_key, path, s3_client, bucket, + store, }) } pub async fn upload(&self) -> Result<()> { - let nar = self.make_nar().await?; - let mut nar_info = self.narinfo_from_nar(&nar)?; - let nar = self.compress_nar(&nar).await; + let mut nar_temp = File::open(tempfile::Builder::new().tempfile()?.path()).await?; + self.make_nar(&mut nar_temp).await; - // update fields that we know after compression - let mut hasher = Sha256::new(); - hasher.update(&nar); - let hash: [u8; 32] = hasher.finalize().into(); - let nar_url = self.nar_url(&hash); - nar_info.file_hash = Some(hash); - nar_info.file_size = Some(nar.len() as u64); - nar_info.url = nar_url.as_str(); - debug!("uploading nar with key: {nar_url}"); + // this goes to the .narinfo file + let mut nar_hasher = Sha256::new(); + // this is the URL for file .narinfo points to + let mut file_hasher = Sha256::new(); + let nar_reader = compress_and_hash_nar(nar_temp, &mut nar_hasher, &mut file_hasher); - if nar.len() < MULTIPART_CUTOFF { + let buf = BytesMut::with_capacity(MULTIPART_CUTOFF); + let + + if first_chunk.len() < MULTIPART_CUTOFF { let put_object = self .s3_client .put_object() .bucket(&self.bucket) .key(&nar_url) - .body(nar.into()) + .body(first_chunk.into()) .send() .await?; debug!("put object: {:#?}", put_object); @@ -132,20 +143,11 @@ impl<'a> Uploader<'a> { .body(nar_info.to_string().as_bytes().to_vec().into()) .send() .await?; + debug!("done uploading narinfo"); Ok(()) } - async fn make_nar(&self) -> Result> { - Ok(Command::new("nix") - .arg("nar") - .arg("dump-path") - .arg(self.path.absolute_path()) - .output() - .await? - .stdout) - } - fn narinfo_from_nar(&self, nar: &[u8]) -> Result { let mut hasher = Sha256::new(); hasher.update(nar); @@ -159,7 +161,7 @@ impl<'a> Uploader<'a> { signatures: Vec::new(), ca: None, system: None, - deriver: self.path.deriver.as_ref().map(|x| x.as_ref()), + deriver: None, compression: Some("zstd"), file_hash: None, file_size: None, @@ -170,18 +172,89 @@ impl<'a> Uploader<'a> { Ok(nar_info) } - fn nar_url(&self, compressed_nar_hash: &[u8]) -> String { - let compressed_nar_hash = nixbase32::encode(compressed_nar_hash); - format!("nar/{compressed_nar_hash}.nar.zst") - } + async fn make_nar(&self, sink: &mut File) -> Result<()> { + let nar = nar::open(sink).await?; + let path = self.path.absolute_path(); + let metadata = File::open(&path).await?.metadata().await?; - async fn compress_nar(&self, nar: &[u8]) -> Vec { - let mut encoder = ZstdEncoder::with_quality(nar, Level::Default); - let mut compressed = Vec::with_capacity(nar.len()); - encoder - .read_to_end(&mut compressed) - .await - .expect("should compress just fine"); - compressed + if metadata.is_symlink() { + let target = read_link(&path).await?; + nar.symlink(target.as_os_str().as_encoded_bytes()).await; + } else if metadata.is_dir() { + let mut nar = nar.directory().await?; + nar_from_dir(path.into(), &mut nar).await; + nar.close().await; + } else if metadata.is_file() { + let perms = metadata.permissions().mode(); + let mut executable = false; + if (perms & 0o700) == 0o700 { + executable = true; + } + + let mut file = BufReader::new(File::open(&path).await?); + nar.file(executable, metadata.len(), &mut file).await; + } + + Ok(()) } } + +async fn nar_from_dir(path: PathBuf, node: &mut nar::Directory<'_, '_>) -> Result<()> { + let root = ReadDirStream::new(read_dir(&path).await?); + let entries = root + .map_ok(|x| (x.file_name(), x)) + .try_collect::>() + .await?; + + // directory entries must be written in ascending order of name + for (name, entry) in entries.iter() { + let node = node.entry(name.as_encoded_bytes()).await?; + let metadata = entry.metadata().await?; + + if metadata.is_symlink() { + let target = read_link(entry.path()).await?; + node.symlink(target.as_os_str().as_encoded_bytes()).await; + } else if metadata.is_dir() { + let mut node = node.directory().await?; + Box::pin(nar_from_dir(entry.path(), &mut node)).await; + node.close().await; + } else if metadata.is_file() { + let perms = metadata.permissions().mode(); + let mut executable = false; + if (perms & 0o700) == 0o700 { + executable = true; + } + + let mut file = BufReader::new(File::open(entry.path()).await?); + node.file(executable, metadata.len(), &mut file).await; + } + } + Ok(()) +} + +fn compress_and_hash_nar( + nar_file: File, + nar_hasher: &mut Sha256, + compressed_nar_hasher: &mut Sha256, +) -> impl AsyncRead { + let nar_reader = InspectReader::new(nar_file, |x| nar_hasher.update(x)); + let nar_buf_reader = BufReader::new(nar_reader); + + let compressed_nar_reader = ZstdEncoder::with_quality(nar_buf_reader, Level::Default); + InspectReader::new(compressed_nar_reader, |x| compressed_nar_hasher.update(x)) +} + +fn nar_url(compressed_nar_hash: &[u8]) -> String { + format!("nar/{}.nar.zst", nixbase32::encode(compressed_nar_hash)) +} + +async fn read_buf_nar(stream: &mut S, mut buf: BytesMut) -> Result { + while buf.len() < buf.capacity() { + let n = read_buf(stream, &mut buf).await?; + + if n == 0 { + break; + } + } + Ok(buf.freeze()) +}