Optimize produce request handler (apache#466)
### Motivation

There are some problems with produce performance.

The main problem is the pending produce queue. It holds multiple pending produce requests (`PendingProduce`), each of which waits until the `PersistentTopic` is ready and the `MemoryRecords` is encoded. First, the `PendingProduce`s wait on different `PersistentTopic` futures. Second, encoding `MemoryRecords` is fast, so moving it to another thread is unnecessary and can introduce performance overhead.

### Modifications

1. Encode `MemoryRecords` in the same thread as `handleProduceRequest`.
2. Check whether the `CompletableFuture<PersistentTopic>` is done.
   - If it is done, publish the messages directly instead of pushing a pending produce request to the queue.
   - Otherwise, reuse the previous `CompletableFuture<PersistentTopic>`. This is done by `PendingTopicFutures`, which chains onto the previous `CompletableFuture<PersistentTopic>` via `thenApply` or `exceptionally` (see the first sketch after this list).
3. Add tests for `PendingTopicFutures`.
4. Use a map from partition to response, instead of a map from partition to response future, in `handleProduceRequest` (see the second sketch after this list).
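
To illustrate modification 2, here is a minimal sketch of the chaining idea behind `PendingTopicFutures`. It is not the actual KoP class: the real one also records request stats (its constructor takes `requestStats` in the diff below), and the `ResultOrError` pair plus the generic `T` are illustrative stand-ins for its internals and for `PersistentTopic`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Simplified sketch of PendingTopicFutures; stats handling is omitted.
class PendingTopicFuturesSketch<T> {

    // Caches the previous stage's outcome so later listeners can be chained
    // without registering another callback on the topic future itself.
    private static final class ResultOrError<T> {
        final T result;        // non-null when the topic lookup succeeded
        final Throwable error; // non-null when it failed

        ResultOrError(T result, Throwable error) {
            this.result = result;
            this.error = error;
        }
    }

    // Tail of the chain; each addListener() appends one stage to it.
    private CompletableFuture<ResultOrError<T>> tail = null;

    public synchronized void addListener(CompletableFuture<T> topicFuture,
                                         Consumer<T> onTopic,
                                         Consumer<Throwable> onError) {
        if (tail == null) {
            // First listener: wait on the topic future directly.
            tail = topicFuture.thenApply(topic -> {
                onTopic.accept(topic);
                return new ResultOrError<T>(topic, null);
            }).exceptionally(e -> {
                onError.accept(e);
                return new ResultOrError<T>(null, e);
            });
        } else {
            // Later listeners: reuse the cached outcome via thenApply, so they
            // run in order even if the topic future completed long ago.
            tail = tail.thenApply(prev -> {
                if (prev.error == null) {
                    onTopic.accept(prev.result);
                } else {
                    onError.accept(prev.error);
                }
                return prev;
            });
        }
    }
}
```

Because every later listener attaches to the cached outcome of the previous stage rather than to `topicFuture` itself, a slow topic lookup registers exactly one callback on the broker's future, and listeners fire in the order they were added.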
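
Modification 4 removes the per-partition response futures: each partition writes its `PartitionResponse` into a shared concurrent map, and the request completes when the map holds one entry per partition. Below is a runnable toy version of that counting pattern, with `String` placeholders instead of Kafka's `TopicPartition` and `PartitionResponse` and hypothetical partition names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

public class PartitionResponseCounterDemo {
    public static void main(String[] args) {
        final int numPartitions = 3; // pretend the produce request spans three partitions
        final Map<String, String> responseMap = new ConcurrentHashMap<>();
        final CompletableFuture<Void> produceFuture = new CompletableFuture<>();

        // Every outcome (offset, known error, exception) funnels through this
        // callback; the request completes when the last partition reports.
        final BiConsumer<String, String> addPartitionResponse = (partition, response) -> {
            responseMap.put(partition, response);
            if (responseMap.size() == numPartitions) {
                produceFuture.complete(null);
            }
        };

        addPartitionResponse.accept("test-topic-0", "NONE, offset=100");
        addPartitionResponse.accept("test-topic-1", "NONE, offset=7");
        addPartitionResponse.accept("test-topic-2", "NOT_LEADER_FOR_PARTITION");

        produceFuture.thenRun(() ->
                System.out.println("All partitions responded: " + responseMap));
    }
}
```

Funneling the success, known-error, and exception paths through the single `addPartitionResponse` callback keeps the completion check in one place; like the handler, it assumes each partition reports exactly once.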
BewareMyPower authored May 7, 2021
1 parent 3a31730 commit 808b969
Showing 6 changed files with 372 additions and 269 deletions.
@@ -60,6 +60,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.naming.AuthenticationException;
@@ -76,9 +78,7 @@
import org.apache.commons.lang3.NotImplementedException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -88,7 +88,6 @@
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
@@ -148,6 +147,7 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.api.proto.MarkerType;
@@ -193,8 +193,10 @@ public class KafkaRequestHandler extends KafkaCommandDecoder {
@Getter
private final EntryFormatter entryFormatter;

private final Map<TopicPartition, PendingProduceQueue> pendingProduceQueueMap = new ConcurrentHashMap<>();
private final Set<String> groupIds = new HashSet<>();
// key is the topic(partition), value is the future that indicates whether the PersistentTopic instance of the key
// is found.
private final Map<TopicPartition, PendingTopicFutures> pendingTopicFuturesMap = new ConcurrentHashMap<>();

public KafkaRequestHandler(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
@@ -627,84 +629,144 @@ protected void handleTopicMetadataRequest(KafkaHeaderAndRequest metadataHar,
});
}

private void publishMessages(final PersistentTopic persistentTopic,
final ByteBuf byteBuf,
final int numMessages,
final MemoryRecords records,
final String partitionName,
final Consumer<Long> offsetConsumer,
final Consumer<Errors> errorsConsumer) {
if (persistentTopic == null) {
// It will trigger a retry send of Kafka client
errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION);
return;
}
if (persistentTopic.isSystemTopic()) {
log.error("Not support producing message to system topic: {}", persistentTopic);
errorsConsumer.accept(Errors.INVALID_TOPIC_EXCEPTION);
return;
}
topicManager.registerProducerInPersistentTopic(partitionName, persistentTopic);
// collect metrics
final Producer producer = KafkaTopicManager.getReferenceProducer(partitionName);
producer.updateRates(numMessages, byteBuf.readableBytes());
producer.getTopic().incrementPublishCount(numMessages, byteBuf.readableBytes());
// publish
final CompletableFuture<Long> offsetFuture = new CompletableFuture<>();
final long beforePublish = MathUtils.nowInNano();
persistentTopic.publishMessage(byteBuf,
MessagePublishContext.get(offsetFuture, persistentTopic, numMessages, System.nanoTime()));
byteBuf.release();
final RecordBatch batch = records.batchIterator().next();
offsetFuture.whenComplete((offset, e) -> {
if (e == null) {
if (batch.isTransactional()) {
transactionCoordinator.addActivePidOffset(TopicName.get(partitionName), batch.producerId(), offset);
}
requestStats.getMessagePublishStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforePublish), TimeUnit.NANOSECONDS);
offsetConsumer.accept(offset);
} else {
log.error("publishMessages for topic partition: {} failed when write.", partitionName, e);
requestStats.getMessagePublishStats().registerFailedEvent(
MathUtils.elapsedNanos(beforePublish), TimeUnit.NANOSECONDS);
errorsConsumer.accept(Errors.KAFKA_STORAGE_ERROR);
}
});
}

protected void handleProduceRequest(KafkaHeaderAndRequest produceHar,
CompletableFuture<AbstractResponse> resultFuture) {
final long startProduceNanos = MathUtils.nowInNano();

checkArgument(produceHar.getRequest() instanceof ProduceRequest);
ProduceRequest produceRequest = (ProduceRequest) produceHar.getRequest();
if (produceRequest.transactionalId() != null) {
// TODO auth check
}

Map<TopicPartition, CompletableFuture<PartitionResponse>> responsesFutures = new HashMap<>();

final int responsesSize = produceRequest.partitionRecordsOrFail().size();
final int numPartitions = produceRequest.partitionRecordsOrFail().size();

final long dataSizePerPartition = produceHar.getBuffer().readableBytes();
topicManager.getInternalServerCnx().increasePublishBuffer(dataSizePerPartition);

// TODO: handle un-exist topic:
// nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
for (Map.Entry<TopicPartition, ? extends Records> entry : produceRequest.partitionRecordsOrFail().entrySet()) {
TopicPartition topicPartition = entry.getKey();
try {
String fullPartitionName = KopTopic.toString(topicPartition);
if (isOffsetTopic(fullPartitionName) || isTransactionTopic(fullPartitionName)) {
log.error("[{}] Request {}: not support produce message to inner topic. topic: {}",
final Map<TopicPartition, PartitionResponse> responseMap = new ConcurrentHashMap<>();
final CompletableFuture<Void> produceFuture = new CompletableFuture<>();
BiConsumer<TopicPartition, PartitionResponse> addPartitionResponse = (topicPartition, response) -> {
responseMap.put(topicPartition, response);
if (responseMap.size() == numPartitions) {
produceFuture.complete(null);
}
};

produceRequest.partitionRecordsOrFail().forEach((topicPartition, records) -> {
final Consumer<Long> offsetConsumer = offset -> addPartitionResponse.accept(
topicPartition, new PartitionResponse(Errors.NONE, offset, -1L, -1L));
final Consumer<Errors> errorsConsumer =
errors -> addPartitionResponse.accept(topicPartition, new PartitionResponse(errors));
final Consumer<Throwable> exceptionConsumer =
e -> addPartitionResponse.accept(topicPartition, new PartitionResponse(Errors.forException(e)));

final String fullPartitionName = KopTopic.toString(topicPartition);
if (isOffsetTopic(fullPartitionName) || isTransactionTopic(fullPartitionName)) {
log.error("[{}] Request {}: not support produce message to inner topic. topic: {}",
ctx.channel(), produceHar.getHeader(), topicPartition);
throw new InvalidTopicException(Errors.INVALID_TOPIC_EXCEPTION.message());
}

MemoryRecords validRecords = validateRecords(produceHar.getHeader().apiVersion(),
topicPartition, (MemoryRecords) entry.getValue());
errorsConsumer.accept(Errors.INVALID_TOPIC_EXCEPTION);
return;
}

CompletableFuture<PartitionResponse> partitionResponse = new CompletableFuture<>();
responsesFutures.put(topicPartition, partitionResponse);
try {
final long beforeRecordsProcess = MathUtils.nowInNano();
final MemoryRecords validRecords =
validateRecords(produceHar.getHeader().apiVersion(), topicPartition, records);
final int numMessages = EntryFormatter.parseNumMessages(validRecords);
final ByteBuf byteBuf = entryFormatter.encode(validRecords, numMessages);
requestStats.getProduceEncodeStats().registerSuccessfulEvent(
MathUtils.elapsedNanos(beforeRecordsProcess), TimeUnit.NANOSECONDS);

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Produce messages for topic {} partition {}, request size: {} ",
ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), responsesSize);
ctx.channel(), produceHar.getHeader(),
topicPartition.topic(), topicPartition.partition(), numPartitions);
}

PendingProduce pendingProduce = new PendingProduce(partitionResponse, topicManager, fullPartitionName,
entryFormatter, validRecords, executor, transactionCoordinator, requestStats);
PendingProduceQueue queue =
pendingProduceQueueMap.computeIfAbsent(topicPartition, ignored -> new PendingProduceQueue());
queue.add(pendingProduce);
pendingProduce.whenComplete(queue::sendCompletedProduces);
} catch (ApiException e) {
responsesFutures.put(topicPartition,
CompletableFuture.completedFuture(new PartitionResponse(Errors.forException(e))));
}
}

CompletableFuture.allOf(responsesFutures.values().toArray(new CompletableFuture<?>[responsesSize]))
.whenComplete((ignore, ex) -> {
topicManager.getInternalServerCnx().decreasePublishBuffer(dataSizePerPartition);
// all ex has translated to PartitionResponse with Errors.KAFKA_STORAGE_ERROR
Map<TopicPartition, PartitionResponse> responses = new ConcurrentHashMap<>();
for (Map.Entry<TopicPartition, CompletableFuture<PartitionResponse>> entry :
responsesFutures.entrySet()) {
responses.put(entry.getKey(), entry.getValue().join());
}

if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Complete handle produce.",
ctx.channel(), produceHar.toString());
}
final CompletableFuture<PersistentTopic> topicFuture = topicManager.getTopic(fullPartitionName);
if (topicFuture.isCompletedExceptionally()) {
topicFuture.exceptionally(e -> {
exceptionConsumer.accept(e);
return null;
});
return;
}
if (topicFuture.isDone() && topicFuture.getNow(null) == null) {
errorsConsumer.accept(Errors.NOT_LEADER_FOR_PARTITION);
return;
}

if (ex != null) {
requestStats.getHandleProduceRequestStats()
.registerFailedEvent(MathUtils.elapsedNanos(startProduceNanos), TimeUnit.NANOSECONDS);
} else {
requestStats.getHandleProduceRequestStats()
.registerSuccessfulEvent(MathUtils.elapsedNanos(startProduceNanos), TimeUnit.NANOSECONDS);
}
final Consumer<PersistentTopic> persistentTopicConsumer = persistentTopic ->
publishMessages(persistentTopic, byteBuf, numMessages, validRecords, fullPartitionName,
offsetConsumer, errorsConsumer);
if (topicFuture.isDone()) {
persistentTopicConsumer.accept(topicFuture.getNow(null));
} else {
// topic is not available now
pendingTopicFuturesMap
.computeIfAbsent(topicPartition, ignored -> new PendingTopicFutures(requestStats))
.addListener(topicFuture, persistentTopicConsumer, exceptionConsumer);
}
} catch (Exception e) {
log.error("[{}] Failed to handle produce request for {}", ctx.channel(), topicPartition, e);
exceptionConsumer.accept(e);
}
});

resultFuture.complete(new ProduceResponse(responses));
});
produceFuture.thenApply(ignored -> {
topicManager.getInternalServerCnx().decreasePublishBuffer(dataSizePerPartition);
if (log.isDebugEnabled()) {
log.debug("[{}] Request {}: Complete handle produce.", ctx.channel(), produceHar.toString());
}
requestStats.getHandleProduceRequestStats()
.registerFailedEvent(MathUtils.elapsedNanos(startProduceNanos), TimeUnit.NANOSECONDS);
resultFuture.complete(new ProduceResponse(responseMap));
return null;
});
}

protected void handleFindCoordinatorRequest(KafkaHeaderAndRequest findCoordinator,
@@ -34,6 +34,7 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -244,8 +245,14 @@ public CompletableFuture<PersistentTopic> getTopic(String topicName) {
brokerService.getTopic(topicName, brokerService.isAllowAutoTopicCreation(topicName))
.whenComplete((t2, throwable) -> {
if (throwable != null) {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), topicName, throwable);
// ServiceUnitNotReadyException is retriable, so print a warning log instead of an error log
if (throwable instanceof BrokerServiceException.ServiceUnitNotReadyException) {
log.warn("[{}] Failed to getTopic {}: {}",
requestHandler.ctx.channel(), topicName, throwable.getMessage());
} else {
log.error("[{}] Failed to getTopic {}. exception:",
requestHandler.ctx.channel(), topicName, throwable);
}
// failed to getTopic from current broker, remove cache, which added in getTopicBroker.
removeTopicManagerCache(topicName);
topicCompletableFuture.complete(null);
