diff --git a/sdk/storage/azure-storage-blob/pom.xml b/sdk/storage/azure-storage-blob/pom.xml
index 25e2ea912aaed..5eed95c03e284 100644
--- a/sdk/storage/azure-storage-blob/pom.xml
+++ b/sdk/storage/azure-storage-blob/pom.xml
@@ -119,6 +119,16 @@
       <version>3.2.7</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.30</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-simple</artifactId>
+      <version>1.7.30</version>
+    </dependency>
@@ -201,6 +211,21 @@
         <artifactId>maven-surefire-plugin</artifactId>
         <version>3.0.0-M3</version>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>3.0.0-M3</version>
+        <configuration>
+          <rules>
+            <bannedDependencies>
+              <includes>
+                <include>org.slf4j:slf4j-api:[1.7.30]</include>
+                <include>org.slf4j:slf4j-simple:[1.7.30]</include>
+              </includes>
+            </bannedDependencies>
+          </rules>
+        </configuration>
+      </plugin>
@@ -260,6 +285,21 @@
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>3.0.0-M3</version>
+        <configuration>
+          <rules>
+            <bannedDependencies>
+              <includes>
+                <include>org.slf4j:slf4j-api:[1.7.30]</include>
+                <include>org.slf4j:slf4j-simple:[1.7.30]</include>
+              </includes>
+            </bannedDependencies>
+          </rules>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -281,7 +321,6 @@
--add-reads com.azure.core.test=ALL-UNNAMED
--add-reads com.azure.core.amqp=ALL-UNNAMED
--add-reads com.azure.storage.common=ALL-UNNAMED
- --add-reads com.azure.storage.internal.avro=ALL-UNNAMED
diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java
index b9b68766fcecf..dd7f99bc1ad69 100644
--- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java
+++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java
@@ -19,6 +19,7 @@
import com.azure.storage.internal.avro.implementation.AvroReaderFactory;
import com.azure.storage.internal.avro.implementation.schema.AvroSchema;
import com.azure.storage.internal.avro.implementation.schema.primitive.AvroNullSchema;
+import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -35,6 +36,7 @@
*/
public class BlobQueryReader {
+ private final ClientLogger logger = new ClientLogger(BlobQueryReader.class);
    private final Flux<ByteBuffer> avro;
    private final Consumer<BlobQueryProgress> progressConsumer;
    private final Consumer<BlobQueryError> errorConsumer;
@@ -63,9 +65,13 @@ public BlobQueryReader(Flux<ByteBuffer> avro, Consumer<BlobQueryProgress> progre
* @return The parsed query reactive stream.
*/
    public Flux<ByteBuffer> read() {
- return new AvroReaderFactory().getAvroReader(avro).read()
- .map(AvroObject::getObject)
- .concatMap(this::parseRecord);
+ try {
+ return new AvroReaderFactory().getAvroReader(avro).read()
+ .map(AvroObject::getObject)
+ .concatMap(this::parseRecord);
+ } catch (Throwable ex) {
+ return Flux.error(logger.logThrowableAsError(Exceptions.propagate(ex)));
+ }
}
/**
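Note on the try/catch above: `getAvroReader(avro)` runs at assembly time, so anything it throws would surface synchronously from `read()` instead of reaching subscribers as an `onError` signal. A minimal standalone sketch of the same pattern (not the SDK code; the failure condition is invented for illustration):

```java
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

public class DeferredErrorSketch {
    // Mirrors the shape of BlobQueryReader.read(): assembly-time failures are
    // converted into an error signal instead of escaping to the caller.
    static Flux<String> read(boolean failDuringAssembly) {
        try {
            if (failDuringAssembly) {
                throw new IllegalStateException("could not build reader"); // hypothetical failure
            }
            return Flux.just("record-1", "record-2");
        } catch (Throwable ex) {
            // Exceptions.propagate wraps checked throwables so they can travel
            // through the reactive chain as unchecked exceptions.
            return Flux.error(Exceptions.propagate(ex));
        }
    }

    public static void main(String[] args) {
        read(true).subscribe(
            record -> System.out.println("got " + record),
            error -> System.out.println("onError: " + error.getMessage()));
    }
}
```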
diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy
index 321ca5d4a659f..29cb545b20ed4 100644
--- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy
+++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy
@@ -4,6 +4,8 @@
package com.azure.storage.blob
import com.azure.core.http.RequestConditions
+import com.azure.core.http.policy.HttpLogDetailLevel
+import com.azure.core.http.policy.HttpLogOptions
import com.azure.core.util.CoreUtils
import com.azure.core.util.polling.LongRunningOperationStatus
import com.azure.identity.DefaultAzureCredentialBuilder
@@ -12,6 +14,7 @@ import com.azure.storage.blob.models.ArchiveStatus
import com.azure.storage.blob.models.BlobBeginCopySourceRequestConditions
import com.azure.storage.blob.models.BlobErrorCode
import com.azure.storage.blob.models.BlobHttpHeaders
+import com.azure.storage.blob.models.BlobQueryDelimitedSerialization
import com.azure.storage.blob.models.BlobRange
import com.azure.storage.blob.models.BlobRequestConditions
import com.azure.storage.blob.models.BlobStorageException
@@ -66,6 +69,74 @@ class BlobAPITest extends APISpec {
bc.getBlockBlobClient().upload(defaultInputStream.get(), defaultDataSize)
}
+ // Generates and uploads a CSV file
+ def uploadCsv(BlobQueryDelimitedSerialization s, int numCopies) {
+ String header = String.join(new String(s.getColumnSeparator()), "rn1", "rn2", "rn3", "rn4")
+ .concat(new String(s.getRecordSeparator()))
+ byte[] headers = header.getBytes()
+
+ String csv = String.join(new String(s.getColumnSeparator()), "100", "200", "300", "400")
+ .concat(new String(s.getRecordSeparator()))
+ .concat(String.join(new String(s.getColumnSeparator()), "300", "400", "500", "600")
+ .concat(new String(s.getRecordSeparator())))
+
+ byte[] csvData = csv.getBytes()
+
+ int headerLength = s.isHeadersPresent() ? headers.length : 0
+ byte[] data = new byte[headerLength + csvData.length * numCopies]
+ if (s.isHeadersPresent()) {
+ System.arraycopy(headers, 0, data, 0, headers.length)
+ }
+
+ for (int i = 0; i < numCopies; i++) {
+ int o = i * csvData.length + headerLength
+ System.arraycopy(csvData, 0, data, o, csvData.length)
+ }
+
+ InputStream inputStream = new ByteArrayInputStream(data)
+
+ bc.upload(inputStream, data.length, true)
+ }
+
+    @Requires({ liveMode() }) // TODO (gapra-msft): Remove this; it is just here to test
+ def "Query async"() {
+ setup:
+ def oldBc = bc
+ System.setProperty("AZURE_LOG_LEVEL", "INFO")
+ System.setProperty("org.slf4j.simpleLogger.defaultLogLevel" , "DEBUG");
+ bc = getServiceClientBuilder(primaryCredential, primaryBlobServiceClient.getAccountUrl())
+ .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.HEADERS).addAllowedHeaderName("x-ms-request-id"))
+ .buildClient().getBlobContainerClient(bc.getContainerName())
+ .getBlobClient(bc.getBlobName())
+ BlobQueryDelimitedSerialization ser = new BlobQueryDelimitedSerialization()
+ .setRecordSeparator('\n' as char)
+ .setColumnSeparator(',' as char)
+ .setEscapeChar('\0' as char)
+ .setFieldQuote('\0' as char)
+ .setHeadersPresent(false)
+ uploadCsv(ser, 1)
+ def expression = "SELECT * from BlobStorage"
+
+ ByteArrayOutputStream downloadData = new ByteArrayOutputStream()
+ bc.download(downloadData)
+ byte[] downloadedData = downloadData.toByteArray()
+
+ /* Output Stream. */
+ when:
+ OutputStream os = new ByteArrayOutputStream()
+ bc.query(os, expression)
+ byte[] osData = os.toByteArray()
+
+ then:
+ notThrown(BlobStorageException)
+ osData == downloadedData
+
+ cleanup:
+ bc = oldBc
+ System.clearProperty("AZURE_LOG_LEVEL")
+ System.clearProperty("org.slf4j.simpleLogger.defaultLogLevel")
+ }
+
def "Upload input stream overwrite fails"() {
when:
bc.upload(defaultInputStream.get(), defaultDataSize)
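A note on the two logging properties set in `Query async`: `AZURE_LOG_LEVEL` drives azure-core's `ClientLogger`, while `org.slf4j.simpleLogger.defaultLogLevel` configures the slf4j-simple binding added to the pom. The simple binding reads its configuration once, when the first logger is created, which is why the test sets the property before building the logging client. A standalone sketch, assuming the slf4j-simple 1.7.x behavior:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleLoggerLevelSketch {
    public static void main(String[] args) {
        // Must run before the first LoggerFactory.getLogger call; slf4j-simple
        // reads this property once during its static initialization.
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "debug");

        Logger log = LoggerFactory.getLogger(SimpleLoggerLevelSketch.class);
        log.debug("visible only because the property was set before logger creation");
    }
}
```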
diff --git a/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java b/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java
index 01322904a935a..3619279e2da04 100644
--- a/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java
+++ b/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java
@@ -3,6 +3,7 @@
package com.azure.storage.internal.avro.implementation;
+import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
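For context on the `ClientLogger` import added here: azure-core's logger is typically held as a private field and used to log-and-return an exception in a single expression, which is the pattern `BlobQueryReader` now follows. A hedged sketch of that usage (the class and argument check are illustrative, not part of this diff):

```java
import com.azure.core.util.logging.ClientLogger;

class LoggerUsageSketch {
    private final ClientLogger logger = new ClientLogger(LoggerUsageSketch.class);

    void validate(Object argument) {
        if (argument == null) {
            // logExceptionAsError logs at ERROR level and returns the exception,
            // so it can be thrown in the same statement.
            throw logger.logExceptionAsError(new IllegalArgumentException("argument must not be null"));
        }
    }
}
```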