fix: catch json errors on empty responses from lrs #456

Closed · wants to merge 6 commits
event_routing_backends/backends/events_router.py (5 additions, 0 deletions)

```diff
@@ -155,8 +155,13 @@ def bulk_send(self, events):
         for events_for_route in event_routes.values():
             prepared_events = []
             host = None
+            ids = set()
             for _, updated_event, host, _ in events_for_route:
+                if updated_event["id"] in ids:
+                    logger.info(f"Found duplicated event {updated_event['id']}")
+                    continue
                 prepared_events.append(updated_event)
+                ids.add(updated_event["id"])
 
             if prepared_events:  # pragma: no cover
                 self.dispatch_bulk_events(
```
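For context, the new guard is the standard seen-set deduplication pattern. A minimal, self-contained sketch with made-up event dicts (the data below is illustrative, not from the PR):

```python
# Illustrative events; only "id" matters for deduplication.
events = [
    {"id": "e1", "verb": "played"},
    {"id": "e1", "verb": "played"},  # duplicate id, should be dropped
    {"id": "e2", "verb": "paused"},
]

ids = set()
prepared_events = []
for event in events:
    if event["id"] in ids:
        continue  # skip events whose id was already queued
    prepared_events.append(event)
    ids.add(event["id"])

assert [e["id"] for e in prepared_events] == ["e1", "e2"]
```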
event_routing_backends/helpers.py (3 additions, 3 deletions)

```diff
@@ -6,6 +6,7 @@
 import uuid
 from urllib.parse import parse_qs, urlparse
 
+from functools import lru_cache
 from dateutil.parser import parse
 from django.conf import settings
 from django.contrib.auth import get_user_model
@@ -51,7 +52,7 @@ def get_uuid5(namespace_key, name):
     base_namespace = uuid.uuid5(base_uuid, namespace_key)
     return uuid.uuid5(base_namespace, name)
 
-
+@lru_cache
 def get_anonymous_user_id(username_or_id, external_type):
     """
     Generate anonymous user id.
@@ -95,7 +96,6 @@ def get_anonymous_user_id(username_or_id, external_type):
 
     return anonymous_id
 
-
 def get_user(username_or_id):
     """
     Get user by username or user id.
@@ -149,7 +149,7 @@ def get_user_email(username_or_id):
 
     return user_email
 
-
+@lru_cache
 def get_course_from_id(course_id):
     """
     Get Course object using the `course_id`.
```
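Worth noting: `functools.lru_cache` memoizes per process and keys on the call arguments, so repeated lookups for the same user or course skip the underlying query, but cached values persist until the process restarts or the cache is cleared. A minimal sketch of the behavior (`expensive_lookup` is a hypothetical stand-in for the decorated helpers):

```python
from functools import lru_cache

@lru_cache  # bare decorator form requires Python 3.8+
def expensive_lookup(key):
    print(f"cache miss for {key}")  # stands in for a database round trip
    return key.upper()

expensive_lookup("demo")  # prints "cache miss for demo", computes the value
expensive_lookup("demo")  # served from the cache, no print
expensive_lookup.cache_clear()  # e.g. between test cases
```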
event_routing_backends/management/commands/transform_tracking_logs.py

```diff
@@ -11,14 +11,13 @@
 from django.core.management.base import BaseCommand
 from libcloud.storage.providers import get_driver
 from libcloud.storage.types import Provider
-
 from event_routing_backends.management.commands.helpers.queued_sender import QueuedSender
 
 # Number of bytes to download at a time, this is 2 MB
 CHUNK_SIZE = 1024 * 1024 * 2
 
 
-def _get_chunks(source, file, start_byte, end_byte):
+def _get_chunks(source, file):
     """
     Fetch a chunk from the upstream source, retry 3 times if necessary.
 
@@ -35,8 +34,8 @@ def _get_chunks(source, file, start_byte, end_byte):
        try:
            chunks = source.download_object_range_as_stream(
                file,
-               start_bytes=start_byte,
-               end_bytes=end_byte
+               start_bytes=0,
+               chunk_size=CHUNK_SIZE
            )
            break
        # Catching all exceptions here because there's no telling what all
@@ -72,29 +71,22 @@ def transform_tracking_logs(
     # Download the file as a stream of characters to save on memory
     print(f"Streaming file {file}...")
 
-    last_successful_byte = 0
     line = ""
 
-    while last_successful_byte < int(file.size):
-        end_byte = last_successful_byte + CHUNK_SIZE
-
-        end_byte = min(end_byte, file.size)
-
-        chunks = _get_chunks(source, file, last_successful_byte, end_byte)
+    chunks = _get_chunks(source, file)
 
-        for chunk in chunks:
-            chunk = chunk.decode('utf-8')
+    for chunk in chunks:
+        chunk = chunk.decode('utf-8')
 
-            # Loop through this chunk, if we find a newline it's time to process
-            # otherwise just keep appending.
-            for char in chunk:
-                if char == "\n" and line:
-                    sender.transform_and_queue(line)
-                    line = ""
-                else:
-                    line += char
+        # Loop through this chunk, if we find a newline it's time to process
+        # otherwise just keep appending.
+        for char in chunk:
+            if char == "\n" and line:
+                sender.transform_and_queue(line)
+                line = ""
+            else:
+                line += char
 
-        last_successful_byte = end_byte
     # Sometimes the file doesn't end with a newline, we try to use
     # any remaining bytes as a final line.
     if line:
```
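The rewrite drops the manual byte-range bookkeeping (`last_successful_byte` / `end_byte`) in favor of a single ranged stream that libcloud chunks internally. A minimal sketch of that call, assuming an S3-style provider; the credentials, bucket, and object names here are hypothetical:

```python
from libcloud.storage.providers import get_driver
from libcloud.storage.types import Provider

CHUNK_SIZE = 1024 * 1024 * 2  # 2 MB, matching the command

driver = get_driver(Provider.S3)("access-key", "secret-key")  # hypothetical credentials
obj = driver.get_object("tracking-logs", "tracking.log")      # hypothetical bucket/object

# Streams the object from byte 0 to EOF, yielding CHUNK_SIZE-byte chunks.
total = 0
for chunk in driver.download_object_range_as_stream(obj, start_bytes=0, chunk_size=CHUNK_SIZE):
    total += len(chunk)
print(f"streamed {total} bytes")
```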
event_routing_backends/tasks.py (3 additions, 3 deletions)

```diff
@@ -5,7 +5,7 @@
 from celery.utils.log import get_task_logger
 from celery_utils.persist_on_failure import LoggedPersistOnFailureTask
 from django.conf import settings
-
+from json.decoder import JSONDecodeError
 from event_routing_backends.processors.transformer_utils.exceptions import EventNotDispatched
 from event_routing_backends.utils.http_client import HttpClient
 from event_routing_backends.utils.xapi_lrs_client import LrsClient
@@ -131,7 +131,7 @@ def bulk_send_events(task, events, router_type, host_config):
                 client_class
             )
         )
-    except EventNotDispatched as exc:
+    except (EventNotDispatched, JSONDecodeError) as exc:
         logger.exception(
             'Exception occurred while trying to bulk dispatch {} events using client: {}'.format(
                 len(events),
@@ -143,6 +143,6 @@ def bulk_send_events(task, events, router_type, host_config):
     # to inform about errors. If it's called asynchronously, we want to retry
     # the celery task till it succeeds or reaches max retries.
     if not task:
-        raise exc
+        return
     raise task.retry(exc=exc, countdown=getattr(settings, 'EVENT_ROUTING_BACKEND_COUNTDOWN', 30),
                      max_retries=getattr(settings, 'EVENT_ROUTING_BACKEND_MAX_RETRIES', 3))
```
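The newly caught exception matches the PR title: an empty response body fails JSON parsing, which Python surfaces as `json.decoder.JSONDecodeError` (a subclass of `ValueError`). A quick demonstration:

```python
import json
from json.decoder import JSONDecodeError

try:
    json.loads("")  # an empty LRS response body is not valid JSON
except JSONDecodeError as exc:
    print(f"caught: {exc}")  # caught: Expecting value: line 1 column 1 (char 0)
```

With the broadened `except`, a decode failure now follows the same path as a dispatch failure: it is logged, then swallowed when the function is called synchronously, or retried via `task.retry` using the `EVENT_ROUTING_BACKEND_COUNTDOWN` and `EVENT_ROUTING_BACKEND_MAX_RETRIES` settings when called as a task.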
event_routing_backends/utils/xapi_lrs_client.py (1 addition, 1 deletion)

```diff
@@ -75,7 +75,7 @@ def bulk_send(self, statement_data):
         response = self.lrs_client.save_statements(statement_data)
 
         if not response.success:
-            if response.response.code == 409:
+            if response.response.code == 409 or response.response.code == 204:
                 logger.warning(f"Duplicate event id found in: {response.request.content}")
             else:
                 logger.warning(f"Failed request: {response.request.content}")
```

Review thread on the added 204 check:

Contributor: You could just do `in (204, 409)` here

Contributor Author: This one is never reached as the json decode error is triggered first. I will remove it
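For reference, the reviewer's suggested membership test would collapse the condition as below. This is only a sketch of the suggestion, mirroring the hunk above; per the author's reply, the 204 branch was slated for removal because the JSON decode error fires first:

```python
if not response.success:
    if response.response.code in (204, 409):
        logger.warning(f"Duplicate event id found in: {response.request.content}")
    else:
        logger.warning(f"Failed request: {response.request.content}")
```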