Skip to content

Commit

Permalink
Implement autonat protocol (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianopolous authored Feb 20, 2024
1 parent 24982ee commit 562ce10
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ List of components in the Libp2p spec and their JVM implementation status
| **Stream Multiplexing** | [yamux](https://github.com/libp2p/specs/blob/master/yamux/README.md) | :lemon: |
| | [mplex](https://github.com/libp2p/specs/blob/master/mplex/README.md) | :green_apple: |
| **NAT Traversal** | [circuit-relay-v2](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md) | :lemon: |
| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | |
| | [autonat](https://github.com/libp2p/specs/tree/master/autonat) | :lemon: |
| | [hole-punching](https://github.com/libp2p/specs/blob/master/connections/hole-punching.md) | |
| **Discovery** | [bootstrap](https://github.com/libp2p/specs/blob/master/kad-dht/README.md#bootstrap-process) | |
| | random-walk | |
Expand Down
172 changes: 172 additions & 0 deletions libp2p/src/main/java/io/libp2p/protocol/autonat/AutonatProtocol.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package io.libp2p.protocol.autonat;

import com.google.protobuf.*;
import io.libp2p.core.*;
import io.libp2p.core.Stream;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.multistream.*;
import io.libp2p.protocol.*;
import io.libp2p.protocol.autonat.pb.*;
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
import org.jetbrains.annotations.*;

public class AutonatProtocol extends ProtobufProtocolHandler<AutonatProtocol.AutoNatController> {

public static class Binding extends StrictProtocolBinding<AutoNatController> {
public Binding() {
super("/libp2p/autonat/v1.0.0", new AutonatProtocol());
}
}

public interface AutoNatController {
CompletableFuture<Autonat.Message> rpc(Autonat.Message req);

default CompletableFuture<Autonat.Message.DialResponse> requestDial(
PeerId ourId, List<Multiaddr> us) {
if (us.isEmpty())
throw new IllegalStateException("Requested autonat dial with no addresses!");
return rpc(Autonat.Message.newBuilder()
.setType(Autonat.Message.MessageType.DIAL)
.setDial(
Autonat.Message.Dial.newBuilder()
.setPeer(
Autonat.Message.PeerInfo.newBuilder()
.addAllAddrs(
us.stream()
.map(a -> ByteString.copyFrom(a.serialize()))
.collect(Collectors.toList()))
.setId(ByteString.copyFrom(ourId.getBytes()))))
.build())
.thenApply(msg -> msg.getDialResponse());
}
}

public static class Sender implements ProtocolMessageHandler<Autonat.Message>, AutoNatController {
private final Stream stream;
private final LinkedBlockingDeque<CompletableFuture<Autonat.Message>> queue =
new LinkedBlockingDeque<>();

public Sender(Stream stream) {
this.stream = stream;
}

@Override
public void onMessage(@NotNull Stream stream, Autonat.Message msg) {
queue.poll().complete(msg);
}

public CompletableFuture<Autonat.Message> rpc(Autonat.Message req) {
CompletableFuture<Autonat.Message> res = new CompletableFuture<>();
queue.add(res);
stream.writeAndFlush(req);
return res;
}
}

private static boolean sameIP(Multiaddr a, Multiaddr b) {
if (a.has(Protocol.IP4))
return a.getFirstComponent(Protocol.IP4).equals(b.getFirstComponent(Protocol.IP4));
if (a.has(Protocol.IP6))
return a.getFirstComponent(Protocol.IP6).equals(b.getFirstComponent(Protocol.IP6));
return false;
}

private static boolean reachableIP(Multiaddr a) {
try {
if (a.has(Protocol.IP4))
return InetAddress.getByName(a.getFirstComponent(Protocol.IP4).getStringValue())
.isReachable(1000);
if (a.has(Protocol.IP6))
return InetAddress.getByName(a.getFirstComponent(Protocol.IP6).getStringValue())
.isReachable(1000);
} catch (IOException e) {
}
return false;
}

public static class Receiver
implements ProtocolMessageHandler<Autonat.Message>, AutoNatController {
private final Stream p2pstream;

public Receiver(Stream p2pstream) {
this.p2pstream = p2pstream;
}

@Override
public void onMessage(@NotNull Stream stream, Autonat.Message msg) {
switch (msg.getType()) {
case DIAL:
{
Autonat.Message.Dial dial = msg.getDial();
PeerId peerId = new PeerId(dial.getPeer().getId().toByteArray());
List<Multiaddr> requestedDials =
dial.getPeer().getAddrsList().stream()
.map(s -> Multiaddr.deserialize(s.toByteArray()))
.collect(Collectors.toList());
PeerId streamPeerId = stream.remotePeerId();
if (!peerId.equals(streamPeerId)) {
p2pstream.close();
return;
}

Multiaddr remote = stream.getConnection().remoteAddress();
Optional<Multiaddr> reachable =
requestedDials.stream()
.filter(a -> sameIP(a, remote))
.filter(a -> !a.has(Protocol.P2PCIRCUIT))
.filter(a -> reachableIP(a))
.findAny();
Autonat.Message.Builder resp =
Autonat.Message.newBuilder().setType(Autonat.Message.MessageType.DIAL_RESPONSE);
if (reachable.isPresent()) {
resp =
resp.setDialResponse(
Autonat.Message.DialResponse.newBuilder()
.setStatus(Autonat.Message.ResponseStatus.OK)
.setAddr(ByteString.copyFrom(reachable.get().serialize())));
} else {
resp =
resp.setDialResponse(
Autonat.Message.DialResponse.newBuilder()
.setStatus(Autonat.Message.ResponseStatus.E_DIAL_ERROR));
}
p2pstream.writeAndFlush(resp);
}
default:
{
}
}
}

public CompletableFuture<Autonat.Message> rpc(Autonat.Message msg) {
return CompletableFuture.failedFuture(
new IllegalStateException("Cannot send form a receiver!"));
}
}

private static final int TRAFFIC_LIMIT = 2 * 1024;

public AutonatProtocol() {
super(Autonat.Message.getDefaultInstance(), TRAFFIC_LIMIT, TRAFFIC_LIMIT);
}

@NotNull
@Override
protected CompletableFuture<AutoNatController> onStartInitiator(@NotNull Stream stream) {
Sender replyPropagator = new Sender(stream);
stream.pushHandler(replyPropagator);
return CompletableFuture.completedFuture(replyPropagator);
}

@NotNull
@Override
protected CompletableFuture<AutoNatController> onStartResponder(@NotNull Stream stream) {
Receiver dialer = new Receiver(stream);
stream.pushHandler(dialer);
return CompletableFuture.completedFuture(dialer);
}
}
37 changes: 37 additions & 0 deletions libp2p/src/main/proto/autonat.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
syntax = "proto2";

package io.libp2p.protocol.autonat.pb;

message Message {
enum MessageType {
DIAL = 0;
DIAL_RESPONSE = 1;
}

enum ResponseStatus {
OK = 0;
E_DIAL_ERROR = 100;
E_DIAL_REFUSED = 101;
E_BAD_REQUEST = 200;
E_INTERNAL_ERROR = 300;
}

message PeerInfo {
optional bytes id = 1;
repeated bytes addrs = 2;
}

message Dial {
optional PeerInfo peer = 1;
}

message DialResponse {
optional ResponseStatus status = 1;
optional string statusText = 2;
optional bytes addr = 3;
}

optional MessageType type = 1;
optional Dial dial = 2;
optional DialResponse dialResponse = 3;
}
73 changes: 73 additions & 0 deletions libp2p/src/test/java/io/libp2p/core/AutonatTestJava.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.libp2p.core;

import io.libp2p.core.dsl.*;
import io.libp2p.core.multiformats.*;
import io.libp2p.core.mux.*;
import io.libp2p.protocol.*;
import io.libp2p.protocol.autonat.*;
import io.libp2p.protocol.autonat.pb.*;
import io.libp2p.security.noise.*;
import io.libp2p.transport.tcp.*;
import java.util.concurrent.*;
import org.junit.jupiter.api.*;

public class AutonatTestJava {

@Test
void autonatDial() throws Exception {
Host clientHost =
new HostBuilder()
.transport(TcpTransport::new)
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Ping())
.protocol(new AutonatProtocol.Binding())
.listen("/ip4/127.0.0.1/tcp/0")
.build();

Host serverHost =
new HostBuilder()
.transport(TcpTransport::new)
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Ping())
.protocol(new AutonatProtocol.Binding())
.listen("/ip4/127.0.0.1/tcp/0")
.build();

CompletableFuture<Void> clientStarted = clientHost.start();
CompletableFuture<Void> serverStarted = serverHost.start();
clientStarted.get(5, TimeUnit.SECONDS);
System.out.println("Client started");
serverStarted.get(5, TimeUnit.SECONDS);
System.out.println("Server started");

StreamPromise<AutonatProtocol.AutoNatController> autonat =
clientHost
.getNetwork()
.connect(serverHost.getPeerId(), serverHost.listenAddresses().get(0))
.thenApply(it -> it.muxerSession().createStream(new AutonatProtocol.Binding()))
.get(5, TimeUnit.SECONDS);

Stream autonatStream = autonat.getStream().get(5, TimeUnit.SECONDS);
System.out.println("Autonat stream created");
AutonatProtocol.AutoNatController autonatCtr = autonat.getController().get(5, TimeUnit.SECONDS);
System.out.println("Autonat controller created");

Autonat.Message.DialResponse resp =
autonatCtr
.requestDial(clientHost.getPeerId(), clientHost.listenAddresses())
.get(5, TimeUnit.SECONDS);
Assertions.assertEquals(resp.getStatus(), Autonat.Message.ResponseStatus.OK);
Multiaddr received = Multiaddr.deserialize(resp.getAddr().toByteArray());
Assertions.assertEquals(received, clientHost.listenAddresses().get(0));

autonatStream.close().get(5, TimeUnit.SECONDS);
System.out.println("Autonat stream closed");

clientHost.stop().get(5, TimeUnit.SECONDS);
System.out.println("Client stopped");
serverHost.stop().get(5, TimeUnit.SECONDS);
System.out.println("Server stopped");
}
}
8 changes: 2 additions & 6 deletions libp2p/src/test/java/io/libp2p/core/RelayTestJava.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ private static void enableRelay(BuilderJ b, List<RelayTransport.CandidateRelay>

@Test
void pingOverLocalRelay() throws Exception {
String localListenAddress = "/ip4/127.0.0.1/tcp/40002";

Host relayHost =
new HostBuilder()
.builderModifier(b -> enableRelay(b, Collections.emptyList()))
Expand Down Expand Up @@ -79,7 +77,7 @@ void pingOverLocalRelay() throws Exception {
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Ping())
.listen(localListenAddress)
.listen("/ip4/127.0.0.1/tcp/0")
.listen(relayAddr + "/p2p-circuit")
.build();
serverHost.getNetwork().getTransports().stream()
Expand Down Expand Up @@ -130,8 +128,6 @@ void pingOverLocalRelay() throws Exception {

@Test
void relayStreamsAreLimited() throws Exception {
String localListenAddress = "/ip4/127.0.0.1/tcp/40002";

Host relayHost =
new HostBuilder()
.builderModifier(b -> enableRelay(b, Collections.emptyList()))
Expand Down Expand Up @@ -181,7 +177,7 @@ void relayStreamsAreLimited() throws Exception {
.secureChannel(NoiseXXSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.protocol(new Blob(blobSize))
.listen(localListenAddress)
.listen("/ip4/127.0.0.1/tcp/0")
.listen(relayAddr + "/p2p-circuit")
.build();
serverHost.getNetwork().getTransports().stream()
Expand Down

0 comments on commit 562ce10

Please sign in to comment.