Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Blob] Added QQ Parquet Format #17855

Merged
merged 24 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Release History

## 12.8.1b1 (Unreleased)

## 12.9.0b1 (Unreleased)

## 12.8.0 (2021-03-01)
**Stable release of preview features**
Expand Down
4 changes: 4 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@
BlobQueryError,
DelimitedJsonDialect,
DelimitedTextDialect,
ParquetDialect,
ArrowDialect,
ArrowType,
QuickQueryDialect,
ObjectReplicationPolicy,
ObjectReplicationRule
)
Expand Down Expand Up @@ -225,8 +227,10 @@ def download_blob_from_url(
'BlobQueryError',
'DelimitedJsonDialect',
'DelimitedTextDialect',
'ParquetDialect',
'ArrowDialect',
'ArrowType',
'QuickQueryDialect',
'BlobQueryReader',
'ObjectReplicationPolicy',
'ObjectReplicationRule'
Expand Down
39 changes: 30 additions & 9 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
upload_block_blob,
upload_append_blob,
upload_page_blob, _any_conditions)
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError, ParquetDialect, QuickQueryDialect, \
DelimitedJsonDialect, DelimitedTextDialect
from ._download import StorageStreamDownloader
from ._lease import BlobLeaseClient

Expand Down Expand Up @@ -832,16 +833,32 @@ def _quick_query_options(self, query_expression,
# type: (str, **Any) -> Dict[str, Any]
delimiter = '\n'
input_format = kwargs.pop('blob_format', None)
if input_format:
if input_format == QuickQueryDialect.ParquetDialect:
input_format = ParquetDialect()
if input_format == QuickQueryDialect.DelimitedJsonDialect:
input_format = DelimitedJsonDialect()
if input_format == QuickQueryDialect.DelimitedTextDialect:
input_format = DelimitedTextDialect()
input_parquet_format = isinstance(input_format, ParquetDialect)
if input_format and not input_parquet_format:
try:
delimiter = input_format.lineterminator
except AttributeError:
try:
delimiter = input_format.delimiter
except AttributeError:
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or DelimitedJsonDialect")
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or "
"DelimitedJsonDialect or ParquetDialect")
output_format = kwargs.pop('output_format', None)
if output_format == QuickQueryDialect.ParquetDialect:
output_format = ParquetDialect()
if output_format == QuickQueryDialect.DelimitedJsonDialect:
output_format = DelimitedJsonDialect()
if output_format == QuickQueryDialect.DelimitedTextDialect:
output_format = DelimitedTextDialect()
if output_format:
if isinstance(output_format, ParquetDialect):
raise ValueError("ParquetDialect is invalid as an output format.")
try:
delimiter = output_format.lineterminator
except AttributeError:
Expand All @@ -850,7 +867,7 @@ def _quick_query_options(self, query_expression,
except AttributeError:
pass
else:
output_format = input_format
output_format = input_format if not input_parquet_format else None
query_request = QueryRequest(
expression=query_expression,
input_serialization=serialize_query_format(input_format),
Expand Down Expand Up @@ -894,14 +911,18 @@ def query_blob(self, query_expression, **kwargs):
:keyword blob_format:
Optional. Defines the serialization of the data currently stored in the blob. The default is to
treat the blob data as CSV data formatted in the default dialect. This can be overridden with
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect.
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect or ParquetDialect.
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
:paramtype blob_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
or ~azure.storage.blob.ParquetDialect or ~azure.storage.blob.QuickQueryDialect or str
:keyword output_format:
Optional. Defines the output serialization for the data stream. By default the data will be returned
as it is represented in the blob. By providing an output format, the blob data will be reformatted
according to that profile. This value can be a DelimitedTextDialect or a DelimitedJsonDialect.
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect, ~azure.storage.blob.DelimitedJsonDialect
or list[~azure.storage.blob.ArrowDialect]
as it is represented in the blob (Parquet formats default to DelimitedTextDialect).
By providing an output format, the blob data will be reformatted according to that profile.
This value can be a DelimitedTextDialect or a DelimitedJsonDialect or ArrowDialect.
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
or list[~azure.storage.blob.ArrowDialect] or ~azure.storage.blob.QuickQueryDialect or str
:keyword lease:
Required if the blob has an active lease. Value can be a BlobLeaseClient object
or the lease ID as a string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# pylint: disable=unused-import,ungrouped-imports
from typing import Any

from azure.core.pipeline.transport import HttpRequest, HttpResponse

from ._configuration import AzureBlobStorageConfiguration
from .operations import ServiceOperations
from .operations import ContainerOperations
Expand Down Expand Up @@ -77,6 +79,24 @@ def __init__(
self.block_blob = BlockBlobOperations(
self._client, self._config, self._serialize, self._deserialize)

def _send_request(self, http_request, **kwargs):
# type: (HttpRequest, Any) -> HttpResponse
"""Runs the network request through the client's chained policies.

:param http_request: The network request you want to make. Required.
:type http_request: ~azure.core.pipeline.transport.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to True.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.pipeline.transport.HttpResponse
"""
path_format_arguments = {
'url': self._serialize.url("self._config.url", self._config.url, 'str', skip_quote=True),
}
http_request.url = self._client.format_url(http_request.url, **path_format_arguments)
stream = kwargs.pop("stream", True)
pipeline_response = self._client._pipeline.run(http_request, stream=stream, **kwargs)
return pipeline_response.http_response

def close(self):
# type: () -> None
self._client.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(
super(AzureBlobStorageConfiguration, self).__init__(**kwargs)

self.url = url
self.version = "2020-06-12"
self.version = "2020-08-04"
kwargs.setdefault('sdk_moniker', 'azureblobstorage/{}'.format(VERSION))
self._configure(**kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Any

from azure.core import AsyncPipelineClient
from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
from msrest import Deserializer, Serializer

from ._configuration import AzureBlobStorageConfiguration
Expand Down Expand Up @@ -72,6 +73,23 @@ def __init__(
self.block_blob = BlockBlobOperations(
self._client, self._config, self._serialize, self._deserialize)

async def _send_request(self, http_request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse:
"""Runs the network request through the client's chained policies.

:param http_request: The network request you want to make. Required.
:type http_request: ~azure.core.pipeline.transport.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to True.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.pipeline.transport.AsyncHttpResponse
"""
path_format_arguments = {
'url': self._serialize.url("self._config.url", self._config.url, 'str', skip_quote=True),
}
http_request.url = self._client.format_url(http_request.url, **path_format_arguments)
stream = kwargs.pop("stream", True)
pipeline_response = await self._client._pipeline.run(http_request, stream=stream, **kwargs)
return pipeline_response.http_response

async def close(self) -> None:
await self._client.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
super(AzureBlobStorageConfiguration, self).__init__(**kwargs)

self.url = url
self.version = "2020-06-12"
self.version = "2020-08-04"
kwargs.setdefault('sdk_moniker', 'azureblobstorage/{}'.format(VERSION))
self._configure(**kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import datetime
from typing import Any, Callable, Dict, Generic, IO, Optional, TypeVar
from typing import Any, Callable, Dict, Generic, IO, Optional, TypeVar, Union
import warnings

from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
Expand Down Expand Up @@ -48,6 +48,9 @@ async def create(
encryption_algorithm: Optional[str] = "AES256",
request_id_parameter: Optional[str] = None,
blob_tags_string: Optional[str] = None,
immutability_policy_expiry: Optional[datetime.datetime] = None,
immutability_policy_mode: Optional[Union[str, "_models.BlobImmutabilityPolicyMode"]] = None,
legal_hold: Optional[bool] = None,
blob_http_headers: Optional["_models.BlobHTTPHeaders"] = None,
lease_access_conditions: Optional["_models.LeaseAccessConditions"] = None,
cpk_info: Optional["_models.CpkInfo"] = None,
Expand Down Expand Up @@ -80,6 +83,13 @@ async def create(
:type request_id_parameter: str
:param blob_tags_string: Optional. Used to set blob tags in various blob operations.
:type blob_tags_string: str
:param immutability_policy_expiry: Specifies the date time when the blobs immutability policy
is set to expire.
:type immutability_policy_expiry: ~datetime.datetime
:param immutability_policy_mode: Specifies the immutability policy mode to set on the blob.
:type immutability_policy_mode: str or ~azure.storage.blob.models.BlobImmutabilityPolicyMode
:param legal_hold: Specified if a legal hold should be set on the blob.
:type legal_hold: bool
:param blob_http_headers: Parameter group.
:type blob_http_headers: ~azure.storage.blob.models.BlobHTTPHeaders
:param lease_access_conditions: Parameter group.
Expand Down Expand Up @@ -194,6 +204,12 @@ async def create(
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id_parameter", request_id_parameter, 'str')
if blob_tags_string is not None:
header_parameters['x-ms-tags'] = self._serialize.header("blob_tags_string", blob_tags_string, 'str')
if immutability_policy_expiry is not None:
header_parameters['x-ms-immutability-policy-until-date'] = self._serialize.header("immutability_policy_expiry", immutability_policy_expiry, 'rfc-1123')
if immutability_policy_mode is not None:
header_parameters['x-ms-immutability-policy-mode'] = self._serialize.header("immutability_policy_mode", immutability_policy_mode, 'str')
if legal_hold is not None:
header_parameters['x-ms-legal-hold'] = self._serialize.header("legal_hold", legal_hold, 'bool')
header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

request = self._client.put(url, query_parameters, header_parameters)
Expand All @@ -202,7 +218,7 @@ async def create(

if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down Expand Up @@ -374,7 +390,7 @@ async def append_block(

if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down Expand Up @@ -577,7 +593,7 @@ async def append_block_from_url(

if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down Expand Up @@ -691,7 +707,7 @@ async def seal(

if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down
Loading