Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jangalinski committed Jul 24, 2024
1 parent 26d55b9 commit 1c71162
Show file tree
Hide file tree
Showing 36 changed files with 421 additions and 153 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ build/
### DEV
_tmp/
.repository/

.console.sh
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.toolisticon.kotlin.avro.example.java;

import io.toolisticon.example.bank.BankAccountCreated;
import io.toolisticon.kotlin.avro.repository.AvroSchemaResolverMap;
import org.javamoney.moneta.Money;
import org.junit.jupiter.api.Test;

import java.util.UUID;

import static io.toolisticon.kotlin.avro.AvroKotlin.avroSchemaResolver;
import static io.toolisticon.kotlin.avro.codec.SpecificRecordCodec.specificRecordSingleObjectDecoder;
import static io.toolisticon.kotlin.avro.codec.SpecificRecordCodec.specificRecordSingleObjectEncoder;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -21,7 +21,7 @@ void encodeAndDecodeEventWithMoneyLogicalType() {
.setCustomerId("1")
.setInitialBalance(Money.of(100.123456, "EUR"))
.build();
final var resolver = avroSchemaResolver(BankAccountCreated.getClassSchema());
final var resolver = new AvroSchemaResolverMap(BankAccountCreated.getClassSchema());

final var encoded = specificRecordSingleObjectEncoder().encode(bankAccountCreated);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.toolisticon.kotlin.avro.example

import io.toolisticon.kotlin.avro.AvroKotlin
import io.toolisticon.kotlin.avro.codec.GenericRecordCodec
import io.toolisticon.kotlin.avro.example.customerid.CustomerId
import io.toolisticon.kotlin.avro.example.money.MoneyLogicalType
import io.toolisticon.kotlin.avro.repository.avroSchemaResolver
import io.toolisticon.kotlin.avro.serialization.AvroKotlinSerialization
import io.toolisticon.kotlin.avro.value.CanonicalName.Companion.toCanonicalName
import io.toolisticon.kotlin.avro.value.Name.Companion.toName
Expand All @@ -17,7 +17,6 @@ internal class BankAccountCreatedDataTest {
@Test
fun `show schema`() {
val schema = KotlinExample.avro.schema(BankAccountCreatedData::class)
println(schema)
assertThat(schema.canonicalName).isEqualTo("io.toolisticon.bank.BankAccountCreated".toCanonicalName())
assertThat(schema.fields).hasSize(3)

Expand Down Expand Up @@ -49,7 +48,7 @@ internal class BankAccountCreatedDataTest {
val customerId = CustomerId.random()

val event = BankAccountCreatedData(accountId, customerId, amount)
val resolver = avroSchemaResolver(KotlinExample.avro.schema(BankAccountCreatedData::class))
val resolver = AvroKotlin.avroSchemaResolver(KotlinExample.avro.schema(BankAccountCreatedData::class))

val record = KotlinExample.avro.toRecord(event)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.toolisticon.kotlin.avro.example

import io.toolisticon.example.bank.BankAccountCreated
import io.toolisticon.kotlin.avro.AvroKotlin.avroSchemaResolver
import io.toolisticon.kotlin.avro.codec.GenericRecordCodec
import io.toolisticon.kotlin.avro.example.customerid.CustomerId
import io.toolisticon.kotlin.avro.model.wrapper.AvroSchema
import io.toolisticon.kotlin.avro.repository.avroSchemaResolver
import io.toolisticon.kotlin.avro.serialization.AvroKotlinSerialization
import io.toolisticon.kotlin.avro.value.SingleObjectEncodedBytes
import org.assertj.core.api.Assertions.assertThat
Expand Down Expand Up @@ -54,6 +54,5 @@ internal class JavaKotlinInterOpTest {
assertThat(decoded.accountId).isEqualTo(orig.accountId)
assertThat(decoded.customerId).isEqualTo(orig.customerId)
assertThat(decoded.initialBalance).isEqualTo(orig.initialBalance)

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,31 @@ import io.toolisticon.kotlin.avro.AvroKotlin
import io.toolisticon.kotlin.avro.codec.AvroCodec
import io.toolisticon.kotlin.avro.codec.GenericRecordCodec
import io.toolisticon.kotlin.avro.model.wrapper.AvroSchema
import io.toolisticon.kotlin.avro.model.wrapper.AvroSchemaChecks.compatibleToReadFrom
import io.toolisticon.kotlin.avro.repository.AvroSchemaResolver
import io.toolisticon.kotlin.avro.repository.MutableAvroSchemaResolver
import io.toolisticon.kotlin.avro.repository.AvroSchemaResolverMutableMap
import io.toolisticon.kotlin.avro.serialization.avro4k.avro4k
import io.toolisticon.kotlin.avro.serialization.spi.AvroSerializationModuleFactoryServiceLoader
import io.toolisticon.kotlin.avro.serialization.spi.SerializerModuleKtx.reduce
import io.toolisticon.kotlin.avro.value.AvroSchemaCompatibilityMap
import io.toolisticon.kotlin.avro.value.SingleObjectEncodedBytes
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.serializer
import mu.KLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import java.lang.Runtime.Version
import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.full.createType

@OptIn(ExperimentalSerializationApi::class)
class AvroKotlinSerialization(
private val avro4k: Avro,
private val schemaResolver: MutableAvroSchemaResolver = MutableAvroSchemaResolver.EMPTY,
val avro4k: Avro,
private val schemaResolver: AvroSchemaResolverMutableMap = AvroSchemaResolverMutableMap.EMPTY,
private val genericData: GenericData = AvroKotlin.genericData
) {
) : AvroSchemaResolver by schemaResolver {
companion object : KLogging() {

fun configure(vararg serializersModules: SerializersModule): AvroKotlinSerialization {
Expand All @@ -42,41 +41,85 @@ class AvroKotlinSerialization(
}
}

@PublishedApi
internal val avro4kSingleObject = AvroSingleObject(schemaRegistry = schemaResolver.avro4k(), avro = avro4k)
init {
/**
* We _need_ `kotlinx.serialization >= 1.7`. spring boot provides an outdated version (1.6.3).
* This is a pita to resolve. This check makes sure, any misconfigurations are found on app-start.
*/
check(Version.parse(KSerializer::class.java.`package`.implementationVersion) >= Version.parse("1.7")) { "avro4k uses features that required kotlinx.serialization version >= 1.7.0. Make sure to include the correct versions, especially when you use spring-boot." }
}

val avro4kSingleObject = AvroSingleObject(schemaRegistry = schemaResolver.avro4k(), avro = avro4k)

val compatibilityCache = AvroSchemaCompatibilityMap()

private val kserializerCache = ConcurrentHashMap<KClass<*>, KSerializer<*>>()
private val schemaCache = ConcurrentHashMap<KClass<*>, AvroSchema>()

constructor() : this(
Avro { serializersModule = AvroSerializationModuleFactoryServiceLoader() }
)

@Suppress("UNCHECKED_CAST")
fun <T : Any> toGenericRecord(data: T): GenericRecord {
val kserializer = serializer(data::class) as KSerializer<T>
val schema = avro4k.schema(kserializer)
fun <T : Any> singleObjectEncoder(): AvroCodec.SingleObjectEncoder<T> = AvroCodec.SingleObjectEncoder { data ->
@Suppress("UNCHECKED_CAST")
val serializer = serializer(data::class) as KSerializer<T>
val writerSchema = schema(data::class)

return avro4k.encodeToGenericData(schema, kserializer, data) as GenericRecord
val bytes = avro4kSingleObject.encodeToByteArray(writerSchema.get(), serializer, data)

SingleObjectEncodedBytes.of(bytes)
}

fun <T : Any> fromGenericRecord(record: GenericRecord): T {
TODO("Not yet implemented")
fun <T : Any> singleObjectDecoder(): AvroCodec.SingleObjectDecoder<T> = AvroCodec.SingleObjectDecoder { bytes ->
val writerSchema = schemaResolver[bytes.fingerprint]
val klass = AvroKotlin.loadClassForSchema<T>(writerSchema)

@Suppress("UNCHECKED_CAST")
avro4kSingleObject.decodeFromByteArray(serializer(klass), bytes.value) as T
}

@Suppress("UNCHECKED_CAST")
fun <T : Any> toSingleObjectEncoded(data: T): SingleObjectEncodedBytes {
fun <T : Any> genericRecordEncoder(): AvroCodec.GenericRecordEncoder<T> = AvroCodec.GenericRecordEncoder { data ->
@Suppress("UNCHECKED_CAST")
val serializer = serializer(data::class) as KSerializer<T>
val writerSchema = schema(data::class)

val bytes = avro4kSingleObject.encodeToByteArray(writerSchema.get(), serializer, data)
avro4k.encodeToGenericData(writerSchema.get(), serializer, data) as GenericRecord
}

/**
* @param klass - optional. If we do know the klass already, we can pass it to avoid a second lookup.
*/
fun <T : Any> genericRecordDecoder(klass: KClass<T>? = null) = AvroCodec.GenericRecordDecoder { record ->
val writerSchema = AvroSchema(record.schema)
val readerKlass: KClass<T> = klass ?: AvroKotlin.loadClassForSchema(writerSchema)

@Suppress("UNCHECKED_CAST")
val kserializer = serializer(readerKlass) as KSerializer<T>
val readerSchema = schema(readerKlass)
val compatibility = compatibilityCache.compatibleToReadFrom(writerSchema, readerSchema)

require(compatibility.isCompatible) { "Reader/writer schema are incompatible." }

avro4k.decodeFromGenericData(writerSchema = writerSchema.get(), deserializer = kserializer, record)
}

@Suppress("UNCHECKED_CAST")
fun <T : Any> toGenericRecord(data: T): GenericRecord {
val kserializer = serializer(data::class) as KSerializer<T>
val schema = avro4k.schema(kserializer)

return SingleObjectEncodedBytes.of(bytes)
return avro4k.encodeToGenericData(schema, kserializer, data) as GenericRecord
}

fun <T : Any> toSingleObjectEncoded(data: T): SingleObjectEncodedBytes = singleObjectEncoder<T>().encode(data)

/**
* @return kotlinx-serializer for given class.
*/
fun serializer(klass: KClass<*>) = kserializerCache.computeIfAbsent(klass) { key ->

require(klass.isSerializable())

// TODO: if we use SpecificRecords, we could derive the schema from the class directly
logger.trace { "add kserializer for $key." }

Expand All @@ -97,17 +140,8 @@ class AvroKotlinSerialization(

inline fun <reified T : Any> fromRecord(record: GenericRecord): T = fromRecord(record, T::class)

@Suppress("UNCHECKED_CAST")
fun <T : Any> fromRecord(record: GenericRecord, type: KClass<T>): T {
val writerSchema = AvroSchema(record.schema)

val kserializer = serializer(type) as KSerializer<T>
val readerSchema = schema(type)

// TODO nicer?
require(readerSchema.compatibleToReadFrom(writerSchema).result.incompatibilities.isEmpty()) { "Reader/writer schema are incompatible" }

return avro4k.decodeFromGenericData(writerSchema = writerSchema.get(), deserializer = kserializer, record) as T
return genericRecordDecoder(type).decode(record)
}

fun <T : Any> encodeSingleObject(
Expand Down Expand Up @@ -147,4 +181,7 @@ class AvroKotlinSerialization(
)

fun registerSchema(schema: AvroSchema): AvroKotlinSerialization = apply { schemaResolver + schema }

fun cachedSerializerClasses() = kserializerCache.keys().toList().toSet()
fun cachedSchemaClasses() = schemaCache.keys().toList().toSet()
}

This file was deleted.

43 changes: 43 additions & 0 deletions avro-kotlin-serialization/src/main/kotlin/_reflection.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.toolisticon.kotlin.avro.serialization

import kotlinx.serialization.KSerializer
import kotlinx.serialization.Serializable
import org.apache.avro.specific.SpecificRecordBase
import kotlin.reflect.KClass
import kotlin.reflect.full.companionObject
import kotlin.reflect.full.companionObjectInstance
import kotlin.reflect.full.functions

/**
* Reflective access to [KSerializer<*>] for a given type.
*
* The type has to be a data class or enum and annotated with [Serializable].
*
* @return kserializer of given type
* @throws IllegalArgumentException when type is not a valid [Serializable] type
*/
@Throws(IllegalArgumentException::class)
@Deprecated("provided directly by kotlinx.serialization")
fun KClass<*>.kserializer(): KSerializer<*> {
require(this.isData) { "Type ${this.qualifiedName} is not a data class." }
require(this.isSerializable()) { "Type ${this.qualifiedName} is not serializable." }

val serializerFn =
requireNotNull(this.companionObject?.functions?.find { it.name == "serializer" }) { "Type ${this.qualifiedName} must have a Companion.serializer, as created by the serialization compiler plugin." }

return serializerFn.call(this.companionObjectInstance) as KSerializer<*>
}

fun KClass<*>.isSerializable(): Boolean = this.annotations.any { it is Serializable }

fun KClass<*>.isKotlinxDataClass(): Boolean {
// TODO: can this check be replaced by some convenience magic from kotlinx.serialization
return this.isData && this.isSerializable()
}

fun KClass<*>.isKotlinxEnumClass(): Boolean {
// TODO: can this check be replaced by some convenience magic from kotlinx.serialization
return this.java.isEnum && this.isSerializable()
}

fun KClass<*>.isGeneratedSpecificRecordBase(): Boolean = SpecificRecordBase::class.java.isAssignableFrom(this.java)
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.toolisticon.kotlin.avro.serialization.strategy

import org.apache.avro.generic.GenericRecord
import java.util.function.Predicate
import kotlin.reflect.KClass

interface GenericRecordSerializationStrategy : Predicate<KClass<*>> {

fun <T : Any> deserialize(serializedType: KClass<*>, data: GenericRecord): T

fun <T : Any> serialize(data: T): GenericRecord
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.toolisticon.kotlin.avro.serialization.strategy

import io.toolisticon.kotlin.avro.serialization.AvroKotlinSerialization
import io.toolisticon.kotlin.avro.serialization.isKotlinxDataClass
import org.apache.avro.generic.GenericRecord
import kotlin.reflect.KClass

class KotlinxDataClassStrategy(
private val avroKotlinSerialization: AvroKotlinSerialization

Check warning on line 9 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt#L8-L9

Added lines #L8 - L9 were not covered by tests
) : GenericRecordSerializationStrategy {

override fun test(serializedType: KClass<*>): Boolean = serializedType.isKotlinxDataClass()

Check warning on line 12 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt#L12

Added line #L12 was not covered by tests

@Suppress("UNCHECKED_CAST")
override fun <T : Any> deserialize(serializedType: KClass<*>, data: GenericRecord): T {
return avroKotlinSerialization.genericRecordDecoder(serializedType).decode(data) as T

Check warning on line 16 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt#L16

Added line #L16 was not covered by tests
}

override fun <T : Any> serialize(data: T): GenericRecord {
return avroKotlinSerialization.genericRecordEncoder<T>().encode(data)

Check warning on line 20 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxDataClassStrategy.kt#L20

Added line #L20 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.toolisticon.kotlin.avro.serialization.strategy

import io.toolisticon.kotlin.avro.serialization.AvroKotlinSerialization
import io.toolisticon.kotlin.avro.serialization.isKotlinxEnumClass
import org.apache.avro.generic.GenericRecord
import kotlin.reflect.KClass

class KotlinxEnumClassStrategy(
private val avroKotlinSerialization: AvroKotlinSerialization

Check warning on line 9 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt#L8-L9

Added lines #L8 - L9 were not covered by tests
) : GenericRecordSerializationStrategy {

override fun test(serializedType: KClass<*>): Boolean = serializedType.isKotlinxEnumClass()

Check warning on line 12 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt#L12

Added line #L12 was not covered by tests

@Suppress("UNCHECKED_CAST")
override fun <T : Any> deserialize(serializedType: KClass<*>, data: GenericRecord): T {
return avroKotlinSerialization.genericRecordDecoder(serializedType).decode(data) as T

Check warning on line 16 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt#L16

Added line #L16 was not covered by tests
}

override fun <T : Any> serialize(data: T): GenericRecord {
return avroKotlinSerialization.genericRecordEncoder<T>().encode(data)

Check warning on line 20 in avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/KotlinxEnumClassStrategy.kt#L20

Added line #L20 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.toolisticon.kotlin.avro.serialization.strategy

import io.toolisticon.kotlin.avro.codec.SpecificRecordCodec
import io.toolisticon.kotlin.avro.serialization.isGeneratedSpecificRecordBase
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecordBase
import kotlin.reflect.KClass

class SpecificRecordBaseStrategy : GenericRecordSerializationStrategy {
private val converter = SpecificRecordCodec.specificRecordToGenericRecordConverter()

Check warning on line 10 in avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt#L9-L10

Added lines #L9 - L10 were not covered by tests

override fun test(serializedType: KClass<*>): Boolean = serializedType.isGeneratedSpecificRecordBase()

Check warning on line 12 in avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt#L12

Added line #L12 was not covered by tests

override fun <T : Any> deserialize(serializedType: KClass<*>, data: GenericRecord): T {
@Suppress("UNCHECKED_CAST")
return SpecificRecordCodec.genericRecordToSpecificRecordConverter(serializedType.java).convert(data) as T

Check warning on line 16 in avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt#L16

Added line #L16 was not covered by tests
}

override fun <T : Any> serialize(data: T): GenericRecord = converter.convert(data as SpecificRecordBase)

Check warning on line 19 in avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt

View check run for this annotation

Codecov / codecov/patch

avro-kotlin-serialization/src/main/kotlin/strategy/SpecificRecordBaseStrategy.kt#L19

Added line #L19 was not covered by tests
}
Loading

0 comments on commit 1c71162

Please sign in to comment.