Skip to content

Commit

Permalink
Rename Codec to SchemaRegistryCodec (#13698)
Browse files Browse the repository at this point in the history
  • Loading branch information
srnagar authored Jul 31, 2020
1 parent a5229d5 commit 7d2859b
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package com.azure.data.schemaregistry.avro;

import com.azure.core.util.logging.ClientLogger;
import com.azure.data.schemaregistry.Codec;
import com.azure.data.schemaregistry.SchemaRegistryCodec;
import com.azure.data.schemaregistry.models.SerializationException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
Expand All @@ -21,7 +21,7 @@
/**
* Base Codec class for Avro encoder and decoder implementations
*/
public class AvroCodec implements Codec {
public class AvroSchemaRegistryCodec implements SchemaRegistryCodec {
private final ClientLogger logger = new ClientLogger(AvroCodec.class);
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
Expand All @@ -33,9 +33,9 @@ public class AvroCodec implements Codec {
* Instantiates AvroCodec instance
* @param avroSpecificReader flag indicating if decoder should decode records as SpecificRecords
*/
public AvroCodec(Boolean avroSpecificReader) {
public AvroSchemaRegistryCodec(Boolean avroSpecificReader) {
if (avroSpecificReader == null) {
this.avroSpecificReader = AvroCodec.AVRO_SPECIFIC_READER_DEFAULT;
this.avroSpecificReader = AvroSchemaRegistryCodec.AVRO_SPECIFIC_READER_DEFAULT;
}
else {
this.avroSpecificReader = avroSpecificReader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class SchemaRegistryAvroAsyncSerializer extends SchemaRegistrySerializer
* @param schemaGroup
* @param autoRegisterSchemas
*/
SchemaRegistryAvroAsyncSerializer(CachedSchemaRegistryAsyncClient registryClient, AvroCodec codec,
SchemaRegistryAvroAsyncSerializer(CachedSchemaRegistryAsyncClient registryClient, AvroSchemaRegistryCodec codec,
String schemaGroup, Boolean autoRegisterSchemas) {
super(registryClient, codec, Collections.singletonList(codec), autoRegisterSchemas, schemaGroup);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public SchemaRegistryAvroAsyncSerializer buildAsyncSerializer() {
builder.maxCacheSize(maxCacheSize);
}

AvroCodec codec = new AvroCodec(this.avroSpecificReader);
AvroSchemaRegistryCodec codec = new AvroSchemaRegistryCodec(this.avroSpecificReader);

CachedSchemaRegistryAsyncClient client = builder
.addCodec(codec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,20 +218,20 @@ public CachedSchemaRegistryClientBuilder addPolicy(HttpPipelinePolicy policy) {
*
* The parseMethod argument should be a stateless, idempotent function.
*
* @param codec Codec class implementation
* @param schemaRegistryCodec Codec class implementation
* @return The updated {@link CachedSchemaRegistryClientBuilder} object.
*/
public CachedSchemaRegistryClientBuilder addCodec(Codec codec) {
Objects.requireNonNull(codec, "'codec' cannot be null.");
if (CoreUtils.isNullOrEmpty(codec.getSchemaType())) {
public CachedSchemaRegistryClientBuilder addCodec(SchemaRegistryCodec schemaRegistryCodec) {
Objects.requireNonNull(schemaRegistryCodec, "'codec' cannot be null.");
if (CoreUtils.isNullOrEmpty(schemaRegistryCodec.getSchemaType())) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Serialization type cannot be null or empty."));
}
if (this.typeParserMap.containsKey(codec.getSchemaType())) {
if (this.typeParserMap.containsKey(schemaRegistryCodec.getSchemaType())) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Multiple parse methods for single serialization type may not be added."));
}
this.typeParserMap.put(codec.getSchemaType(), codec::parseSchemaString);
this.typeParserMap.put(schemaRegistryCodec.getSchemaType(), schemaRegistryCodec::parseSchemaString);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* An interface defining operations required for registry-based serialization and deserialization.
*/
public interface Codec {
public interface SchemaRegistryCodec {
/**
* @return String representation of schema type, e.g. "avro" or "json".
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public abstract class SchemaRegistrySerializer {

CachedSchemaRegistryAsyncClient schemaRegistryClient;

private Codec serializerCodec;
private final Map<String, Codec> deserializerCodecMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
private SchemaRegistryCodec serializerSchemaRegistryCodec;
private final Map<String, SchemaRegistryCodec> deserializerCodecMap = new ConcurrentSkipListMap<>(String.CASE_INSENSITIVE_ORDER);
private String schemaType;

Boolean autoRegisterSchemas = SchemaRegistrySerializer.AUTO_REGISTER_SCHEMAS_DEFAULT;
Expand All @@ -47,34 +47,34 @@ public abstract class SchemaRegistrySerializer {
* Constructor for AbstractSchemaRegistrySerializer implementations.
*
* @param schemaRegistryClient client to be used for interfacing with Schema Registry service
* @param serializerCodec Codec to be used for serialization operations
* @param deserializerCodecList list of Codecs to be used to deserialize incoming payloads
* @param serializerSchemaRegistryCodec Codec to be used for serialization operations
* @param deserializerSchemaRegistryCodecList list of Codecs to be used to deserialize incoming payloads
*/
public SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, List<Codec> deserializerCodecList) {
this(schemaRegistryClient, serializerCodec, deserializerCodecList, null, null);
SchemaRegistryCodec serializerSchemaRegistryCodec, List<SchemaRegistryCodec> deserializerSchemaRegistryCodecList) {
this(schemaRegistryClient, serializerSchemaRegistryCodec, deserializerSchemaRegistryCodecList, null, null);
}

public SchemaRegistrySerializer(CachedSchemaRegistryAsyncClient schemaRegistryClient,
Codec serializerCodec, List<Codec> deserializerCodecList, Boolean autoRegisterSchemas,
SchemaRegistryCodec serializerSchemaRegistryCodec, List<SchemaRegistryCodec> deserializerSchemaRegistryCodecList, Boolean autoRegisterSchemas,
String schemaGroup) {

Objects.requireNonNull(serializerCodec);
Objects.requireNonNull(deserializerCodecList);
Objects.requireNonNull(serializerSchemaRegistryCodec);
Objects.requireNonNull(deserializerSchemaRegistryCodecList);

if (schemaRegistryClient == null) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Schema registry client must be initialized and passed into builder."));
}

if (deserializerCodecList.size() == 0) {
if (deserializerSchemaRegistryCodecList.size() == 0) {
throw logger.logExceptionAsError(
new IllegalArgumentException("At least one Codec must be provided for deserialization."));
}

this.schemaRegistryClient = schemaRegistryClient;
this.serializerCodec = serializerCodec;
for (Codec c : deserializerCodecList) {
this.serializerSchemaRegistryCodec = serializerSchemaRegistryCodec;
for (SchemaRegistryCodec c : deserializerSchemaRegistryCodecList) {
if (this.deserializerCodecMap.containsKey(c.getSchemaType())) {
throw logger.logExceptionAsError(
new IllegalArgumentException("Only on Codec can be provided per schema serialization type."));
Expand Down Expand Up @@ -109,17 +109,17 @@ protected <T extends OutputStream> Mono<T> serialize(T s, Object object) {
"Null object, behavior should be defined in concrete serializer implementation."));
}

if (serializerCodec == null) {
if (serializerSchemaRegistryCodec == null) {
return monoError(logger, new SerializationException(
"Byte encoder null, serializer must be initialized with a byte encoder."));
}

if (schemaType == null) {
schemaType = serializerCodec.getSchemaType();
schemaType = serializerSchemaRegistryCodec.getSchemaType();
}

String schemaString = serializerCodec.getSchemaString(object);
String schemaName = serializerCodec.getSchemaName(object);
String schemaString = serializerSchemaRegistryCodec.getSchemaString(object);
String schemaName = serializerSchemaRegistryCodec.getSchemaName(object);

return this.maybeRegisterSchema(this.schemaGroup, schemaName, schemaString, this.schemaType)
.onErrorMap(e -> {
Expand Down Expand Up @@ -153,7 +153,7 @@ protected <T extends OutputStream> Mono<T> serialize(T s, Object object) {
.put(id.getBytes(StandardCharsets.UTF_8));
try {
s.write(idBuffer.array());
s.write(serializerCodec.encode(object));
s.write(serializerSchemaRegistryCodec.encode(object));
} catch (IOException e) {
sink.error(new SerializationException(e.getMessage(), e));
}
Expand Down Expand Up @@ -209,8 +209,8 @@ protected Mono<Object> deserialize(InputStream s) {
int length = buffer.limit() - SchemaRegistrySerializer.SCHEMA_ID_SIZE;
byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);

Codec codec = getDeserializerCodec(registryObject);
sink.next(codec.decode(b, payloadSchema));
SchemaRegistryCodec schemaRegistryCodec = getDeserializerCodec(registryObject);
sink.next(schemaRegistryCodec.decode(b, payloadSchema));
})
.onErrorMap(e -> {
if (e instanceof SchemaRegistryClientException) {
Expand Down Expand Up @@ -243,16 +243,16 @@ protected Mono<Object> deserialize(InputStream s) {
* @return Codec to be used to deserialize encoded payload bytes
* @throws SerializationException if decoder for the required schema type has not been loaded
*/
private Codec getDeserializerCodec(SchemaRegistryObject registryObject) throws SerializationException {
Codec codec = deserializerCodecMap.get(registryObject.getSchemaType());
if (codec == null) {
private SchemaRegistryCodec getDeserializerCodec(SchemaRegistryObject registryObject) throws SerializationException {
SchemaRegistryCodec schemaRegistryCodec = deserializerCodecMap.get(registryObject.getSchemaType());
if (schemaRegistryCodec == null) {
throw logger.logExceptionAsError(
new SerializationException(
String.format("No deserializer codec class found for schema type '%s'.",
registryObject.getSchemaType())
));
}
return codec;
return schemaRegistryCodec;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class SampleCodec implements Codec {
public class SampleSchemaRegistryCodec implements SchemaRegistryCodec {

public SampleCodec() { }
public SampleSchemaRegistryCodec() { }

@Override
public String getSchemaName(Object object) throws SerializationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SchemaRegistrySerializerTest {
@Test
public void testRegistryGuidPrefixedToPayload() {
// manually add SchemaRegistryObject into mock registry client cache
SampleCodec encoder = new SampleCodec();
SampleSchemaRegistryCodec encoder = new SampleSchemaRegistryCodec();
SchemaRegistryObject registered = new SchemaRegistryObject(MOCK_GUID,
encoder.getSchemaType(),
encoder.getSchemaName(null),
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testIfRegistryNullThenThrow() {
@Test
public void testAddDeserializerCodec() throws IOException, SchemaRegistryClientException, SerializationException {
// add sample codec impl and test that it is used for decoding payload
SampleCodec decoder = new SampleCodec();
SampleSchemaRegistryCodec decoder = new SampleSchemaRegistryCodec();

// manually add SchemaRegistryObject to cache
SchemaRegistryObject registered = new SchemaRegistryObject(MOCK_GUID,
Expand Down Expand Up @@ -136,7 +136,7 @@ public void testAddDeserializerCodec() throws IOException, SchemaRegistryClientE
},
ex -> System.out.println(ex));

assertEquals(SampleCodec.CONSTANT_PAYLOAD,
assertEquals(SampleSchemaRegistryCodec.CONSTANT_PAYLOAD,
serializer.deserialize(new ByteArrayInputStream(getPayload())).block());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class TestDummySerializer extends SchemaRegistrySerializer {
TestDummySerializer(
CachedSchemaRegistryAsyncClient mockClient,
boolean autoRegisterSchemas) {
super(mockClient, new SampleCodec(), Collections.singletonList(new SampleCodec()));
super(mockClient, new SampleSchemaRegistryCodec(), Collections.singletonList(new SampleSchemaRegistryCodec()));

// allows simulating improperly written serializer constructor that does not initialize byte encoder
this.autoRegisterSchemas = autoRegisterSchemas;
Expand Down

0 comments on commit 7d2859b

Please sign in to comment.