
[Schema Registry] add tests in avro serializer to check that types are not leaked #20818

Closed
1 task done
swathipil opened this issue Sep 24, 2021 · 3 comments · Fixed by #21381
Assignees: swathipil
Labels: blocking-release (Blocks release), Client (This issue points to a problem in the data-plane of the library.), Messaging (Messaging crew), Schema Registry

Comments

swathipil (Member) commented Sep 24, 2021

Background discussion: #20708.
Read through the Avro serializer's API surface and add test coverage to ensure that Avro library types are not leaked to callers.
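A minimal sketch of one such check for deserialized values, assuming the serializer is meant to hand back only plain Python values (dict, list, str, int, float, bool, bytes, None) and never objects from the avro or fastavro packages. `find_leaked_types` and `ALLOWED_TYPES` are hypothetical names, not part of any library; a schema that uses Avro logical types would need more types in the allow-list.

```python
# Assumption: deserialized records should contain only these builtin types.
# Exact type checks (not isinstance) so library subclasses of str/dict still count as leaks.
ALLOWED_TYPES = (type(None), bool, int, float, str, bytes)


def find_leaked_types(value, path="$"):
    """Return (path, type name) pairs for every value that is not a plain builtin."""
    leaks = []
    if type(value) is dict:
        for key, item in value.items():
            if type(key) is not str:
                leaks.append((path, type(key).__name__))
            leaks.extend(find_leaked_types(item, "{}.{}".format(path, key)))
    elif type(value) is list:
        for index, item in enumerate(value):
            leaks.extend(find_leaked_types(item, "{}[{}]".format(path, index)))
    elif type(value) not in ALLOWED_TYPES:
        leaks.append((path, type(value).__name__))
    return leaks


class LeakyStr(str):
    """Hypothetical stand-in for a library-specific type leaking into the output."""


decoded = {"name": "hello", "favorite_number": 3, "favorite_color": LeakyStr("yello")}
print(find_leaked_types(decoded))  # [('$.favorite_color', 'LeakyStr')]
```

In a real test the assertion would be `assert find_leaked_types(decoded) == []` on whatever the serializer's deserialize call returns.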
Check:

  • Exceptions

  • Serialization

  • Deserialization
    If Avro-specific exceptions are thrown, catch them and raise azure-core exceptions instead.

  • Create a gist for addressing this

    • Do we want to do client-side validation at all, or let the service validate?
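A sketch of the catch-and-rethrow pattern from the Deserialization item. It is stdlib-only, so `DeserializationError` below is a local stand-in for whichever azure-core exception gets chosen (azure-core's `DeserializationError` is an assumption here, not something this issue decides), and `SchemaParseError` / `_library_decode` are hypothetical stand-ins for an Avro library and its exception type.

```python
import json


class DeserializationError(ValueError):
    """Local stand-in for the azure-core exception callers should see (assumed)."""


class SchemaParseError(Exception):
    """Hypothetical stand-in for an Avro-library-specific exception."""


def _library_decode(data):
    # Hypothetical third-party decoder that raises its own exception type.
    try:
        return json.loads(data)
    except ValueError as exc:
        raise SchemaParseError(str(exc)) from exc


def deserialize(data):
    """Public entry point: callers only ever see DeserializationError."""
    try:
        return _library_decode(data)
    except SchemaParseError as exc:
        # "from exc" keeps the library error in __cause__ for debugging
        # without making its type part of the public contract.
        raise DeserializationError("Failed to deserialize data: {}".format(exc)) from exc


try:
    deserialize("not valid input")
except DeserializationError as err:
    print(type(err).__name__, "caused by", type(err.__cause__).__name__)
```

User code then only needs `except` clauses for the azure-core type, and a test can assert that the raised exception is not an instance of any Avro exception class.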
swathipil added the Client, Schema Registry, and Messaging labels Sep 24, 2021
swathipil added this to the [2021] October milestone Sep 24, 2021
swathipil self-assigned this Sep 24, 2021
swathipil added the Epic and blocking-release labels and removed the Epic label Sep 24, 2021
swathipil changed the title from "[Schema Registry] add tests in avro serializer to check that no types are leaked" to "[Schema Registry] add tests in avro serializer to check that types are not leaked" Sep 24, 2021
swathipil (Member Author) commented Sep 29, 2021


swathipil (Member Author) commented Sep 29, 2021

import json
import time
from io import BytesIO

import avro.schema
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
from fastavro import parse_schema, schemaless_reader, schemaless_writer

dict_record_schema = {
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]},
    ],
}
str_record_schema = json.dumps(dict_record_schema)
bytes_record_schema = str_record_schema.encode("utf-8")

record_val = {
    "name": "hello",
    "favorite_number": 3,
    "favorite_color": "yello",
}
count = 100000  # calls per timed run
num_times = 5  # timed runs to average

# avro DatumWriter/DatumReader objects, keyed by the parsed schema's string form
avro_writer_cache = {}
avro_reader_cache = {}


def to_dict(schema):
    # fastavro needs the schema as a dict, so decode str/bytes JSON first
    if isinstance(schema, bytes):
        schema = schema.decode("utf-8")
    try:
        return json.loads(schema)
    except json.decoder.JSONDecodeError as e:
        raise ValueError("Error parsing JSON: {}, error = {}".format(schema, e))


def serialize_fastavro(schema, value):
    parsed_schema = parse_schema(to_dict(schema), _write_hint=False)
    with BytesIO() as stream:
        schemaless_writer(stream, parsed_schema, value)
        return stream.getvalue()


def serialize_avro(schema, value):
    # the schema is still parsed on every call; only the DatumWriter is cached
    if not isinstance(schema, avro.schema.Schema):
        schema = avro.schema.parse(schema)
    key = str(schema)
    writer = avro_writer_cache.get(key)
    if writer is None:
        writer = avro_writer_cache[key] = DatumWriter(schema)
    with BytesIO() as stream:
        writer.write(value, BinaryEncoder(stream))
        return stream.getvalue()


def deserialize_fastavro(schema, data):
    parsed_schema = parse_schema(to_dict(schema), _write_hint=False)
    with BytesIO(data) as stream:
        return schemaless_reader(stream, parsed_schema)


def deserialize_avro(schema, data):
    if not hasattr(data, "read"):
        data = BytesIO(data)
    if not isinstance(schema, avro.schema.Schema):
        schema = avro.schema.parse(schema)
    key = str(schema)
    reader = avro_reader_cache.get(key)
    if reader is None:
        reader = avro_reader_cache[key] = DatumReader(writers_schema=schema)
    with data:
        return reader.read(BinaryDecoder(data))


def benchmark(label, func, *args):
    # time `count` calls of func(*args), repeat `num_times` times, print the average
    times = []
    for _ in range(num_times):
        start_time = time.perf_counter()
        for _ in range(count):
            func(*args)
        times.append(time.perf_counter() - start_time)
    print("{}: {}".format(label, sum(times) / num_times))


schemas = (("bytes", bytes_record_schema), ("str", str_record_schema))

for name, func in (("fastavro", serialize_fastavro), ("avro", serialize_avro)):
    for kind, schema in schemas:
        benchmark("serialize {} schema {}".format(kind, name), func, schema, record_val)

encoded_data = serialize_avro(bytes_record_schema, record_val)
for name, func in (("fastavro", deserialize_fastavro), ("avro", deserialize_avro)):
    for kind, schema in schemas:
        benchmark("deserialize {} schema {}".format(kind, name), func, schema, encoded_data)
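For reference when reading these numbers: per the Avro specification's binary encoding rules, both schemaless writers should emit the same 15-byte payload for `record_val`. The sketch below hand-encodes it (zigzag varints for int/long, length-prefixed UTF-8 strings, a zero-based branch index before each union value, and no bytes at all for null). It is hard-coded to this `User` schema and is only an illustration, not an alternative serializer; `zigzag_varint`, `encode_string`, `encode_union`, and `encode_user` are hypothetical helper names.

```python
def zigzag_varint(n):
    """Encode an int/long the Avro way: zigzag, then little-endian base-128 varint."""
    n = (n << 1) ^ (n >> 63)  # zigzag maps signed 64-bit values to non-negative ints
    out = bytearray()
    while n & ~0x7F:
        out.append((n & 0x7F) | 0x80)  # low 7 bits with the continuation bit set
        n >>= 7
    out.append(n)
    return bytes(out)


def encode_string(s):
    """Strings are a long byte-length followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_union(index, encoded_value=b""):
    """A union is the zero-based branch index (as a long) followed by the value."""
    return zigzag_varint(index) + encoded_value


def encode_user(record):
    """Hand-encode the User record: fields are written in schema order, no field tags."""
    out = encode_string(record["name"])
    number = record["favorite_number"]  # union ["int", "null"]: int=0, null=1
    out += encode_union(1) if number is None else encode_union(0, zigzag_varint(number))
    color = record["favorite_color"]  # union ["string", "null"]: string=0, null=1
    out += encode_union(1) if color is None else encode_union(0, encode_string(color))
    return out


record_val = {"name": "hello", "favorite_number": 3, "favorite_color": "yello"}
print(encode_user(record_val).hex())  # 0a68656c6c6f0006000a79656c6c6f
```

Comparing this against `encoded_data` is a cheap way to confirm the two libraries are doing equivalent work before comparing their timings.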


swathipil removed the blocking-release label Oct 1, 2021
swathipil added the blocking-release label Oct 8, 2021
azure-sdk pushed a commit to azure-sdk/azure-sdk-for-python that referenced this issue Oct 5, 2022
[Hub Generated] Review request for Microsoft.MachineLearningServices to add version preview/2022-10-01-preview (Azure#20818)

* Adds base for updating Microsoft.MachineLearningServices from version preview/2022-06-01-preview to version 2022-10-01-preview

* Updates readme

* Updates API version in new specs and examples

* Registries public preview 2022-10-01-preview (Azure#20200)

* Registries public preview 2022-10-01-preview

* Fix validation errors

* Fix LintDiff and Avocado errors

* Fixing prettier errors

* Fixing more LintDiff errors

* Another LintDiff error

* Another Lint error

Co-authored-by: Fahd Kamal <fahdk@microsoft.com>

* Added schedule api (Azure#20273)

* added schedule api

* add updateSchedules to examples parent folder

* fix typo

* change position of allOf

* remove subnet from example compute create

* after prettier run

* Description change for recurrence freq

* point to mfe for common objects

* add hash operator

* removed triggertype as well

* change year

* get rid of breaking changes since we didn't change the api

Co-authored-by: Naman Agarwal <naagarw@microsoft.com>

* Added Compute Instance OS Patching Properties (Azure#20284)

* Added CI Os Patching related properties

* Updated descriptions as per Dennis's suggestions

Co-authored-by: Srivatsa Sinha <srsinha@microsoft.com>

* Maulik/sync preview (Azure#20554)

* Added changes to sync with main

* Fixed Prettier Check

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Remove update schedules REST call for compute instance (Azure#20699)

* change post call to put

* change put to patch

* remove updateschedules rest call

Co-authored-by: Naman Agarwal <naagarw@microsoft.com>

* Update connection type and auth type (Azure#20493)

* Update connection type and auth type

* update connection category

* update x-ms-secret tag

* Add autologger_settings for CI (Azure#20302)

* Add autologger_settings for CI

Add settings to opt-in/out of default mlflow autologger on CIs.

* update machinelearningservices.json

* Add for command job

* fix comma

* Add autologger for command job

* fix CI failures

* Remove mfe.json from PR

* Remove whitespace

* Update custom-words.txt

* Delete incorrect custom-words.txt

* Fixing auto-rest issues in registries (Azure#20772)

* Fixing auto-rest issues in registries

* Removing location from the required list

Co-authored-by: Fahd Kamal <fahdk@microsoft.com>

* Maulik/mfe preview changes (Azure#20372)

* Added mfe.json preview changes

* Added resources to Registries and Workspaces folder

* removed unwanted resources

* removed unwanted resources

* Ran prettier check

* removed unwanted resources

* removed unwanted resources

* Added ManagedOnlineDeployment resource

* Added files to OnlineDeplyment

* Added Model Validation changes

* Added mfe.json changes to handle duplicate operation id changes

* Updated mfe to fix underscore error

* Update title of mfe.json

* Renamed name path parameter

* Updated examples

* Fixed Semantic validations and examples

* Fixed Semantic validations and examples

* Fixed Semantic validations and examples

* Removed name changes from workspaces

* Changes related to Model Validation

* Changes related to Model validation

* Added consumes amd produces in registries.json

* Removed changes from registries

* Updated parameter mlFlowAutologger to mlflowAutologger in mfe.json

* Changed property name to mlflowAutologger

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Updated custom words to resolve merge conflict (Azure#20833)

* Updated custom words to resolve merge conflict

* Updated custom words to resolve merge conflict

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Resolved merge conflict (Azure#20845)

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Fixed LintDiff checks (Azure#20847)

* Fixed LintDiff checks

* Fixed LintDiff checks

* Fixed LintDiff checks

* Fixed LintDiff checks

* Updated examples

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Resolved merge conflict of cutsom-words.txt (Azure#20956)

* Resolved merge conflict

* Resolved merge conflict

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* [Hub Generated] Review request for Microsoft.MachineLearningServices to add version stable/2022-10-01 (Azure#20800) (Azure#20982)

* Adds base for updating Microsoft.MachineLearningServices from version stable/2022-05-01 to version 2022-10-01

* Updates readme

* Updates API version in new specs and examples

* Added readOnly property (Azure#20506)

* Added readOnly property

* Added changes to sync with main

* Fixed Prettier check

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Add schedule related schemas for Compute Instance (Azure#20697)

* support for update schedules on Compute Instance

* change put to patch

* remove update schedules rest call

Co-authored-by: Naman Agarwal <naagarw@microsoft.com>

* Added mfe.json changes (Azure#20514)

* Added mfe.json changes

* Added checks related changes

* Updated mfe.json to handle online endpoint and deployment changes

* Updated mfe.json with AutoRebuild changes

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Removed duplicate definitions and updated with mfe references (Azure#20840)

* Removed duplicate definitions and updated with mfe references

* Replaced Cron to CronTrigger, Recurrence to RecurrenceTrigger from mfe

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>
Co-authored-by: Naman Agarwal <namanag16@gmail.com>
Co-authored-by: Naman Agarwal <naagarw@microsoft.com>

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>
Co-authored-by: Naman Agarwal <namanag16@gmail.com>
Co-authored-by: Naman Agarwal <naagarw@microsoft.com>

* Revert "[Hub Generated] Review request for Microsoft.MachineLearningS… (Azure#20983)

* Revert "[Hub Generated] Review request for Microsoft.MachineLearningServices to add version stable/2022-10-01 (Azure#20800) (Azure#20982)"

This reverts commit 649ef8334aca13558e534babdcf53cef6831f22a.

* Updated readme.md to resolve merge conflict

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* Maulik/fix preview (Azure#20994)

* Merged readme from main

* Added preview tag changes

Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>

* shuffle readme tags (Azure#21005)

* conflict resolution (Azure#21007)

Co-authored-by: fahdkmsft <51756016+fahdkmsft@users.noreply.github.com>
Co-authored-by: Fahd Kamal <fahdk@microsoft.com>
Co-authored-by: Naman Agarwal <namanag16@gmail.com>
Co-authored-by: Naman Agarwal <naagarw@microsoft.com>
Co-authored-by: srivatsasinha <102133347+srivatsasinha@users.noreply.github.com>
Co-authored-by: Srivatsa Sinha <srsinha@microsoft.com>
Co-authored-by: maulikjoshi <maulikjoshi@microsoft.com>
Co-authored-by: ZhidaLiu <zhili@microsoft.com>
Co-authored-by: prakharg-msft <75808410+prakharg-msft@users.noreply.github.com>
Co-authored-by: Teddy Todorov <thtodoro@microsoft.com>
github-actions bot locked and limited conversation to collaborators Apr 11, 2023