Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add datachannel support for relay message transport. #2001

Merged
merged 8 commits into from
Mar 16, 2023
4 changes: 2 additions & 2 deletions doc/relay.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

### Relays
Relays (aka secure octo) use ICE and DTLS/SRTP between each pair of bridges, so a secure
network is not required. It uses and requires colibri websockets for the
bridge-bridge connections (endpoints can still use SCTP).
network is not required. It uses and requires either SCTP or colibri websockets for the
bridge-bridge connections.

## Jitsi Videobridge configuration

Expand Down
102 changes: 2 additions & 100 deletions jvb/src/main/kotlin/org/jitsi/videobridge/Endpoint.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.jitsi.nlj.rtp.SsrcAssociationType
import org.jitsi.nlj.rtp.VideoRtpPacket
import org.jitsi.nlj.srtp.TlsRole
import org.jitsi.nlj.stats.EndpointConnectionStats
import org.jitsi.nlj.stats.NodeStatsBlock
import org.jitsi.nlj.transform.node.ConsumerNode
import org.jitsi.nlj.util.Bandwidth
import org.jitsi.nlj.util.LocalSsrcAssociation
Expand Down Expand Up @@ -69,7 +68,8 @@ import org.jitsi.videobridge.message.SenderSourceConstraintsMessage
import org.jitsi.videobridge.message.SenderVideoConstraintsMessage
import org.jitsi.videobridge.relay.AudioSourceDesc
import org.jitsi.videobridge.rest.root.debug.EndpointDebugFeatures
import org.jitsi.videobridge.sctp.SctpConfig
import org.jitsi.videobridge.sctp.DataChannelHandler
import org.jitsi.videobridge.sctp.SctpHandler
import org.jitsi.videobridge.sctp.SctpManager
import org.jitsi.videobridge.stats.PacketTransitStats
import org.jitsi.videobridge.transport.dtls.DtlsTransport
Expand All @@ -85,13 +85,11 @@ import org.jitsi_modified.sctp4j.SctpDataCallback
import org.jitsi_modified.sctp4j.SctpServerSocket
import org.jitsi_modified.sctp4j.SctpSocket
import org.json.simple.JSONObject
import java.nio.ByteBuffer
import java.security.SecureRandom
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.Optional
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Supplier
Expand Down Expand Up @@ -1205,100 +1203,4 @@ class Endpoint @JvmOverloads constructor(
bandwidthProbing.bandwidthEstimationChanged(newValue)
}
}

/**
* A node which can be placed in the pipeline to cache Data channel packets
* until the DataChannelStack is ready to handle them.
*/
private class DataChannelHandler : ConsumerNode("Data channel handler") {
private val dataChannelStackLock = Any()
private var dataChannelStack: DataChannelStack? = null
private val cachedDataChannelPackets = LinkedBlockingQueue<PacketInfo>()

public override fun consume(packetInfo: PacketInfo) {
synchronized(dataChannelStackLock) {
when (val packet = packetInfo.packet) {
is DataChannelPacket -> {
dataChannelStack?.onIncomingDataChannelPacket(
ByteBuffer.wrap(packet.buffer), packet.sid, packet.ppid
) ?: run {
cachedDataChannelPackets.add(packetInfo)
}
}
else -> Unit
}
}
}

fun setDataChannelStack(dataChannelStack: DataChannelStack) {
// Submit this to the pool since we wait on the lock and process any
// cached packets here as well

// Submit this to the pool since we wait on the lock and process any
// cached packets here as well
TaskPools.IO_POOL.execute {
// We grab the lock here so that we can set the SCTP manager and
// process any previously-cached packets as an atomic operation.
// It also prevents another thread from coming in via
// #doProcessPackets and processing packets at the same time in
// another thread, which would be a problem.
synchronized(dataChannelStackLock) {
this.dataChannelStack = dataChannelStack
cachedDataChannelPackets.forEach {
val dcp = it.packet as DataChannelPacket
dataChannelStack.onIncomingDataChannelPacket(
ByteBuffer.wrap(dcp.buffer), dcp.sid, dcp.ppid
)
}
}
}
}

override fun trace(f: () -> Unit) = f.invoke()
}

