Skip to content

Commit

Permalink
[SchemaRegistry] update API for consistency (#20538)
Browse files Browse the repository at this point in the history
* changes

* updated failing tests

* keep Dict for now

* pylint errors

* nit

* add recordings

* tests recordings
  • Loading branch information
swathipil authored Sep 9, 2021
1 parent 40c9060 commit 1f2218a
Show file tree
Hide file tree
Showing 25 changed files with 400 additions and 470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ class SchemaRegistryAvroSerializer(object):
SchemaRegistryAvroSerializer provides the ability to serialize and deserialize data according
to the given avro schema. It would automatically register, get and cache the schema.
:param schema_registry: The schema registry client
:param client: The schema registry client
which is used to register schema and retrieve schema from the service.
:type schema_registry: ~azure.schemaregistry.SchemaRegistryClient
:param str schema_group: Schema group under which schema should be registered.
:type client: ~azure.schemaregistry.SchemaRegistryClient
:param str group_name: Schema group under which schema should be registered.
:keyword bool auto_register_schemas: When true, register new schemas passed to serialize.
Otherwise, and by default, fail if it has not been pre-registered in the registry.
:keyword str codec: The writer codec. If None, let the avro library decides.
"""

def __init__(self, schema_registry, schema_group, **kwargs):
def __init__(self, client, group_name, **kwargs):
# type: ("SchemaRegistryClient", str, Any) -> None
self._schema_group = schema_group
self._schema_group = group_name
self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec"))
self._schema_registry_client = schema_registry # type: "SchemaRegistryClient"
self._schema_registry_client = client # type: "SchemaRegistryClient"
self._auto_register_schemas = kwargs.get("auto_register_schemas", False)
self._auto_register_schema_func = (
self._schema_registry_client.register_schema
Expand Down Expand Up @@ -120,17 +120,18 @@ def _get_schema(self, schema_id, **kwargs):
self._schema_to_id[schema_str] = schema_id
return schema_str

def serialize(self, data, schema, **kwargs):
def serialize(self, value, schema, **kwargs):
# type: (Dict[str, Any], Union[str, bytes], Any) -> bytes
"""
Encode dict data with the given schema. The returns bytes are consisted of: The first 4 bytes
Encode data with the given schema. The returns bytes are consisted of: The first 4 bytes
denoting record format identifier. The following 32 bytes denoting schema id returned by schema registry
service. The remaining bytes are the real data payload.
:param data: The dict data to be encoded.
:param value: The data to be encoded.
:type value: Dict[str, Any]
:param schema: The schema used to encode the data.
:type schema: Union[str, bytes]
:return:
:rtype: bytes
"""
raw_input_schema = schema
try:
Expand All @@ -142,7 +143,7 @@ def serialize(self, data, schema, **kwargs):

record_format_identifier = b"\0\0\0\0"
schema_id = self._get_schema_id(cached_schema.fullname, cached_schema, **kwargs)
data_bytes = self._avro_serializer.serialize(data, cached_schema)
data_bytes = self._avro_serializer.serialize(value, cached_schema)

stream = BytesIO()

Expand All @@ -155,21 +156,21 @@ def serialize(self, data, schema, **kwargs):
stream.close()
return payload

def deserialize(self, data, **kwargs):
def deserialize(self, value, **kwargs):
# type: (bytes, Any) -> Dict[str, Any]
"""
Decode bytes data.
:param bytes data: The bytes data needs to be decoded.
:param bytes value: The bytes data needs to be decoded.
:rtype: Dict[str, Any]
"""
# record_format_identifier = data[0:4] # The first 4 bytes are retained for future record format identifier.
schema_id = data[
schema_id = value[
SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH)
].decode("utf-8")
schema_content = self._get_schema(schema_id, **kwargs)

dict_data = self._avro_serializer.deserialize(
data[DATA_START_INDEX:], schema_content
dict_value = self._avro_serializer.deserialize(
value[DATA_START_INDEX:], schema_content
)
return dict_data
return dict_value
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ def on_event(partition_context, event):

# create a SchemaRegistryAvroSerializer instance
avro_serializer = SchemaRegistryAvroSerializer(
schema_registry=SchemaRegistryClient(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
schema_group=SCHEMA_GROUP
group_name=SCHEMA_GROUP
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def send_event_data_batch(producer, serializer):
# Use the serialize method to convert dict object to bytes with the given avro schema.
# The serialize method would automatically register the schema into the Schema Registry Service and
# schema would be cached locally for future usage.
payload_bytes = serializer.serialize(data=dict_data, schema=SCHEMA_STRING)
payload_bytes = serializer.serialize(value=dict_data, schema=SCHEMA_STRING)
print('The bytes of serialized dict data is {}.'.format(payload_bytes))

event_data = EventData(body=payload_bytes) # pass the bytes data to the body of an EventData
Expand All @@ -60,11 +60,11 @@ def send_event_data_batch(producer, serializer):

# create a SchemaRegistryAvroSerializer instance
avro_serializer = SchemaRegistryAvroSerializer(
schema_registry=SchemaRegistryClient(
client=SchemaRegistryClient(
endpoint=SCHEMA_REGISTRY_ENDPOINT,
credential=DefaultAzureCredential()
),
schema_group=SCHEMA_GROUP
group_name=SCHEMA_GROUP
)


Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"576838e0558c43f8b85cdaadbd4561f5"}'
string: '{"id":"041afcdb34a546faa3aa26a991567e32"}'
headers:
content-type:
- application/json
date:
- Wed, 01 Sep 2021 17:06:46 GMT
- Wed, 08 Sep 2021 22:17:05 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- 576838e0558c43f8b85cdaadbd4561f5
- 041afcdb34a546faa3aa26a991567e32
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/576838e0558c43f8b85cdaadbd4561f5?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/041afcdb34a546faa3aa26a991567e32?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ interactions:
uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04
response:
body:
string: '{"id":"576838e0558c43f8b85cdaadbd4561f5"}'
string: '{"id":"041afcdb34a546faa3aa26a991567e32"}'
headers:
content-type:
- application/json
date:
- Wed, 01 Sep 2021 17:08:24 GMT
- Wed, 08 Sep 2021 22:17:06 GMT
location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04
server:
Expand All @@ -38,9 +38,9 @@ interactions:
transfer-encoding:
- chunked
x-schema-id:
- 576838e0558c43f8b85cdaadbd4561f5
- 041afcdb34a546faa3aa26a991567e32
x-schema-id-location:
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/576838e0558c43f8b85cdaadbd4561f5?api-version=2017-04
- https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/041afcdb34a546faa3aa26a991567e32?api-version=2017-04
x-schema-type:
- Avro
x-schema-version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_raw_avro_serializer_negative(self):
raw_avro_object_serializer.serialize(dict_data_missing_required_field, schema)

@SchemaRegistryPowerShellPreparer()
def test_basic_sr_avro_serializer(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_endpoint, schemaregistry_group, **kwargs):
sr_client = self.create_basic_client(SchemaRegistryClient, endpoint=schemaregistry_endpoint)
sr_avro_serializer = SchemaRegistryAvroSerializer(sr_client, schemaregistry_group, auto_register_schemas=True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,22 @@
def _parse_schema_properties_dict(response):
return {
'location': response.headers.get('location'),
'location_by_id': response.headers.get('schema-id-location'),
'schema_id': response.headers.get('schema-id'),
'id': response.headers.get('schema-id'),
'serialization_type': response.headers.get('serialization-type'),
'version': int(response.headers.get('schema-version'))
}


def _parse_response_schema_properties(response):
properties_dict = _parse_schema_properties_dict(response)
properties_dict['schema_id'] = response.json()["id"]
properties_dict['id'] = response.json()["id"]
return SchemaProperties(
**properties_dict
)


def _parse_response_schema(response):
return Schema(
schema_content=response.text(),
schema_properties=SchemaProperties(**_parse_schema_properties_dict(response))
content=response.text(),
properties=SchemaProperties(**_parse_schema_properties_dict(response))
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ class SchemaProperties(object):
"""
Meta properties of a schema.
:ivar schema_id: References specific schema in registry namespace.
:type schema_id: str
:ivar id: References specific schema in registry namespace.
:type id: str
:ivar location: URL location of schema, identified by schema group, schema name, and version.
:type location: str
:ivar location_by_id: URL location of schema, identified by schema ID.
:type location_by_id: str
:ivar serialization_type: Serialization type for the schema being stored.
:type serialization_type: str
:ivar version: Version of the returned schema.
Expand All @@ -54,13 +52,12 @@ class SchemaProperties(object):

def __init__(
self,
schema_id=None,
id=None, # pylint:disable=redefined-builtin
**kwargs
):
# type: (Optional[str], Any) -> None
self.schema_id = schema_id
self.id = id
self.location = kwargs.get('location')
self.location_by_id = kwargs.get('location_by_id')
self.serialization_type = kwargs.get('serialization_type')
self.version = kwargs.get('version')

Expand All @@ -69,10 +66,10 @@ class Schema(object):
"""
The schema content of a schema, along with id and meta properties.
:ivar schema_content: The content of the schema.
:type schema_content: str
:ivar schema_properties: The properties of the schema.
:type schema_properties: SchemaProperties
:ivar content: The content of the schema.
:type content: str
:ivar properties: The properties of the schema.
:type properties: SchemaProperties
.. admonition:: Example:
Expand All @@ -87,9 +84,9 @@ class Schema(object):

def __init__(
self,
schema_content,
schema_properties,
content,
properties,
):
# type: (str, SchemaProperties) -> None
self.schema_content = schema_content
self.schema_properties = schema_properties
self.content = content
self.properties = properties
Loading

0 comments on commit 1f2218a

Please sign in to comment.