diff --git a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy index 634e7cad3..3c7b1fdf5 100644 --- a/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/RegistryProxyController.groovy @@ -41,11 +41,11 @@ import io.micronaut.scheduling.annotation.ExecuteOn import io.seqera.wave.ErrorHandler import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.core.RegistryProxyService -import io.seqera.wave.core.RegistryProxyService.DelegateResponse import io.seqera.wave.core.RouteHandler import io.seqera.wave.core.RoutePath import io.seqera.wave.exception.DockerRegistryException import io.seqera.wave.exchange.RegistryErrorResponse +import io.seqera.wave.proxy.DelegateResponse import io.seqera.wave.ratelimit.AcquireRequest import io.seqera.wave.ratelimit.RateLimiterService import io.seqera.wave.service.blob.BlobCacheService @@ -54,7 +54,6 @@ import io.seqera.wave.storage.DigestStore import io.seqera.wave.storage.DockerDigestStore import io.seqera.wave.storage.HttpDigestStore import io.seqera.wave.storage.Storage -import io.seqera.wave.util.Retryable import jakarta.inject.Inject import org.reactivestreams.Publisher import reactor.core.publisher.Mono @@ -274,7 +273,7 @@ class RegistryProxyController { final resp = proxyService.handleRequest(route, headers) HttpResponse .status(HttpStatus.valueOf(resp.statusCode)) - .body(resp.body.bytes) + .body(resp.body) .headers(toMutableHeaders(resp.headers)) } @@ -348,14 +347,9 @@ class RegistryProxyController { } MutableHttpResponse fromContentResponse(DelegateResponse resp, RoutePath route) { - // create the retry logic on error § - final retryable = Retryable - .of(httpConfig) - .onRetry((event) -> log.warn("Unable to read manifest body - request: $route; event: $event")) - HttpResponse .status(HttpStatus.valueOf(resp.statusCode)) - .body(retryable.apply(()-> resp.body.bytes)) + .body(resp.body) .headers(toMutableHeaders(resp.headers)) } diff --git a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy index 01ed6bd94..23b88ee17 100644 --- a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy +++ b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy @@ -20,8 +20,8 @@ package io.seqera.wave.core import java.util.concurrent.CompletableFuture +import com.google.common.hash.Hashing import groovy.transform.CompileStatic -import groovy.transform.ToString import groovy.util.logging.Slf4j import io.micronaut.cache.annotation.Cacheable import io.micronaut.context.annotation.Context @@ -36,6 +36,8 @@ import io.seqera.wave.auth.RegistryLookupService import io.seqera.wave.configuration.HttpClientConfig import io.seqera.wave.http.HttpClientFactory import io.seqera.wave.model.ContainerCoordinates +import io.seqera.wave.proxy.DelegateResponse +import io.seqera.wave.proxy.ProxyCache import io.seqera.wave.proxy.ProxyClient import io.seqera.wave.service.CredentialsService import io.seqera.wave.service.builder.BuildRequest @@ -44,6 +46,7 @@ import io.seqera.wave.storage.DigestStore import io.seqera.wave.storage.Storage import io.seqera.wave.tower.PlatformId import io.seqera.wave.util.RegHelper +import io.seqera.wave.util.Retryable import jakarta.inject.Inject import jakarta.inject.Singleton import reactor.core.publisher.Flux @@ -91,6 +94,9 @@ class RegistryProxyService { @Client("stream-client") private ReactorStreamingHttpClient streamClient + @Inject + private ProxyCache cache + private ContainerAugmenter scanner(ProxyClient proxyClient) { return new ContainerAugmenter() .withStorage(storage) @@ -141,7 +147,31 @@ class RegistryProxyService { } } - DelegateResponse handleRequest(RoutePath route, Map> headers){ + static protected String requestKey(RoutePath route, Map> headers) { + final hasher = Hashing.sipHash24().newHasher() + hasher.putUnencodedChars(route.stableHash()) + hasher.putUnencodedChars('/') + for( Map.Entry> entry : (headers ?: Map.of()) ) { + hasher.putUnencodedChars(entry.key) + for( String it : entry.value ) { + if( it ) + hasher.putUnencodedChars(it) + hasher.putUnencodedChars('/') + } + hasher.putUnencodedChars('/') + } + return hasher.hash().toString() + } + + DelegateResponse handleRequest(RoutePath route, Map> headers) { + final resp = cache.getOrCompute( + requestKey(route, headers), + (it)-> handleRequest0(route, headers), + (resp)-> route.isDigest() && resp.isCacheable() ) + return resp + } + + private DelegateResponse handleRequest0(RoutePath route, Map> headers) { ProxyClient proxyClient = client(route) final resp1 = proxyClient.getStream(route.path, headers, false) final redirect = resp1.headers().firstValue('Location').orElse(null) @@ -182,10 +212,15 @@ class RegistryProxyService { // otherwise read it and include the body input stream in the response // the caller must consume and close the body to prevent memory leaks else { + // create the retry logic on error § + final retryable = Retryable + .of(httpConfig) + .onRetry((event) -> log.warn("Unable to read blob body - request: $route; event: $event")) + // read the body and compose the response return new DelegateResponse( statusCode: resp1.statusCode(), headers: resp1.headers().map(), - body: resp1.body() ) + body: retryable.apply(()-> resp1.body().bytes) ) } } @@ -226,15 +261,6 @@ class RegistryProxyService { return result } - @ToString(includeNames = true, includePackage = false) - static class DelegateResponse { - int statusCode - Map> headers - InputStream body - String location - boolean isRedirect() { location } - } - Flux> streamBlob(RoutePath route, Map> headers) { ProxyClient proxyClient = client(route) return proxyClient.stream(streamClient, route.path, headers) diff --git a/src/main/groovy/io/seqera/wave/core/RoutePath.groovy b/src/main/groovy/io/seqera/wave/core/RoutePath.groovy index caeb8b448..dad184f14 100644 --- a/src/main/groovy/io/seqera/wave/core/RoutePath.groovy +++ b/src/main/groovy/io/seqera/wave/core/RoutePath.groovy @@ -27,6 +27,7 @@ import io.micronaut.core.annotation.Nullable import io.seqera.wave.model.ContainerCoordinates import io.seqera.wave.service.request.ContainerRequest import io.seqera.wave.tower.PlatformId +import io.seqera.wave.util.RegHelper import static io.seqera.wave.WaveDefault.DOCKER_IO /** * Model a container registry route path @@ -150,4 +151,9 @@ class RoutePath implements ContainerPath { else throw new IllegalArgumentException("Not a valid container path - offending value: '$location'") } + + String stableHash() { + RegHelper.sipHash(type, registry, path, image, reference, identity.stableHash()) + } + } diff --git a/src/test/groovy/io/seqera/wave/tower/client/TowerClientTest.groovy b/src/main/groovy/io/seqera/wave/proxy/DelegateResponse.groovy similarity index 54% rename from src/test/groovy/io/seqera/wave/tower/client/TowerClientTest.groovy rename to src/main/groovy/io/seqera/wave/proxy/DelegateResponse.groovy index c1a32b317..fcbbc4189 100644 --- a/src/test/groovy/io/seqera/wave/tower/client/TowerClientTest.groovy +++ b/src/main/groovy/io/seqera/wave/proxy/DelegateResponse.groovy @@ -16,32 +16,22 @@ * along with this program. If not, see . */ -package io.seqera.wave.tower.client +package io.seqera.wave.proxy -import spock.lang.Specification +import groovy.transform.ToString +import io.seqera.wave.encoder.MoshiExchange /** - * + * Model a response object to be forwarded to the client + * * @author Paolo Di Tommaso */ -class TowerClientTest extends Specification { - - def 'should create consistent hash' () { - given: - def client = new TowerClient() - - expect: - client.makeKey('a') == '92cf27ac76c18d8e' - and: - client.makeKey('a') == client.makeKey('a') - and: - client.makeKey('a','b','c') == client.makeKey('a','b','c') - and: - client.makeKey('a','b',null) == client.makeKey('a','b',null) - and: - client.makeKey(new URI('http://foo.com')) == client.makeKey('http://foo.com') - and: - client.makeKey(100l) == client.makeKey('100') - } - +@ToString(includeNames = true, includePackage = false) +class DelegateResponse implements MoshiExchange { + int statusCode + Map> headers + byte[] body + String location + boolean isRedirect() { location } + boolean isCacheable() { location!=null || (body!=null && statusCode>=200 && statusCode<400) } } diff --git a/src/main/groovy/io/seqera/wave/proxy/ProxyCache.groovy b/src/main/groovy/io/seqera/wave/proxy/ProxyCache.groovy new file mode 100644 index 000000000..e99db9fae --- /dev/null +++ b/src/main/groovy/io/seqera/wave/proxy/ProxyCache.groovy @@ -0,0 +1,62 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.proxy + +import java.time.Duration + +import com.squareup.moshi.adapters.PolymorphicJsonAdapterFactory +import groovy.transform.CompileStatic +import io.micronaut.context.annotation.Value +import io.micronaut.core.annotation.Nullable +import io.seqera.wave.encoder.MoshiEncodeStrategy +import io.seqera.wave.encoder.MoshiExchange +import io.seqera.wave.store.cache.AbstractTieredCache +import io.seqera.wave.store.cache.L2TieredCache +import jakarta.inject.Singleton +/** + * Implements a tiered cache for proxied http responses + * + * @author Paolo Di Tommaso + */ +@Singleton +@CompileStatic +class ProxyCache extends AbstractTieredCache { + ProxyCache(@Nullable L2TieredCache l2, + @Value('${wave.proxy-cache.duration:1h}') Duration duration, + @Value('${wave.proxy-cache.max-size:10000}') long maxSize) { + super(l2, encoder(), duration, maxSize) + } + + static MoshiEncodeStrategy encoder() { + // json adapter factory + final factory = PolymorphicJsonAdapterFactory.of(MoshiExchange.class, "@type") + .withSubtype(Entry.class, Entry.name) + .withSubtype(DelegateResponse.class, DelegateResponse.simpleName) + // the encoding strategy + return new MoshiEncodeStrategy(factory) {} + } + + String getName() { + 'proxy-cache' + } + + String getPrefix() { + 'proxy-cache/v1' + } +} diff --git a/src/main/groovy/io/seqera/wave/service/stream/StreamServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/stream/StreamServiceImpl.groovy index e2b1c69ee..d321d1c43 100644 --- a/src/main/groovy/io/seqera/wave/service/stream/StreamServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/stream/StreamServiceImpl.groovy @@ -100,7 +100,7 @@ class StreamServiceImpl implements StreamService { // when it's a response with a binary body, just return it if( resp.body!=null ) { log.debug "Streaming response body for route: $route" - return resp.body + return new ByteArrayInputStream(resp.body) } // otherwise cache the blob and stream the resulting uri if( blobCacheService ) { diff --git a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy index 1d110edab..7af90599c 100644 --- a/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy +++ b/src/main/groovy/io/seqera/wave/store/cache/AbstractTieredCache.groovy @@ -62,10 +62,12 @@ abstract class AbstractTieredCache implements TieredCac private L2TieredCache l2 - private final Lock sync = new ReentrantLock() + private final WeakHashMap locks = new WeakHashMap<>() AbstractTieredCache(L2TieredCache l2, MoshiEncodeStrategy encoder, Duration duration, long maxSize) { - log.info "Cache '${getName()}' config - prefix=${getPrefix()}; ttl=${duration}; max-size: ${maxSize}; l2=${l2}" + log.info "Cache '${getName()}' config - prefix=${getPrefix()}; ttl=${duration}; max-size: ${maxSize}" + if( l2==null ) + log.warn "Missing L2 cache for tiered cache '${getName()}'" this.l2 = l2 this.ttl = duration this.encoder = encoder @@ -89,12 +91,49 @@ abstract class AbstractTieredCache implements TieredCac } } + /** + * Retrieve the value associated with the specified key + * + * @param key + * The key of the value to be retrieved + * @return + * The value associated with the specified key, or {@code null} otherwise + */ @Override V get(String key) { - getOrCompute(key, null) + getOrCompute(key, null, (v)->true) } + /** + * Retrieve the value associated with the specified key + * + * @param key + * The key of the value to be retrieved + * @param loader + * A function invoked to load the value the entry with the specified key is not available + * @return + * The value associated with the specified key, or {@code null} otherwise + */ V getOrCompute(String key, Function loader) { + getOrCompute(key, loader, (v)->true) + } + + /** + * Retrieve the value associated with the specified key + * + * @param key + * The key of the value to be retrieved + * @param loader + * The function invoked to load the value the entry with the specified key is not available + * @param cacheCondition + * The function to determine if the loaded value should be cached + * @return + * The value associated with the specified key, or #function result otherwise + */ + V getOrCompute(String key, Function loader, Function cacheCondition) { + assert key!=null, "Argument key cannot be null" + assert cacheCondition!=null, "Argument condition cannot be null" + log.trace "Cache '${name}' checking key=$key" // Try L1 cache first V value = l1.synchronous().getIfPresent(key) @@ -103,6 +142,7 @@ abstract class AbstractTieredCache implements TieredCac return value } + final sync = locks.computeIfAbsent(key, (k)-> new ReentrantLock()) sync.lock() try { value = l1.synchronous().getIfPresent(key) @@ -124,7 +164,7 @@ abstract class AbstractTieredCache implements TieredCac if( value==null && loader!=null ) { log.trace "Cache '${name}' invoking loader - key=$key" value = loader.apply(key) - if( value!=null ) { + if( value!=null && cacheCondition.apply(value) ) { l1.synchronous().put(key,value) l2Put(key,value) } diff --git a/src/main/groovy/io/seqera/wave/tower/PlatformId.groovy b/src/main/groovy/io/seqera/wave/tower/PlatformId.groovy index e115b3d07..887432e60 100644 --- a/src/main/groovy/io/seqera/wave/tower/PlatformId.groovy +++ b/src/main/groovy/io/seqera/wave/tower/PlatformId.groovy @@ -22,6 +22,7 @@ import groovy.transform.Canonical import groovy.transform.CompileStatic import io.seqera.wave.api.ContainerInspectRequest import io.seqera.wave.api.SubmitContainerTokenRequest +import io.seqera.wave.util.RegHelper import io.seqera.wave.util.StringUtils /** @@ -80,4 +81,14 @@ class PlatformId { ", workflowId=" + workflowId + ')'; } + + String stableHash() { + RegHelper.sipHash( + getUserId(), + getUserEmail(), + workspaceId, + accessToken, + towerEndpoint, + workflowId ) + } } diff --git a/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy b/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy index 72d55e29a..ef616f7ed 100644 --- a/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy +++ b/src/main/groovy/io/seqera/wave/tower/client/TowerClient.groovy @@ -20,7 +20,6 @@ package io.seqera.wave.tower.client import java.util.concurrent.CompletableFuture -import com.google.common.hash.Hashing import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import io.micronaut.core.annotation.Nullable @@ -29,6 +28,7 @@ import io.seqera.wave.tower.client.cache.ClientCacheLong import io.seqera.wave.tower.client.cache.ClientCacheShort import io.seqera.wave.tower.client.connector.TowerConnector import io.seqera.wave.tower.compute.DescribeWorkflowLaunchResponse +import io.seqera.wave.util.RegHelper import jakarta.inject.Inject import jakarta.inject.Singleton import org.apache.commons.lang3.StringUtils @@ -68,7 +68,7 @@ class TowerClient { UserInfoResponse userInfo(String towerEndpoint, JwtAuth authorization) { final uri = userInfoEndpoint(towerEndpoint) - final k = makeKey(uri, authorization.key, null, null) + final k = RegHelper.sipHash(uri, authorization.key, null, null) // NOTE: it assumes the user info metadata does nor change over time // and therefore the *long* expiration cached is used get0(uri, towerEndpoint, authorization, UserInfoResponse, k, CacheMode.LONG) as UserInfoResponse @@ -76,7 +76,7 @@ class TowerClient { ListCredentialsResponse listCredentials(String towerEndpoint, JwtAuth authorization, Long workspaceId, String workflowId) { final uri = listCredentialsEndpoint(towerEndpoint, workspaceId) - final k = makeKey(uri, authorization.key, workspaceId, workflowId) + final k = RegHelper.sipHash(uri, authorization.key, workspaceId, workflowId) // NOTE: when the 'workflowId' is provided it assumes credentials will not change during // the workflow execution and therefore the *long* expiration cached is used final mode = workflowId ? CacheMode.LONG : CacheMode.SHORT @@ -85,7 +85,7 @@ class TowerClient { GetCredentialsKeysResponse fetchEncryptedCredentials(String towerEndpoint, JwtAuth authorization, String credentialsId, String pairingId, Long workspaceId, String workflowId) { final uri = fetchCredentialsEndpoint(towerEndpoint, credentialsId, pairingId, workspaceId) - final k = makeKey(uri, authorization.key, workspaceId, workflowId) + final k = RegHelper.sipHash(uri, authorization.key, workspaceId, workflowId) // NOTE: when the 'workflowId' is provided it assumes credentials will not change during // the workflow execution and therefore the *long* expiration cached is used final mode = workflowId ? CacheMode.LONG : CacheMode.SHORT @@ -129,7 +129,7 @@ class TowerClient { DescribeWorkflowLaunchResponse describeWorkflowLaunch(String towerEndpoint, JwtAuth authorization, String workflowId) { final uri = workflowLaunchEndpoint(towerEndpoint,workflowId) - final k = makeKey(uri, authorization.key, null, workflowId) + final k = RegHelper.sipHash(uri, authorization.key, null, workflowId) // NOTE: it assumes the workflow launch definition cannot change for the specified 'workflowId' // and therefore the *long* expiration cached is used return get0(uri, towerEndpoint, authorization, DescribeWorkflowLaunchResponse.class, k, CacheMode.LONG) as DescribeWorkflowLaunchResponse @@ -139,16 +139,6 @@ class TowerClient { return URI.create("${checkEndpoint(towerEndpoint)}/workflow/${workflowId}/launch") } - protected String makeKey(Object... keys) { - final h = Hashing.sipHash24().newHasher() - for( Object it : keys ) { - if( it!=null ) - h.putUnencodedChars(it.toString()) - h.putUnencodedChars('/') - } - return h.hash() - } - /** Only for testing - do not use */ protected void invalidateCache() { cacheLong.invalidateAll() diff --git a/src/main/groovy/io/seqera/wave/util/RegHelper.groovy b/src/main/groovy/io/seqera/wave/util/RegHelper.groovy index ed55907d7..4ffc5a225 100644 --- a/src/main/groovy/io/seqera/wave/util/RegHelper.groovy +++ b/src/main/groovy/io/seqera/wave/util/RegHelper.groovy @@ -203,6 +203,20 @@ class RegHelper { return hasher.hash().toString() } + static String sipHash(Object... keys) { + if( keys == null ) + throw new IllegalArgumentException("Missing argument for sipHash method") + + final hasher = Hashing.sipHash24().newHasher() + for( Object it : keys ) { + if( it!=null ) + hasher.putUnencodedChars(it.toString()) + hasher.putUnencodedChars(Character.toString(0x1C)) + } + hasher.putUnencodedChars(Character.toString(0x1E)) + return hasher.hash() + } + static String layerName(ContainerLayer layer) { return "layer-${layer.gzipDigest.replace(/sha256:/,'')}.tar.gz" } diff --git a/src/test/groovy/io/seqera/wave/core/RegistryProxyServiceTest.groovy b/src/test/groovy/io/seqera/wave/core/RegistryProxyServiceTest.groovy index 68674e2fb..17c113c58 100644 --- a/src/test/groovy/io/seqera/wave/core/RegistryProxyServiceTest.groovy +++ b/src/test/groovy/io/seqera/wave/core/RegistryProxyServiceTest.groovy @@ -68,4 +68,31 @@ class RegistryProxyServiceTest extends Specification { then: resp1 == 'sha256:d5c169e210d6b789b2dc7eced66471cf4ce2c7260ac7299fbef464ff902086be' } + + def 'should return a stable hash' () { + given: + def p1 = RoutePath.parse('quay.io/v2/ubuntu/blobs/sha256:123') + + when: + def k1 = RegistryProxyService.requestKey(p1, null) + then: + k1 == 'b133b45b6c64b652' + + when: + def k2 = RegistryProxyService.requestKey(p1, [:]) + then: + k2 == 'b133b45b6c64b652' + + when: + def k3 = RegistryProxyService.requestKey(p1, ['Content-Type': ['text/1']]) + then: + k3 == '654ce7450c0a9c35' + + when: + def k4 = RegistryProxyService.requestKey(p1, ['Content-Type': ['text/1', 'text/2']]) + then: + k4 == 'afabdb14b217efd4' + + } + } diff --git a/src/test/groovy/io/seqera/wave/core/RoutePathTest.groovy b/src/test/groovy/io/seqera/wave/core/RoutePathTest.groovy index 1bf1902fe..6ca5c63b9 100644 --- a/src/test/groovy/io/seqera/wave/core/RoutePathTest.groovy +++ b/src/test/groovy/io/seqera/wave/core/RoutePathTest.groovy @@ -18,9 +18,11 @@ package io.seqera.wave.core +import spock.lang.Shared import spock.lang.Specification import spock.lang.Unroll +import io.seqera.wave.api.SubmitContainerTokenRequest import io.seqera.wave.model.ContainerCoordinates import io.seqera.wave.service.request.ContainerRequest import io.seqera.wave.tower.PlatformId @@ -151,4 +153,26 @@ class RoutePathTest extends Specification { route2.request.containerImage == 'ubuntu:latest' route2.identity == new PlatformId(new User(id: 100)) } + + @Shared + def ID1 = PlatformId.of(new User(id:1), Mock(SubmitContainerTokenRequest)) + @Shared + def ID2 = PlatformId.of(new User(id:2), Mock(SubmitContainerTokenRequest)) + + @Unroll + def 'should return immutable hash' () { + expect: + RoutePath.parse(GIVEN, ID).stableHash() == EXPECTED + + where: + GIVEN | ID | EXPECTED + '/v2/hello-world/manifests/latest' | null | 'f565ecdba5885ced' + '/v2/hello-world/manifests/sha256:123' | null | 'a5fd596b535678f0' + '/v2/hello-world/blobs/latest' | null | '0baf67ef8cb144c2' + 'docker.io/v2/hello-world/blobs/latest' | null | '0baf67ef8cb144c2' // <- same as above because default to docker.io when registry is omitted + 'quay.io/v2/hello-world/blobs/latest' | null | '52907f8c52f2fbe0' + 'quay.io/v2/hello-world/blobs/latest' | ID1 | '9b3eb558c666828e' + 'quay.io/v2/hello-world/blobs/latest' | ID2 | '492e93611079e4d9' + + } } diff --git a/src/test/groovy/io/seqera/wave/proxy/ProxyCacheTest.groovy b/src/test/groovy/io/seqera/wave/proxy/ProxyCacheTest.groovy new file mode 100644 index 000000000..2881fe70b --- /dev/null +++ b/src/test/groovy/io/seqera/wave/proxy/ProxyCacheTest.groovy @@ -0,0 +1,68 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.proxy + +import spock.lang.Shared +import spock.lang.Specification + +import java.time.Duration + +import io.micronaut.context.ApplicationContext +import io.seqera.wave.store.cache.RedisL2TieredCache +import io.seqera.wave.test.RedisTestContainer +/** + * + * @author Paolo Di Tommaso + */ +class ProxyCacheTest extends Specification implements RedisTestContainer { + + @Shared + ApplicationContext applicationContext + + def setup() { + applicationContext = ApplicationContext.run([ + REDIS_HOST : redisHostName, + REDIS_PORT : redisPort + ], 'test', 'redis') + sleep(500) // workaround to wait for Redis connection + } + + def cleanup() { + applicationContext.close() + } + + def 'should cache user info response' () { + given: + def AWAIT = 150 + def store = applicationContext.getBean(RedisL2TieredCache) + def cache = new ProxyCache(store, Duration.ofMillis(AWAIT), 100) + and: + def k = UUID.randomUUID().toString() + def resp = new DelegateResponse(location: 'http://foo.com', + statusCode: 200, + body: new byte[] { 1,2,3 } + ) + + when: + cache.put(k, resp) + then: + cache.get(k) == resp + } + +} diff --git a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy index 3b313483d..13d5cec10 100644 --- a/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy +++ b/src/test/groovy/io/seqera/wave/store/cache/AbstractTieredCacheTest.groovy @@ -143,4 +143,31 @@ class AbstractTieredCacheTest extends Specification implements RedisTestContaine cache2.get(k) == null } + def 'should get or compute a value with condition' () { + given: + def AWAIT = 150 + def store = applicationContext.getBean(RedisL2TieredCache) + def cache = new MyCache(store, Duration.ofMillis(AWAIT), 100) + and: + def k1 = UUID.randomUUID().toString() + def k2 = UUID.randomUUID().toString() + + expect: + cache.get(k1) == null + + when: + Entry r1 = cache.getOrCompute(k1, (it)-> new Entry(it+'1', it+'2'), (v)-> true) + then: + r1 == new Entry(k1+'1', k1+'2') + then: + cache.get(k1) == new Entry(k1+'1', k1+'2') + + when: + Entry r2 = cache.getOrCompute(k2, (it)-> new Entry(it+'3', it+'4'), (v)-> false) + then: + r2 == new Entry(k2+'3', k2+'4') + then: + cache.get(k2) == null + } + } diff --git a/src/test/groovy/io/seqera/wave/tower/PlatformIdTest.groovy b/src/test/groovy/io/seqera/wave/tower/PlatformIdTest.groovy index ed7676610..72ed418a8 100644 --- a/src/test/groovy/io/seqera/wave/tower/PlatformIdTest.groovy +++ b/src/test/groovy/io/seqera/wave/tower/PlatformIdTest.groovy @@ -84,4 +84,57 @@ class PlatformIdTest extends Specification { id1.hashCode() != id2.hashCode() id3.hashCode() == id1.hashCode() } + + def 'should return stable hash' () { + given: + def id1 = PlatformId.of(new User(id:1, email: 'p@foo.com'), Mock(SubmitContainerTokenRequest)) + + def id2 = PlatformId.of(new User(id:2, email: 'p@foo.com'), Mock(SubmitContainerTokenRequest)) + + def id3 = PlatformId.of(new User(id:1, email: 'p@foo.com'), new SubmitContainerTokenRequest( + towerEndpoint: 'http://foo.com', + towerAccessToken: 'token-123', + towerRefreshToken: 'refresh-123', + towerWorkspaceId: 100 )) + and: + def id4 = PlatformId.of(new User(id:1, email: 'p@foo.com'), new SubmitContainerTokenRequest( + towerEndpoint: 'http://bar.com', // <-- change endpoint + towerAccessToken: 'token-123', + towerRefreshToken: 'refresh-123', + towerWorkspaceId: 100 )) + and: + def id5 = PlatformId.of(new User(id:1, email: 'p@foo.com'), new SubmitContainerTokenRequest( + towerEndpoint: 'http://foo.com', + towerAccessToken: 'token-789', // <-- change token + towerRefreshToken: 'refresh-123', + towerWorkspaceId: 100 )) + + def id6 = PlatformId.of(new User(id:1, email: 'p@foo.com'), new SubmitContainerTokenRequest( + towerEndpoint: 'http://foo.com', + towerAccessToken: 'token-123', + towerRefreshToken: 'refresh-xxx', // <-- change refresh, does not affect cache + towerWorkspaceId: 100 )) + + def id7 = PlatformId.of(new User(id:1, email: 'p@foo.com'), new SubmitContainerTokenRequest( + towerEndpoint: 'http://foo.com', + towerAccessToken: 'token-123', + towerRefreshToken: 'refresh-123', + towerWorkspaceId: 200 )) // <-- change workspace id + + expect: + id1.stableHash() == 'a81eac1325c75af4' + and: + id2.stableHash() == '0bdd37bce6961402' + and: + id3.stableHash() == '0a630e69cd59db4e' + and: + id4.stableHash() == 'bf4cd9423edd1a4e' + and: + id5.stableHash() == 'b1977315b3edd1fc' + and: + id6.stableHash() == '0a630e69cd59db4e' + and: + id7.stableHash() == 'bb346b2662dc1696' + } + } diff --git a/src/test/groovy/io/seqera/wave/util/RegHelperTest.groovy b/src/test/groovy/io/seqera/wave/util/RegHelperTest.groovy index 55bc7940b..d567dff05 100644 --- a/src/test/groovy/io/seqera/wave/util/RegHelperTest.groovy +++ b/src/test/groovy/io/seqera/wave/util/RegHelperTest.groovy @@ -26,6 +26,8 @@ import java.util.concurrent.CompletableFuture import com.google.common.hash.Hashing import groovy.util.logging.Slf4j import io.seqera.wave.test.ManifestConst +import io.seqera.wave.tower.client.TowerClient + /** * * @author Paolo Di Tommaso @@ -204,4 +206,23 @@ class RegHelperTest extends Specification { .hash() .toString() } + + + def 'should create consistent hash for open array' () { + given: + def client = new TowerClient() + + expect: + RegHelper.sipHash('a') == 'bcf5c2d233d23f0f' + and: + RegHelper.sipHash('a') == RegHelper.sipHash('a') + and: + RegHelper.sipHash('a','b','c') == RegHelper.sipHash('a','b','c') + and: + RegHelper.sipHash('a','b',null) == RegHelper.sipHash('a','b',null) + and: + RegHelper.sipHash(new URI('http://foo.com')) == RegHelper.sipHash('http://foo.com') + and: + RegHelper.sipHash(100l) == RegHelper.sipHash('100') + } }