Skip to content

Commit

Permalink
Feature/move to az cli (#115)
Browse files Browse the repository at this point in the history
* * Add support for accessToken
* * Refactor code for tests
* * Changes to build file
  • Loading branch information
ag-ramachandran authored May 2, 2024
1 parent adbb190 commit d7a57ee
Show file tree
Hide file tree
Showing 8 changed files with 556 additions and 505 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,20 @@ jobs:
build:
name: Maven Build
runs-on: ubuntu-latest
environment: build
# Permissions block is optional, useful for dependabot checks
permissions:
checks: write
contents: read
pull-requests: write
id-token: write
contents: read
steps:
- name: Azure login
uses: azure/login@v2
with:
client-id: ${{ secrets.APP_ID }}
tenant-id: ${{ secrets.TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
- uses: actions/checkout@v3
- name: Setup Java 8
uses: actions/setup-java@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -56,6 +57,9 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_CONNECTION_PROXY_PORT_DISPLAY = "Proxy port used to connect to Kusto";

private static final String KUSTO_AUTH_APPKEY_DISPLAY = "Kusto Auth AppKey";
static final String KUSTO_AUTH_ACCESS_TOKEN_CONF = "aad.auth.accesstoken";
private static final String KUSTO_AUTH_ACCESS_TOKEN_DISPLAY = "Kusto Auth AccessToken";
private static final String KUSTO_AUTH_ACCESS_TOKEN_DOC = "Kusto Access Token for Azure Active Directory authentication";
private static final String KUSTO_AUTH_AUTHORITY_DOC = "Azure Active Directory tenant.";
private static final String KUSTO_AUTH_AUTHORITY_DISPLAY = "Kusto Auth Authority";
private static final String KUSTO_AUTH_STRATEGY_DOC = "Strategy to authenticate against Azure Active Directory, either ``application`` (default) or ``managed_identity``.";
Expand Down Expand Up @@ -276,6 +280,16 @@ private static void defineConnectionConfigs(ConfigDef result) {
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_APPKEY_DISPLAY)
.define(
KUSTO_AUTH_ACCESS_TOKEN_CONF,
Type.PASSWORD,
null,
Importance.LOW,
KUSTO_AUTH_ACCESS_TOKEN_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_ACCESS_TOKEN_DISPLAY)
.define(
KUSTO_AUTH_APPID_CONF,
Type.STRING,
Expand Down Expand Up @@ -314,7 +328,9 @@ private static void defineConnectionConfigs(ConfigDef result) {
KustoAuthenticationStrategy.APPLICATION.name(),
KustoAuthenticationStrategy.MANAGED_IDENTITY.name(),
KustoAuthenticationStrategy.APPLICATION.name().toLowerCase(Locale.ENGLISH),
KustoAuthenticationStrategy.MANAGED_IDENTITY.name().toLowerCase(Locale.ENGLISH)),
KustoAuthenticationStrategy.MANAGED_IDENTITY.name().toLowerCase(Locale.ENGLISH),
KustoAuthenticationStrategy.AZ_DEV_TOKEN.name(),
KustoAuthenticationStrategy.AZ_DEV_TOKEN.name().toLowerCase(Locale.ENGLISH)),
Importance.HIGH,
KUSTO_AUTH_STRATEGY_DOC,
connectionGroupName,
Expand Down Expand Up @@ -358,6 +374,9 @@ public String getAuthAppId() {
public String getAuthAppKey() {
return this.getPassword(KUSTO_AUTH_APPKEY_CONF).value();
}
public String getAuthAccessToken() {
return this.getPassword(KUSTO_AUTH_ACCESS_TOKEN_CONF).value();
}

public String getAuthAuthority() {
return this.getString(KUSTO_AUTH_AUTHORITY_CONF);
Expand Down Expand Up @@ -456,7 +475,7 @@ enum BehaviorOnError {
*
* @return array of available behavior on error mode names
*/
public static String[] getNames() {
public static String @NotNull [] getNames() {
return Arrays
.stream(BehaviorOnError.class.getEnumConstants())
.map(Enum::name)
Expand All @@ -465,6 +484,6 @@ public static String[] getNames() {
}

enum KustoAuthenticationStrategy {
APPLICATION, MANAGED_IDENTITY
APPLICATION, MANAGED_IDENTITY, AZ_DEV_TOKEN
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.microsoft.azure.kusto.ingest.IngestionProperties;

class TopicIngestionProperties {
public class TopicIngestionProperties {
IngestionProperties ingestionProperties;
boolean streaming;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;

class TopicPartitionWriter {
public class TopicPartitionWriter {

private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
private static final String COMPRESSION_EXTENSION = ".gz";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,27 @@ class ITCoordinates {
final String appId;
final String appKey;
final String authority;
final String accessToken;
final String cluster;
final String ingestCluster;
final String database;

String table;

ITCoordinates(String appId, String appKey, String authority, String cluster, String ingestCluster, String database, String table) {
ITCoordinates(String appId, String appKey, String authority, String accessToken, String cluster,
String ingestCluster, String database, String table) {
this.appId = appId;
this.appKey = appKey;
this.authority = authority;
this.accessToken = accessToken;
this.authority = StringUtils.defaultIfBlank(authority,"microsoft.com");
this.ingestCluster = ingestCluster;
this.cluster = cluster;
this.database = database;
this.table = table;
}

boolean isValidConfig() {
return StringUtils.isNotEmpty(appId) && StringUtils.isNotEmpty(appKey) && StringUtils.isNotEmpty(authority) && StringUtils.isNotEmpty(cluster)
return StringUtils.isNotEmpty(authority) && StringUtils.isNotEmpty(cluster)
&& StringUtils.isNotEmpty(ingestCluster);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.microsoft.azure.kusto.kafka.connect.sink.it;

import java.util.Collections;
import java.util.UUID;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.AzureCliCredentialBuilder;

public class ITSetup {
static ITCoordinates getConnectorProperties() {
String testPrefix = "tmpKafkaSinkIT_";
Expand All @@ -16,7 +21,15 @@ static ITCoordinates getConnectorProperties() {
String database = getProperty("database", "e2e", true);
String defaultTable = testPrefix + UUID.randomUUID().toString().replace('-', '_');
String table = getProperty("table", defaultTable, true);
return new ITCoordinates(appId, appKey, authority, cluster, ingestCluster, database, table);
return new ITCoordinates(appId, appKey, authority,getAccessToken(cluster), cluster, ingestCluster, database, table);
}

private static String getAccessToken(String cluster) {
String clusterScope = String.format("%s/.default", cluster);
TokenRequestContext tokenRequestContext = new TokenRequestContext()
.setScopes(Collections.singletonList(clusterScope));
AccessToken accessTokenObj = new AzureCliCredentialBuilder().build().getTokenSync(tokenRequestContext);
return accessTokenObj.getToken();
}

private static String getProperty(String attribute, String defaultValue, boolean sanitize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ class KustoSinkIT {

@BeforeAll
public static void startContainers() throws Exception {

coordinates = getConnectorProperties();
if (coordinates.isValidConfig()) {
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(coordinates.cluster,
coordinates.appId, coordinates.appKey, coordinates.authority);
ConnectionStringBuilder dmCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(coordinates.ingestCluster,
coordinates.appId, coordinates.appKey, coordinates.authority);
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(coordinates.cluster,
coordinates.accessToken);
ConnectionStringBuilder dmCsb = ConnectionStringBuilder.
createWithAadAccessTokenAuthentication(coordinates.ingestCluster,coordinates.accessToken);
engineClient = ClientFactory.createClient(engineCsb);
dmClient = ClientFactory.createClient(dmCsb);
log.info("Creating tables in Kusto");
Expand Down Expand Up @@ -190,8 +191,8 @@ public void shouldHandleAllTypesOfEvents() {
connectorProps.put("topics", String.format("e2e.%s.topic", dataFormat));
connectorProps.put("kusto.tables.topics.mapping", topicTableMapping);
connectorProps.put("aad.auth.authority", coordinates.authority);
connectorProps.put("aad.auth.appid", coordinates.appId);
connectorProps.put("aad.auth.appkey", coordinates.appKey);
connectorProps.put("aad.auth.accesstoken", coordinates.accessToken);
connectorProps.put("aad.auth.strategy", "AZ_DEV_TOKEN".toLowerCase());
connectorProps.put("kusto.query.url", coordinates.cluster);
connectorProps.put("kusto.ingestion.url", coordinates.ingestCluster);
connectorProps.put("schema.registry.url", srUrl);
Expand Down

0 comments on commit d7a57ee

Please sign in to comment.