Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repro issue #14790

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion sdk/storage/azure-storage-blob/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,16 @@
<version>3.2.7</version> <!-- {x-version-update;cglib:cglib-nodep;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version> <!-- {x-version-update;org.slf4j:slf4j-api;external_dependency} -->
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version> <!-- {x-version-update;org.slf4j:slf4j-simple;external_dependency} -->
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -201,6 +211,21 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-surefire-plugin;external_dependency} -->
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-enforcer-plugin;external_dependency} -->
<configuration>
<rules>
<bannedDependencies>
<includes>
<include>org.slf4j:slf4j-api:[1.7.30]</include> <!-- {x-include-update;org.slf4j:slf4j-api;external_dependency} -->
<include>org.slf4j:slf4j-simple:[1.7.30]</include> <!-- {x-include-update;org.slf4j:slf4j-simple;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
</configuration>
</plugin>
</plugins>
</build>
</profile>
Expand Down Expand Up @@ -260,6 +285,21 @@
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-enforcer-plugin;external_dependency} -->
<configuration>
<rules>
<bannedDependencies>
<includes>
<include>org.slf4j:slf4j-api:[1.7.30]</include> <!-- {x-include-update;org.slf4j:slf4j-api;external_dependency} -->
<include>org.slf4j:slf4j-simple:[1.7.30]</include> <!-- {x-include-update;org.slf4j:slf4j-simple;external_dependency} -->
</includes>
</bannedDependencies>
</rules>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand All @@ -281,7 +321,6 @@
--add-reads com.azure.core.test=ALL-UNNAMED
--add-reads com.azure.core.amqp=ALL-UNNAMED
--add-reads com.azure.storage.common=ALL-UNNAMED
--add-reads com.azure.storage.internal.avro=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.azure.storage.internal.avro.implementation.AvroReaderFactory;
import com.azure.storage.internal.avro.implementation.schema.AvroSchema;
import com.azure.storage.internal.avro.implementation.schema.primitive.AvroNullSchema;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -35,6 +36,7 @@
*/
public class BlobQueryReader {

private final ClientLogger logger = new ClientLogger(BlobQueryReader.class);
private final Flux<ByteBuffer> avro;
private final Consumer<BlobQueryProgress> progressConsumer;
private final Consumer<BlobQueryError> errorConsumer;
Expand Down Expand Up @@ -63,9 +65,13 @@ public BlobQueryReader(Flux<ByteBuffer> avro, Consumer<BlobQueryProgress> progre
* @return The parsed query reactive stream.
*/
public Flux<ByteBuffer> read() {
    /*
     * Builds the Avro reader pipeline over the raw response stream and parses
     * each Avro object into a query result record.
     *
     * getAvroReader(...) can throw synchronously while assembling the pipeline
     * (before any subscription happens). Catching here and returning Flux.error
     * keeps the contract reactive: subscribers receive the failure as an onError
     * signal instead of the caller getting an unchecked exception at assembly
     * time. The throwable is logged for diagnostics and normalized through
     * Exceptions.propagate so checked throwables become RuntimeExceptions.
     */
    try {
        return new AvroReaderFactory().getAvroReader(avro).read()
            .map(AvroObject::getObject)
            .concatMap(this::parseRecord);
    } catch (Throwable ex) {
        return Flux.error(logger.logThrowableAsError(Exceptions.propagate(ex)));
    }
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package com.azure.storage.blob

import com.azure.core.http.RequestConditions
import com.azure.core.http.policy.HttpLogDetailLevel
import com.azure.core.http.policy.HttpLogOptions
import com.azure.core.util.CoreUtils
import com.azure.core.util.polling.LongRunningOperationStatus
import com.azure.identity.DefaultAzureCredentialBuilder
Expand All @@ -12,6 +14,7 @@ import com.azure.storage.blob.models.ArchiveStatus
import com.azure.storage.blob.models.BlobBeginCopySourceRequestConditions
import com.azure.storage.blob.models.BlobErrorCode
import com.azure.storage.blob.models.BlobHttpHeaders
import com.azure.storage.blob.models.BlobQueryDelimitedSerialization
import com.azure.storage.blob.models.BlobRange
import com.azure.storage.blob.models.BlobRequestConditions
import com.azure.storage.blob.models.BlobStorageException
Expand Down Expand Up @@ -66,6 +69,74 @@ class BlobAPITest extends APISpec {
bc.getBlockBlobClient().upload(defaultInputStream.get(), defaultDataSize)
}

// Generates a CSV payload using the given delimited serialization settings
// (column/record separators, optional header row) and uploads it to the test
// blob client `bc`, overwriting any existing content.
def uploadCsv(BlobQueryDelimitedSerialization s, int numCopies) {
// Header row "rn1<sep>rn2<sep>rn3<sep>rn4<recordSep>" built from the
// serialization's separators.
String header = String.join(new String(s.getColumnSeparator()), "rn1", "rn2", "rn3", "rn4")
.concat(new String(s.getRecordSeparator()))
byte[] headers = header.getBytes()

// Two data records per copy: 100,200,300,400 and 300,400,500,600.
String csv = String.join(new String(s.getColumnSeparator()), "100", "200", "300", "400")
.concat(new String(s.getRecordSeparator()))
.concat(String.join(new String(s.getColumnSeparator()), "300", "400", "500", "600")
.concat(new String(s.getRecordSeparator())))

byte[] csvData = csv.getBytes()

// Header bytes only count toward the payload when headers are enabled.
int headerLength = s.isHeadersPresent() ? headers.length : 0
byte[] data = new byte[headerLength + csvData.length * numCopies]
if (s.isHeadersPresent()) {
System.arraycopy(headers, 0, data, 0, headers.length)
}

// Append numCopies repetitions of the two-record block after the header.
for (int i = 0; i < numCopies; i++) {
int o = i * csvData.length + headerLength
System.arraycopy(csvData, 0, data, o, csvData.length)
}

InputStream inputStream = new ByteArrayInputStream(data)

// NOTE(review): getBytes() uses the platform default charset — presumably
// acceptable here since the generated data is ASCII-only.
bc.upload(inputStream, data.length, true)
}

@Requires({ liveMode() }) // TODO (gapra-msft): Remove this test — it exists only to reproduce the issue
// Runs a blob query end-to-end against a live account with verbose HTTP and
// SLF4J logging enabled, and checks that "SELECT * from BlobStorage" returns
// exactly the bytes of the uploaded CSV blob.
def "Query async"() {
setup:
def oldBc = bc
// Enable verbose logging for both azure-core and the slf4j-simple backend.
System.setProperty("AZURE_LOG_LEVEL", "INFO")
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel" , "DEBUG");
// Rebuild the client with HTTP header logging so x-ms-request-id is captured
// in the logs for diagnosing the repro.
bc = getServiceClientBuilder(primaryCredential, primaryBlobServiceClient.getAccountUrl())
.httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.HEADERS).addAllowedHeaderName("x-ms-request-id"))
.buildClient().getBlobContainerClient(bc.getContainerName())
.getBlobClient(bc.getBlobName())
// Plain CSV: newline records, comma columns, no quoting/escaping, no header row.
BlobQueryDelimitedSerialization ser = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n' as char)
.setColumnSeparator(',' as char)
.setEscapeChar('\0' as char)
.setFieldQuote('\0' as char)
.setHeadersPresent(false)
uploadCsv(ser, 1)
def expression = "SELECT * from BlobStorage"

// Download the raw blob contents to compare against the query output.
ByteArrayOutputStream downloadData = new ByteArrayOutputStream()
bc.download(downloadData)
byte[] downloadedData = downloadData.toByteArray()

/* Query into an OutputStream; SELECT * should echo the entire blob. */
when:
OutputStream os = new ByteArrayOutputStream()
bc.query(os, expression)
byte[] osData = os.toByteArray()

then:
notThrown(BlobStorageException)
osData == downloadedData

cleanup:
// Restore the original client and logging configuration for later tests.
bc = oldBc
System.clearProperty("AZURE_LOG_LEVEL")
System.clearProperty("org.slf4j.simpleLogger.defaultLogLevel")
}

def "Upload input stream overwrite fails"() {
when:
bc.upload(defaultInputStream.get(), defaultDataSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

package com.azure.storage.internal.avro.implementation;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down