Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra: rewrite on top of Java driver 4.5.0 #2182

Merged
merged 4 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ lazy val awslambda = alpakkaProject("awslambda",

lazy val azureStorageQueue = alpakkaProject("azure-storage-queue", "azure.storagequeue", Dependencies.AzureStorageQueue)

lazy val cassandra = alpakkaProject("cassandra", "cassandra", Dependencies.Cassandra)
lazy val cassandra = alpakkaProject("cassandra",
"cassandra",
Dependencies.Cassandra,
crossScalaVersions -= Dependencies.Scala211,
Test / parallelExecution := false)

lazy val couchbase =
alpakkaProject("couchbase",
Expand Down Expand Up @@ -339,6 +343,10 @@ lazy val docs = project
"extref.javaee-api.base_url" -> "https://docs.oracle.com/javaee/7/api/index.html?%s.html",
"extref.paho-api.base_url" -> "https://www.eclipse.org/paho/files/javadoc/index.html?%s.html",
"extref.slick.base_url" -> s"https://slick.lightbend.com/doc/${Dependencies.SlickVersion}/%s",
// Cassandra
"extref.cassandra.base_url" -> s"https://cassandra.apache.org/doc/${Dependencies.CassandraVersionInDocs}/%s",
"extref.cassandra-driver.base_url" -> s"https://docs.datastax.com/en/developer/java-driver/${Dependencies.CassandraDriverVersionInDocs}/%s",
"javadoc.com.datastax.oss.base_url" -> s"https://docs.datastax.com/en/drivers/java/${Dependencies.CassandraDriverVersionInDocs}/",
// Solr
"extref.solr.base_url" -> s"https://lucene.apache.org/solr/guide/${Dependencies.SolrVersionForDocs}/%s",
"javadoc.org.apache.solr.base_url" -> s"https://lucene.apache.org/solr/${Dependencies.SolrVersionForDocs}_0/solr-solrj/",
Expand Down
3 changes: 0 additions & 3 deletions cassandra/src/main/mima-filters/1.1.x.backwards.excludes

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ProblemFilters.exclude[Problem]("akka.stream.alpakka.cassandra.*")
22 changes: 22 additions & 0 deletions cassandra/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
alpakka.cassandra {
# The implementation of `akka.stream.alpakka.cassandra.CqlSessionProvider`
# used for creating the `CqlSession`.
# It may optionally have a constructor with an `ActorSystem` and `Config` parameters.
session-provider = "akka.stream.alpakka.cassandra.DefaultSessionProvider"

# Configure Akka Discovery by setting a service name
service-discovery {
name = ""
lookup-timeout = 1 s
}

# The ExecutionContext to use for the session tasks and future composition.
session-dispatcher = "akka.actor.default-dispatcher"

# Full config path to the Datastax Java driver's configuration section.
# When connecting to more than one Cassandra cluster different session configuration can be
# defined with this property.
# See https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/#quick-overview
# and https://docs.datastax.com/en/developer/java-driver/latest/manual/core/configuration/reference/
datastax-java-driver-config = "datastax-java-driver"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.cassandra

import akka.ConfigurationException
import akka.actor.ActorSystem
import akka.discovery.Discovery
import akka.util.JavaDurationConverters._
import com.datastax.oss.driver.api.core.CqlSession
import com.typesafe.config.{Config, ConfigFactory}

import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

/**
* [[https://doc.akka.io/docs/akka/current/discovery/index.html Akka Discovery]]
* is enabled by setting the `service-discovery.name` in the given `CassandraSession` config.
*
* Akka Discovery overwrites the basic.contact-points` from the configuration with addresses
* provided by the configured Akka Discovery mechanism.
*
* Example using config-based Akka Discovery:
* {{{
* akka {
* discovery.method = config
* }
* akka.discovery.config.services = {
* cassandra-service = {
* endpoints = [
* {
* host = "127.0.0.1"
* port = 9042
* },
* {
* host = "127.0.0.2"
* port = 9042
* }
* ]
* }
* }
* alpakka.cassandra {
* service-discovery.name ="cassandra-service"
* }
* }}}
*
* Look up this `CassandraSession` with
* {{{
* CassandraSessionRegistry
* .get(system)
* .sessionFor(CassandraSessionSettings.create())
* }}}
*/
private[cassandra] object AkkaDiscoverySessionProvider {

def connect(system: ActorSystem, config: Config)(implicit ec: ExecutionContext): Future[CqlSession] = {
readNodes(config)(system, ec).flatMap { contactPoints =>
val driverConfigWithContactPoints = ConfigFactory.parseString(s"""
basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This threw me through a bit of a loop.

Suggested change
basic.contact-points = [${contactPoints.mkString("\"", "\", \"", "\"")}]
basic.contact-points = [${contactPoints.mkString(start = "\"", sep = "\", \"", end = "\"")}]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What kind of loop? I find the compact notation better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A cognitive loop ;) I couldn't figure out if I was looking at one string or 3 until I made the params explicit.

""").withFallback(CqlSessionProvider.driverConfig(system, config))
val driverConfigLoader = DriverConfigLoaderFromConfig.fromConfig(driverConfigWithContactPoints)
CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync().toScala
}
}

/**
* Expect a `service` section in Config and use Akka Discovery to read the addresses for `name` within `lookup-timeout`.
*/
private def readNodes(config: Config)(implicit system: ActorSystem,
ec: ExecutionContext): Future[immutable.Seq[String]] = {
val serviceConfig = config.getConfig("service-discovery")
val serviceName = serviceConfig.getString("name")
val lookupTimeout = serviceConfig.getDuration("lookup-timeout").asScala
readNodes(serviceName, lookupTimeout)
}

/**
* Use Akka Discovery to read the addresses for `serviceName` within `lookupTimeout`.
*/
private def readNodes(
serviceName: String,
lookupTimeout: FiniteDuration
)(implicit system: ActorSystem, ec: ExecutionContext): Future[immutable.Seq[String]] = {
Discovery(system).discovery.lookup(serviceName, lookupTimeout).map { resolved =>
resolved.addresses.map { target =>
target.host + ":" + target.port.getOrElse {
throw new ConfigurationException(
s"Akka Discovery for Cassandra service [$serviceName] must provide a port for [${target.host}]"
)
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.cassandra

import akka.actor._
import akka.annotation.InternalApi
import com.codahale.metrics.MetricRegistry

import scala.collection.JavaConverters._

/**
* Retrieves Cassandra metrics registry for an actor system
*/
class CassandraMetricsRegistry extends Extension {
private val metricRegistry = new MetricRegistry()

def getRegistry: MetricRegistry = metricRegistry

/**
* INTERNAL API
*/
@InternalApi private[akka] def addMetrics(category: String, registry: MetricRegistry): Unit =
metricRegistry.register(category, registry)

/**
* INTERNAL API
*/
@InternalApi private[akka] def removeMetrics(category: String): Unit =
metricRegistry.getNames.iterator.asScala.foreach { name =>
if (name.startsWith(category))
metricRegistry.remove(name)
}
}

object CassandraMetricsRegistry extends ExtensionId[CassandraMetricsRegistry] with ExtensionIdProvider {
override def lookup = CassandraMetricsRegistry
override def createExtension(system: ExtendedActorSystem) =
new CassandraMetricsRegistry
override def get(system: ActorSystem): CassandraMetricsRegistry =
super.get(system)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.cassandra

final class CassandraServerMetaData(val clusterName: String, val dataCenter: String, val version: String) {

val isVersion2: Boolean = version.startsWith("2.")

override def toString: String =
s"CassandraServerMetaData($clusterName,$dataCenter,$version)"

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.cassandra

import java.util.concurrent.CompletionStage

import akka.Done
import com.datastax.oss.driver.api.core.CqlSession
import scala.compat.java8.FunctionConverters._
import scala.compat.java8.FutureConverters._

import scala.concurrent.Future

class CassandraSessionSettings private (val configPath: String,
_metricsCategory: Option[String] = None,
val init: Option[CqlSession => Future[Done]] = None) {

def metricsCategory: String = _metricsCategory.getOrElse(configPath)

def withMetricCategory(value: String): CassandraSessionSettings =
copy(metricsCategory = Option(value))

/**
* Java API:
*
* The `init` function will be performed once when the session is created, i.e.
* if `CassandraSessionRegistry.sessionFor` is called from multiple places with different `init` it will
* only execute the first.
*/
def withInit(value: java.util.function.Function[CqlSession, CompletionStage[Done]]): CassandraSessionSettings =
copy(init = Some(value.asScala.andThen(_.toScala)))

/**
* The `init` function will be performed once when the session is created, i.e.
* if `CassandraSessionRegistry.sessionFor` is called from multiple places with different `init` it will
* only execute the first.
*/
def withInit(value: CqlSession => Future[Done]): CassandraSessionSettings = copy(init = Some(value))

private def copy(configPath: String = configPath,
metricsCategory: Option[String] = _metricsCategory,
init: Option[CqlSession => Future[Done]] = init) =
new CassandraSessionSettings(configPath, metricsCategory, init)

override def toString: String =
"CassandraSessionSettings(" +
s"configPath=$configPath," +
s"metricsCategory=$metricsCategory," +
s"init=$init)"
}

object CassandraSessionSettings {

val ConfigPath = "alpakka.cassandra"

def apply(): CassandraSessionSettings = apply(ConfigPath)

def apply(configPath: String, init: CqlSession => Future[Done]): CassandraSessionSettings =
new CassandraSessionSettings(configPath, init = Some(init))

def apply(configPath: String): CassandraSessionSettings = new CassandraSessionSettings(configPath)

/** Java API */
def create(): CassandraSessionSettings = apply(ConfigPath)

/** Java API */
def create(configPath: String): CassandraSessionSettings = apply(configPath)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.alpakka.cassandra

import akka.util.JavaDurationConverters._
import com.datastax.oss.driver.api.core.cql.BatchType

import scala.concurrent.duration.{FiniteDuration, _}

class CassandraWriteSettings private (val parallelism: Int,
val maxBatchSize: Int,
val maxBatchWait: FiniteDuration,
val batchType: BatchType) {
require(parallelism > 0, s"Invalid value for parallelism: $parallelism. It should be > 0.")
require(maxBatchSize > 0, s"Invalid value for maxBatchSize: $maxBatchSize. It should be > 0.")

/**
* WARNING: setting a write parallelism other than 1 will lead to out-of-order updates
*/
def withParallelism(value: Int): CassandraWriteSettings = copy(parallelism = value)

/**
* Batch size for `CassandraFlow.createUnloggedBatch`.
*/
def withMaxBatchSize(maxBatchSize: Int): CassandraWriteSettings =
copy(maxBatchSize = maxBatchSize)

/**
* Batch grouping time for `CassandraFlow.createUnloggedBatch`.
*/
def withMaxBatchWait(maxBatchWait: FiniteDuration): CassandraWriteSettings =
copy(maxBatchWait = maxBatchWait)

/**
* Java API: Batch grouping time for `CassandraFlow.createUnloggedBatch`.
*/
def withMaxBatchWait(maxBatchWait: java.time.Duration): CassandraWriteSettings =
copy(maxBatchWait = maxBatchWait.asScala)

def withBatchType(value: BatchType): CassandraWriteSettings =
copy(batchType = value)

private def copy(parallelism: Int = parallelism,
maxBatchSize: Int = maxBatchSize,
maxBatchWait: FiniteDuration = maxBatchWait,
batchType: BatchType = batchType) =
new CassandraWriteSettings(parallelism, maxBatchSize, maxBatchWait, batchType)

override def toString: String =
"CassandraWriteSettings(" +
s"parallelism=$parallelism," +
s"maxBatchSize=$maxBatchSize," +
s"maxBatchWait=$maxBatchWait," +
s"batchType=$batchType)"

}

object CassandraWriteSettings {
val defaults: CassandraWriteSettings = new CassandraWriteSettings(1, 100, 500.millis, BatchType.LOGGED)

def create(): CassandraWriteSettings = defaults
def apply(): CassandraWriteSettings = defaults
}
Loading