Skip to content

Commit

Permalink
Merge pull request #73 from Azure/dev
Browse files Browse the repository at this point in the history
Version 1.0.0-BETA-03
  • Loading branch information
tamirkamara authored Feb 26, 2019
2 parents 1217310 + 91049c4 commit 0c6b678
Show file tree
Hide file tree
Showing 32 changed files with 1,297 additions and 618 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Microsoft Azure Kusto (Azure Data Explorer) SDK for Java

master: [![Build Status](https://travis-ci.org/Azure/azure-kusto-java.svg)](https://travis-ci.org/Azure/azure-kusto-java)
dev: [![Build Status](https://travis-ci.org/Azure/azure-kusto-java.svg?branch=dev)](https://travis-ci.org/Azure/azure-kusto-java)
master: [![Build status](https://msazure.visualstudio.com/One/_apis/build/status/Custom/Kusto/azure-kusto-java-sdk%20ci?branchName=master)](https://msazure.visualstudio.com/One/_build/latest?definitionId=57651)
dev: [![Build status](https://msazure.visualstudio.com/One/_apis/build/status/Custom/Kusto/azure-kusto-java-sdk%20ci?branchName=dev)](https://msazure.visualstudio.com/One/_build/latest?definitionId=57651)

This is the Microsoft Azure Kusto client library which allows communication with Kusto to bring data in (ingest) and query information already stored in the database.
This library contains 3 different modules:
Expand Down
30 changes: 29 additions & 1 deletion data/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>kusto-client</artifactId>
<groupId>com.microsoft.azure.kusto</groupId>
<version>1.0.0-BETA-01</version>
<version>1.0.0-BETA-03</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -46,6 +46,34 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>${maven-source-plugin.version}</version>
<executions>
<execution>
<id>attach-sources</id>
<phase>verify</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
<executions>
<execution>
<id>attach-javadoc</id>
<phase>verify</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@

import javax.naming.ServiceUnavailableException;
import java.awt.*;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AadAuthenticationHelper {
class AadAuthenticationHelper {

private final static String DEFAULT_AAD_TENANT = "common";
private final static String CLIENT_ID = "db662dc1-0cfe-4e1c-a843-19a68e65be58";
final static long MIN_ACCESS_TOKEN_VALIDITY_IN_MILLISECS = 60000;

private ClientCredential clientCredential;
private String userUsername;
Expand All @@ -29,15 +32,21 @@ public class AadAuthenticationHelper {
private X509Certificate x509Certificate;
private PrivateKey privateKey;
private AuthenticationType authenticationType;
private String accessToken;
private AuthenticationResult lastAuthenticationResult;
private Lock lastAuthenticationResultLock = new ReentrantLock();
private String applicationClientId;

private enum AuthenticationType {
AAD_USERNAME_PASSWORD,
AAD_APPLICATION_KEY,
AAD_DEVICE_LOGIN,
AAD_APPLICATION_CERTIFICATE
AAD_APPLICATION_CERTIFICATE,
AAD_ACCESS_TOKEN
}

public AadAuthenticationHelper(@NotNull ConnectionStringBuilder csb) throws URISyntaxException {
AadAuthenticationHelper(@NotNull ConnectionStringBuilder csb) throws URISyntaxException {

URI clusterUri = new URI(csb.getClusterUrl());
clusterUrl = String.format("%s://%s", clusterUri.getScheme(), clusterUri.getHost());
if (StringUtils.isNotEmpty(csb.getApplicationClientId()) && StringUtils.isNotEmpty(csb.getApplicationKey())) {
Expand All @@ -50,8 +59,11 @@ public AadAuthenticationHelper(@NotNull ConnectionStringBuilder csb) throws URIS
} else if (csb.getX509Certificate() != null && csb.getPrivateKey() != null) {
x509Certificate = csb.getX509Certificate();
privateKey = csb.getPrivateKey();
clientCredential = new ClientCredential(csb.getApplicationClientId(), null);
applicationClientId = csb.getApplicationClientId();
authenticationType = AuthenticationType.AAD_APPLICATION_CERTIFICATE;
} else if (StringUtils.isNotBlank(csb.getAccessToken())) {
authenticationType = AuthenticationType.AAD_ACCESS_TOKEN;
accessToken = csb.getAccessToken();
} else {
authenticationType = AuthenticationType.AAD_DEVICE_LOGIN;
}
Expand All @@ -61,24 +73,26 @@ public AadAuthenticationHelper(@NotNull ConnectionStringBuilder csb) throws URIS
aadAuthorityUri = String.format("https://login.microsoftonline.com/%s", aadAuthorityId);
}

String acquireAccessToken() throws DataServiceException {
try {
switch (authenticationType) {
case AAD_APPLICATION_KEY:
return acquireAadApplicationAccessToken().getAccessToken();
case AAD_USERNAME_PASSWORD:
return acquireAadUserAccessToken().getAccessToken();
case AAD_DEVICE_LOGIN:
return acquireAccessTokenUsingDeviceCodeFlow().getAccessToken();
case AAD_APPLICATION_CERTIFICATE:
return acquireWithClientCertificate().getAccessToken();
default:
throw new DataServiceException("Authentication type: " + authenticationType.name() + " is invalid");
String acquireAccessToken() throws DataServiceException {
if (authenticationType == AuthenticationType.AAD_ACCESS_TOKEN) {
return accessToken;
}

if (lastAuthenticationResult == null) {
acquireToken();
} else if (isTokenExpired()) {
if (lastAuthenticationResult.getRefreshToken() == null) {
acquireToken();
} else {
lastAuthenticationResultLock.lock();
if (isTokenExpired()) {
lastAuthenticationResult = acquireAccessTokenByRefreshToken();
}
lastAuthenticationResultLock.unlock();
}
} catch (Exception e) {
throw new DataServiceException(e.getMessage());
}

return lastAuthenticationResult.getAccessToken();
}

private AuthenticationResult acquireAadUserAccessToken() throws DataServiceException, DataClientException {
Expand Down Expand Up @@ -136,7 +150,7 @@ private AuthenticationResult acquireAccessTokenUsingDeviceCodeFlow() throws Exce
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext( aadAuthorityUri, true, service);
context = new AuthenticationContext(aadAuthorityUri, true, service);
Future<DeviceCode> future = context.acquireDeviceCode(CLIENT_ID, clusterUrl, null);
DeviceCode deviceCode = future.get();
System.out.println(deviceCode.getMessage());
Expand All @@ -158,35 +172,37 @@ private AuthenticationResult acquireAccessTokenUsingDeviceCodeFlow() throws Exce
}

private AuthenticationResult waitAndAcquireTokenByDeviceCode(DeviceCode deviceCode, AuthenticationContext context)
throws InterruptedException{
throws InterruptedException {
int timeout = 15 * 1000;
AuthenticationResult result = null;
while (timeout > 0){
try{
while (timeout > 0) {
try {
Future<AuthenticationResult> futureResult = context.acquireTokenByDeviceCode(deviceCode, null);
return futureResult.get();
} catch (ExecutionException e) {
Thread.sleep(1000);
timeout -= 1000;
Thread.sleep(1000);
timeout -= 1000;
}
}
return result;
}

AuthenticationResult acquireWithClientCertificate()
throws IOException, InterruptedException, ExecutionException, ServiceUnavailableException{
throws InterruptedException, ExecutionException, ServiceUnavailableException {

AuthenticationContext context;
AuthenticationResult result;
AuthenticationResult result = null;
ExecutorService service = null;

try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext(aadAuthorityUri, false, service);
AsymmetricKeyCredential asymmetricKeyCredential = AsymmetricKeyCredential.create(clientCredential.getClientId(),
AsymmetricKeyCredential asymmetricKeyCredential = AsymmetricKeyCredential.create(applicationClientId,
privateKey, x509Certificate);
// pass null value for optional callback function and acquire access token
result = context.acquireToken(clusterUrl, asymmetricKeyCredential, null).get();
} catch (MalformedURLException e) {
e.printStackTrace();
} finally {
if (service != null) {
service.shutdown();
Expand All @@ -198,4 +214,64 @@ AuthenticationResult acquireWithClientCertificate()
return result;
}

private void acquireToken() throws DataServiceException {
lastAuthenticationResultLock.lock();
if (lastAuthenticationResult == null || isTokenExpired()) {
try {
switch (authenticationType) {
case AAD_APPLICATION_KEY:
lastAuthenticationResult = acquireAadApplicationAccessToken();
break;
case AAD_USERNAME_PASSWORD:
lastAuthenticationResult = acquireAadUserAccessToken();
break;
case AAD_DEVICE_LOGIN:
lastAuthenticationResult = acquireAccessTokenUsingDeviceCodeFlow();
break;
case AAD_APPLICATION_CERTIFICATE:
lastAuthenticationResult = acquireWithClientCertificate();
break;
default:
throw new DataServiceException("Authentication type: " + authenticationType.name() + " is invalid");
}
} catch (Exception e) {
throw new DataServiceException(e.getMessage());
}
}
lastAuthenticationResultLock.unlock();
}

private boolean isTokenExpired() {
return lastAuthenticationResult.getExpiresOnDate().before(dateInAMinute());
}

AuthenticationResult acquireAccessTokenByRefreshToken() throws DataServiceException {
AuthenticationContext context;
ExecutorService service = null;

try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext(aadAuthorityUri, false, service);
switch (authenticationType) {
case AAD_APPLICATION_KEY:
case AAD_APPLICATION_CERTIFICATE:
return context.acquireTokenByRefreshToken(lastAuthenticationResult.getRefreshToken(), clientCredential, null).get();
case AAD_USERNAME_PASSWORD:
case AAD_DEVICE_LOGIN:
return context.acquireTokenByRefreshToken(lastAuthenticationResult.getRefreshToken(), CLIENT_ID, clusterUrl, null).get();
default:
throw new DataServiceException("Authentication type: " + authenticationType.name() + " is invalid");
}
} catch (Exception e) {
throw new DataServiceException(e.getMessage());
} finally {
if (service != null) {
service.shutdown();
}
}
}

Date dateInAMinute() {
return new Date(System.currentTimeMillis() + MIN_ACCESS_TOKEN_VALIDITY_IN_MILLISECS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@ public class ClientImpl implements Client {

private AadAuthenticationHelper aadAuthenticationHelper;
private String clusterUrl;
private String clientVersionForTracing;
private String applicationNameForTracing;

public ClientImpl(ConnectionStringBuilder csb) throws URISyntaxException {
clusterUrl = csb.getClusterUrl();
aadAuthenticationHelper = new AadAuthenticationHelper(csb);
clientVersionForTracing = "Kusto.Java.Client";
String version = Utils.GetPackageVersion();
if (StringUtils.isNotBlank(version)) {
clientVersionForTracing += ":" + version;
}

if (StringUtils.isNotBlank(csb.getClientVersionForTracing())) {
clientVersionForTracing += "[" + csb.getClientVersionForTracing() + "]";
}
applicationNameForTracing = csb.getApplicationNameForTracing();
}

public Results execute(String command) throws DataServiceException, DataClientException {
Expand Down Expand Up @@ -74,6 +86,6 @@ public Results execute(String database, String command, ClientRequestProperties
throw new DataClientException(clusterEndpoint, String.format(clusterEndpoint, "Error in executing command: %s, in database: %s", command, database), e);
}

return Utils.post(clusterEndpoint, aadAccessToken, jsonString, timeoutMs.intValue());
return Utils.post(clusterEndpoint, aadAccessToken, jsonString, timeoutMs.intValue(), clientVersionForTracing, applicationNameForTracing);
}
}
Loading

0 comments on commit 0c6b678

Please sign in to comment.