From 562ce10be9f3839cd9ce0ce3b745fed7d3b1921e Mon Sep 17 00:00:00 2001 From: Dr Ian Preston Date: Tue, 20 Feb 2024 14:38:56 +0000 Subject: [PATCH] Implement autonat protocol (#349) --- README.md | 2 +- .../protocol/autonat/AutonatProtocol.java | 172 ++++++++++++++++++ libp2p/src/main/proto/autonat.proto | 37 ++++ .../java/io/libp2p/core/AutonatTestJava.java | 73 ++++++++ .../java/io/libp2p/core/RelayTestJava.java | 8 +- 5 files changed, 285 insertions(+), 7 deletions(-) create mode 100644 libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java create mode 100644 libp2p/src/main/proto/autonat.proto create mode 100644 libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java diff --git a/README.md b/README.md index 3229f3d4..b7ec0ae0 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ List of components in the Libp2p spec and their JVM implementation status | **Stream Multiplexing** | [yamux](https://github.com/libp2p/specs/blob/master/yamux/README.md) | :lemon: | | | [mplex](https://github.com/libp2p/specs/blob/master/mplex/README.md) | :green_apple: | | **NAT Traversal** | [circuit-relay-v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) | :lemon: | -| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | | +| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | :lemon: | | | [hole-punching](https://github.com/libp2p/specs/blob/master/connections/hole-punching.md) | | | **Discovery** | [bootstrap](https://github.com/libp2p/specs/blob/master/kad-dht/README.md#bootstrap-process) | | | | random-walk | | diff --git a/libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java b/libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java new file mode 100644 index 00000000..5425ba16 --- /dev/null +++ b/libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java @@ -0,0 +1,172 @@ +package io.libp2p.protocol.autonat; + +import com.google.protobuf.*; +import io.libp2p.core.*; +import io.libp2p.core.Stream; +import io.libp2p.core.multiformats.*; +import io.libp2p.core.multistream.*; +import io.libp2p.protocol.*; +import io.libp2p.protocol.autonat.pb.*; +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.*; +import org.jetbrains.annotations.*; + +public class AutonatProtocol extends ProtobufProtocolHandler { + + public static class Binding extends StrictProtocolBinding { + public Binding() { + super("/libp2p/autonat/v1.0.0", new AutonatProtocol()); + } + } + + public interface AutoNatController { + CompletableFuture rpc(Autonat.Message req); + + default CompletableFuture requestDial( + PeerId ourId, List us) { + if (us.isEmpty()) + throw new IllegalStateException("Requested autonat dial with no addresses!"); + return rpc(Autonat.Message.newBuilder() + .setType(Autonat.Message.MessageType.DIAL) + .setDial( + Autonat.Message.Dial.newBuilder() + .setPeer( + Autonat.Message.PeerInfo.newBuilder() + .addAllAddrs( + us.stream() + .map(a -> ByteString.copyFrom(a.serialize())) + .collect(Collectors.toList())) + .setId(ByteString.copyFrom(ourId.getBytes())))) + .build()) + .thenApply(msg -> msg.getDialResponse()); + } + } + + public static class Sender implements ProtocolMessageHandler, AutoNatController { + private final Stream stream; + private final LinkedBlockingDeque> queue = + new LinkedBlockingDeque<>(); + + public Sender(Stream stream) { + this.stream = stream; + } + + @Override + public void onMessage(@NotNull Stream stream, Autonat.Message msg) { + queue.poll().complete(msg); + } + + public CompletableFuture rpc(Autonat.Message req) { + CompletableFuture res = new CompletableFuture<>(); + queue.add(res); + stream.writeAndFlush(req); + return res; + } + } + + private static boolean sameIP(Multiaddr a, Multiaddr b) { + if (a.has(Protocol.IP4)) + return a.getFirstComponent(Protocol.IP4).equals(b.getFirstComponent(Protocol.IP4)); + if (a.has(Protocol.IP6)) + return a.getFirstComponent(Protocol.IP6).equals(b.getFirstComponent(Protocol.IP6)); + return false; + } + + private static boolean reachableIP(Multiaddr a) { + try { + if (a.has(Protocol.IP4)) + return InetAddress.getByName(a.getFirstComponent(Protocol.IP4).getStringValue()) + .isReachable(1000); + if (a.has(Protocol.IP6)) + return InetAddress.getByName(a.getFirstComponent(Protocol.IP6).getStringValue()) + .isReachable(1000); + } catch (IOException e) { + } + return false; + } + + public static class Receiver + implements ProtocolMessageHandler, AutoNatController { + private final Stream p2pstream; + + public Receiver(Stream p2pstream) { + this.p2pstream = p2pstream; + } + + @Override + public void onMessage(@NotNull Stream stream, Autonat.Message msg) { + switch (msg.getType()) { + case DIAL: + { + Autonat.Message.Dial dial = msg.getDial(); + PeerId peerId = new PeerId(dial.getPeer().getId().toByteArray()); + List requestedDials = + dial.getPeer().getAddrsList().stream() + .map(s -> Multiaddr.deserialize(s.toByteArray())) + .collect(Collectors.toList()); + PeerId streamPeerId = stream.remotePeerId(); + if (!peerId.equals(streamPeerId)) { + p2pstream.close(); + return; + } + + Multiaddr remote = stream.getConnection().remoteAddress(); + Optional reachable = + requestedDials.stream() + .filter(a -> sameIP(a, remote)) + .filter(a -> !a.has(Protocol.P2PCIRCUIT)) + .filter(a -> reachableIP(a)) + .findAny(); + Autonat.Message.Builder resp = + Autonat.Message.newBuilder().setType(Autonat.Message.MessageType.DIAL_RESPONSE); + if (reachable.isPresent()) { + resp = + resp.setDialResponse( + Autonat.Message.DialResponse.newBuilder() + .setStatus(Autonat.Message.ResponseStatus.OK) + .setAddr(ByteString.copyFrom(reachable.get().serialize()))); + } else { + resp = + resp.setDialResponse( + Autonat.Message.DialResponse.newBuilder() + .setStatus(Autonat.Message.ResponseStatus.E_DIAL_ERROR)); + } + p2pstream.writeAndFlush(resp); + } + default: + { + } + } + } + + public CompletableFuture rpc(Autonat.Message msg) { + return CompletableFuture.failedFuture( + new IllegalStateException("Cannot send form a receiver!")); + } + } + + private static final int TRAFFIC_LIMIT = 2 * 1024; + + public AutonatProtocol() { + super(Autonat.Message.getDefaultInstance(), TRAFFIC_LIMIT, TRAFFIC_LIMIT); + } + + @NotNull + @Override + protected CompletableFuture onStartInitiator(@NotNull Stream stream) { + Sender replyPropagator = new Sender(stream); + stream.pushHandler(replyPropagator); + return CompletableFuture.completedFuture(replyPropagator); + } + + @NotNull + @Override + protected CompletableFuture onStartResponder(@NotNull Stream stream) { + Receiver dialer = new Receiver(stream); + stream.pushHandler(dialer); + return CompletableFuture.completedFuture(dialer); + } +} diff --git a/libp2p/src/main/proto/autonat.proto b/libp2p/src/main/proto/autonat.proto new file mode 100644 index 00000000..0e92a517 --- /dev/null +++ b/libp2p/src/main/proto/autonat.proto @@ -0,0 +1,37 @@ +syntax = "proto2"; + +package io.libp2p.protocol.autonat.pb; + +message Message { + enum MessageType { + DIAL = 0; + DIAL_RESPONSE = 1; + } + + enum ResponseStatus { + OK = 0; + E_DIAL_ERROR = 100; + E_DIAL_REFUSED = 101; + E_BAD_REQUEST = 200; + E_INTERNAL_ERROR = 300; + } + + message PeerInfo { + optional bytes id = 1; + repeated bytes addrs = 2; + } + + message Dial { + optional PeerInfo peer = 1; + } + + message DialResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional bytes addr = 3; + } + + optional MessageType type = 1; + optional Dial dial = 2; + optional DialResponse dialResponse = 3; +} diff --git a/libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java b/libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java new file mode 100644 index 00000000..5d74c06d --- /dev/null +++ b/libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java @@ -0,0 +1,73 @@ +package io.libp2p.core; + +import io.libp2p.core.dsl.*; +import io.libp2p.core.multiformats.*; +import io.libp2p.core.mux.*; +import io.libp2p.protocol.*; +import io.libp2p.protocol.autonat.*; +import io.libp2p.protocol.autonat.pb.*; +import io.libp2p.security.noise.*; +import io.libp2p.transport.tcp.*; +import java.util.concurrent.*; +import org.junit.jupiter.api.*; + +public class AutonatTestJava { + + @Test + void autonatDial() throws Exception { + Host clientHost = + new HostBuilder() + .transport(TcpTransport::new) + .secureChannel(NoiseXXSecureChannel::new) + .muxer(StreamMuxerProtocol::getYamux) + .protocol(new Ping()) + .protocol(new AutonatProtocol.Binding()) + .listen("/ip4/127.0.0.1/tcp/0") + .build(); + + Host serverHost = + new HostBuilder() + .transport(TcpTransport::new) + .secureChannel(NoiseXXSecureChannel::new) + .muxer(StreamMuxerProtocol::getYamux) + .protocol(new Ping()) + .protocol(new AutonatProtocol.Binding()) + .listen("/ip4/127.0.0.1/tcp/0") + .build(); + + CompletableFuture clientStarted = clientHost.start(); + CompletableFuture serverStarted = serverHost.start(); + clientStarted.get(5, TimeUnit.SECONDS); + System.out.println("Client started"); + serverStarted.get(5, TimeUnit.SECONDS); + System.out.println("Server started"); + + StreamPromise autonat = + clientHost + .getNetwork() + .connect(serverHost.getPeerId(), serverHost.listenAddresses().get(0)) + .thenApply(it -> it.muxerSession().createStream(new AutonatProtocol.Binding())) + .get(5, TimeUnit.SECONDS); + + Stream autonatStream = autonat.getStream().get(5, TimeUnit.SECONDS); + System.out.println("Autonat stream created"); + AutonatProtocol.AutoNatController autonatCtr = autonat.getController().get(5, TimeUnit.SECONDS); + System.out.println("Autonat controller created"); + + Autonat.Message.DialResponse resp = + autonatCtr + .requestDial(clientHost.getPeerId(), clientHost.listenAddresses()) + .get(5, TimeUnit.SECONDS); + Assertions.assertEquals(resp.getStatus(), Autonat.Message.ResponseStatus.OK); + Multiaddr received = Multiaddr.deserialize(resp.getAddr().toByteArray()); + Assertions.assertEquals(received, clientHost.listenAddresses().get(0)); + + autonatStream.close().get(5, TimeUnit.SECONDS); + System.out.println("Autonat stream closed"); + + clientHost.stop().get(5, TimeUnit.SECONDS); + System.out.println("Client stopped"); + serverHost.stop().get(5, TimeUnit.SECONDS); + System.out.println("Server stopped"); + } +} diff --git a/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java b/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java index 03ce8c28..ff3f2a71 100644 --- a/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java +++ b/libp2p/src/test/java/io/libp2p/core/RelayTestJava.java @@ -31,8 +31,6 @@ private static void enableRelay(BuilderJ b, List @Test void pingOverLocalRelay() throws Exception { - String localListenAddress = "/ip4/127.0.0.1/tcp/40002"; - Host relayHost = new HostBuilder() .builderModifier(b -> enableRelay(b, Collections.emptyList())) @@ -79,7 +77,7 @@ void pingOverLocalRelay() throws Exception { .secureChannel(NoiseXXSecureChannel::new) .muxer(StreamMuxerProtocol::getYamux) .protocol(new Ping()) - .listen(localListenAddress) + .listen("/ip4/127.0.0.1/tcp/0") .listen(relayAddr + "/p2p-circuit") .build(); serverHost.getNetwork().getTransports().stream() @@ -130,8 +128,6 @@ void pingOverLocalRelay() throws Exception { @Test void relayStreamsAreLimited() throws Exception { - String localListenAddress = "/ip4/127.0.0.1/tcp/40002"; - Host relayHost = new HostBuilder() .builderModifier(b -> enableRelay(b, Collections.emptyList())) @@ -181,7 +177,7 @@ void relayStreamsAreLimited() throws Exception { .secureChannel(NoiseXXSecureChannel::new) .muxer(StreamMuxerProtocol::getYamux) .protocol(new Blob(blobSize)) - .listen(localListenAddress) + .listen("/ip4/127.0.0.1/tcp/0") .listen(relayAddr + "/p2p-circuit") .build(); serverHost.getNetwork().getTransports().stream()