Skip to content

Commit

Permalink
Merge pull request #2143 from vasilmkd/grow-shrink
Browse files Browse the repository at this point in the history
Grow and shrink the callback hashtable
  • Loading branch information
djspiewak authored Jul 24, 2021
2 parents ce32369 + e42dc35 commit 639557e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ package unsafe
* @param initialCapacity the initial capacity of the hashtable, ''must'' be a
* power of 2
*/
private[effect] final class ThreadSafeHashtable(initialCapacity: Int) {
private[effect] final class ThreadSafeHashtable(private[this] val initialCapacity: Int) {
private[this] var hashtable: Array[Throwable => Unit] = new Array(initialCapacity)
private[this] var size: Int = 0
private[this] var mask: Int = initialCapacity - 1
Expand Down Expand Up @@ -107,6 +107,33 @@ private[effect] final class ThreadSafeHashtable(initialCapacity: Int) {
// Mark the removed callback with the `Tombstone` reference.
table(idx) = Tombstone
size -= 1

val sz = size
val cap = capacity
if (cap > initialCapacity && (sz << 2) < cap) {
// halve the capacity of the table if it has been filled with less
// than 1/4 of the capacity
val newCap = cap >>> 1
val newMask = newCap - 1
val newHashtable = new Array[Throwable => Unit](newCap)

val table = hashtable
var i = 0
while (i < cap) {
val cur = table(i)
if ((cur ne null) && (cur ne Tombstone)) {
// Only re-insert references to actual callbacks.
// Filters out `Tombstone`s.
insert(newHashtable, newMask, cur, System.identityHashCode(cur) >> log2NumTables)
}
i += 1
}

hashtable = newHashtable
mask = newMask
capacity = newCap
}

return
} else if (cur ne null) {
// Skip over references of other callbacks and `Tombstone` objects.
Expand All @@ -121,11 +148,16 @@ private[effect] final class ThreadSafeHashtable(initialCapacity: Int) {

def unsafeHashtable(): Array[Throwable => Unit] = hashtable

/**
/*
* Only used in testing.
*/

private[unsafe] def isEmpty: Boolean =
size == 0 && hashtable.forall(cb => (cb eq null) || (cb eq Tombstone))

private[unsafe] def unsafeCapacity(): Int = capacity

private[unsafe] def unsafeInitialCapacity(): Int = initialCapacity
}

private object ThreadSafeHashtable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ class StripedHashtableSpec extends BaseSpec with Runners {
.flatMap { _ =>
IO.blocking {
rt.fiberErrorCbs.synchronized {
rt.fiberErrorCbs.tables.forall(_.isEmpty) mustEqual true
rt.fiberErrorCbs.tables.forall { table =>
// check that each component hashtable of the larger striped
// hashtable is empty and has shrunk to its initial capacity
table.isEmpty && table.unsafeCapacity() == table.unsafeInitialCapacity()
} mustEqual true
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ class StripedHashtableSpec extends BaseSpec with Runners {
.flatMap { _ =>
IO.blocking {
rt.fiberErrorCbs.synchronized {
rt.fiberErrorCbs.tables.forall(_.isEmpty) mustEqual true
rt.fiberErrorCbs.tables.forall { table =>
// check that each component hashtable of the larger striped
// hashtable is empty and has shrunk to its initial capacity
table.isEmpty && table.unsafeCapacity() == table.unsafeInitialCapacity()
} mustEqual true
}
}
}
Expand Down

0 comments on commit 639557e

Please sign in to comment.