Skip to content

Commit

Permalink
Samples: Fix avro.schema.Parse in snippets (#888)
Browse files Browse the repository at this point in the history
* fix avro.schema.parse in snippets

* fix all

* change avro.schema to schema

* change revert Parse
  • Loading branch information
acocuzzo authored Mar 20, 2023
1 parent 8cd6e72 commit dc6babc
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions samples/snippets/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def publish_avro_records(project_id: str, topic_id: str, avsc_file: str) -> None
"""Pulbish a BINARY or JSON encoded message to a topic configured with an Avro schema."""
# [START pubsub_publish_avro_records]
from avro.io import BinaryEncoder, DatumWriter
import avro
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
Expand All @@ -473,7 +473,7 @@ def publish_avro_records(project_id: str, topic_id: str, avsc_file: str) -> None
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
avro_schema = avro.schema.parse(open(avsc_file, "rb").read())
avro_schema = schema.parse(open(avsc_file, "rb").read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

Expand Down Expand Up @@ -562,7 +562,7 @@ def subscribe_with_avro_schema(
) -> None:
"""Receive and decode messages sent to a topic with an Avro schema."""
# [START pubsub_subscribe_avro_records]
import avro
import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
Expand All @@ -579,7 +579,7 @@ def subscribe_with_avro_schema(
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

avro_schema = avro.schema.parse(open(avsc_file, "rb").read())
avro_schema = schema.parse(open(avsc_file, "rb").read())

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Get the message serialization type.
Expand Down Expand Up @@ -622,7 +622,7 @@ def subscribe_with_avro_schema_with_revisions(
) -> None:
"""Receive and decode messages sent to a topic with an Avro schema."""
# [START pubsub_subscribe_avro_records_with_revisions]
import avro
import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from concurrent.futures import TimeoutError
import io
Expand All @@ -642,7 +642,7 @@ def subscribe_with_avro_schema_with_revisions(
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

writer_avro_schema = avro.schema.parse(open(avsc_file, "rb").read())
writer_avro_schema = schema.parse(open(avsc_file, "rb").read())
# Dict to keep readers for different schema revisions.
revisions_to_readers = {}

Expand All @@ -662,7 +662,7 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"{schema_path} not found.")
message.nack()
return
reader_avro_schema = avro.schema.parse(received_avro_schema.definition)
reader_avro_schema = schema.parse(received_avro_schema.definition)
revisions_to_readers[schema_revision_id] = DatumReader(
writer_avro_schema, reader_avro_schema
)
Expand Down

0 comments on commit dc6babc

Please sign in to comment.