KIP-74: Add fetch response size limit and implement round-robin on client side #1812

Closed
wants to merge 65 commits

Changes from all commits (65 commits)
662d696
KAFKA-2063: Add possibility to bound fetch response size
Jul 29, 2016
2835570
LOGBROKER-2063: Change interfaces of FetchRequest/Response to Vector …
Sep 12, 2016
4c05398
KAFKA-2311; Make KafkaConsumer's ensureNotClosed method thread-safe
Tim-Brooks Sep 13, 2016
b621ba1
KAFKA-4158; Reset quota to default value if quota override is deleted
lindong28 Sep 14, 2016
f98fa67
MINOR: Give a name to the coordinator heartbeat thread
ijuma Sep 14, 2016
a6b3a8e
KAFKA-4162: Fixed typo "rebalance"
dchentech Sep 14, 2016
62a7394
KAFKA-4172; Ensure fetch responses contain the requested partitions
Sep 15, 2016
a2e3ee5
Refactor constructors for a.k.c.r.FetchRequest and introduce factory …
ijuma Sep 13, 2016
62a4b12
Add missing cases in `FetchRequest.getErrorResponse` and fix equality bug
ijuma Sep 13, 2016
9f93937
Fix reference to assignFromUser in javadoc
ijuma Sep 13, 2016
20129a0
Use `TopicPartition` instead of `TopicAndPartition` in *FetcherThread…
ijuma Sep 13, 2016
df15bb7
Implement round-robin for `Fetcher` and `*FetcherThread` classes
ijuma Sep 14, 2016
e114821
Flesh out tests in `RequestResponseTest`
ijuma Sep 14, 2016
60643b6
Flesh out config descriptions in `ConsumerConfig`
ijuma Sep 14, 2016
a2b8b1b
Various compatibility and ordering changes to fetch request/response …
ijuma Sep 15, 2016
d89bf00
Introduce `minOneMessage` parameter to `Log.read`
ijuma Sep 15, 2016
8572cdf
Use `Seq` instead of `Map` in `ReplicaManager` and improve handling o…
ijuma Sep 15, 2016
52402f2
Fix comment in `DEFAULT_RESPONSE_MAX_BYTES`
ijuma Sep 15, 2016
c0503f4
Mention broker configs for limiting message size
ijuma Sep 15, 2016
b38e096
Allow `message.max.bytes` to be smaller than `replica.fetch.max.bytes`
ijuma Sep 15, 2016
e8b0105
Add validator for `ReplicaFetchMaxBytesProp` and reduce priority to m…
ijuma Sep 15, 2016
7ecde21
Rename `groupByTopicOrdered` to `batchByTopic` to be consistent
ijuma Sep 15, 2016
ee686f5
Combine `max_bytes` key name constants
ijuma Sep 15, 2016
22c039c
Remove `final` from static method
ijuma Sep 15, 2016
387c4ac
Use `batchByTopic` in `FetchResponse` too
ijuma Sep 15, 2016
9b4c82a
Merge pull request #1 from ijuma/kafka-2063-bound-fetch-response-size
nepal Sep 15, 2016
d583988
Add tests to verify that we only return a large message if it's the f…
ijuma Sep 15, 2016
1ae52e5
Rename `partitionMap` to `fetchBuilder` in `AbstractFetcherThread`
ijuma Sep 15, 2016
cdc9ee3
Add test for `FetchBuilder` and document it
ijuma Sep 15, 2016
0146fb7
Remove `list` from `FetchBuilder` and change `map` to be a `LinkedHas…
ijuma Sep 15, 2016
e5ccb0e
Fix import checkstyle error
ijuma Sep 15, 2016
a262cae
Merge pull request #2 from ijuma/kafka-2063-bound-fetch-response-size
nepal Sep 15, 2016
7a5bbc8
Clean-ups in `FetchRequest`
ijuma Sep 15, 2016
1950fac
Remove `emptyResponse` boolean in `Fetcher` and add clarifying comment
ijuma Sep 15, 2016
3c9a4ef
Merge remote-tracking branch 'apache/trunk' into kafka-2063-bound-fet…
ijuma Sep 15, 2016
6447db1
Add `testReadWithMinMessage` to `LogTest`
ijuma Sep 15, 2016
9f6bbdb
Minor clean-ups in `searchOffsetWithSize` and `LogSegment.read`
ijuma Sep 15, 2016
28c6cb4
Replace usages of `TopicAndPartition` with `TopicPartition` in `*Fetc…
ijuma Sep 15, 2016
55481b4
Rename `FetchBuilder` to `PartitionStates`
ijuma Sep 15, 2016
f21b814
Fixed ConcurrentModificationException by returning a copy in `partiti…
ijuma Sep 16, 2016
abe5e4b
Merge remote-tracking branch 'apache/trunk' into kafka-2063-bound-fet…
ijuma Sep 16, 2016
fc67047
Improve logging in `ReplicaManager.readFromLocalLog`
ijuma Sep 16, 2016
9fa75e2
Improve behaviour when `maxSize` is non-zero but smaller than the mes…
ijuma Sep 16, 2016
965b73c
Add `testLowMaxFetchSizeForRequestAndPartition`
ijuma Sep 16, 2016
f5eb8d5
Update `Fetcher` not to try and detect `RecordTooLargeException`s
ijuma Sep 16, 2016
6bc8783
Merge pull request #3 from ijuma/kafka-2063-bound-fetch-response-size
nepal Sep 16, 2016
8c65352
Accept large messages from the first non-empty partition (instead of …
ijuma Sep 16, 2016
f61cdaa
Use `minOneMessage = true` when reading the log from `GroupMetadataMa…
ijuma Sep 16, 2016
5c14cbc
Remove unused import.
ijuma Sep 16, 2016
eef8fe4
Remove `warnIfMessageOversized` from `ReplicaFetcherThread`
ijuma Sep 16, 2016
4e7e604
Remove message size warnings from `TopicCommand` and `ConfigCommand`
ijuma Sep 16, 2016
d206a59
Add test to verify that oversized messages produced with ack=all succeed
ijuma Sep 16, 2016
8f09fae
Merge pull request #4 from ijuma/kafka-2063-bound-fetch-response-size
nepal Sep 16, 2016
03694d0
Introduce FetchRequestTest
ijuma Sep 16, 2016
7534e9e
Preserve behaviour of fetch request version 2 and add test to verify
ijuma Sep 17, 2016
bf50e65
Merge remote-tracking branch 'apache/trunk' into kafka-2063-bound-fet…
ijuma Sep 17, 2016
b75ee68
Merge pull request #5 from ijuma/kafka-2063-bound-fetch-response-size
nepal Sep 17, 2016
4b9326c
Merge remote-tracking branch 'apache/trunk' into kafka-2063-bound-fet…
ijuma Sep 17, 2016
6f1df09
Rename `messageSetIncomplete` to `firstMessageSetIncomplete`
ijuma Sep 17, 2016
31e1589
Group exception clauses since the result is the same
ijuma Sep 17, 2016
bf6b4db
Mention that the way we group topics and partitions deviates from the…
ijuma Sep 17, 2016
3ba807b
Restore replication warning due to oversized messages if fetch reques…
ijuma Sep 17, 2016
19ffdb7
Merge remote-tracking branch 'apache/trunk' into kafka-2063-bound-fet…
ijuma Sep 17, 2016
2edf038
Improve protocol description for fetch request version 3
ijuma Sep 17, 2016
3a790e7
Merge pull request #6 from ijuma/kafka-2063-bound-fetch-response-size
nepal Sep 18, 2016
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

@@ -112,6 +112,17 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
     private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
 
+    /**
+     * <code>fetch.max.bytes</code>
+     */
+    public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
+    private static final String FETCH_MAX_BYTES_DOC = "The maximum amount of data the server should return for a fetch request. " +
+            "This is not an absolute maximum, if the first message in the first non-empty partition of the fetch is larger than " +
+            "this value, the message will still be returned to ensure that the consumer can make progress. " +
+            "The maximum message size accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
+            "<code>max.message.bytes</code> (topic config). Note that the consumer performs multiple fetches in parallel.";
+    public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;
+
     /**
      * <code>fetch.max.wait.ms</code>
      */
@@ -125,7 +136,11 @@ public class ConsumerConfig extends AbstractConfig {
      * <code>max.partition.fetch.bytes</code>
      */
     public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
-    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
+    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server " +
+            "will return. If the first message in the first non-empty partition of the fetch is larger than this limit, the " +
+            "message will still be returned to ensure that the consumer can make progress. The maximum message size " +
+            "accepted by the broker is defined via <code>message.max.bytes</code> (broker config) or " +
+            "<code>max.message.bytes</code> (topic config). See " + FETCH_MAX_BYTES_CONFIG + " for limiting the consumer request size";
     public static final int DEFAULT_MAX_PARTITION_FETCH_BYTES = 1 * 1024 * 1024;
 
     /** <code>send.buffer.bytes</code> */
@@ -265,6 +280,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.HIGH,
                                         FETCH_MIN_BYTES_DOC)
+                                .define(FETCH_MAX_BYTES_CONFIG,
+                                        Type.INT,
+                                        DEFAULT_FETCH_MAX_BYTES,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        FETCH_MAX_BYTES_DOC)
                                 .define(FETCH_MAX_WAIT_MS_CONFIG,
                                         Type.INT,
                                         500,
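For context, a minimal sketch of how the new response-level limit might be combined with the existing per-partition limit on a consumer. The bootstrap address, group id, and topic name are placeholders, not values from this PR:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchMaxBytesExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-limit-example");     // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // KIP-74: bound the total size of each fetch response. This is a soft
        // limit: the first message in the first non-empty partition is returned
        // even if it exceeds the bound, so the consumer always makes progress.
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10 * 1024 * 1024);

        // The per-partition cap still applies within the response-level bound.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic")); // placeholder
            // consumer.poll(...) as usual; each fetch response is now bounded.
        }
    }
}
```

Because the consumer performs multiple fetches in parallel (as the new doc string notes), `fetch.max.bytes` bounds each response rather than the consumer's total memory use: the practical ceiling is roughly one `fetch.max.bytes` per broker being fetched from.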
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

@@ -672,6 +672,7 @@ private KafkaConsumer(ConsumerConfig config,
                     config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
             this.fetcher = new Fetcher<>(this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
+                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java

@@ -26,7 +26,6 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -62,6 +61,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -79,6 +79,7 @@ public class Fetcher<K, V> {
     private final ConsumerNetworkClient client;
     private final Time time;
     private final int minBytes;
+    private final int maxBytes;
     private final int maxWaitMs;
     private final int fetchSize;
     private final long retryBackoffMs;
@@ -96,6 +97,7 @@ public class Fetcher<K, V> {
 
     public Fetcher(ConsumerNetworkClient client,
                    int minBytes,
+                   int maxBytes,
                    int maxWaitMs,
                    int fetchSize,
                    int maxPollRecords,
@@ -113,6 +115,7 @@ public Fetcher(ConsumerNetworkClient client,
         this.metadata = metadata;
         this.subscriptions = subscriptions;
         this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
         this.maxWaitMs = maxWaitMs;
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
@@ -152,7 +155,7 @@ private boolean matchesRequestedPartitions(FetchRequest request, FetchResponse r
      * an in-flight fetch or pending fetch data.
      */
     public void sendFetches() {
-        for (Map.Entry<Node, FetchRequest> fetchEntry: createFetchRequests().entrySet()) {
+        for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
             final FetchRequest request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
@@ -514,8 +517,8 @@ private void handleListOffsetResponse(TopicPartition topicPartition,
         }
     }
 
-    private Set<TopicPartition> fetchablePartitions() {
-        Set<TopicPartition> fetchable = subscriptions.fetchablePartitions();
+    private List<TopicPartition> fetchablePartitions() {
+        List<TopicPartition> fetchable = subscriptions.fetchablePartitions();
         if (nextInLineRecords != null && !nextInLineRecords.isEmpty())
             fetchable.remove(nextInLineRecords.partition);
         for (CompletedFetch completedFetch : completedFetches)
@@ -530,16 +533,16 @@ private Set<TopicPartition> fetchablePartitions() {
     private Map<Node, FetchRequest> createFetchRequests() {
         // create the fetch info
         Cluster cluster = metadata.fetch();
-        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<>();
+        Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
         for (TopicPartition partition : fetchablePartitions()) {
             Node node = cluster.leaderFor(partition);
             if (node == null) {
                 metadata.requestUpdate();
             } else if (this.client.pendingRequestCount(node) == 0) {
                 // if there is a leader and no in-flight requests, issue a new fetch
-                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
+                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
                 if (fetch == null) {
-                    fetch = new HashMap<>();
+                    fetch = new LinkedHashMap<>();
                     fetchable.put(node, fetch);
                 }
 
@@ -553,9 +556,9 @@ private Map<Node, FetchRequest> createFetchRequests() {
 
         // create the fetches
         Map<Node, FetchRequest> requests = new HashMap<>();
-        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
+        for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
-            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
+            FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue());
             requests.put(node, fetch);
         }
         return requests;
@@ -590,14 +593,11 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
                 ByteBuffer buffer = partition.recordSet;
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-                boolean skippedRecords = false;
                 for (LogEntry logEntry : records) {
                     // Skip the messages earlier than current position.
                     if (logEntry.offset() >= position) {
                         parsed.add(parseRecord(tp, logEntry));
                         bytes += logEntry.size();
-                    } else {
-                        skippedRecords = true;
                     }
                 }
 
@@ -609,19 +609,6 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
                     parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                     ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                     this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
-                } else if (buffer.limit() > 0 && !skippedRecords) {
-                    // we did not read a single message from a non-empty buffer
-                    // because that message's size is larger than fetch size, in this case
-                    // record this exception
-                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(tp, fetchOffset);
-                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: "
-                            + recordTooLargePartitions
-                            + " whose size is larger than the fetch size "
-                            + this.fetchSize
-                            + " and hence cannot be ever returned."
-                            + " Increase the fetch size on the client (using max.partition.fetch.bytes),"
-                            + " or decrease the maximum message size the broker will allow (using message.max.bytes).",
-                            recordTooLargePartitions);
                 }
             } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                     || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
@@ -648,6 +635,11 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
             completedFetch.metricAggregator.record(tp, bytes, recordsCount);
         }
 
+        // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
+        // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
+        if (bytes > 0 || partition.errorCode != Errors.NONE.code())
+            subscriptions.movePartitionToEnd(tp);
+
         return parsedRecords;
     }
 
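The ordering changes above are what make the size limit safe: with a bounded response, partitions listed first in the request would be served on every fetch and could starve the rest. The PR therefore keeps fetchable partitions in a stable order (`List`/`LinkedHashMap` instead of `Set`/`HashMap`) and rotates a partition to the back once it returns bytes or an error (`subscriptions.movePartitionToEnd(tp)`). A self-contained sketch of that rotation policy, using hypothetical names rather than the PR's `PartitionStates` class:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Illustration only: keep partitions in a stable order and demote the ones
// that just made progress, so that with a bounded response size every
// partition eventually reaches the front of the fetch order.
final class RoundRobinOrder<P> {
    private final Deque<P> order = new ArrayDeque<>();

    RoundRobinOrder(List<P> partitions) {
        order.addAll(partitions);
    }

    // Called when `partition` returned bytes or hit an error, mirroring
    // `movePartitionToEnd` in the diff above.
    void moveToEnd(P partition) {
        if (order.remove(partition))
            order.addLast(partition);
    }

    // The order in which the next fetch request should list partitions.
    Iterable<P> fetchOrder() {
        return order;
    }

    public static void main(String[] args) {
        RoundRobinOrder<String> rr = new RoundRobinOrder<>(Arrays.asList("t-0", "t-1", "t-2"));
        rr.moveToEnd("t-0"); // t-0 filled the response cap this round
        System.out.println(rr.fetchOrder()); // prints [t-1, t-2, t-0]
    }
}
```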