Skip to content
This repository has been archived by the owner on Jul 2, 2021. It is now read-only.

Commit

Permalink
Merge pull request #43 from UrbanCompass/fix-replay-concurrency-bug
Browse files Browse the repository at this point in the history
Fix concurrency bug in Replay
  • Loading branch information
Russell Stephens authored Oct 10, 2019
2 parents 89cca75 + 3d51773 commit f6b15c4
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
13 changes: 10 additions & 3 deletions snail-kotlin/src/main/java/com/compass/snail/Replay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,29 @@ package com.compass.snail

import com.compass.snail.disposer.Disposable
import kotlinx.coroutines.CoroutineDispatcher
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

open class Replay<T>(private val threshold: Int) : Observable<T>() {
private var values: MutableList<T> = mutableListOf()
private val lock = ReentrantLock()

override fun subscribe(dispatcher: CoroutineDispatcher?, next: ((T) -> Unit)?, error: ((Throwable) -> Unit)?, done: (() -> Unit)?): Disposable {
replay(dispatcher, createHandler(next, error, done))
return super.subscribe(dispatcher, next, error, done)
}

override fun next(value: T) {
values.add(value)
values = values.takeLast(threshold).toMutableList()
lock.withLock {
values.add(value)
values = values.takeLast(threshold).toMutableList()
}
super.next(value)
}

private fun replay(dispatcher: CoroutineDispatcher?, handler: (Event<T>) -> Unit) {
values.forEach { notify(Subscriber(dispatcher, handler, this), Event(next = Next(it))) }
lock.withLock {
values.forEach { notify(Subscriber(dispatcher, handler, this), Event(next = Next(it))) }
}
}
}
37 changes: 37 additions & 0 deletions snail-kotlin/src/test/java/com/compass/snail/ReplayTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package com.compass.snail
import org.junit.Assert.assertEquals
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kotlin.concurrent.thread

class ReplayTests {
private var subject: Replay<String>? = null
Expand Down Expand Up @@ -38,4 +41,38 @@ class ReplayTests {
assertEquals("2", b[0])
assertEquals(2, b.size)
}

@Test
fun testMultiThreadedBehavior() {
val subject = Replay<Int>(1)
var a = 0
var b = 0

subject.subscribe(next = {
a = it
})
subject.subscribe(next = {
b = it
})

val latch = CountDownLatch(2)
thread {
for (i in 1..100) {
subject.next(i)
}
latch.countDown()
}
thread {
for (i in 1..100) {
subject.next(i)
}
latch.countDown()
}
latch.await(1000, TimeUnit.SECONDS)

subject.removeSubscribers()

assertEquals(100, a)
assertEquals(100, b)
}
}

0 comments on commit f6b15c4

Please sign in to comment.