Skip to content

Commit

Permalink
YAML parser implementations (#74)
Browse files Browse the repository at this point in the history
* YAML parser implementations

* Fix dependency based on suggestion

* Remove metals plugin

* Fix code to work with the latest version

* Fix tests and add yaml extraction support

* Implement dynamic parser handling in sources

* Add Yaml example

* Minor tweaks

* Fix tests

* Update changelog and readme

* Handle default case

* Fix typo README.md

Co-authored-by: Matthieu Baechler <matthieu.baechler@gmail.com>

* Refactor refresh return type to be more clear

* Make the monoid implementation leaner

Co-authored-by: Matthieu Baechler <matthieu.baechler@gmail.com>
  • Loading branch information
ntrp and mbaechler authored May 12, 2021
1 parent 96f3c33 commit 11f2001
Show file tree
Hide file tree
Showing 34 changed files with 814 additions and 166 deletions.
8 changes: 5 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [??]
- Removed gRPC
- Implemented support for additional parsers
- Implemented YAML parser

## [0.10 - 02/18/2021]
- Added config to control how often notifications on ACL Source refreshes are sent
Expand All @@ -16,11 +18,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Upgrade to Kafka 2.5.0
- Added Bitbucket Cloud as an ACL source
- Added Branch parameter for Bitbucket Server ACL Source
- Added GitLab as an ACL souce
- Added GitLab as an ACL source
- Massive refactor to better handle refresh and parsing exceptions

## [0.8 - 10/01/2020]
- Added possibility to run AclSyncornizer just once and then stop KSM (Issue #56)
- Added possibility to run AclSynchronizer just once and then stop KSM (Issue #56)
- Updated to Kafka 2.3.1

## [0.7 - 24/07/2019]
Expand Down Expand Up @@ -75,5 +77,5 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Extract ACLs from Kafka.
- Tests including with Kafka running
- GitHub Enterprise Support
- GitHub authentication Support
- GitHub Authentication Support
- Slack Notification Support
50 changes: 45 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ With Conduktor you can visualize your ACLs in your Apache Kafka cluster!

# Kafka Security Manager

Kafka Security Manager (KSM) allows you to manage your Kafka ACLs at scale by leveraging an external source as the source of truth. Zookeeper just contains a copy of the ACLs instead of being the source.
Kafka Security Manager (KSM) allows you to manage your Kafka ACLs at scale by leveraging an external source as the source of truth.
Zookeeper just contains a copy of the ACLs instead of being the source.

![Kafka Security Manager Diagram](https://i.imgur.com/BuikeuB.png)

Expand All @@ -19,16 +20,54 @@ There are several advantages to this:

Your role is to ensure that Kafka Security Manager is never down, as it is now a custodian of your ACL.

A sample CSV to manage ACL is:
## Parsers

### CSV
The csv parser is the default parser and also the fallback one in case no other parser is matched.

This is a sample CSV acl file:
```
KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
User:alice,Topic,LITERAL,foo,Read,Allow,*
User:bob,Group,PREFIXED,bar,Write,Deny,12.34.56.78
User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow,*
```

**Important Note**: As of KSM 0.4, a new column `PatternType` has been added to match the changes that happened in Kafka 2.0. This enables KSM to manage `LITERAL` and `PREFIXED` ACLs. See #28

### YAML
The yaml parser will load ACLs from yaml instead, to activate the parser just provide files with `yml` or `yaml` extension.

An example YAML permission file might be:
```yaml
users:
alice:
topics:
foo:
- Read
bar*:
- Produce
bob:
groups:
bar:
- Write,Deny,12.34.56.78
bob*:
- All
transactional_ids:
bar-*:
- All
peter:
clusters:
kafka-cluster:
- Create
```
The YAML parser will handle automatically prefix patterns by simply appending a star to your resource name.
It also supports some helpers to simplify setup:
- Consume (Read, Describe)
- Produce (Write, Describe, Create, Cluster Create)
## Sources
Current sources shipping with KSM include:
- File
- GitHub
Expand Down Expand Up @@ -113,7 +152,8 @@ Overall we use the [lightbend config](https://github.com/lightbend/config) libra
The [default configurations](src/main/resources/application.conf) can be overwritten using the following environment variables:

- `KSM_READONLY=false`: enables KSM to synchronize from an External ACL source. The default value is `true`, which prevents KSM from altering ACLs in Zookeeper
- `KSM_EXTRACT=true`: enable extract mode (get all the ACLs from Kafka formatted as a CSV)
- `KSM_EXTRACT=true`: enable extract mode (get all the ACLs from Kafka formatted as a CSV or YAML)
- `KSM_EXTRACT_FORMAT=csv`: selects which format to extract the ACLs with (defaults to csv, supports also yaml)
- `KSM_REFRESH_FREQUENCY_MS=10000`: how often to check for changes in ACLs in Kafka and in the Source. 10000 ms by default. If it's set to `0` or negative value, for example `-1`, then KMS executes ACL synchronization just once and exits
- `KSM_NUM_FAILED_REFRESHES_BEFORE_NOTIFICATION=1`: how many times that the refresh of a Source needs to fail (e.g. HTTP timeouts) before a notification is sent. Any value less than or equal to `1` here will notify on every failure to refresh.
- `AUTHORIZER_CLASS`: authorizer class for ACL operations. Default is `SimpleAclAuthorizer`, configured with
Expand All @@ -138,7 +178,7 @@ The [default configurations](src/main/resources/application.conf) can be overwri
- `com.github.conduktor.ksm.source.NoSourceAcl` (default): No source for the ACLs. Only use with `KSM_READONLY=true`
- `com.github.conduktor.ksm.source.FileSourceAcl`: get the ACL source from a file on disk. Good for POC
- `com.github.conduktor.ksm.source.GitHubSourceAcl`: get the ACL from GitHub. Great to get started quickly and store the ACL securely under version control.
- `com.github.conduktor.ksm.source.GitLabSourceAcl`: get the ACL from GitLab using pesonal access tokens. Great to get started quickly and store the ACL securely under version control.
- `com.github.conduktor.ksm.source.GitLabSourceAcl`: get the ACL from GitLab using personal access tokens. Great to get started quickly and store the ACL securely under version control.
- `SOURCE_GITLAB_REPOID` GitLab project id
- `SOURCE_GITLAB_FILEPATH` Path to the ACL file in GitLab project
- `SOURCE_GITLAB_BRANCH` Git Branch name
Expand Down
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ libraryDependencies ++= Seq(

// test
"org.scalatest" %% "scalatest" % "3.0.5" % Test,
"org.scalamock" %% "scalamock" % "5.1.0" % Test,

// logging
"org.slf4j" % "slf4j-api" % "1.7.25",
Expand All @@ -38,6 +39,8 @@ libraryDependencies ++= Seq(

// parsers
"com.github.tototoshi" %% "scala-csv" % "1.3.5",
"io.circe" %% "circe-yaml" % "0.12.0",
"io.circe" %% "circe-generic" % "0.12.0",

// APIs
"org.skinny-framework" %% "skinny-http-client" % "2.3.7",
Expand Down
29 changes: 29 additions & 0 deletions example/acls.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
users:
alice:
topics:
foo:
- Read
baz*:
- Read
my-kafka-streams-app*:
- Create
bob:
groups:
bar:
- Write,Deny,12.34.56.78
transactional_ids:
bar-*:
- All
peter:
clusters:
kafka-cluster:
- Create
schemareg:
topics:
_schemas:
- All
'*':
- All
groups:
schema-registry:
- All
3 changes: 3 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ ksm {
extract = false
extract = ${?KSM_EXTRACT}

extract.format = csv
extract.format = ${?KSM_EXTRACT_FORMAT}

num.failed.refreshes.before.notification = 1
num.failed.refreshes.before.notification = ${?KSM_NUM_FAILED_REFRESHES_BEFORE_NOTIFICATION}

Expand Down
8 changes: 3 additions & 5 deletions src/main/scala/com/github/conduktor/ksm/AclSynchronizer.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package com.github.conduktor.ksm

import com.github.conduktor.ksm.notification.Notification
import com.github.conduktor.ksm.parser.AclParser
import com.github.conduktor.ksm.source.SourceAcl
import com.github.conduktor.ksm.source.{ParsingContext, SourceAcl}
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.slf4j.{Logger, LoggerFactory}

Expand Down Expand Up @@ -60,7 +59,6 @@ class AclSynchronizer(
authorizer: Authorizer,
sourceAcl: SourceAcl,
notification: Notification,
aclParser: AclParser,
numFailedRefreshesBeforeNotification: Int,
readOnly: Boolean = false
) extends Runnable {
Expand Down Expand Up @@ -97,8 +95,8 @@ class AclSynchronizer(
authorizer
)
}
case Some(reader) =>
val sourceAclResult = aclParser.aclsFromReader(reader)
case Some(ParsingContext(parser, reader)) =>
val sourceAclResult = parser.aclsFromReader(reader)
reader.close()
sourceAclResult.result match {
// the source has changed
Expand Down
16 changes: 10 additions & 6 deletions src/main/scala/com/github/conduktor/ksm/AppConfig.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.github.conduktor.ksm

import com.github.conduktor.ksm.notification.Notification
import com.github.conduktor.ksm.parser.AclParserRegistry
import com.github.conduktor.ksm.source.SourceAcl
import com.typesafe.config.Config
import kafka.security.auth.Authorizer
import kafka.utils.CoreUtils

import scala.collection.JavaConverters._
import scala.util.Try

class AppConfig(config: Config) {

Expand All @@ -31,13 +31,16 @@ class AppConfig(config: Config) {

object Source {
private val sourceAclClass = config.getString("source.class")
val sourceAcl: SourceAcl = CoreUtils.createObject[SourceAcl](sourceAclClass)
def createSource(parserRegistry: AclParserRegistry):SourceAcl = {
val sourceAcl: SourceAcl = CoreUtils.createObject[SourceAcl](sourceAclClass, parserRegistry)

// here we get a dynamic config prefix given by the class.
// this will allow multiple classes to co-exist in the same config and avoid collisions
private val sourceAclConfig =
// here we get a dynamic config prefix given by the class.
// this will allow multiple classes to co-exist in the same config and avoid collisions
val sourceAclConfig =
config.getConfig(s"source.${sourceAcl.CONFIG_PREFIX}")
sourceAcl.configure(sourceAclConfig)
sourceAcl.configure(sourceAclConfig)
sourceAcl
}
}

object Notification {
Expand All @@ -57,6 +60,7 @@ class AppConfig(config: Config) {
val refreshFrequencyMs: Int = ksmConfig.getInt("refresh.frequency.ms")
val numFailedRefreshesBeforeNotification: Int = ksmConfig.getInt("num.failed.refreshes.before.notification")
val extract: Boolean = ksmConfig.getBoolean("extract")
val extractFormat: String = ksmConfig.getString("extract.format")
val readOnly: Boolean = ksmConfig.getBoolean("readonly")
}

Expand Down
21 changes: 7 additions & 14 deletions src/main/scala/com/github/conduktor/ksm/KafkaSecurityManager.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
package com.github.conduktor.ksm

import java.util.concurrent.atomic.AtomicBoolean

import com.github.conduktor.ksm.parser.CsvAclParser
import com.github.conduktor.ksm.parser.AclParserRegistry
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

import scala.util.{Failure, Success, Try}
import java.util.concurrent.{
ExecutionException,
Executors,
ScheduledExecutorService,
TimeUnit
}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ExecutionException, Executors, ScheduledExecutorService, TimeUnit}

object KafkaSecurityManager extends App {

Expand All @@ -23,17 +16,17 @@ object KafkaSecurityManager extends App {

var isCancelled: AtomicBoolean = new AtomicBoolean(false)
var aclSynchronizer: AclSynchronizer = _
val aclParser = new CsvAclParser(appConfig.Parser.csvDelimiter)
val parserRegistry: AclParserRegistry = new AclParserRegistry(appConfig)
val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

if (appConfig.KSM.extract) {
new ExtractAcl(appConfig.Authorizer.authorizer, aclParser).extract()
val parser = parserRegistry.getParser(appConfig.KSM.extractFormat)
new ExtractAcl(appConfig.Authorizer.authorizer, parser).extract()
} else {
aclSynchronizer = new AclSynchronizer(
appConfig.Authorizer.authorizer,
appConfig.Source.sourceAcl,
appConfig.Source.createSource(parserRegistry),
appConfig.Notification.notification,
aclParser,
appConfig.KSM.numFailedRefreshesBeforeNotification,
appConfig.KSM.readOnly
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.github.conduktor.ksm.notification
import com.github.conduktor.ksm.parser.CsvParserException
import com.github.conduktor.ksm.parser.csv.CsvParserException
import com.github.conduktor.ksm.parser.yaml.YamlParserException
import com.typesafe.config.Config
import kafka.security.auth.{Acl, Resource}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.{Failure, Try, Success}
import scala.util.{Failure, Success, Try}

case class ConsoleNotification() extends Notification {

Expand All @@ -25,6 +26,8 @@ case class ConsoleNotification() extends Notification {
errs.foreach {
case Failure(cPE: CsvParserException) =>
log.error(s"${cPE.getLocalizedMessage} | Row: ${cPE.printRow()}")
case Failure(cPE: YamlParserException) =>
log.error(s"${cPE.getLocalizedMessage} | Detail: ${cPE.print()}")
case Success(t) => log.error("refresh exception", t)
case Failure(t) => log.error("refresh exception", t)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.conduktor.ksm.notification
import com.fasterxml.jackson.databind.ObjectMapper
import com.github.conduktor.ksm.parser.CsvParserException
import com.github.conduktor.ksm.parser.csv.CsvParserException
import com.github.conduktor.ksm.parser.yaml.YamlParserException
import com.typesafe.config.Config
import kafka.security.auth.{Acl, Resource}
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -89,6 +90,8 @@ class SlackNotification extends Notification {
val messages = errs.map {
case Failure(cPE: CsvParserException) =>
s"${cPE.getLocalizedMessage} | Row: ${cPE.printRow()}"
case Failure(cPE: YamlParserException) =>
s"${cPE.getLocalizedMessage} | Detail: ${cPE.print()}"
case Success(t) => s"refresh exception: ${t.getLocalizedMessage}"
case Failure(t) => s"refresh exception: ${t.getLocalizedMessage}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import kafka.security.auth.{Acl, Resource}

trait AclParser {

val name: String

def aclsFromReader(reader: Reader): SourceAclResult

def formatAcls(acls: List[(Resource, Acl)]): String

def matchesExtension(extension: String): Boolean

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.github.conduktor.ksm.parser

import com.github.conduktor.ksm.AppConfig
import com.github.conduktor.ksm.parser.csv.CsvAclParser
import com.github.conduktor.ksm.parser.yaml.YamlAclParser

class AclParserRegistry(val appConfig: AppConfig) {

val csvParser = new CsvAclParser(Option(appConfig).map(_.Parser).map(_.csvDelimiter).getOrElse(','))
val yamlParser = new YamlAclParser()

val parserMap: Map[String, AclParser] = Map(
csvParser.name -> csvParser,
yamlParser.name -> yamlParser
)

def getParser(parserName: String): AclParser = {
parserMap.getOrElse(
parserName,
throw new RuntimeException(s"Parse not found for $parserName")
)
}

def getParserByFilename(fileName: String): AclParser = {
val ext = fileName.split("\\.").last
parserMap.values
.find(_.matchesExtension(ext))
.getOrElse(
csvParser
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.conduktor.ksm.parser

/**
* Wrapper to exceptions in order to keep data of the row that failed
*
* @param t exception that has been thrown
*/
class ParserException(t: Throwable) extends RuntimeException(t) {}
Loading

0 comments on commit 11f2001

Please sign in to comment.