diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml new file mode 100644 index 000000000..85cd693ae --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.yml @@ -0,0 +1,55 @@ +name: Bug Report +description: Create a bug report for jvm-libp2p + +body: + - type: markdown + attributes: + value: | + Thank you for filing a bug report! + - type: textarea + attributes: + label: Summary + description: Please provide a short summary of the bug, along with any information you feel relevant to replicate the bug. + validations: + required: true + - type: textarea + attributes: + label: Expected behavior + description: Describe what you expect to happen. + validations: + required: true + - type: textarea + attributes: + label: Actual behavior + description: Describe what actually happens. + validations: + required: true + - type: textarea + attributes: + label: Relevant log output + description: Please copy and paste any relevant log output. This will be automatically formatted into code, so no need for backticks. + render: shell + validations: + required: false + - type: textarea + attributes: + label: Possible Solution + description: Suggest a fix/reason for the bug, or ideas how to implement the addition or change. + validations: + required: false + - type: textarea + attributes: + label: Version + description: Which version of libp2p are you using? libp2p version (version number, commit, or branch) + validations: + required: false + - type: dropdown + attributes: + label: Would you like to work on fixing this bug ? + description: Any contribution towards fixing the bug is greatly appreciated. We are more than happy to provide help on the process. + options: + - "Yes" + - "No" + - Maybe + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 000000000..5842c85fe --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1,8 @@ +blank_issues_enabled: true +contact_links: + - name: Technical Questions + url: https://github.com/libp2p/jvm-libp2p/discussions/new?category=q-a + about: Please ask technical questions in the jvm-libp2p Github Discussions forum. + - name: Community-wide libp2p Discussion + url: https://discuss.libp2p.io + about: Discussions and questions about the libp2p community. diff --git a/.github/ISSUE_TEMPLATE/enhancement.yml b/.github/ISSUE_TEMPLATE/enhancement.yml new file mode 100644 index 000000000..65def1ad7 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/enhancement.yml @@ -0,0 +1,31 @@ +name: Enhancement +description: Suggest an improvement to an existing jvm-libp2p feature. +body: + - type: textarea + attributes: + label: Description + description: Describe the enhancement that you are proposing. + validations: + required: true + - type: textarea + attributes: + label: Motivation + description: Explain why this enhancement is beneficial. + validations: + required: true + - type: textarea + attributes: + label: Current Implementation + description: Describe the current implementation. + validations: + required: true + - type: dropdown + attributes: + label: Are you planning to do it yourself in a pull request ? + description: Any contribution is greatly appreciated. We are more than happy to provide help on the process. + options: + - "Yes" + - "No" + - Maybe + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/feature_request.yml b/.github/ISSUE_TEMPLATE/feature_request.yml new file mode 100644 index 000000000..411c43e38 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.yml @@ -0,0 +1,42 @@ +name: Feature request +description: Suggest a new feature in jvm-libp2p +body: + - type: markdown + attributes: + value: | + If you'd like to suggest a feature related to libp2p but not specifically related to the JVM implementation, please file an issue at https://github.com/libp2p/specs instead. + - type: textarea + attributes: + label: Description + description: Briefly describe the feature that you are requesting. + validations: + required: true + - type: textarea + attributes: + label: Motivation + description: Explain why this feature is needed. + validations: + required: true + - type: textarea + attributes: + label: Requirements + description: Write a list of what you want this feature to do. + placeholder: "1." + validations: + required: true + - type: textarea + attributes: + label: Open questions + description: Use this section to ask any questions that are related to the feature. + validations: + required: false + - type: dropdown + attributes: + label: Are you planning to do it yourself in a pull request ? + description: Any contribution is greatly appreciated. We are more than happy to provide help on the process. + options: + - "Yes" + - "No" + - Maybe + validations: + required: true diff --git a/README.md b/README.md index 3229f3d41..b7ec0ae09 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ List of components in the Libp2p spec and their JVM implementation status | **Stream Multiplexing** | [yamux](https://github.com/libp2p/specs/blob/master/yamux/README.md) | :lemon: | | | [mplex](https://github.com/libp2p/specs/blob/master/mplex/README.md) | :green_apple: | | **NAT Traversal** | [circuit-relay-v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) | :lemon: | -| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | | +| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | :lemon: | | | [hole-punching](https://github.com/libp2p/specs/blob/master/connections/hole-punching.md) | | | **Discovery** | [bootstrap](https://github.com/libp2p/specs/blob/master/kad-dht/README.md#bootstrap-process) | | | | random-walk | | diff --git a/build.gradle.kts b/build.gradle.kts index 5e5757281..8b26fcdee 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -18,7 +18,7 @@ plugins { id("io.gitlab.arturbosch.detekt").version("1.22.0") id("java") id("maven-publish") - id("org.jetbrains.dokka").version("1.9.0") + id("org.jetbrains.dokka").version("1.9.20") id("com.diffplug.spotless").version("6.21.0") id("java-test-fixtures") id("io.spring.dependency-management").version("1.1.3") @@ -37,7 +37,7 @@ configure( } ) { group = "io.libp2p" - version = "1.1.0-RELEASE" + version = "1.1.1-RELEASE" apply(plugin = "kotlin") apply(plugin = "idea") diff --git a/libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java b/libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java new file mode 100644 index 000000000..5425ba16a --- /dev/null +++ b/libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java @@ -0,0 +1,172 @@ +package io.libp2p.protocol.autonat; + +import com.google.protobuf.*; +import io.libp2p.core.*; +import io.libp2p.core.Stream; +import io.libp2p.core.multiformats.*; +import io.libp2p.core.multistream.*; +import io.libp2p.protocol.*; +import io.libp2p.protocol.autonat.pb.*; +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.*; +import org.jetbrains.annotations.*; + +public class AutonatProtocol extends ProtobufProtocolHandler { + + public static class Binding extends StrictProtocolBinding { + public Binding() { + super("/libp2p/autonat/v1.0.0", new AutonatProtocol()); + } + } + + public interface AutoNatController { + CompletableFuture rpc(Autonat.Message req); + + default CompletableFuture requestDial( + PeerId ourId, List us) { + if (us.isEmpty()) + throw new IllegalStateException("Requested autonat dial with no addresses!"); + return rpc(Autonat.Message.newBuilder() + .setType(Autonat.Message.MessageType.DIAL) + .setDial( + Autonat.Message.Dial.newBuilder() + .setPeer( + Autonat.Message.PeerInfo.newBuilder() + .addAllAddrs( + us.stream() + .map(a -> ByteString.copyFrom(a.serialize())) + .collect(Collectors.toList())) + .setId(ByteString.copyFrom(ourId.getBytes())))) + .build()) + .thenApply(msg -> msg.getDialResponse()); + } + } + + public static class Sender implements ProtocolMessageHandler, AutoNatController { + private final Stream stream; + private final LinkedBlockingDeque> queue = + new LinkedBlockingDeque<>(); + + public Sender(Stream stream) { + this.stream = stream; + } + + @Override + public void onMessage(@NotNull Stream stream, Autonat.Message msg) { + queue.poll().complete(msg); + } + + public CompletableFuture rpc(Autonat.Message req) { + CompletableFuture res = new CompletableFuture<>(); + queue.add(res); + stream.writeAndFlush(req); + return res; + } + } + + private static boolean sameIP(Multiaddr a, Multiaddr b) { + if (a.has(Protocol.IP4)) + return a.getFirstComponent(Protocol.IP4).equals(b.getFirstComponent(Protocol.IP4)); + if (a.has(Protocol.IP6)) + return a.getFirstComponent(Protocol.IP6).equals(b.getFirstComponent(Protocol.IP6)); + return false; + } + + private static boolean reachableIP(Multiaddr a) { + try { + if (a.has(Protocol.IP4)) + return InetAddress.getByName(a.getFirstComponent(Protocol.IP4).getStringValue()) + .isReachable(1000); + if (a.has(Protocol.IP6)) + return InetAddress.getByName(a.getFirstComponent(Protocol.IP6).getStringValue()) + .isReachable(1000); + } catch (IOException e) { + } + return false; + } + + public static class Receiver + implements ProtocolMessageHandler, AutoNatController { + private final Stream p2pstream; + + public Receiver(Stream p2pstream) { + this.p2pstream = p2pstream; + } + + @Override + public void onMessage(@NotNull Stream stream, Autonat.Message msg) { + switch (msg.getType()) { + case DIAL: + { + Autonat.Message.Dial dial = msg.getDial(); + PeerId peerId = new PeerId(dial.getPeer().getId().toByteArray()); + List requestedDials = + dial.getPeer().getAddrsList().stream() + .map(s -> Multiaddr.deserialize(s.toByteArray())) + .collect(Collectors.toList()); + PeerId streamPeerId = stream.remotePeerId(); + if (!peerId.equals(streamPeerId)) { + p2pstream.close(); + return; + } + + Multiaddr remote = stream.getConnection().remoteAddress(); + Optional reachable = + requestedDials.stream() + .filter(a -> sameIP(a, remote)) + .filter(a -> !a.has(Protocol.P2PCIRCUIT)) + .filter(a -> reachableIP(a)) + .findAny(); + Autonat.Message.Builder resp = + Autonat.Message.newBuilder().setType(Autonat.Message.MessageType.DIAL_RESPONSE); + if (reachable.isPresent()) { + resp = + resp.setDialResponse( + Autonat.Message.DialResponse.newBuilder() + .setStatus(Autonat.Message.ResponseStatus.OK) + .setAddr(ByteString.copyFrom(reachable.get().serialize()))); + } else { + resp = + resp.setDialResponse( + Autonat.Message.DialResponse.newBuilder() + .setStatus(Autonat.Message.ResponseStatus.E_DIAL_ERROR)); + } + p2pstream.writeAndFlush(resp); + } + default: + { + } + } + } + + public CompletableFuture rpc(Autonat.Message msg) { + return CompletableFuture.failedFuture( + new IllegalStateException("Cannot send form a receiver!")); + } + } + + private static final int TRAFFIC_LIMIT = 2 * 1024; + + public AutonatProtocol() { + super(Autonat.Message.getDefaultInstance(), TRAFFIC_LIMIT, TRAFFIC_LIMIT); + } + + @NotNull + @Override + protected CompletableFuture onStartInitiator(@NotNull Stream stream) { + Sender replyPropagator = new Sender(stream); + stream.pushHandler(replyPropagator); + return CompletableFuture.completedFuture(replyPropagator); + } + + @NotNull + @Override + protected CompletableFuture onStartResponder(@NotNull Stream stream) { + Receiver dialer = new Receiver(stream); + stream.pushHandler(dialer); + return CompletableFuture.completedFuture(dialer); + } +} diff --git a/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt b/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt index 658103db3..9838f4ad8 100644 --- a/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt +++ b/libp2p/src/main/kotlin/io/libp2p/discovery/MDnsDiscovery.kt @@ -76,7 +76,14 @@ class MDnsDiscovery( val address = host.listenAddresses().find { it.has(Protocol.IP4) } - val str = address?.getFirstComponent(Protocol.TCP)?.stringValue!! + val ipv6OnlyAddress = if (address == null) { + host.listenAddresses().find { + it.has(Protocol.IP6) + } + } else { + address + } + val str = ipv6OnlyAddress?.getFirstComponent(Protocol.TCP)?.stringValue!! return Integer.parseInt(str) } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt index eb8033cea..4dd764097 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/AbstractRouter.kt @@ -323,7 +323,11 @@ abstract class AbstractRouter( override fun getPeerTopics(): CompletableFuture>> { return submitOnEventThread { - peersTopics.asFirstToSecondMap().mapKeys { it.key.peerId } + peersTopics.asFirstToSecondMap() + .map { (key, value) -> + key.peerId to value.toSet() + } + .toMap() } } diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt index b1de2bd05..d548c517f 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRouter.kt @@ -317,6 +317,10 @@ open class GossipRouter( } private fun handleIHave(msg: Rpc.ControlIHave, peer: PeerHandler) { + // we ignore IHAVE gossip for unknown topics + if (msg.hasTopicID() && !mesh.containsKey(msg.topicID)) { + return + } val peerScore = score.score(peer.peerId) // we ignore IHAVE gossip from any peer whose score is below the gossip threshold if (peerScore < scoreParams.gossipThreshold) return @@ -544,7 +548,7 @@ open class GossipRouter( peers.shuffled(random) .take(max((params.gossipFactor * peers.size).toInt(), params.DLazy)) - .forEach { enqueueIhave(it, shuffledMessageIds) } + .forEach { enqueueIhave(it, shuffledMessageIds, topic) } } private fun graft(peer: PeerHandler, topic: Topic) { @@ -587,8 +591,8 @@ open class GossipRouter( private fun enqueueIwant(peer: PeerHandler, messageIds: List) = pendingRpcParts.getQueue(peer).addIWants(messageIds) - private fun enqueueIhave(peer: PeerHandler, messageIds: List) = - pendingRpcParts.getQueue(peer).addIHaves(messageIds) + private fun enqueueIhave(peer: PeerHandler, messageIds: List, topic: Topic) = + pendingRpcParts.getQueue(peer).addIHaves(messageIds, topic) data class AcceptRequestsWhitelistEntry(val whitelistedTill: Long, val messagesAccepted: Int = 0) { fun incrementMessageCount() = AcceptRequestsWhitelistEntry(whitelistedTill, messagesAccepted + 1) diff --git a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt index cc5fe7893..e90332589 100644 --- a/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt +++ b/libp2p/src/main/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueue.kt @@ -10,8 +10,8 @@ import pubsub.pb.Rpc interface GossipRpcPartsQueue : RpcPartsQueue { - fun addIHave(messageId: MessageId) - fun addIHaves(messageIds: Collection) = messageIds.forEach { addIHave(it) } + fun addIHave(messageId: MessageId, topic: Topic) + fun addIHaves(messageIds: Collection, topic: Topic) = messageIds.forEach { addIHave(it, topic) } fun addIWant(messageId: MessageId) fun addIWants(messageIds: Collection) = messageIds.forEach { addIWant(it) } @@ -37,14 +37,13 @@ open class DefaultGossipRpcPartsQueue( private val params: GossipParams ) : DefaultRpcPartsQueue(), GossipRpcPartsQueue { - protected data class IHavePart(val messageId: MessageId) : AbstractPart { + protected data class IHavePart(val messageId: MessageId, val topic: Topic) : AbstractPart { override fun appendToBuilder(builder: Rpc.RPC.Builder) { val ctrlBuilder = builder.controlBuilder - val iHaveBuilder = if (ctrlBuilder.ihaveBuilderList.isEmpty()) { - ctrlBuilder.addIhaveBuilder() - } else { - ctrlBuilder.getIhaveBuilder(0) - } + val iHaveBuilder = ctrlBuilder.ihaveBuilderList + .find { it.topicID == topic } + ?: ctrlBuilder.addIhaveBuilder().setTopicID(topic) + iHaveBuilder.addMessageIDs(messageId.toProtobuf()) } } @@ -82,8 +81,8 @@ open class DefaultGossipRpcPartsQueue( } } - override fun addIHave(messageId: MessageId) { - addPart(IHavePart(messageId)) + override fun addIHave(messageId: MessageId, topic: Topic) { + addPart(IHavePart(messageId, topic)) } override fun addIWant(messageId: MessageId) { diff --git a/libp2p/src/main/kotlin/io/libp2p/security/noise/NoiseXXSecureChannel.kt b/libp2p/src/main/kotlin/io/libp2p/security/noise/NoiseXXSecureChannel.kt index 8ab7bcb14..c9c02d82b 100644 --- a/libp2p/src/main/kotlin/io/libp2p/security/noise/NoiseXXSecureChannel.kt +++ b/libp2p/src/main/kotlin/io/libp2p/security/noise/NoiseXXSecureChannel.kt @@ -97,7 +97,8 @@ class NoiseIoHandshake( private var sentNoiseKeyPayload = false private var instancePayload: ByteArray? = null private var activated = false - private var remotePeerId: PeerId? = null + private var remotePubKey: PubKey? = null + private val remotePeerId: PeerId? get() = remotePubKey?.let { PeerId.fromPubKey(it) } private var expectedRemotePeerId: PeerId? = null init { @@ -139,7 +140,7 @@ class NoiseIoHandshake( // the remote public key has been provided by the XX protocol val derivedRemotePublicKey = handshakeState.remotePublicKey if (derivedRemotePublicKey.hasPublicKey()) { - remotePeerId = verifyPayload(ctx, instancePayload!!, derivedRemotePublicKey) + remotePubKey = verifyPayload(ctx, instancePayload!!, derivedRemotePublicKey) if (role == Role.INIT && expectedRemotePeerId != remotePeerId) { throw InvalidRemotePubKey() } @@ -248,7 +249,7 @@ class NoiseIoHandshake( ctx: ChannelHandlerContext, payload: ByteArray, remotePublicKeyState: DHState - ): PeerId { + ): PubKey { log.debug("Verifying noise static key payload") val (pubKeyFromMessage, signatureFromMessage) = unpackKeyAndSignature(payload) @@ -264,7 +265,7 @@ class NoiseIoHandshake( handshakeFailed(ctx, InvalidRemotePubKey()) } - return PeerId.fromPubKey(pubKeyFromMessage) + return pubKeyFromMessage } // verifyPayload private fun unpackKeyAndSignature(payload: ByteArray): Pair { @@ -287,7 +288,7 @@ class NoiseIoHandshake( val secureSession = NoiseSecureChannelSession( PeerId.fromPubKey(localKey.publicKey()), remotePeerId!!, - localKey.publicKey(), + remotePubKey!!, aliceSplit, bobSplit ) diff --git a/libp2p/src/main/proto/autonat.proto b/libp2p/src/main/proto/autonat.proto new file mode 100644 index 000000000..0e92a5178 --- /dev/null +++ b/libp2p/src/main/proto/autonat.proto @@ -0,0 +1,37 @@ +syntax = "proto2"; + +package io.libp2p.protocol.autonat.pb; + +message Message { + enum MessageType { + DIAL = 0; + DIAL_RESPONSE = 1; + } + + enum ResponseStatus { + OK = 0; + E_DIAL_ERROR = 100; + E_DIAL_REFUSED = 101; + E_BAD_REQUEST = 200; + E_INTERNAL_ERROR = 300; + } + + message PeerInfo { + optional bytes id = 1; + repeated bytes addrs = 2; + } + + message Dial { + optional PeerInfo peer = 1; + } + + message DialResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional bytes addr = 3; + } + + optional MessageType type = 1; + optional Dial dial = 2; + optional DialResponse dialResponse = 3; +} diff --git a/libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java b/libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java new file mode 100644 index 000000000..5d74c06de --- /dev/null +++ b/libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java @@ -0,0 +1,73 @@ +package io.libp2p.core; + +import io.libp2p.core.dsl.*; +import io.libp2p.core.multiformats.*; +import io.libp2p.core.mux.*; +import io.libp2p.protocol.*; +import io.libp2p.protocol.autonat.*; +import io.libp2p.protocol.autonat.pb.*; +import io.libp2p.security.noise.*; +import io.libp2p.transport.tcp.*; +import java.util.concurrent.*; +import org.junit.jupiter.api.*; + +public class AutonatTestJava { + + @Test + void autonatDial() throws Exception { + Host clientHost = + new HostBuilder() + .transport(TcpTransport::new) + .secureChannel(NoiseXXSecureChannel::new) + .muxer(StreamMuxerProtocol::getYamux) + .protocol(new Ping()) + .protocol(new AutonatProtocol.Binding()) + .listen("/ip4/127.0.0.1/tcp/0") + .build(); + + Host serverHost = + new HostBuilder() + .transport(TcpTransport::new) + .secureChannel(NoiseXXSecureChannel::new) + .muxer(StreamMuxerProtocol::getYamux) + .protocol(new Ping()) + .protocol(new AutonatProtocol.Binding()) + .listen("/ip4/127.0.0.1/tcp/0") + .build(); + + CompletableFuture clientStarted = clientHost.start(); + CompletableFuture serverStarted = serverHost.start(); + clientStarted.get(5, TimeUnit.SECONDS); + System.out.println("Client started"); + serverStarted.get(5, TimeUnit.SECONDS); + System.out.println("Server started"); + + StreamPromise autonat = + clientHost + .getNetwork() + .connect(serverHost.getPeerId(), serverHost.listenAddresses().get(0)) + .thenApply(it -> it.muxerSession().createStream(new AutonatProtocol.Binding())) + .get(5, TimeUnit.SECONDS); + + Stream autonatStream = autonat.getStream().get(5, TimeUnit.SECONDS); + System.out.println("Autonat stream created"); + AutonatProtocol.AutoNatController autonatCtr = autonat.getController().get(5, TimeUnit.SECONDS); + System.out.println("Autonat controller created"); + + Autonat.Message.DialResponse resp = + autonatCtr + .requestDial(clientHost.getPeerId(), clientHost.listenAddresses()) + .get(5, TimeUnit.SECONDS); + Assertions.assertEquals(resp.getStatus(), Autonat.Message.ResponseStatus.OK); + Multiaddr received = Multiaddr.deserialize(resp.getAddr().toByteArray()); + Assertions.assertEquals(received, clientHost.listenAddresses().get(0)); + + autonatStream.close().get(5, TimeUnit.SECONDS); + System.out.println("Autonat stream closed"); + + clientHost.stop().get(5, TimeUnit.SECONDS); + System.out.println("Client stopped"); + serverHost.stop().get(5, TimeUnit.SECONDS); + System.out.println("Server stopped"); + } +} diff --git a/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java b/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java index 03ce8c28a..cc6c4a53a 100644 --- a/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java +++ b/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java @@ -31,8 +31,6 @@ private static void enableRelay(BuilderJ b, List @Test void pingOverLocalRelay() throws Exception { - String localListenAddress = "/ip4/127.0.0.1/tcp/40002"; - Host relayHost = new HostBuilder() .builderModifier(b -> enableRelay(b, Collections.emptyList())) @@ -79,7 +77,7 @@ void pingOverLocalRelay() throws Exception { .secureChannel(NoiseXXSecureChannel::new) .muxer(StreamMuxerProtocol::getYamux) .protocol(new Ping()) - .listen(localListenAddress) + .listen("/ip4/127.0.0.1/tcp/0") .listen(relayAddr + "/p2p-circuit") .build(); serverHost.getNetwork().getTransports().stream() @@ -130,8 +128,6 @@ void pingOverLocalRelay() throws Exception { @Test void relayStreamsAreLimited() throws Exception { - String localListenAddress = "/ip4/127.0.0.1/tcp/40002"; - Host relayHost = new HostBuilder() .builderModifier(b -> enableRelay(b, Collections.emptyList())) @@ -181,7 +177,7 @@ void relayStreamsAreLimited() throws Exception { .secureChannel(NoiseXXSecureChannel::new) .muxer(StreamMuxerProtocol::getYamux) .protocol(new Blob(blobSize)) - .listen(localListenAddress) + .listen("/ip4/127.0.0.1/tcp/0") .listen(relayAddr + "/p2p-circuit") .build(); serverHost.getNetwork().getTransports().stream() @@ -215,7 +211,7 @@ void relayStreamsAreLimited() throws Exception { System.out.println("Blob controller created"); Assertions.assertThrows( - ExecutionException.class, () -> blobCtr.blob().get(5, TimeUnit.SECONDS)); + ExecutionException.class, () -> blobCtr.blob().get(30, TimeUnit.SECONDS)); clientHost.stop().get(5, TimeUnit.SECONDS); System.out.println("Client stopped"); diff --git a/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt b/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt index b0cfa93c6..f0acd5790 100644 --- a/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/discovery/MDnsDiscoveryTest.kt @@ -24,6 +24,18 @@ class MDnsDiscoveryTest { } } + val hostIpv6 = object : NullHost() { + override val peerId: PeerId = PeerId.fromPubKey( + generateEcdsaKeyPair().second + ) + + override fun listenAddresses(): List { + return listOf( + Multiaddr("/ip6/::/tcp/4001") + ) + } + } + val otherHost = object : NullHost() { override val peerId: PeerId = PeerId.fromPubKey( generateEcdsaKeyPair().second @@ -47,6 +59,15 @@ class MDnsDiscoveryTest { discoverer.stop().get(1, TimeUnit.SECONDS) } + @Test + fun `start and stop discovery ipv6`() { + val discoverer = MDnsDiscovery(hostIpv6, testServiceTag) + + discoverer.start().get(1, TimeUnit.SECONDS) + TimeUnit.MILLISECONDS.sleep(100) + discoverer.stop().get(1, TimeUnit.SECONDS) + } + @Test fun `start discovery and listen for self`() { var peerInfo: PeerInfo? = null diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt index b06f713bc..cc45118e0 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/PubsubRouterTest.kt @@ -1,7 +1,13 @@ package io.libp2p.pubsub -import io.libp2p.core.pubsub.* +import io.libp2p.core.pubsub.MessageApi +import io.libp2p.core.pubsub.RESULT_INVALID +import io.libp2p.core.pubsub.RESULT_VALID +import io.libp2p.core.pubsub.Subscriber import io.libp2p.core.pubsub.Topic +import io.libp2p.core.pubsub.ValidationResult +import io.libp2p.core.pubsub.Validator +import io.libp2p.core.pubsub.createPubsubApi import io.libp2p.etc.types.seconds import io.libp2p.etc.types.toByteBuf import io.libp2p.etc.types.toBytesBigEndian @@ -10,6 +16,7 @@ import io.libp2p.pubsub.gossip.GossipRouter import io.libp2p.tools.TestChannel.TestConnection import io.netty.handler.logging.LogLevel import io.netty.util.ResourceLeakDetector +import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import pubsub.pb.Rpc @@ -279,7 +286,10 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor doTenNeighborsTopology() } - fun doTenNeighborsTopology(randomSeed: Int = 0, routerFactory: DeterministicFuzzRouterFactory = this.routerFactory) { + fun doTenNeighborsTopology( + randomSeed: Int = 0, + routerFactory: DeterministicFuzzRouterFactory = this.routerFactory + ) { val fuzz = DeterministicFuzz().also { it.randomSeed = randomSeed.toLong() } @@ -398,6 +408,7 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor routers[1].connectSemiDuplex(routers[2], pubsubLogs = LogLevel.ERROR) val apis = routers.map { createPubsubApi(it.router) } + class RecordingSubscriber : Subscriber { var count = 0 override fun accept(t: MessageApi) { @@ -460,4 +471,54 @@ abstract class PubsubRouterTest(val routerFactory: DeterministicFuzzRouterFactor Assertions.assertEquals(2, subs2[2].count) Assertions.assertEquals(0, subs2[3].count) } + + @Test + fun `getPeerTopics() should return immutable snapshot`() { + val fuzz = DeterministicFuzz() + + fun executeAsyncNow(asyncTask: () -> CompletableFuture): T { + val future = asyncTask() + fuzz.timeController.addTime(Duration.ofMillis(1)) + if (!future.isDone) throw AssertionError("Async task was not complete within virtual 1ms") + return future.join() + } + + val router1 = fuzz.createTestRouter(routerFactory) + val router2 = fuzz.createTestRouter(routerFactory) + router2.router.subscribe("topic1") + + router1.connectSemiDuplex(router2, LogLevel.DEBUG, LogLevel.DEBUG) + + val peerTopics1 = executeAsyncNow { router1.router.getPeerTopics() } + val peerTopics1MapIt = peerTopics1.entries.iterator() + val peerTopics1SetIt = peerTopics1.entries.first().value.iterator() + + router2.router.subscribe("topic2") + + val router3 = fuzz.createTestRouter(routerFactory) + router3.router.subscribe("topic3") + router1.connectSemiDuplex(router3, LogLevel.DEBUG, LogLevel.DEBUG) + + val peerTopics2 = executeAsyncNow { router1.router.getPeerTopics() } + + assertThat(peerTopics2) + .containsExactlyInAnyOrderEntriesOf( + mapOf( + router2.peerId to setOf("topic1", "topic2"), + router3.peerId to setOf("topic3") + ) + ) + + assertThat(peerTopics1) + .containsExactlyInAnyOrderEntriesOf( + mapOf( + router2.peerId to setOf("topic1") + ) + ) + + assertThat(peerTopics1MapIt.next().key).isEqualTo(router2.peerId) + assertThat(peerTopics1MapIt.hasNext()).isFalse() + assertThat(peerTopics1SetIt.next()).isEqualTo("topic1") + assertThat(peerTopics1SetIt.hasNext()).isFalse() + } } diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt index 6942cc979..81dc3d761 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRouterListLimitsTest.kt @@ -1,5 +1,6 @@ package io.libp2p.pubsub.gossip +import io.libp2p.pubsub.Topic import io.libp2p.pubsub.gossip.builders.GossipParamsBuilder import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import io.libp2p.tools.protobuf.RpcBuilder @@ -35,6 +36,8 @@ class GossipRouterListLimitsTest { private val routerWithLimits = GossipRouterBuilder(params = gossipParamsWithLimits).build() private val routerWithNoLimits = GossipRouterBuilder(params = gossipParamsNoLimits).build() + private val topic: Topic = "topic1" + @Test fun validateProtobufLists_validMessage() { val msg = fullMsgBuilder().build() @@ -96,7 +99,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_tooManyIHaves() { val builder = fullMsgBuilder() - builder.addIHaves(maxIHaveLength, 1) + builder.addIHaves(maxIHaveLength, 1, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() @@ -105,7 +108,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_tooManyIHaveMsgIds() { val builder = fullMsgBuilder() - builder.addIHaves(1, maxIHaveLength) + builder.addIHaves(1, maxIHaveLength, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isFalse() @@ -186,7 +189,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_maxIHaves() { val builder = fullMsgBuilder() - builder.addIHaves(maxIHaveLength - 1, 1) + builder.addIHaves(maxIHaveLength - 1, 1, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() @@ -195,7 +198,7 @@ class GossipRouterListLimitsTest { @Test fun validateProtobufLists_maxIHaveMsgIds() { val builder = fullMsgBuilder() - builder.addIHaves(1, maxIHaveLength - 1) + builder.addIHaves(1, maxIHaveLength - 1, topic) val msg = builder.build() Assertions.assertThat(routerWithLimits.validateMessageListLimits(msg)).isTrue() @@ -256,7 +259,7 @@ class GossipRouterListLimitsTest { // Add some data to all possible fields builder.addSubscriptions(listSize) builder.addPublishMessages(listSize, listSize) - builder.addIHaves(listSize, listSize) + builder.addIHaves(listSize, listSize, topic) builder.addIWants(listSize, listSize) builder.addGrafts(listSize) builder.addPrunes(listSize, listSize) diff --git a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt index c5cc7c85c..5b6b35e55 100644 --- a/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/pubsub/gossip/GossipRpcPartsQueueTest.kt @@ -3,6 +3,7 @@ package io.libp2p.pubsub.gossip import io.libp2p.core.PeerId import io.libp2p.etc.types.toProtobuf import io.libp2p.etc.types.toWBytes +import io.libp2p.pubsub.Topic import io.libp2p.pubsub.gossip.builders.GossipParamsBuilder import io.libp2p.pubsub.gossip.builders.GossipRouterBuilder import org.assertj.core.api.Assertions.assertThat @@ -49,7 +50,7 @@ class GossipRpcPartsQueueTest { queue.addPublish(createRpcMessage("topic-$it", "data")) } (1..iHaves).forEach { - queue.addIHave(byteArrayOf(it.toByte()).toWBytes()) + queue.addIHave(byteArrayOf(it.toByte()).toWBytes(), "topic-$it") } (1..iWants).forEach { queue.addIWant(byteArrayOf(it.toByte()).toWBytes()) @@ -259,4 +260,50 @@ class GossipRpcPartsQueueTest { assertThat(msgs).hasSize(3) assertThat(msgs.merge()).isEqualTo(single) } + + @Test + fun `check that resulting IHAVE sets the topic ID`() { + val topic1: Topic = "topic1" + val messageId1 = "1111".toWBytes() + val topic2: Topic = "topic2" + val messageId2 = "2222".toWBytes() + val partsQueue = TestGossipQueue(gossipParamsWithLimits) + partsQueue.addIHave(messageId1, topic1) + partsQueue.addIHave(messageId2, topic2) + val res = partsQueue.takeMerged().first() + + val serialized = res.toByteArray() + val deserializedRpc = Rpc.RPC.parseFrom(serialized) + assertThat(deserializedRpc.control.ihaveList).containsExactlyInAnyOrder( + Rpc.ControlIHave.newBuilder().setTopicID(topic1).addMessageIDs(messageId1.toProtobuf()).build(), + Rpc.ControlIHave.newBuilder().setTopicID(topic2).addMessageIDs(messageId2.toProtobuf()).build(), + ) + } + + @Test + fun `check that resulting IHAVE correctly groups topics`() { + val partsQueue = TestGossipQueue(gossipParamsWithLimits) + + partsQueue.addIHave("1111".toWBytes(), "topic1") + partsQueue.addIHave("2222".toWBytes(), "topic2") + partsQueue.addIHave("3333".toWBytes(), "topic1") + + val res = partsQueue.takeMerged().first() + + val serialized = res.toByteArray() + val deserializedRpc = Rpc.RPC.parseFrom(serialized) + assertThat(deserializedRpc.control.ihaveList).containsExactlyInAnyOrder( + Rpc.ControlIHave.newBuilder() + .setTopicID("topic1") + .addAllMessageIDs( + listOf( + "1111".toWBytes().toProtobuf(), + "3333".toWBytes().toProtobuf() + ) + ).build(), + Rpc.ControlIHave.newBuilder() + .setTopicID("topic2") + .addMessageIDs("2222".toWBytes().toProtobuf()).build(), + ) + } } diff --git a/libp2p/src/test/kotlin/io/libp2p/security/CipherSecureChannelTest.kt b/libp2p/src/test/kotlin/io/libp2p/security/CipherSecureChannelTest.kt index bfb68a36e..b7290a6a5 100644 --- a/libp2p/src/test/kotlin/io/libp2p/security/CipherSecureChannelTest.kt +++ b/libp2p/src/test/kotlin/io/libp2p/security/CipherSecureChannelTest.kt @@ -16,6 +16,33 @@ import java.util.concurrent.TimeUnit.SECONDS abstract class CipherSecureChannelTest(secureChannelCtor: SecureChannelCtor, muxers: List, announce: String) : SecureChannelTestBase(secureChannelCtor, muxers, announce) { + @Test + fun `verify secure session`() { + val (privKey1, pubKey1) = generateKeyPair(KeyType.ECDSA) + val (privKey2, pubKey2) = generateKeyPair(KeyType.ECDSA) + + val protocolSelect1 = makeSelector(privKey1, muxerIds) + val protocolSelect2 = makeSelector(privKey2, muxerIds) + + val eCh1 = makeDialChannel("#1", protocolSelect1, PeerId.fromPubKey(pubKey2)) + val eCh2 = makeListenChannel("#2", protocolSelect2) + + logger.debug("Connecting channels...") + val connection = TestChannel.interConnect(eCh1, eCh2) + + val secSession1 = protocolSelect1.selectedFuture.join() + assertThat(secSession1.localId).isEqualTo(PeerId.fromPubKey(pubKey1)) + assertThat(secSession1.remoteId).isEqualTo(PeerId.fromPubKey(pubKey2)) + assertThat(secSession1.remotePubKey).isEqualTo(pubKey2) + + val secSession2 = protocolSelect2.selectedFuture.join() + assertThat(secSession2.localId).isEqualTo(PeerId.fromPubKey(pubKey2)) + assertThat(secSession2.remoteId).isEqualTo(PeerId.fromPubKey(pubKey1)) + assertThat(secSession2.remotePubKey).isEqualTo(pubKey1) + + logger.debug("Connection made: $connection") + } + @Test fun `incorrect initiator remote PeerId should throw`() { val (privKey1, _) = generateKeyPair(KeyType.ECDSA) diff --git a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt index a4c55bba6..4da90ef85 100644 --- a/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt +++ b/libp2p/src/testFixtures/kotlin/io/libp2p/tools/protobuf/RpcBuilder.kt @@ -1,6 +1,7 @@ package io.libp2p.tools.protobuf import io.libp2p.etc.types.toProtobuf +import io.libp2p.pubsub.Topic import pubsub.pb.Rpc import kotlin.random.Random @@ -28,9 +29,9 @@ class RpcBuilder { } } - fun addIHaves(iHaveCount: Int, messageIdCount: Int) { + fun addIHaves(iHaveCount: Int, messageIdCount: Int, topic: Topic) { for (i in 0 until iHaveCount) { - val iHaveBuilder = Rpc.ControlIHave.newBuilder() + val iHaveBuilder = Rpc.ControlIHave.newBuilder().setTopicID(topic) for (j in 0 until messageIdCount) { iHaveBuilder.addMessageIDs(Random.nextBytes(6).toProtobuf()) } diff --git a/versions.gradle b/versions.gradle index 93cfded5e..497ef091f 100644 --- a/versions.gradle +++ b/versions.gradle @@ -31,7 +31,7 @@ dependencyManagement { entry 'protobuf-java' entry 'protoc' } - dependencySet(group: "io.netty", version: "4.1.97.Final") { + dependencySet(group: "io.netty", version: "4.1.108.Final") { entry 'netty-common' entry 'netty-handler' entry 'netty-transport'