Skip to content

Commit

Permalink
Store class instances rather than type tags in 'Retrieve' commands, t…
Browse files Browse the repository at this point in the history
…hus avoiding hammering 'classFromType'. Results in a considerable speedup.
  • Loading branch information
Gerard Murphy committed Mar 30, 2019
1 parent f081d28 commit a75b9a5
Showing 1 changed file with 38 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import scala.collection.mutable.{
SortedMap => MutableSortedMap
}
import scala.concurrent.duration._
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.runtime.universe.{TypeTag, typeOf}
import scala.util.{DynamicVariable, Try}

object ImmutableObjectStorage {
Expand Down Expand Up @@ -102,17 +102,17 @@ object ImmutableObjectStorage {

case class Store[X](immutableObject: X) extends Operation[TrancheId]

case class Retrieve[X: TypeTag](trancheId: TrancheId) extends Operation[X] {
val capturedTypeTag = implicitly[TypeTag[X]]
}
case class Retrieve[X](trancheId: TrancheId, clazz: Class[X])
extends Operation[X]

type Session[X] = FreeT[Operation, EitherThrowableOr, X]

def store[X](immutableObject: X): Session[TrancheId] =
FreeT.liftF[Operation, EitherThrowableOr, TrancheId](Store(immutableObject))

def retrieve[X: TypeTag](id: TrancheId): Session[X] =
FreeT.liftF[Operation, EitherThrowableOr, X](Retrieve(id))
FreeT.liftF[Operation, EitherThrowableOr, X](
Retrieve(id, classFromType(typeOf[X])))

def runToYieldTrancheIds(session: Session[Vector[TrancheId]])
: Tranches => EitherThrowableOr[Vector[TrancheId]] =
Expand Down Expand Up @@ -173,6 +173,7 @@ object ImmutableObjectStorage {
}
}.withRegistrar(
kryo =>
// TODO - check that this is really necessary...
kryo.register(
classOf[ClosureSerializer.Closure],
new CleaningSerializer(
Expand Down Expand Up @@ -277,7 +278,7 @@ object ImmutableObjectStorage {
new StdInstantiatorStrategy

def createProxy[Result](clazz: Class[Result],
acquiredState: AcquiredState[Result]): AnyRef = {
acquiredState: AcquiredState[Result]): Result = {
val proxyClassInstantiator =
synchronized {
cachedProxyClassInstantiators.getOrElseUpdate(clazz, {
Expand All @@ -287,11 +288,10 @@ object ImmutableObjectStorage {

val proxy = proxyClassInstantiator
.newInstance()
.asInstanceOf[StateAcquisition[Result]]

proxy.acquire(acquiredState)
proxy.asInstanceOf[StateAcquisition[Result]].acquire(acquiredState)

proxy
proxy.asInstanceOf[Result]
}
}

Expand All @@ -307,6 +307,8 @@ object ImmutableObjectStorage {

private val referenceResolverCacheTimeToLive = Some(10 minutes)

// TODO: if the keys are identity hash codes, what's going to stop the original object
// from being garbage collected and another one taking the same identity hash code? Oh dear....
private val objectReferenceIdCache: Cache[ObjectReferenceId] =
CaffeineCache[ObjectReferenceId](
CacheConfig.defaultCacheConfig.copy(
Expand Down Expand Up @@ -340,12 +342,13 @@ object ImmutableObjectStorage {
!clazz.isPrimitive && clazz != classOf[String]

def retrieveUnderlying[X](trancheIdForExternalObjectReference: TrancheId,
objectReferenceId: ObjectReferenceId): X = {
objectReferenceId: ObjectReferenceId,
clazz: Class[X]): X = {
if (!completedOperationDataByTrancheId.contains(
trancheIdForExternalObjectReference)) {
val _ =
thisSessionInterpreter(
Retrieve(trancheIdForExternalObjectReference))
Retrieve(trancheIdForExternalObjectReference, clazz))
}

completedOperationDataByTrancheId(trancheIdForExternalObjectReference).referenceResolver
Expand All @@ -354,7 +357,8 @@ object ImmutableObjectStorage {
}

class AcquiredState[X](trancheIdForExternalObjectReference: TrancheId,
objectReferenceId: ObjectReferenceId)
objectReferenceId: ObjectReferenceId,
clazz: Class[X])
extends proxySupport.AcquiredState[X] {
@transient
private var _underlying: Option[X] = None
Expand All @@ -364,7 +368,8 @@ object ImmutableObjectStorage {
case None =>
val result: X = retrieveUnderlying(
trancheIdForExternalObjectReference,
objectReferenceId)
objectReferenceId,
clazz)

_underlying = Some(result)

Expand Down Expand Up @@ -451,21 +456,25 @@ object ImmutableObjectStorage {
}

resultFromExistingAssociation.getOrElse {
if (proxySupport.isNotToBeProxied(clazz))
retrieveUnderlying(trancheIdForExternalObjectReference,
objectReferenceId)
else {
def wildcardCapture[X]: proxySupport.AcquiredState[X] =
new AcquiredState[X](trancheIdForExternalObjectReference,
objectReferenceId)

val proxy: AnyRef =
proxySupport.createProxy(clazz, wildcardCapture)

referenceIdToProxyMap.put(objectReferenceId, proxy)
def wildcardCapture[X <: AnyRef](clazz: Class[X]): X =
if (proxySupport.isNotToBeProxied(clazz))
retrieveUnderlying(trancheIdForExternalObjectReference,
objectReferenceId,
clazz)
else {
val proxy: X =
proxySupport.createProxy(
clazz,
new AcquiredState(trancheIdForExternalObjectReference,
objectReferenceId,
clazz))

referenceIdToProxyMap.put(objectReferenceId, proxy)

proxy
}

proxy
}
wildcardCapture(clazz.asInstanceOf[Class[_ <: AnyRef]])
}
}(objectCache, mode, implicitly[Flags])

Expand Down Expand Up @@ -512,7 +521,7 @@ object ImmutableObjectStorage {
trancheId
}

case retrieve @ Retrieve(trancheId) =>
case retrieve @ Retrieve(trancheId, clazz) =>
for {
tranche <- tranches.retrieveTranche(trancheId)
result <- Try {
Expand All @@ -536,8 +545,7 @@ object ImmutableObjectStorage {
completedOperationDataByTrancheId += trancheId -> CompletedOperationData(
trancheSpecificReferenceResolver,
deserialized)
classFromType(retrieve.capturedTypeTag.tpe)
.cast(deserialized)
clazz.cast(deserialized)
}).asInstanceOf[X]
}.toEither
} yield result
Expand Down

0 comments on commit a75b9a5

Please sign in to comment.