Skip to content

Commit

Permalink
[Blob] Added QQ Parquet Format (#17855)
Browse files Browse the repository at this point in the history
* Added QQ Parquet

* cleanup

* added parquet format

* added version

* added parquet tests

* removed comment and fixed docstrings

* abs path

* changed version

* added qq parquet to dl

* changed msrest version

* added tests

* recorded tests

* changed version for fileshares

* fixed parquet type check

* added missing comma

* resolved archboard feedback

* removed useless imports

* added metrics for datalake

* added extra test

* updated datalake depedency

* updated datalake depedency
  • Loading branch information
tasherif-msft authored Apr 28, 2021
1 parent a361d92 commit 4e3282a
Show file tree
Hide file tree
Showing 56 changed files with 7,493 additions and 323 deletions.
3 changes: 1 addition & 2 deletions sdk/storage/azure-storage-blob/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Release History

## 12.8.1b1 (Unreleased)

## 12.9.0b1 (Unreleased)

## 12.8.0 (2021-03-01)
**Stable release of preview features**
Expand Down
4 changes: 4 additions & 0 deletions sdk/storage/azure-storage-blob/azure/storage/blob/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@
BlobQueryError,
DelimitedJsonDialect,
DelimitedTextDialect,
ParquetDialect,
ArrowDialect,
ArrowType,
QuickQueryDialect,
ObjectReplicationPolicy,
ObjectReplicationRule
)
Expand Down Expand Up @@ -225,8 +227,10 @@ def download_blob_from_url(
'BlobQueryError',
'DelimitedJsonDialect',
'DelimitedTextDialect',
'ParquetDialect',
'ArrowDialect',
'ArrowType',
'QuickQueryDialect',
'BlobQueryReader',
'ObjectReplicationPolicy',
'ObjectReplicationRule'
Expand Down
39 changes: 30 additions & 9 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_blob_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@
upload_block_blob,
upload_append_blob,
upload_page_blob, _any_conditions)
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError
from ._models import BlobType, BlobBlock, BlobProperties, BlobQueryError, ParquetDialect, QuickQueryDialect, \
DelimitedJsonDialect, DelimitedTextDialect
from ._download import StorageStreamDownloader
from ._lease import BlobLeaseClient

