Skip to content

Commit

Permalink
Added asynchronous rd_kafka_consumer_close_queue() and .._consumer_cl…
Browse files Browse the repository at this point in the history
…osed()

This is mainly for the Go client, but can be used by applications as well.
  • Loading branch information
edenhill committed Jun 7, 2022
1 parent d50099f commit b47da0e
Show file tree
Hide file tree
Showing 13 changed files with 366 additions and 63 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ librdkafka v1.9.0 is a feature release:
can now be triggered automatically on the librdkafka background thread.
* `rd_kafka_queue_get_background()` now creates the background thread
if not already created.
* Added `rd_kafka_consumer_close_queue()` and `rd_kafka_consumer_closed()`.
This allow applications and language bindings to implement asynchronous
consumer close.
* Bundled zlib upgraded to version 1.2.12.
* Bundled OpenSSL upgraded to 1.1.1n.
* Added `test.mock.broker.rtt` to simulate RTT/latency for mock brokers.
Expand Down
11 changes: 11 additions & 0 deletions src-cpp/KafkaConsumerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,17 @@ RdKafka::ErrorCode RdKafka::KafkaConsumerImpl::close() {
}


RdKafka::Error *RdKafka::KafkaConsumerImpl::close(Queue *queue) {
QueueImpl *queueimpl = dynamic_cast<QueueImpl *>(queue);
rd_kafka_error_t *c_error;

c_error = rd_kafka_consumer_close_queue(rk_, queueimpl->queue_);
if (c_error)
return new ErrorImpl(c_error);

return NULL;
}


RdKafka::ConsumerGroupMetadata::~ConsumerGroupMetadata() {
}
36 changes: 31 additions & 5 deletions src-cpp/rdkafkacpp.h
Original file line number Diff line number Diff line change
Expand Up @@ -2790,13 +2790,13 @@ class RD_EXPORT KafkaConsumer : public virtual Handle {


/**
* @brief Close and shut down the proper.
* @brief Close and shut down the consumer.
*
* This call will block until the following operations are finished:
* - Trigger a local rebalance to void the current assignment
* - Stop consumption for current assignment
* - Commit offsets
* - Leave group
* - Trigger a local rebalance to void the current assignment (if any).
* - Stop consumption for current assignment (if any).
* - Commit offsets (if any).
* - Leave group (if applicable).
*
* The maximum blocking time is roughly limited to session.timeout.ms.
*
Expand Down Expand Up @@ -2931,6 +2931,32 @@ class RD_EXPORT KafkaConsumer : public virtual Handle {
*/
virtual Error *incremental_unassign(
const std::vector<TopicPartition *> &partitions) = 0;

/**
* @brief Close and shut down the consumer.
*
* Performs the same actions as RdKafka::KafkaConsumer::close() but in a
* background thread.
*
* Rebalance events/callbacks (etc) will be forwarded to the
* application-provided \p queue. The application must poll this queue until
* RdKafka::KafkaConsumer::closed() returns true.
*
* @remark Depending on consumer group join state there may or may not be
* rebalance events emitted on \p rkqu.
*
* @returns an error object if the consumer close failed, else NULL.
*
* @sa RdKafka::KafkaConsumer::closed()
*/
virtual Error *close(Queue *queue) = 0;


/** @returns true if the consumer is closed, else 0.
*
* @sa RdKafka::KafkaConsumer::close()
*/
virtual bool closed() = 0;
};


Expand Down
6 changes: 6 additions & 0 deletions src-cpp/rdkafkacpp_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,12 @@ class KafkaConsumerImpl : virtual public KafkaConsumer,

ErrorCode close();

Error *close(Queue *queue);

bool closed() {
return rd_kafka_consumer_closed(rk_) ? true : false;
};

