KAFKA-2063; Bound fetch response size (KIP-74)
This PR is the implementation of [KIP-74](https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes), which was originally motivated by [KAFKA-2063](https://issues.apache.org/jira/browse/KAFKA-2063).

Author: Andrey Neporada <neporada@gmail.com>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jun Rao <junrao@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>, Jason Gustafson <jason@confluent.io>

Closes #1812 from nepal/kip-74
nepal authored and Jason Gustafson committed Sep 18, 2016
1 parent 6fb25f0 commit d04b099
Showing 48 changed files with 1,716 additions and 678 deletions.
@@ -112,6 +112,17 @@ public class ConsumerConfig extends AbstractConfig {
public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";

/**
* <code>fetch.max.bytes</code>
*/
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +
"This is not an absolute maximum, if the first message in the first non-empty partition of the fetch is larger than " +
"this value, the message will still be returned to ensure that the consumer can make progress. " +
"The maximum message size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
"<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;

/**
* <code>fetch.max.wait.ms</code>
*/
@@ -125,7 +136,11 @@ public class ConsumerConfig extends AbstractConfig {
* <code>max.partition.fetch.bytes</code>
*/
public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server " +
"will return. If the first message in the first non-empty partition of the fetch is larger than this limit, the " +
"message will still be returned to ensure that the consumer can make progress. The maximum message size " +
"accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
"<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size";
public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;

/** <code>send.buffer.bytes</code> */
@@ -265,6 +280,12 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.HIGH,
FETCH_MIN_BYTES_DOC)
.define(FETCH_MAX_BYTES_CONFIG,
Type.INT,
DEFAULT_FETCH_MAX_BYTES,
atLeast(0),
Importance.MEDIUM,
FETCH_MAX_BYTES_DOC)
.define(FETCH_MAX_WAIT_MS_CONFIG,
Type.INT,
500,
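
For reference, a minimal sketch of a consumer using the new fetch.max.bytes setting alongside the existing per-partition limit. The bootstrap server, group id, topic, and class name are placeholders for illustration and are not part of this commit:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class FetchMaxBytesExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-max-bytes-example");   // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        // New in this commit: cap the total size of a single fetch response (default is 50 MB).
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10 * 1024 * 1024);
        // The existing per-partition cap still applies within the response-level limit.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));     // placeholder
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            System.out.println("Fetched " + records.count() + " records");
        }
    }
}
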
@@ -672,6 +672,7 @@ private KafkaConsumer(ConsumerConfig config,
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
this.fetcher = new Fetcher<>(this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
@@ -26,7 +26,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -62,6 +61,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -79,6 +79,7 @@ public class Fetcher<K, V> {
private final ConsumerNetworkClient client;
private final Time time;
private final int minBytes;
private final int maxBytes;
private final int maxWaitMs;
private final int fetchSize;
private final long retryBackoffMs;
@@ -96,6 +97,7 @@ public class Fetcher<K, V> {

public Fetcher(ConsumerNetworkClient client,
int minBytes,
int maxBytes,
int maxWaitMs,
int fetchSize,
int maxPollRecords,
@@ -113,6 +115,7 @@ public Fetcher(ConsumerNetworkClient client,
this.metadata = metadata;
this.subscriptions = subscriptions;
this.minBytes = minBytes;
this.maxBytes = maxBytes;
this.maxWaitMs = maxWaitMs;
this.fetchSize = fetchSize;
this.maxPollRecords = maxPollRecords;
@@ -152,7 +155,7 @@ private boolean matchesRequestedPartitions(FetchRequest request, FetchResponse r
* an in-flight fetch or pending fetch data.
*/
public void sendFetches() {
for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
final FetchRequest request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();

@@ -514,8 +517,8 @@ private void handleListOffsetResponse(TopicPartition topicPartition,
}
}

private Set<TopicPartition> fetchablePartitions() {
Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
private List<TopicPartition> fetchablePartitions() {
List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
fetchable.remove(nextInLineRecords.partition);
for (CompletedFetch completedFetch : completedFetches)
@@ -530,16 +533,16 @@ private Set<TopicPartition> fetchablePartitions() {
private Map<Node, FetchRequest> createFetchRequests() {
// create the fetch info
Cluster cluster = metadata.fetch();
Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
for (TopicPartition partition : fetchablePartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
} else if (this.client.pendingRequestCount(node) == 0) {
// if there is a leader and no in-flight requests, issue a new fetch
Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new HashMap<>();
fetch = new LinkedHashMap<>();
fetchable.put(node, fetch);
}

@@ -553,9 +556,9 @@ private Map<Node, FetchRequest> createFetchRequests() {

// create the fetches
Map<Node, FetchRequest> requests = new HashMap<>();
for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue());
requests.put(node, fetch);
}
return requests;
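
Because the response is now bounded by fetch.max.bytes, the broker fills partitions in the order they appear in the request, which is why the request map above switched from HashMap to LinkedHashMap: insertion order survives into the serialized fetch. A small, self-contained illustration of that difference (the partition names below are made up for the example):

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class FetchOrderSketch {
    public static void main(String[] args) {
        // LinkedHashMap iterates in insertion order, so the partition order the
        // consumer chose is the order the broker sees; HashMap gives no such guarantee.
        Map<String, Integer> linked = new LinkedHashMap<>();
        Map<String, Integer> hashed = new HashMap<>();
        for (String tp : new String[]{"topic-2", "topic-0", "topic-1"}) {
            linked.put(tp, 0);
            hashed.put(tp, 0);
        }
        System.out.println("LinkedHashMap order: " + linked.keySet()); // [topic-2, topic-0, topic-1]
        System.out.println("HashMap order:       " + hashed.keySet()); // unspecified order
    }
}
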
@@ -590,14 +593,11 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
boolean skippedRecords = false;
for (LogEntry logEntry : records) {
// Skip the messages earlier than current position.
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
} else {
skippedRecords = true;
}
}

@@ -609,19 +609,6 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
} else if (buffer.limit() > 0 && !skippedRecords) {
// we did not read a single message from a non-empty buffer
// because that message's size is larger than fetch size, in this case
// record this exception
Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
+ recordTooLargePartitions
+ " whose size is larger than the fetch size "
+ this.fetchSize
+ " and hence cannot be ever returned."
+ " Increase the fetch size on the client (using max.partition.fetch.bytes),"
+ " or decrease the maximum message size the broker will allow (using message.max.bytes).",
recordTooLargePartitions);
}
} else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
|| partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
@@ -648,6 +635,11 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
completedFetch.metricAggregator.record(tp, bytes, recordsCount);
}

// we move the partition to the end if we received some bytes or if there was an error. This way, it's more
// likely that partitions for the same topic can remain together (allowing for more efficient serialization).
if (bytes > 0 || partition.errorCode != Errors.NONE.code())
subscriptions.movePartitionToEnd(tp);

return parsedRecords;
}
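
The movePartitionToEnd call rotates the just-fetched partition to the back of the order that fetchablePartitions() returns, so subsequent requests lead with different partitions. A minimal sketch of the move-to-end idea on a LinkedHashMap; the class and method names here are illustrative, not the actual SubscriptionState internals:

import java.util.LinkedHashMap;

public class MoveToEndSketch {
    // Removing and re-inserting a key pushes it to the end of a LinkedHashMap's
    // iteration order, which is all a "move to end" operation needs to do.
    static <K, V> void moveToEnd(LinkedHashMap<K, V> map, K key) {
        V value = map.remove(key);
        if (value != null)
            map.put(key, value);
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Long> positions = new LinkedHashMap<>();
        positions.put("topic-0", 0L);
        positions.put("topic-1", 0L);
        positions.put("topic-2", 0L);
        moveToEnd(positions, "topic-0");
        System.out.println(positions.keySet()); // [topic-1, topic-2, topic-0]
    }
}
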
