Skip to content

Commit

Permalink
Tiered cache improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Dec 22, 2024
1 parent 9dfe921 commit abe6e9b
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.seqera.wave.core

import java.time.Duration
import java.util.concurrent.CompletableFuture

import com.google.common.hash.Hashing
Expand Down Expand Up @@ -169,8 +170,11 @@ class RegistryProxyService {
DelegateResponse handleRequest(RoutePath route, Map<String,List<String>> headers) {
final resp = cache.getOrCompute(
requestKey(route, headers),
(it)-> handleRequest0(route, headers),
(resp)-> route.isDigest() && resp.isCacheable() )
(it)-> {
final resp = handleRequest0(route, headers)
final ttl = route.isDigest() && resp.isCacheable() ? cache.duration : null
return new Tuple2<DelegateResponse, Duration>(resp, ttl)
})
return resp
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/groovy/io/seqera/wave/proxy/DelegateResponse.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.seqera.wave.proxy

import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import io.seqera.wave.encoder.MoshiExchange

Expand All @@ -26,6 +27,7 @@ import io.seqera.wave.encoder.MoshiExchange
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
@EqualsAndHashCode
@ToString(includeNames = true, includePackage = false)
class DelegateResponse implements MoshiExchange {
int statusCode
Expand Down
23 changes: 19 additions & 4 deletions src/main/groovy/io/seqera/wave/proxy/ProxyCache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,15 @@ import jakarta.inject.Singleton
@Singleton
@CompileStatic
class ProxyCache extends AbstractTieredCache<DelegateResponse> {
ProxyCache(@Nullable L2TieredCache l2,
@Value('${wave.proxy-cache.duration:1h}') Duration duration,
@Value('${wave.proxy-cache.max-size:10000}') long maxSize) {
super(l2, encoder(), duration, maxSize)

@Value('${wave.proxy-cache.duration:1h}')
private Duration duration

@Value('${wave.proxy-cache.max-size:10000}')
private int maxSize

ProxyCache(@Nullable L2TieredCache l2) {
super(l2, encoder())
}

static MoshiEncodeStrategy encoder() {
Expand All @@ -52,11 +57,21 @@ class ProxyCache extends AbstractTieredCache<DelegateResponse> {
return new MoshiEncodeStrategy<AbstractTieredCache.Entry>(factory) {}
}

@Override
String getName() {
'proxy-cache'
}

@Override
String getPrefix() {
'proxy-cache/v1'
}

@Override
int getMaxSize() {
return maxSize
}

Duration getDuration() { duration }

}
116 changes: 77 additions & 39 deletions src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package io.seqera.wave.store.cache

import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.function.Function

import com.github.benmanes.caffeine.cache.AsyncCache
import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.RemovalCause
import com.github.benmanes.caffeine.cache.RemovalListener
Expand Down Expand Up @@ -56,28 +56,41 @@ abstract class AbstractTieredCache<V extends MoshiExchange> implements TieredCac
private EncodingStrategy<Entry> encoder

// FIXME https://github.com/seqeralabs/wave/issues/747
private AsyncCache<String,V> l1
private volatile AsyncCache<String,Entry> _l1

private final Duration ttl
private Lock sync = new ReentrantLock()

private L2TieredCache<String,String> l2

private final WeakHashMap<String,Lock> locks = new WeakHashMap<>()

AbstractTieredCache(L2TieredCache<String,String> l2, MoshiEncodeStrategy encoder, Duration duration, long maxSize) {
log.info "Cache '${getName()}' config - prefix=${getPrefix()}; ttl=${duration}; max-size: ${maxSize}"
AbstractTieredCache(L2TieredCache<String,String> l2, MoshiEncodeStrategy encoder) {
if( l2==null )
log.warn "Missing L2 cache for tiered cache '${getName()}'"
this.l2 = l2
this.ttl = duration
this.encoder = encoder
this.l1 = Caffeine.newBuilder()
.expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS)
.maximumSize(maxSize)
.removalListener(removalListener0())
.buildAsync()
}

private Cache<String,Entry> getL1() {
if( _l1!=null ){
return _l1.synchronous()
}
sync.lock()
try {
log.info "Cache '${getName()}' config - prefix=${getPrefix()}; max-size: ${maxSize}"
_l1 = Caffeine.newBuilder()
.maximumSize(maxSize)
.removalListener(removalListener0())
.buildAsync()
return _l1.synchronous()
}
finally {
sync.unlock()
}
}

abstract int getMaxSize()

abstract protected getName()

abstract protected String getPrefix()
Expand All @@ -101,7 +114,7 @@ abstract class AbstractTieredCache<V extends MoshiExchange> implements TieredCac
*/
@Override
V get(String key) {
getOrCompute(key, null, (v)->true)
getOrCompute(key, null, null)
}

/**
Expand All @@ -114,8 +127,14 @@ abstract class AbstractTieredCache<V extends MoshiExchange> implements TieredCac
* @return
* The value associated with the specified key, or {@code null} otherwise
*/
V getOrCompute(String key, Function<String,V> loader) {
getOrCompute(key, loader, (v)->true)
V getOrCompute(String key, Function<String,V> loader, Duration ttl) {

final provisioner = (String k)-> {
final v=loader.apply(k);
v!=null ? new Tuple2<V,Duration>(v,ttl) : null
}

getOrCompute(key, loader!=null ? provisioner : null)
}

/**
Expand All @@ -125,18 +144,15 @@ abstract class AbstractTieredCache<V extends MoshiExchange> implements TieredCac
* The key of the value to be retrieved
* @param loader
* The function invoked to load the value the entry with the specified key is not available
* @param cacheCondition
* The function to determine if the loaded value should be cached
* @return
* The value associated with the specified key, or #function result otherwise
*/
V getOrCompute(String key, Function<String,V> loader, Function<V,Boolean> cacheCondition) {
V getOrCompute(String key, Function<String, Tuple2<V,Duration>> loader) {
assert key!=null, "Argument key cannot be null"
assert cacheCondition!=null, "Argument condition cannot be null"

log.trace "Cache '${name}' checking key=$key"
// Try L1 cache first
V value = l1.synchronous().getIfPresent(key)
V value = l1Get(key)
if (value != null) {
log.trace "Cache '${name}' L1 hit (a) - key=$key => value=$value"
return value
Expand All @@ -145,28 +161,30 @@ abstract class AbstractTieredCache<V extends MoshiExchange> implements TieredCac
final sync = locks.computeIfAbsent(key, (k)-> new ReentrantLock())
sync.lock()
try {
value = l1.synchronous().getIfPresent(key)
value = l1Get(key)
if (value != null) {
log.trace "Cache '${name}' L1 hit (b) - key=$key => value=$value"
return value
}

// Fallback to L2 cache
value = l2Get(key)
if (value != null) {
log.trace "Cache '${name}' L2 hit - key=$key => value=$value"
final entry = l2GetEntry(key)
if (entry != null) {
log.trace "Cache '${name}' L2 hit - key=$key => value=$entry.value"
// Rehydrate L1 cache
l1.synchronous().put(key, value)
return value
l1.put(key, entry)
return (V) entry.value
}

// still not value found, use loader function to fetch the value
if( value==null && loader!=null ) {
log.trace "Cache '${name}' invoking loader - key=$key"
value = loader.apply(key)
if( value!=null && cacheCondition.apply(value) ) {
l1.synchronous().put(key,value)
l2Put(key,value)
final ret = loader.apply(key)
value = ret?.v1
Duration ttl = ret?.v2
if( value!=null && ttl!=null ) {
l1Put(key, value, ttl)
l2Put(key, value, ttl)
}
}

Expand All @@ -180,41 +198,61 @@ abstract class AbstractTieredCache<V extends MoshiExchange> implements TieredCac
}

@Override
void put(String key, V value) {
void put(String key, V value, Duration ttl) {
assert key!=null, "Cache key argument cannot be null"
assert value!=null, "Cache value argument cannot be null"
log.trace "Cache '${name}' putting - key=$key; value=${value}"
l1.synchronous().put(key, value)
l2Put(key, value)
l1Put(key, value, ttl)
l2Put(key, value, ttl)
}

protected String key0(String k) { return getPrefix() + ':' + k }

protected V l2Get(String key) {
protected V l1Get(String key) {
final entry = l1.getIfPresent(key)
if( entry == null )
return null

if( System.currentTimeMillis() > entry.expiresAt ) {
log.trace "Cache '${name}' L1 exipired - key=$key => value=${entry.value}"
return null
}
return (V) entry.value
}

protected void l1Put(String key, V value, Duration ttl) {
l1.put(key, new Entry(value,ttl.toMillis()))
}

protected Entry l2GetEntry(String key) {
if( l2 == null )
return null

final raw = l2.get(key0(key))
if( raw == null )
return null

final Entry payload = encoder.decode(raw)
if( System.currentTimeMillis() > payload.expiresAt ) {
log.trace "Cache '${name}' L2 exipired - key=$key => value=${payload.value}"
final Entry entry = encoder.decode(raw)
if( System.currentTimeMillis() > entry.expiresAt ) {
log.trace "Cache '${name}' L2 exipired - key=$key => value=${entry}"
return null
}
return (V) payload.value
return entry
}

protected V l2Get(String key) {
return (V) l2GetEntry(key)?.value
}

protected void l2Put(String key, V value) {
protected void l2Put(String key, V value, Duration ttl) {
if( l2 != null ) {
final raw = encoder.encode(new Entry(value, ttl.toMillis() + System.currentTimeMillis()))
l2.put(key0(key), raw, ttl)
}
}

void invalidateAll() {
l1.synchronous().invalidateAll()
l1.invalidateAll()
}

}
14 changes: 0 additions & 14 deletions src/main/groovy/io/seqera/wave/store/cache/L2TieredCache.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,11 @@
*/

package io.seqera.wave.store.cache

import java.time.Duration

/**
* Define the interface for 2nd level tired cache
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
interface L2TieredCache<K,V> extends TieredCache<K,V> {


/**
* Add a value in the cache with the specified key. If a value already exists is overridden
* with the new value.
*
* @param key The key of the value to be added. {@code null} is not allowed.
* @param value The value to be added in the cache for the specified key. {@code null} is not allowed.
* @param ttl The value time-to-live, after which the value is automatically evicted.
*/
void put(K key, V value, Duration ttl)

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ class RedisL2TieredCache implements L2TieredCache<String,String> {
}
}

@Override
void put(String key, String value) {
put(key, value, null)
}

@Override
void put(String key, String value, Duration ttl) {
try( Jedis conn=pool.getResource() ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/

package io.seqera.wave.store.cache

import java.time.Duration

/**
* Base interface for tiered-cache system
*
Expand All @@ -38,7 +41,8 @@ interface TieredCache<K,V> {
*
* @param key The key of the value to be added. {@code null} is not allowed.
* @param value The value to be added in the cache for the specified key. {@code null} is not allowed.
* @param ttl The value time-to-live, after which the value is automatically evicted.
*/
void put(K key, V value)
void put(K key, V value, Duration ttl)

}
Loading

0 comments on commit abe6e9b

Please sign in to comment.