Expand Down Expand Up @@ -832,16 +833,32 @@ def _quick_query_options(self, query_expression,
# type: (str, **Any) -> Dict[str, Any]
delimiter = '\n'
input_format = kwargs.pop('blob_format', None)
if input_format:
if input_format == QuickQueryDialect.ParquetDialect:
input_format = ParquetDialect()
if input_format == QuickQueryDialect.DelimitedJsonDialect:
input_format = DelimitedJsonDialect()
if input_format == QuickQueryDialect.DelimitedTextDialect:
input_format = DelimitedTextDialect()
input_parquet_format = isinstance(input_format, ParquetDialect)
if input_format and not input_parquet_format:
try:
delimiter = input_format.lineterminator
except AttributeError:
try:
delimiter = input_format.delimiter
except AttributeError:
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or DelimitedJsonDialect")
raise ValueError("The Type of blob_format can only be DelimitedTextDialect or "
"DelimitedJsonDialect or ParquetDialect")
output_format = kwargs.pop('output_format', None)
if output_format == QuickQueryDialect.ParquetDialect:
output_format = ParquetDialect()
if output_format == QuickQueryDialect.DelimitedJsonDialect:
output_format = DelimitedJsonDialect()
if output_format == QuickQueryDialect.DelimitedTextDialect:
output_format = DelimitedTextDialect()
if output_format:
if isinstance(output_format, ParquetDialect):
raise ValueError("ParquetDialect is invalid as an output format.")
try:
delimiter = output_format.lineterminator
except AttributeError:
Expand All @@ -850,7 +867,7 @@ def _quick_query_options(self, query_expression,
except AttributeError:
pass
else:
output_format = input_format
output_format = input_format if not input_parquet_format else None
query_request = QueryRequest(
expression=query_expression,
input_serialization=serialize_query_format(input_format),
Expand Down Expand Up @@ -894,14 +911,18 @@ def query_blob(self, query_expression, **kwargs):
:keyword blob_format:
Optional. Defines the serialization of the data currently stored in the blob. The default is to
treat the blob data as CSV data formatted in the default dialect. This can be overridden with
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect.
a custom DelimitedTextDialect, or alternatively a DelimitedJsonDialect or ParquetDialect.
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
:paramtype blob_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
or ~azure.storage.blob.ParquetDialect or ~azure.storage.blob.QuickQueryDialect or str
:keyword output_format:
Optional. Defines the output serialization for the data stream. By default the data will be returned
as it is represented in the blob. By providing an output format, the blob data will be reformatted
according to that profile. This value can be a DelimitedTextDialect or a DelimitedJsonDialect.
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect, ~azure.storage.blob.DelimitedJsonDialect
or list[~azure.storage.blob.ArrowDialect]
as it is represented in the blob (Parquet formats default to DelimitedTextDialect).
By providing an output format, the blob data will be reformatted according to that profile.
This value can be a DelimitedTextDialect or a DelimitedJsonDialect or ArrowDialect.
These dialects can be passed through their respective classes, the QuickQueryDialect enum or as a string
:paramtype output_format: ~azure.storage.blob.DelimitedTextDialect or ~azure.storage.blob.DelimitedJsonDialect
or list[~azure.storage.blob.ArrowDialect] or ~azure.storage.blob.QuickQueryDialect or str
:keyword lease:
Required if the blob has an active lease. Value can be a BlobLeaseClient object
or the lease ID as a string.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# pylint: disable=unused-import,ungrouped-imports
from typing import Any

from azure.core.pipeline.transport import HttpRequest, HttpResponse

from ._configuration import AzureBlobStorageConfiguration
from .operations import ServiceOperations
from .operations import ContainerOperations
Expand Down Expand Up @@ -77,6 +79,24 @@ def __init__(
self.block_blob = BlockBlobOperations(
self._client, self._config, self._serialize, self._deserialize)

def _send_request(self, http_request, **kwargs):
# type: (HttpRequest, Any) -> HttpResponse
"""Runs the network request through the client's chained policies.
:param http_request: The network request you want to make. Required.
:type http_request: ~azure.core.pipeline.transport.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to True.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.pipeline.transport.HttpResponse
"""
path_format_arguments = {
'url': self._serialize.url("self._config.url", self._config.url, 'str', skip_quote=True),
}
http_request.url = self._client.format_url(http_request.url, **path_format_arguments)
stream = kwargs.pop("stream", True)
pipeline_response = self._client._pipeline.run(http_request, stream=stream, **kwargs)
return pipeline_response.http_response

def close(self):
# type: () -> None
self._client.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(
super(AzureBlobStorageConfiguration, self).__init__(**kwargs)

self.url = url
self.version = "2020-06-12"
self.version = "2020-08-04"
kwargs.setdefault('sdk_moniker', 'azureblobstorage/{}'.format(VERSION))
self._configure(**kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import Any

from azure.core import AsyncPipelineClient
from azure.core.pipeline.transport import AsyncHttpResponse, HttpRequest
from msrest import Deserializer, Serializer

from ._configuration import AzureBlobStorageConfiguration
Expand Down Expand Up @@ -72,6 +73,23 @@ def __init__(
self.block_blob = BlockBlobOperations(
self._client, self._config, self._serialize, self._deserialize)

async def _send_request(self, http_request: HttpRequest, **kwargs: Any) -> AsyncHttpResponse:
"""Runs the network request through the client's chained policies.
:param http_request: The network request you want to make. Required.
:type http_request: ~azure.core.pipeline.transport.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to True.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.pipeline.transport.AsyncHttpResponse
"""
path_format_arguments = {
'url': self._serialize.url("self._config.url", self._config.url, 'str', skip_quote=True),
}
http_request.url = self._client.format_url(http_request.url, **path_format_arguments)
stream = kwargs.pop("stream", True)
pipeline_response = await self._client._pipeline.run(http_request, stream=stream, **kwargs)
return pipeline_response.http_response

async def close(self) -> None:
await self._client.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def __init__(
super(AzureBlobStorageConfiguration, self).__init__(**kwargs)

self.url = url
self.version = "2020-06-12"
self.version = "2020-08-04"
kwargs.setdefault('sdk_moniker', 'azureblobstorage/{}'.format(VERSION))
self._configure(**kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# Changes may cause incorrect behavior and will be lost if the code is regenerated.
# --------------------------------------------------------------------------
import datetime
from typing import Any, Callable, Dict, Generic, IO, Optional, TypeVar
from typing import Any, Callable, Dict, Generic, IO, Optional, TypeVar, Union
import warnings

from azure.core.exceptions import ClientAuthenticationError, HttpResponseError, ResourceExistsError, ResourceNotFoundError, map_error
Expand Down Expand Up @@ -48,6 +48,9 @@ async def create(
encryption_algorithm: Optional[str] = "AES256",
request_id_parameter: Optional[str] = None,
blob_tags_string: Optional[str] = None,
immutability_policy_expiry: Optional[datetime.datetime] = None,
immutability_policy_mode: Optional[Union[str, "_models.BlobImmutabilityPolicyMode"]] = None,
legal_hold: Optional[bool] = None,
blob_http_headers: Optional["_models.BlobHTTPHeaders"] = None,
lease_access_conditions: Optional["_models.LeaseAccessConditions"] = None,
cpk_info: Optional["_models.CpkInfo"] = None,
Expand Down Expand Up @@ -80,6 +83,13 @@ async def create(
:type request_id_parameter: str
:param blob_tags_string: Optional. Used to set blob tags in various blob operations.
:type blob_tags_string: str
:param immutability_policy_expiry: Specifies the date time when the blobs immutability policy
is set to expire.
:type immutability_policy_expiry: ~datetime.datetime
:param immutability_policy_mode: Specifies the immutability policy mode to set on the blob.
:type immutability_policy_mode: str or ~azure.storage.blob.models.BlobImmutabilityPolicyMode
:param legal_hold: Specified if a legal hold should be set on the blob.
:type legal_hold: bool
:param blob_http_headers: Parameter group.
:type blob_http_headers: ~azure.storage.blob.models.BlobHTTPHeaders
:param lease_access_conditions: Parameter group.
Expand Down Expand Up @@ -194,6 +204,12 @@ async def create(
header_parameters['x-ms-client-request-id'] = self._serialize.header("request_id_parameter", request_id_parameter, 'str')
if blob_tags_string is not None:
header_parameters['x-ms-tags'] = self._serialize.header("blob_tags_string", blob_tags_string, 'str')
if immutability_policy_expiry is not None:
header_parameters['x-ms-immutability-policy-until-date'] = self._serialize.header("immutability_policy_expiry", immutability_policy_expiry, 'rfc-1123')
if immutability_policy_mode is not None:
header_parameters['x-ms-immutability-policy-mode'] = self._serialize.header("immutability_policy_mode", immutability_policy_mode, 'str')
if legal_hold is not None:
header_parameters['x-ms-legal-hold'] = self._serialize.header("legal_hold", legal_hold, 'bool')
header_parameters['Accept'] = self._serialize.header("accept", accept, 'str')

request = self._client.put(url, query_parameters, header_parameters)
Expand All @@ -202,7 +218,7 @@ async def create(

if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down Expand Up @@ -374,7 +390,7 @@ async def append_block(

if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down Expand Up @@ -577,7 +593,7 @@ async def append_block_from_url(

if response.status_code not in [201]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down Expand Up @@ -691,7 +707,7 @@ async def seal(

if response.status_code not in [200]:
map_error(status_code=response.status_code, response=response, error_map=error_map)
error = self._deserialize(_models.StorageError, response)
error = self._deserialize.failsafe_deserialize(_models.StorageError, response)
raise HttpResponseError(response=response, model=error)

response_headers = {}
Expand Down
Loading

0 comments on commit 4e3282a

Please sign in to comment.