From 573942a718c76478a3723b23de5afcde80516a76 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Thu, 3 Sep 2020 13:00:21 -0700 Subject: [PATCH 1/6] repro issue --- sdk/storage/azure-storage-blob/pom.xml | 41 ++++++++++- .../com/azure/storage/blob/BlobAPITest.groovy | 71 +++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/pom.xml b/sdk/storage/azure-storage-blob/pom.xml index 25e2ea912aaed..5eed95c03e284 100644 --- a/sdk/storage/azure-storage-blob/pom.xml +++ b/sdk/storage/azure-storage-blob/pom.xml @@ -119,6 +119,16 @@ 3.2.7 test + + org.slf4j + slf4j-api + 1.7.30 + + + org.slf4j + slf4j-simple + 1.7.30 + @@ -201,6 +211,21 @@ maven-surefire-plugin 3.0.0-M3 + + org.apache.maven.plugins + maven-enforcer-plugin + 3.0.0-M3 + + + + + org.slf4j:slf4j-api:[1.7.30] + org.slf4j:slf4j-simple:[1.7.30] + + + + + @@ -260,6 +285,21 @@ + + org.apache.maven.plugins + maven-enforcer-plugin + 3.0.0-M3 + + + + + org.slf4j:slf4j-api:[1.7.30] + org.slf4j:slf4j-simple:[1.7.30] + + + + + org.apache.maven.plugins @@ -281,7 +321,6 @@ --add-reads com.azure.core.test=ALL-UNNAMED --add-reads com.azure.core.amqp=ALL-UNNAMED --add-reads com.azure.storage.common=ALL-UNNAMED - --add-reads com.azure.storage.internal.avro=ALL-UNNAMED diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy index 321ca5d4a659f..29cb545b20ed4 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobAPITest.groovy @@ -4,6 +4,8 @@ package com.azure.storage.blob import com.azure.core.http.RequestConditions +import com.azure.core.http.policy.HttpLogDetailLevel +import com.azure.core.http.policy.HttpLogOptions import com.azure.core.util.CoreUtils import com.azure.core.util.polling.LongRunningOperationStatus import com.azure.identity.DefaultAzureCredentialBuilder @@ -12,6 +14,7 @@ import com.azure.storage.blob.models.ArchiveStatus import com.azure.storage.blob.models.BlobBeginCopySourceRequestConditions import com.azure.storage.blob.models.BlobErrorCode import com.azure.storage.blob.models.BlobHttpHeaders +import com.azure.storage.blob.models.BlobQueryDelimitedSerialization import com.azure.storage.blob.models.BlobRange import com.azure.storage.blob.models.BlobRequestConditions import com.azure.storage.blob.models.BlobStorageException @@ -66,6 +69,74 @@ class BlobAPITest extends APISpec { bc.getBlockBlobClient().upload(defaultInputStream.get(), defaultDataSize) } + // Generates and uploads a CSV file + def uploadCsv(BlobQueryDelimitedSerialization s, int numCopies) { + String header = String.join(new String(s.getColumnSeparator()), "rn1", "rn2", "rn3", "rn4") + .concat(new String(s.getRecordSeparator())) + byte[] headers = header.getBytes() + + String csv = String.join(new String(s.getColumnSeparator()), "100", "200", "300", "400") + .concat(new String(s.getRecordSeparator())) + .concat(String.join(new String(s.getColumnSeparator()), "300", "400", "500", "600") + .concat(new String(s.getRecordSeparator()))) + + byte[] csvData = csv.getBytes() + + int headerLength = s.isHeadersPresent() ? headers.length : 0 + byte[] data = new byte[headerLength + csvData.length * numCopies] + if (s.isHeadersPresent()) { + System.arraycopy(headers, 0, data, 0, headers.length) + } + + for (int i = 0; i < numCopies; i++) { + int o = i * csvData.length + headerLength + System.arraycopy(csvData, 0, data, o, csvData.length) + } + + InputStream inputStream = new ByteArrayInputStream(data) + + bc.upload(inputStream, data.length, true) + } + + @Requires({ liveMode() }) // TODO (gapra-msft): Remove this is just to test + def "Query async"() { + setup: + def oldBc = bc + System.setProperty("AZURE_LOG_LEVEL", "INFO") + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel" , "DEBUG"); + bc = getServiceClientBuilder(primaryCredential, primaryBlobServiceClient.getAccountUrl()) + .httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.HEADERS).addAllowedHeaderName("x-ms-request-id")) + .buildClient().getBlobContainerClient(bc.getContainerName()) + .getBlobClient(bc.getBlobName()) + BlobQueryDelimitedSerialization ser = new BlobQueryDelimitedSerialization() + .setRecordSeparator('\n' as char) + .setColumnSeparator(',' as char) + .setEscapeChar('\0' as char) + .setFieldQuote('\0' as char) + .setHeadersPresent(false) + uploadCsv(ser, 1) + def expression = "SELECT * from BlobStorage" + + ByteArrayOutputStream downloadData = new ByteArrayOutputStream() + bc.download(downloadData) + byte[] downloadedData = downloadData.toByteArray() + + /* Output Stream. */ + when: + OutputStream os = new ByteArrayOutputStream() + bc.query(os, expression) + byte[] osData = os.toByteArray() + + then: + notThrown(BlobStorageException) + osData == downloadedData + + cleanup: + bc = oldBc + System.clearProperty("AZURE_LOG_LEVEL") + System.clearProperty("org.slf4j.simpleLogger.defaultLogLevel") + } + def "Upload input stream overwrite fails"() { when: bc.upload(defaultInputStream.get(), defaultDataSize) From 4b2e50abc66dfb9ab66d0b07aee63fa8af8d0ac6 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Thu, 3 Sep 2020 13:10:45 -0700 Subject: [PATCH 2/6] added try catch --- .../blob/implementation/util/BlobQueryReader.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java index b9b68766fcecf..4e9411d345299 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java @@ -3,6 +3,7 @@ package com.azure.storage.blob.implementation.util; +import com.azure.core.util.FluxUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.storage.blob.implementation.models.DelimitedTextConfiguration; import com.azure.storage.blob.implementation.models.JsonTextConfiguration; @@ -35,6 +36,7 @@ */ public class BlobQueryReader { + private final ClientLogger logger = new ClientLogger(BlobQueryReader.class); private final Flux avro; private final Consumer progressConsumer; private final Consumer errorConsumer; @@ -63,9 +65,13 @@ public BlobQueryReader(Flux avro, Consumer progre * @return The parsed query reactive stream. */ public Flux read() { - return new AvroReaderFactory().getAvroReader(avro).read() - .map(AvroObject::getObject) - .concatMap(this::parseRecord); + try { + return new AvroReaderFactory().getAvroReader(avro).read() + .map(AvroObject::getObject) + .concatMap(this::parseRecord); + } catch (RuntimeException ex) { + return FluxUtil.fluxError(logger, ex); + } } /** From f4b1fb74452377cbd62843376832a8d7167ab781 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Thu, 3 Sep 2020 13:23:27 -0700 Subject: [PATCH 3/6] Added more try catch --- .../implementation/AvroReaderFactory.java | 39 ++++++++++++------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java b/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java index 01322904a935a..b65127ff97653 100644 --- a/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java +++ b/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java @@ -3,6 +3,7 @@ package com.azure.storage.internal.avro.implementation; +import com.azure.core.util.logging.ClientLogger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,6 +14,8 @@ */ public class AvroReaderFactory { + private final ClientLogger logger = new ClientLogger(AvroReaderFactory.class); + /** * Gets a new instance of {@link AvroReader}. * @@ -24,15 +27,19 @@ public class AvroReaderFactory { */ public AvroReader getAvroReader(Flux avroHeader, Flux avroBody, long offset, long thresholdIndex) { - return () -> { - AvroParser parser = new AvroParser(true); - /* Parse the header. */ - return avroHeader.concatMap(parser::parse) - /* Prepare the parser to read the body at an offset.*/ - .then(Mono.defer(() -> parser.prepareParserToReadBody(offset, thresholdIndex))) - /* Parse the body. */ - .thenMany(avroBody.concatMap(parser::parse)); - }; + try { + return () -> { + AvroParser parser = new AvroParser(true); + /* Parse the header. */ + return avroHeader.concatMap(parser::parse) + /* Prepare the parser to read the body at an offset.*/ + .then(Mono.defer(() -> parser.prepareParserToReadBody(offset, thresholdIndex))) + /* Parse the body. */ + .thenMany(avroBody.concatMap(parser::parse)); + }; + } catch (RuntimeException ex) { + throw logger.logExceptionAsError(ex); + } } /** @@ -42,10 +49,14 @@ public AvroReader getAvroReader(Flux avroHeader, Flux av * @return An AvroReader. */ public AvroReader getAvroReader(Flux avro) { - return () -> { - AvroParser parser = new AvroParser(false); - /* Parse the header. */ - return avro.concatMap(parser::parse); - }; + try { + return () -> { + AvroParser parser = new AvroParser(false); + /* Parse the header. */ + return avro.concatMap(parser::parse); + }; + } catch (RuntimeException ex) { + throw logger.logExceptionAsError(ex); + } } } From 8c897f9f95dc1bd1999af9d60145b65576cc1c02 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Thu, 3 Sep 2020 13:46:38 -0700 Subject: [PATCH 4/6] try something new --- .../storage/blob/implementation/util/BlobQueryReader.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java index 4e9411d345299..dc5cf1bc7aa79 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java @@ -65,8 +65,10 @@ public BlobQueryReader(Flux avro, Consumer progre * @return The parsed query reactive stream. */ public Flux read() { + AvroReaderFactory avroReaderFactory; try { - return new AvroReaderFactory().getAvroReader(avro).read() + avroReaderFactory = new AvroReaderFactory(); + return avroReaderFactory.getAvroReader(avro).read() .map(AvroObject::getObject) .concatMap(this::parseRecord); } catch (RuntimeException ex) { From c19bace727bce93b336c92417e31018abf8c79e4 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Thu, 3 Sep 2020 14:08:10 -0700 Subject: [PATCH 5/6] trying to catch it again --- .../implementation/util/BlobQueryReader.java | 9 ++--- .../implementation/AvroReaderFactory.java | 38 +++++++------------ 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java index dc5cf1bc7aa79..02b1e762576e1 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java @@ -20,6 +20,7 @@ import com.azure.storage.internal.avro.implementation.AvroReaderFactory; import com.azure.storage.internal.avro.implementation.schema.AvroSchema; import com.azure.storage.internal.avro.implementation.schema.primitive.AvroNullSchema; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -65,14 +66,12 @@ public BlobQueryReader(Flux avro, Consumer progre * @return The parsed query reactive stream. */ public Flux read() { - AvroReaderFactory avroReaderFactory; try { - avroReaderFactory = new AvroReaderFactory(); - return avroReaderFactory.getAvroReader(avro).read() + return new AvroReaderFactory().getAvroReader(avro).read() .map(AvroObject::getObject) .concatMap(this::parseRecord); - } catch (RuntimeException ex) { - return FluxUtil.fluxError(logger, ex); + } catch (Throwable ex) { + return Flux.error(logger.logThrowableAsError(Exceptions.propagate(ex))); } } diff --git a/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java b/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java index b65127ff97653..3619279e2da04 100644 --- a/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java +++ b/sdk/storage/azure-storage-internal-avro/src/main/java/com/azure/storage/internal/avro/implementation/AvroReaderFactory.java @@ -14,8 +14,6 @@ */ public class AvroReaderFactory { - private final ClientLogger logger = new ClientLogger(AvroReaderFactory.class); - /** * Gets a new instance of {@link AvroReader}. * @@ -27,19 +25,15 @@ public class AvroReaderFactory { */ public AvroReader getAvroReader(Flux avroHeader, Flux avroBody, long offset, long thresholdIndex) { - try { - return () -> { - AvroParser parser = new AvroParser(true); - /* Parse the header. */ - return avroHeader.concatMap(parser::parse) - /* Prepare the parser to read the body at an offset.*/ - .then(Mono.defer(() -> parser.prepareParserToReadBody(offset, thresholdIndex))) - /* Parse the body. */ - .thenMany(avroBody.concatMap(parser::parse)); - }; - } catch (RuntimeException ex) { - throw logger.logExceptionAsError(ex); - } + return () -> { + AvroParser parser = new AvroParser(true); + /* Parse the header. */ + return avroHeader.concatMap(parser::parse) + /* Prepare the parser to read the body at an offset.*/ + .then(Mono.defer(() -> parser.prepareParserToReadBody(offset, thresholdIndex))) + /* Parse the body. */ + .thenMany(avroBody.concatMap(parser::parse)); + }; } /** @@ -49,14 +43,10 @@ public AvroReader getAvroReader(Flux avroHeader, Flux av * @return An AvroReader. */ public AvroReader getAvroReader(Flux avro) { - try { - return () -> { - AvroParser parser = new AvroParser(false); - /* Parse the header. */ - return avro.concatMap(parser::parse); - }; - } catch (RuntimeException ex) { - throw logger.logExceptionAsError(ex); - } + return () -> { + AvroParser parser = new AvroParser(false); + /* Parse the header. */ + return avro.concatMap(parser::parse); + }; } } From 99af1349fc3f5aa607fa2a1b7f7c5c2e40171c02 Mon Sep 17 00:00:00 2001 From: Gauri Prasad Date: Thu, 3 Sep 2020 15:58:56 -0700 Subject: [PATCH 6/6] fixed revert --- .../azure/storage/blob/implementation/util/BlobQueryReader.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java index 02b1e762576e1..dd7f99bc1ad69 100644 --- a/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java +++ b/sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BlobQueryReader.java @@ -3,7 +3,6 @@ package com.azure.storage.blob.implementation.util; -import com.azure.core.util.FluxUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.storage.blob.implementation.models.DelimitedTextConfiguration; import com.azure.storage.blob.implementation.models.JsonTextConfiguration;