Skip to content

Commit

Permalink
sasl: Enable STS credential refresh (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
garrett528 authored Sep 14, 2021
1 parent b7c8393 commit 90e27a0
Show file tree
Hide file tree
Showing 17 changed files with 1,847 additions and 579 deletions.
6 changes: 5 additions & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ ssl_key | * | |
ssl.certificate.location | * | | | low | Path to client's public key (PEM) used for authentication. <br>*Type: string*
ssl.certificate.pem | * | | | low | Client's public key string (PEM format) used for authentication. <br>*Type: string*
ssl_certificate | * | | | low | Client's public key as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API*
ssl.ca.location | * | | probe | low | File or directory path to CA certificate(s) for verifying the broker's key. Defaults: On Windows the system's CA certificates are automatically looked up in the Windows Root certificate store. On Mac OSX this configuration defaults to `probe`. It is recommended to install openssl using Homebrew, to provide CA certificates. On Linux install the distribution's ca-certificates package. If OpenSSL is statically linked or `ssl.ca.location` is set to `probe` a list of standard paths will be probed and the first one found will be used as the default CA certificate location path. If OpenSSL is dynamically linked the OpenSSL library's default path will be used (see `OPENSSLDIR` in `openssl version -a`). <br>*Type: string*
ssl.ca.location | * | | | low | File or directory path to CA certificate(s) for verifying the broker's key. Defaults: On Windows the system's CA certificates are automatically looked up in the Windows Root certificate store. On Mac OSX this configuration defaults to `probe`. It is recommended to install openssl using Homebrew, to provide CA certificates. On Linux install the distribution's ca-certificates package. If OpenSSL is statically linked or `ssl.ca.location` is set to `probe` a list of standard paths will be probed and the first one found will be used as the default CA certificate location path. If OpenSSL is dynamically linked the OpenSSL library's default path will be used (see `OPENSSLDIR` in `openssl version -a`). <br>*Type: string*
ssl_ca | * | | | low | CA certificate as set by rd_kafka_conf_set_ssl_cert() <br>*Type: see dedicated API*
ssl.ca.certificate.stores | * | | Root | low | Comma-separated list of Windows Certificate stores to load CA certificates from. Certificates will be loaded in the same order as stores are specified. If no certificates can be loaded from any of the specified stores an error is logged and the OpenSSL library's default CA location is used instead. Store names are typically one or more of: MY, Root, Trust, CA. <br>*Type: string*
ssl.crl.location | * | | | low | Path to CRL for verifying broker's certificate validity. <br>*Type: string*
Expand All @@ -91,7 +91,11 @@ sasl.password | * | |
sasl.aws.access.key.id | * | | | high | SASL AWS access key id for use with the AWS_MSK_IAM mechanism <br>*Type: string*
sasl.aws.secret.access.key | * | | | high | SASL AWS secret access key for use with the AWS_MSK_IAM mechanism <br>*Type: string*
sasl.aws.region | * | | | high | SASL AWS region for use with the AWS_MSK_IAM mechanism <br>*Type: string*
enable.sasl.aws.use.sts | * | true, false | false | low | Enable the builtin AWS STS credential refresh handler. Only use this if you intend to use temporary credentials. If you use permanent credentials, keep this with the default (disabled). <br>*Type: boolean*
sasl.aws.security.token | * | | | high | SASL AWS security for use with the AWS_MSK_IAM mechanism if using STS (temp) credentials <br>*Type: string*
sasl.aws.role.arn | * | | | high | AWS RoleARN to use for calling STS. <br>*Type: string*
sasl.aws.role.session.name | * | | | high | Session name to use for STS AssumeRole. <br>*Type: string*
sasl.aws.duration.sec | * | 900 .. 43200 | 900 | low | The duration, in seconds, of the role session. Minimum is 900 seconds (15 minutes) and max is 12 hours. This will default to 900 seconds if not set. <br>*Type: integer*
sasl.oauthbearer.config | * | | | low | SASL/OAUTHBEARER configuration. The format is implementation-dependent and must be parsed accordingly. The default unsecured token implementation (see https://tools.ietf.org/html/rfc7515#appendix-A.5) recognizes space-separated name=value pairs with valid names including principalClaimName, principal, scopeClaimName, scope, and lifeSeconds. The default value for principalClaimName is "sub", the default value for scopeClaimName is "scope", and the default value for lifeSeconds is 3600. The scope value is CSV format with the default value being no/empty scope. For example: `principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifeSeconds=600`. In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`. For example: `principal=admin extension_traceId=123` <br>*Type: string*
enable.sasl.oauthbearer.unsecure.jwt | * | true, false | false | low | Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set. This builtin handler should only be used for development or testing, and not in production. <br>*Type: boolean*
oauthbearer_token_refresh_cb | * | | | low | SASL/OAUTHBEARER token refresh callback (set with rd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered by rd_kafka_poll(), et.al. This callback will be triggered when it is time to refresh the client's OAUTHBEARER token. <br>*Type: see dedicated API*
Expand Down
5 changes: 4 additions & 1 deletion configure.self
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ void foo (void) {
# SASL AWS MSK IAM requires base64 encoding from OpenSSL
if mkl_lib_check "curl" "" disable CC "-lcurl" \
"#include <curl/curl.h>"; then
mkl_allvar_set WITH_SASL_AWS_MSK_IAM WITH_SASL_AWS_MSK_IAM y
if mkl_lib_check "libxml2" "" disable CC "-lxml2" \
"#include <libxml/parser.h>"; then
mkl_allvar_set WITH_SASL_AWS_MSK_IAM WITH_SASL_AWS_MSK_IAM y
fi
fi
fi

Expand Down
52 changes: 45 additions & 7 deletions examples/producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ int main (int argc, char **argv) {
const char *aws_access_key_id; /* Argument: aws access key id for IAM auth */
const char *aws_secret_access_key; /* Argument: aws secret access key for IAM auth */
const char *aws_region; /* Argument: aws region for IAM auth */
const char *aws_security_token; /* Argument: aws security token for temp credentials */
const char *role_arn; /* Argument: aws RoleARN to use for STS */
const char *role_session_name; /* Argument: aws session name to use for STS */

/*
* Argument validation
Expand All @@ -103,6 +106,15 @@ int main (int argc, char **argv) {
aws_access_key_id = argv[3];
aws_secret_access_key = argv[4];
aws_region = argv[5];
} else if (argc == 9) {
brokers = argv[1];
topic = argv[2];
aws_access_key_id = argv[3];
aws_secret_access_key = argv[4];
aws_region = argv[5];
aws_security_token = argv[6];
role_arn = argv[7];
role_session_name = argv[8];
} else {
fprintf(stderr, "%% Usage: %s <broker> <topic> (optional) <aws_access_key_id> <aws_secret_access_key> <aws_region>\n", argv[0]);
return 1;
Expand All @@ -123,8 +135,14 @@ int main (int argc, char **argv) {
fprintf(stderr, "%s\n", errstr);
return 1;
}

if (rd_kafka_conf_set(conf, "debug", "broker,security",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}

if (argc == 6) {
if (argc >= 6) {
if (rd_kafka_conf_set(conf, "security.protocol", "SASL_SSL",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
Expand Down Expand Up @@ -154,12 +172,32 @@ int main (int argc, char **argv) {
fprintf(stderr, "%s\n", errstr);
return 1;
}

// if (rd_kafka_conf_set(conf, "debug", "all",
// errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
// fprintf(stderr, "%s\n", errstr);
// return 1;
// }
}

if (argc == 9) {
if (rd_kafka_conf_set(conf, "sasl.aws.security.token", aws_security_token,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}

if (rd_kafka_conf_set(conf, "sasl.aws.role.arn", role_arn,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}

if (rd_kafka_conf_set(conf, "sasl.aws.role.session.name", role_session_name,
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}

if (rd_kafka_conf_set(conf, "enable.sasl.aws.use.sts", "1",
errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr, "%s\n", errstr);
return 1;
}
}

/* Set the delivery report callback.
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_txnmgr.c rdkafka_coord.c \
rdvarint.c rdbuf.c rdmap.c rdunittest.c \
rdkafka_mock.c rdkafka_mock_handlers.c rdkafka_mock_cgrp.c \
rdkafka_error.c rdstringbuilder.c\
rdkafka_error.c rdstringbuilder.c rdkafka_aws.c \
$(SRCS_y)

HDRS= rdkafka.h rdkafka_mock.h
Expand Down
5 changes: 5 additions & 0 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_sasl_oauthbearer.h"
#include "rdkafka_sasl_aws_msk_iam.h"
#if WITH_SSL
#include "rdkafka_ssl.h"
#endif
Expand Down Expand Up @@ -2231,6 +2232,10 @@ rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *app_conf,
rk->rk_conf.enabled_events |=
RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH;
#endif
#if WITH_SASL_AWS_MSK_IAM
rk->rk_conf.enabled_events |=
RD_KAFKA_EVENT_AWS_MSK_IAM_CREDENTIAL_REFRESH;
#endif

rk->rk_controllerid = -1;

Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -2098,6 +2098,7 @@ void rd_kafka_conf_set_oauthbearer_token_refresh_cb (
const char *oauthbearer_config,
void *opaque));


/**
* @brief Set socket callback.
*
Expand Down Expand Up @@ -4950,6 +4951,9 @@ typedef int rd_kafka_event_type_t;
#define RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH 0x100 /**< SASL/OAUTHBEARER
token needs to be
refreshed */
#define RD_KAFKA_EVENT_AWS_MSK_IAM_CREDENTIAL_REFRESH 0x200 /**< SASL/AWS_MSK_IAM
credentials need to be
refreshed */


/**
Expand Down
Loading

0 comments on commit 90e27a0

Please sign in to comment.