Skip to content

Commit

Permalink
Merge branch 'master' into handle-runtime-exception
Browse files Browse the repository at this point in the history
  • Loading branch information
simplesteph authored May 13, 2018
2 parents bb15d4a + afd3a26 commit 64e0611
Show file tree
Hide file tree
Showing 20 changed files with 240 additions and 211 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Added gRPC endpoint to perform API calls on KSM (the goal is to build a UI on top of KSM)
- Feature flag for gRPC server (off by default)
- Added gRPC reflection service
- using ScalaFMT instead of Scalariform

## [0.2] - 05/05/2018
- Kafka 1.1.0
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
resolvers += Resolver.typesafeRepo("releases")

addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1")

addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.3")

Expand Down
43 changes: 25 additions & 18 deletions src/main/scala/com/github/simplesteph/ksm/AclSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,48 +4,54 @@ import com.github.simplesteph.ksm.notification.Notification
import com.github.simplesteph.ksm.source.{SourceAcl, SourceAclResult}
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.{Failure, Success, Try}

object AclSynchronizer {

private val log: Logger = LoggerFactory.getLogger(classOf[AclSynchronizer].getSimpleName)
private val log: Logger =
LoggerFactory.getLogger(classOf[AclSynchronizer].getSimpleName)

// transform Kafka ACLs to make them more agreeable to deal with
def flattenKafkaAcls(kafkaGroupedAcls: Map[Resource, Set[Acl]]): Set[(Resource, Acl)] = {
kafkaGroupedAcls.keySet.flatMap(resource => kafkaGroupedAcls(resource).map((resource, _)))
def flattenKafkaAcls(
kafkaGroupedAcls: Map[Resource, Set[Acl]]): Set[(Resource, Acl)] = {
kafkaGroupedAcls.keySet.flatMap(resource =>
kafkaGroupedAcls(resource).map((resource, _)))
}

// group the ACL by resource
def regroupAcls(flattenedAcls: Set[(Resource, Acl)]): Map[Resource, Set[Acl]] = {
flattenedAcls.groupBy { case (r: Resource, _: Acl) => r }
def regroupAcls(
flattenedAcls: Set[(Resource, Acl)]): Map[Resource, Set[Acl]] = {
flattenedAcls
.groupBy { case (r: Resource, _: Acl) => r }
.mapValues(_.map((y: (Resource, Acl)) => y._2))
}

// apply changes to Zookeeper / Kafka security and store the results in Notification object
def applySourceAcls(
sourceAcls: Set[(Resource, Acl)],
kafkaAcls: Set[(Resource, Acl)],
notification: Notification,
authZ: Authorizer): Unit = {
def applySourceAcls(sourceAcls: Set[(Resource, Acl)],
kafkaAcls: Set[(Resource, Acl)],
notification: Notification,
authZ: Authorizer): Unit = {
if (sourceAcls == kafkaAcls) {
log.info("No ACL changes")
} else {
val added = sourceAcls -- kafkaAcls
val removed = kafkaAcls -- sourceAcls

regroupAcls(added).foreach { case (resource, acls) => authZ.addAcls(acls, resource) }
regroupAcls(removed).foreach { case (resource, acls) => authZ.removeAcls(acls, resource) }
regroupAcls(added).foreach {
case (resource, acls) => authZ.addAcls(acls, resource)
}
regroupAcls(removed).foreach {
case (resource, acls) => authZ.removeAcls(acls, resource)
}

notification.notifySuccess(added, removed)
}
}
}

class AclSynchronizer(
authorizer: Authorizer,
sourceAcl: SourceAcl,
notification: Notification) {
class AclSynchronizer(authorizer: Authorizer,
sourceAcl: SourceAcl,
notification: Notification) {

import AclSynchronizer._

Expand Down Expand Up @@ -87,7 +93,8 @@ class AclSynchronizer(
}
}

def getKafkaAcls: Set[(Resource, Acl)] = flattenKafkaAcls(authorizer.getAcls())
def getKafkaAcls: Set[(Resource, Acl)] =
flattenKafkaAcls(authorizer.getAcls())

def close(): Unit = {
authorizer.close()
Expand Down
16 changes: 11 additions & 5 deletions src/main/scala/com/github/simplesteph/ksm/AppConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@ class AppConfig(config: Config) {

object Authorizer {
private val authorizerClass = config.getString("authorizer.class")
val authorizer: Authorizer = CoreUtils.createObject[Authorizer](authorizerClass)
val authorizer: Authorizer =
CoreUtils.createObject[Authorizer](authorizerClass)

private val authorizerConfig = config.getConfig("authorizer.config")
private val configMap = authorizerConfig.root().unwrapped().asScala.map { case (s, a) => (s, a.toString) }
private val configMap = authorizerConfig.root().unwrapped().asScala.map {
case (s, a) => (s, a.toString)
}
authorizer.configure(configMap.asJava)
}

Expand All @@ -25,17 +28,20 @@ class AppConfig(config: Config) {

// here we get a dynamic config prefix given by the class.
// this will allow multiple classes to co-exist in the same config and avoid collisions
private val sourceAclConfig = config.getConfig(s"source.${sourceAcl.CONFIG_PREFIX}")
private val sourceAclConfig =
config.getConfig(s"source.${sourceAcl.CONFIG_PREFIX}")
sourceAcl.configure(sourceAclConfig)
}

object Notification {
private val notificationClass = config.getString("notification.class")
val notification: Notification = CoreUtils.createObject[Notification](notificationClass)
val notification: Notification =
CoreUtils.createObject[Notification](notificationClass)

// here we get a dynamic config prefix given by the class.
// this will allow multiple classes to co-exist in the same config and avoid collisions
private val notificationConfig = config.getConfig(s"notification.${notification.CONFIG_PREFIX}")
private val notificationConfig =
config.getConfig(s"notification.${notification.CONFIG_PREFIX}")
notification.configure(notificationConfig)
}

Expand Down
8 changes: 3 additions & 5 deletions src/main/scala/com/github/simplesteph/ksm/ExtractAcl.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package com.github.simplesteph.ksm

import com.github.simplesteph.ksm.parser.AclParser
import kafka.security.auth.{ Acl, Authorizer, Resource }
import org.slf4j.{ Logger, LoggerFactory }
import kafka.security.auth.{Acl, Authorizer, Resource}
import org.slf4j.{Logger, LoggerFactory}

class ExtractAcl(
authorizer: Authorizer,
aclParser: AclParser) {
class ExtractAcl(authorizer: Authorizer, aclParser: AclParser) {

val log: Logger = LoggerFactory.getLogger(classOf[ExtractAcl].getSimpleName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.github.simplesteph.ksm.parser.CsvAclParser
import com.typesafe.config.ConfigFactory
import org.slf4j.LoggerFactory

import scala.util.{ Failure, Success, Try }
import scala.util.{Failure, Success, Try}

object KafkaSecurityManager extends App {

Expand All @@ -23,16 +23,14 @@ object KafkaSecurityManager extends App {
if (appConfig.KSM.extract) {
new ExtractAcl(appConfig.Authorizer.authorizer, CsvAclParser).extract()
} else {
aclSynchronizer = new AclSynchronizer(
appConfig.Authorizer.authorizer,
appConfig.Source.sourceAcl,
appConfig.Notification.notification)
aclSynchronizer = new AclSynchronizer(appConfig.Authorizer.authorizer,
appConfig.Source.sourceAcl,
appConfig.Notification.notification)

Try {
grpcServer = new KsmGrpcServer(
aclSynchronizer,
appConfig.GRPC.port,
appConfig.Feature.grpc)
grpcServer = new KsmGrpcServer(aclSynchronizer,
appConfig.GRPC.port,
appConfig.Feature.grpc)
grpcServer.start()
} match {
case Success(_) =>
Expand Down
18 changes: 10 additions & 8 deletions src/main/scala/com/github/simplesteph/ksm/grpc/KsmGrpcServer.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package com.github.simplesteph.ksm.grpc

import com.github.simplesteph.ksm.{ AclSynchronizer, KafkaSecurityManager }
import com.github.simplesteph.ksm.AclSynchronizer
import com.security.kafka.pb.ksm.KsmServiceGrpc
import io.grpc.protobuf.services.ProtoReflectionService
import io.grpc.{ Server, ServerBuilder }
import io.grpc.{Server, ServerBuilder}
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext

class KsmGrpcServer(
aclSynchronizer: AclSynchronizer,
port: Int,
enabled: Boolean) {
class KsmGrpcServer(aclSynchronizer: AclSynchronizer,
port: Int,
enabled: Boolean) {

val log = LoggerFactory.getLogger(KsmServiceGrpc.getClass)

Expand All @@ -20,9 +19,12 @@ class KsmGrpcServer(
def start(): Unit = {
if (enabled) {
log.info("Starting gRPC Server")
server = ServerBuilder.forPort(port)
server = ServerBuilder
.forPort(port)
.addService(ProtoReflectionService.newInstance())
.addService(KsmServiceGrpc.bindService(new KsmServiceImpl(aclSynchronizer), ExecutionContext.global))
.addService(
KsmServiceGrpc.bindService(new KsmServiceImpl(aclSynchronizer),
ExecutionContext.global))
.build()
server.start()
log.info("gRPC Server Started")
Expand Down
24 changes: 14 additions & 10 deletions src/main/scala/com/github/simplesteph/ksm/grpc/KsmServiceImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,28 @@ package com.github.simplesteph.ksm.grpc

import com.github.simplesteph.ksm.AclSynchronizer
import com.github.simplesteph.ksm.utils.ProtoConversionUtils
import com.security.kafka.pb.ksm.{ GetAllAclsRequest, GetAllAclsResponse, ResourceAndAclPb }
import com.security.kafka.pb.ksm.{
GetAllAclsRequest,
GetAllAclsResponse,
ResourceAndAclPb
}
import com.security.kafka.pb.ksm.KsmServiceGrpc.KsmService
import kafka.security.auth.{ Acl, Resource }
import kafka.security.auth.{Acl, Resource}

import scala.concurrent.Future

class KsmServiceImpl(aclSynchronizer: AclSynchronizer) extends KsmService {

override def getAllAcls(request: GetAllAclsRequest): Future[GetAllAclsResponse] = {
override def getAllAcls(
request: GetAllAclsRequest): Future[GetAllAclsResponse] = {
val aclsAndResources: Set[(Resource, Acl)] = aclSynchronizer.getKafkaAcls

val response = GetAllAclsResponse(
resourceAndAcls = aclsAndResources.map {
case (resource: Resource, acl: Acl) =>
ResourceAndAclPb(
resource = Some(ProtoConversionUtils.resourceToPb(resource)),
acl = Some(ProtoConversionUtils.aclToPb(acl)))
}.toSeq)
val response = GetAllAclsResponse(resourceAndAcls = aclsAndResources.map {
case (resource: Resource, acl: Acl) =>
ResourceAndAclPb(resource =
Some(ProtoConversionUtils.resourceToPb(resource)),
acl = Some(ProtoConversionUtils.aclToPb(acl)))
}.toSeq)

Future.successful(response)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,30 +1,31 @@
package com.github.simplesteph.ksm.notification
import com.github.simplesteph.ksm.parser.CsvParserException
import com.typesafe.config.Config
import kafka.security.auth.{ Acl, Resource }
import org.slf4j.{ Logger, LoggerFactory }
import kafka.security.auth.{Acl, Resource}
import org.slf4j.{Logger, LoggerFactory}

import scala.util.Try

case class ConsoleNotification() extends Notification {

val log: Logger = LoggerFactory.getLogger(classOf[ConsoleNotification].getSimpleName)
val log: Logger =
LoggerFactory.getLogger(classOf[ConsoleNotification].getSimpleName)

/**
* Config Prefix for configuring this module
*/
* Config Prefix for configuring this module
*/
override val CONFIG_PREFIX: String = "console"

/**
* internal config definition for the module
*/
* internal config definition for the module
*/
override def configure(config: Config): Unit = ()

override def notifyErrors(errs: List[Try[Throwable]]): Unit = {
NotificationUtils.errorsToString(errs).foreach(println)
}

override protected def notifyOne(action: String, acls: Set[(Resource, Acl)]): Unit = {
override protected def notifyOne(action: String,
acls: Set[(Resource, Acl)]): Unit = {
if (acls.nonEmpty) {
acls.foreach {
case (resource, acl) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,49 @@
package com.github.simplesteph.ksm.notification

import com.typesafe.config.Config
import kafka.security.auth.{ Acl, Resource }
import kafka.security.auth.{Acl, Resource}

import scala.util.Try

trait Notification {

/**
* Config Prefix for configuring this module
*/
* Config Prefix for configuring this module
*/
val CONFIG_PREFIX: String

/**
* internal config definition for the module
*/

* internal config definition for the module
*/
def configure(config: Config)

/**
* Function to be called by external accessors, but should not be implemented by subclasses
* @param added ACLs that have been added
* @param removed ACLs that have been removed
*/
def notifySuccess(added: Set[(Resource, Acl)], removed: Set[(Resource, Acl)]): Unit = {
* Function to be called by external accessors, but should not be implemented by subclasses
* @param added ACLs that have been added
* @param removed ACLs that have been removed
*/
def notifySuccess(added: Set[(Resource, Acl)],
removed: Set[(Resource, Acl)]): Unit = {
notifyOne("ADDED", added)
notifyOne("REMOVED", removed)
}

/**
*
* @param action ADDED or REMOVED
* @param acls List of corresponding ACLs
*/
*
* @param action ADDED or REMOVED
* @param acls List of corresponding ACLs
*/
protected def notifyOne(action: String, acls: Set[(Resource, Acl)]): Unit

/**
* Notification logic in case of errors
* @param errs list of errors
*/
* Notification logic in case of errors
* @param errs list of errors
*/
def notifyErrors(errs: List[Try[Throwable]]): Unit

/**
* Closing any outstanding objects owned by this notification
*/
* Closing any outstanding objects owned by this notification
*/
def close(): Unit

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ object NotificationUtils {
def errorsToString(errs: List[Try[Throwable]]): List[String] = {
errs.map(e =>
e.get match {
case cPE: CsvParserException => s"${cPE.getLocalizedMessage} | Row: ${cPE.printRow()}"
case cPE: CsvParserException =>
s"${cPE.getLocalizedMessage} | Row: ${cPE.printRow()}"
case _ => s"error while parsing ACL source: ${e.get}"
})
})
}

}
Loading

0 comments on commit 64e0611

Please sign in to comment.