Skip to content

Commit

Permalink
Simplify record code
Browse files Browse the repository at this point in the history
  • Loading branch information
bplommer committed Apr 30, 2021
1 parent d23717b commit 40e71fb
Showing 1 changed file with 52 additions and 67 deletions.
119 changes: 52 additions & 67 deletions modules/core/src/main/scala/vulcan/Codec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -905,70 +905,60 @@ object Codec extends CodecCompanionCompat {
props: Props = Props.empty
)(f: FieldBuilder[A] => FreeApplicative[Field[A, *], A]): Codec.Aux[GenericRecord, A] = {
val typeName = if (namespace.isEmpty) name else s"$namespace.$name"
val free = f(FieldBuilder.instance)
val schema = AvroError.catchNonFatal {
val fields =
free.analyze {
f(FieldBuilder.instance).analyze {
new (Field[A, *] ~> λ[a => Either[AvroError, Chain[Schema.Field]]]) {
def apply[B](field: Field[A, B]) =
field.codec.schema.flatMap { schema =>
field.props.toChain
.flatMap { props =>
field.default
.traverse(field.codec.encode(_))
.map { default =>
Chain.one {
val schemaField =
new Schema.Field(
field.name,
schema,
field.doc.orNull,
adaptForSchema {
default.map {
case null => Schema.Field.NULL_DEFAULT_VALUE
case other => other
}.orNull
},
field.order.getOrElse(Schema.Field.Order.ASCENDING)
)

field.aliases.foreach(schemaField.addAlias)

props.foldLeft(()) {
case ((), (name, value)) =>
schemaField.addProp(name, value)
}

schemaField
}
}
}
(
field.codec.schema,
field.props.toChain,
field.default.traverse(field.codec.encode(_))
).mapN { (schema, props, default) =>
val schemaField =
new Schema.Field(
field.name,
schema,
field.doc.orNull,
default.map {
case null => Schema.Field.NULL_DEFAULT_VALUE
case other => adaptForSchema(other)
}.orNull,
field.order.getOrElse(Schema.Field.Order.ASCENDING)
)

field.aliases.foreach(schemaField.addAlias)

props.foldLeft(()) {
case ((), (name, value)) =>
schemaField.addProp(name, value)
}
Chain.one(schemaField)
}
}
}

fields.flatMap { fields =>
props.toChain.map { props =>
val record =
Schema.createRecord(
name,
doc.orNull,
namespace,
false,
fields.toList.asJava
)

aliases.foreach(record.addAlias)

props.foldLeft(()) {
case ((), (name, value)) =>
record.addProp(name, value)
}
(fields, props.toChain).mapN { (fields, props) =>
val record =
Schema.createRecord(
name,
doc.orNull,
namespace,
false,
fields.toList.asJava
)

aliases.foreach(record.addAlias)

record
props.foldLeft(()) {
case ((), (name, value)) =>
record.addProp(name, value)
}

record
}
}

Codec
.instanceForTypes[GenericRecord, A](
"IndexedRecord",
Expand All @@ -977,7 +967,7 @@ object Codec extends CodecCompanionCompat {
a =>
schema.flatMap { schema =>
val fields =
free.analyze {
f(FieldBuilder.instance).analyze {
new (Field[A, *] ~> λ[a => Either[AvroError, Chain[(String, Any)]]]) {
def apply[B](field: Field[A, B]) =
field.codec
Expand All @@ -995,22 +985,17 @@ object Codec extends CodecCompanionCompat {
}
}, {
case (record: IndexedRecord, _) =>
val recordSchema = record.getSchema()
val recordFields = recordSchema.getFields()

free.foldMap {
f(FieldBuilder.instance).foldMap {
new (Field[A, *] ~> Either[AvroError, *]) {
def apply[B](field: Field[A, B]) = {
val schemaField = recordSchema.getField(field.name)
if (schemaField != null) {
val value = record.get(recordFields.indexOf(schemaField))
field.codec.decode(value, schemaField.schema())
} else {
field.default.toRight {
AvroError.decodeMissingRecordField(field.name)
}
def apply[B](field: Field[A, B]): Either[AvroError, B] =
record.getSchema.getField(field.name) match {
case null =>
field.default.toRight {
AvroError.decodeMissingRecordField(field.name)
}
case schemaField =>
field.codec.decode(record.get(schemaField.pos), schemaField.schema)
}
}
}
}
}
Expand Down

0 comments on commit 40e71fb

Please sign in to comment.