Skip to content

Commit

Permalink
Ikumarapeli/recording downloader (Azure#29604)
Browse files Browse the repository at this point in the history
* download and delete recording methods

* lint fixes

* lint fixes

* review comments

* fix test cases
  • Loading branch information
ikumarapeli-msft authored Apr 10, 2023
1 parent 4e1d96d commit c05e0e9
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
# license information.
# --------------------------------------------------------------------------

from io import BytesIO
from typing import TYPE_CHECKING # pylint: disable=unused-import
from azure.core.pipeline.transport import HttpResponse
from ._models import RecordingStateResponse, StartRecordingOptions
from ._content_downloader import ContentDownloader

from ._generated.operations import CallRecordingOperations

Expand All @@ -20,6 +23,7 @@ def __init__(# pylint: disable=missing-client-constructor-parameter-credential,
call_recording_client: CallRecordingOperations
) -> None:
self._call_recording_client = call_recording_client
self._downloader = ContentDownloader(call_recording_client)

def start_recording(
self,
Expand Down Expand Up @@ -126,3 +130,96 @@ def get_recording_properties(
recording_id = recording_id, **kwargs)
return RecordingStateResponse._from_generated(# pylint:disable=protected-access
recording_state_response)

def download_streaming(
self,
source_location: str,
offset: int = None,
length: int = None,
**kwargs
) -> HttpResponse:
"""Download a stream of the call recording.
:param source_location: The source location. Required.
:type source_location: str
:param offset: Offset byte. Not required.
:type offset: int
:param length: how many bytes. Not required.
:type length: int
:return: HttpResponse (octet-stream)
:rtype: HttpResponse (octet-stream)
"""
stream = self._downloader.download_streaming(
source_location = source_location,
offset = offset,
length = length,
**kwargs
)
return stream

def delete_recording(
self,
recording_location: str,
**kwargs
) -> None:
"""Delete a call recording.
:param recording_location: The recording location. Required.
:type recording_location: str
"""
self._downloader.delete_recording(recording_location = recording_location, **kwargs)

def download_to_path(
self,
source_location: str,
destination_path: str,
offset: int = None,
length: int = None,
**kwargs
) -> None:
"""Download a stream of the call recording to the destination.
:param source_location: The source location uri. Required.
:type source_location: str
:param destination_path: The destination path. Required.
:type destination_path: str
:param offset: Offset byte. Not required.
:type offset: int
:param length: how many bytes. Not required.
:type length: int
"""
stream = self._downloader.download_streaming(source_location = source_location,
offset = offset,
length = length,
**kwargs
)
with open(destination_path, 'wb') as writer:
writer.write(stream.read())


def download_to_stream(
self,
source_location: str,
destination_stream: BytesIO,
offset: int = None,
length: int = None,
**kwargs
) -> None:
"""Download a stream of the call recording to the destination.
:param source_location: The source location uri. Required.
:type source_location: str
:param destination_stream: The destination stream. Required.
:type destination_stream: BytesIO
:param offset: Offset byte. Not required.
:type offset: int
:param length: how many bytes. Not required.
:type length: int
"""
stream = self._downloader.download_streaming(source_location = source_location,
offset = offset,
length = length,
**kwargs
)
with open(destination_stream, 'wb') as writer:
writer.write(stream.read())
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------

from typing import Any

from urllib.parse import ParseResult, urlparse
from azure.core.exceptions import (
ClientAuthenticationError,
HttpResponseError,
ResourceExistsError,
ResourceNotFoundError,
ResourceNotModifiedError,
map_error,
)
from azure.core.pipeline import PipelineResponse
from azure.core.pipeline.transport import HttpResponse
from azure.core.rest import HttpRequest
from azure.core.utils import case_insensitive_dict

from ._generated import models as _models
from ._generated._serialization import Serializer
from ._generated.operations import CallRecordingOperations

_SERIALIZER = Serializer()
_SERIALIZER.client_side_validation = False


class ContentDownloader(object):
def __init__(
self,
call_recording_client, # type: CallRecordingOperations
): # type: (...) -> None

self._call_recording_client = call_recording_client

def download_streaming( # pylint: disable=inconsistent-return-statements
self, source_location: str, offset: int, length: int, **kwargs: Any
) -> HttpResponse:
"""Download a stream of the call recording.
:param source_location: The source location. Required.
:type source_location: str
:param offset: Offset byte. Not required.
:type offset: int
:param length: how many bytes. Not required.
:type length: int
:return: HttpResponse (octet-stream)
:rtype: HttpResponse (octet-stream)
"""

if length is not None and offset is None:
raise ValueError("Offset value must not be None if length is set.")
if length is not None:
length = offset + length - 1 # Service actually uses an end-range inclusive index

error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

parsedEndpoint:ParseResult = urlparse(
self._call_recording_client._config.endpoint # pylint: disable=protected-access
)

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
request = build_call_recording_download_recording_request(
source_location = source_location,
headers =_headers,
params =_params,
start = offset,
end = length,
host = parsedEndpoint.hostname
)

pipeline_response: PipelineResponse = self._call_recording_client._client._pipeline.run( # pylint: disable=protected-access
request, stream = True, **kwargs
)
response = pipeline_response.http_response

if response.status_code in [200, 206]:
return response

map_error(status_code = response.status_code, response = response, error_map = error_map)
error = self._call_recording_client._deserialize.failsafe_deserialize( # pylint: disable=protected-access
_models.CommunicationErrorResponse, pipeline_response
)
raise HttpResponseError(response = response, model = error)

def delete_recording( # pylint: disable=inconsistent-return-statements
self, recording_location: str, **kwargs: Any
) -> None:
"""Delete a call recording.
:param recording_location: The recording location. Required.
:type recording_location: str
"""

error_map = {
401: ClientAuthenticationError,
404: ResourceNotFoundError,
409: ResourceExistsError,
304: ResourceNotModifiedError,
}
error_map.update(kwargs.pop("error_map", {}) or {})

parsed_endpoint:ParseResult = urlparse(
self._call_recording_client._config.endpoint # pylint: disable=protected-access
)

_headers = kwargs.pop("headers", {}) or {}
_params = kwargs.pop("params", {}) or {}
request = build_call_recording_delete_recording_request(
recording_location = recording_location,
headers =_headers,
params =_params,
host = parsed_endpoint.hostname
)

pipeline_response: PipelineResponse = self._call_recording_client._client._pipeline.run( # pylint: disable=protected-access
request, stream = False, **kwargs
)

response = pipeline_response.http_response

if response.status_code not in [200]:
map_error(status_code=response.status_code, response = response, error_map=error_map)
error = self._call_recording_client._deserialize.failsafe_deserialize( # pylint: disable=protected-access
_models.CommunicationErrorResponse, pipeline_response
)
raise HttpResponseError(response=response, model=error)

def build_call_recording_delete_recording_request(recording_location: str, host: str, **kwargs: Any) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

# Construct headers
_headers["x-ms-host"] = _SERIALIZER.header("x-ms-host", host, "str")
return HttpRequest(
method = "DELETE",
url = recording_location,
params = _params,
headers = _headers,
**kwargs
)

def build_call_recording_download_recording_request(source_location: str,
start:int,
end:int,
host:str,
**kwargs: Any
) -> HttpRequest:
_headers = case_insensitive_dict(kwargs.pop("headers", {}) or {})
_params = case_insensitive_dict(kwargs.pop("params", {}) or {})

rangeHeader = "bytes=" + str(start)
if end:
rangeHeader += "-" + str(end)
# Construct headers
_headers["Range"] = _SERIALIZER.header("range", rangeHeader, "str")
_headers["Accept"] = _SERIALIZER.header("accept", "application/json", "str")
_headers["x-ms-host"] = _SERIALIZER.header("x-ms-host", host, "str")
return HttpRequest(method = "GET", url = source_location, params = _params, headers = _headers, **kwargs)
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(
audio_channel_participant_ordering: Optional[List["CommunicationIdentifier"]] = None,
recording_storage: Optional[Union[str,
"RecordingStorage"]] = None,
external_storage_location: Optional[str] = None,
**kwargs: Any
) -> None:
"""
Expand Down Expand Up @@ -132,6 +133,9 @@ def __init__(
:keyword recording_storage_type: Recording storage mode. ``External`` enables bring your own
storage. Known values are: "acs" and "azureBlob".
:paramtype recording_storage_type: str or
:keyword external_storage_location: The location where recording is stored, when
RecordingStorageType is set to 'BlobStorage'.
:paramtype external_storage_location: str
~azure.communication.callautomation.models.RecordingStorageType
"""
super().__init__(**kwargs)
Expand All @@ -142,24 +146,26 @@ def __init__(
self.recording_format_type = recording_format
self.audio_channel_participant_ordering = audio_channel_participant_ordering
self.recording_storage_type = recording_storage
self.external_storage_location = external_storage_location

def _to_generated(self):
audio_channel_participant_ordering_list:List[CommunicationIdentifierModel] = None
if self.audio_channel_participant_ordering is not None:
audio_channel_participant_ordering_list=[
audio_channel_participant_ordering_list = [
serialize_identifier(identifier) for identifier
in self.audio_channel_participant_ordering]

return StartCallRecordingRequestRest(
call_locator=self.call_locator._to_generated(# pylint:disable=protected-access
),
recording_state_callback_uri=self.recording_state_callback_uri,
recording_content_type=self.recording_content_type,
recording_channel_type=self.recording_channel_type,
recording_format_type=self.recording_format_type,
audio_channel_participant_ordering=audio_channel_participant_ordering_list,
recording_storage_type=self.recording_storage_type
)
recording_state_callback_uri = self.recording_state_callback_uri,
recording_content_type = self.recording_content_type,
recording_channel_type = self.recording_channel_type,
recording_format_type = self.recording_format_type,
audio_channel_participant_ordering = audio_channel_participant_ordering_list,
recording_storage_type = self.recording_storage_type,
external_storage_location = self.external_storage_location
)


class RecordingStateResponse(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import urllib
import base64
import hmac
from urllib.parse import ParseResult, urlparse
from azure.core.credentials import AzureKeyCredential
from azure.core.pipeline.policies import SansIOHTTPPolicy
from .utils import get_current_utc_time
Expand Down Expand Up @@ -51,7 +52,11 @@ def _sign_request(self, request):
verb = request.http_request.method.upper()

# Get the path and query from url, which looks like https://host/path/query
query_url = str(request.http_request.url[len(self._host) + 8:])
parsedUrl:ParseResult = urlparse(request.http_request.url)
query_url = parsedUrl.path

if parsedUrl.query:
query_url += "?" + parsedUrl.query

if self._decode_url:
query_url = urllib.parse.unquote(query_url)
Expand Down
Loading

0 comments on commit c05e0e9

Please sign in to comment.