Skip to content

Commit

Permalink
Merge pull request #594 from Semisol/feature/3ph-base
Browse files Browse the repository at this point in the history
3PH part 1: Prepare for 3PH support
  • Loading branch information
lthibault authored Dec 8, 2024
2 parents 46ccd63 + a1ef2e9 commit d491d2f
Show file tree
Hide file tree
Showing 22 changed files with 11,932 additions and 556 deletions.
102 changes: 73 additions & 29 deletions rpc/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,25 @@ type PeerID struct {
// attached to the message via SCM_RIGHTS. This file descriptor would be one
// end of a newly-created socketpair, with the other end having been sent to the
// process hosting the capability in RecipientId.
type ThirdPartyCapID capnp.Ptr
//
// Some networks, as an optimization, may permit ThirdPartyToContact to be
// forwarded across multiple vats. For example, imagine Alice sends a capability
// to Bob, who passes it along to Carol, who further pass it to Dave. Bob will send
// a `Provide` message to Alice telling her to expect the capability to be picked
// up by Carol, and then will pass Carol a `ThirdPartyToContact` pointing to Alice.
// If `ThirdPartyToContact` is non-forwardable, then Carol must form a connection
// to Alice, send an `Accept` to receive the capability, and then immediately send
// a `Provide` to provide it to Dave, before then being able to give a
// `ThirdPartyToContact` to Dave which points to Alice. This is a bit of a waste.
// If `ThirdPartyToContact` is forwardable, then Carol can simply pass it along to
// Dave without making any connection to Alice. Some VatNetwork implementations may
// require that Carol add a signature to the `ThirdPartyToContact` authenticating
// that she really did forward it to Dave, which Dave will then present back to
// Alice. Other implementations may simply pass along an unguessable token and
// instruct Alice that whoever presents the token should receive the capability.
// A VatNetwork may choose not to allow forwarding if it doesn't want its security
// to be dependent on secret bearer tokens nor cryptographic signatures.
type ThirdPartyToContact capnp.Ptr

// The information that must be sent in a `Provide` message to identify the
// recipient of the capability.
Expand All @@ -41,7 +59,7 @@ type ThirdPartyCapID capnp.Ptr
// attached to the message via SCM_RIGHTS. This file descriptor would be one
// end of a newly-created socketpair, with the other end having been sent to the
// capability's recipient in ThirdPartyCapId.
type RecipientID capnp.Ptr
type ThirdPartyToAwait capnp.Ptr

// The information that must be sent in an `Accept` message to identify the
// object being accepted.
Expand All @@ -50,46 +68,72 @@ type RecipientID capnp.Ptr
// be the public key fingerprint of the provider vat along with a nonce matching
// the one in the `RecipientId` used in the `Provide` message sent from that
// provider.
type ProvisionID capnp.Ptr
type ThirdPartyCompletion capnp.Ptr

// Data needed to perform a third-party handoff, returned by
// Newtork.Introduce.
// Data needed to perform a third-party handoff.
type IntroductionInfo struct {
SendToRecipient ThirdPartyCapID
SendToProvider RecipientID
SendToProvider ThirdPartyToAwait
SendToRecipient ThirdPartyToContact
}

// A Network is a reference to a multi-party (generally >= 3) network
// of Cap'n Proto peers. Use this instead of NewConn when establishing
// connections outside a point-to-point setting.
//
// In addition to satisfying the method set, a correct implementation
// of Network must be comparable.
type Network interface {
// Return the identifier for caller on this network.
LocalID() PeerID

// Connect to another peer by ID. The supplied Options are used
// for the connection, with the values for RemotePeerID and Network
// overridden by the Network.
Dial(PeerID, *Options) (*Conn, error)
// Connect to another peer by ID. Re-uses any existing connection
// to the peer.
Dial(PeerID) (*Conn, error)

// Accept and handle incoming connections on the network until
// the context is canceled.
Serve(context.Context) error
}

// Accept the next incoming connection on the network, using the
// supplied Options for the connection. Generally, callers will
// want to invoke this in a loop when launching a server.
Accept(context.Context, *Options) (*Conn, error)
// A Network3PH is a Network which supports three-party handoff of capabilities.
// TODO(before merge): could this interface be named better?
type Network3PH interface {
// Introduces both connections for a three-party handoff. After this,
// the `ThirdPartyToAwait` will be sent to the `provider` and the
// `ThirdPartyToContact` will be sent to the `recipient`.
//
// An error indicates introduction is not possible between the two `Conn`s.
Introduce(provider *Conn, recipient *Conn) (IntroductionInfo, error)

// Introduce the two connections, in preparation for a third party
// handoff. Afterwards, a Provide messsage should be sent to
// provider, and a ThirdPartyCapId should be sent to recipient.
Introduce(provider, recipient *Conn) (IntroductionInfo, error)
// Attempts forwarding of a `ThirdPartyToContact` received from `from` to
// `destination`, with both vats being in this Network. This method
// return a `ThirdPartyToContact` to send to `destination`.
//
// An error indicates forwarding is not possible.
Forward(from *Conn, destination *Conn, info ThirdPartyToContact) (ThirdPartyToContact, error)

// Given a ThirdPartyCapID, received from introducedBy, connect
// to the third party. The caller should then send an Accept
// message over the returned Connection.
DialIntroduced(capID ThirdPartyCapID, introducedBy *Conn) (*Conn, ProvisionID, error)
// Completes a three-party handoff.
//
// The provided `completion` has been received from `conn` in an `Accept`.
//
// This method blocks until there is a matching `AwaitThirdParty`, if there is
// none currently, and returns the `value` passed to it.
//
// An error indicates that this completion can never succeed, for example due
// to a `completion` that is malformed. The error will be sent in response to the
// `Accept`.
CompleteThirdParty(ctx context.Context, conn *Conn, completion ThirdPartyCompletion) (any, error)

// Given a RecipientID received in a Provide message via
// introducedBy, wait for the recipient to connect, and
// return the connection formed. If there is already an
// established connection to the relevant Peer, this
// SHOULD return the existing connection immediately.
AcceptIntroduced(recipientID RecipientID, introducedBy *Conn) (*Conn, error)
// Awaits for completion of a three-party handoff.
//
// The provided `await` has been received from `conn`.
//
// While the context is valid, any `CompleteThirdParty` calls that match
// the provided `await` should return `value`.
//
// After the context is canceled, future calls to `CompleteThirdParty` are
// not required to return the provided `value`.
//
// This method SHOULD not block.
AwaitThirdParty(ctx context.Context, conn *Conn, await ThirdPartyToAwait, value any)
}
4 changes: 2 additions & 2 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (c *lockedConn) parseReturn(dq *deferred.Queue, ret rpccp.Return, called []
return parsedReturn{err: rpcerr.WrapFailed("parse return", err), parseFailed: true}
}
return parsedReturn{err: exc.New(exc.Type(e.Type()), "", reason)}
case rpccp.Return_Which_acceptFromThirdParty:
case rpccp.Return_Which_awaitFromThirdParty:
// TODO: 3PH. Can wait until after the MVP, because we can keep
// setting allowThirdPartyTailCall = false
fallthrough
Expand Down Expand Up @@ -1742,7 +1742,7 @@ func (c *Conn) handleDisembargo(ctx context.Context, in transport.IncomingMessag
})
})

