From 9a8c0173c76da16e1eb7e43a1a0653e0ff6ac898 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 5 Dec 2023 16:23:48 -0700 Subject: [PATCH] fix: upgrade to libp2p 0.53 (#205) Libp2p has three major features we want: 1. Stable support for QUIC 2. Full tracing support 3. Uses prometheus_client 0.22, this enables use to use tokio-metrics In order to upgrade we needed to address the follow major breaking changes: * KeepAlive is now a boolean signal (bitswap keep alive logic is now much simpler) * Supported Protocols are only known per connection, we now hard code the list for diagnostic reasons There are a few other minor breaking changes but did not affect our code much. --- Cargo.lock | 538 ++++++++++------------ Cargo.toml | 8 +- beetle/iroh-bitswap/Cargo.toml | 1 + beetle/iroh-bitswap/src/handler.rs | 67 +-- beetle/iroh-bitswap/src/lib.rs | 37 +- one/Cargo.toml | 1 + one/src/lib.rs | 1 - one/src/network.rs | 5 +- p2p/Cargo.toml | 29 +- p2p/src/behaviour.rs | 2 +- p2p/src/behaviour/ceramic_peer_manager.rs | 33 +- p2p/src/node.rs | 221 +++++---- p2p/src/providers.rs | 15 +- p2p/src/publisher.rs | 18 +- p2p/src/rpc.rs | 35 +- p2p/src/swarm.rs | 194 ++++---- recon/Cargo.toml | 3 +- recon/src/libp2p.rs | 11 +- recon/src/libp2p/handler.rs | 43 +- recon/src/libp2p/tests.rs | 11 +- 20 files changed, 579 insertions(+), 694 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ffc27a808..8e9482470 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -292,7 +292,7 @@ dependencies = [ "event-listener 3.1.0", "event-listener-strategy", "futures-core", - "pin-project-lite 0.2.13", + "pin-project-lite", ] [[package]] @@ -393,7 +393,7 @@ checksum = "655b9c7fe787d3b25cc0f804a1a8401790f0c5bc395beb5a64dc77d8de079105" dependencies = [ "event-listener 3.1.0", "event-listener-strategy", - "pin-project-lite 0.2.13", + "pin-project-lite", ] [[package]] @@ -463,7 +463,7 @@ dependencies = [ "log 0.4.20", "memchr", "once_cell", - "pin-project-lite 0.2.13", + "pin-project-lite", "pin-utils", "slab", "wasm-bindgen-futures", @@ -477,7 +477,7 @@ checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" dependencies = [ "async-stream-impl", "futures-core", - "pin-project-lite 0.2.13", + "pin-project-lite", ] [[package]] @@ -518,12 +518,25 @@ dependencies = [ "futures-sink", "futures-util", "memchr", - "pin-project-lite 0.2.13", + "pin-project-lite", "serde", "serde_cbor", "serde_json", ] +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes 1.5.0", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "atoi" version = "2.0.0" @@ -595,7 +608,7 @@ dependencies = [ "memchr", "mime 0.3.17", "percent-encoding 2.3.0", - "pin-project-lite 0.2.13", + "pin-project-lite", "rustversion", "serde", "sync_wrapper", @@ -1128,7 +1141,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -1329,8 +1342,6 @@ dependencies = [ "iroh-util", "libp2p", "libp2p-identity", - "libp2p-mplex", - "libp2p-quic", "lru 0.10.1", "multihash 0.18.1", "prometheus-client", @@ -1433,7 +1444,7 @@ dependencies = [ "multihash 0.16.3", "serde", "serde_bytes", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -1447,7 +1458,7 @@ dependencies = [ "multihash 0.18.1", "serde", "serde_bytes", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -2519,18 +2530,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "enum-as-inner" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116" -dependencies = [ - "heck", - "proc-macro2 1.0.69", - "quote 1.0.33", - "syn 1.0.109", -] - [[package]] name = "enum-as-inner" version = "0.6.0" @@ -2607,7 +2606,7 @@ checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" dependencies = [ "concurrent-queue", "parking", - "pin-project-lite 0.2.13", + "pin-project-lite", ] [[package]] @@ -2617,7 +2616,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d96b852f1345da36d551b9473fa1e2b1eb5c5195585c6c018118bc92a8d91160" dependencies = [ "event-listener 3.1.0", - "pin-project-lite 0.2.13", + "pin-project-lite", ] [[package]] @@ -2904,9 +2903,9 @@ dependencies = [ [[package]] name = "futures-bounded" -version = "0.1.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b07bbbe7d7e78809544c6f718d875627addc73a7c3582447abc052cd3dc67e0" +checksum = "e1e2774cc104e198ef3d3e1ff4ab40f86fa3245d6cb6a3a46174f21463cee173" dependencies = [ "futures-timer", "futures-util", @@ -2968,7 +2967,7 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.13", + "pin-project-lite", "waker-fn", ] @@ -2983,7 +2982,7 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.13", + "pin-project-lite", ] [[package]] @@ -3049,7 +3048,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.13", + "pin-project-lite", "pin-utils", "slab", ] @@ -3322,6 +3321,52 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" +[[package]] +name = "hickory-proto" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna", + "ipnet", + "once_cell", + "rand 0.8.5", + "socket2 0.5.5", + "thiserror", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot", + "rand 0.8.5", + "resolv-conf", + "smallvec", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "hkdf" version = "0.12.3" @@ -3379,7 +3424,7 @@ checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes 1.5.0", "http", - "pin-project-lite 0.2.13", + "pin-project-lite", ] [[package]] @@ -3434,7 +3479,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project-lite 0.2.13", + "pin-project-lite", "socket2 0.4.10", "tokio", "tower-service", @@ -3498,7 +3543,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ "hyper", - "pin-project-lite 0.2.13", + "pin-project-lite", "tokio", "tokio-io-timeout", ] @@ -3545,17 +3590,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" -[[package]] -name = "idna" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.4.0" @@ -3755,7 +3789,7 @@ dependencies = [ "async-channel 1.9.0", "async-stream", "async-trait", - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes 1.5.0", "ceramic-metrics", "cid 0.10.1", @@ -3773,6 +3807,7 @@ dependencies = [ "prost-build", "rand 0.8.5", "smallvec", + "test-log", "thiserror", "tokio", "tokio-context", @@ -3780,7 +3815,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -4451,9 +4486,9 @@ checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "libp2p" -version = "0.52.4" +version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e94495eb319a85b70a68b85e2389a95bb3555c71c49025b78c691a854a7e6464" +checksum = "1252a34c693386829c34d44ccfbce86679d2a9a2c61f582863649bbf57f26260" dependencies = [ "bytes 1.5.0", "either", @@ -4480,6 +4515,7 @@ dependencies = [ "libp2p-request-response", "libp2p-swarm", "libp2p-tcp", + "libp2p-tls", "libp2p-upnp", "libp2p-websocket", "libp2p-yamux", @@ -4491,9 +4527,9 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55b46558c5c0bf99d3e2a1a38fd54ff5476ca66dd1737b12466a1824dd219311" +checksum = "107b238b794cb83ab53b74ad5dcf7cca3200899b72fe662840cfb52f5b0a32e6" dependencies = [ "libp2p-core", "libp2p-identity", @@ -4503,11 +4539,12 @@ dependencies = [ [[package]] name = "libp2p-autonat" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e907be08be5e4152317a79d310a6f501a1b5c02a81dcb065dc865475bbae9498" +checksum = "d95151726170e41b591735bf95c42b888fe4aa14f65216a9fbf0edcc04510586" dependencies = [ "async-trait", + "asynchronous-codec 0.6.2", "futures", "futures-timer", "instant", @@ -4515,16 +4552,17 @@ dependencies = [ "libp2p-identity", "libp2p-request-response", "libp2p-swarm", - "log 0.4.20", "quick-protobuf", + "quick-protobuf-codec 0.2.0", "rand 0.8.5", + "tracing", ] [[package]] name = "libp2p-connection-limits" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f5107ad45cb20b2f6c3628c7b6014b996fcb13a88053f4569c872c6e30abf58" +checksum = "f2af4b1e1a1d6c5005a59b42287c0a526bcce94d8d688e2e9233b18eb843ceb4" dependencies = [ "libp2p-core", "libp2p-identity", @@ -4534,9 +4572,9 @@ dependencies = [ [[package]] name = "libp2p-core" -version = "0.40.1" +version = "0.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd44289ab25e4c9230d9246c475a22241e301b23e8f4061d3bdef304a1a99713" +checksum = "59c61b924474cf2c7edccca306693e798d797b85d004f4fef5689a7a3e6e8fe5" dependencies = [ "either", "fnv", @@ -4544,7 +4582,6 @@ dependencies = [ "futures-timer", "instant", "libp2p-identity", - "log 0.4.20", "multiaddr", "multihash 0.19.1", "multistream-select", @@ -4557,54 +4594,57 @@ dependencies = [ "serde", "smallvec", "thiserror", - "unsigned-varint", + "tracing", + "unsigned-varint 0.7.2", "void", ] [[package]] name = "libp2p-dcutr" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "458dce197aa5347a7ec0634a4c1343c6dfbf75859ef34d51e92b0cc333fe7cc3" +checksum = "a4f7bb7fa2b9e6cad9c30a6f67e3ff5c1e4b658c62b6375e35861a85f9c97bf3" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "either", "futures", + "futures-bounded", "futures-timer", "instant", "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", + "lru 0.11.1", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.2.0", "thiserror", + "tracing", "void", ] [[package]] name = "libp2p-dns" -version = "0.40.1" +version = "0.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6a18db73084b4da2871438f6239fef35190b05023de7656e877c18a00541a3b" +checksum = "d17cbcf7160ff35c3e8e560de4a068fe9d6cb777ea72840e48eb76ff9576c4b6" dependencies = [ "async-trait", "futures", + "hickory-resolver", "libp2p-core", "libp2p-identity", - "log 0.4.20", "parking_lot", "smallvec", - "trust-dns-resolver", + "tracing", ] [[package]] name = "libp2p-gossipsub" -version = "0.45.2" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f9624e2a843b655f1c1b8262b8d5de6f309413fca4d66f01bb0662429f84dc" +checksum = "201f0626acd8985fae7fdd318e86c954574b9eef2e5dec433936a19a0338393d" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "base64 0.21.5", "byteorder", "bytes 1.5.0", @@ -4618,26 +4658,26 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", "prometheus-client", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.2.0", "rand 0.8.5", "regex", "serde", "sha2 0.10.8", "smallvec", - "unsigned-varint", + "tracing", + "unsigned-varint 0.7.2", "void", ] [[package]] name = "libp2p-identify" -version = "0.43.1" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a96638a0a176bec0a4bcaebc1afa8cf909b114477209d7456ade52c61cd9cd" +checksum = "0544703553921214556f7567278b4f00cdd5052d29b0555ab88290cbfe54d81c" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "either", "futures", "futures-bounded", @@ -4645,12 +4685,12 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", "lru 0.12.0", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.2.0", "smallvec", "thiserror", + "tracing", "void", ] @@ -4677,12 +4717,12 @@ dependencies = [ [[package]] name = "libp2p-kad" -version = "0.44.6" +version = "0.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16ea178dabba6dde6ffc260a8e0452ccdc8f79becf544946692fff9d412fc29d" +checksum = "8cd9ae9180fbe425f14e5558b0dfcb3ae8a76075b0eefb7792076902fbb63a14" dependencies = [ "arrayvec 0.7.4", - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes 1.5.0", "either", "fnv", @@ -4692,46 +4732,47 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.2.0", "rand 0.8.5", "serde", "sha2 0.10.8", "smallvec", "thiserror", + "tracing", "uint", - "unsigned-varint", + "unsigned-varint 0.7.2", "void", ] [[package]] name = "libp2p-mdns" -version = "0.44.0" +version = "0.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42a2567c305232f5ef54185e9604579a894fd0674819402bb0ac0246da82f52a" +checksum = "49007d9a339b3e1d7eeebc4d67c05dbf23d300b7d091193ec2d3f26802d7faf2" dependencies = [ "data-encoding", "futures", + "hickory-proto", "if-watch", "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", "rand 0.8.5", "smallvec", "socket2 0.5.5", "tokio", - "trust-dns-proto 0.22.0", + "tracing", "void", ] [[package]] name = "libp2p-metrics" -version = "0.13.1" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "239ba7d28f8d0b5d77760dc6619c05c7e88e74ec8fbbe97f856f20a56745e620" +checksum = "fdac91ae4f291046a3b2660c039a2830c931f84df2ee227989af92f7692d3357" dependencies = [ + "futures", "instant", "libp2p-core", "libp2p-dcutr", @@ -4742,41 +4783,22 @@ dependencies = [ "libp2p-ping", "libp2p-relay", "libp2p-swarm", - "once_cell", + "pin-project", "prometheus-client", ] -[[package]] -name = "libp2p-mplex" -version = "0.40.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93959ed08b6caf9810e067655e25f1362098797fef7c44d3103e63dcb6f0fabe" -dependencies = [ - "asynchronous-codec", - "bytes 1.5.0", - "futures", - "libp2p-core", - "libp2p-identity", - "log 0.4.20", - "nohash-hasher", - "parking_lot", - "rand 0.8.5", - "smallvec", - "unsigned-varint", -] - [[package]] name = "libp2p-noise" -version = "0.43.2" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2eeec39ad3ad0677551907dd304b2f13f17208ccebe333bef194076cd2e8921" +checksum = "8ecd0545ce077f6ea5434bcb76e8d0fe942693b4380aaad0d34a358c2bd05793" dependencies = [ + "asynchronous-codec 0.7.0", "bytes 1.5.0", "curve25519-dalek 4.1.1", "futures", "libp2p-core", "libp2p-identity", - "log 0.4.20", "multiaddr", "multihash 0.19.1", "once_cell", @@ -4786,15 +4808,16 @@ dependencies = [ "snow", "static_assertions", "thiserror", + "tracing", "x25519-dalek", "zeroize", ] [[package]] name = "libp2p-ping" -version = "0.43.1" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e702d75cd0827dfa15f8fd92d15b9932abe38d10d21f47c50438c71dd1b5dae3" +checksum = "76b94ee41bd8c294194fe608851e45eb98de26fe79bc7913838cbffbfe8c7ce2" dependencies = [ "either", "futures", @@ -4803,32 +4826,32 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", "rand 0.8.5", + "tracing", "void", ] [[package]] name = "libp2p-plaintext" -version = "0.40.1" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53cc5390cc2f77b7de2452fb6105892d0bb64e3cafa3bb346abb603f4cc93a09" +checksum = "67330af40b67217e746d42551913cfb7ad04c74fa300fb329660a56318590b3f" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes 1.5.0", "futures", "libp2p-core", "libp2p-identity", - "log 0.4.20", "quick-protobuf", - "unsigned-varint", + "quick-protobuf-codec 0.2.0", + "tracing", ] [[package]] name = "libp2p-quic" -version = "0.9.3" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "130d451d83f21b81eb7b35b360bc7972aeafb15177784adc56528db082e6b927" +checksum = "c02570b9effbc7c33331803104a8e9e53af7f2bdb4a2b61be420d6667545a0f5" dependencies = [ "bytes 1.5.0", "futures", @@ -4837,7 +4860,6 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-tls", - "log 0.4.20", "parking_lot", "quinn", "rand 0.8.5", @@ -4846,15 +4868,16 @@ dependencies = [ "socket2 0.5.5", "thiserror", "tokio", + "tracing", ] [[package]] name = "libp2p-relay" -version = "0.16.2" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65bab06b10bbfb3936955965a01bd8db105b8675faabd55c88f94703feec318b" +checksum = "0aadb213ffc8e1a6f2b9c48dcf0fc07bf370f2ea4db7981813d45e50671c8d9d" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.7.0", "bytes 1.5.0", "either", "futures", @@ -4864,39 +4887,42 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", "quick-protobuf", - "quick-protobuf-codec", + "quick-protobuf-codec 0.3.0", "rand 0.8.5", "static_assertions", "thiserror", + "tracing", "void", ] [[package]] name = "libp2p-request-response" -version = "0.25.3" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8e3b4d67870478db72bac87bfc260ee6641d0734e0e3e275798f089c3fecfd4" +checksum = "198a07e045ca23ad3cdb0f54ef3dfb5750056e63af06803d189b0393f865f461" dependencies = [ "async-trait", "futures", + "futures-bounded", + "futures-timer", "instant", "libp2p-core", "libp2p-identity", "libp2p-swarm", - "log 0.4.20", "rand 0.8.5", "smallvec", + "tracing", "void", ] [[package]] name = "libp2p-swarm" -version = "0.43.7" +version = "0.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "580189e0074af847df90e75ef54f3f30059aedda37ea5a1659e8b9fca05c0141" +checksum = "643ce11d87db56387631c9757b61b83435b434f94dc52ec267c1666e560e78b0" dependencies = [ + "async-std", "either", "fnv", "futures", @@ -4905,23 +4931,22 @@ dependencies = [ "libp2p-core", "libp2p-identity", "libp2p-swarm-derive", - "log 0.4.20", "multistream-select", "once_cell", "rand 0.8.5", "smallvec", "tokio", + "tracing", "void", ] [[package]] name = "libp2p-swarm-derive" -version = "0.33.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4d5ec2a3df00c7836d7696c136274c9c59705bac69133253696a6c932cd1d74" +checksum = "9b27d257436d01433a21da8da7688c83dba35826726161a328ff0989cd7af2dd" dependencies = [ "heck", - "proc-macro-warning", "proc-macro2 1.0.69", "quote 1.0.33", "syn 2.0.39", @@ -4929,9 +4954,9 @@ dependencies = [ [[package]] name = "libp2p-swarm-test" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61761099882b9c4fe02d4d0fc47641e81381dd2a95a7b4ddeb0dc02f3daaaf16" +checksum = "a73027f1bdabd15d08b2c7954911cd56a6265c476763b2ceb10d9dc5ea4366b2" dependencies = [ "async-trait", "futures", @@ -4942,15 +4967,15 @@ dependencies = [ "libp2p-swarm", "libp2p-tcp", "libp2p-yamux", - "log 0.4.20", "rand 0.8.5", + "tracing", ] [[package]] name = "libp2p-tcp" -version = "0.40.1" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b558dd40d1bcd1aaaed9de898e9ec6a436019ecc2420dd0016e712fbb61c5508" +checksum = "8b2460fc2748919adff99ecbc1aab296e4579e41f374fb164149bd2c9e529d4c" dependencies = [ "async-io 1.13.0", "futures", @@ -4959,16 +4984,16 @@ dependencies = [ "libc", "libp2p-core", "libp2p-identity", - "log 0.4.20", "socket2 0.5.5", "tokio", + "tracing", ] [[package]] name = "libp2p-tls" -version = "0.2.1" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8218d1d5482b122ccae396bbf38abdcb283ecc96fa54760e1dfd251f0546ac61" +checksum = "93ce7e3c2e7569d685d08ec795157981722ff96e9e9f9eae75df3c29d02b07a5" dependencies = [ "futures", "futures-rustls", @@ -4985,50 +5010,50 @@ dependencies = [ [[package]] name = "libp2p-upnp" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82775a47b34f10f787ad3e2a22e2c1541e6ebef4fe9f28f3ac553921554c94c1" +checksum = "963eb8a174f828f6a51927999a9ab5e45dfa9aa2aa5fed99aa65f79de6229464" dependencies = [ "futures", "futures-timer", "igd-next", "libp2p-core", "libp2p-swarm", - "log 0.4.20", "tokio", + "tracing", "void", ] [[package]] name = "libp2p-websocket" -version = "0.42.1" +version = "0.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3facf0691bab65f571bc97c6c65ffa836248ca631d631b7691ac91deb7fceb5f" +checksum = "f4846d51afd08180e164291c3754ba30dd4fbac6fac65571be56403c16431a5e" dependencies = [ "either", "futures", "futures-rustls", "libp2p-core", "libp2p-identity", - "log 0.4.20", "parking_lot", - "quicksink", + "pin-project-lite", "rw-stream-sink", "soketto", + "tracing", "url", "webpki-roots", ] [[package]] name = "libp2p-yamux" -version = "0.44.1" +version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eedcb62824c4300efb9cfd4e2a6edaf3ca097b9e68b36dabe45a44469fd6a85" +checksum = "751f4778f71bc3db1ccf2451e7f4484463fec7f00c1ac2680e39c8368c23aae8" dependencies = [ "futures", "libp2p-core", - "log 0.4.20", "thiserror", + "tracing", "yamux", ] @@ -5152,6 +5177,15 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = "lru" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4a83fb7698b3643a0e34f9ae6f2e8f0178c0fd42f8b59d493aa271ff3a5bf21" +dependencies = [ + "hashbrown 0.14.2", +] + [[package]] name = "lru" version = "0.12.0" @@ -5185,12 +5219,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matches" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" - [[package]] name = "matchit" version = "0.7.3" @@ -5352,7 +5380,7 @@ dependencies = [ "percent-encoding 2.3.0", "serde", "static_assertions", - "unsigned-varint", + "unsigned-varint 0.7.2", "url", ] @@ -5394,7 +5422,7 @@ dependencies = [ "serde-big-array", "sha2 0.10.8", "sha3", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -5413,7 +5441,7 @@ dependencies = [ "serde-big-array", "sha2 0.10.8", "sha3", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -5424,7 +5452,7 @@ checksum = "076d548d76a0e2a0d4ab471d0b1c36c577786dfc4471242035d97a12a735c492" dependencies = [ "core2", "serde", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -5476,7 +5504,7 @@ dependencies = [ "log 0.4.20", "pin-project", "smallvec", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -5905,7 +5933,7 @@ dependencies = [ "indexmap 1.9.3", "js-sys", "once_cell", - "pin-project-lite 0.2.13", + "pin-project-lite", "thiserror", ] @@ -6029,11 +6057,12 @@ dependencies = [ [[package]] name = "pem" -version = "1.1.1" +version = "3.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8" +checksum = "3163d2912b7c3b52d651a055f2c7eec9ba5cd22d26ef75b8dd3a59980b185923" dependencies = [ - "base64 0.13.1", + "base64 0.21.5", + "serde", ] [[package]] @@ -6165,12 +6194,6 @@ dependencies = [ "syn 2.0.39", ] -[[package]] -name = "pin-project-lite" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" - [[package]] name = "pin-project-lite" version = "0.2.13" @@ -6311,7 +6334,7 @@ dependencies = [ "concurrent-queue", "libc", "log 0.4.20", - "pin-project-lite 0.2.13", + "pin-project-lite", "windows-sys 0.48.0", ] @@ -6323,7 +6346,7 @@ checksum = "e53b6af1f60f36f8c2ac2aad5459d75a5a9b4be1e8cdd40264f315d78193e531" dependencies = [ "cfg-if", "concurrent-queue", - "pin-project-lite 0.2.13", + "pin-project-lite", "rustix 0.38.25", "tracing", "windows-sys 0.48.0", @@ -6489,17 +6512,6 @@ version = "0.5.20+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" -[[package]] -name = "proc-macro-warning" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1eaa7fa0aa1929ffdf7eeb6eac234dde6268914a14ad44d23521ab6a9b258e" -dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", - "syn 2.0.39", -] - [[package]] name = "proc-macro2" version = "0.4.30" @@ -6526,9 +6538,9 @@ checksum = "8bccbff07d5ed689c4087d20d7307a52ab6141edeedf487c3876a55b86cf63df" [[package]] name = "prometheus-client" -version = "0.21.2" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c99afa9a01501019ac3a14d71d9f94050346f55ca471ce90c799a15c58f61e2" +checksum = "510c4f1c9d81d556458f94c98f857748130ea9737bbd6053da497503b26ea63c" dependencies = [ "dtoa", "itoa", @@ -6639,11 +6651,24 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ededb1cd78531627244d51dd0c7139fbe736c7d57af0092a76f0ffb2f56e98" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", "bytes 1.5.0", "quick-protobuf", "thiserror", - "unsigned-varint", + "unsigned-varint 0.7.2", +] + +[[package]] +name = "quick-protobuf-codec" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0471957c92926797222fa475072f58ddc5d5bc969ccc0c6f317b2fc7f44bc60" +dependencies = [ + "asynchronous-codec 0.7.0", + "bytes 1.5.0", + "quick-protobuf", + "thiserror", + "unsigned-varint 0.8.0", ] [[package]] @@ -6657,17 +6682,6 @@ dependencies = [ "rand 0.8.5", ] -[[package]] -name = "quicksink" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77de3c815e5a160b1539c6592796801df2043ae35e123b46d73380cfa57af858" -dependencies = [ - "futures-core", - "futures-sink", - "pin-project-lite 0.1.12", -] - [[package]] name = "quinn" version = "0.10.2" @@ -6676,7 +6690,7 @@ checksum = "8cc2c5017e4b43d5995dcea317bc46c1e09404c0a9664d2908f7f02dfe943d75" dependencies = [ "bytes 1.5.0", "futures-io", - "pin-project-lite 0.2.13", + "pin-project-lite", "quinn-proto", "quinn-udp", "rustc-hash", @@ -6939,9 +6953,9 @@ dependencies = [ [[package]] name = "rcgen" -version = "0.10.0" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffbe84efe2f38dea12e9bfc1f65377fdf03e53a18cb3b995faedf7934c7e785b" +checksum = "52c4f3084aa3bc7dfbba4eff4fab2a54db4324965d8872ab933565e6fbd83bc6" dependencies = [ "pem", "ring 0.16.20", @@ -6977,7 +6991,7 @@ version = "0.9.0" dependencies = [ "anyhow", "async-trait", - "asynchronous-codec", + "asynchronous-codec 0.6.2", "ceramic-core", "cid 0.10.1", "codespan-reporting", @@ -7096,7 +7110,7 @@ dependencies = [ "mime 0.3.17", "once_cell", "percent-encoding 2.3.0", - "pin-project-lite 0.2.13", + "pin-project-lite", "rustls", "rustls-pemfile", "serde", @@ -8387,7 +8401,7 @@ dependencies = [ "simple_asn1", "ssi-crypto", "thiserror", - "unsigned-varint", + "unsigned-varint 0.7.2", "zeroize", ] @@ -8955,7 +8969,7 @@ dependencies = [ "mio", "num_cpus", "parking_lot", - "pin-project-lite 0.2.13", + "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", "tokio-macros", @@ -8978,7 +8992,7 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" dependencies = [ - "pin-project-lite 0.2.13", + "pin-project-lite", "tokio", ] @@ -9032,7 +9046,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" dependencies = [ "futures-core", - "pin-project-lite 0.2.13", + "pin-project-lite", "tokio", ] @@ -9046,7 +9060,9 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", - "pin-project-lite 0.2.13", + "futures-util", + "hashbrown 0.14.2", + "pin-project-lite", "tokio", "tracing", ] @@ -9168,7 +9184,7 @@ dependencies = [ "futures-util", "indexmap 1.9.3", "pin-project", - "pin-project-lite 0.2.13", + "pin-project-lite", "rand 0.8.5", "slab", "tokio", @@ -9197,7 +9213,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log 0.4.20", - "pin-project-lite 0.2.13", + "pin-project-lite", "tracing-attributes", "tracing-core", ] @@ -9335,78 +9351,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "trust-dns-proto" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f7f83d1e4a0e4358ac54c5c3681e5d7da5efc5a7a632c90bb6d6669ddd9bc26" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner 0.5.1", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.2.3", - "ipnet", - "lazy_static", - "rand 0.8.5", - "smallvec", - "socket2 0.4.10", - "thiserror", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "trust-dns-proto" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3119112651c157f4488931a01e586aa459736e9d6046d3bd9105ffb69352d374" -dependencies = [ - "async-trait", - "cfg-if", - "data-encoding", - "enum-as-inner 0.6.0", - "futures-channel", - "futures-io", - "futures-util", - "idna 0.4.0", - "ipnet", - "once_cell", - "rand 0.8.5", - "smallvec", - "thiserror", - "tinyvec", - "tokio", - "tracing", - "url", -] - -[[package]] -name = "trust-dns-resolver" -version = "0.23.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a3e6c3aff1718b3c73e395d1f35202ba2ffa847c6a62eea0db8fb4cfe30be6" -dependencies = [ - "cfg-if", - "futures-util", - "ipconfig", - "lru-cache", - "once_cell", - "parking_lot", - "rand 0.8.5", - "resolv-conf", - "smallvec", - "thiserror", - "tokio", - "tracing", - "trust-dns-proto 0.23.2", -] - [[package]] name = "try-lock" version = "0.2.4" @@ -9531,7 +9475,17 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" dependencies = [ - "asynchronous-codec", + "asynchronous-codec 0.6.2", + "bytes 1.5.0", +] + +[[package]] +name = "unsigned-varint" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" +dependencies = [ + "asynchronous-codec 0.7.0", "bytes 1.5.0", ] @@ -9554,7 +9508,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" dependencies = [ "form_urlencoded", - "idna 0.4.0", + "idna", "percent-encoding 2.3.0", ] diff --git a/Cargo.toml b/Cargo.toml index 8dd9e7d1d..b486d107f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ keyed_priority_queue = "0.4.1" lazy_static = "1.4" libipld = "0.16" libipld-cbor = "0.16" -libp2p = { version = "0.52.4", default-features = false } +libp2p = { version = "0.53", default-features = false } libp2p-identity = { version = "0.2", features = ["peerid", "ed25519"] } lru = "0.10" mime = "0.3" @@ -116,7 +116,7 @@ opentelemetry-otlp = "0.11" par-stream = { version = "0.10.2", default-features = false } paste = "1.0.9" phf = "0.11" -prometheus-client = "0.21" +prometheus-client = "0.22" proptest = "1" prost = "0.11" prost-build = "0.11.1" @@ -165,9 +165,7 @@ tokio = { version = "1", default-features = false, features = [ tokio-context = "0.1.3" tokio-stream = "0.1.11" tokio-test = "0.4.2" -tokio-util = { version = "0.7.10", default-features = false, features = [ - "compat", -] } +tokio-util = { version = "0.7.10", features = ["compat", "rt"] } toml = "0.5.9" tower = "0.4" tower-http = "0.3" diff --git a/beetle/iroh-bitswap/Cargo.toml b/beetle/iroh-bitswap/Cargo.toml index ae705c116..d68c82ff5 100644 --- a/beetle/iroh-bitswap/Cargo.toml +++ b/beetle/iroh-bitswap/Cargo.toml @@ -47,6 +47,7 @@ libp2p = { workspace = true, features = ["yamux", "noise", "tcp", "tokio"] } tokio = { workspace = true, features = ["macros", "net", "rt"] } tokio-util = { workspace = true, features = ["compat"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } +test-log.workspace = true [[bench]] diff --git a/beetle/iroh-bitswap/src/handler.rs b/beetle/iroh-bitswap/src/handler.rs index 0ead54534..24ed144f0 100644 --- a/beetle/iroh-bitswap/src/handler.rs +++ b/beetle/iroh-bitswap/src/handler.rs @@ -2,7 +2,7 @@ use std::{ collections::VecDeque, fmt::Debug, task::{Context, Poll}, - time::{Duration, Instant}, + time::Duration, }; use asynchronous_codec::Framed; @@ -17,12 +17,12 @@ use futures::{ use libp2p::swarm::handler::FullyNegotiatedInbound; use libp2p::swarm::{ handler::{DialUpgradeError, FullyNegotiatedOutbound}, - ConnectionHandler, ConnectionHandlerEvent, KeepAlive, StreamUpgradeError, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, StreamUpgradeError, SubstreamProtocol, }; use libp2p::PeerId; use smallvec::SmallVec; use tokio::sync::oneshot; -use tracing::{trace, warn}; +use tracing::{debug, trace, warn}; use crate::{ error::Error, @@ -31,10 +31,6 @@ use crate::{ protocol::{BitswapCodec, ProtocolConfig, ProtocolId}, }; -/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur. -// TODO: configurable -const INITIAL_KEEP_ALIVE: u64 = 30; - #[derive(thiserror::Error, Debug)] pub enum BitswapHandlerError { /// The message exceeds the maximum transmission size. @@ -85,12 +81,8 @@ pub enum BitswapHandlerIn { Unprotect, } -type BitswapConnectionHandlerEvent = ConnectionHandlerEvent< - ProtocolConfig, - (BitswapMessage, BitswapMessageResponse), - HandlerEvent, - BitswapHandlerError, ->; +type BitswapConnectionHandlerEvent = + ConnectionHandlerEvent; /// Protocol Handler that manages a single long-lived substream with a peer. pub struct BitswapHandler { @@ -118,7 +110,7 @@ pub struct BitswapHandler { upgrade_errors: VecDeque>, /// Flag determining whether to maintain the connection to the peer. - keep_alive: KeepAlive, + keep_alive: bool, } impl Debug for BitswapHandler { @@ -145,8 +137,6 @@ impl Debug for BitswapHandler { impl BitswapHandler { /// Builds a new [`BitswapHandler`]. - // TODO(WS1-1291): Remove uses of KeepAlive::Until - #[allow(deprecated)] pub fn new( remote_peer_id: PeerId, protocol_config: ProtocolConfig, @@ -161,7 +151,7 @@ impl BitswapHandler { protocol: None, idle_timeout, upgrade_errors: VecDeque::new(), - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), + keep_alive: true, events: Default::default(), } } @@ -170,7 +160,6 @@ impl BitswapHandler { impl ConnectionHandler for BitswapHandler { type FromBehaviour = BitswapHandlerIn; type ToBehaviour = HandlerEvent; - type Error = BitswapHandlerError; type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; type OutboundOpenInfo = (BitswapMessage, BitswapMessageResponse); @@ -180,12 +169,10 @@ impl ConnectionHandler for BitswapHandler { self.listen_protocol.clone() } - fn connection_keep_alive(&self) -> KeepAlive { + fn connection_keep_alive(&self) -> bool { self.keep_alive } - // TODO(WS1-1291): Remove uses of KeepAlive::Until - #[allow(deprecated)] fn poll(&mut self, cx: &mut Context<'_>) -> Poll { inc!(BitswapMetrics::HandlerPollCount); if !self.events.is_empty() { @@ -197,7 +184,7 @@ impl ConnectionHandler for BitswapHandler { // Handle any upgrade errors if let Some(error) = self.upgrade_errors.pop_front() { inc!(BitswapMetrics::HandlerConnUpgradeErrors); - let reported_error = match error { + let error = match error { StreamUpgradeError::Timeout => BitswapHandlerError::NegotiationTimeout, StreamUpgradeError::Apply(e) => e, StreamUpgradeError::NegotiationFailed => { @@ -205,9 +192,12 @@ impl ConnectionHandler for BitswapHandler { } StreamUpgradeError::Io(e) => e.into(), }; + debug!(%error, "connection upgrade failed"); - // Close the connection - return Poll::Ready(ConnectionHandlerEvent::Close(reported_error)); + // We no longer want to use this connection. + // Let the swarm close it if no one else is using it. + self.keep_alive = false; + return Poll::Pending; } // determine if we need to create the stream @@ -227,7 +217,7 @@ impl ConnectionHandler for BitswapHandler { if let Poll::Ready(Some(event)) = self.inbound_substreams.poll_next_unpin(cx) { if let ConnectionHandlerEvent::NotifyBehaviour(HandlerEvent::Message { .. }) = event { // Update keep alive as we have received a message - self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout); + self.keep_alive = true; } return Poll::Ready(event); @@ -282,35 +272,24 @@ impl ConnectionHandler for BitswapHandler { self.upgrade_errors.push_back(err); } - libp2p::swarm::handler::ConnectionEvent::ListenUpgradeError(_) - | libp2p::swarm::handler::ConnectionEvent::LocalProtocolsChange(_) - | libp2p::swarm::handler::ConnectionEvent::RemoteProtocolsChange(_) => {} + _ => {} } } - // TODO(WS1-1291): Remove uses of KeepAlive::Until - #[allow(deprecated)] fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { match event { BitswapHandlerIn::Message(m, response) => { self.send_queue.push_back((m, response)); - // sending a message, reset keepalive - self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout); - } - BitswapHandlerIn::Protect => { - self.keep_alive = KeepAlive::Yes; - } - BitswapHandlerIn::Unprotect => { - self.keep_alive = - KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)); + // sending a message, ensure keep_alive is true + self.keep_alive = true } + BitswapHandlerIn::Protect => self.keep_alive = true, + BitswapHandlerIn::Unprotect => self.keep_alive = false, } } } -// TODO(WS1-1344): Remove uses of ConnectionHandlerEvent::Close -#[allow(deprecated)] #[tracing::instrument(skip(substream))] fn inbound_substream( // Include remote_peer_id for tracing context only @@ -330,10 +309,8 @@ fn inbound_substream( } _ => { warn!(%err, "inbound stream error"); - // More serious errors, close this side of the stream. If the - // peer is still around, they will re-establish their connection - - yield ConnectionHandlerEvent::Close(err); + // Stop using the connection, if we are the last protocol using the + // connection then it will be closed. break; } } diff --git a/beetle/iroh-bitswap/src/lib.rs b/beetle/iroh-bitswap/src/lib.rs index 351f5fd76..d5f5fc53f 100644 --- a/beetle/iroh-bitswap/src/lib.rs +++ b/beetle/iroh-bitswap/src/lib.rs @@ -18,9 +18,7 @@ use ceramic_metrics::{bitswap::BitswapMetrics, inc, record}; use cid::Cid; use handler::{BitswapHandler, HandlerEvent}; use libp2p::swarm::ConnectionId; -use libp2p::swarm::{ - CloseConnection, DialError, NetworkBehaviour, NotifyHandler, PollParameters, ToSwarm, -}; +use libp2p::swarm::{CloseConnection, DialError, NetworkBehaviour, NotifyHandler, ToSwarm}; use libp2p::{swarm::dial_opts::DialOpts, StreamProtocol}; use libp2p::{Multiaddr, PeerId}; use tokio::sync::{mpsc, oneshot}; @@ -404,7 +402,7 @@ impl NetworkBehaviour for Bitswap { type ConnectionHandler = BitswapHandler; type ToSwarm = BitswapEvent; - fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { + fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { match event { libp2p::swarm::FromSwarm::ConnectionEstablished(event) => { trace!( @@ -472,7 +470,6 @@ impl NetworkBehaviour for Bitswap { fn poll( &mut self, cx: &mut Context<'_>, - _params: &mut impl PollParameters, ) -> Poll>> { inc!(BitswapMetrics::ToSwarmPollTick); // limit work @@ -620,9 +617,9 @@ mod tests { use libp2p::yamux; use libp2p::{core::muxing::StreamMuxerBox, swarm}; use libp2p::{noise, PeerId, Swarm, Transport}; + use test_log::test; use tokio::sync::{mpsc, RwLock}; use tracing::{info, trace}; - use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use super::*; use crate::Block; @@ -699,57 +696,54 @@ mod tests { } } - #[tokio::test] + #[test(tokio::test)] async fn test_get_1_block() { get_block::<1>().await; } - #[tokio::test] + #[test(tokio::test)] async fn test_get_2_block() { get_block::<2>().await; } - #[tokio::test] + #[test(tokio::test)] async fn test_get_4_block() { get_block::<4>().await; } - #[tokio::test] + #[test(tokio::test)] async fn test_get_64_block() { get_block::<64>().await; } - #[tokio::test] + #[test(tokio::test)] async fn test_get_65_block() { get_block::<65>().await; } - #[tokio::test] + #[test(tokio::test)] async fn test_get_66_block() { get_block::<66>().await; } - #[tokio::test] + #[test(tokio::test)] async fn test_get_128_block() { - tracing_subscriber::registry() - .with(fmt::layer().pretty()) - .with(EnvFilter::from_default_env()) - .init(); - get_block::<128>().await; } - #[tokio::test] + #[test(tokio::test)] async fn test_get_1024_block() { get_block::<1024>().await; } async fn get_block() { + info!("get_block"); let (peer1_id, trans) = mk_transport(); let store1 = TestStore::default(); let bs1 = Bitswap::new(peer1_id, store1.clone(), Config::default()).await; - let config = swarm::Config::with_tokio_executor(); + let config = swarm::Config::with_tokio_executor() + .with_idle_connection_timeout(Duration::from_secs(5)); let mut swarm1 = Swarm::new(trans, bs1, peer1_id, config); let blocks = (0..N).map(|_| create_random_block_v1()).collect::>(); @@ -783,7 +777,8 @@ mod tests { let store2 = TestStore::default(); let bs2 = Bitswap::new(peer2_id, store2.clone(), Config::default()).await; - let config = swarm::Config::with_tokio_executor(); + let config = swarm::Config::with_tokio_executor() + .with_idle_connection_timeout(Duration::from_secs(5)); let mut swarm2 = Swarm::new(trans, bs2, peer2_id, config); let swarm2_bs = swarm2.behaviour().clone(); diff --git a/one/Cargo.toml b/one/Cargo.toml index 38ebcdbb8..e0881409e 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -46,6 +46,7 @@ tracing-appender = "0.2.2" tracing-subscriber.workspace = true tracing.workspace = true + [features] default = [] tokio-console = ["ceramic-metrics/tokio-console"] diff --git a/one/src/lib.rs b/one/src/lib.rs index 0c65f68a0..659fd308a 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -319,7 +319,6 @@ impl Daemon { let info = Info::new().await?; let mut metrics_config = MetricsConfig { - // Do not push metrics to any endpoint. export: opts.metrics, tracing: opts.tracing, log_format: match opts.log_format { diff --git a/one/src/network.rs b/one/src/network.rs index 5a4f0c575..102a9c6ad 100644 --- a/one/src/network.rs +++ b/one/src/network.rs @@ -12,7 +12,7 @@ use libp2p::identity::Keypair; use recon::{libp2p::Recon, Sha256a}; use sqlx::SqlitePool; use tokio::task::{self, JoinHandle}; -use tracing::error; +use tracing::{debug, error}; /// Builder provides an ordered API for constructing an Ipfs service. pub struct Builder { @@ -56,8 +56,9 @@ impl Builder { let task = task::spawn(async move { if let Err(err) = p2p.run().await { - error!("{:?}", err); + error!(%err, "failed to gracefully stop p2p task"); } + debug!("node task finished"); }); Ok(Builder { diff --git a/p2p/Cargo.toml b/p2p/Cargo.toml index a316e5480..c8b483c90 100644 --- a/p2p/Cargo.toml +++ b/p2p/Cargo.toml @@ -35,8 +35,6 @@ iroh-rpc-client.workspace = true iroh-rpc-types.workspace = true iroh-util.workspace = true libp2p-identity.workspace = true -libp2p-mplex = "0.40.0" -libp2p-quic = { version = "0.9", features = ["tokio"] } lru.workspace = true multihash.workspace = true prometheus-client.workspace = true @@ -56,25 +54,28 @@ zeroize.workspace = true [dependencies.libp2p] workspace = true features = [ + "autonat", + "dcutr", + "dns", + "ed25519", "gossipsub", - "kad", "identify", - "ping", + "kad", + "macros", + "quic", "mdns", - "noise", - "yamux", - "tcp", - "dns", - "request-response", - "websocket", - "serde", "metrics", + "noise", + "ping", "relay", - "dcutr", - "autonat", + "request-response", "rsa", + "serde", + "tcp", + "tls", "tokio", - "macros", + "websocket", + "yamux", ] [dev-dependencies] diff --git a/p2p/src/behaviour.rs b/p2p/src/behaviour.rs index 9724320f2..609bdf7ec 100644 --- a/p2p/src/behaviour.rs +++ b/p2p/src/behaviour.rs @@ -50,7 +50,7 @@ pub(crate) struct NodeBehaviour { limits: connection_limits::Behaviour, pub(crate) peer_manager: CeramicPeerManager, ping: Ping, - identify: identify::Behaviour, + pub(crate) identify: identify::Behaviour, pub(crate) bitswap: Toggle>, pub(crate) kad: Toggle>, mdns: Toggle, diff --git a/p2p/src/behaviour/ceramic_peer_manager.rs b/p2p/src/behaviour/ceramic_peer_manager.rs index 7be4f7bda..854a58299 100644 --- a/p2p/src/behaviour/ceramic_peer_manager.rs +++ b/p2p/src/behaviour/ceramic_peer_manager.rs @@ -18,7 +18,7 @@ use libp2p::swarm::{ use libp2p::{ identify::Info as IdentifyInfo, multiaddr::Protocol, - swarm::{dummy, ConnectionId, DialError, NetworkBehaviour, PollParameters}, + swarm::{dummy, ConnectionId, DialError, NetworkBehaviour}, Multiaddr, PeerId, }; use tokio::time; @@ -34,7 +34,6 @@ use crate::metrics::{self, Metrics}; pub struct CeramicPeerManager { metrics: Metrics, info: AHashMap, - supported_protocols: Vec, ceramic_peers: AHashMap, } @@ -80,7 +79,6 @@ impl CeramicPeerManager { Ok(Self { metrics, info: Default::default(), - supported_protocols: Default::default(), ceramic_peers, }) } @@ -97,10 +95,6 @@ impl CeramicPeerManager { self.info.get(peer_id) } - pub fn supported_protocols(&self) -> Vec { - self.supported_protocols.clone() - } - pub fn is_ceramic_peer(&self, peer_id: &PeerId) -> bool { self.ceramic_peers.contains_key(peer_id) } @@ -143,7 +137,7 @@ impl NetworkBehaviour for CeramicPeerManager { type ConnectionHandler = dummy::ConnectionHandler; type ToSwarm = PeerManagerEvent; - fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { + fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { match event { libp2p::swarm::FromSwarm::ConnectionEstablished(event) => { // First connection @@ -194,30 +188,7 @@ impl NetworkBehaviour for CeramicPeerManager { fn poll( &mut self, cx: &mut Context<'_>, - params: &mut impl PollParameters, ) -> Poll>> { - // TODO(nathanielc): - // We can only get the supported protocols of the local node by examining the - // `PollParameters`, which mean you can only get the supported protocols by examining the - // `PollParameters` in this method (`poll`) of a network behaviour. - // I injected this responsibility in the `peer_manager`, because it's the only "simple" - // network behaviour we have implemented. - // There is an issue up to remove `PollParameters`, and a discussion into how to instead - // get the `supported_protocols` of the node: - // https://github.com/libp2p/rust-libp2p/issues/3124 - // When that is resolved, we can hopefully remove this responsibility from the `peer_manager`, - // where it, frankly, doesn't belong. - // - // With libp2p 0.52.4 supported_protocols has been deprecated. We should now be able to - // refactor the peer_manager to not need to handle supported_protocols. - #[allow(deprecated)] - if self.supported_protocols.is_empty() { - self.supported_protocols = params - .supported_protocols() - .map(|p| String::from_utf8_lossy(&p).to_string()) - .collect(); - } - for (peer_id, peer) in self.ceramic_peers.iter_mut() { if let Some(mut dial_future) = peer.dial_future.take() { match dial_future.as_mut().poll_unpin(cx) { diff --git a/p2p/src/node.rs b/p2p/src/node.rs index 0364e0f56..77a828e9c 100644 --- a/p2p/src/node.rs +++ b/p2p/src/node.rs @@ -28,7 +28,7 @@ use libp2p::{ mdns, metrics::Recorder as _, multiaddr::Protocol, - swarm::{dial_opts::DialOpts, ConnectionHandler, NetworkBehaviour, SwarmEvent}, + swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent}, PeerId, StreamProtocol, Swarm, }; use sqlx::SqlitePool; @@ -72,6 +72,7 @@ where { metrics: Metrics, swarm: Swarm>, + supported_protocols: HashSet, net_receiver_in: Receiver, dial_queries: AHashMap>>>, lookup_queries: AHashMap>>>, @@ -136,11 +137,7 @@ where // Allow IntoConnectionHandler deprecated associated type. // We are not using IntoConnectionHandler directly only referencing the type as part of this event signature. -#[allow(deprecated)] -type NodeSwarmEvent = SwarmEvent< - as NetworkBehaviour>::ToSwarm, - < as NetworkBehaviour>::ConnectionHandler as ConnectionHandler>::Error, ->; +type NodeSwarmEvent = SwarmEvent< as NetworkBehaviour>::ToSwarm>; impl Node where I: Recon, @@ -165,7 +162,7 @@ where let block_store = crate::SQLiteBlockStore::new(sql_pool).await?; let mut swarm = build_swarm( &libp2p_config, - &keypair, + keypair, recons, block_store.clone(), metrics.clone(), @@ -209,6 +206,28 @@ where Ok(Node { metrics, swarm, + // TODO(WS1-1364): Determine psuedo-dynamically the set of locally supported protocols. + // For now hard code all protocols. + // https://github.com/libp2p/rust-libp2p/discussions/4982 + supported_protocols: HashSet::from_iter( + [ + "/ipfs/bitswap", + "/ipfs/bitswap/1.0.0", + "/ipfs/bitswap/1.1.0", + "/ipfs/bitswap/1.2.0", + "/ipfs/id/1.0.0", + "/ipfs/id/push/1.0.0", + "/ipfs/kad/1.0.0", + "/ipfs/ping/1.0.0", + "/libp2p/autonat/1.0.0", + "/libp2p/circuit/relay/0.2.0/hop", + "/libp2p/circuit/relay/0.2.0/stop", + "/meshsub/1.0.0", + "/meshsub/1.1.0", + ] + .iter() + .map(|p| p.to_string()), + ), net_receiver_in: network_receiver_in, dial_queries: Default::default(), lookup_queries: Default::default(), @@ -263,7 +282,6 @@ where let mut kad_state = KadBootstrapState::Idle; loop { self.metrics.record(&LoopEvent); - tokio::select! { swarm_event = self.swarm.next() => { let swarm_event = swarm_event.expect("the swarm will never die"); @@ -694,109 +712,115 @@ where Event::Identify(e) => { libp2p_metrics().record(&*e); trace!("tick: identify {:?}", e); - if let identify::Event::Received { peer_id, info } = *e { - // Did we learn about a new external address? - if !self - .swarm - .external_addresses() - .any(|addr| addr == &info.observed_addr) - && !self.failed_external_addresses.contains(&info.observed_addr) - { - if self.trust_observed_addrs { - debug!( - address=%info.observed_addr, - %peer_id, - "adding trusted external address observed from peer", - ); - // Explicily trust any observed address from any peer. - self.swarm.add_external_address(info.observed_addr.clone()); - } else if let Some(autonat) = self.swarm.behaviour_mut().autonat.as_mut() { - // Probe the observed addr for external connectivity. - // Only probe one address at a time. - // - // This logic is run very frequently because any new peer connection - // for a new observed address triggers this path. Its typical to have - // only a few external addresses, in which cases its likely that the - // in-progress address probe is one that will succeed. - // - // In cases where there are lots of different observed addresses its - // likely that NAT hasn't been setup and so the peer doesn't have an - // external address. Therefore we do not want to waste resources on - // probing many different addresses that are likely to fail. - if self.active_address_probe.is_none() { - self.active_address_probe = Some(info.observed_addr.clone()); + match *e { + identify::Event::Received { peer_id, info } => { + // Did we learn about a new external address? + if !self + .swarm + .external_addresses() + .any(|addr| addr == &info.observed_addr) + && !self.failed_external_addresses.contains(&info.observed_addr) + { + if self.trust_observed_addrs { debug!( address=%info.observed_addr, %peer_id, - "probing observed address from peer for external connectivity", + "adding trusted external address observed from peer", ); - autonat.probe_address(info.observed_addr.clone()); - } + // Explicily trust any observed address from any peer. + self.swarm.add_external_address(info.observed_addr.clone()); + } else if let Some(autonat) = + self.swarm.behaviour_mut().autonat.as_mut() + { + // Probe the observed addr for external connectivity. + // Only probe one address at a time. + // + // This logic is run very frequently because any new peer connection + // for a new observed address triggers this path. Its typical to have + // only a few external addresses, in which cases its likely that the + // in-progress address probe is one that will succeed. + // + // In cases where there are lots of different observed addresses its + // likely that NAT hasn't been setup and so the peer doesn't have an + // external address. Therefore we do not want to waste resources on + // probing many different addresses that are likely to fail. + if self.active_address_probe.is_none() { + self.active_address_probe = Some(info.observed_addr.clone()); + debug!( + address=%info.observed_addr, + %peer_id, + "probing observed address from peer for external connectivity", + ); + autonat.probe_address(info.observed_addr.clone()); + } + }; }; - }; - let mut kad_address_added = false; - for protocol in &info.protocols { - // Sometimes peers do not report that they support the kademlia protocol. - // Here we assume that all ceramic peers do support the protocol. - // Therefore we add all ceramic peers and any peers that explicitly support - // kademlia to the kademlia routing table. - if self - .swarm - .behaviour() - .peer_manager - .is_ceramic_peer(&peer_id) - || protocol == &kad::PROTOCOL_NAME - { - for addr in &info.listen_addrs { - if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { - kad.add_address(&peer_id, addr.clone()); - kad_address_added = true; + let mut kad_address_added = false; + for protocol in &info.protocols { + // Sometimes peers do not report that they support the kademlia protocol. + // Here we assume that all ceramic peers do support the protocol. + // Therefore we add all ceramic peers and any peers that explicitly support + // kademlia to the kademlia routing table. + if self + .swarm + .behaviour() + .peer_manager + .is_ceramic_peer(&peer_id) + || protocol == &kad::PROTOCOL_NAME + { + for addr in &info.listen_addrs { + if let Some(kad) = self.swarm.behaviour_mut().kad.as_mut() { + kad.add_address(&peer_id, addr.clone()); + kad_address_added = true; + } } - } - } else if protocol == &StreamProtocol::new("/libp2p/autonat/1.0.0") { - // TODO: expose protocol name on `libp2p::autonat`. - // TODO: should we remove them at some point? - for addr in &info.listen_addrs { - if let Some(autonat) = self.swarm.behaviour_mut().autonat.as_mut() { - autonat.add_server(peer_id, Some(addr.clone())); + } else if protocol == &StreamProtocol::new("/libp2p/autonat/1.0.0") { + // TODO: expose protocol name on `libp2p::autonat`. + // TODO: should we remove them at some point? + for addr in &info.listen_addrs { + if let Some(autonat) = + self.swarm.behaviour_mut().autonat.as_mut() + { + autonat.add_server(peer_id, Some(addr.clone())); + } } } } - } - if let Some(bitswap) = self.swarm.behaviour().bitswap.as_ref() { - bitswap.on_identify(&peer_id, &info.protocols); - } + if let Some(bitswap) = self.swarm.behaviour().bitswap.as_ref() { + bitswap.on_identify(&peer_id, &info.protocols); + } - self.swarm - .behaviour_mut() - .peer_manager - .inject_identify_info(peer_id, info.clone()); + self.swarm + .behaviour_mut() + .peer_manager + .inject_identify_info(peer_id, info.clone()); - if let Some(channels) = self.lookup_queries.remove(&peer_id) { - for chan in channels { - chan.send(Ok(info.clone())).ok(); + if let Some(channels) = self.lookup_queries.remove(&peer_id) { + for chan in channels { + chan.send(Ok(info.clone())).ok(); + } + } + if kad_address_added { + Ok(Some(SwarmEventResult::KademliaAddressAdded)) + } else { + Ok(None) } } - if kad_address_added { - Ok(Some(SwarmEventResult::KademliaAddressAdded)) - } else { - Ok(None) - } - } else if let identify::Event::Error { peer_id, error } = *e { - if let Some(channels) = self.lookup_queries.remove(&peer_id) { - for chan in channels { - chan.send(Err(anyhow!( - "error upgrading connection to peer {:?}: {}", - peer_id, - error - ))) - .ok(); + identify::Event::Error { peer_id, error } => { + if let Some(channels) = self.lookup_queries.remove(&peer_id) { + for chan in channels { + chan.send(Err(anyhow!( + "error upgrading connection to peer {:?}: {}", + peer_id, + error + ))) + .ok(); + } } + Ok(None) } - Ok(None) - } else { - Ok(None) + identify::Event::Sent { .. } | identify::Event::Pushed { .. } => Ok(None), } } Event::Ping(e) => { @@ -1183,7 +1207,8 @@ where let observed_addrs = self.swarm.external_addresses().cloned().collect(); let protocol_version = String::from(crate::behaviour::PROTOCOL_VERSION); let agent_version = String::from(crate::behaviour::AGENT_VERSION); - let protocols = self.swarm.behaviour().peer_manager.supported_protocols(); + let mut protocols: Vec = self.supported_protocols.iter().cloned().collect(); + protocols.sort(); response_channel .send(Lookup { @@ -1269,7 +1294,7 @@ mod tests { use ssh_key::private::Ed25519Keypair; use test_log::test; - use libp2p::{identity::Keypair as Libp2pKeypair, kad::record::Key}; + use libp2p::{identity::Keypair as Libp2pKeypair, kad::RecordKey}; use super::*; use anyhow::Result; @@ -1326,7 +1351,7 @@ mod tests { /// When `None`, it will use a previously derived peer_id `12D3KooWFma2D63TG9ToSiRsjFkoNm2tTihScTBAEdXxinYk5rwE`. // cspell:disable-line seed: Option, /// Optional `Keys` the node should provide to the DHT on start up. - keys: Option>, + keys: Option>, /// Pass through to node.trust_observed_addrs trust_observed_addrs: bool, } diff --git a/p2p/src/providers.rs b/p2p/src/providers.rs index 9f6bab798..4171fa7e9 100644 --- a/p2p/src/providers.rs +++ b/p2p/src/providers.rs @@ -2,7 +2,7 @@ use std::collections::{HashSet, VecDeque}; use ahash::AHashMap; use libp2p::{ - kad::{self, record::Key, store::MemoryStore, GetProvidersError, QueryId}, + kad::{self, store::MemoryStore, GetProvidersError, QueryId, RecordKey}, PeerId, }; use tokio::sync::mpsc; @@ -15,13 +15,13 @@ const OUTSTANDING_LIMIT: usize = 2048; #[derive(Debug)] pub struct Providers { outstanding_queries: VecDeque, - current_queries: AHashMap, + current_queries: AHashMap, max_running_queries: usize, } #[derive(Debug)] struct Query { - key: Key, + key: RecordKey, queries: Vec, } @@ -48,7 +48,12 @@ impl Providers { } /// Drops queries if the queue is full. - pub fn push(&mut self, key: Key, limit: usize, response_channel: ResponseChannel) -> bool { + pub fn push( + &mut self, + key: RecordKey, + limit: usize, + response_channel: ResponseChannel, + ) -> bool { // Check if we already have a query running if let Some(running_query) = self.current_queries.get_mut(&key) { // send all found providers @@ -96,7 +101,7 @@ impl Providers { &mut self, id: QueryId, is_last: bool, - key: Key, + key: RecordKey, providers: HashSet, kad: &mut kad::Behaviour, ) { diff --git a/p2p/src/publisher.rs b/p2p/src/publisher.rs index 31e7ecd67..01e7f28f2 100644 --- a/p2p/src/publisher.rs +++ b/p2p/src/publisher.rs @@ -11,7 +11,7 @@ use anyhow::Result; use ceramic_metrics::Recorder; use futures_timer::Delay; use futures_util::{future::BoxFuture, Future, Stream}; -use libp2p::kad::{record::Key, AddProviderError, AddProviderOk, AddProviderResult}; +use libp2p::kad::{AddProviderError, AddProviderOk, AddProviderResult, RecordKey}; use multihash::Multihash; use tokio::sync::{ mpsc::{channel, error::TrySendError, Receiver, Sender}, @@ -43,8 +43,8 @@ const BATCH_ADDITIVE_INCREASE: usize = 100; // Publisher implements [`Stream`] to produce batches of DHT keys to provide. pub struct Publisher { start_providing_results_tx: Sender, - batch: Option>, - batches_rx: Receiver>, + batch: Option>, + batches_rx: Receiver>, metrics: Metrics, } @@ -103,7 +103,7 @@ impl Publisher { // This stream produces one key at a time. This way we do not flood the swarm task with lots of // work. impl Stream for Publisher { - type Item = Key; + type Item = RecordKey; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -134,11 +134,11 @@ struct PublisherWorker { deadline: Instant, batch_size: usize, results_rx: Receiver, - current_queries: HashSet, + current_queries: HashSet, block_store: SQLiteBlockStore, last_hash: Option, batch_complete: Option>, - retries: HashMap, + retries: HashMap, } enum State { @@ -253,7 +253,7 @@ struct Batch { } impl Stream for PublisherWorker { - type Item = Vec; + type Item = Vec; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!(state=?self.state, "poll_next"); @@ -343,7 +343,7 @@ impl Stream for PublisherWorker { let new_count = batch.hashes.len() as i64; // Collect the new keys - let new_keys: Vec = batch + let new_keys: Vec = batch .hashes .iter() .map(|hash| hash.to_bytes().into()) @@ -355,7 +355,7 @@ impl Stream for PublisherWorker { // only retry up to batch size. let mut max_retry_count = 0i64; let mut repeat_count = 0; - let keys: Vec = self + let keys: Vec = self .retries .iter() .take(self.batch_size - new_keys.len()) diff --git a/p2p/src/rpc.rs b/p2p/src/rpc.rs index 2f59826c8..0e565e8a0 100644 --- a/p2p/src/rpc.rs +++ b/p2p/src/rpc.rs @@ -11,13 +11,30 @@ use iroh_rpc_client::{ create_server, Lookup, P2pServer, ServerError, ServerSocket, HEALTH_POLL_WAIT, }; use iroh_rpc_types::{ - p2p::*, RpcError, RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse, + p2p::{ + BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DisconnectRequest, + ExternalAddrsRequest, ExternalAddrsResponse, FetchProvidersDhtRequest, + FetchProvidersDhtResponse, GetListeningAddrsRequest, GetListeningAddrsResponse, + GetPeersRequest, GetPeersResponse, GossipsubAddExplicitPeerRequest, + GossipsubAllMeshPeersRequest, GossipsubAllPeersRequest, GossipsubAllPeersResponse, + GossipsubMeshPeersRequest, GossipsubPeersResponse, GossipsubPublishRequest, + GossipsubPublishResponse, GossipsubRemoveExplicitPeerRequest, GossipsubSubscribeRequest, + GossipsubSubscribeResponse, GossipsubTopicsRequest, GossipsubTopicsResponse, + GossipsubUnsubscribeRequest, GossipsubUnsubscribeResponse, ListenersRequest, + ListenersResponse, LocalPeerIdRequest, LocalPeerIdResponse, LookupLocalRequest, + LookupRequest, LookupResponse, NotifyNewBlocksBitswapRequest, P2pAddr, P2pRequest, + P2pService, ShutdownRequest, StartProvidingRequest, StopProvidingRequest, + StopSessionBitswapRequest, + }, + RpcError, RpcResult, VersionRequest, VersionResponse, WatchRequest, WatchResponse, }; -use libp2p::gossipsub::{MessageId, PublishError, TopicHash}; use libp2p::identify::Info as IdentifyInfo; -use libp2p::kad::record::Key; use libp2p::Multiaddr; use libp2p::PeerId; +use libp2p::{ + gossipsub::{MessageId, PublishError, TopicHash}, + kad::RecordKey, +}; use std::collections::{HashMap, HashSet}; use std::result; use tokio::sync::mpsc::{channel, Receiver, Sender}; @@ -205,7 +222,7 @@ impl P2p { req: FetchProvidersDhtRequest, ) -> anyhow::Result>> { let key_bytes: &[u8] = req.key.0.as_ref(); - let key = libp2p::kad::record::Key::new(&key_bytes); + let key = RecordKey::new(&key_bytes); let cid: Cid = key_bytes.try_into()?; trace!("received fetch_provider_dht: {}", cid); let (s, r) = channel(64); @@ -235,7 +252,7 @@ impl P2p { async fn start_providing(self, req: StartProvidingRequest) -> Result<()> { trace!("received StartProviding request: {:?}", req.key); let key_bytes: &[u8] = req.key.0.as_ref(); - let key = libp2p::kad::record::Key::new(&key_bytes); + let key = RecordKey::new(&key_bytes); let (s, r) = oneshot::channel(); let msg = RpcMessage::StartProviding(s, key); @@ -251,7 +268,7 @@ impl P2p { async fn stop_providing(self, req: StopProvidingRequest) -> Result<()> { trace!("received StopProviding request: {:?}", req.key); let key_bytes: &[u8] = req.key.0.as_ref(); - let key = libp2p::kad::record::Key::new(&key_bytes); + let key = RecordKey::new(&key_bytes); let (s, r) = oneshot::channel(); let msg = RpcMessage::StopProviding(s, key); @@ -636,7 +653,7 @@ fn peer_info_from_lookup(l: Lookup) -> LookupResponse { #[derive(Debug)] pub enum ProviderRequestKey { - Dht(Key), + Dht(RecordKey), Bitswap(u64, Cid), } @@ -665,8 +682,8 @@ pub enum RpcMessage { response_channel: Sender, String>>, limit: usize, }, - StartProviding(oneshot::Sender>, Key), - StopProviding(oneshot::Sender>, Key), + StartProviding(oneshot::Sender>, RecordKey), + StopProviding(oneshot::Sender>, RecordKey), NetListeningAddrs(oneshot::Sender<(PeerId, Vec)>), NetPeers(oneshot::Sender>>), NetConnectByPeerId(oneshot::Sender>, PeerId), diff --git a/p2p/src/swarm.rs b/p2p/src/swarm.rs index 79707ee83..2a06e3cc9 100644 --- a/p2p/src/swarm.rs +++ b/p2p/src/swarm.rs @@ -1,137 +1,101 @@ -use std::time::Duration; - use anyhow::Result; use ceramic_core::{EventId, Interest}; -use futures::future::Either; -use libp2p::{ - core::{self, muxing::StreamMuxerBox, transport::Boxed}, - dns, noise, - swarm::{Config, Executor}, - tcp, websocket, - yamux::{self, WindowUpdateMode}, - PeerId, Swarm, Transport, -}; +use libp2p::{noise, relay, swarm::Executor, tcp, tls, yamux, Swarm, SwarmBuilder}; use libp2p_identity::Keypair; use recon::{libp2p::Recon, Sha256a}; use crate::{behaviour::NodeBehaviour, Libp2pConfig, Metrics, SQLiteBlockStore}; -/// Builds the transport stack that LibP2P will communicate over. -async fn build_transport( - keypair: &Keypair, +pub(crate) async fn build_swarm( config: &Libp2pConfig, -) -> ( - Boxed<(PeerId, StreamMuxerBox)>, - Option, -) { - // TODO: make transports configurable - - let port_reuse = true; - let connection_timeout = Duration::from_secs(30); - - // TCP - let tcp_config = tcp::Config::default().port_reuse(port_reuse); - let tcp_transport = tcp::tokio::Transport::new(tcp_config.clone()); - - // Websockets - let ws_tcp = websocket::WsConfig::new(tcp::tokio::Transport::new(tcp_config)); - let tcp_ws_transport = tcp_transport.or_transport(ws_tcp); - - // Quic - let quic_config = libp2p_quic::Config::new(keypair); - let quic_transport = libp2p_quic::tokio::Transport::new(quic_config); - - // Noise config for TCP & Websockets - let auth_config = - noise::Config::new(keypair).expect("should be able to configure noise with keypair"); - - // Stream muxer config for TCP & Websockets - let muxer_config = { - let mut mplex_config = libp2p_mplex::MplexConfig::new(); - mplex_config.set_max_buffer_size(usize::MAX); - - let mut yamux_config = yamux::Config::default(); - yamux_config.set_max_buffer_size(16 * 1024 * 1024); // TODO: configurable - yamux_config.set_receive_window_size(16 * 1024 * 1024); // TODO: configurable - yamux_config.set_window_update_mode(WindowUpdateMode::on_receive()); - core::upgrade::SelectUpgrade::new(yamux_config, mplex_config) + keypair: Keypair, + recons: Option<(I, M)>, + block_store: SQLiteBlockStore, + metrics: Metrics, +) -> Result>> +where + I: Recon, + M: Recon, +{ + let builder = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_tcp( + tcp::Config::default().port_reuse(true), + noise::Config::new, + yamux::Config::default, + )? + .with_quic() + .with_dns()? + .with_websocket( + (tls::Config::new, noise::Config::new), + yamux::Config::default, + ) + .await?; + + let with_config = |cfg: libp2p::swarm::Config| { + cfg.with_notify_handler_buffer_size(config.notify_handler_buffer_size) + .with_per_connection_event_buffer_size(config.connection_event_buffer_size) + .with_dial_concurrency_factor(config.dial_concurrency_factor) + .with_idle_connection_timeout(config.idle_connection_timeout) }; - - // Enable Relay if enabled - let (tcp_ws_transport, relay_client) = if config.relay_client { - let (relay_transport, relay_client) = - libp2p::relay::client::new(keypair.public().to_peer_id()); - - let transport = relay_transport - .or_transport(tcp_ws_transport) - .upgrade(core::upgrade::Version::V1Lazy) - .authenticate(auth_config) - .multiplex(muxer_config) - .timeout(connection_timeout) - .boxed(); - - (transport, Some(relay_client)) + if config.relay_client { + Ok(builder + .with_relay_client( + (tls::Config::new, noise::Config::new), + yamux::Config::default, + )? + .with_behaviour(|keypair, relay_client| { + new_behavior( + config, + keypair, + Some(relay_client), + recons, + block_store, + metrics.clone(), + ) + .map_err(|err| err.into()) + })? + .with_swarm_config(with_config) + .build()) } else { - let tcp_transport = tcp_ws_transport - .upgrade(core::upgrade::Version::V1Lazy) - .authenticate(auth_config) - .multiplex(muxer_config) - .boxed(); - - (tcp_transport, None) - }; - - // Merge in quic - let transport = quic_transport - .or_transport(tcp_ws_transport) - .map(|o, _| match o { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - }) - .boxed(); - - // Setup dns resolution - - let dns_cfg = dns::ResolverConfig::cloudflare(); - let dns_opts = dns::ResolverOpts::default(); - let transport = dns::tokio::Transport::custom(transport, dns_cfg, dns_opts) - .unwrap() - .boxed(); - - (transport, relay_client) + Ok(builder + .with_behaviour(|keypair| { + new_behavior(config, keypair, None, recons, block_store, metrics.clone()) + .map_err(|err| err.into()) + })? + .with_swarm_config(with_config) + .build()) + } } -pub(crate) async fn build_swarm( +fn new_behavior( config: &Libp2pConfig, keypair: &Keypair, + relay_client: Option, recons: Option<(I, M)>, block_store: SQLiteBlockStore, metrics: Metrics, -) -> Result>> +) -> Result> where - I: Recon, - M: Recon, + I: Recon + Send, + M: Recon + Send, { - let peer_id = keypair.public().to_peer_id(); - - let (transport, relay_client) = build_transport(keypair, config).await; - let behaviour = NodeBehaviour::new( - keypair, - config, - relay_client, - recons, - block_store, - metrics.clone(), - ) - .await?; - - let swarm_config = Config::with_tokio_executor() - .with_notify_handler_buffer_size(config.notify_handler_buffer_size) - .with_per_connection_event_buffer_size(config.connection_event_buffer_size) - .with_dial_concurrency_factor(config.dial_concurrency_factor) - .with_idle_connection_timeout(config.idle_connection_timeout); - - Ok(Swarm::new(transport, behaviour, peer_id, swarm_config)) + // TODO(WS1-1363): Remove bitswap async initialization + let keypair = keypair.clone(); + let config = config.clone(); + let handle = tokio::runtime::Handle::current(); + std::thread::spawn(move || { + handle.block_on(NodeBehaviour::new( + &keypair, + &config, + relay_client, + recons, + block_store, + metrics, + )) + }) + .join() + .unwrap() } struct Tokio; diff --git a/recon/Cargo.toml b/recon/Cargo.toml index 2ab82704f..1309d11c9 100644 --- a/recon/Cargo.toml +++ b/recon/Cargo.toml @@ -31,7 +31,8 @@ void.workspace = true codespan-reporting = "0.11.1" expect-test.workspace = true lalrpop-util = { version = "0.20.0", features = ["lexer"] } -libp2p-swarm-test = "0.2.0" +libp2p = { workspace = true, features = ["ping"] } +libp2p-swarm-test = "0.3.0" pretty = "0.12.1" quickcheck = "1.0.3" regex = "1" diff --git a/recon/src/libp2p.rs b/recon/src/libp2p.rs index afd275514..c84255b42 100644 --- a/recon/src/libp2p.rs +++ b/recon/src/libp2p.rs @@ -117,15 +117,12 @@ pub struct Config { /// Start a new sync once the duration has past in the failed or synchronized state. /// Defaults to 1 second. pub per_peer_sync_timeout: Duration, - /// Duration to keep the connection alive, even when not in use. - pub idle_keep_alive: Duration, } impl Default for Config { fn default() -> Self { Self { per_peer_sync_timeout: Duration::from_millis(1000), - idle_keep_alive: Duration::from_millis(1000), } } } @@ -202,7 +199,7 @@ where type ToSwarm = Event; - fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { + fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) { match event { libp2p::swarm::FromSwarm::ConnectionEstablished(info) => { self.peers.insert( @@ -256,6 +253,9 @@ where libp2p::swarm::FromSwarm::ExternalAddrExpired(_) => { debug!(kind = "ExternalAddrExpired", "ignored swarm event") } + _ => { + debug!("ignored unknown swarm event") + } } } @@ -315,7 +315,6 @@ where fn poll( &mut self, _cx: &mut std::task::Context<'_>, - _params: &mut impl libp2p::swarm::PollParameters, ) -> std::task::Poll>> { // Handle queue of swarm events. @@ -390,7 +389,6 @@ where peer, connection_id, handler::State::WaitingInbound, - self.config.idle_keep_alive, self.interest.clone(), self.model.clone(), )) @@ -411,7 +409,6 @@ where handler::State::RequestOutbound { stream_set: StreamSet::Interest, }, - self.config.idle_keep_alive, self.interest.clone(), self.model.clone(), )) diff --git a/recon/src/libp2p/handler.rs b/recon/src/libp2p/handler.rs index cba362b5f..1f4c2ae4c 100644 --- a/recon/src/libp2p/handler.rs +++ b/recon/src/libp2p/handler.rs @@ -2,11 +2,7 @@ //! //! A handler is created for each connected peer that speaks the Recon protocol. //! A handler is responsible for performing Recon synchronization with a peer. -use std::{ - collections::VecDeque, - task::Poll, - time::{Duration, Instant}, -}; +use std::{collections::VecDeque, task::Poll}; use anyhow::Result; use ceramic_core::{EventId, Interest}; @@ -14,7 +10,7 @@ use libp2p::{ futures::FutureExt, swarm::{ handler::{FullyNegotiatedInbound, FullyNegotiatedOutbound}, - ConnectionHandler, ConnectionHandlerEvent, ConnectionId, KeepAlive, SubstreamProtocol, + ConnectionHandler, ConnectionHandlerEvent, ConnectionId, SubstreamProtocol, }, }; use libp2p_identity::PeerId; @@ -32,8 +28,6 @@ pub struct Handler { interest: I, model: M, state: State, - keep_alive_duration: Duration, - keep_alive: KeepAlive, behavior_events_queue: VecDeque, } @@ -46,7 +40,6 @@ where peer_id: PeerId, connection_id: ConnectionId, state: State, - keep_alive_duration: Duration, interest: I, model: M, ) -> Self { @@ -56,8 +49,6 @@ where interest, model, state, - keep_alive_duration, - keep_alive: KeepAlive::Yes, behavior_events_queue: VecDeque::new(), } } @@ -66,8 +57,6 @@ where // See doc comment for State, each row of the transitions table // should map to exactly one call of this transition_state function. // - // TODO(WS1-1291): Remove uses of KeepAlive::Until - #[allow(deprecated)] fn transition_state(&mut self, state: State) { debug!( %self.remote_peer_id, @@ -77,16 +66,6 @@ where "state transition" ); self.state = state; - // Update KeepAlive - self.keep_alive = match (&self.state, self.keep_alive) { - (State::Idle, k @ KeepAlive::Until(_)) => k, - (State::Idle, _) => KeepAlive::Until(Instant::now() + self.keep_alive_duration), - (State::RequestOutbound { .. }, _) - | (State::WaitingInbound, _) - | (State::WaitingOutbound { .. }, _) - | (State::Outbound(_), _) - | (State::Inbound(_), _) => KeepAlive::Yes, - }; } } @@ -175,7 +154,6 @@ where { type FromBehaviour = FromBehaviour; type ToBehaviour = FromHandler; - type Error = Failure; type InboundProtocol = MultiReadyUpgrade; type OutboundProtocol = MultiReadyUpgrade; type OutboundOpenInfo = (); @@ -190,22 +168,16 @@ where ) } - fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + fn connection_keep_alive(&self) -> bool { + // Only keep the connection alive if we are not idle + !matches!(&self.state, State::Idle) } - // TODO(WS1-1345): Remove uses of ConnectionHandler::Error - #[allow(deprecated)] fn poll( &mut self, cx: &mut std::task::Context<'_>, ) -> Poll< - ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::ToBehaviour, - Self::Error, - >, + ConnectionHandlerEvent, > { if let Some(event) = self.behavior_events_queue.pop_back() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); @@ -382,6 +354,9 @@ where libp2p::swarm::handler::ConnectionEvent::RemoteProtocolsChange(changes) => { debug!(?changes, "remote protocols change") } + _ => { + debug!("ignoring unknown connection event") + } } } } diff --git a/recon/src/libp2p/tests.rs b/recon/src/libp2p/tests.rs index 7fa5109d9..98f63c123 100644 --- a/recon/src/libp2p/tests.rs +++ b/recon/src/libp2p/tests.rs @@ -22,7 +22,10 @@ use ceramic_core::{Cid, EventId, Interest, Network, PeerId}; use cid::multihash::{Code, MultihashDigest}; -use libp2p::swarm::{keep_alive, Swarm, SwarmEvent}; +use libp2p::{ + ping, + swarm::{Swarm, SwarmEvent}, +}; use libp2p_swarm_test::SwarmExt; use quickcheck::QuickCheck; use rand::{thread_rng, Rng}; @@ -141,7 +144,7 @@ fn recon_sync() { let mut swarm2 = build_swarm(&runtime, "swarm2", config); runtime.block_on(async { - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; for _ in 0..drive_count.get() { @@ -267,10 +270,10 @@ fn unsupported_does_not_fail() { .build() .unwrap(); - let mut swarm1 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); + let mut swarm1 = Swarm::new_ephemeral(|_| ping::Behaviour::default()); let mut swarm2 = build_swarm(&runtime, "swarm2", Config::default()); let result = runtime.block_on(async { - swarm1.listen().await; + swarm1.listen().with_memory_addr_external().await; swarm2.connect(&mut swarm1).await; runtime.spawn(swarm1.loop_on_next());