Skip to content

Commit

Permalink
Expose creation of CompatibilityChecker (fd4s#988)
Browse files Browse the repository at this point in the history
* Expose creation of compat checker

* trigger build
  • Loading branch information
keirlawson authored Oct 16, 2023
1 parent e8aac1e commit e8c6339
Showing 1 changed file with 41 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,42 +39,51 @@ trait SchemaSuite extends FunSuite {
override def apply(): CompatibilityChecker[IO] = checker

override def beforeAll(): Unit =
checker = clientSettings.createSchemaRegistryClient
.map { client =>
new CompatibilityChecker[IO] {
private def registrySchema(subject: String): IO[Schema] =
for {
metadata <- IO.delay(client.getLatestSchemaMetadata(subject))
schema <- IO.delay(
client.getSchemaById(metadata.getId).asInstanceOf[AvroSchema]
)
} yield schema.rawSchema()
checker = newCompatibilityChecker(clientSettings)
.unsafeRunSync()
}

def newCompatibilityChecker(
clientSettings: SchemaRegistryClientSettings[IO]
): IO[CompatibilityChecker[IO]] =
clientSettings.createSchemaRegistryClient
.map { client =>
new CompatibilityChecker[IO] {

private def registrySchema(subject: String): IO[Schema] =
for {
metadata <- IO.delay(client.getLatestSchemaMetadata(subject))
schema <- IO.delay(
client.getSchemaById(metadata.getId).asInstanceOf[AvroSchema]
)
} yield schema.rawSchema()

def checkReaderCompatibility[A](
reader: Codec[A],
writerSubject: String
): IO[SchemaCompatibility.SchemaPairCompatibility] = {
val vulcanSchema = codecAsSchema(reader)
registrySchema(writerSubject).map { regSchema =>
SchemaCompatibility.checkReaderWriterCompatibility(
vulcanSchema,
regSchema
)
}
def checkReaderCompatibility[A](
reader: Codec[A],
writerSubject: String
): IO[SchemaCompatibility.SchemaPairCompatibility] = {
val vulcanSchema = codecAsSchema(reader)
registrySchema(writerSubject).map { regSchema =>
SchemaCompatibility.checkReaderWriterCompatibility(
vulcanSchema,
regSchema
)
}
}

def checkWriterCompatibility[A](writer: Codec[A], readerSubject: String)
: IO[SchemaCompatibility.SchemaPairCompatibility] = {
val vulcanSchema = codecAsSchema(writer)
registrySchema(readerSubject).map { regSchema =>
SchemaCompatibility.checkReaderWriterCompatibility(
regSchema,
vulcanSchema
)
}
def checkWriterCompatibility[A](
writer: Codec[A],
readerSubject: String
): IO[SchemaCompatibility.SchemaPairCompatibility] = {
val vulcanSchema = codecAsSchema(writer)
registrySchema(readerSubject).map { regSchema =>
SchemaCompatibility.checkReaderWriterCompatibility(
regSchema,
vulcanSchema
)
}
}

}
.unsafeRunSync()
}
}
}

0 comments on commit e8c6339

Please sign in to comment.