From bd72b747e5c3ecaf69ce88d38b8f8489398ee6f5 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sat, 26 Oct 2024 18:29:53 +0530 Subject: [PATCH 1/2] Added implementation for simple thread safe async event emitter --- .../src/main/java/com/ably/chat/Emitter.kt | 90 +++++++++++++++++++ .../test/java/com/ably/chat/EmitterTest.java | 38 ++++++++ .../test/java/com/ably/chat/EmitterTest.kt | 33 +++++++ .../test/java/com/ably/chat/TestUtils.java | 38 ++++++++ .../src/test/java/com/ably/chat/TestUtils.kt | 19 ++++ 5 files changed, 218 insertions(+) create mode 100644 chat-android/src/main/java/com/ably/chat/Emitter.kt create mode 100644 chat-android/src/test/java/com/ably/chat/EmitterTest.java create mode 100644 chat-android/src/test/java/com/ably/chat/EmitterTest.kt create mode 100644 chat-android/src/test/java/com/ably/chat/TestUtils.java diff --git a/chat-android/src/main/java/com/ably/chat/Emitter.kt b/chat-android/src/main/java/com/ably/chat/Emitter.kt new file mode 100644 index 0000000..9c22214 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/Emitter.kt @@ -0,0 +1,90 @@ +package com.ably.chat + +import io.ably.lib.util.Log.ERROR +import io.ably.lib.util.Log.LogHandler +import java.util.LinkedList +import java.util.concurrent.CopyOnWriteArrayList +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch + +/** + * Emitter interface for supplied value + * Ideally, class implementation should work for both kotlin and java + */ +interface Emitter { + fun emit(value: V) + fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription + fun offAll() +} + +/** + * AsyncEmitter is thread safe, highly performant async emitter implementation for kotlin. + * Currently, use-case is limited to handle internal events. + * This can be modified in the future to handle external listeners, events etc + */ +class AsyncEmitter (private val collectorScope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter { + + // Read more on https://www.codejava.net/java-core/concurrency/java-concurrent-collection-copyonwritearraylist-examples + private val subscribers = CopyOnWriteArrayList>() + + override fun emit(value: V) { + for (subscriber in subscribers) { + subscriber.notifyAsync(value) + } + } + + override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription { + val subscriber = AsyncSubscriber(collectorScope, block) + subscribers.addIfAbsent(subscriber) + return Subscription { + subscribers.remove(subscriber) + } + } + + override fun offAll() { + subscribers.clear() + } +} + +private class AsyncSubscriber( + private val scope: CoroutineScope, + private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit), + private val logger: LogHandler? = null, +) { + private var isSubscriberRunning = false + private val values = LinkedList() + + fun notifyAsync(value: V) { + sequentialScope.launch { + values.add(value) + if (!isSubscriberRunning) { + isSubscriberRunning = true + while (values.isNotEmpty()) { + val valueTobeEmitted = values.poll() + try { + // Should process values sequentially, similar to blocking eventEmitter + scope.launch { subscriberBlock(valueTobeEmitted as V) }.join() + } catch (t: Throwable) { + // TODO - replace with more verbose logging + logger?.println(ERROR, "AsyncSubscriber", "Error processing value $valueTobeEmitted", t) + } + } + isSubscriberRunning = false + } + } + } + + override fun equals(other: Any?): Boolean { + if (other is AsyncSubscriber<*>) { + // Avoid registering duplicate anonymous subscriber block with same instance id + // Common scenario when Android activity is refreshed or some app components refresh + return this.subscriberBlock.hashCode() == other.subscriberBlock.hashCode() + } + return super.equals(other) + } + + companion object { + val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) + } +} diff --git a/chat-android/src/test/java/com/ably/chat/EmitterTest.java b/chat-android/src/test/java/com/ably/chat/EmitterTest.java new file mode 100644 index 0000000..16398f0 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/EmitterTest.java @@ -0,0 +1,38 @@ +package com.ably.chat; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; + +public class EmitterTest { + + @Test + public void testEmitter() { + AsyncEmitter asyncEmitter = new AsyncEmitter<>(); + ArrayList receivedValues = new ArrayList<>(); + + asyncEmitter.emit("1"); + + Subscription subscription = asyncEmitter.on((coroutineScope, s, continuation) -> { + receivedValues.add(s); + return null; + }); + + asyncEmitter.emit("2"); + asyncEmitter.emit("3"); + asyncEmitter.emit("4"); + + subscription.unsubscribe(); + + asyncEmitter.emit("5"); + asyncEmitter.emit("6"); + + Exception conditionError = new TestUtils.ConditionalWaiter(). + wait(() -> receivedValues.size() == 3, 5000); + Assert.assertNull(conditionError); + + Assert.assertEquals(Arrays.asList("2", "3", "4"), receivedValues); + } +} diff --git a/chat-android/src/test/java/com/ably/chat/EmitterTest.kt b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt new file mode 100644 index 0000000..c340852 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt @@ -0,0 +1,33 @@ +package com.ably.chat + +import kotlinx.coroutines.test.runTest +import org.junit.Assert +import org.junit.Test + +class AsyncEmitterTest { + + @Test + fun `should be able to emit and listen to event`() = runTest { + val asyncEmitter = AsyncEmitter() + val receivedValues = mutableListOf() + + asyncEmitter.emit("1") + + val subscription = asyncEmitter.on { received: String -> + receivedValues.add(received) + } + + asyncEmitter.emit("2") + asyncEmitter.emit("3") + asyncEmitter.emit("4") + + subscription.unsubscribe() + + asyncEmitter.emit("5") + asyncEmitter.emit("6") + + assertWaiter { receivedValues.size == 3 }.join() + + Assert.assertEquals(listOf("2", "3", "4"), receivedValues) + } +} diff --git a/chat-android/src/test/java/com/ably/chat/TestUtils.java b/chat-android/src/test/java/com/ably/chat/TestUtils.java new file mode 100644 index 0000000..91ea1b0 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/TestUtils.java @@ -0,0 +1,38 @@ +package com.ably.chat; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TestUtils { + + public interface ConditionFn { + O call(); + } + + public static class ConditionalWaiter { + public Exception wait(ConditionFn condition, int timeoutInMs) { + AtomicBoolean taskTimedOut = new AtomicBoolean(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + taskTimedOut.set(true); + } + }, timeoutInMs); + while (true) { + try { + Boolean result = condition.call(); + if (result) { + return null; + } + if (taskTimedOut.get()) { + throw new Exception("Timed out after " + timeoutInMs + "ms waiting for condition"); + } + Thread.sleep(200); + } catch (Exception e) { + return e; + } + } + } + } +} diff --git a/chat-android/src/test/java/com/ably/chat/TestUtils.kt b/chat-android/src/test/java/com/ably/chat/TestUtils.kt index e493418..faf807a 100644 --- a/chat-android/src/test/java/com/ably/chat/TestUtils.kt +++ b/chat-android/src/test/java/com/ably/chat/TestUtils.kt @@ -4,6 +4,12 @@ import com.google.gson.JsonElement import io.ably.lib.types.AsyncHttpPaginatedResponse import io.mockk.every import io.mockk.mockk +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout fun buildAsyncHttpPaginatedResponse(items: List): AsyncHttpPaginatedResponse { val response = mockk() @@ -49,3 +55,16 @@ fun mockOccupancyApiResponse(realtimeClientMock: RealtimeClient, response: JsonE ) } } + +suspend fun assertWaiter(timeoutInMs: Long = 10000, block : () -> Boolean): Job { + // Need to create coroutineScope because delay doesn't work in runTest default CoroutineScope + val scope = CoroutineScope(Dispatchers.Default) + return scope.launch { + withTimeout(timeoutInMs) { + do { + val success = block() + delay(100) + } while (!success) + } + } +} From dc4125d85b723bf3881d42f636afaeaee6d2d8fc Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sun, 27 Oct 2024 00:06:37 +0530 Subject: [PATCH 2/2] Fixed asyncEmitter, marked interface methods as Synchronized to restrict single threaded access --- .../src/main/java/com/ably/chat/Emitter.kt | 46 ++++++++-------- .../test/java/com/ably/chat/EmitterTest.kt | 53 +++++++++++++++++++ 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/chat-android/src/main/java/com/ably/chat/Emitter.kt b/chat-android/src/main/java/com/ably/chat/Emitter.kt index 9c22214..65c8e23 100644 --- a/chat-android/src/main/java/com/ably/chat/Emitter.kt +++ b/chat-android/src/main/java/com/ably/chat/Emitter.kt @@ -2,8 +2,8 @@ package com.ably.chat import io.ably.lib.util.Log.ERROR import io.ably.lib.util.Log.LogHandler -import java.util.LinkedList -import java.util.concurrent.CopyOnWriteArrayList +import java.util.TreeSet +import java.util.concurrent.LinkedBlockingQueue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch @@ -19,29 +19,33 @@ interface Emitter { } /** - * AsyncEmitter is thread safe, highly performant async emitter implementation for kotlin. + * AsyncEmitter is thread safe, async emitter implementation for kotlin. * Currently, use-case is limited to handle internal events. * This can be modified in the future to handle external listeners, events etc */ class AsyncEmitter (private val collectorScope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter { - // Read more on https://www.codejava.net/java-core/concurrency/java-concurrent-collection-copyonwritearraylist-examples - private val subscribers = CopyOnWriteArrayList>() + private val subscribers = TreeSet>() + @Synchronized override fun emit(value: V) { for (subscriber in subscribers) { - subscriber.notifyAsync(value) + subscriber.notify(value) } } + @Synchronized override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription { val subscriber = AsyncSubscriber(collectorScope, block) - subscribers.addIfAbsent(subscriber) + subscribers.add(subscriber) return Subscription { - subscribers.remove(subscriber) + synchronized(this) { + subscribers.remove(subscriber) + } } } + @Synchronized override fun offAll() { subscribers.clear() } @@ -51,13 +55,13 @@ private class AsyncSubscriber( private val scope: CoroutineScope, private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit), private val logger: LogHandler? = null, -) { +) : Comparable { + private val values = LinkedBlockingQueue() private var isSubscriberRunning = false - private val values = LinkedList() - fun notifyAsync(value: V) { + fun notify(value: V) { + values.add(value) sequentialScope.launch { - values.add(value) if (!isSubscriberRunning) { isSubscriberRunning = true while (values.isNotEmpty()) { @@ -75,16 +79,16 @@ private class AsyncSubscriber( } } - override fun equals(other: Any?): Boolean { - if (other is AsyncSubscriber<*>) { - // Avoid registering duplicate anonymous subscriber block with same instance id - // Common scenario when Android activity is refreshed or some app components refresh - return this.subscriberBlock.hashCode() == other.subscriberBlock.hashCode() - } - return super.equals(other) - } - companion object { val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) } + + override fun compareTo(other: V): Int { + // Avoid registering duplicate anonymous subscriber block with same instance id + // Common scenario when Android activity is refreshed or some app components refresh + if (other is AsyncSubscriber<*>) { + return this.subscriberBlock.hashCode().compareTo(other.subscriberBlock.hashCode()) + } + return this.hashCode().compareTo(other.hashCode()) + } } diff --git a/chat-android/src/test/java/com/ably/chat/EmitterTest.kt b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt index c340852..49f6a49 100644 --- a/chat-android/src/test/java/com/ably/chat/EmitterTest.kt +++ b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt @@ -1,6 +1,13 @@ package com.ably.chat +import java.util.concurrent.LinkedBlockingQueue +import kotlin.time.DurationUnit +import kotlin.time.toDuration +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext import org.junit.Assert import org.junit.Test @@ -14,6 +21,7 @@ class AsyncEmitterTest { asyncEmitter.emit("1") val subscription = asyncEmitter.on { received: String -> + delay((2000..3000).random().toDuration(DurationUnit.MILLISECONDS)) receivedValues.add(received) } @@ -27,7 +35,52 @@ class AsyncEmitterTest { asyncEmitter.emit("6") assertWaiter { receivedValues.size == 3 }.join() + Assert.assertEquals(3, receivedValues.size) Assert.assertEquals(listOf("2", "3", "4"), receivedValues) } + + @Test + fun `should be able to handle concurrent emits and listen to them in the same order`() = runTest { + val asyncEmitter = AsyncEmitter() + val emitted = LinkedBlockingQueue() + val receivedValues1 = mutableListOf() + val receivedValues2 = mutableListOf() + val receivedValues3 = mutableListOf() + + + asyncEmitter.on { received -> + receivedValues1.add(received) + } + + asyncEmitter.on { received -> + receivedValues2.add(received) + } + + asyncEmitter.on { received -> + receivedValues3.add(received) + } + + // Concurrently emit 100000 events from multiple threads + withContext(Dispatchers.IO) { + repeat(100000) { + launch { + asyncEmitter.emit(it) + emitted.add(it) + } + } + } + + assertWaiter { emitted.size == 100000 }.join() + assertWaiter { receivedValues1.size == 100000 }.join() + assertWaiter { receivedValues2.size == 100000 }.join() + + Assert.assertEquals(receivedValues1, receivedValues2) + Assert.assertEquals(receivedValues2, receivedValues3) + + Assert.assertEquals(100000, emitted.size) + Assert.assertEquals(100000, receivedValues1.size) + Assert.assertEquals(100000, receivedValues2.size) + Assert.assertEquals(100000, receivedValues3.size) + } }