Skip to content

Commit

Permalink
Added implementation for simple thread safe async event emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Oct 26, 2024
1 parent 1eded83 commit 2cdbdb3
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 0 deletions.
91 changes: 91 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Emitter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.ably.chat

import io.ably.lib.util.Log.ERROR
import io.ably.lib.util.Log.LogHandler
import java.util.LinkedList
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

/**
* Emitter interface for supplied value
* Ideally, class implementation should work for both kotlin and java
*/
interface Emitter<V> {
fun emit(value: V)
fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription
fun offAll()
}

/**
* AsyncEmitter is thread safe, highly performant async emitter implementation for kotlin.
* Currently, use-case is limited to handle internal events.
* This can be modified in the future to handle external listeners, events etc
*/
class AsyncEmitter<V> (private val collectorScope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter<V> {

// Read https://www.codejava.net/java-core/concurrency/java-concurrent-collection-copyonwritearraylist-examples
// For more information on why it's good to have this list
private val subscribers = CopyOnWriteArrayList<AsyncSubscriber<V>>()

override fun emit(value: V) {
for (subscriber in subscribers) {
subscriber.notifyAsync(value)
}
}

override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription {
val subscriber = AsyncSubscriber(collectorScope, block)
subscribers.addIfAbsent(subscriber)
return Subscription {
subscribers.remove(subscriber)
}
}

override fun offAll() {
subscribers.clear()
}
}

private class AsyncSubscriber<V>(
private val scope: CoroutineScope,
private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit),
private val logger: LogHandler? = null,
) {
private var isSubscriberRunning = false
private val values = LinkedList<V>()

fun notifyAsync(value: V) {
sequentialScope.launch {
values.add(value)
if (!isSubscriberRunning) {
isSubscriberRunning = true
while (values.isNotEmpty()) {
val valueTobeEmitted = values.poll()
try {
// Process values sequentially, similar to blocking eventEmitter
scope.launch { subscriberBlock(valueTobeEmitted as V) }.join()
} catch (t: Throwable) {
// TODO - replace with more verbose logging
logger?.println(ERROR, "AsyncSubscriber", "Error processing value $valueTobeEmitted", t)
}
}
isSubscriberRunning = false
}
}
}

override fun equals(other: Any?): Boolean {
if (other is AsyncSubscriber<*>) {
// Avoid registering duplicate anonymous subscriber block with same instance id
// Common scenario when Android activity is refreshed or some app components refresh
return this.subscriberBlock.hashCode() == other.subscriberBlock.hashCode()
}
return super.equals(other)
}

companion object {
val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
}
}
38 changes: 38 additions & 0 deletions chat-android/src/test/java/com/ably/chat/EmitterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.ably.chat;

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;

public class EmitterTest {

@Test
public void testEmitter() {
AsyncEmitter<String> asyncEmitter = new AsyncEmitter<>();
ArrayList<String> receivedValues = new ArrayList<>();

asyncEmitter.emit("1");

Subscription subscription = asyncEmitter.on((coroutineScope, s, continuation) -> {
receivedValues.add(s);
return null;
});

asyncEmitter.emit("2");
asyncEmitter.emit("3");
asyncEmitter.emit("4");

subscription.unsubscribe();

asyncEmitter.emit("5");
asyncEmitter.emit("6");

Exception conditionError = new TestUtils.ConditionalWaiter().
wait(() -> receivedValues.size() == 3, 5000);
Assert.assertNull(conditionError);

Assert.assertEquals(Arrays.asList("2", "3", "4"), receivedValues);
}
}
33 changes: 33 additions & 0 deletions chat-android/src/test/java/com/ably/chat/EmitterTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.ably.chat

import kotlinx.coroutines.test.runTest
import org.junit.Assert
import org.junit.Test

class AsyncEmitterTest {

@Test
fun `should be able to emit and listen to event`() = runTest {
val asyncEmitter = AsyncEmitter<String>()
val receivedValues = mutableListOf<String>()

asyncEmitter.emit("1")

val subscription = asyncEmitter.on { received: String ->
receivedValues.add(received)
}

asyncEmitter.emit("2")
asyncEmitter.emit("3")
asyncEmitter.emit("4")

subscription.unsubscribe()

asyncEmitter.emit("5")
asyncEmitter.emit("6")

assertWaiter { receivedValues.size == 3 }.join()

Assert.assertEquals(listOf("2", "3", "4"), receivedValues)
}
}
38 changes: 38 additions & 0 deletions chat-android/src/test/java/com/ably/chat/TestUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.ably.chat;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class TestUtils {

public interface ConditionFn<O> {
O call();
}

public static class ConditionalWaiter {
public Exception wait(ConditionFn<Boolean> condition, int timeoutInMs) {
AtomicBoolean taskTimedOut = new AtomicBoolean();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
taskTimedOut.set(true);
}
}, timeoutInMs);
while (true) {
try {
Boolean result = condition.call();
if (result) {
return null;
}
if (taskTimedOut.get()) {
throw new Exception("Timed out after " + timeoutInMs + "ms waiting for condition");
}
Thread.sleep(200);
} catch (Exception e) {
return e;
}
}
}
}
}
19 changes: 19 additions & 0 deletions chat-android/src/test/java/com/ably/chat/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import com.google.gson.JsonElement
import io.ably.lib.types.AsyncHttpPaginatedResponse
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout

fun buildAsyncHttpPaginatedResponse(items: List<JsonElement>): AsyncHttpPaginatedResponse {
val response = mockk<AsyncHttpPaginatedResponse>()
Expand Down Expand Up @@ -49,3 +55,16 @@ fun mockOccupancyApiResponse(realtimeClientMock: RealtimeClient, response: JsonE
)
}
}

suspend fun assertWaiter(timeoutInMs: Long = 10000, block : () -> Boolean): Job {
// Need to create coroutineScope because delay doesn't work in runTest default CoroutineScope
val scope = CoroutineScope(Dispatchers.Default)
return scope.launch {
withTimeout(timeoutInMs) {
do {
val success = block()
delay(100)
} while (!success)
}
}
}

0 comments on commit 2cdbdb3

Please sign in to comment.