case rpccp.Disembargo_context_Which_accept, rpccp.Disembargo_context_Which_provide:
case rpccp.Disembargo_context_Which_accept:
if c.network != nil {
panic("TODO: 3PH")
}
Expand Down
1 change: 0 additions & 1 deletion std/capnp/c++.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ annotation allowCancellation(interface, method, file) :Void;
# If your code is not cancellation-safe, then allowing cancellation might give a malicious client
# an easy way to induce use-after-free or other bugs in your server, by requesting cancellation
# when not expected.

using Go = import "/go.capnp";
$Go.package("cxx");
$Go.import("capnproto.org/go/capnp/v3/std/capnp/cxx");
48 changes: 48 additions & 0 deletions std/capnp/compat/byte-stream.capnp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
@0x8f5d14e1c273738d;

using Cxx = import "/capnp/c++.capnp";
$Cxx.namespace("capnp");
$Cxx.allowCancellation;

interface ByteStream {
write @0 (bytes :Data) -> stream;
# Write a chunk.

end @1 ();
# Signals clean EOF. (If the ByteStream is dropped without calling this, then the stream was
# prematurely canceled and so the body should not be considered complete.)

getSubstream @2 (callback :SubstreamCallback,
limit :UInt64 = 0xffffffffffffffff) -> (substream :ByteStream);
# This method is used to implement path shortening optimization. It is designed in particular
# with KJ streams' pumpTo() in mind.
#
# getSubstream() returns a new stream object that can be used to write to the same destination
# as this stream. The substream will operate until it has received `limit` bytes, or its `end()`
# method has been called, whichever occurs first. At that time, it invokes one of the methods of
# `callback` based on the termination condition.
#
# While a substream is active, it is an error to call write() on the original stream. Doing so
# may throw an exception or may arbitrarily interleave bytes with the substream's writes.

startTls @3 (expectedServerHostname :Text) -> stream;
# Client calls this method when it wants to initiate TLS. This ByteStream is not terminated,
# the caller should reuse it.

interface SubstreamCallback {
ended @0 (byteCount :UInt64);
# `end()` was called on the substream after writing `byteCount` bytes. The `end()` call was
# NOT forwarded to the underlying stream, which remains open.

reachedLimit @1 () -> (next :ByteStream);
# The number of bytes specified by the `limit` parameter of `getSubstream()` was reached.
# The substream will "resolve itself" to `next`, so that all future calls to the substream
# are forwarded to `next`.
#
# If the `write()` call which reached the limit included bytes past the limit, then the first
# `write()` call to `next` will be for those leftover bytes.
}
}
using Go = import "/go.capnp";
$Go.package("bytestream");
$Go.import("capnproto.org/go/capnp/v3/std/capnp/compat/bytestream");
Loading

0 comments on commit d491d2f

Please sign in to comment.