Skip to content

Commit

Permalink
feat: Complete parcel collection (#29)
Browse files Browse the repository at this point in the history
For example, here's how a private endpoint could use this:

```kotlin
val port = 13276 // On Android; 276 on desktop
val client = PoWebClient.initLocal(port)

// Specify the cert and private key of each endpoint. Needed to complete the handshake.
val nonceSigners = arrayOf(
    NonceSigner(endpointCertificate, endpointKeyPair.private)
)

runBlocking {
    client.use {
        client.collectParcels(nonceSigners).map {
            val parcel = try {
                it.deserializeAndValidateParcel()
            } catch (exc: RelaynetException) {
                it.ack()
                logWarning("Invalid parcel received", exc)
                return@map
            }

            val path = generateRandomPath()
            File(path).writeBytes(it.parcelSerialized)
            saveToDB(parcel, path)

            // ACK once the parcel has been safely processed/stored
            it.ack()
        }
    }
}
```

`collectParcels()` takes an optional argument (`StreamingMode`). It defaults to `StreamingMode.KeepAlive`.
  • Loading branch information
gnarea authored Aug 21, 2020
1 parent eed0150 commit 908c690
Show file tree
Hide file tree
Showing 18 changed files with 838 additions and 265 deletions.
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ buildscript {
ext.kotlinVersion = '1.4.0'
ext.protobufVersion = '3.13.0'
ext.protobufGradleVersion = '0.8.12'
ext.kotlinCoroutinesVersion = '1.3.7'
ext.kotlinCoroutinesVersion = '1.3.8'
ext.ktorVersion = '1.4.0'
ext.okhttpVersion = '4.8.1'
}
Expand Down Expand Up @@ -33,7 +33,7 @@ dependencies {

implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion")

api('tech.relaycorp:relaynet:1.34.1')
api('tech.relaycorp:relaynet:1.35.0')

// Handshake nonce signatures
implementation("org.bouncycastle:bcpkix-jdk15on:1.66")
Expand All @@ -57,6 +57,7 @@ dependencies {
testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
testImplementation("com.nhaarman.mockitokotlin2:mockito-kotlin:2.2.0")
testImplementation("org.mockito:mockito-inline:3.5.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinCoroutinesVersion")
}

java {
Expand Down
152 changes: 133 additions & 19 deletions src/main/kotlin/tech/relaycorp/poweb/PoWebClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,175 @@ import io.ktor.client.engine.okhttp.OkHttp
import io.ktor.client.features.websocket.DefaultClientWebSocketSession
import io.ktor.client.features.websocket.WebSockets
import io.ktor.client.features.websocket.webSocket
import io.ktor.client.request.header
import io.ktor.http.cio.websocket.CloseReason
import io.ktor.http.cio.websocket.Frame
import io.ktor.http.cio.websocket.close
import io.ktor.http.cio.websocket.readBytes
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow
import tech.relaycorp.poweb.handshake.Challenge
import tech.relaycorp.poweb.handshake.InvalidMessageException
import tech.relaycorp.poweb.handshake.NonceSigner
import tech.relaycorp.poweb.handshake.InvalidChallengeException
import tech.relaycorp.poweb.handshake.Response
import tech.relaycorp.relaynet.bindings.pdc.NonceSigner
import tech.relaycorp.relaynet.bindings.pdc.ParcelCollection
import tech.relaycorp.relaynet.bindings.pdc.StreamingMode
import tech.relaycorp.relaynet.messages.InvalidMessageException
import tech.relaycorp.relaynet.messages.control.ParcelDelivery
import tech.relaycorp.relaynet.wrappers.x509.Certificate
import java.io.Closeable
import java.io.EOFException
import java.net.ConnectException

/**
* PoWeb client.
*
* @param hostName The IP address or domain for the PoWeb server
* @param port The port for the PoWeb server
* @param useTls Whether the PoWeb server uses TLS
*
* The underlying connection is created lazily.
*/
@KtorExperimentalAPI
public class PoWebClient internal constructor(
internal val hostName: String,
internal val port: Int,
internal val useTls: Boolean
) : Closeable {
@KtorExperimentalAPI
internal var ktorClient = HttpClient(OkHttp) {
install(WebSockets)
}

@KtorExperimentalAPI
private val wsScheme = if (useTls) "wss" else "ws"

/**
* Close the underlying connection to the server (if any).
*/
override fun close(): Unit = ktorClient.close()

private val wsUrl = "ws${if (useTls) "s" else ""}://$hostName:$port"
/**
* Collect parcels on behalf of the specified nodes.
*
* @param nonceSigners The nonce signers for each node whose parcels should be collected
* @param streamingMode Which streaming mode to ask the server to use
*/
@Throws(
ServerConnectionException::class,
InvalidServerMessageException::class,
NonceSignerException::class
)
public suspend fun collectParcels(
nonceSigners: Array<NonceSigner>,
streamingMode: StreamingMode = StreamingMode.KeepAlive
): Flow<ParcelCollection> = flow {
if (nonceSigners.isEmpty()) {
throw NonceSignerException("At least one nonce signer must be specified")
}

val trustedCertificates = nonceSigners.map { it.certificate }
val streamingModeHeader = Pair(StreamingMode.HEADER_NAME, streamingMode.headerValue)
wsConnect(PARCEL_COLLECTION_ENDPOINT_PATH, listOf(streamingModeHeader)) {
try {
handshake(nonceSigners)
} catch (exc: ClosedReceiveChannelException) {
// Alert the client to the fact that the server closed the connection before
// completing the handshake. Otherwise, the client will assume that the operation
// succeeded and there were no parcels to collect.
throw ServerConnectionException(
"Server closed the connection during the handshake",
exc
)
}
collectAndAckParcels(this, this@flow, trustedCertificates)

// The server must've closed the connection for us to get here, since we're consuming
// all incoming messages indefinitely.
val reason = closeReason.await()!!
if (reason.code != CloseReason.Codes.NORMAL.code) {
throw ServerConnectionException(
"Server closed the connection unexpectedly " +
"(code: ${reason.code}, reason: ${reason.message})"
)
}
}
}

@Throws(PoWebException::class)
private suspend fun collectAndAckParcels(
webSocketSession: DefaultClientWebSocketSession,
flowCollector: FlowCollector<ParcelCollection>,
trustedCertificates: List<Certificate>
) {
for (frame in webSocketSession.incoming) {
val delivery = try {
ParcelDelivery.deserialize(frame.readBytes())
} catch (exc: InvalidMessageException) {
webSocketSession.close(
CloseReason(CloseReason.Codes.VIOLATED_POLICY, "Invalid parcel delivery")
)
throw InvalidServerMessageException("Received invalid message from server", exc)
}
val collector = ParcelCollection(delivery.parcelSerialized, trustedCertificates) {
webSocketSession.outgoing.send(Frame.Text(delivery.deliveryId))
}
flowCollector.emit(collector)
}
}

@KtorExperimentalAPI
internal suspend fun wsConnect(
path: String,
headers: List<Pair<String, String>>? = null,
block: suspend DefaultClientWebSocketSession.() -> Unit
) = ktorClient.webSocket("${wsUrl}$path", block = block)
) = try {
ktorClient.webSocket(
"$wsScheme://$hostName:$port$path",
{ headers?.forEach { header(it.first, it.second) } },
block
)
} catch (exc: ConnectException) {
throw ServerConnectionException("Server is unreachable", exc)
} catch (exc: EOFException) {
throw ServerConnectionException("Connection was closed abruptly", exc)
}

public companion object {
private const val defaultLocalPort = 276
private const val defaultRemotePort = 443
internal const val PARCEL_COLLECTION_ENDPOINT_PATH = "/v1/parcel-collection"

public fun initLocal(port: Int = defaultLocalPort): PoWebClient =
PoWebClient("127.0.0.1", port, false)
private const val DEFAULT_LOCAL_PORT = 276
private const val DEFAULT_REMOTE_PORT = 443

public fun initRemote(hostName: String, port: Int = defaultRemotePort): PoWebClient =
PoWebClient(hostName, port, true)
/**
* Connect to a private gateway from a private endpoint.
*
* @param port The port for the PoWeb server
*
* TLS won't be used.
*/
public fun initLocal(port: Int = DEFAULT_LOCAL_PORT): PoWebClient =
PoWebClient("127.0.0.1", port, false)

/**
* Connect to a public gateway from a private gateway via TLS.
*
* @param hostName The IP address or domain for the PoWeb server
* @param port The port for the PoWeb server
*/
public fun initRemote(hostName: String, port: Int = DEFAULT_REMOTE_PORT): PoWebClient =
PoWebClient(hostName, port, true)
}
}

@Throws(PoWebException::class)
internal suspend fun DefaultClientWebSocketSession.handshake(nonceSigners: Array<NonceSigner>) {
if (nonceSigners.isEmpty()) {
throw PoWebException("At least one nonce signer must be specified")
}
private suspend fun DefaultClientWebSocketSession.handshake(nonceSigners: Array<NonceSigner>) {
val challengeRaw = incoming.receive()
val challenge = try {
Challenge.deserialize(challengeRaw.readBytes())
} catch (exc: InvalidMessageException) {
} catch (exc: InvalidChallengeException) {
close(CloseReason(CloseReason.Codes.VIOLATED_POLICY, ""))
throw PoWebException("Server sent an invalid handshake challenge", exc)
throw InvalidServerMessageException("Server sent an invalid handshake challenge", exc)
}
val nonceSignatures = nonceSigners.map { it.sign(challenge.nonce) }.toTypedArray()
val response = Response(nonceSignatures)
Expand Down
32 changes: 31 additions & 1 deletion src/main/kotlin/tech/relaycorp/poweb/PoWebException.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,33 @@
package tech.relaycorp.poweb

public class PoWebException(message: String, cause: Throwable? = null) : Exception(message, cause)
public abstract class PoWebException internal constructor(
message: String,
cause: Throwable? = null
) : Exception(message, cause)

/**
* Base class for connectivity errors and errors caused by the server.
*/
public sealed class ServerException(message: String, cause: Throwable?) :
PoWebException(message, cause)

/**
* Error before or while connected to the server.
*
* The client should retry later.
*/
public class ServerConnectionException(message: String, cause: Throwable? = null) :
ServerException(message, cause)

/**
* The server sent an invalid message.
*
* The server didn't adhere to the protocol. Retrying later is unlikely to make a difference.
*/
public class InvalidServerMessageException(message: String, cause: Throwable) :
ServerException(message, cause)

/**
* The client made a mistake while specifying the nonce signer(s).
*/
public class NonceSignerException(message: String) : PoWebException(message)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class Challenge(public val nonce: ByteArray) {
val pbChallenge = try {
PBChallenge.parseFrom(serialization)
} catch (_: InvalidProtocolBufferException) {
throw InvalidMessageException("Message is not a valid challenge")
throw InvalidChallengeException("Message is not a valid challenge")
}
return Challenge(pbChallenge.gatewayNonce.toByteArray())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package tech.relaycorp.poweb.handshake

public class InvalidChallengeException(message: String) : Exception(message)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package tech.relaycorp.poweb.handshake

public class InvalidResponseException(message: String) : Exception(message)
15 changes: 0 additions & 15 deletions src/main/kotlin/tech/relaycorp/poweb/handshake/NonceSigner.kt

This file was deleted.

6 changes: 3 additions & 3 deletions src/main/kotlin/tech/relaycorp/poweb/handshake/Response.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import tech.relaycorp.poweb.internal.protobuf_messages.handshake.Response as PBR
public class Response(public val nonceSignatures: Array<ByteArray>) {
public fun serialize(): ByteArray {
val pbResponse = PBResponse.newBuilder()
.addAllGatewayNonceSignatures(nonceSignatures.map { ByteString.copyFrom(it) })
.build()
.addAllGatewayNonceSignatures(nonceSignatures.map { ByteString.copyFrom(it) })
.build()
return pbResponse.toByteArray()
}

Expand All @@ -17,7 +17,7 @@ public class Response(public val nonceSignatures: Array<ByteArray>) {
val pbResponse = try {
PBResponse.parseFrom(serialization)
} catch (_: InvalidProtocolBufferException) {
throw InvalidMessageException("Message is not a valid response")
throw InvalidResponseException("Message is not a valid response")
}
val nonceSignatures = pbResponse.gatewayNonceSignaturesList.map { it.toByteArray() }
return Response(nonceSignatures.toTypedArray())
Expand Down
Loading

0 comments on commit 908c690

Please sign in to comment.