/**
* A node which can be placed in the pipeline to cache SCTP packets until
* the SCTPManager is ready to handle them.
*/
private class SctpHandler : ConsumerNode("SCTP handler") {
private val sctpManagerLock = Any()
private var sctpManager: SctpManager? = null
private val numCachedSctpPackets = AtomicLong(0)
private val cachedSctpPackets = LinkedBlockingQueue<PacketInfo>(100)

override fun consume(packetInfo: PacketInfo) {
synchronized(sctpManagerLock) {
if (SctpConfig.config.enabled) {
sctpManager?.handleIncomingSctp(packetInfo) ?: run {
numCachedSctpPackets.incrementAndGet()
cachedSctpPackets.add(packetInfo)
}
}
}
}

override fun getNodeStats(): NodeStatsBlock = super.getNodeStats().apply {
addNumber("num_cached_packets", numCachedSctpPackets.get())
}

fun setSctpManager(sctpManager: SctpManager) {
// Submit this to the pool since we wait on the lock and process any
// cached packets here as well
TaskPools.IO_POOL.execute {
// We grab the lock here so that we can set the SCTP manager and
// process any previously-cached packets as an atomic operation.
// It also prevents another thread from coming in via
// #doProcessPackets and processing packets at the same time in
// another thread, which would be a problem.
synchronized(sctpManagerLock) {
this.sctpManager = sctpManager
cachedSctpPackets.forEach { sctpManager.handleIncomingSctp(it) }
cachedSctpPackets.clear()
}
}
}

override fun trace(f: () -> Unit) = f.invoke()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ class Colibri2ConferenceHandler(
}
for (r in conferenceModifyIQ.relays) {
if (!RelayConfig.config.enabled) {
throw IqProcessingException(Condition.feature_not_implemented, "Octo is disable in configuration.")
throw IqProcessingException(Condition.feature_not_implemented, "Octo is disabled in configuration.")
}
if (!WebsocketServiceConfig.config.enabled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we still throw if neither SCTP nor websockets are enabled?

if (!WebsocketServiceConfig.config.enabled && !SctpConfig.config.enabled) {
logger.warn(
"Can not use a colibri2 relay, because colibri web sockets are not enabled. See " +
"https://github.com/jitsi/jitsi-videobridge/blob/master/doc/octo.md"
"Can not use a colibri2 relay, because neither SCTP nor colibri web sockets are enabled. See " +
"https://github.com/jitsi/jitsi-videobridge/blob/master/doc/relay.md"
)
throw UnsupportedOperationException(
"Colibri websockets or SCTP need to be enabled to use a colibri2 relay."
)
throw UnsupportedOperationException("Colibri websockets need to be enabled to use a colibri2 relay.")
}
responseBuilder.addRelay(handleColibri2Relay(r))
}
Expand Down Expand Up @@ -352,16 +354,42 @@ class Colibri2ConferenceHandler(
)
}

if (c2relay.transport?.sctp != null) throw IqProcessingException(
Condition.feature_not_implemented,
"SCTP is not supported for relays."
)
c2relay.transport?.sctp?.let { sctp ->
if (!SctpConfig.config.enabled) {
throw IqProcessingException(
Condition.feature_not_implemented,
"SCTP support is not configured"
)
}
if (sctp.port != null && sctp.port != SctpManager.DEFAULT_SCTP_PORT) {
throw IqProcessingException(
Condition.bad_request,
"Specific SCTP port requested, not supported."
)
}

relay.createSctpConnection(sctp)
}

c2relay.transport?.iceUdpTransport?.let { relay.setTransportInfo(it) }
if (c2relay.create) {
val transBuilder = Transport.getBuilder()
transBuilder.setIceUdpExtension(relay.describeTransport())
respBuilder.setTransport(transBuilder.build())
c2relay.transport?.sctp?.let {
bgrozev marked this conversation as resolved.
Show resolved Hide resolved
val role = if (it.role == Sctp.Role.CLIENT) {
Sctp.Role.SERVER
} else {
Sctp.Role.CLIENT
}
transBuilder.setSctp(
Sctp.Builder()
.setPort(SctpManager.DEFAULT_SCTP_PORT)
.setRole(role)
.build()
)

respBuilder.setTransport(transBuilder.build())
}
}

for (media: Media in c2relay.media) {
Expand Down
Loading