Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simple thread safe event emitter #38

Draft
wants to merge 2 commits into
base: feature/roomlifecycle-attach-with-retry
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions chat-android/src/main/java/com/ably/chat/Emitter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.ably.chat

import io.ably.lib.util.Log.ERROR
import io.ably.lib.util.Log.LogHandler
import java.util.TreeSet
import java.util.concurrent.LinkedBlockingQueue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch

/**
* Emitter interface for supplied value
* Ideally, class implementation should work for both kotlin and java
*/
interface Emitter<V> {
fun emit(value: V)
fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription
fun offAll()
}

/**
* AsyncEmitter is thread safe, async emitter implementation for kotlin.
* Currently, use-case is limited to handle internal events.
* This can be modified in the future to handle external listeners, events etc
*/
class AsyncEmitter<V> (private val collectorScope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter<V> {

private val subscribers = TreeSet<AsyncSubscriber<V>>()

@Synchronized
override fun emit(value: V) {
for (subscriber in subscribers) {
subscriber.notify(value)
}
}

@Synchronized
override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription {
val subscriber = AsyncSubscriber(collectorScope, block)
subscribers.add(subscriber)
return Subscription {
synchronized(this) {
subscribers.remove(subscriber)
}
}
}

@Synchronized
override fun offAll() {
subscribers.clear()
}
}

private class AsyncSubscriber<V>(
private val scope: CoroutineScope,
private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit),
private val logger: LogHandler? = null,
) : Comparable<V> {
private val values = LinkedBlockingQueue<V>()
private var isSubscriberRunning = false

fun notify(value: V) {
values.add(value)
sequentialScope.launch {
if (!isSubscriberRunning) {
isSubscriberRunning = true
while (values.isNotEmpty()) {
val valueTobeEmitted = values.poll()
try {
// Should process values sequentially, similar to blocking eventEmitter
scope.launch { subscriberBlock(valueTobeEmitted as V) }.join()
} catch (t: Throwable) {
// TODO - replace with more verbose logging
logger?.println(ERROR, "AsyncSubscriber", "Error processing value $valueTobeEmitted", t)
}
}
isSubscriberRunning = false
}
}
}

companion object {
val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
}

override fun compareTo(other: V): Int {
// Avoid registering duplicate anonymous subscriber block with same instance id
// Common scenario when Android activity is refreshed or some app components refresh
if (other is AsyncSubscriber<*>) {
return this.subscriberBlock.hashCode().compareTo(other.subscriberBlock.hashCode())
}
return this.hashCode().compareTo(other.hashCode())
}
}
38 changes: 38 additions & 0 deletions chat-android/src/test/java/com/ably/chat/EmitterTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.ably.chat;

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;

public class EmitterTest {

@Test
public void testEmitter() {
AsyncEmitter<String> asyncEmitter = new AsyncEmitter<>();
ArrayList<String> receivedValues = new ArrayList<>();

asyncEmitter.emit("1");

Subscription subscription = asyncEmitter.on((coroutineScope, s, continuation) -> {
receivedValues.add(s);
return null;
});

asyncEmitter.emit("2");
asyncEmitter.emit("3");
asyncEmitter.emit("4");

subscription.unsubscribe();

asyncEmitter.emit("5");
asyncEmitter.emit("6");

Exception conditionError = new TestUtils.ConditionalWaiter().
wait(() -> receivedValues.size() == 3, 5000);
Assert.assertNull(conditionError);

Assert.assertEquals(Arrays.asList("2", "3", "4"), receivedValues);
}
}
86 changes: 86 additions & 0 deletions chat-android/src/test/java/com/ably/chat/EmitterTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.ably.chat

import java.util.concurrent.LinkedBlockingQueue
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.Assert
import org.junit.Test

class AsyncEmitterTest {

@Test
fun `should be able to emit and listen to event`() = runTest {
val asyncEmitter = AsyncEmitter<String>()
val receivedValues = mutableListOf<String>()

asyncEmitter.emit("1")

val subscription = asyncEmitter.on { received: String ->
delay((2000..3000).random().toDuration(DurationUnit.MILLISECONDS))
receivedValues.add(received)
}

asyncEmitter.emit("2")
asyncEmitter.emit("3")
asyncEmitter.emit("4")

subscription.unsubscribe()

asyncEmitter.emit("5")
asyncEmitter.emit("6")

assertWaiter { receivedValues.size == 3 }.join()
Assert.assertEquals(3, receivedValues.size)

Assert.assertEquals(listOf("2", "3", "4"), receivedValues)
}

@Test
fun `should be able to handle concurrent emits and listen to them in the same order`() = runTest {
val asyncEmitter = AsyncEmitter<Int>()
val emitted = LinkedBlockingQueue<Int>()
val receivedValues1 = mutableListOf<Int>()
val receivedValues2 = mutableListOf<Int>()
val receivedValues3 = mutableListOf<Int>()


asyncEmitter.on { received ->
receivedValues1.add(received)
}

asyncEmitter.on { received ->
receivedValues2.add(received)
}

asyncEmitter.on { received ->
receivedValues3.add(received)
}

// Concurrently emit 100000 events from multiple threads
withContext(Dispatchers.IO) {
repeat(100000) {
launch {
asyncEmitter.emit(it)
emitted.add(it)
}
}
}

assertWaiter { emitted.size == 100000 }.join()
assertWaiter { receivedValues1.size == 100000 }.join()
assertWaiter { receivedValues2.size == 100000 }.join()

Assert.assertEquals(receivedValues1, receivedValues2)
Assert.assertEquals(receivedValues2, receivedValues3)

Assert.assertEquals(100000, emitted.size)
Assert.assertEquals(100000, receivedValues1.size)
Assert.assertEquals(100000, receivedValues2.size)
Assert.assertEquals(100000, receivedValues3.size)
}
}
38 changes: 38 additions & 0 deletions chat-android/src/test/java/com/ably/chat/TestUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.ably.chat;

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

public class TestUtils {

public interface ConditionFn<O> {
O call();
}

public static class ConditionalWaiter {
public Exception wait(ConditionFn<Boolean> condition, int timeoutInMs) {
AtomicBoolean taskTimedOut = new AtomicBoolean();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
taskTimedOut.set(true);
}
}, timeoutInMs);
while (true) {
try {
Boolean result = condition.call();
if (result) {
return null;
}
if (taskTimedOut.get()) {
throw new Exception("Timed out after " + timeoutInMs + "ms waiting for condition");
}
Thread.sleep(200);
} catch (Exception e) {
return e;
}
}
}
}
}
19 changes: 19 additions & 0 deletions chat-android/src/test/java/com/ably/chat/TestUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import com.google.gson.JsonElement
import io.ably.lib.types.AsyncHttpPaginatedResponse
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout

fun buildAsyncHttpPaginatedResponse(items: List<JsonElement>): AsyncHttpPaginatedResponse {
val response = mockk<AsyncHttpPaginatedResponse>()
Expand Down Expand Up @@ -49,3 +55,16 @@ fun mockOccupancyApiResponse(realtimeClientMock: RealtimeClient, response: JsonE
)
}
}

suspend fun assertWaiter(timeoutInMs: Long = 10000, block : () -> Boolean): Job {
// Need to create coroutineScope because delay doesn't work in runTest default CoroutineScope
val scope = CoroutineScope(Dispatchers.Default)
return scope.launch {
withTimeout(timeoutInMs) {
do {
val success = block()
delay(100)
} while (!success)
}
}
}
Loading