From facdce5c2d85f6194ffdab2edda9e7bc94740482 Mon Sep 17 00:00:00 2001 From: Daniel Dubovski Date: Thu, 13 Dec 2018 16:43:16 +0200 Subject: [PATCH 1/2] Feature/client request properties (#57) * add `ClientRequestProperties` --- data/pom.xml | 11 +++- .../microsoft/azure/kusto/data/Client.java | 1 + .../azure/kusto/data/ClientFactory.java | 1 - .../azure/kusto/data/ClientImpl.java | 44 ++++++++++---- .../kusto/data/ClientRequestProperties.java | 57 +++++++++++++++++++ .../kusto/data/ConnectionStringBuilder.java | 2 +- .../microsoft/azure/kusto/data/Results.java | 31 ++++++---- .../com/microsoft/azure/kusto/data/Utils.java | 27 +++++++-- .../data/ClientRequestPropertiesTest.java | 34 +++++++++++ ingest/pom.xml | 12 ++++ samples/src/main/java/Query.java | 9 +++ 11 files changed, 199 insertions(+), 30 deletions(-) create mode 100644 data/src/main/java/com/microsoft/azure/kusto/data/ClientRequestProperties.java create mode 100644 data/src/test/java/com/microsoft/azure/kusto/data/ClientRequestPropertiesTest.java diff --git a/data/pom.xml b/data/pom.xml index 50eee99c..da89e762 100644 --- a/data/pom.xml +++ b/data/pom.xml @@ -27,7 +27,9 @@ - com.microsoft.azure.kusto.ingest.Client + com.microsoft.azure.kusto.data.Client + true + true @@ -98,7 +100,12 @@ ${mockito.version} test - + + org.skyscreamer + jsonassert + 1.5.0 + test + diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/Client.java b/data/src/main/java/com/microsoft/azure/kusto/data/Client.java index 3aed30c8..bf948d4f 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/Client.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/Client.java @@ -9,4 +9,5 @@ public interface Client { Results execute(String database, String command) throws DataServiceException, DataClientException; + Results execute(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException; } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientFactory.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientFactory.java index 27dc0d55..9ce0741e 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ClientFactory.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientFactory.java @@ -7,5 +7,4 @@ public class ClientFactory { public static Client createClient(ConnectionStringBuilder csb) throws URISyntaxException { return new ClientImpl(csb); } - } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java index 95e231e8..0ae038f9 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java @@ -7,12 +7,15 @@ import org.json.JSONObject; import java.net.URISyntaxException; +import java.util.concurrent.TimeUnit; public class ClientImpl implements Client { private static final String ADMIN_COMMANDS_PREFIX = "."; private static final String API_VERSION = "v1"; private static final String DEFAULT_DATABASE_NAME = "NetDefaultDb"; + private static final Long COMMAND_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10) + TimeUnit.SECONDS.toMillis(30); + private static final Long QUERY_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(4) + TimeUnit.SECONDS.toMillis(30); private AadAuthenticationHelper aadAuthenticationHelper; private String clusterUrl; @@ -27,31 +30,50 @@ public Results execute(String command) throws DataServiceException, DataClientEx } public Results execute(String database, String command) throws DataServiceException, DataClientException { + return execute(database, command, null); + } + + public Results execute(String database, String command, ClientRequestProperties properties) throws DataServiceException, DataClientException { + // Argument validation: + if (StringUtils.isAnyEmpty(database, command)) { + throw new IllegalArgumentException("database or command are empty"); + } + + Long timeoutMs = null; + + if (properties != null) { + timeoutMs = properties.getTimeoutInMilliSec(); + } + String clusterEndpoint; if (command.startsWith(ADMIN_COMMANDS_PREFIX)) { clusterEndpoint = String.format("%s/%s/rest/mgmt", clusterUrl, API_VERSION); + if (timeoutMs == null) { + timeoutMs = COMMAND_TIMEOUT_IN_MILLISECS; + } } else { clusterEndpoint = String.format("%s/%s/rest/query", clusterUrl, API_VERSION); - } - return execute(database, command, clusterEndpoint); - } - - private Results execute(String database, String command, String clusterEndpoint) throws DataServiceException, DataClientException { - // Argument validation: - if (StringUtils.isAnyEmpty(database, command, clusterEndpoint)) { - throw new IllegalArgumentException("database, command or clusterEndpoint are empty"); + if (timeoutMs == null) { + timeoutMs = QUERY_TIMEOUT_IN_MILLISECS; + } } String aadAccessToken = aadAuthenticationHelper.acquireAccessToken(); String jsonString; try { - jsonString = new JSONObject() + JSONObject json = new JSONObject() .put("db", database) - .put("csl", command).toString(); + .put("csl", command); + + if (properties != null) { + json.put("properties", properties.toString()); + } + + jsonString = json.toString(); } catch (JSONException e) { throw new DataClientException(clusterEndpoint, String.format(clusterEndpoint, "Error in executing command: %s, in database: %s", command, database), e); } - return Utils.post(clusterEndpoint, aadAccessToken, jsonString); + return Utils.post(clusterEndpoint, aadAccessToken, jsonString, timeoutMs.intValue()); } } \ No newline at end of file diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ClientRequestProperties.java b/data/src/main/java/com/microsoft/azure/kusto/data/ClientRequestProperties.java new file mode 100644 index 00000000..9b8958d5 --- /dev/null +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ClientRequestProperties.java @@ -0,0 +1,57 @@ +package com.microsoft.azure.kusto.data; + +import org.json.JSONObject; + +import java.util.HashMap; + +/* + * Kusto supports attaching various properties to client requests (such as queries and control commands). + * Such properties may be used to provide additional information to Kusto (for example, for the purpose of correlating client/service interaction), + * may affect what limits and policies get applied to the request, and much more. + * For a complete list of available client request properties + * check out https://docs.microsoft.com/en-us/azure/kusto/api/netfx/request-properties#list-of-clientrequestproperties + */ +public class ClientRequestProperties { + private static final String OPTIONS_KEY = "Options"; + private static final String OPTION_SERVER_TIMEOUT = "servertimeout"; + private HashMap properties; + private HashMap options; + + public ClientRequestProperties() { + properties = new HashMap<>(); + options = new HashMap<>(); + properties.put(OPTIONS_KEY, options); + } + + public void setOption(String name, Object value) { + options.put(name, value); + } + + public Object getOption(String name) { + return options.get(name); + } + + public void removeOption(String name) { + options.remove(name); + } + + public void clearOptions() { + options.clear(); + } + + public Long getTimeoutInMilliSec() { + return (Long) getOption(OPTION_SERVER_TIMEOUT); + } + + public void setTimeoutInMilliSec(Long timeoutInMs) { + options.put(OPTION_SERVER_TIMEOUT, timeoutInMs); + } + + JSONObject toJson() { + return new JSONObject(properties); + } + + public String toString() { + return toJson().toString(); + } +} diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/ConnectionStringBuilder.java b/data/src/main/java/com/microsoft/azure/kusto/data/ConnectionStringBuilder.java index 5d73a31f..13f87b25 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/ConnectionStringBuilder.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/ConnectionStringBuilder.java @@ -36,7 +36,7 @@ private ConnectionStringBuilder(String resourceUri) privateKey = null; } - private static ConnectionStringBuilder createWithAadUserCredentials(String resourceUri, + public static ConnectionStringBuilder createWithAadUserCredentials(String resourceUri, String username, String password, String authorityId) diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/Results.java b/data/src/main/java/com/microsoft/azure/kusto/data/Results.java index 5bb19089..c995a19c 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/Results.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/Results.java @@ -8,21 +8,30 @@ public class Results { private HashMap columnNameToType; private ArrayList> values; - public HashMap getColumnNameToIndex() { return columnNameToIndex; } + public Results(HashMap columnNameToIndex, HashMap columnNameToType, + ArrayList> values) { + this.columnNameToIndex = columnNameToIndex; + this.columnNameToType = columnNameToType; + this.values = values; + } - public HashMap getColumnNameToType() { return columnNameToType; } + public HashMap getColumnNameToIndex() { + return columnNameToIndex; + } - public Integer getIndexByColumnName(String columnName) { return columnNameToIndex.get(columnName); } + public HashMap getColumnNameToType() { + return columnNameToType; + } - public String getTypeByColumnName(String columnName) { return columnNameToType.get(columnName); } + public Integer getIndexByColumnName(String columnName) { + return columnNameToIndex.get(columnName); + } - public ArrayList> getValues() { return values; } + public String getTypeByColumnName(String columnName) { + return columnNameToType.get(columnName); + } - public Results(HashMap columnNameToIndex, HashMap columnNameToType, - ArrayList> values) - { - this.columnNameToIndex = columnNameToIndex; - this.columnNameToType = columnNameToType; - this.values = values; + public ArrayList> getValues() { + return values; } } diff --git a/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java b/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java index e0cf00fa..2cd476fe 100644 --- a/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java +++ b/data/src/main/java/com/microsoft/azure/kusto/data/Utils.java @@ -3,13 +3,16 @@ import com.microsoft.azure.kusto.data.exceptions.DataClientException; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; import com.microsoft.azure.kusto.data.exceptions.DataWebException; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.StatusLine; import org.apache.http.client.HttpClient; +import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import org.json.JSONArray; @@ -22,8 +25,16 @@ class Utils { - static Results post(String url, String aadAccessToken, String payload) throws DataServiceException, DataClientException { - HttpClient httpClient = HttpClients.createSystem(); + static Results post(String url, String aadAccessToken, String payload, Integer timeoutMs) throws DataServiceException, DataClientException { + + HttpClient httpClient; + if (timeoutMs != null) { + RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(timeoutMs).build(); + httpClient = HttpClientBuilder.create().useSystemProperties().setDefaultRequestConfig(requestConfig).build(); + } else { + httpClient = HttpClients.createSystem(); + } + HttpPost httpPost = new HttpPost(url); // Request parameters and other properties. @@ -38,7 +49,15 @@ static Results post(String url, String aadAccessToken, String payload) throws Da httpPost.addHeader("Content-Type", "application/json"); httpPost.addHeader("Accept-Encoding", "gzip,deflate"); httpPost.addHeader("Fed", "True"); - httpPost.addHeader("x-ms-client-version", "Kusto.Java.Client"); + + String version = Utils.class.getPackage().getImplementationVersion(); + String clientVersion = "Kusto.Java.Client"; + if (StringUtils.isNotBlank(version)) { + clientVersion += ":" + version; + } + + httpPost.addHeader("x-ms-client-version", clientVersion); + httpPost.addHeader("x-ms-client-request-id", String.format("KJC.execute;%s", java.util.UUID.randomUUID())); try { //Execute and get the response. @@ -71,7 +90,7 @@ static Results post(String url, String aadAccessToken, String payload) throws Da for (int i = 0; i < resultsRows.length(); i++) { JSONArray row = resultsRows.getJSONArray(i); ArrayList rowVector = new ArrayList<>(); - for(int j = 0; j < row.length(); ++j) { + for (int j = 0; j < row.length(); ++j) { Object obj = row.get(j); if (obj == JSONObject.NULL) { rowVector.add(null); diff --git a/data/src/test/java/com/microsoft/azure/kusto/data/ClientRequestPropertiesTest.java b/data/src/test/java/com/microsoft/azure/kusto/data/ClientRequestPropertiesTest.java new file mode 100644 index 00000000..8d024749 --- /dev/null +++ b/data/src/test/java/com/microsoft/azure/kusto/data/ClientRequestPropertiesTest.java @@ -0,0 +1,34 @@ +package com.microsoft.azure.kusto.data; + +import org.json.JSONException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.skyscreamer.jsonassert.JSONAssert; + +import java.util.concurrent.TimeUnit; + +public class ClientRequestPropertiesTest { + @Test + @DisplayName("test set/get timeout") + void timeoutSetGet() { + ClientRequestProperties props = new ClientRequestProperties(); + Long expected = TimeUnit.MINUTES.toMillis(100); + + // before setting value should be null + Assertions.assertEquals(null, props.getTimeoutInMilliSec()); + + props.setTimeoutInMilliSec(expected); + Assertions.assertEquals(expected, props.getTimeoutInMilliSec()); + } + + @Test + @DisplayName("test ClientRequestProperties toString") + void propertiesToString() throws JSONException { + ClientRequestProperties props = new ClientRequestProperties(); + props.setOption("a", 1); + props.setOption("b", "hello"); + + JSONAssert.assertEquals("{\"Options\": {\"a\":1, \"b\":\"hello\"}}", props.toString(), false); + } +} diff --git a/ingest/pom.xml b/ingest/pom.xml index 48f63510..cc12a00e 100644 --- a/ingest/pom.xml +++ b/ingest/pom.xml @@ -44,6 +44,18 @@ + + org.apache.maven.plugins + maven-jar-plugin + + + + true + true + + + + maven-surefire-plugin ${maven-surefire-plugin.version} diff --git a/samples/src/main/java/Query.java b/samples/src/main/java/Query.java index 78471e80..55816fb5 100644 --- a/samples/src/main/java/Query.java +++ b/samples/src/main/java/Query.java @@ -1,7 +1,10 @@ import com.microsoft.azure.kusto.data.ClientImpl; +import com.microsoft.azure.kusto.data.ClientRequestProperties; import com.microsoft.azure.kusto.data.ConnectionStringBuilder; import com.microsoft.azure.kusto.data.Results; +import java.util.concurrent.TimeUnit; + public class Query { public static void main(String[] args) { @@ -17,6 +20,12 @@ public static void main(String[] args) { Results results = client.execute( System.getProperty("dbName"), System.getProperty("query")); System.out.println(String.format("Kusto sent back %s rows.", results.getValues().size())); + + // in case we want to pass client request properties + ClientRequestProperties clientRequestProperties = new ClientRequestProperties(); + clientRequestProperties.setTimeoutInMilliSec(TimeUnit.MINUTES.toMillis(1)); + + results = client.execute( System.getProperty("dbName"), System.getProperty("query"), clientRequestProperties); } catch (Exception e) { e.printStackTrace(); } From c2ffedf63754545dbae83a62f24bfe2a810ee876 Mon Sep 17 00:00:00 2001 From: Tamir Kamara <26870601+tamirkamara@users.noreply.github.com> Date: Tue, 18 Dec 2018 18:58:51 +0200 Subject: [PATCH 2/2] Ingest from a ResultSet implementation (#58) * ingest a resultset * update versions of dependencies --- README.md | 2 +- data/pom.xml | 2 +- .../test/resources/simplelogger.properties | 7 + .../kusto/ingest/AzureStorageHelper.java | 33 +-- .../azure/kusto/ingest/IngestClient.java | 56 +++++- .../azure/kusto/ingest/IngestClientImpl.java | 148 +++++++++++++- .../ingest/source/ResultSetSourceInfo.java | 46 ++++- .../azure/kusto/ingest/source/SourceInfo.java | 5 + .../kusto/ingest/IngestClientImplTest.java | 190 +++++++++++++++++- .../source/ResultSetSourceInfoTest.java | 48 +++++ .../test/resources/simplelogger.properties | 7 + pom.xml | 4 +- 12 files changed, 494 insertions(+), 54 deletions(-) create mode 100644 data/src/test/resources/simplelogger.properties create mode 100644 ingest/src/test/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfoTest.java create mode 100644 ingest/src/test/resources/simplelogger.properties diff --git a/README.md b/README.md index 62031baf..85bf1142 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Microsoft Azure Kusto (Azure Data Explorer) SDK for Java -master: [![Build Status](https://travis-ci.org/Azure/azure-kusto-java.svg?branch=master)](https://travis-ci.org/Azure/azure-kusto-java) +master: [![Build Status](https://travis-ci.org/Azure/azure-kusto-java.svg)](https://travis-ci.org/Azure/azure-kusto-java) dev: [![Build Status](https://travis-ci.org/Azure/azure-kusto-java.svg?branch=dev)](https://travis-ci.org/Azure/azure-kusto-java) This is the Microsoft Azure Kusto client library which allows communication with Kusto to bring data in (ingest) and query information already stored in the database. diff --git a/data/pom.xml b/data/pom.xml index da89e762..58eebadf 100644 --- a/data/pom.xml +++ b/data/pom.xml @@ -55,7 +55,7 @@ com.microsoft.azure adal4j - 1.6.0 + 1.6.3 diff --git a/data/src/test/resources/simplelogger.properties b/data/src/test/resources/simplelogger.properties new file mode 100644 index 00000000..9a311880 --- /dev/null +++ b/data/src/test/resources/simplelogger.properties @@ -0,0 +1,7 @@ +# SLF4J's SimpleLogger configuration file +# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err. + +# Default logging detail level for all instances of SimpleLogger. +# Must be one of ("trace", "debug", "info", "warn", or "error"). +# If not specified, defaults to "info". +org.slf4j.simpleLogger.log.com.microsoft.azure.kusto=debug \ No newline at end of file diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java index 56a7102f..b2511b70 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/AzureStorageHelper.java @@ -29,13 +29,13 @@ class AzureStorageHelper { private static final int GZIP_BUFFER_SIZE = 16384; private static final int STREAM_BUFFER_SIZE = 16384; - public void postMessageToQueue(String queuePath, String content) throws StorageException, URISyntaxException { + void postMessageToQueue(String queuePath, String content) throws StorageException, URISyntaxException { CloudQueue queue = new CloudQueue(new URI(queuePath)); CloudQueueMessage queueMessage = new CloudQueueMessage(content); queue.addMessage(queueMessage); } - public void azureTableInsertEntity(String tableUri, TableServiceEntity entity) throws StorageException, URISyntaxException { + void azureTableInsertEntity(String tableUri, TableServiceEntity entity) throws StorageException, URISyntaxException { CloudTable table = new CloudTable(new URI(tableUri)); // Create an operation to add the new customer to the table basics table. TableOperation insert = TableOperation.insert(entity); @@ -43,7 +43,7 @@ public void azureTableInsertEntity(String tableUri, TableServiceEntity entity) t table.execute(insert); } - public CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws URISyntaxException, StorageException, IOException { + CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, String storageUri) throws URISyntaxException, StorageException, IOException { log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", filePath, blobName, storageUri); // Check if the file is already compressed: @@ -52,9 +52,9 @@ public CloudBlockBlob uploadLocalFileToBlob(String filePath, String blobName, St CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri)); File sourceFile = new File(filePath); - CloudBlockBlob blob = container.getBlockBlobReference(blobName + (isCompressed?"":".gz")); + CloudBlockBlob blob = container.getBlockBlobReference(blobName + (isCompressed ? "" : ".gz")); - if(!isCompressed){ + if (!isCompressed) { compressAndUploadFile(filePath, blob); } else { blob.uploadFromFile(sourceFile.getAbsolutePath()); @@ -68,29 +68,29 @@ private void compressAndUploadFile(String filePath, CloudBlockBlob blob) throws BlobOutputStream bos = blob.openOutputStream(); GZIPOutputStream gzout = new GZIPOutputStream(bos); - streamFile(fin, gzout, GZIP_BUFFER_SIZE); + copyStream(fin, gzout, GZIP_BUFFER_SIZE); gzout.close(); fin.close(); } - public CloudBlockBlob uploadStreamToBlob(InputStream inputStream, String blobName, String storageUri, boolean compress) throws IOException, URISyntaxException, StorageException { - log.debug("uploadLocalFileToBlob: blobName: {}, storageUri: {}", blobName, storageUri); + CloudBlockBlob uploadStreamToBlob(InputStream inputStream, String blobName, String storageUri, boolean compress) throws IOException, URISyntaxException, StorageException { + log.debug("uploadStreamToBlob - blobName: {}, storageUri: {}", blobName, storageUri); CloudBlobContainer container = new CloudBlobContainer(new URI(storageUri)); - CloudBlockBlob blob = container.getBlockBlobReference(blobName+ (compress?".gz":"")); + CloudBlockBlob blob = container.getBlockBlobReference(blobName + (compress ? ".gz" : "")); BlobOutputStream bos = blob.openOutputStream(); - if(compress){ + if (compress) { GZIPOutputStream gzout = new GZIPOutputStream(bos); - streamFile(inputStream,gzout,GZIP_BUFFER_SIZE); + copyStream(inputStream, gzout, GZIP_BUFFER_SIZE); gzout.close(); - }else{ - streamFile(inputStream,bos,STREAM_BUFFER_SIZE); + } else { + copyStream(inputStream, bos, STREAM_BUFFER_SIZE); bos.close(); } return blob; } - private void streamFile(InputStream inputStream, OutputStream outputStream, int bufferSize) throws IOException { + private void copyStream(InputStream inputStream, OutputStream outputStream, int bufferSize) throws IOException { byte[] buffer = new byte[bufferSize]; int length; while ((length = inputStream.read(buffer)) > 0) { @@ -98,8 +98,9 @@ private void streamFile(InputStream inputStream, OutputStream outputStream, int } } - public String getBlobPathWithSas(CloudBlockBlob blob) { - StorageCredentialsSharedAccessSignature signature = (StorageCredentialsSharedAccessSignature)blob.getServiceClient().getCredentials(); + String getBlobPathWithSas(CloudBlockBlob blob) { + StorageCredentialsSharedAccessSignature signature = (StorageCredentialsSharedAccessSignature) blob.getServiceClient().getCredentials(); return blob.getStorageUri().getPrimaryUri().toString() + "?" + signature.getToken(); } + } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java index 878cce8b..dddb4e15 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClient.java @@ -10,12 +10,60 @@ public interface IngestClient { - IngestionResult ingestFromFile (FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; + /** + * Ingests a local file into the service + * + * @param fileSourceInfo The specific SourceInfo to be ingested + * @param ingestionProperties Settings used to customize the ingestion operation + * @return A result that could be used to query for status + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service + */ + IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; - IngestionResult ingestFromBlob (BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; + /** + * Ingests a blob (a file stored as an Azure Storage Blob) into the service + * + * @param blobSourceInfo The specific SourceInfo to be ingested + * @param ingestionProperties Settings used to customize the ingestion operation + * @return A result that could be used to query for status + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service + */ + IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; - IngestionResult ingestFromResultSet (ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; + /** + * Ingests a ResultSet into the service + * + * @param resultSetSourceInfo The specific SourceInfo to be ingested + * @param ingestionProperties Settings used to customize the ingestion operation + * @return A result that could be used to query for status + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service + */ + IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; - IngestionResult ingestFromStream (StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; + /** + * Ingests a ResultSet into the service + * + * @param resultSetSourceInfo The specific SourceInfo to be ingested + * @param ingestionProperties Settings used to customize the ingestion operation + * @param tempStoragePath A local folder path that will be used as a temporary storage, data will be deleted on successful ingestion + * @return A result that could be used to query for status + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service + */ + IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties, String tempStoragePath) throws IngestionClientException, IngestionServiceException; + + /** + * Ingests a InputStream into the service + * + * @param streamSourceInfo The specific SourceInfo to be ingested + * @param ingestionProperties Settings used to customize the ingestion operation + * @return A result that could be used to query for status + * @throws IngestionClientException An exception originating from a client activity + * @throws IngestionServiceException An exception returned from the service + */ + IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException; } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java index 8b50c5e9..9a53af53 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/IngestClientImpl.java @@ -12,20 +12,26 @@ import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.CloudBlockBlob; +import org.apache.commons.lang3.StringUtils; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; +import java.io.*; import java.lang.invoke.MethodHandles; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.sql.Date; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; import java.time.Instant; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.UUID; +import java.util.zip.GZIPOutputStream; class IngestClientImpl implements IngestClient { @@ -38,16 +44,28 @@ class IngestClientImpl implements IngestClient { IngestClientImpl(ConnectionStringBuilder csb) throws URISyntaxException { log.info("Creating a new IngestClient"); Client client = ClientFactory.createClient(csb); - resourceManager = new ResourceManager(client); + this.resourceManager = new ResourceManager(client); + this.azureStorageHelper = new AzureStorageHelper(); + } + + IngestClientImpl(ResourceManager resourceManager) { + log.info("Creating a new IngestClient"); + this.resourceManager = resourceManager; azureStorageHelper = new AzureStorageHelper(); } + IngestClientImpl(ResourceManager resourceManager, AzureStorageHelper azureStorageHelper) { + log.info("Creating a new IngestClient"); + this.resourceManager = resourceManager; + this.azureStorageHelper = azureStorageHelper; + } + @Override public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException { // Argument validation: - if (blobSourceInfo == null || ingestionProperties == null){ + if (blobSourceInfo == null || ingestionProperties == null) { throw new IllegalArgumentException("blobSourceInfo or ingestionProperties is null"); } blobSourceInfo.validate(); @@ -107,7 +125,7 @@ public IngestionResult ingestFromBlob(BlobSourceInfo blobSourceInfo, IngestionPr @Override public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException { // Argument validation: - if (fileSourceInfo == null || ingestionProperties == null){ + if (fileSourceInfo == null || ingestionProperties == null) { throw new IllegalArgumentException("fileSourceInfo or ingestionProperties is null"); } fileSourceInfo.validate(); @@ -135,7 +153,7 @@ public IngestionResult ingestFromFile(FileSourceInfo fileSourceInfo, IngestionPr @Override public IngestionResult ingestFromStream(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException { // Argument validation: - if (streamSourceInfo == null || ingestionProperties == null){ + if (streamSourceInfo == null || ingestionProperties == null) { throw new IllegalArgumentException("streamSourceInfo or ingestionProperties is null"); } streamSourceInfo.validate(); @@ -204,8 +222,120 @@ private String genBlobName(String fileName, String databaseName, String tableNam return String.format("%s__%s__%s__%s", databaseName, tableName, UUID.randomUUID().toString(), fileName); } - @Override - public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) { - throw new UnsupportedOperationException(); + public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties) throws IngestionClientException, IngestionServiceException { + return ingestFromResultSet(resultSetSourceInfo, ingestionProperties, ""); } + + public IngestionResult ingestFromResultSet(ResultSetSourceInfo resultSetSourceInfo, IngestionProperties ingestionProperties, String tempStoragePath) throws IngestionClientException, IngestionServiceException { + try { + Objects.requireNonNull(resultSetSourceInfo, "resultSetSourceInfo cannot be null"); + resultSetSourceInfo.validate(); + + File tempFile; + + if (StringUtils.isBlank(tempStoragePath)) { + tempFile = File.createTempFile("kusto-resultset", ".csv.gz"); + } else { + log.debug("Temp file will be created in a user specified folder: {}", tempStoragePath); + tempFile = File.createTempFile("kusto-resultset", ".csv.gz", new File(tempStoragePath)); + } + + FileOutputStream fos = new FileOutputStream(tempFile, false); + GZIPOutputStream gzipos = new GZIPOutputStream(fos); + Writer writer = new OutputStreamWriter(new BufferedOutputStream(gzipos), StandardCharsets.UTF_8); + log.debug("Writing resultset to temp csv file: {}", tempFile.getAbsolutePath()); + + long numberOfChars = resultSetToCsv(resultSetSourceInfo.getResultSet(), writer, false); + + // utf8 chars are 2 bytes each + FileSourceInfo fileSourceInfo = new FileSourceInfo(tempFile.getAbsolutePath(), numberOfChars * 2); + IngestionResult ingestionResult = ingestFromFile(fileSourceInfo, ingestionProperties); + + //noinspection ResultOfMethodCallIgnored + tempFile.delete(); + + return ingestionResult; + } catch (IngestionClientException | IngestionServiceException ex) { + log.error("Unexpected error when ingesting a result set.", ex); + throw ex; + } catch (IOException ex) { + String msg = "Failed to write or delete local file"; + log.error(msg, ex); + throw new IngestionClientException(msg); + } + } + + long resultSetToCsv(ResultSet resultSet, Writer writer, boolean includeHeaderAsFirstRow) throws IngestionClientException { + final String LINE_SEPARATOR = System.getProperty("line.separator"); + + try { + String columnSeparator = ""; + + ResultSetMetaData metaData = resultSet.getMetaData(); + int numberOfColumns = metaData.getColumnCount(); + + if (includeHeaderAsFirstRow) { + for (int column = 0; column < numberOfColumns; column++) { + writer.write(columnSeparator); + writer.write(metaData.getColumnLabel(column + 1)); + + columnSeparator = ","; + } + + writer.write(LINE_SEPARATOR); + } + + int numberOfRecords = 0; + long numberOfChars = 0; + + // Get all rows. + while (resultSet.next()) { + numberOfChars += writeResultSetRow(resultSet, writer, numberOfColumns); + writer.write(LINE_SEPARATOR); + // Increment row count + numberOfRecords++; + } + + log.debug("Number of chars written from column values: {}", numberOfChars); + + long totalNumberOfChars = numberOfChars + numberOfRecords * LINE_SEPARATOR.length(); + + log.debug("Wrote resultset to file. CharsCount: {}, ColumnCount: {}, RecordCount: {}" + , numberOfChars, numberOfColumns, numberOfRecords); + + return totalNumberOfChars; + } catch (Exception ex) { + String msg = "Unexpected error when writing result set to temporary file."; + log.error(msg, ex); + throw new IngestionClientException(msg); + } finally { + try { + writer.close(); + } catch (IOException e) { /* ignore */ + } + } + } + + private int writeResultSetRow(ResultSet resultSet, Writer writer, int numberOfColumns) throws IOException, SQLException { + int numberOfChars = 0; + String columnString; + String columnSeparator = ""; + + for (int i = 1; i <= numberOfColumns; i++) { + writer.write(columnSeparator); + writer.write('"'); + columnString = resultSet.getObject(i).toString().replace("\"", "\"\""); + writer.write(columnString); + writer.write('"'); + + columnSeparator = ","; + numberOfChars += columnString.length(); + } + + return numberOfChars + + numberOfColumns * 2 * columnSeparator.length() // 2 " per column + + numberOfColumns - 1 // last column doesn't have a separator + ; + } + } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfo.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfo.java index 0ddbfd35..9a09b64a 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfo.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfo.java @@ -1,27 +1,53 @@ package com.microsoft.azure.kusto.ingest.source; import java.sql.ResultSet; +import java.util.Objects; import java.util.UUID; +/** + * Represents the Resultset source information used for ingestion. + */ public class ResultSetSourceInfo extends AbstractSourceInfo { private ResultSet resultSet; - public ResultSet getResultSet() { - return resultSet; + /** + * Creates a ResultSetSourceInfo. + * + * @param resultSet The ResultSet with the data to be ingested. + */ + public ResultSetSourceInfo(ResultSet resultSet) { + setResultSet(resultSet); } - public void setResultSet(ResultSet resultSet) { - this.resultSet = resultSet; + /** + * Creates a ResultSetSourceInfo + * + * @param resultSet The ResultSet with the data to be ingested. + * @param sourceId An identifier that could later be used to trace this specific source data. + */ + public ResultSetSourceInfo(ResultSet resultSet, UUID sourceId) { + setResultSet(resultSet); + this.setSourceId(sourceId); } - public ResultSetSourceInfo(ResultSet resultSet) { - this.resultSet = resultSet; + /** + * Gets the ResultSet. + * + * @return The ResultSet in the SourceInfo + */ + public ResultSet getResultSet() { + return resultSet; } - public ResultSetSourceInfo(ResultSet resultSet, UUID sourceId) { - this.resultSet = resultSet; - this.setSourceId(sourceId); + /** + * Sets the ResultSet. + * + * @param resultSet The ResultSet with the data to be ingested. + */ + @SuppressWarnings("WeakerAccess") + public void setResultSet(ResultSet resultSet) { + this.resultSet = Objects.requireNonNull(resultSet, "ResultSet cannot be null"); } @Override @@ -30,6 +56,6 @@ public String toString() { } public void validate() { - // Not implemented yet + //nothing to validate as of now. } } diff --git a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/SourceInfo.java b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/SourceInfo.java index 367001c4..d19ca0ba 100644 --- a/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/SourceInfo.java +++ b/ingest/src/main/java/com/microsoft/azure/kusto/ingest/source/SourceInfo.java @@ -3,7 +3,12 @@ import java.util.UUID; public interface SourceInfo { + /** + * Checks that this SourceInfo is defined appropriately. + */ void validate(); + UUID getSourceId(); + void setSourceId(UUID sourceId); } diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java index 9969033a..bcf22198 100644 --- a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/IngestClientImplTest.java @@ -1,18 +1,26 @@ package com.microsoft.azure.kusto.ingest; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException; +import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; +import com.microsoft.azure.kusto.ingest.result.TableReportIngestionResult; import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; +import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo; import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo; import com.microsoft.azure.storage.blob.CloudBlockBlob; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; -import java.io.FileInputStream; -import java.io.InputStream; +import java.io.*; import java.net.URI; import java.nio.file.Paths; +import java.sql.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.*; @@ -21,7 +29,7 @@ class IngestClientImplTest { private ResourceManager resourceManagerMock = mock(ResourceManager.class); private IngestClientImpl ingestClientImplMock; private AzureStorageHelper azureStorageHelperMock; - private IngestionProperties props; + private IngestionProperties ingestionProperties; @BeforeEach void setUp() { @@ -43,8 +51,8 @@ void setUp() { when(resourceManagerMock.getIdentityToken()) .thenReturn("identityToken"); - props = new IngestionProperties("dbName", "tableName"); - props.setJsonMappingName("mappingName"); + ingestionProperties = new IngestionProperties("dbName", "tableName"); + ingestionProperties.setJsonMappingName("mappingName"); } catch (Exception e) { e.printStackTrace(); @@ -66,9 +74,9 @@ void ingestFromBlob() { BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, size); - ingestClientImplMock.ingestFromBlob(blobSourceInfo, props); + ingestClientImplMock.ingestFromBlob(blobSourceInfo, ingestionProperties); - verify(ingestClientImplMock).ingestFromBlob(blobSourceInfo, props); + verify(ingestClientImplMock).ingestFromBlob(blobSourceInfo, ingestionProperties); } catch (Exception e) { e.printStackTrace(); @@ -87,10 +95,10 @@ void ingestFromFile() { FileSourceInfo fileSourceInfo = new FileSourceInfo(testFilePath, 0); int numOfFiles = 3; for (int i = 0; i < numOfFiles; i++) { - ingestClientImplMock.ingestFromFile(fileSourceInfo, props); + ingestClientImplMock.ingestFromFile(fileSourceInfo, ingestionProperties); } - verify(ingestClientImplMock, times(numOfFiles)).ingestFromFile(fileSourceInfo, props); + verify(ingestClientImplMock, times(numOfFiles)).ingestFromFile(fileSourceInfo, ingestionProperties); } catch (Exception e) { e.printStackTrace(); @@ -107,12 +115,172 @@ void ingestFromStream() { int numOfFiles = 3; for (int i = 0; i < numOfFiles; i++) { InputStream stream = new FileInputStream(testFilePath); - StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream,false); - ingestClientImplMock.ingestFromStream(streamSourceInfo, props); + StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream, false); + ingestClientImplMock.ingestFromStream(streamSourceInfo, ingestionProperties); } verify(ingestClientImplMock, times(numOfFiles)).ingestFromStream(any(StreamSourceInfo.class), any(IngestionProperties.class)); } catch (Exception e) { e.printStackTrace(); } } + + @Test + void ingestFromResultSet_DefaultTempFolder_Success() throws IngestionClientException, IngestionServiceException { + IngestClientImpl ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + // we need a spy to intercept calls to internal methods so it wouldn't be called + IngestClientImpl ingestClientSpy = spy(ingestClient); + TableReportIngestionResult ingestionResultMock = mock(TableReportIngestionResult.class); + + doReturn(ingestionResultMock).when(ingestClientSpy).ingestFromFile(any(), any()); + long numberOfChars = 1000; + doReturn(numberOfChars).when(ingestClientSpy).resultSetToCsv(any(), any(), anyBoolean()); + + ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(mock(ResultSet.class)); + + ingestClientSpy.ingestFromResultSet(resultSetSourceInfo, ingestionProperties); + verify(ingestClientSpy).ingestFromResultSet(resultSetSourceInfo, ingestionProperties); + + // captor to allow us to inspect the internal call values + ArgumentCaptor fileSourceInfoCaptor = ArgumentCaptor.forClass(FileSourceInfo.class); + verify(ingestClientSpy).ingestFromFile(fileSourceInfoCaptor.capture(), any(IngestionProperties.class)); + FileSourceInfo fileSourceInfoActual = fileSourceInfoCaptor.getValue(); + assertEquals(numberOfChars * 2, fileSourceInfoActual.getRawSizeInBytes()); + } + + @Test + void ingestFromResultSet_SpecifyTempFolder_Success() throws IngestionClientException, IngestionServiceException { + IngestClientImpl ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + IngestClientImpl ingestClientSpy = spy(ingestClient); + TableReportIngestionResult ingestionResultMock = mock(TableReportIngestionResult.class); + + doReturn(ingestionResultMock).when(ingestClientSpy).ingestFromFile(any(), any()); + long numberOfChars = 1000; + doReturn(numberOfChars).when(ingestClientSpy).resultSetToCsv(any(), any(), anyBoolean()); + + ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(mock(ResultSet.class)); + + File tempPath = new File(Paths.get(System.getProperty("java.io.tmpdir"), String.valueOf(System.currentTimeMillis())).toString()); + //noinspection ResultOfMethodCallIgnored + tempPath.mkdirs(); + + String tempFolderPath = tempPath.toString(); + + ingestClientSpy.ingestFromResultSet(resultSetSourceInfo, ingestionProperties, tempFolderPath); + verify(ingestClientSpy).ingestFromResultSet(resultSetSourceInfo, ingestionProperties, tempFolderPath); + + // captor to allow us to inspect the internal call values + ArgumentCaptor fileSourceInfoCaptor = ArgumentCaptor.forClass(FileSourceInfo.class); + verify(ingestClientSpy).ingestFromFile(fileSourceInfoCaptor.capture(), any(IngestionProperties.class)); + FileSourceInfo fileSourceInfoActual = fileSourceInfoCaptor.getValue(); + assertEquals(numberOfChars * 2, fileSourceInfoActual.getRawSizeInBytes()); + // make sure the temp file was written to the folder we specified + Assertions.assertTrue(fileSourceInfoActual.getFilePath().startsWith(tempFolderPath)); + } + + @Test + void ingestFromResultSet_ResultSetToCsv_Error() throws IngestionClientException, IngestionServiceException { + IngestClientImpl ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + // we need a spy to intercept the call to ingestFromFile so it wouldn't be called + IngestClientImpl ingestClientSpy = spy(ingestClient); + TableReportIngestionResult ingestionResultMock = mock(TableReportIngestionResult.class); + + doReturn(ingestionResultMock).when(ingestClientSpy).ingestFromFile(any(), any()); + doThrow(new IngestionClientException("error in resultSetToCsv")) + .when(ingestClientSpy) + .resultSetToCsv(any(), any(), anyBoolean()); + + ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(mock(ResultSet.class)); + + assertThrows(IngestionClientException.class, + () -> ingestClientSpy.ingestFromResultSet(resultSetSourceInfo, ingestionProperties)); + } + + @Test + void ingestFromResultSet_FileIngest_IngestionClientException() throws IngestionClientException, IngestionServiceException, SQLException { + IngestClient ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + // we need a spy to intercept the call to ingestFromFile so it wouldn't be called + IngestClient ingestClientSpy = spy(ingestClient); + + IngestionClientException ingestionClientException = new IngestionClientException("Client exception in ingestFromFile"); + doThrow(ingestionClientException).when(ingestClientSpy).ingestFromFile(any(), any()); + + ResultSet resultSet = getSampleResultSet(); + ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); + + assertThrows(IngestionClientException.class, + () -> ingestClientSpy.ingestFromResultSet(resultSetSourceInfo, ingestionProperties)); + } + + @Test + void ingestFromResultSet_FileIngest_IngestionServiceException() throws IngestionClientException, IngestionServiceException, SQLException { + IngestClient ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + // we need a spy to intercept the call to ingestFromFile so it wouldn't be called + IngestClient ingestClientSpy = spy(ingestClient); + + IngestionServiceException ingestionServiceException = new IngestionServiceException("Service exception in ingestFromFile"); + doThrow(ingestionServiceException).when(ingestClientSpy).ingestFromFile(any(), any()); + + ResultSet resultSet = getSampleResultSet(); + ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(resultSet); + + assertThrows(IngestionServiceException.class, + () -> ingestClientSpy.ingestFromResultSet(resultSetSourceInfo, ingestionProperties)); + } + + @Test + void resultSetToCsv_Success() throws SQLException, IngestionClientException { + IngestClientImpl ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + ResultSet resultSet = getSampleResultSet(); + StringWriter stringWriter = new StringWriter(); + long numberOfCharsActual = ingestClient.resultSetToCsv(resultSet, stringWriter, false); + + final String expected = getSampleResultSetDump(); + + assertEquals(expected, stringWriter.toString()); // check the string values + assertEquals(expected.length(), numberOfCharsActual); // check the returned length + } + + @Test + void resultSetToCsv_ClosedResultSet_Exception() throws SQLException { + IngestClientImpl ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + ResultSet resultSet = getSampleResultSet(); + resultSet.close(); + StringWriter stringWriter = new StringWriter(); + + assertThrows(IngestionClientException.class, + () -> ingestClient.resultSetToCsv(resultSet, stringWriter, false)); + } + + @Test + void resultSetToCsv_Writer_Exception() throws SQLException, IOException { + IngestClientImpl ingestClient = new IngestClientImpl(resourceManagerMock, azureStorageHelperMock); + ResultSet resultSet = getSampleResultSet(); + + Writer writer = mock(Writer.class); + doThrow(new IOException("Some exception")).when(writer).write(anyString()); + + assertThrows(IngestionClientException.class, + () -> ingestClient.resultSetToCsv(resultSet, writer, false)); + } + + private ResultSet getSampleResultSet() throws SQLException { + // create a database connection + Connection connection = DriverManager.getConnection("jdbc:sqlite:"); + + Statement statement = connection.createStatement(); + statement.setQueryTimeout(5); // set timeout to 5 sec. + + statement.executeUpdate("drop table if exists person"); + statement.executeUpdate("create table person (id integer, name string)"); + statement.executeUpdate("insert into person values(1, 'leo')"); + statement.executeUpdate("insert into person values(2, 'yui')"); + + return statement.executeQuery("select * from person"); + } + + private String getSampleResultSetDump() { + return System.getProperty("line.separator").equals("\n") ? + "\"1\",\"leo\"\n\"2\",\"yui\"\n" : + "\"1\",\"leo\"\r\n\"2\",\"yui\"\r\n"; + } } diff --git a/ingest/src/test/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfoTest.java b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfoTest.java new file mode 100644 index 00000000..a066514f --- /dev/null +++ b/ingest/src/test/java/com/microsoft/azure/kusto/ingest/source/ResultSetSourceInfoTest.java @@ -0,0 +1,48 @@ +package com.microsoft.azure.kusto.ingest.source; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.sql.ResultSet; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ResultSetSourceInfoTest { + + @Test + @DisplayName("test set/get resultset") + void ResultSet_GetSet_Success() { + ResultSet mockResultSet = Mockito.mock(ResultSet.class); + ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(mockResultSet); + + //this also tests that the constructor worked as expected + Assertions.assertEquals(mockResultSet, resultSetSourceInfo.getResultSet()); + + //use the setter to replace the resultset + ResultSet mockResultSet1 = Mockito.mock(ResultSet.class); + resultSetSourceInfo.setResultSet(mockResultSet1); + Assertions.assertEquals(mockResultSet1, resultSetSourceInfo.getResultSet()); + } + + @Test + @DisplayName("test new object with null resultset") + void Constructor_NullResultSet_Exception() { + assertThrows(NullPointerException.class, () -> new ResultSetSourceInfo(null)); + } + + @Test + @DisplayName("test object toString") + void toString_Success() { + ResultSet mockResultSet = Mockito.mock(ResultSet.class); + UUID uuid = UUID.randomUUID(); + + ResultSetSourceInfo resultSetSourceInfo = new ResultSetSourceInfo(mockResultSet, uuid); + + Assertions.assertTrue(resultSetSourceInfo.toString().contains(uuid.toString())); + } + + +} \ No newline at end of file diff --git a/ingest/src/test/resources/simplelogger.properties b/ingest/src/test/resources/simplelogger.properties new file mode 100644 index 00000000..9a311880 --- /dev/null +++ b/ingest/src/test/resources/simplelogger.properties @@ -0,0 +1,7 @@ +# SLF4J's SimpleLogger configuration file +# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err. + +# Default logging detail level for all instances of SimpleLogger. +# Must be one of ("trace", "debug", "info", "warn", or "error"). +# If not specified, defaults to "info". +org.slf4j.simpleLogger.log.com.microsoft.azure.kusto=debug \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0f02797b..4ca92c6f 100644 --- a/pom.xml +++ b/pom.xml @@ -33,9 +33,9 @@ 1.8 1.8.0-beta2 3.8.0 - 2.22.0 + 2.22.1 5.2.0 - 2.21.0 + 2.23.4