diff --git a/buildSrc/src/main/kotlin/Dependencies.kt b/buildSrc/src/main/kotlin/Dependencies.kt index cd17ea858..f5d62145d 100644 --- a/buildSrc/src/main/kotlin/Dependencies.kt +++ b/buildSrc/src/main/kotlin/Dependencies.kt @@ -16,7 +16,9 @@ object Dependencies { val kotlinGradlePlugin = "org.jetbrains.kotlin:kotlin-gradle-plugin:1.4.10" val kotlinReflection = "org.jetbrains.kotlin:kotlin-reflect:1.4.10" val kotlinStdLib = "org.jetbrains.kotlin:kotlin-stdlib:1.4.10" - val kotlinxCoroutines = "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5" + val kotlinxCoroutines = "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3" + val kotlinxCoroutinesJdk8 = "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.4.3" + val kotlinxCoroutinesReactive = "org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.4.3" val ktlintVersion = "0.40.0" val loggingApi = "io.github.microutils:kotlin-logging:1.7.9" val mavenPublishGradlePlugin = "com.vanniktech:gradle-maven-publish-plugin:0.12.0" diff --git a/docs/guide/asynchronous_programming.md b/docs/guide/asynchronous_programming.md new file mode 100644 index 000000000..f045b4d2d --- /dev/null +++ b/docs/guide/asynchronous_programming.md @@ -0,0 +1,136 @@ +## Nonblocking I/O +The AWS SDK 2.x features [truly nonblocking asynchronous clients](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/asynchronous.html) that implement high +concurrency across a few threads. + +!!! warning "SDK 1.x uses blocking I/O" + The AWS SDK for Java 1.11.x has asynchronous clients that are wrappers around a thread pool and blocking synchronous clients that don’t provide the full benefit of nonblocking I/O. + +## Tempest Async APIs +Tempest for SDK 2.x comes with async APIs that utilize Kotlin [coroutine](https://kotlinlang.org/docs/coroutines-overview.html) and Java [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html). + +Declare you DB and tables as `AsyncLogicalDb` and `AsyncLogicalTable`. + +=== "Kotlin - SDK 2.x" + + ```kotlin + interface AsyncMusicDb : AsyncLogicalDb { + @TableName("music_items") + val music: AsyncMusicTable + } + + interface AsyncMusicTable : AsyncLogicalTable { + val albumInfo: AsyncInlineView + val albumTracks: AsyncInlineView + + val playlistInfo: AsyncInlineView + + // Global Secondary Indexes. + val albumInfoByGenre: AsyncSecondaryIndex + val albumInfoByArtist: AsyncSecondaryIndex + + // Local Secondary Indexes. + val albumTracksByTitle: AsyncSecondaryIndex + } + ``` + +=== "Java - SDK 2.x" + + ```java + public interface AsyncMusicDb extends AsyncLogicalDb { + @TableName("music_items") + AsyncMusicTable music(); + } + + public interface AsyncMusicTable extends AsyncLogicalTable { + AsyncInlineView albumInfo(); + AsyncInlineView albumTracks(); + + AsyncInlineView playlistInfo(); + + // Global Secondary Indexes. + AsyncSecondaryIndex albumInfoByGenre(); + AsyncSecondaryIndex albumInfoByArtist(); + + // Local Secondary Indexes. + AsyncSecondaryIndex albumTracksByTitle(); + } + ``` + +Write familiar code that is asynchronous under the hood. + +=== "Kotlin - SDK 2.x" + + ```kotlin + private val table: AsyncMusicTable + + suspend fun changePlaylistName(playlistToken: String, newName: String) { + // Read. + val existing = checkNotNull( + table.playlistInfo.load(PlaylistInfo.Key(playlistToken)) // This is a suspend function. + ) { "Playlist does not exist: $playlistToken" } + // Modify. + val newPlaylist = existing.copy( + playlist_name = newName, + playlist_version = existing.playlist_version + 1 + ) + // Write. + table.playlistInfo.save( // This is a suspend function. + newPlaylist, + ifPlaylistVersionIs(existing.playlist_version) + ) + } + + private fun ifPlaylistVersionIs(playlist_version: Long): Expression { + return Expression.builder() + .expression("playlist_version = :playlist_version") + .expressionValues(mapOf(":playlist_version" to AttributeValue.builder().n("$playlist_version").build())) + .build() + } + ``` + +=== "Java - SDK 2.x" + + ```java + private final AsyncMusicTable table; + + public CompletableFuture changePlaylistName(String playlistToken, String newName) { + // Read. + return table.playlistInfo() + .loadAsync(new PlaylistInfo.Key(playlistToken)) + .thenCompose(existing -> { + if (existing == null) { + throw new IllegalStateException("Playlist does not exist: " + playlistToken); + } + // Modify. + PlaylistInfo newPlaylist = new PlaylistInfo( + existing.playlist_token, + newName, + existing.playlist_tracks, + // playlist_version. + existing.playlist_version + 1 + ); + // Write. + return table.playlistInfo() + .saveAsync( + newPlaylist, + ifPlaylistVersionIs(existing.playlist_version) + ); + }); + } + + private Expression ifPlaylistVersionIs(Long playlist_version) { + return Expression.builder() + .expression("playlist_version = :playlist_version") + .expressionValues( + Map.of(":playlist_version", AttributeValue.builder().n("" + playlist_version).build())) + .build(); + } + ``` + +--- + +Check out the code samples on Github: + +* Music Library - SDK 2.x ([.kt](https://github.com/cashapp/tempest/tree/master/samples/musiclibrary2/src/main/kotlin/app/cash/tempest2/musiclibrary), [.java](https://github.com/cashapp/tempest/tree/master/samples/musiclibrary2/src/main/java/app/cash/tempest2/musiclibrary/java)) +* Asynchronous Programing - SDK 2.x ([.kt](https://github.com/cashapp/tempest/blob/master/samples/guides2/src/main/kotlin/app/cash/tempest2/guides/AsynchronousProgramming.kt), [.java](https://github.com/cashapp/tempest/blob/master/samples/guides2/src/main/java/app/cash/tempest2/guides/java/AsynchronousProgramming.java)) + \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index cf7668b95..3571958d0 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -64,6 +64,7 @@ nav: - 'Query & Scan': guide/query_scan.md - 'Transaction': guide/transaction.md - 'Testing': guide/testing.md + - 'Asynchronous Programming': guide/asynchronous_programming.md - 'DynamoDB Resources': guide/dynamodb_resources.md - 'Reference': diff --git a/samples/guides-junit4/build.gradle.kts b/samples/guides-junit4/build.gradle.kts index 9771fb15f..0dfbe7424 100644 --- a/samples/guides-junit4/build.gradle.kts +++ b/samples/guides-junit4/build.gradle.kts @@ -7,7 +7,6 @@ dependencies { implementation(project(":samples:musiclibrary")) implementation(project(":samples:urlshortener")) implementation(Dependencies.kotlinStdLib) - implementation(Dependencies.kotlinxCoroutines) testImplementation(Dependencies.assertj) testImplementation(Dependencies.junit4Api) diff --git a/samples/guides-junit5/build.gradle.kts b/samples/guides-junit5/build.gradle.kts index a2d443d59..36ad5494f 100644 --- a/samples/guides-junit5/build.gradle.kts +++ b/samples/guides-junit5/build.gradle.kts @@ -7,7 +7,6 @@ dependencies { implementation(project(":samples:musiclibrary")) implementation(project(":samples:urlshortener")) implementation(Dependencies.kotlinStdLib) - implementation(Dependencies.kotlinxCoroutines) testImplementation(Dependencies.assertj) testImplementation(Dependencies.junitApi) diff --git a/samples/guides2-junit4/build.gradle.kts b/samples/guides2-junit4/build.gradle.kts index e9ee4543f..e49a4365c 100644 --- a/samples/guides2-junit4/build.gradle.kts +++ b/samples/guides2-junit4/build.gradle.kts @@ -7,7 +7,6 @@ dependencies { implementation(project(":samples:musiclibrary2")) implementation(project(":samples:urlshortener2")) implementation(Dependencies.kotlinStdLib) - implementation(Dependencies.kotlinxCoroutines) testImplementation(Dependencies.assertj) testImplementation(Dependencies.junit4Api) diff --git a/samples/guides2-junit5/build.gradle.kts b/samples/guides2-junit5/build.gradle.kts index 9d651eac4..b6c3b0494 100644 --- a/samples/guides2-junit5/build.gradle.kts +++ b/samples/guides2-junit5/build.gradle.kts @@ -7,7 +7,6 @@ dependencies { implementation(project(":samples:musiclibrary2")) implementation(project(":samples:urlshortener2")) implementation(Dependencies.kotlinStdLib) - implementation(Dependencies.kotlinxCoroutines) testImplementation(Dependencies.assertj) testImplementation(Dependencies.junitApi) diff --git a/samples/guides2/src/main/java/app/cash/tempest2/guides/java/AsynchronousProgramming.java b/samples/guides2/src/main/java/app/cash/tempest2/guides/java/AsynchronousProgramming.java new file mode 100644 index 000000000..759361228 --- /dev/null +++ b/samples/guides2/src/main/java/app/cash/tempest2/guides/java/AsynchronousProgramming.java @@ -0,0 +1,50 @@ +package app.cash.tempest2.guides.java; + +import app.cash.tempest2.musiclibrary.java.AsyncMusicTable; +import app.cash.tempest2.musiclibrary.java.PlaylistInfo; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.enhanced.dynamodb.Expression; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; + +public class AsynchronousProgramming { + + private final AsyncMusicTable table; + + public AsynchronousProgramming(AsyncMusicTable table) { + this.table = table; + } + + public CompletableFuture changePlaylistName(String playlistToken, String newName) { + // Read. + return table.playlistInfo() + .loadAsync(new PlaylistInfo.Key(playlistToken)) + .thenCompose(existing -> { + if (existing == null) { + throw new IllegalStateException("Playlist does not exist: " + playlistToken); + } + // Modify. + PlaylistInfo newPlaylist = new PlaylistInfo( + existing.playlist_token, + newName, + existing.playlist_tracks, + // playlist_version. + existing.playlist_version + 1 + ); + // Write. + return table.playlistInfo() + .saveAsync( + newPlaylist, + ifPlaylistVersionIs(existing.playlist_version) + ); + }); + } + + private Expression ifPlaylistVersionIs(Long playlist_version) { + return Expression.builder() + .expression("playlist_version = :playlist_version") + .expressionValues( + Map.of(":playlist_version", AttributeValue.builder().n("" + playlist_version).build())) + .build(); + } +} diff --git a/samples/guides2/src/main/kotlin/app/cash/tempest2/guides/AsynchronousProgramming.kt b/samples/guides2/src/main/kotlin/app/cash/tempest2/guides/AsynchronousProgramming.kt new file mode 100644 index 000000000..eef884d81 --- /dev/null +++ b/samples/guides2/src/main/kotlin/app/cash/tempest2/guides/AsynchronousProgramming.kt @@ -0,0 +1,35 @@ +package app.cash.tempest2.guides + +import app.cash.tempest2.musiclibrary.AsyncMusicTable +import app.cash.tempest2.musiclibrary.PlaylistInfo +import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.AttributeValue + +class AsynchronousProgramming( + private val table: AsyncMusicTable, +) { + + suspend fun changePlaylistName(playlistToken: String, newName: String) { + // Read. + val existing = checkNotNull( + table.playlistInfo.load(PlaylistInfo.Key(playlistToken)) // This is a suspend function. + ) { "Playlist does not exist: $playlistToken" } + // Modify. + val newPlaylist = existing.copy( + playlist_name = newName, + playlist_version = existing.playlist_version + 1 + ) + // Write. + table.playlistInfo.save( // This is a suspend function. + newPlaylist, + ifPlaylistVersionIs(existing.playlist_version) + ) + } + + private fun ifPlaylistVersionIs(playlist_version: Long): Expression { + return Expression.builder() + .expression("playlist_version = :playlist_version") + .expressionValues(mapOf(":playlist_version" to AttributeValue.builder().n("$playlist_version").build())) + .build() + } +} \ No newline at end of file diff --git a/samples/musiclibrary2/src/main/java/app/cash/tempest2/musiclibrary/java/AsyncMusicDb.java b/samples/musiclibrary2/src/main/java/app/cash/tempest2/musiclibrary/java/AsyncMusicDb.java new file mode 100644 index 000000000..e3b3d9722 --- /dev/null +++ b/samples/musiclibrary2/src/main/java/app/cash/tempest2/musiclibrary/java/AsyncMusicDb.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2.musiclibrary.java; + +import app.cash.tempest2.AsyncLogicalDb; +import app.cash.tempest2.TableName; + +public interface AsyncMusicDb extends AsyncLogicalDb { + @TableName("j_music_items") + AsyncMusicTable music(); +} diff --git a/samples/musiclibrary2/src/main/java/app/cash/tempest2/musiclibrary/java/AsyncMusicTable.java b/samples/musiclibrary2/src/main/java/app/cash/tempest2/musiclibrary/java/AsyncMusicTable.java new file mode 100644 index 000000000..b173926a4 --- /dev/null +++ b/samples/musiclibrary2/src/main/java/app/cash/tempest2/musiclibrary/java/AsyncMusicTable.java @@ -0,0 +1,35 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2.musiclibrary.java; + +import app.cash.tempest2.AsyncInlineView; +import app.cash.tempest2.AsyncLogicalTable; +import app.cash.tempest2.AsyncSecondaryIndex; + +public interface AsyncMusicTable extends AsyncLogicalTable { + AsyncInlineView albumInfo(); + AsyncInlineView albumTracks(); + + AsyncInlineView playlistInfo(); + + // Global Secondary Indexes. + AsyncSecondaryIndex albumInfoByGenre(); + AsyncSecondaryIndex albumInfoByArtist(); + + // Local Secondary Indexes. + AsyncSecondaryIndex albumTracksByTitle(); +} diff --git a/samples/musiclibrary2/src/main/kotlin/app/cash/tempest2/musiclibrary/AsyncMusicDb.kt b/samples/musiclibrary2/src/main/kotlin/app/cash/tempest2/musiclibrary/AsyncMusicDb.kt new file mode 100644 index 000000000..1008db627 --- /dev/null +++ b/samples/musiclibrary2/src/main/kotlin/app/cash/tempest2/musiclibrary/AsyncMusicDb.kt @@ -0,0 +1,9 @@ +package app.cash.tempest2.musiclibrary + +import app.cash.tempest2.AsyncLogicalDb +import app.cash.tempest2.TableName + +interface AsyncMusicDb : AsyncLogicalDb { + @TableName("music_items") + val music: AsyncMusicTable +} diff --git a/samples/musiclibrary2/src/main/kotlin/app/cash/tempest2/musiclibrary/AsyncMusicTable.kt b/samples/musiclibrary2/src/main/kotlin/app/cash/tempest2/musiclibrary/AsyncMusicTable.kt new file mode 100644 index 000000000..84641359d --- /dev/null +++ b/samples/musiclibrary2/src/main/kotlin/app/cash/tempest2/musiclibrary/AsyncMusicTable.kt @@ -0,0 +1,19 @@ +package app.cash.tempest2.musiclibrary + +import app.cash.tempest2.AsyncInlineView +import app.cash.tempest2.AsyncLogicalTable +import app.cash.tempest2.AsyncSecondaryIndex + +interface AsyncMusicTable : AsyncLogicalTable { + val albumInfo: AsyncInlineView + val albumTracks: AsyncInlineView + + val playlistInfo: AsyncInlineView + + // Global Secondary Indexes. + val albumInfoByGenre: AsyncSecondaryIndex + val albumInfoByArtist: AsyncSecondaryIndex + + // Local Secondary Indexes. + val albumTracksByTitle: AsyncSecondaryIndex +} diff --git a/tempest-internal/src/main/kotlin/app/cash/tempest/internal/ProxyFactory.kt b/tempest-internal/src/main/kotlin/app/cash/tempest/internal/ProxyFactory.kt index 06e91ce71..76416c826 100644 --- a/tempest-internal/src/main/kotlin/app/cash/tempest/internal/ProxyFactory.kt +++ b/tempest-internal/src/main/kotlin/app/cash/tempest/internal/ProxyFactory.kt @@ -22,7 +22,7 @@ import java.lang.reflect.Method import java.lang.reflect.Proxy import kotlin.reflect.KClass -class ProxyFactory { +object ProxyFactory { fun create( type: KClass, diff --git a/tempest/src/main/kotlin/app/cash/tempest/internal/LogicalDbFactory.kt b/tempest/src/main/kotlin/app/cash/tempest/internal/LogicalDbFactory.kt index 1ab76ae05..c776c2c15 100644 --- a/tempest/src/main/kotlin/app/cash/tempest/internal/LogicalDbFactory.kt +++ b/tempest/src/main/kotlin/app/cash/tempest/internal/LogicalDbFactory.kt @@ -37,7 +37,6 @@ internal class LogicalDbFactory( ) { private val logicalTableFactory = LogicalTableFactory() - private val proxyFactory: ProxyFactory = ProxyFactory() private val schema = Schema.create( V1StringAttributeValue, V1MapAttributeValue.Factory(dynamoDbMapper), @@ -61,7 +60,7 @@ internal class LogicalDbFactory( schema, logicalTableFactory ) - return proxyFactory.create(dbType, methodHandlers.toMap(), logicalDb) + return ProxyFactory.create(dbType, methodHandlers.toMap(), logicalDb) } private fun queryable( @@ -139,7 +138,7 @@ internal class LogicalDbFactory( } methodHandlers[member.javaMethod] = GetterMethodHandler(component) } - return proxyFactory.create(tableType, methodHandlers.toMap(), logicalTable) + return ProxyFactory.create(tableType, methodHandlers.toMap(), logicalTable) } } diff --git a/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/DefaultTestDynamoDbClient.kt b/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/DefaultTestDynamoDbClient.kt index cb382cbdf..e2de5808a 100644 --- a/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/DefaultTestDynamoDbClient.kt +++ b/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/DefaultTestDynamoDbClient.kt @@ -19,8 +19,10 @@ package app.cash.tempest2.testing.internal import app.cash.tempest2.testing.TestDynamoDbClient import app.cash.tempest2.testing.TestTable import com.google.common.util.concurrent.AbstractIdleService +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbClient import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsAsyncClient import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient class DefaultTestDynamoDbClient( @@ -30,15 +32,23 @@ class DefaultTestDynamoDbClient( override val dynamoDb: DynamoDbClient get() = requireNotNull(_dynamoDb) { "`dynamoDb` is only usable while the service is running" } + override val asyncDynamoDb: DynamoDbAsyncClient + get() = requireNotNull(_dynamoDbAsync) { "`dynamoDb` is only usable while the service is running" } override val dynamoDbStreams: DynamoDbStreamsClient get() = requireNotNull(_dynamoDbStreams) { "`dynamoDbStreams` is only usable while the service is running" } + override val asyncDynamoDbStreams: DynamoDbStreamsAsyncClient + get() = requireNotNull(_dynamoDbStreamsAsync) { "`dynamoDbStreams` is only usable while the service is running" } private var _dynamoDb: DynamoDbClient? = null + private var _dynamoDbAsync: DynamoDbAsyncClient? = null private var _dynamoDbStreams: DynamoDbStreamsClient? = null + private var _dynamoDbStreamsAsync: DynamoDbStreamsAsyncClient? = null override fun startUp() { _dynamoDb = connect(port) + _dynamoDbAsync = connectAsync(port) _dynamoDbStreams = connectToStreams(port) + _dynamoDbStreamsAsync = connectToStreamsAsync(port) // Cleans up the tables before each run. for (tableName in dynamoDb.listTables().tableNames()) { diff --git a/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/TestUtils.kt b/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/TestUtils.kt index 3a7312a33..c739408f5 100644 --- a/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/TestUtils.kt +++ b/tempest2-testing-internal/src/main/kotlin/app/cash/tempest2/testing/internal/TestUtils.kt @@ -23,8 +23,10 @@ import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.enhanced.dynamodb.model.CreateTableEnhancedRequest import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbClient import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsAsyncClient import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient import java.net.ServerSocket import java.net.URI @@ -47,6 +49,16 @@ fun connect(port: Int): DynamoDbClient { .build() } +fun connectAsync(port: Int): DynamoDbAsyncClient { + return DynamoDbAsyncClient.builder() + // The values that you supply for the AWS access key and the Region are only used to name + // the database file. + .credentialsProvider(AWS_CREDENTIALS_PROVIDER) + .region(Region.US_WEST_2) + .endpointOverride(URI.create("http://localhost:$port")) + .build() +} + fun connectToStreams(port: Int): DynamoDbStreamsClient { return DynamoDbStreamsClient.builder() // The values that you supply for the AWS access key and the Region are only used to name @@ -57,6 +69,16 @@ fun connectToStreams(port: Int): DynamoDbStreamsClient { .build() } +fun connectToStreamsAsync(port: Int): DynamoDbStreamsAsyncClient { + return DynamoDbStreamsAsyncClient.builder() + // The values that you supply for the AWS access key and the Region are only used to name + // the database file. + .credentialsProvider(AWS_CREDENTIALS_PROVIDER) + .region(Region.US_WEST_2) + .endpointOverride(URI.create("http://localhost:$port")) + .build() +} + fun DynamoDbClient.createTable( table: TestTable ) { diff --git a/tempest2-testing/src/main/kotlin/app/cash/tempest2/testing/TestDynamoDbClient.kt b/tempest2-testing/src/main/kotlin/app/cash/tempest2/testing/TestDynamoDbClient.kt index e3f75a971..3a7e16492 100644 --- a/tempest2-testing/src/main/kotlin/app/cash/tempest2/testing/TestDynamoDbClient.kt +++ b/tempest2-testing/src/main/kotlin/app/cash/tempest2/testing/TestDynamoDbClient.kt @@ -18,21 +18,32 @@ package app.cash.tempest2.testing import app.cash.tempest2.LogicalDb import com.google.common.util.concurrent.Service +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClientExtension +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsAsyncClient import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient import kotlin.reflect.KClass +typealias AsyncLogicalDb = app.cash.tempest2.AsyncLogicalDb + interface TestDynamoDbClient : Service { val tables: List /** A DynamoDB instance that is usable while this service is running. */ val dynamoDb: DynamoDbClient + /** A DynamoDB instance that is usable while this service is running. */ + val asyncDynamoDb: DynamoDbAsyncClient + /** A DynamoDB streams instance that is usable while this service is running. */ val dynamoDbStreams: DynamoDbStreamsClient + /** A DynamoDB streams instance that is usable while this service is running. */ + val asyncDynamoDbStreams: DynamoDbStreamsAsyncClient + fun logicalDb(type: KClass): DB { return logicalDb(type, emptyList()) } @@ -66,6 +77,40 @@ interface TestDynamoDbClient : Service { ): DB { return logicalDb(type.kotlin, extensions) } + + fun asyncLogicalDb(type: KClass): DB { + return asyncLogicalDb(type, emptyList()) + } + + fun asyncLogicalDb(type: KClass, vararg extensions: DynamoDbEnhancedClientExtension): DB { + return asyncLogicalDb(type, extensions.toList()) + } + + fun asyncLogicalDb( + type: KClass, + extensions: List + ): DB { + val enhancedClient = DynamoDbEnhancedAsyncClient.builder() + .dynamoDbClient(asyncDynamoDb) + .extensions(extensions) + .build() + return app.cash.tempest2.AsyncLogicalDb.create(type, enhancedClient) + } + + fun asyncLogicalDb(type: Class): DB { + return asyncLogicalDb(type.kotlin) + } + + fun asyncLogicalDb(type: Class, vararg extensions: DynamoDbEnhancedClientExtension): DB { + return asyncLogicalDb(type.kotlin, extensions.toList()) + } + + fun asyncLogicalDb( + type: Class, + extensions: List + ): DB { + return asyncLogicalDb(type.kotlin, extensions) + } } inline fun TestDynamoDbClient.logicalDb(vararg extensions: DynamoDbEnhancedClientExtension): DB { @@ -75,3 +120,11 @@ inline fun TestDynamoDbClient.logicalDb(vararg extensio inline fun TestDynamoDbClient.logicalDb(extensions: List): DB { return logicalDb(DB::class, extensions) } + +inline fun TestDynamoDbClient.asyncLogicalDb(vararg extensions: DynamoDbEnhancedClientExtension): DB { + return asyncLogicalDb(extensions.toList()) +} + +inline fun TestDynamoDbClient.asyncLogicalDb(extensions: List): DB { + return asyncLogicalDb(DB::class, extensions) +} diff --git a/tempest2/build.gradle.kts b/tempest2/build.gradle.kts index d3c8d1604..466930e6e 100644 --- a/tempest2/build.gradle.kts +++ b/tempest2/build.gradle.kts @@ -6,6 +6,9 @@ dependencies { api(Dependencies.aws2Dynamodb) api(Dependencies.aws2DynamodbEnhanced) api(Dependencies.findbugsJsr305) + api(Dependencies.kotlinxCoroutines) + implementation(Dependencies.kotlinxCoroutinesJdk8) + implementation(Dependencies.kotlinxCoroutinesReactive) implementation(project(":tempest-internal")) implementation(Dependencies.kotlinReflection) implementation(Dependencies.kotlinStdLib) diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt new file mode 100644 index 000000000..16d2debd9 --- /dev/null +++ b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncLogicalDb.kt @@ -0,0 +1,221 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import app.cash.tempest2.internal.AsyncLogicalDbFactory +import kotlinx.coroutines.future.await +import kotlinx.coroutines.reactive.awaitFirst +import org.reactivestreams.Publisher +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient +import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbVersionAttribute +import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import java.util.concurrent.CompletableFuture +import javax.annotation.CheckReturnValue +import kotlin.reflect.KClass + +/** + * A collection of tables that implement the DynamoDB best practice of putting multiple + * item types into the same storage table. This makes it possible to perform aggregate operations + * and transactions on those item types. + */ +interface AsyncLogicalDb : AsyncLogicalTable.Factory { + + /** + * Retrieves multiple items from multiple tables using their primary keys. + * + * This method performs one or more calls to the [DynamoDbClient.batchGetItem] API. + * + * A single operation can retrieve up to 16 MB of data, which can contain as many as 100 items. + * BatchGetItem returns a partial result if the response size limit is exceeded, the table's + * provisioned throughput is exceeded, or an internal processing failure occurs. If a partial + * result is returned, this method backs off and retries the `UnprocessedKeys` in the next API + * call. + */ + suspend fun batchLoad( + keys: KeySet, + consistentReads: Boolean = false + ): ItemSet = batchLoadAsync(keys, consistentReads).awaitFirst() + + suspend fun batchLoad( + keys: Iterable, + consistentReads: Boolean = false + ): ItemSet { + return batchLoad(KeySet(keys), consistentReads) + } + + suspend fun batchLoad( + vararg keys: Any, + consistentReads: Boolean = false + ): ItemSet { + return batchLoad(keys.toList(), consistentReads) + } + + /** + * Saves and deletes the objects given using one or more calls to the + * [DynamoDbClient.batchWriteItem] API. **Callers should always check the returned + * [BatchWriteResult]** because this method returns normally even if some writes were not + * performed. + * + * This method does not support versioning annotations and behaves like [DynamoDbClient.putItem]. + * + * A single call to BatchWriteItem can write up to 16 MB of data, which can comprise as many as 25 + * put or delete requests. Individual items to be written can be as large as 400 KB. + * + * In order to improve performance with these large-scale operations, this does not behave + * in the same way as individual PutItem and DeleteItem calls would. For example, you cannot specify + * conditions on individual put and delete requests, and BatchWriteItem does not return deleted + * items in the response. + */ + @CheckReturnValue + suspend fun batchWrite( + writeSet: BatchWriteSet + ): BatchWriteResult = batchWriteAsync(writeSet).await() + + /** + * Transactionally loads objects specified by transactionLoadRequest by calling + * [DynamoDbClient.transactGetItems] API. + * + * A transaction cannot contain more than 25 unique items. + * A transaction cannot contain more than 4 MB of data. + * No two actions in a transaction can work against the same item in the same table. + */ + suspend fun transactionLoad(keys: KeySet): ItemSet = transactionLoadAsync(keys).await() + + suspend fun transactionLoad(keys: Iterable): ItemSet { + return transactionLoad(KeySet(keys)) + } + + suspend fun transactionLoad(vararg keys: Any): ItemSet { + return transactionLoad(keys.toList()) + } + + /** + * Transactionally writes objects specified by transactionWriteRequest by calling + * [DynamoDbClient.transactWriteItems] API. + * + * This method supports versioning annotations, but not in conjunction with condition expressions. + * It throws [software.amazon.awssdk.core.exception.SdkClientException] exception if class of + * any input object is annotated with [DynamoDbVersionAttribute] and a condition expression is + * also present. + * + * A transaction cannot contain more than 25 unique items, including conditions. + * A transaction cannot contain more than 4 MB of data. + * No two actions in a transaction can work against the same item in the same table. + * For example, you cannot both ConditionCheck and Update the same item in one transaction. + */ + suspend fun transactionWrite(writeSet: TransactionWriteSet) { + transactionWriteAsync(writeSet).await() + } + + companion object { + inline operator fun invoke( + dynamoDbEnhancedClient: DynamoDbEnhancedAsyncClient + ): DB { + return create(DB::class, dynamoDbEnhancedClient) + } + + fun create( + dbType: KClass, + dynamoDbEnhancedClient: DynamoDbEnhancedAsyncClient + ): DB { + return AsyncLogicalDbFactory(dynamoDbEnhancedClient).logicalDb(dbType) + } + + // Overloaded functions for Java callers (Kotlin interface companion objects do not support + // having @JvmStatic and `@JvmOverloads` at the same time). + // https://youtrack.jetbrains.com/issue/KT-35716 + + @JvmStatic + fun create( + dbType: Class, + dynamoDbEnhancedClient: DynamoDbEnhancedAsyncClient + ) = create(dbType.kotlin, dynamoDbEnhancedClient) + } + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun batchLoadAsync( + keys: KeySet, + consistentReads: Boolean + ): Publisher + + fun batchLoadAsync( + keys: Iterable, + consistentReads: Boolean + ) = batchLoadAsync(KeySet(keys), consistentReads) + + fun batchLoadAsync( + vararg keys: Any, + consistentReads: Boolean + ) = batchLoadAsync(keys.toList(), consistentReads) + + fun batchLoadAsync( + keys: Iterable + ) = batchLoadAsync(keys, consistentReads = false) + + fun batchWriteAsync( + writeSet: BatchWriteSet + ): CompletableFuture + + fun transactionLoadAsync(keys: KeySet): CompletableFuture + + fun transactionLoadAsync(keys: Iterable) = transactionLoadAsync(KeySet(keys)) + + fun transactionLoadAsync(vararg keys: Any) = transactionLoadAsync(keys.toList()) + + fun transactionWriteAsync(writeSet: TransactionWriteSet): CompletableFuture +} + +/** + * A collection of views on a DynamoDB table that makes it easy to model heterogeneous items + * using strongly typed data classes. + */ +interface AsyncLogicalTable : + AsyncView, + AsyncInlineView.Factory, + AsyncSecondaryIndex.Factory { + + /** [type] must be a key type or item type of one of the views of this table. */ + fun codec(type: KClass): Codec + + interface Factory { + fun , RI : Any> logicalTable( + tableName: String, + tableType: KClass + ): T + } +} + +interface AsyncInlineView : AsyncView, AsyncScannable, AsyncQueryable { + + interface Factory { + fun inlineView( + keyType: KClass, + itemType: KClass + ): AsyncInlineView + } +} + +interface AsyncSecondaryIndex : AsyncScannable, AsyncQueryable { + + interface Factory { + fun secondaryIndex( + keyType: KClass, + itemType: KClass + ): AsyncSecondaryIndex + } +} diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt new file mode 100644 index 000000000..3d7cbc76c --- /dev/null +++ b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncQuery.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import kotlinx.coroutines.reactive.awaitFirst +import org.reactivestreams.Publisher +import software.amazon.awssdk.enhanced.dynamodb.Expression + +interface AsyncQueryable { + + /** + * Reads up to the [pageSize] items or a maximum of 1 MB of data. This limit applies before the + * filter expression is evaluated. + */ + suspend fun query( + keyCondition: KeyCondition, + asc: Boolean = true, + pageSize: Int = 100, + consistentRead: Boolean = false, + filterExpression: Expression? = null, + initialOffset: Offset? = null + ): Page = queryAsync(keyCondition, asc, pageSize, consistentRead, filterExpression, initialOffset).awaitFirst() + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun queryAsync( + keyCondition: KeyCondition, + asc: Boolean, + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Publisher> + + fun queryAsync(keyCondition: KeyCondition) = queryAsync( + keyCondition, + config = QueryConfig.Builder().build(), + initialOffset = null + ) + + fun queryAsync(keyCondition: KeyCondition, initialOffset: Offset?) = queryAsync( + keyCondition, + config = QueryConfig.Builder().build(), + initialOffset = initialOffset + ) + + fun queryAsync(keyCondition: KeyCondition, config: QueryConfig) = queryAsync( + keyCondition, + config = config, + initialOffset = null + ) + + fun queryAsync( + keyCondition: KeyCondition, + config: QueryConfig, + initialOffset: Offset? + ) = queryAsync( + keyCondition, + config.asc, + config.pageSize, + config.consistentRead, + config.filterExpression, + initialOffset + ) +} diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncScan.kt b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncScan.kt new file mode 100644 index 000000000..c91929678 --- /dev/null +++ b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncScan.kt @@ -0,0 +1,65 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import kotlinx.coroutines.reactive.awaitFirst +import org.reactivestreams.Publisher +import software.amazon.awssdk.enhanced.dynamodb.Expression + +interface AsyncScannable { + /** + * Scans up to the [pageSize] items or a maximum of 1 MB of data. This limit applies before the + * filter expression is evaluated. + */ + suspend fun scan( + pageSize: Int = 100, + consistentRead: Boolean = false, + filterExpression: Expression? = null, + initialOffset: Offset? = null + ) = scanAsync(pageSize, consistentRead, filterExpression, initialOffset).awaitFirst() + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun scanAsync( + pageSize: Int = 100, + consistentRead: Boolean = false, + filterExpression: Expression? = null, + initialOffset: Offset? = null + ): Publisher> + + fun scanAsync() = scanAsync( + ScanConfig.Builder().build(), + initialOffset = null + ) + + fun scanAsync(initialOffset: Offset?) = scanAsync( + ScanConfig.Builder().build(), + initialOffset = initialOffset + ) + + fun scanAsync(config: ScanConfig) = scanAsync( + config, + initialOffset = null + ) + + fun scanAsync(config: ScanConfig, initialOffset: Offset?) = scanAsync( + config.pageSize, + config.consistentRead, + config.filterExpression, + initialOffset + ) +} diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt new file mode 100644 index 000000000..744b89a44 --- /dev/null +++ b/tempest2/src/main/kotlin/app/cash/tempest2/AsyncView.kt @@ -0,0 +1,99 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import kotlinx.coroutines.future.await +import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.enhanced.dynamodb.extensions.VersionedRecordExtension +import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import java.util.concurrent.CompletableFuture + +interface AsyncView { + /** + * Returns an item whose keys match those of the prototype key object given, or null if no + * such item exists. + */ + suspend fun load(key: K, consistentReads: Boolean = false): I? = loadAsync(key, consistentReads).await() + + /** + * Saves an item in DynamoDB. This method uses [DynamoDbClient.putItem] to clear + * and replace all attributes, including unmodeled ones, on save. Partial update, i.e. + * [DynamoDbClient.updateItem], is not supported yet. + * + * Any options specified in the [saveExpression] parameter will be overlaid on any constraints due + * to versioned attributes. + */ + suspend fun save( + item: I, + saveExpression: Expression? = null + ) = saveAsync(item, saveExpression).await() + + /** + * Deletes the item identified by [key] from its DynamoDB table using [deleteExpression]. Any + * options specified in the [deleteExpression] parameter will be overlaid on any constraints due + * to versioned attributes. + * + * If the item to be deleted has versioned attributes, load the item and use [delete] instead. + * For more information, see [VersionedRecordExtension]. + */ + suspend fun deleteKey( + key: K, + deleteExpression: Expression? = null + ) = deleteKeyAsync(key, deleteExpression).await() + + /** + * Deletes [item] from its DynamoDB table using [deleteExpression]. Any options specified in the + * [deleteExpression] parameter will be overlaid on any constraints due to versioned attributes. + */ + suspend fun delete( + item: I, + deleteExpression: Expression? = null + ) = deleteAsync(item, deleteExpression).await() + + // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). + + fun loadAsync(key: K, consistentReads: Boolean): CompletableFuture + + fun loadAsync(key: K) = loadAsync(key, false) + + fun saveAsync( + item: I, + saveExpression: Expression? + ): CompletableFuture + + fun saveAsync( + item: I + ) = saveAsync(item, saveExpression = null) + + fun deleteKeyAsync( + key: K, + deleteExpression: Expression? + ): CompletableFuture + + fun deleteKeyAsync( + key: K + ) = deleteKeyAsync(key, deleteExpression = null) + + fun deleteAsync( + item: I, + deleteExpression: Expression? + ): CompletableFuture + + fun deleteAsync( + item: I + ) = deleteAsync(item, deleteExpression = null) +} diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/View.kt b/tempest2/src/main/kotlin/app/cash/tempest2/View.kt index 148f031b7..6592752bf 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/View.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/View.kt @@ -51,7 +51,7 @@ interface View { fun deleteKey( key: K, deleteExpression: Expression? = null - ) + ): I? /** * Deletes [item] from its DynamoDB table using [deleteExpression]. Any options specified in the @@ -60,7 +60,7 @@ interface View { fun delete( item: I, deleteExpression: Expression? = null - ) + ): I? // Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`). diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/AsyncLogicalDbFactory.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/AsyncLogicalDbFactory.kt new file mode 100644 index 000000000..301ebfb80 --- /dev/null +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/AsyncLogicalDbFactory.kt @@ -0,0 +1,209 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2.internal + +import app.cash.tempest.internal.Codec +import app.cash.tempest.internal.GetterMethodHandler +import app.cash.tempest.internal.ItemType +import app.cash.tempest.internal.KeyType +import app.cash.tempest.internal.MethodHandler +import app.cash.tempest.internal.ProxyFactory +import app.cash.tempest.internal.RawItemType +import app.cash.tempest.internal.Schema +import app.cash.tempest.internal.declaredMembers +import app.cash.tempest2.AsyncInlineView +import app.cash.tempest2.AsyncLogicalDb +import app.cash.tempest2.AsyncLogicalTable +import app.cash.tempest2.AsyncQueryable +import app.cash.tempest2.AsyncScannable +import app.cash.tempest2.AsyncSecondaryIndex +import app.cash.tempest2.AsyncView +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient +import software.amazon.awssdk.enhanced.dynamodb.TableSchema +import java.lang.reflect.Method +import kotlin.reflect.KClass +import kotlin.reflect.full.isSubclassOf +import kotlin.reflect.jvm.jvmErasure + +internal class AsyncLogicalDbFactory( + private val dynamoDbEnhancedClient: DynamoDbEnhancedAsyncClient +) : AsyncLogicalTable.Factory { + private val schema = Schema.create( + V2StringAttributeValue, + V2MapAttributeValue.Factory, + V2ForIndexAnnotation, + V2AttributeAnnotation, + V2RawItemTypeFactory() + ) + + fun logicalDb(dbType: KClass): DB { + val logicalDb = DynamoDbLogicalDb( + DynamoDbLogicalDb.MappedTableResourceFactory.simple(dynamoDbEnhancedClient::table), + schema, + ).async(dynamoDbEnhancedClient, this) + val methodHandlers = mutableMapOf() + for (member in dbType.declaredMembers) { + if (!member.returnType.jvmErasure.isSubclassOf(AsyncLogicalTable::class)) { + continue + } + val tableType = member.returnType.jvmErasure as KClass> + val tableName = getTableName(member, dbType) + val table = logicalTable(tableName, tableType) + methodHandlers[member.javaMethod] = GetterMethodHandler(table) + } + return ProxyFactory.create(dbType, methodHandlers.toMap(), logicalDb) + } + + override fun , RI : Any> logicalTable(tableName: String, tableType: KClass): T { + val rawItemType = schema.addRawItem(tableName, tableType.rawItemType) + val tableSchema = TableSchema.fromClass(rawItemType.type.java) + val dynamoDbTable = dynamoDbEnhancedClient.table(rawItemType.tableName, tableSchema) + val logicalTable = + object : + AsyncLogicalTable, + AsyncView by DynamoDbView( + rawItemType.codec as Codec, + rawItemType.codec as Codec, + tableSchema, + ).async(dynamoDbTable), + AsyncInlineView.Factory by InlineViewFactory(rawItemType, tableSchema, dynamoDbTable), + AsyncSecondaryIndex.Factory by SecondaryIndexFactory(rawItemType, tableSchema, dynamoDbTable) { + override fun codec(type: KClass): app.cash.tempest2.Codec = CodecAdapter(schema.codec(type)) + } + val methodHandlers = mutableMapOf() + for (member in tableType.declaredMembers) { + val component = when (member.returnType.jvmErasure) { + AsyncInlineView::class -> { + val keyType = member.returnType.arguments[0].type?.jvmErasure!! + val itemType = member.returnType.arguments[1].type?.jvmErasure!! + logicalTable.inlineView(keyType, itemType) + } + AsyncSecondaryIndex::class -> { + val keyType = member.returnType.arguments[0].type?.jvmErasure!! + val itemType = member.returnType.arguments[1].type?.jvmErasure!! + logicalTable.secondaryIndex(keyType, itemType) + } + else -> null + } + methodHandlers[member.javaMethod] = GetterMethodHandler(component) + } + return ProxyFactory.create(tableType, methodHandlers.toMap(), logicalTable) + } + + inner class InlineViewFactory( + private val rawItemType: RawItemType, + private val tableSchema: TableSchema, + private val dynamoDbTable: DynamoDbAsyncTable, + ) : AsyncInlineView.Factory { + + override fun inlineView( + keyType: KClass, + itemType: KClass + ): AsyncInlineView { + val item = schema.addItem(itemType, rawItemType.type) + val key = schema.addKey(keyType, itemType) + return object : + AsyncInlineView, + AsyncView by DynamoDbView( + key.codec as Codec, + item.codec as Codec, + tableSchema, + ).async(dynamoDbTable), + AsyncQueryable by queryable( + rawItemType, + item, + key, + tableSchema, + dynamoDbTable, + ), + AsyncScannable by scannable( + item, + key, + tableSchema, + dynamoDbTable, + ) {} + } + } + + inner class SecondaryIndexFactory( + private val rawItemType: RawItemType, + private val tableSchema: TableSchema, + private val dynamoDbTable: DynamoDbAsyncTable, + ) : AsyncSecondaryIndex.Factory { + + override fun secondaryIndex( + keyType: KClass, + itemType: KClass + ): AsyncSecondaryIndex { + val item = schema.addItem(itemType, rawItemType.type) + val key = schema.addKey(keyType, itemType) + return object : + AsyncSecondaryIndex, + AsyncQueryable by queryable( + rawItemType, + item, + key, + tableSchema, + dynamoDbTable, + ), + AsyncScannable by scannable( + item, + key, + tableSchema, + dynamoDbTable, + ) {} + } + } + + private fun queryable( + rawItemType: RawItemType, + itemType: ItemType, + keyType: KeyType, + tableSchema: TableSchema, + dynamoDbTable: DynamoDbAsyncTable, + ): AsyncQueryable { + if (keyType.rangeKeyName == null) { + return UnsupportedAsyncQueryable(rawItemType.type) + } + return DynamoDbQueryable( + keyType.secondaryIndexName, + itemType.attributeNames, + keyType.codec as Codec, + itemType.codec as Codec, + tableSchema, + ).async(dynamoDbTable) + } + + private fun scannable( + itemType: ItemType, + keyType: KeyType, + tableSchema: TableSchema, + dynamoDbTable: DynamoDbAsyncTable, + ): AsyncScannable { + return DynamoDbScannable( + keyType.secondaryIndexName, + itemType.attributeNames, + keyType.codec as Codec, + itemType.codec as Codec, + tableSchema, + ).async(dynamoDbTable) + } + + private val , RI : Any> KClass.rawItemType: KClass + get() = supertypes[0].arguments[0].type?.jvmErasure!! as KClass +} diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt index b955ff4be..2e152b09e 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbLogicalDb.kt @@ -19,6 +19,8 @@ package app.cash.tempest2.internal import app.cash.tempest.internal.ItemType import app.cash.tempest.internal.RawItemType import app.cash.tempest.internal.Schema +import app.cash.tempest2.AsyncLogicalDb +import app.cash.tempest2.AsyncLogicalTable import app.cash.tempest2.BatchWriteSet import app.cash.tempest2.ItemSet import app.cash.tempest2.KeySet @@ -27,15 +29,20 @@ import app.cash.tempest2.LogicalTable import app.cash.tempest2.TransactionWriteSet import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.CLOBBER import app.cash.tempest2.internal.DynamoDbLogicalDb.WriteRequest.Op.DELETE +import org.reactivestreams.Publisher +import software.amazon.awssdk.enhanced.dynamodb.Document +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient -import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.Key +import software.amazon.awssdk.enhanced.dynamodb.MappedTableResource import software.amazon.awssdk.enhanced.dynamodb.TableMetadata import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.enhanced.dynamodb.internal.EnhancedClientUtils import software.amazon.awssdk.enhanced.dynamodb.model.BatchGetItemEnhancedRequest +import software.amazon.awssdk.enhanced.dynamodb.model.BatchGetResultPage import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteItemEnhancedRequest +import software.amazon.awssdk.enhanced.dynamodb.model.BatchWriteResult import software.amazon.awssdk.enhanced.dynamodb.model.ConditionCheck import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest @@ -46,25 +53,123 @@ import software.amazon.awssdk.enhanced.dynamodb.model.UpdateItemEnhancedRequest import software.amazon.awssdk.enhanced.dynamodb.model.WriteBatch import software.amazon.awssdk.services.dynamodb.model.AttributeValue import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException +import java.util.concurrent.CompletableFuture import kotlin.reflect.KClass internal class DynamoDbLogicalDb( - private val dynamoDbEnhancedClient: DynamoDbEnhancedClient, + private val mappedTableResourceFactory: MappedTableResourceFactory, private val schema: Schema, - logicalTableFactory: LogicalTable.Factory -) : LogicalDb, LogicalTable.Factory by logicalTableFactory { +) { - override fun batchLoad( + interface MappedTableResourceFactory { + fun mappedTableResource(tableName: String, tableSchema: TableSchema): MappedTableResource + + companion object { + fun simple(create: (String, TableSchema) -> MappedTableResource): MappedTableResourceFactory { + return object : MappedTableResourceFactory { + override fun mappedTableResource(tableName: String, tableSchema: TableSchema): MappedTableResource { + return create(tableName, tableSchema as TableSchema) as MappedTableResource + } + } + } + } + } + + fun sync(dynamoDbEnhancedClient: DynamoDbEnhancedClient, logicalTableFactory: LogicalTable.Factory) = Sync(dynamoDbEnhancedClient, logicalTableFactory) + + inner class Sync( + private val dynamoDbEnhancedClient: DynamoDbEnhancedClient, + logicalTableFactory: LogicalTable.Factory + ) : LogicalDb, LogicalTable.Factory by logicalTableFactory { + + override fun batchLoad( + keys: KeySet, + consistentReads: Boolean + ): ItemSet { + val (requests, requestsByTable, batchRequest) = toBatchLoadRequest(keys, consistentReads) + val page = dynamoDbEnhancedClient.batchGetItem(batchRequest).iterator().next() + return toBatchLoadResponse(requestsByTable, requests, page) + } + + override fun batchWrite( + writeSet: BatchWriteSet + ): app.cash.tempest2.BatchWriteResult { + val (requestsByTable, batchRequest) = toBatchWriteRequest(writeSet) + val result = dynamoDbEnhancedClient.batchWriteItem(batchRequest) + return toBatchWriteResponse(requestsByTable, result) + } + + override fun transactionLoad(keys: KeySet): ItemSet { + val (requests, batchRequest) = toTransactionLoadRequest(keys) + val documents = dynamoDbEnhancedClient.transactGetItems(batchRequest) + return toTransactionLoadResponse(documents, requests) + } + + override fun transactionWrite(writeSet: TransactionWriteSet) { + val writeRequest = toTransactionWriteRequest(writeSet) + try { + dynamoDbEnhancedClient.transactWriteItems(writeRequest) + } catch (e: TransactionCanceledException) { + toTransactionWriteException(writeSet, e) + } + } + } + + fun async(dynamoDbEnhancedClient: DynamoDbEnhancedAsyncClient, logicalTableFactory: AsyncLogicalTable.Factory) = Async(dynamoDbEnhancedClient, logicalTableFactory) + + inner class Async( + private val dynamoDbEnhancedClient: DynamoDbEnhancedAsyncClient, + logicalTableFactory: AsyncLogicalTable.Factory + ) : AsyncLogicalDb, AsyncLogicalTable.Factory by logicalTableFactory { + + override fun batchLoadAsync( + keys: KeySet, + consistentReads: Boolean + ): Publisher { + val (requests, requestsByTable, batchRequest) = toBatchLoadRequest(keys, consistentReads) + return dynamoDbEnhancedClient.batchGetItem(batchRequest) + .limit(1) + .map { page -> toBatchLoadResponse(requestsByTable, requests, page) } + } + + override fun batchWriteAsync( + writeSet: BatchWriteSet + ): CompletableFuture { + val (requestsByTable, batchRequest) = toBatchWriteRequest(writeSet) + return dynamoDbEnhancedClient.batchWriteItem(batchRequest) + .thenApply { result -> toBatchWriteResponse(requestsByTable, result) } + } + + override fun transactionLoadAsync(keys: KeySet): CompletableFuture { + val (requests, batchRequest) = toTransactionLoadRequest(keys) + return dynamoDbEnhancedClient.transactGetItems(batchRequest) + .thenApply { documents -> toTransactionLoadResponse(documents, requests) } + } + + override fun transactionWriteAsync(writeSet: TransactionWriteSet): CompletableFuture { + val writeRequest = toTransactionWriteRequest(writeSet) + return dynamoDbEnhancedClient.transactWriteItems(writeRequest) + .exceptionally { e -> + if (e is TransactionCanceledException) { + toTransactionWriteException(writeSet, e) as Void + } else { + throw e + } + } + } + } + + private fun toBatchLoadRequest( keys: KeySet, consistentReads: Boolean - ): ItemSet { + ): Triple, Map, List>, BatchGetItemEnhancedRequest> { val requests = keys.map { LoadRequest(it.encodeAsKey().rawItemKey(), it.expectedItemType()) } val requestsByTable = requests.groupBy { it.tableType } val batchRequest = BatchGetItemEnhancedRequest.builder() .readBatches( requestsByTable.map { (tableType, requestsForTable) -> ReadBatch.builder(tableType.java) - .mappedTableResource(dynamoDbTable(tableType)) + .mappedTableResource(mappedTableResource(tableType)) .apply { for (request in requestsForTable) { addGetItem(request.key.key, consistentReads) @@ -74,12 +179,19 @@ internal class DynamoDbLogicalDb( } ) .build() - val page = dynamoDbEnhancedClient.batchGetItem(batchRequest).iterator().next() + return Triple(requests, requestsByTable, batchRequest) + } + + private fun toBatchLoadResponse( + requestsByTable: Map, List>, + requests: List, + page: BatchGetResultPage + ): ItemSet { val results = mutableSetOf() val tableTypes = requestsByTable.keys val resultTypes = requests.map { it.key to it.resultType }.toMap() for (tableType in tableTypes) { - for (result in page.resultsForTable(dynamoDbTable(tableType))) { + for (result in page.resultsForTable(mappedTableResource(tableType))) { val resultType = resultTypes[result.rawItemKey()]!! val decoded = resultType.codec.toApp(result) results.add(decoded) @@ -88,9 +200,7 @@ internal class DynamoDbLogicalDb( return ItemSet(results) } - override fun batchWrite( - writeSet: BatchWriteSet - ): app.cash.tempest2.BatchWriteResult { + private fun toBatchWriteRequest(writeSet: BatchWriteSet): Pair, List>, BatchWriteItemEnhancedRequest> { val clobberRequests = writeSet.itemsToClobber.map { WriteRequest(it.encodeAsItem(), CLOBBER) } val deleteRequests = writeSet.keysToDelete.map { WriteRequest(it.encodeAsKey(), DELETE) } val requests = clobberRequests + deleteRequests @@ -99,7 +209,7 @@ internal class DynamoDbLogicalDb( .writeBatches( requestsByTable.map { (tableType, writeRequestsForTable) -> WriteBatch.builder(tableType.java) - .mappedTableResource(dynamoDbTable(tableType)) + .mappedTableResource(mappedTableResource(tableType)) .apply { for (request in writeRequestsForTable) { when (request.op) { @@ -112,12 +222,18 @@ internal class DynamoDbLogicalDb( } ) .build() - val result = dynamoDbEnhancedClient.batchWriteItem(batchRequest) + return Pair(requestsByTable, batchRequest) + } + + private fun toBatchWriteResponse( + requestsByTable: Map, List>, + result: BatchWriteResult + ): app.cash.tempest2.BatchWriteResult { val unprocessedClobbers = mutableListOf() val unprocessedDeletes = mutableListOf() val tableTypes = requestsByTable.keys for (tableType in tableTypes) { - val table = dynamoDbTable(tableType) + val table = mappedTableResource(tableType) val rawClobbersItems = result.unprocessedPutItemsForTable(table) for (rawItem in rawClobbersItems) { unprocessedClobbers.add(rawItem.rawItemKey().key) @@ -133,27 +249,33 @@ internal class DynamoDbLogicalDb( ) } - override fun transactionLoad(keys: KeySet): ItemSet { + private fun toTransactionLoadRequest(keys: KeySet): Pair, TransactGetItemsEnhancedRequest> { val requests = keys.map { LoadRequest(it.encodeAsKey().rawItemKey(), it.expectedItemType()) } val batchRequest = TransactGetItemsEnhancedRequest.builder() .apply { for (request in requests) { - addGetItem(dynamoDbTable(request.tableType), request.key.key) + addGetItem(mappedTableResource(request.tableType), request.key.key) } } .build() - val documents = dynamoDbEnhancedClient.transactGetItems(batchRequest) + return Pair(requests, batchRequest) + } + + private fun toTransactionLoadResponse( + documents: MutableList, + requests: List + ): ItemSet { val results = mutableSetOf() for ((document, request) in documents.zip(requests)) { - val result = document.getItem(dynamoDbTable(request.tableType)) ?: continue + val result = document.getItem(mappedTableResource(request.tableType)) ?: continue val decoded = request.resultType.codec.toApp(result) results.add(decoded) } return ItemSet(results) } - override fun transactionWrite(writeSet: TransactionWriteSet) { - val writeRequest = TransactWriteItemsEnhancedRequest.builder() + private fun toTransactionWriteRequest(writeSet: TransactionWriteSet): TransactWriteItemsEnhancedRequest? { + return TransactWriteItemsEnhancedRequest.builder() .apply { for (itemToSave in writeSet.itemsToSave) { addUpdateItem(itemToSave.encodeAsItem(), writeSet.writeExpressions[itemToSave]) @@ -169,17 +291,16 @@ internal class DynamoDbLogicalDb( } } .build() + } + + fun toTransactionWriteException(writeSet: TransactionWriteSet, e: TransactionCanceledException) { // We don't want to wrap these exceptions but only add a more useful message so upstream callers can themselves // parse the potentially concurrency related TransactionCancelledExceptions // https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/dynamodbv2/model/TransactionCanceledException.html - try { - dynamoDbEnhancedClient.transactWriteItems(writeRequest) - } catch (e: TransactionCanceledException) { - throw TransactionCanceledException.builder() - .message("Write transaction failed: ${writeSet.describeOperations()}") - .cancellationReasons(e.cancellationReasons()) - .build() - } + throw TransactionCanceledException.builder() + .message("Write transaction failed: ${writeSet.describeOperations()}") + .cancellationReasons(e.cancellationReasons()) + .build() } private fun Any.rawItemKey(): RawItemKey { @@ -188,7 +309,7 @@ internal class DynamoDbLogicalDb( rawItemType.tableName, EnhancedClientUtils.createKeyFromItem( this, - dynamoDbTable(this::class).tableSchema(), + mappedTableResource(this::class).tableSchema(), TableMetadata.primaryIndexName(), ), rawItemType.hashKeyName, @@ -204,9 +325,9 @@ internal class DynamoDbLogicalDb( ) { "Cannot find a dynamodb table for ${this::class}" } } - private fun dynamoDbTable(tableType: KClass<*>): DynamoDbTable { + private fun mappedTableResource(tableType: KClass<*>): MappedTableResource { val rawItemType = schema.getRawItem(tableType)!! - return dynamoDbEnhancedClient.table( + return mappedTableResourceFactory.mappedTableResource( rawItemType.tableName, TableSchema.fromClass(rawItemType.type.java) as TableSchema ) @@ -264,7 +385,7 @@ internal class DynamoDbLogicalDb( item: T, expression: Expression? ) = addUpdateItem( - dynamoDbTable(item::class), + mappedTableResource(item::class), UpdateItemEnhancedRequest.builder(item.javaClass) .item(item) .conditionExpression(expression) @@ -275,7 +396,7 @@ internal class DynamoDbLogicalDb( item: T, expression: Expression? ) = addDeleteItem( - dynamoDbTable(item::class), + mappedTableResource(item::class), DeleteItemEnhancedRequest.builder() .key(item.rawItemKey().key) .conditionExpression(expression) @@ -286,7 +407,7 @@ internal class DynamoDbLogicalDb( item: T, expression: Expression? ) = addConditionCheck( - dynamoDbTable(item::class), + mappedTableResource(item::class), ConditionCheck.builder() .key(item.rawItemKey().key) .conditionExpression(expression) diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt index 4000b0cd3..19b1d47f5 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbQueryable.kt @@ -17,16 +17,20 @@ package app.cash.tempest2.internal import app.cash.tempest.internal.Codec +import app.cash.tempest2.AsyncQueryable import app.cash.tempest2.BeginsWith import app.cash.tempest2.Between import app.cash.tempest2.KeyCondition import app.cash.tempest2.Offset import app.cash.tempest2.Page import app.cash.tempest2.Queryable +import org.reactivestreams.Publisher +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.Key import software.amazon.awssdk.enhanced.dynamodb.TableMetadata +import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.enhanced.dynamodb.internal.EnhancedClientUtils import software.amazon.awssdk.enhanced.dynamodb.model.QueryConditional import software.amazon.awssdk.enhanced.dynamodb.model.QueryEnhancedRequest @@ -37,17 +41,59 @@ internal class DynamoDbQueryable( private val specificAttributeNames: Set, private val keyCodec: Codec, private val itemCodec: Codec, - private val dynamoDbTable: DynamoDbTable -) : Queryable { + private val tableSchema: TableSchema, +) { - override fun query( + fun sync(dynamoDbTable: DynamoDbTable) = Sync(dynamoDbTable) + + inner class Sync( + private val dynamoDbTable: DynamoDbTable + ) : Queryable { + + override fun query( + keyCondition: KeyCondition, + asc: Boolean, + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Page { + val request = toQueryRequest(keyCondition, asc, consistentRead, pageSize, filterExpression, initialOffset) + val page = if (secondaryIndexName != null) { + dynamoDbTable.index(secondaryIndexName).query(request) + } else { + dynamoDbTable.query(request) + } + .iterator().next() + return toQueryResponse(page) + } + } + + fun async(dynamoDbTable: DynamoDbAsyncTable) = Async(dynamoDbTable) + + inner class Async( + private val dynamoDbTable: DynamoDbAsyncTable + ) : AsyncQueryable { + override fun queryAsync(keyCondition: KeyCondition, asc: Boolean, pageSize: Int, consistentRead: Boolean, filterExpression: Expression?, initialOffset: Offset?): Publisher> { + val request = toQueryRequest(keyCondition, asc, consistentRead, pageSize, filterExpression, initialOffset) + return if (secondaryIndexName != null) { + dynamoDbTable.index(secondaryIndexName).query(request) + } else { + dynamoDbTable.query(request) + } + .limit(1) + .map { page -> toQueryResponse(page) } + } + } + + private fun toQueryRequest( keyCondition: KeyCondition, asc: Boolean, - pageSize: Int, consistentRead: Boolean, + pageSize: Int, filterExpression: Expression?, initialOffset: Offset? - ): Page { + ): QueryEnhancedRequest { val query = QueryEnhancedRequest.builder() .queryConditional(toQueryConditional(keyCondition)) .scanIndexForward(asc) @@ -60,12 +106,10 @@ internal class DynamoDbQueryable( if (initialOffset != null) { query.exclusiveStartKey(initialOffset.encodeOffset()) } - val page = if (secondaryIndexName != null) { - dynamoDbTable.index(secondaryIndexName).query(query.build()) - } else { - dynamoDbTable.query(query.build()) - } - .iterator().next() + return query.build() + } + + private fun toQueryResponse(page: software.amazon.awssdk.enhanced.dynamodb.model.Page): Page { val contents = page.items().map { itemCodec.toApp(it) } val offset = page.lastEvaluatedKey()?.decodeOffset() return Page(contents, offset) @@ -101,18 +145,18 @@ internal class DynamoDbQueryable( private fun Offset.encodeOffset(): Map { val offsetKey = keyCodec.toDb(key) - return dynamoDbTable.tableSchema().itemToMap(offsetKey, true) + return tableSchema.itemToMap(offsetKey, true) } private fun Map.decodeOffset(): Offset { - val offsetKeyAttributes = dynamoDbTable.tableSchema().mapToItem(this) + val offsetKeyAttributes = tableSchema.mapToItem(this) val offsetKey = keyCodec.toApp(offsetKeyAttributes) return Offset(offsetKey) } private fun R.key(): Key { return EnhancedClientUtils.createKeyFromItem( - this, dynamoDbTable.tableSchema(), + this, tableSchema, secondaryIndexName ?: TableMetadata.primaryIndexName() ) } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt index 8766188b3..c8e79ab26 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbScannable.kt @@ -17,11 +17,15 @@ package app.cash.tempest2.internal import app.cash.tempest.internal.Codec +import app.cash.tempest2.AsyncScannable import app.cash.tempest2.Offset import app.cash.tempest2.Page import app.cash.tempest2.Scannable +import org.reactivestreams.Publisher +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.enhanced.dynamodb.model.ScanEnhancedRequest import software.amazon.awssdk.services.dynamodb.model.AttributeValue @@ -30,15 +34,60 @@ internal class DynamoDbScannable( private val attributeNames: Set, private val keyCodec: Codec, private val itemCodec: Codec, - private val dynamoDbTable: DynamoDbTable -) : Scannable { + private val tableSchema: TableSchema, +) { - override fun scan( - pageSize: Int, + fun sync(dynamoDbTable: DynamoDbTable) = Sync(dynamoDbTable) + + inner class Sync( + private val dynamoDbTable: DynamoDbTable + ) : Scannable { + + override fun scan( + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Page { + val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset) + val page = if (secondaryIndexName != null) { + dynamoDbTable.index(secondaryIndexName).scan(request) + } else { + dynamoDbTable.scan(request) + } + .iterator().next() + return toScanResponse(page) + } + } + + fun async(dynamoDbTable: DynamoDbAsyncTable) = Async(dynamoDbTable) + + inner class Async( + private val dynamoDbTable: DynamoDbAsyncTable + ) : AsyncScannable { + + override fun scanAsync( + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Publisher> { + val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset) + return if (secondaryIndexName != null) { + dynamoDbTable.index(secondaryIndexName).scan(request) + } else { + dynamoDbTable.scan(request) + } + .limit(1).map(::toScanResponse) + } + } + + private fun toScanRequest( consistentRead: Boolean, + pageSize: Int, filterExpression: Expression?, initialOffset: Offset? - ): Page { + ): ScanEnhancedRequest { val scan = ScanEnhancedRequest.builder() .consistentRead(consistentRead) .limit(pageSize) @@ -49,13 +98,10 @@ internal class DynamoDbScannable( if (initialOffset != null) { scan.exclusiveStartKey(initialOffset.encodeOffset()) } - val page = if (secondaryIndexName != null) { - dynamoDbTable.index(secondaryIndexName).scan(scan.build()) - } else { - dynamoDbTable.scan(scan.build()) - } - .iterator().next() + return scan.build() + } + private fun toScanResponse(page: software.amazon.awssdk.enhanced.dynamodb.model.Page): Page { val contents = page.items().map { itemCodec.toApp(it) } val offset = page.lastEvaluatedKey()?.decodeOffset() return Page(contents, offset) @@ -63,11 +109,11 @@ internal class DynamoDbScannable( private fun Offset.encodeOffset(): Map { val offsetKey = keyCodec.toDb(key) - return dynamoDbTable.tableSchema().itemToMap(offsetKey, true) + return tableSchema.itemToMap(offsetKey, true) } private fun Map.decodeOffset(): Offset { - val offsetKeyAttributes = dynamoDbTable.tableSchema().mapToItem(this) + val offsetKeyAttributes = tableSchema.mapToItem(this) val offsetKey = keyCodec.toApp(offsetKeyAttributes) return Offset(offsetKey) } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt index 77c89dddb..12eaa19f5 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/DynamoDbView.kt @@ -17,75 +17,139 @@ package app.cash.tempest2.internal import app.cash.tempest.internal.Codec +import app.cash.tempest2.AsyncView import app.cash.tempest2.View +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable import software.amazon.awssdk.enhanced.dynamodb.Expression import software.amazon.awssdk.enhanced.dynamodb.Key import software.amazon.awssdk.enhanced.dynamodb.TableMetadata +import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.enhanced.dynamodb.internal.EnhancedClientUtils import software.amazon.awssdk.enhanced.dynamodb.model.DeleteItemEnhancedRequest import software.amazon.awssdk.enhanced.dynamodb.model.GetItemEnhancedRequest import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest +import java.util.concurrent.CompletableFuture internal class DynamoDbView( private val keyCodec: Codec, private val itemCodec: Codec, - private val dynamoDbTable: DynamoDbTable -) : View { + private val tableSchema: TableSchema, +) { - override fun load(key: K, consistentReads: Boolean): I? { + fun sync(dynamoDbTable: DynamoDbTable) = Sync(dynamoDbTable) + + inner class Sync( + private val dynamoDbTable: DynamoDbTable + ) : View { + override fun load(key: K, consistentReads: Boolean): I? { + val request = toLoadRequest(key, consistentReads) + val itemObject = dynamoDbTable.getItem(request) + return toLoadResponse(itemObject) + } + + override fun save( + item: I, + saveExpression: Expression? + ) { + val request = toSaveRequest(item, saveExpression) + dynamoDbTable.putItem(request) + } + + override fun deleteKey( + key: K, + deleteExpression: Expression? + ): I? { + val request = toDeleteKeyRequest(key, deleteExpression) + val itemObject = dynamoDbTable.deleteItem(request) + return toDeleteResponse(itemObject) + } + + override fun delete( + item: I, + deleteExpression: Expression? + ): I? { + val request = toDeleteItemRequest(item, deleteExpression) + val itemObject = dynamoDbTable.deleteItem(request) + return toDeleteResponse(itemObject) + } + } + + fun async(dynamoDbTable: DynamoDbAsyncTable) = Async(dynamoDbTable) + + inner class Async( + private val dynamoDbTable: DynamoDbAsyncTable + ) : AsyncView { + override fun loadAsync(key: K, consistentReads: Boolean): CompletableFuture { + val request = toLoadRequest(key, consistentReads) + return dynamoDbTable.getItem(request).thenApply(::toDeleteResponse) + } + + override fun saveAsync( + item: I, + saveExpression: Expression? + ): CompletableFuture { + val request = toSaveRequest(item, saveExpression) + return dynamoDbTable.putItem(request) + } + + override fun deleteKeyAsync( + key: K, + deleteExpression: Expression? + ): CompletableFuture { + val request = toDeleteKeyRequest(key, deleteExpression) + return dynamoDbTable.deleteItem(request).thenApply(::toDeleteResponse) + } + + override fun deleteAsync( + item: I, + deleteExpression: Expression? + ): CompletableFuture { + val request = toDeleteItemRequest(item, deleteExpression) + return dynamoDbTable.deleteItem(request).thenApply(::toDeleteResponse) + } + } + + private fun R.key(): Key { + return EnhancedClientUtils.createKeyFromItem( + this, tableSchema, + TableMetadata.primaryIndexName() + ) + } + + private fun toLoadRequest(key: K, consistentReads: Boolean): GetItemEnhancedRequest { val keyObject = keyCodec.toDb(key) - val request = GetItemEnhancedRequest.builder() + return GetItemEnhancedRequest.builder() .key(keyObject.key()) .consistentRead(consistentReads) .build() - val itemObject = dynamoDbTable.getItem(request) - return if (itemObject != null) itemCodec.toApp(itemObject) else null } - override fun save( - item: I, - saveExpression: Expression? - ) { + private fun toLoadResponse(itemObject: R?) = if (itemObject != null) itemCodec.toApp(itemObject) else null + + private fun toSaveRequest(item: I, saveExpression: Expression?): PutItemEnhancedRequest { val itemObject = itemCodec.toDb(item) - val request = PutItemEnhancedRequest.builder(dynamoDbTable.tableSchema().itemType().rawClass()) + return PutItemEnhancedRequest.builder(tableSchema.itemType().rawClass()) .item(itemObject) .conditionExpression(saveExpression) .build() - dynamoDbTable.putItem(request) } - override fun deleteKey( - key: K, - deleteExpression: Expression? - ) { + private fun toDeleteKeyRequest(key: K, deleteExpression: Expression?): DeleteItemEnhancedRequest { val keyObject = keyCodec.toDb(key) - deleteInternal(keyObject, deleteExpression) + return DeleteItemEnhancedRequest.builder() + .key(keyObject.key()) + .conditionExpression(deleteExpression) + .build() } - override fun delete( - item: I, - deleteExpression: Expression? - ) { + private fun toDeleteItemRequest(item: I, deleteExpression: Expression?): DeleteItemEnhancedRequest { val itemObject = itemCodec.toDb(item) - deleteInternal(itemObject, deleteExpression) - } - - private fun deleteInternal( - itemObject: R, - deleteExpression: Expression? - ) { - val request = DeleteItemEnhancedRequest.builder() + return DeleteItemEnhancedRequest.builder() .key(itemObject.key()) .conditionExpression(deleteExpression) .build() - dynamoDbTable.deleteItem(request) } - private fun R.key(): Key { - return EnhancedClientUtils.createKeyFromItem( - this, dynamoDbTable.tableSchema(), - TableMetadata.primaryIndexName() - ) - } + private fun toDeleteResponse(itemObject: R?) = if (itemObject != null) itemCodec.toApp(itemObject) else null } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/LogicalDbFactory.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/LogicalDbFactory.kt index 48319e155..c364e2575 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/LogicalDbFactory.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/LogicalDbFactory.kt @@ -31,9 +31,9 @@ import app.cash.tempest2.LogicalTable import app.cash.tempest2.Queryable import app.cash.tempest2.Scannable import app.cash.tempest2.SecondaryIndex -import app.cash.tempest2.TableName import app.cash.tempest2.View import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient +import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable import software.amazon.awssdk.enhanced.dynamodb.TableSchema import java.lang.reflect.Method import kotlin.reflect.KClass @@ -42,10 +42,7 @@ import kotlin.reflect.jvm.jvmErasure internal class LogicalDbFactory( private val dynamoDbEnhancedClient: DynamoDbEnhancedClient -) { - - private val logicalTableFactory = LogicalTableFactory() - private val proxyFactory = ProxyFactory() +) : LogicalTable.Factory { private val schema = Schema.create( V2StringAttributeValue, V2MapAttributeValue.Factory, @@ -55,101 +52,63 @@ internal class LogicalDbFactory( ) fun logicalDb(dbType: KClass): DB { + val logicalDb = DynamoDbLogicalDb( + DynamoDbLogicalDb.MappedTableResourceFactory.simple(dynamoDbEnhancedClient::table), + schema, + ).sync(dynamoDbEnhancedClient, this) val methodHandlers = mutableMapOf() for (member in dbType.declaredMembers) { if (!member.returnType.jvmErasure.isSubclassOf(LogicalTable::class)) { continue } val tableType = member.returnType.jvmErasure as KClass> - val tableName = member.annotations.filterIsInstance().singleOrNull()?.value - requireNotNull(tableName) { - "Please annotate ${member.javaMethod} in $dbType with `@TableName`" - } - val table = logicalTableFactory.logicalTable(tableName, tableType) + val tableName = getTableName(member, dbType) + val table = logicalTable(tableName, tableType) methodHandlers[member.javaMethod] = GetterMethodHandler(table) } - val logicalDb = DynamoDbLogicalDb( - dynamoDbEnhancedClient, - schema, - logicalTableFactory - ) - return proxyFactory.create(dbType, methodHandlers.toMap(), logicalDb) - } - - private fun queryable( - rawItemType: RawItemType, - itemType: ItemType, - keyType: KeyType - ): Queryable { - if (keyType.rangeKeyName == null) { - return UnsupportedQueryable(rawItemType.type) - } - return DynamoDbQueryable( - keyType.secondaryIndexName, - itemType.attributeNames, - keyType.codec as Codec, - itemType.codec as Codec, - dynamoDbEnhancedClient.table(rawItemType.tableName, TableSchema.fromClass(rawItemType.type.java)) - ) - } - - private fun scannable( - rawItemType: RawItemType, - itemType: ItemType, - keyType: KeyType - ): Scannable { - return DynamoDbScannable( - keyType.secondaryIndexName, - itemType.attributeNames, - keyType.codec as Codec, - itemType.codec as Codec, - dynamoDbEnhancedClient.table(rawItemType.tableName, TableSchema.fromClass(rawItemType.type.java)) - ) + return ProxyFactory.create(dbType, methodHandlers.toMap(), logicalDb) } - inner class LogicalTableFactory : LogicalTable.Factory { - - override fun , RI : Any> logicalTable(tableName: String, tableType: KClass): T { - val rawItemType = schema.addRawItem(tableName, tableType.rawItemType) - val codec = rawItemType.codec as Codec - val view = DynamoDbView( - codec, - codec, - dynamoDbEnhancedClient.table(rawItemType.tableName, TableSchema.fromClass(rawItemType.type.java)) - ) - val inlineViewFactory = InlineViewFactory(rawItemType) - val secondaryIndexFactory = SecondaryIndexFactory(rawItemType) - val logicalTable = - object : - LogicalTable, - View by view, - InlineView.Factory by inlineViewFactory, - SecondaryIndex.Factory by secondaryIndexFactory { - override fun codec(type: KClass): app.cash.tempest2.Codec = CodecAdapter(schema.codec(type)) + override fun , RI : Any> logicalTable(tableName: String, tableType: KClass): T { + val rawItemType = schema.addRawItem(tableName, tableType.rawItemType) + val tableSchema = TableSchema.fromClass(rawItemType.type.java) + val dynamoDbTable = dynamoDbEnhancedClient.table(rawItemType.tableName, tableSchema) + val logicalTable = + object : + LogicalTable, + View by DynamoDbView( + rawItemType.codec as Codec, + rawItemType.codec as Codec, + tableSchema, + ).sync(dynamoDbTable), + InlineView.Factory by InlineViewFactory(rawItemType, tableSchema, dynamoDbTable), + SecondaryIndex.Factory by SecondaryIndexFactory(rawItemType, tableSchema, dynamoDbTable) { + override fun codec(type: KClass): app.cash.tempest2.Codec = CodecAdapter(schema.codec(type)) + } + val methodHandlers = mutableMapOf() + for (member in tableType.declaredMembers) { + val component = when (member.returnType.jvmErasure) { + InlineView::class -> { + val keyType = member.returnType.arguments[0].type?.jvmErasure!! + val itemType = member.returnType.arguments[1].type?.jvmErasure!! + logicalTable.inlineView(keyType, itemType) } - val methodHandlers = mutableMapOf() - for (member in tableType.declaredMembers) { - val component = when (member.returnType.jvmErasure) { - InlineView::class -> { - val keyType = member.returnType.arguments[0].type?.jvmErasure!! - val itemType = member.returnType.arguments[1].type?.jvmErasure!! - inlineViewFactory.inlineView(keyType, itemType) - } - SecondaryIndex::class -> { - val keyType = member.returnType.arguments[0].type?.jvmErasure!! - val itemType = member.returnType.arguments[1].type?.jvmErasure!! - secondaryIndexFactory.secondaryIndex(keyType, itemType) - } - else -> null + SecondaryIndex::class -> { + val keyType = member.returnType.arguments[0].type?.jvmErasure!! + val itemType = member.returnType.arguments[1].type?.jvmErasure!! + logicalTable.secondaryIndex(keyType, itemType) } - methodHandlers[member.javaMethod] = GetterMethodHandler(component) + else -> null } - return proxyFactory.create(tableType, methodHandlers.toMap(), logicalTable) + methodHandlers[member.javaMethod] = GetterMethodHandler(component) } + return ProxyFactory.create(tableType, methodHandlers.toMap(), logicalTable) } inner class InlineViewFactory( - private val rawItemType: RawItemType + private val rawItemType: RawItemType, + private val tableSchema: TableSchema, + private val dynamoDbTable: DynamoDbTable, ) : InlineView.Factory { override fun inlineView( @@ -158,31 +117,33 @@ internal class LogicalDbFactory( ): InlineView { val item = schema.addItem(itemType, rawItemType.type) val key = schema.addKey(keyType, itemType) - val view = DynamoDbView( - key.codec as Codec, - item.codec as Codec, - dynamoDbEnhancedClient.table(rawItemType.tableName, TableSchema.fromClass(rawItemType.type.java)) - ) - val queryable = queryable( - rawItemType, - item, - key - ) - val scannable = scannable( - rawItemType, - item, - key - ) return object : InlineView, - View by view, - Queryable by queryable, - Scannable by scannable {} + View by DynamoDbView( + key.codec as Codec, + item.codec as Codec, + tableSchema, + ).sync(dynamoDbTable), + Queryable by queryable( + rawItemType, + item, + key, + tableSchema, + dynamoDbTable, + ), + Scannable by scannable( + item, + key, + tableSchema, + dynamoDbTable, + ) {} } } inner class SecondaryIndexFactory( - private val rawItemType: RawItemType + private val rawItemType: RawItemType, + private val tableSchema: TableSchema, + private val dynamoDbTable: DynamoDbTable, ) : SecondaryIndex.Factory { override fun secondaryIndex( @@ -191,33 +152,58 @@ internal class LogicalDbFactory( ): SecondaryIndex { val item = schema.addItem(itemType, rawItemType.type) val key = schema.addKey(keyType, itemType) - val queryable = queryable( - rawItemType, - item, - key - ) - val scannable = scannable( - rawItemType, - item, - key - ) return object : SecondaryIndex, - Queryable by queryable, - Scannable by scannable {} + Queryable by queryable( + rawItemType, + item, + key, + tableSchema, + dynamoDbTable, + ), + Scannable by scannable( + item, + key, + tableSchema, + dynamoDbTable, + ) {} } } - private class CodecAdapter( - private val internal: Codec - ) : app.cash.tempest2.Codec { - override fun toDb(appItem: A): D = internal.toDb(appItem) - - override fun toApp(dbItem: D): A = internal.toApp(dbItem) + private fun queryable( + rawItemType: RawItemType, + itemType: ItemType, + keyType: KeyType, + tableSchema: TableSchema, + dynamoDbTable: DynamoDbTable, + ): Queryable { + if (keyType.rangeKeyName == null) { + return UnsupportedQueryable(rawItemType.type) + } + return DynamoDbQueryable( + keyType.secondaryIndexName, + itemType.attributeNames, + keyType.codec as Codec, + itemType.codec as Codec, + tableSchema, + ).sync(dynamoDbTable) } - companion object { - val , RI : Any> KClass.rawItemType: KClass - get() = supertypes[0].arguments[0].type?.jvmErasure!! as KClass + private fun scannable( + itemType: ItemType, + keyType: KeyType, + tableSchema: TableSchema, + dynamoDbTable: DynamoDbTable, + ): Scannable { + return DynamoDbScannable( + keyType.secondaryIndexName, + itemType.attributeNames, + keyType.codec as Codec, + itemType.codec as Codec, + tableSchema, + ).sync(dynamoDbTable) } + + private val , RI : Any> KClass.rawItemType: KClass + get() = supertypes[0].arguments[0].type?.jvmErasure!! as KClass } diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedAsyncQueryable.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedAsyncQueryable.kt new file mode 100644 index 000000000..f290a1433 --- /dev/null +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/UnsupportedAsyncQueryable.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2.internal + +import app.cash.tempest2.AsyncQueryable +import app.cash.tempest2.KeyCondition +import app.cash.tempest2.Offset +import app.cash.tempest2.Page +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.reactive.asPublisher +import org.reactivestreams.Publisher +import software.amazon.awssdk.enhanced.dynamodb.Expression +import kotlin.reflect.KClass + +internal class UnsupportedAsyncQueryable( + private val rawType: KClass<*> +) : AsyncQueryable { + override fun queryAsync( + keyCondition: KeyCondition, + asc: Boolean, + pageSize: Int, + consistentRead: Boolean, + filterExpression: Expression?, + initialOffset: Offset? + ): Publisher> { + return flow> { + throw UnsupportedOperationException("Require $rawType to have a range key. You can query a table or an index only if it has a composite primary key (partition key and sort key)") + }.asPublisher() + } +} diff --git a/tempest2/src/main/kotlin/app/cash/tempest2/internal/V2.kt b/tempest2/src/main/kotlin/app/cash/tempest2/internal/V2.kt index 3366eb537..3d76858aa 100644 --- a/tempest2/src/main/kotlin/app/cash/tempest2/internal/V2.kt +++ b/tempest2/src/main/kotlin/app/cash/tempest2/internal/V2.kt @@ -17,6 +17,8 @@ package app.cash.tempest2.internal import app.cash.tempest.internal.AttributeAnnotation +import app.cash.tempest.internal.ClassMember +import app.cash.tempest.internal.Codec import app.cash.tempest.internal.ForIndexAnnotation import app.cash.tempest.internal.ItemType import app.cash.tempest.internal.MapAttributeValue @@ -24,6 +26,7 @@ import app.cash.tempest.internal.RawItemType import app.cash.tempest.internal.StringAttributeValue import app.cash.tempest2.Attribute import app.cash.tempest2.ForIndex +import app.cash.tempest2.TableName import software.amazon.awssdk.enhanced.dynamodb.TableMetadata import software.amazon.awssdk.enhanced.dynamodb.TableSchema import software.amazon.awssdk.services.dynamodb.model.AttributeValue @@ -82,3 +85,19 @@ internal class V2RawItemTypeFactory : RawItemType.Factory { ) } } + +internal fun getTableName(member: ClassMember, dbType: KClass<*>): String { + val tableName = member.annotations.filterIsInstance().singleOrNull()?.value + requireNotNull(tableName) { + "Please annotate ${member.javaMethod} in $dbType with `@TableName`" + } + return tableName +} + +internal class CodecAdapter( + private val internal: Codec +) : app.cash.tempest2.Codec { + override fun toDb(appItem: A): D = internal.toDb(appItem) + + override fun toApp(dbItem: D): A = internal.toApp(dbItem) +} diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/AsyncLogicalDbBatchTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/AsyncLogicalDbBatchTest.kt new file mode 100644 index 000000000..02c0498fa --- /dev/null +++ b/tempest2/src/test/kotlin/app/cash/tempest2/AsyncLogicalDbBatchTest.kt @@ -0,0 +1,129 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import app.cash.tempest2.musiclibrary.AlbumTrack +import app.cash.tempest2.musiclibrary.AsyncMusicDb +import app.cash.tempest2.musiclibrary.PlaylistInfo +import app.cash.tempest2.musiclibrary.testDb +import app.cash.tempest2.testing.asyncLogicalDb +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import java.time.Duration + +class AsyncLogicalDbBatchTest { + + @RegisterExtension + @JvmField + val db = testDb() + + private val musicDb by lazy { db.asyncLogicalDb() } + private val musicTable by lazy { musicDb.music } + + @Test + fun batchLoad() = runBlocking { + val albumTracks = listOf( + AlbumTrack("ALBUM_1", 1, "dreamin'", Duration.parse("PT3M28S")), + AlbumTrack("ALBUM_1", 2, "what you do to me", Duration.parse("PT3M24S")), + AlbumTrack("ALBUM_1", 3, "too slow", Duration.parse("PT2M27S")) + ) + for (albumTrack in albumTracks) { + musicTable.albumTracks.save(albumTrack) + } + val playlistInfo = PlaylistInfo( + playlist_token = "PLAYLIST_1", + playlist_name = "WFH Music", + playlist_tracks = listOf(AlbumTrack.Key("ALBUM_1", 1)) + ) + musicTable.playlistInfo.save(playlistInfo) + + val loadedItems = musicDb.batchLoad( + PlaylistInfo.Key("PLAYLIST_1"), + AlbumTrack.Key("ALBUM_1", track_number = 1), + AlbumTrack.Key("ALBUM_1", track_number = 2), + AlbumTrack.Key("ALBUM_1", track_number = 3) + ) + assertThat(loadedItems.getItems()).containsAll(albumTracks) + assertThat(loadedItems.getItems()).containsExactly(playlistInfo) + } + + @Test + fun batchLoadMultipleTables() = runBlocking { + val albumTracks = listOf( + AlbumTrack("ALBUM_1", 1, "dreamin'", Duration.parse("PT3M28S")), + AlbumTrack("ALBUM_1", 2, "what you do to me", Duration.parse("PT3M24S")), + AlbumTrack("ALBUM_1", 3, "too slow", Duration.parse("PT2M27S")) + ) + for (albumTrack in albumTracks) { + musicTable.albumTracks.save(albumTrack) + } + val playlistInfo = PlaylistInfo( + "PLAYLIST_1", + "WFH Music", + listOf(AlbumTrack.Key("ALBUM_1", 1)) + ) + musicTable.playlistInfo.save(playlistInfo) + + val items = musicDb.batchLoad( + AlbumTrack.Key("ALBUM_1", track_number = 1), + AlbumTrack.Key("ALBUM_1", track_number = 2), + AlbumTrack.Key("ALBUM_1", track_number = 3), + PlaylistInfo.Key("PLAYLIST_1") + ) + assertThat(items.getItems()).containsAll(albumTracks) + assertThat(items.getItems()).containsExactly(playlistInfo) + } + + @Test + fun batchLoadAfterBatchWrite() = runBlocking { + val albumTracks = listOf( + AlbumTrack("ALBUM_1", 1, "dreamin'", Duration.parse("PT3M28S")), + AlbumTrack("ALBUM_1", 2, "what you do to me", Duration.parse("PT3M24S")), + AlbumTrack("ALBUM_1", 3, "too slow", Duration.parse("PT2M27S")) + ) + val result = musicDb.batchWrite(BatchWriteSet.Builder().clobber(albumTracks).build()) + assertThat(result.isSuccessful).isTrue() + + val items = musicDb.batchLoad( + AlbumTrack.Key("ALBUM_1", track_number = 1), + AlbumTrack.Key("ALBUM_1", track_number = 2), + AlbumTrack.Key("ALBUM_1", track_number = 3) + ) + assertThat(items).containsAll(albumTracks) + } + + @Test + fun batchLoadAfterBatchDelete() = runBlocking { + val t1 = AlbumTrack("ALBUM_1", 1, "dreamin'", Duration.parse("PT3M28S")) + val t2 = AlbumTrack("ALBUM_1", 2, "what you do to me", Duration.parse("PT3M24S")) + val t3 = AlbumTrack("ALBUM_1", 3, "too slow", Duration.parse("PT2M27S")) + + val result1 = musicDb.batchWrite(BatchWriteSet.Builder().clobber(t1, t2, t3).build()) + assertThat(result1.isSuccessful).isTrue() + val result2 = musicDb.batchWrite(BatchWriteSet.Builder().delete(AlbumTrack.Key("ALBUM_1", 2)).build()) + assertThat(result2.isSuccessful).isTrue() + + val items = musicDb.batchLoad( + AlbumTrack.Key("ALBUM_1", track_number = 1), + AlbumTrack.Key("ALBUM_1", track_number = 2), + AlbumTrack.Key("ALBUM_1", track_number = 3) + ) + assertThat(items).containsExactly(t3, t1) + } +} diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/AsyncLogicalDbTransactionTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/AsyncLogicalDbTransactionTest.kt new file mode 100644 index 000000000..084e263aa --- /dev/null +++ b/tempest2/src/test/kotlin/app/cash/tempest2/AsyncLogicalDbTransactionTest.kt @@ -0,0 +1,292 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import app.cash.tempest2.musiclibrary.AlbumTrack +import app.cash.tempest2.musiclibrary.AsyncMusicDb +import app.cash.tempest2.musiclibrary.PlaylistInfo +import app.cash.tempest2.musiclibrary.testDb +import app.cash.tempest2.testing.asyncLogicalDb +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.TransactionCanceledException +import java.time.Duration + +class AsyncLogicalDbTransactionTest { + + @RegisterExtension + @JvmField + val db = testDb() + + private val musicDb by lazy { db.asyncLogicalDb() } + private val musicTable by lazy { musicDb.music } + + @Test + fun transactionLoad() = runBlocking { + val albumTracks = listOf( + AlbumTrack( + "ALBUM_1", + 1, + "dreamin'", + Duration.parse("PT3M28S") + ), + AlbumTrack( + "ALBUM_1", + 2, + "what you do to me", + Duration.parse("PT3M24S") + ), + AlbumTrack( + "ALBUM_1", + 3, + "too slow", + Duration.parse("PT2M27S") + ) + ) + for (albumTrack in albumTracks) { + musicTable.albumTracks.save(albumTrack) + } + val playlistInfo = PlaylistInfo( + "PLAYLIST_1", + "WFH Music", + listOf(AlbumTrack.Key("ALBUM_1", 1)) + ) + musicTable.playlistInfo.save(playlistInfo) + + val loadedItems = musicDb.transactionLoad( + PlaylistInfo.Key("PLAYLIST_1"), + AlbumTrack.Key("ALBUM_1", 1), + AlbumTrack.Key("ALBUM_1", 2), + AlbumTrack.Key("ALBUM_1", 3) + ) + assertThat(loadedItems.getItems()).containsAll(albumTracks) + assertThat(loadedItems.getItems()).contains(playlistInfo) + } + + @Test + fun transactionLoadAfterTransactionWrite() = runBlocking { + val albumTracks = listOf( + AlbumTrack( + "ALBUM_1", + 1, + "dreamin'", + Duration.parse("PT3M28S") + ), + AlbumTrack( + "ALBUM_1", + 2, + "what you do to me", + Duration.parse("PT3M24S") + ), + AlbumTrack( + "ALBUM_1", + 3, + "too slow", + Duration.parse("PT2M27S") + ) + ) + val playlistInfo = + PlaylistInfo("PLAYLIST_1", "WFH Music", listOf()) + + val writeTransaction = TransactionWriteSet.Builder() + .save(albumTracks[0]) + .save(albumTracks[1]) + .save(albumTracks[2]) + .save(playlistInfo) + .build() + musicDb.transactionWrite(writeTransaction) + + // Read items at the same time in a serializable manner. + val loadedItems = musicDb.transactionLoad( + PlaylistInfo.Key("PLAYLIST_1"), + AlbumTrack.Key("ALBUM_1", 1), + AlbumTrack.Key("ALBUM_1", 2), + AlbumTrack.Key("ALBUM_1", 3) + ) + assertThat(loadedItems.getItems()).containsAll(albumTracks) + assertThat(loadedItems.getItems()).containsExactly(playlistInfo) + } + + @Test + fun conditionalUpdateInTransactionWrite() = runBlocking { + val playlistInfoV1 = + PlaylistInfo("PLAYLIST_1", "WFH Music", emptyList()) + musicTable.playlistInfo.save(playlistInfoV1) + val albumTrack = AlbumTrack( + "ALBUM_1", + 1, + "dreamin'", + Duration.parse("PT3M28S") + ) + musicTable.albumTracks.save(albumTrack) + + // Add a PlaylistEntry and update PlaylistInfo, in an ACID manner using transactionWrite. + val playlistInfoV2 = playlistInfoV1.copy( + playlist_name = "WFH Forever Music", + playlist_version = playlistInfoV1.playlist_version + 1 + ) + val writeTransaction = TransactionWriteSet.Builder() + .save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + .delete(AlbumTrack.Key("ALBUM_1", 1)) + .build() + musicDb.transactionWrite(writeTransaction) + + val loadedItems = musicDb.transactionLoad( + PlaylistInfo.Key("PLAYLIST_1"), + AlbumTrack.Key("ALBUM_1", 1) + ) + assertThat(loadedItems.getItems()).containsExactly(playlistInfoV2) + assertThat(loadedItems.getItems()).isEmpty() + } + + @Test + fun conditionalUpdateFailureInTransactionWrite() = runBlocking { + val playlistInfoV1 = + PlaylistInfo("PLAYLIST_1", "WFH Music", emptyList()) + musicTable.playlistInfo.save(playlistInfoV1) + val albumTrack = AlbumTrack( + "ALBUM_1", + 1, + "dreamin'", + Duration.parse("PT3M28S") + ) + musicTable.albumTracks.save(albumTrack) + + // Add a PlaylistEntry and update PlaylistInfo, in an ACID manner using transactionWrite. + val playlistInfoV2 = playlistInfoV1.copy( + playlist_version = playlistInfoV1.playlist_version + 1 + ) + + val writeTransaction = TransactionWriteSet.Builder() + .save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + .delete(AlbumTrack.Key("ALBUM_1", 1)) + .build() + // Introduce a race condition. + musicTable.playlistInfo.save(playlistInfoV2) + + assertThatExceptionOfType(TransactionCanceledException::class.java) + .isThrownBy { + runBlocking { + musicDb.transactionWrite(writeTransaction) + } + } + // Confirm the exception message doesn't contain any item data. + .withMessageContaining( + "Write transaction failed: [" + + "Save item (non-key attributes omitted) music_items[partition_key=AttributeValue(S=PLAYLIST_1),sort_key=AttributeValue(S=INFO_)], " + + "Delete key music_items[partition_key=AttributeValue(S=ALBUM_1),sort_key=AttributeValue(S=TRACK_0000000000000001)]]" + ) + } + + @Test + fun conditionCheckInTransactionWrite() = runBlocking { + val playlistInfoV1 = + PlaylistInfo("PLAYLIST_1", "WFH Music", emptyList()) + musicTable.playlistInfo.save(playlistInfoV1) + val albumTrack = AlbumTrack( + "ALBUM_1", + 1, + "dreamin'", + Duration.parse("PT3M28S") + ) + musicTable.albumTracks.save(albumTrack) + + val playlistInfoV2 = playlistInfoV1.copy( + playlist_tracks = playlistInfoV1.playlist_tracks + AlbumTrack.Key("ALBUM_1", 1), + playlist_version = playlistInfoV1.playlist_version + 1 + ) + val writeTransaction = TransactionWriteSet.Builder() + .save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + // Add a PlaylistEntry only if the AlbumTrack exists. + .checkCondition( + AlbumTrack.Key("ALBUM_1", 1), + trackExists() + ) + .build() + musicDb.transactionWrite(writeTransaction) + + val loadedItems = musicDb.transactionLoad(PlaylistInfo.Key("PLAYLIST_1")) + assertThat(loadedItems.getItems()).containsExactly(playlistInfoV2) + } + + @Test + fun conditionCheckFailureInTransactionWrite() = runBlocking { + val playlistInfoV1 = + PlaylistInfo("PLAYLIST_1", "WFH Music", emptyList()) + musicTable.playlistInfo.save(playlistInfoV1) + + val playlistInfoV2 = playlistInfoV1.copy( + playlist_tracks = playlistInfoV1.playlist_tracks + AlbumTrack.Key("ALBUM_1", 1), + playlist_version = playlistInfoV1.playlist_version + 1 + ) + val writeTransaction = TransactionWriteSet.Builder() + .save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + // Add a playlist entry only if the AlbumTrack exists. + .checkCondition( + AlbumTrack.Key("ALBUM_1", 1), + trackExists() + ) + .build() + + assertThatExceptionOfType(TransactionCanceledException::class.java) + .isThrownBy { + runBlocking { + musicDb.transactionWrite(writeTransaction) + } + } + // Confirm the exception message doesn't contain any item data. + .withMessageContaining( + "Write transaction failed: [" + + "Save item (non-key attributes omitted) music_items[partition_key=AttributeValue(S=PLAYLIST_1),sort_key=AttributeValue(S=INFO_)], " + + "Check key music_items[partition_key=AttributeValue(S=ALBUM_1),sort_key=AttributeValue(S=TRACK_0000000000000001)]]" + ) + } + + private fun ifPlaylistVersionIs(playlist_version: Long): Expression { + return Expression.builder() + .expression("playlist_version = :playlist_version") + .expressionValues( + mapOf( + ":playlist_version" to AttributeValue.builder().n("$playlist_version").build() + ) + ) + .build() + } + + private fun trackExists(): Expression { + return Expression.builder() + .expression("attribute_exists(track_title)") + .build() + } +} diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncQueryableTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncQueryableTest.kt new file mode 100644 index 000000000..2f9c86be5 --- /dev/null +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncQueryableTest.kt @@ -0,0 +1,292 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import app.cash.tempest.musiclibrary.AFTER_HOURS_EP +import app.cash.tempest.musiclibrary.LOCKDOWN_SINGLE +import app.cash.tempest.musiclibrary.THE_DARK_SIDE_OF_THE_MOON +import app.cash.tempest.musiclibrary.THE_WALL +import app.cash.tempest.musiclibrary.WHAT_YOU_DO_TO_ME_SINGLE +import app.cash.tempest2.musiclibrary.AlbumInfo +import app.cash.tempest2.musiclibrary.AlbumTrack +import app.cash.tempest2.musiclibrary.AsyncMusicDb +import app.cash.tempest2.musiclibrary.albumTitles +import app.cash.tempest2.musiclibrary.givenAlbums +import app.cash.tempest2.musiclibrary.testDb +import app.cash.tempest2.musiclibrary.trackTitles +import app.cash.tempest2.testing.asyncLogicalDb +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import java.time.Duration + +class DynamoDbAsyncQueryableTest { + + @RegisterExtension + @JvmField + val db = testDb() + + private val musicTable by lazy { db.asyncLogicalDb().music } + + @Test + fun primaryIndexBetween() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page1 = musicTable.albumTracks.query( + keyCondition = Between( + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 1), + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 1) + ) + ) + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..0)) + + val page2 = musicTable.albumTracks.query( + keyCondition = Between( + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 2), + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 3) + ) + ) + assertThat(page2.hasMorePages).isFalse() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(1..2)) + + val page3 = musicTable.albumTracks.query( + keyCondition = Between( + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 1), + AlbumTrack.Key(AFTER_HOURS_EP.album_token, 3) + ) + ) + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..2)) + } + + @Test + fun primaryIndexBeginsWith() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page1 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token)) + ) + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles) + + val page2 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, 3)) + ) + assertThat(page2.hasMorePages).isFalse() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(2..2)) + } + + @Test + fun primaryIndexFilter() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page1 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token)), + filterExpression = runLengthLongerThan(Duration.ofMinutes(3)) + ) + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.trackTitles).containsExactly( + AFTER_HOURS_EP.trackTitles[0], + AFTER_HOURS_EP.trackTitles[1], + AFTER_HOURS_EP.trackTitles[4] + ) + } + + @Test + fun primaryIndexPagination() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page1 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + pageSize = 2 + ) + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(0..1)) + + val page2 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + pageSize = 2, + initialOffset = page1.offset + ) + assertThat(page2.hasMorePages).isTrue() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(2..3)) + + val page3 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + pageSize = 2, + initialOffset = page2.offset + ) + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.slice(4..4)) + } + + @Test + fun primaryIndexDesc() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + asc = false + ) + assertThat(page.hasMorePages).isFalse() + assertThat(page.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed()) + } + + @Test + fun primaryIndexDescPagination() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + + val page1 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + asc = false, + pageSize = 2 + ) + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(0..1)) + + val page2 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + asc = false, + pageSize = 2, + initialOffset = page1.offset + ) + assertThat(page2.hasMorePages).isTrue() + assertThat(page2.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(2..3)) + + val page3 = musicTable.albumTracks.query( + keyCondition = BeginsWith(AlbumTrack.Key(AFTER_HOURS_EP.album_token, "")), + asc = false, + pageSize = 2, + initialOffset = page2.offset + ) + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(AFTER_HOURS_EP.trackTitles.reversed().slice(4..4)) + } + + @Test + fun localSecondaryIndex() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + val expectedTrackTitles = AFTER_HOURS_EP.trackTitles.sorted() + + val page = musicTable.albumTracksByTitle.query( + keyCondition = BeginsWith(AlbumTrack.TitleIndexOffset(AFTER_HOURS_EP.album_token)) + ) + assertThat(page.hasMorePages).isFalse() + assertThat(page.trackTitles).containsAll(expectedTrackTitles) + } + + @Test + fun localSecondaryIndexPagination() = runBlocking { + musicTable.givenAlbums(AFTER_HOURS_EP) + val expectedTrackTitles = AFTER_HOURS_EP.trackTitles.sorted() + + val page1 = musicTable.albumTracksByTitle.query( + keyCondition = BeginsWith(AlbumTrack.TitleIndexOffset(AFTER_HOURS_EP.album_token)), + pageSize = 2 + ) + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.trackTitles).containsAll(expectedTrackTitles.slice(0..1)) + + val page2 = musicTable.albumTracksByTitle.query( + keyCondition = BeginsWith(AlbumTrack.TitleIndexOffset(AFTER_HOURS_EP.album_token)), + pageSize = 2, + initialOffset = page1.offset + ) + assertThat(page2.hasMorePages).isTrue() + assertThat(page2.trackTitles).containsAll(expectedTrackTitles.slice(2..3)) + + val page3 = musicTable.albumTracksByTitle.query( + keyCondition = BeginsWith(AlbumTrack.TitleIndexOffset(AFTER_HOURS_EP.album_token)), + pageSize = 2, + initialOffset = page2.offset + ) + assertThat(page3.hasMorePages).isFalse() + assertThat(page3.trackTitles).containsAll(expectedTrackTitles.slice(4..4)) + } + + @Test + fun globalSecondaryIndex() = runBlocking { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + val artist1Page = musicTable.albumInfoByArtist.query( + BeginsWith(AlbumInfo.ArtistIndexOffset("Pink Floyd", "")) + ) + assertThat(artist1Page.hasMorePages).isFalse() + assertThat(artist1Page.albumTitles).containsExactly( + THE_DARK_SIDE_OF_THE_MOON.album_title, + THE_WALL.album_title + ) + + val artist2Page = musicTable.albumInfoByArtist.query( + BeginsWith(AlbumInfo.ArtistIndexOffset("53 Theives", "")) + ) + assertThat(artist2Page.hasMorePages).isFalse() + assertThat(artist2Page.albumTitles).containsExactly( + AFTER_HOURS_EP.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + + @Test + fun globalSecondaryIndexPagination() = runBlocking { + musicTable.givenAlbums( + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + val page1 = musicTable.albumInfoByArtist.query( + BeginsWith(AlbumInfo.ArtistIndexOffset("53 Theives", "")), + pageSize = 2 + ) + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.albumTitles).containsExactly( + AFTER_HOURS_EP.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title + ) + + val page2 = musicTable.albumInfoByArtist.query( + BeginsWith(AlbumInfo.ArtistIndexOffset("53 Theives", "")), + pageSize = 2, + initialOffset = page1.offset + ) + assertThat(page2.hasMorePages).isFalse() + assertThat(page2.albumTitles).containsExactly( + LOCKDOWN_SINGLE.album_title + ) + } + + private fun runLengthLongerThan(duration: Duration): Expression { + return Expression.builder() + .expression("run_length > :duration") + .expressionValues( + mapOf( + ":duration" to AttributeValue.builder().s(duration.toString()).build() + ) + ) + .build() + } +} diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncScannableTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncScannableTest.kt new file mode 100644 index 000000000..5cfad4d96 --- /dev/null +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncScannableTest.kt @@ -0,0 +1,177 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import app.cash.tempest.musiclibrary.AFTER_HOURS_EP +import app.cash.tempest.musiclibrary.LOCKDOWN_SINGLE +import app.cash.tempest.musiclibrary.THE_DARK_SIDE_OF_THE_MOON +import app.cash.tempest.musiclibrary.THE_WALL +import app.cash.tempest.musiclibrary.WHAT_YOU_DO_TO_ME_SINGLE +import app.cash.tempest2.musiclibrary.AsyncMusicDb +import app.cash.tempest2.musiclibrary.albumTitles +import app.cash.tempest2.musiclibrary.givenAlbums +import app.cash.tempest2.musiclibrary.testDb +import app.cash.tempest2.musiclibrary.trackTitles +import app.cash.tempest2.testing.asyncLogicalDb +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import java.time.Duration + +class DynamoDbAsyncScannableTest { + + @RegisterExtension + @JvmField + val db = testDb() + + private val musicTable by lazy { db.asyncLogicalDb().music } + + @Test + fun primaryIndex() = runBlocking { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val page1 = musicTable.albumInfo.scan( + filterExpression = releaseYearIs(2020) + ) + + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.albumTitles).containsExactly( + AFTER_HOURS_EP.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + + @Test + fun localSecondaryIndex() = runBlocking { + musicTable.givenAlbums(THE_WALL) + val expectedTrackTitles = THE_WALL.trackTitles.sorted() + + val page1 = musicTable.albumTracksByTitle.scan( + pageSize = 20 + ) + assertThat(page1.hasMorePages).isTrue() + assertThat(page1.trackTitles).containsAll(expectedTrackTitles.slice(0..19)) + + val page2 = musicTable.albumTracksByTitle.scan( + pageSize = 20, + initialOffset = page1.offset + ) + assertThat(page2.hasMorePages).isFalse() + assertThat(page2.trackTitles).containsAll(expectedTrackTitles.slice(20..24)) + } + + @Test + fun localSecondaryIndexWithFilter() = runBlocking { + musicTable.givenAlbums(THE_WALL) + val expectedTrackTitles = THE_WALL.tracks + .filter { it.run_length > Duration.ofMinutes(3) } + .map { it.track_title } + .sorted() + + val page1 = musicTable.albumTracksByTitle.scan( + filterExpression = runLengthLongerThan(Duration.ofMinutes(3)) + ) + + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.trackTitles).containsAll(expectedTrackTitles) + } + + @Test + fun globalSecondaryIndex() = runBlocking { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val page1 = musicTable.albumInfoByArtist.scan() + + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.albumTitles).containsExactly( + AFTER_HOURS_EP.album_title, + WHAT_YOU_DO_TO_ME_SINGLE.album_title, + LOCKDOWN_SINGLE.album_title, + THE_DARK_SIDE_OF_THE_MOON.album_title, + THE_WALL.album_title + ) + } + + @Test + fun globalSecondaryIndexWithFilter() = runBlocking { + musicTable.givenAlbums( + THE_DARK_SIDE_OF_THE_MOON, + THE_WALL, + WHAT_YOU_DO_TO_ME_SINGLE, + AFTER_HOURS_EP, + LOCKDOWN_SINGLE + ) + + val page1 = musicTable.albumInfoByArtist.scan( + filterExpression = releaseYearIs(2020) + ) + + assertThat(page1.hasMorePages).isFalse() + assertThat(page1.albumTitles).containsExactly( + AFTER_HOURS_EP.album_title, + LOCKDOWN_SINGLE.album_title + ) + } + + private fun releaseYearIs(year: Int): Expression { + return Expression.builder() + .expression("begins_with(release_date, :year)") + .expressionValues( + mapOf( + ":year" to AttributeValue.builder().s("$year").build() + ) + ) + .build() + } + + private fun isTrack(): Expression { + return Expression.builder() + .expression("begins_with(sort_key, :track_prefix)") + .expressionValues( + mapOf( + ":track_prefix" to AttributeValue.builder().s("TRACK_").build() + ) + ) + .build() + } + + private fun runLengthLongerThan(duration: Duration): Expression { + return Expression.builder() + .expression("run_length > :duration") + .expressionValues( + mapOf( + ":duration" to AttributeValue.builder().s(duration.toString()).build() + ) + ) + .build() + } +} diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncViewTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncViewTest.kt new file mode 100644 index 000000000..ed2ae20e3 --- /dev/null +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbAsyncViewTest.kt @@ -0,0 +1,154 @@ +/* + * Copyright 2021 Square Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package app.cash.tempest2 + +import app.cash.tempest2.musiclibrary.AlbumInfo +import app.cash.tempest2.musiclibrary.AlbumTrack +import app.cash.tempest2.musiclibrary.AsyncMusicDb +import app.cash.tempest2.musiclibrary.PlaylistInfo +import app.cash.tempest2.musiclibrary.testDb +import app.cash.tempest2.testing.asyncLogicalDb +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.RegisterExtension +import software.amazon.awssdk.enhanced.dynamodb.Expression +import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException +import java.time.LocalDate + +class DynamoDbAsyncViewTest { + + @RegisterExtension + @JvmField + val db = testDb() + + private val musicTable by lazy { db.asyncLogicalDb().music } + + @Test + fun loadAfterSave() = runBlocking { + val albumInfo = AlbumInfo( + "ALBUM_1", + "after hours - EP", + "53 Thieves", + LocalDate.of(2020, 2, 21), + "Contemporary R&B" + ) + musicTable.albumInfo.save(albumInfo) + + // Query the movies created. + val loadedAlbumInfo = musicTable.albumInfo.load(albumInfo.key)!! + + assertThat(loadedAlbumInfo.album_token).isEqualTo(albumInfo.album_token) + assertThat(loadedAlbumInfo.artist_name).isEqualTo(albumInfo.artist_name) + assertThat(loadedAlbumInfo.release_date).isEqualTo(albumInfo.release_date) + assertThat(loadedAlbumInfo.genre_name).isEqualTo(albumInfo.genre_name) + } + + @Test + fun saveIfNotExist() = runBlocking { + val albumInfo = AlbumInfo( + "ALBUM_1", + "after hours - EP", + "53 Thieves", + LocalDate.of(2020, 2, 21), + "Contemporary R&B" + ) + musicTable.albumInfo.save(albumInfo, ifNotExist()) + + // This fails because the album info already exists. + assertThatExceptionOfType(ConditionalCheckFailedException::class.java) + .isThrownBy { + runBlocking { + musicTable.albumInfo.save(albumInfo, ifNotExist()) + } + } + } + + @Test + fun optimisticLocking() = runBlocking { + val playlistInfoV1 = PlaylistInfo( + "PLAYLIST_1", + "WFH Music", + listOf( + AlbumTrack.Key("ALBUM_1", 1), + AlbumTrack.Key("ALBUM_3", 2) + ) + ) + musicTable.playlistInfo.save(playlistInfoV1) + + // Update PlaylistInfo only if playlist_version is 0. + val playlistInfoV2 = playlistInfoV1.copy( + playlist_name = "WFH Forever Music", + playlist_version = 2 + ) + musicTable.playlistInfo.save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + + val actualPlaylistInfoV2 = musicTable.playlistInfo.load(PlaylistInfo.Key("PLAYLIST_1"))!! + assertThat(actualPlaylistInfoV2).isEqualTo(playlistInfoV2) + + // This fails because playlist_size is already 1. + assertThatExceptionOfType(ConditionalCheckFailedException::class.java) + .isThrownBy { + runBlocking { + musicTable.playlistInfo.save( + playlistInfoV2, + ifPlaylistVersionIs(playlistInfoV1.playlist_version) + ) + } + } + } + + @Test + fun delete() = runBlocking { + val albumInfo = AlbumInfo( + "ALBUM_1", + "after hours - EP", + "53 Thieves", + LocalDate.of(2020, 2, 21), + "Contemporary R&B" + ) + musicTable.albumInfo.save(albumInfo) + + val deleted = musicTable.albumInfo.deleteKey(albumInfo.key) + assertThat(deleted).isEqualTo(albumInfo) + + val loadedAlbumInfo = musicTable.albumInfo.load(albumInfo.key) + assertThat(loadedAlbumInfo).isNull() + } + + private fun ifNotExist(): Expression { + return Expression.builder() + .expression("attribute_not_exists(partition_key)") + .build() + } + + private fun ifPlaylistVersionIs(playlist_version: Long): Expression { + return Expression.builder() + .expression("playlist_version = :version") + .expressionValues( + mapOf( + ":version" to AttributeValue.builder().n("$playlist_version").build() + ) + ) + .build() + } +} diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt index a4cc1895b..db2102268 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/DynamoDbViewTest.kt @@ -123,7 +123,8 @@ class DynamoDbViewTest { ) musicTable.albumInfo.save(albumInfo) - musicTable.albumInfo.deleteKey(albumInfo.key) + val deleted = musicTable.albumInfo.deleteKey(albumInfo.key) + assertThat(deleted).isEqualTo(albumInfo) val loadedAlbumInfo = musicTable.albumInfo.load(albumInfo.key) assertThat(loadedAlbumInfo).isNull() diff --git a/tempest2/src/test/kotlin/app/cash/tempest2/musiclibrary/TestUtils.kt b/tempest2/src/test/kotlin/app/cash/tempest2/musiclibrary/TestUtils.kt index fc9b62728..0072b2de8 100644 --- a/tempest2/src/test/kotlin/app/cash/tempest2/musiclibrary/TestUtils.kt +++ b/tempest2/src/test/kotlin/app/cash/tempest2/musiclibrary/TestUtils.kt @@ -102,3 +102,27 @@ fun MusicTable.givenAlbums(vararg albums: Album) { } } } + +suspend fun AsyncMusicTable.givenAlbums(vararg albums: Album) { + for (album in albums) { + albumInfo.save( + AlbumInfo( + album.album_token, + album.album_title, + album.artist_name, + album.release_date, + album.genre_name + ) + ) + for ((i, track) in album.tracks.withIndex()) { + albumTracks.save( + AlbumTrack( + album.album_token, + i + 1L, + track.track_title, + track.run_length + ) + ) + } + } +}