-
Notifications
You must be signed in to change notification settings - Fork 115
Message Exchange Patterns 0.7.0 to 0.8.x
Once the KafkaConsumerActor is created with the required configuration, communication between the client/receiver actor and the consumer actor is naturally via Actor messages.
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe
case class Subscribe(offsets: Option[Offsets] = None)
consumer ! Subscribe()
Sent to the Consumer Actor to initiate a subscription based on initial configuration. Initial offets may be provided to seek from a known point in each Topic+Partition that consumer is subscribed to. This allows the client to self manage commit offsets as described here in section "Manual Offset Control".
If no offsets are provided (i.e. Subscribe()), the consumer will seek (for each topic/partition) to the last committed offset as recorded by Kafka itself, and also based on the "auto.offset.reset" client property.
Once the client has sent the consumer a Subscribe message, it can assume that the subscription will be made and any messages consumer will be delivered to the provided ActorRef callback. Any failure to connect to Kafka or other exception will be propagated via the standard Actor Supervision mechanism.
import cakesolutions.kafka.akka.ConsumerRecords
case class ConsumerRecords(offsets: Offsets, records: ConsumerRecords[K, V])
The payload delivered to the client contains the offsets for the records sent and the Java client's ConsumerRecords
,
which contains a sequence of Records for each Topic Partition.
The ConsumerRecords
can be iterated and read as described in the Kafka Client docs.
The Offsets can be used when confirming the message to commit the offsets to Kafka.
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
case class Confirm(offsets: Offsets, commit: Boolean = false)
consumer ! Confirm(offsets)
For each set of records received by the client, a corresponding Confirm(offsets)
message should be sent back to the consumer
to acknowledge the message. If the message is not confirmed within ("unconfirmed.timeout") it is redelivered (by default).
If commit is provided as true, they offsets are committed to Kafka. If commit is false, the records are removed from the Consumer Actor's buffer, but no commit to Kafka is made.
import cakesolutions.kafka.akka.KafkaConsumerActor.Unsubscribe
case object Unsubscribe
consumer ! Unsubscribe
The Consumer Actor clears its state and disconnects from Kafka.
Produced by Cake Solutions
Team Blog | Twitter @cakesolutions | Careers