ErrorCode seek(const TopicPartition &partition, int timeout_ms);

ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) {
Expand Down
115 changes: 99 additions & 16 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "rdkafka_assignor.h"
#include "rdkafka_request.h"
#include "rdkafka_event.h"
#include "rdkafka_error.h"
#include "rdkafka_sasl.h"
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
Expand Down Expand Up @@ -879,6 +880,26 @@ int rd_kafka_set_fatal_error0(rd_kafka_t *rk,
}


/**
* @returns a copy of the current fatal error, if any, else NULL.
*
* @locks_acquired rd_kafka_rdlock(rk)
*/
rd_kafka_error_t *rd_kafka_get_fatal_error(rd_kafka_t *rk) {
rd_kafka_error_t *error;
rd_kafka_resp_err_t err;

if (!(err = rd_atomic32_get(&rk->rk_fatal.err)))
return NULL; /* No fatal error raised */

rd_kafka_rdlock(rk);
error = rd_kafka_error_new_fatal(err, "%s", rk->rk_fatal.errstr);
rd_kafka_rdunlock(rk);

return error;
}


rd_kafka_resp_err_t rd_kafka_test_fatal_error(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
const char *reason) {
Expand Down Expand Up @@ -3181,33 +3202,79 @@ rd_kafka_message_t *rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms) {
}


rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk) {
/**
* @brief Consumer close.
*
* @param rkq The consumer group queue will be forwarded to this queue, which
* which must be served (rebalance events) by the application/caller
* until rd_kafka_consumer_closed() returns true.
* If the consumer is not in a joined state, no rebalance events
* will be emitted.
*/
static rd_kafka_error_t *rd_kafka_consumer_close_q(rd_kafka_t *rk,
rd_kafka_q_t *rkq) {
rd_kafka_cgrp_t *rkcg;
rd_kafka_op_t *rko;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
rd_kafka_q_t *rkq;
rd_kafka_error_t *error = NULL;

if (!(rkcg = rd_kafka_cgrp_get(rk)))
return RD_KAFKA_RESP_ERR__UNKNOWN_GROUP;
return rd_kafka_error_new(RD_KAFKA_RESP_ERR__UNKNOWN_GROUP,
"Consume close called on non-group "
"consumer");

if (rd_atomic32_get(&rkcg->rkcg_terminated))
return rd_kafka_error_new(RD_KAFKA_RESP_ERR__DESTROY,
"Consumer already closed");

/* If a fatal error has been raised and this is an
* explicit consumer_close() from the application we return
* a fatal error. Otherwise let the "silent" no_consumer_close
* logic be performed to clean up properly. */
if (rd_kafka_fatal_error_code(rk) &&
!rd_kafka_destroy_flags_no_consumer_close(rk))
return RD_KAFKA_RESP_ERR__FATAL;
if (!rd_kafka_destroy_flags_no_consumer_close(rk) &&
(error = rd_kafka_get_fatal_error(rk)))
return error;

rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Closing consumer");
rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
"Closing consumer");

/* Redirect cgrp queue to our temporary queue to make sure
* all posted ops (e.g., rebalance callbacks) are served by
* this function. */
rkq = rd_kafka_q_new(rk);
/* Redirect cgrp queue to the rebalance queue to make sure all posted
* ops (e.g., rebalance callbacks) are served by
* the application/caller. */
rd_kafka_q_fwd_set(rkcg->rkcg_q, rkq);

/* Tell cgrp subsystem to terminate. A TERMINATE op will be posted
* on the rkq when done. */
rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0)); /* async */

return error;
}

rd_kafka_error_t *rd_kafka_consumer_close_queue(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu) {
if (!rkqu)
return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
"Queue must be specified");
return rd_kafka_consumer_close_q(rk, rkqu->rkqu_q);
}

rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk) {
rd_kafka_error_t *error;
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__TIMED_OUT;
rd_kafka_q_t *rkq;

/* Create a temporary reply queue to handle the TERMINATE reply op. */
rkq = rd_kafka_q_new(rk);

/* Initiate the close (async) */
error = rd_kafka_consumer_close_q(rk, rkq);
if (error) {
err = rd_kafka_error_is_fatal(error)
? RD_KAFKA_RESP_ERR__FATAL
: rd_kafka_error_code(error);
rd_kafka_error_destroy(error);
rd_kafka_q_destroy_owner(rkq);
return err;
}

/* Disable the queue if termination is immediate or the user
* does not want the blocking consumer_close() behaviour, this will
* cause any ops posted for this queue (such as rebalance) to
Expand All @@ -3217,10 +3284,12 @@ rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk) {
rd_kafka_dbg(rk, CONSUMER, "CLOSE",
"Disabling and purging temporary queue to quench "
"close events");
err = RD_KAFKA_RESP_ERR_NO_ERROR;
rd_kafka_q_disable(rkq);
/* Purge ops already enqueued */
rd_kafka_q_purge(rkq);
} else {
rd_kafka_op_t *rko;
rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Waiting for close events");
while ((rko = rd_kafka_q_pop(rkq, RD_POLL_INFINITE, 0))) {
rd_kafka_op_res_t res;
Expand All @@ -3230,6 +3299,7 @@ rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk) {
rd_kafka_op_destroy(rko);
break;
}
/* Handle callbacks */
res = rd_kafka_poll_cb(rk, rkq, rko,
RD_KAFKA_Q_CB_RETURN, NULL);
if (res == RD_KAFKA_OP_RES_PASS)
Expand All @@ -3238,16 +3308,27 @@ rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk) {
}
}

rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);

rd_kafka_q_destroy_owner(rkq);

rd_kafka_dbg(rk, CONSUMER, "CLOSE", "Consumer closed");
if (err)
rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
"Consumer closed with error: %s",
rd_kafka_err2str(err));
else
rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLOSE",
"Consumer closed");

return err;
}


int rd_kafka_consumer_closed(rd_kafka_t *rk) {
if (unlikely(!rk->rk_cgrp))
return 0;

return rd_atomic32_get(&rk->rk_cgrp->rkcg_terminated);
}


rd_kafka_resp_err_t
rd_kafka_committed(rd_kafka_t *rk,
Expand Down Expand Up @@ -3827,6 +3908,8 @@ rd_kafka_op_res_t rd_kafka_poll_cb(rd_kafka_t *rk,

case RD_KAFKA_OP_TERMINATE:
/* nop: just a wake-up */
res = RD_KAFKA_OP_RES_YIELD;
rd_kafka_op_destroy(rko);
break;

case RD_KAFKA_OP_CREATETOPICS:
Expand Down
44 changes: 39 additions & 5 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -3995,12 +3995,12 @@ RD_EXPORT
rd_kafka_message_t *rd_kafka_consumer_poll(rd_kafka_t *rk, int timeout_ms);

/**
* @brief Close down the KafkaConsumer.
* @brief Close the consumer.
*
* @remark This call will block until the consumer has revoked its assignment,
* calling the \c rebalance_cb if it is configured, committed offsets
* to broker, and left the consumer group.
* The maximum blocking time is roughly limited to session.timeout.ms.
* This call will block until the consumer has revoked its assignment,
* calling the \c rebalance_cb if it is configured, committed offsets
* to broker, and left the consumer group (if applicable).
* The maximum blocking time is roughly limited to session.timeout.ms.
*
* @returns An error code indicating if the consumer close was succesful
* or not.
Expand All @@ -4015,6 +4015,40 @@ RD_EXPORT
rd_kafka_resp_err_t rd_kafka_consumer_close(rd_kafka_t *rk);


/**
* @brief Asynchronously close the consumer.
*
* Performs the same actions as rd_kafka_consumer_close() but in a
* background thread.
*
* Rebalance events/callbacks (etc) will be forwarded to the
* application-provided \p rkqu. The application must poll/serve this queue
* until rd_kafka_consumer_closed() returns true.
*
* @remark Depending on consumer group join state there may or may not be
* rebalance events emitted on \p rkqu.
*
* @returns an error object if the consumer close failed, else NULL.
*
* @sa rd_kafka_consumer_closed()
*/
RD_EXPORT
rd_kafka_error_t *rd_kafka_consumer_close_queue(rd_kafka_t *rk,
rd_kafka_queue_t *rkqu);


/**
* @returns 1 if the consumer is closed, else 0.
*
* Should be used in conjunction with rd_kafka_consumer_close_queue() to know
* when the consumer has been closed.
*
* @sa rd_kafka_consumer_close_queue()
*/
RD_EXPORT
int rd_kafka_consumer_closed(rd_kafka_t *rk);


/**
* @brief Incrementally add \p partitions to the current assignment.
*
Expand Down
9 changes: 3 additions & 6 deletions src/rdkafka_background.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,11 @@ rd_kafka_background_queue_serve(rd_kafka_t *rk,
/*
* Handle non-event:able ops through the standard poll_cb that
* will trigger type-specific callbacks (and return OP_RES_HANDLED)
* or do no handling and return OP_RES_PASS
* or do no handling and return OP_RES_PASS.
* Also signal yield to q_serve() (which implies that op was handled).
*/
res = rd_kafka_poll_cb(rk, rkq, rko, RD_KAFKA_Q_CB_CALLBACK, opaque);
if (res == RD_KAFKA_OP_RES_HANDLED)
if (res == RD_KAFKA_OP_RES_HANDLED || res == RD_KAFKA_OP_RES_YIELD)
return res;

/* Op was not handled, log and destroy it. */
Expand All @@ -97,10 +98,6 @@ rd_kafka_background_queue_serve(rd_kafka_t *rk,
rd_kafka_op2str(rko->rko_type));
rd_kafka_op_destroy(rko);

/* Signal yield to q_serve() (implies that the op was handled). */
if (res == RD_KAFKA_OP_RES_YIELD)
return res;

/* Indicate that the op was handled. */
return RD_KAFKA_OP_RES_HANDLED;
}
Expand Down
Loading

0 comments on commit b47da0e

Please sign in to comment.