Skip to content

Commit

Permalink
Latte broker code improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Jul 14, 2024
1 parent 3cade2a commit 3d757dd
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 84 deletions.
38 changes: 18 additions & 20 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ abstract class BrokerClient(
}
}

internal fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

internal fun toResponseKey(key: String): String = "$key.response"

}

private class TopicMetadata(
Expand All @@ -127,21 +122,6 @@ private class TopicMetadata(
private val topic: String,
) {

private class KeyMetadata(val key: String) {
val producers: MutableSet<ProducerSubclient<*>> = Collections.synchronizedSet(HashSet())
val consumers: MutableSet<ConsumerSubclient<*>> = Collections.synchronizedSet(HashSet())

val isEmpty: Boolean
get() = producers.isEmpty() && consumers.isEmpty()

fun destroy() {
producers.forEach(ProducerSubclient<*>::destroy)
consumers.forEach(ConsumerSubclient<*>::destroy)
producers.clear()
consumers.clear()
}
}

private val log by Log
private val _keys: MutableMap<String, KeyMetadata> = Collections.synchronizedMap(HashMap())
private val isBeingDestroyed = AtomicBoolean(false)
Expand All @@ -158,6 +138,9 @@ private class TopicMetadata(
subclient.key,
subclient.topic
)
check(subclient.topic == topic) {
"Attempting to register subclient with topic '${subclient.topic}' in TopicMetadata of '$topic'"
}
val metadata = getOrCreateKeyMetadata(subclient.key)
when (subclient) {
is ConsumerSubclient<*> -> {
Expand Down Expand Up @@ -255,6 +238,21 @@ private class TopicMetadata(

}

private class KeyMetadata(val key: String) {
val producers: MutableSet<ProducerSubclient<*>> = Collections.synchronizedSet(HashSet())
val consumers: MutableSet<ConsumerSubclient<*>> = Collections.synchronizedSet(HashSet())

val isEmpty: Boolean
get() = producers.isEmpty() && consumers.isEmpty()

fun destroy() {
producers.forEach(ProducerSubclient<*>::destroy)
consumers.forEach(ConsumerSubclient<*>::destroy)
producers.clear()
consumers.clear()
}
}

@PublishedApi
internal inline fun <reified T> isTypeNullable(): Boolean {
return null is T || T::class.java == Unit::class.java || T::class.java == Void::class.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ abstract class BrokerConnection {
listeners.remove(cb)
if (listeners.size == 0) {
log.debug("Removing topic '{}'", topic)
deferredTopicsToCreate.remove(topic)
removeTopic(topic)
null
} else {
Expand Down
31 changes: 7 additions & 24 deletions latte/src/main/java/gg/beemo/latte/broker/BrokerMessageHeaders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,17 @@ open class BrokerMessageHeaders(val headers: Map<String, String>) {
}

constructor(
sourceService: String,
sourceInstance: String,
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
) : this(
createHeadersMap(
sourceService,
sourceInstance,
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
null,
)
)

constructor(
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
) : this(
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
),
)

companion object {
Expand All @@ -54,6 +42,7 @@ open class BrokerMessageHeaders(val headers: Map<String, String>) {
private const val HEADER_TARGET_INSTANCES = "target-instances"
private const val HEADER_MESSAGE_ID = "message-id"

// Needs to be JvmStatic to be used in subclasses
@JvmStatic
protected fun createHeadersMap(
sourceService: String,
Expand All @@ -66,23 +55,17 @@ open class BrokerMessageHeaders(val headers: Map<String, String>) {
val headers = HashMap<String, String>()
headers[HEADER_SOURCE_SERVICE] = sourceService
headers[HEADER_SOURCE_INSTANCE] = sourceInstance
headers[HEADER_TARGET_SERVICES] = joinToString(targetServices)
headers[HEADER_TARGET_INSTANCES] = joinToString(targetInstances)
headers[HEADER_TARGET_SERVICES] = targetServices.joinToString(",")
headers[HEADER_TARGET_INSTANCES] = targetInstances.joinToString(",")
headers[HEADER_MESSAGE_ID] = messageId ?: UUID.randomUUID().toString()
headers.putAll(extra)
return headers
}

@JvmStatic
protected fun splitToSet(value: String): Set<String> {
return value.split(",").filter { it.isNotEmpty() }.toSet()
}

@JvmStatic
protected fun joinToString(value: Set<String>): String {
return value.joinToString(",")
}

}

}
Expand Down
19 changes: 12 additions & 7 deletions latte/src/main/java/gg/beemo/latte/broker/rpc/RpcClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ class RpcClient<RequestT, ResponseT>(
private val requestConsumer = client.consumer(topic, key, options, requestType, requestIsNullable) { msg ->
suspend fun sendResponse(response: ResponseT?, status: RpcStatus, isException: Boolean, isUpdate: Boolean) {
val responseMsg = RpcResponseMessage(
client.toResponseTopic(topic),
client.toResponseKey(key),
toResponseTopic(topic),
toResponseKey(key),
response,
RpcMessageHeaders(
connection,
Expand Down Expand Up @@ -71,24 +71,24 @@ class RpcClient<RequestT, ResponseT>(
return@consumer
} catch (ex: Exception) {
log.error(
"Uncaught RPC callbac#k error while processing message ${msg.headers.messageId} " +
"Uncaught RPC callback error while processing message ${msg.headers.messageId} " +
"with key '$key' in topic '$topic'",
ex,
)
return@consumer
}
}
private val responseProducer = client.producer(
client.toResponseTopic(topic),
client.toResponseKey(key),
toResponseTopic(topic),
toResponseKey(key),
options,
responseType,
responseIsNullable,
)
private val responseFlow = MutableSharedFlow<BaseBrokerMessage<ResponseT>>()
private val responseConsumer = client.consumer(
client.toResponseTopic(topic),
client.toResponseKey(key),
toResponseTopic(topic),
toResponseKey(key),
options,
responseType,
responseIsNullable,
Expand Down Expand Up @@ -160,6 +160,11 @@ class RpcClient<RequestT, ResponseT>(

}

private fun toResponseTopic(topic: String): String =
if (connection.supportsTopicHotSwap) "$topic.responses" else topic

private fun toResponseKey(key: String): String = "$key.response"

override fun doDestroy() {
requestProducer.destroy()
requestConsumer.destroy()
Expand Down
44 changes: 11 additions & 33 deletions latte/src/main/java/gg/beemo/latte/broker/rpc/RpcMessageHeaders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ class RpcMessageHeaders(headers: Map<String, String>) : BrokerMessageHeaders(hea
constructor(base: BrokerMessageHeaders) : this(base.headers)

constructor(
sourceService: String,
sourceInstance: String,
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
inReplyTo: MessageId,
Expand All @@ -33,46 +32,25 @@ class RpcMessageHeaders(headers: Map<String, String>) : BrokerMessageHeaders(hea
isUpdate: Boolean,
) : this(
createHeadersMap(
sourceService,
sourceInstance,
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
null,
extra = mapOf(
HEADER_IN_REPLY_TO to inReplyTo,
HEADER_STATUS to status.code.toString(),
HEADER_IS_EXCEPTION to isException.toString(),
HEADER_IS_UPDATE to isUpdate.toString(),
)
)
)

constructor(
connection: BrokerConnection,
targetServices: Set<String>,
targetInstances: Set<String>,
inReplyTo: MessageId,
status: RpcStatus,
isException: Boolean,
isUpdate: Boolean,
) : this(
connection.serviceName,
connection.instanceId,
targetServices,
targetInstances,
inReplyTo,
status,
isException,
isUpdate,
extra =
mapOf(
HEADER_IN_REPLY_TO to inReplyTo,
HEADER_STATUS to status.code.toString(),
HEADER_IS_EXCEPTION to isException.toString(),
HEADER_IS_UPDATE to isUpdate.toString(),
),
),
)

companion object {

private const val HEADER_IN_REPLY_TO = "rpc-in-reply-to"
private const val HEADER_STATUS = "rpc-response-status"
private const val HEADER_IS_EXCEPTION = "rpc-is-exception"
private const val HEADER_IS_UPDATE = "rpc-is-update"

}

}

0 comments on commit 3d757dd

Please sign in to comment.