Skip to content

Commit

Permalink
[PIP 79][client] Add lazy-loading feature to PartitionedProducer (#10279
Browse files Browse the repository at this point in the history
)

* feat: add lazy loading feature to PartitionedProducerImpl

* feat: add partial round robin routing mode

* test: add tests for lazy-loading

* fix: fix producer closing code at lazy-loading

* test: remove unnecessary handling, fail from test codes

* feat: add enableLazyStartPartitionedProducers config

* test: fix test for lazy-loading config

* fix: address comments

* fix: add partition-change interceptor

* fix: address comments
  • Loading branch information
equanz authored Oct 15, 2021
1 parent 4498b8b commit 1f8945a
Show file tree
Hide file tree
Showing 8 changed files with 628 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerBase;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.ServerError;
Expand Down Expand Up @@ -462,8 +463,100 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2)
mockBrokerService.resetHandleSubscribe();
}

// if a producer fails to connect while creating partitioned producer, it should close all successful connections of
// other producers and fail
// failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode
@Test
public void testPartitionedProducerFailOnInitialization() throws Throwable {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
final AtomicInteger producerCounter = new AtomicInteger(0);

mockBrokerService.setHandleProducer((ctx, producer) -> {
if (producerCounter.incrementAndGet() == 1) {
ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
});

try {
client.newProducer()
.enableLazyStartPartitionedProducers(true)
.accessMode(ProducerAccessMode.Shared)
.topic("persistent://prop/use/ns/multi-part-t1").create();
fail("Should have failed with an authorization error");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException.AuthorizationException);
}
assertEquals(producerCounter.get(), 1);

mockBrokerService.resetHandleProducer();
mockBrokerService.resetHandleCloseProducer();
client.close();
}

// failed to connect to partition at sending step if a producer which connects to broker as lazy-loading mode
@Test
public void testPartitionedProducerFailOnSending() throws Throwable {
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
final AtomicInteger producerCounter = new AtomicInteger(0);
final AtomicInteger closeCounter = new AtomicInteger(0);
final String topicName = "persistent://prop/use/ns/multi-part-t1";

mockBrokerService.setHandleProducer((ctx, producer) -> {
if (producerCounter.incrementAndGet() == 2) {
ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
return;
}
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
});

mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
);

mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
closeCounter.incrementAndGet();
});

final PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) client.newProducer()
.enableLazyStartPartitionedProducers(true)
.accessMode(ProducerAccessMode.Shared)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();

try {
producer.send("msg".getBytes());
fail("Should have failed with an not connected exception");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException.NotConnectedException);
assertEquals(producer.getProducers().size(), 1);
}

try {
// recreate failed producer
for (int i = 0; i < client.getPartitionsForTopic(topicName).get().size(); i++) {
producer.send("msg".getBytes());
}
assertEquals(producer.getProducers().size(), client.getPartitionsForTopic(topicName).get().size());
assertEquals(producerCounter.get(), 5);
} catch (Exception e) {
fail();
}

// should not call close
assertEquals(closeCounter.get(), 0);

mockBrokerService.resetHandleProducer();
mockBrokerService.resetHandleCloseProducer();
client.close();
}

// if a producer which doesn't connect as lazy-loading mode fails to connect while creating partitioned producer,
// it should close all successful connections of other producers and fail
@Test
public void testOneProducerFailShouldCloseAllProducersInPartitionedProducer() throws Exception {
@Cleanup
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-impl")
public class PartialPartitionedProducerTest extends ProducerConsumerBase {
@Override
@BeforeClass
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@Override
@AfterClass(alwaysRun = true)
public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testPtWithSinglePartition() throws Throwable {
final String topic = BrokerTestUtil.newUniqueName("pt-with-single-routing");
admin.topics().createPartitionedTopic(topic, 10);

@Cleanup
final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

for (int i = 0; i < 10; i++) {
producerImpl.newMessage().value("msg".getBytes()).send();
}
assertEquals(producerImpl.getProducers().size(), 1);
}

@Test
public void testPtWithPartialPartition() throws Throwable {
final String topic = BrokerTestUtil.newUniqueName("pt-with-partial-routing");
admin.topics().createPartitionedTopic(topic, 10);

@Cleanup
final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(new PartialRoundRobinMessageRouterImpl(3))
.create();

for (int i = 0; i < 10; i++) {
producerImpl.newMessage().value("msg".getBytes()).send();
}
assertEquals(producerImpl.getProducers().size(), 3);
}

// AddPartitionTest
@Test
public void testPtLazyLoading() throws Throwable {
final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
admin.topics().createPartitionedTopic(topic, 10);

@Cleanup
final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();

final Supplier<Boolean> send = () -> {
for (int i = 0; i < 10; i++) {
try {
producerImpl.newMessage().value("msg".getBytes()).send();
} catch (Throwable e) {
return false;
}
}
return true;
};

// create first producer at initialization step
assertEquals(producerImpl.getProducers().size(), 1);

assertTrue(send.get());
assertEquals(producerImpl.getProducers().size(), 10);
}

@Test
public void testPtLoadingNotSharedMode() throws Throwable {
final String topic = BrokerTestUtil.newUniqueName("pt-not-shared-mode");
admin.topics().createPartitionedTopic(topic, 10);

@Cleanup
final PartitionedProducerImpl<byte[]> producerImplExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.accessMode(ProducerAccessMode.Exclusive)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();

// create first producer at initialization step
assertEquals(producerImplExclusive.getProducers().size(), 10);

producerImplExclusive.close();

@Cleanup
final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.accessMode(ProducerAccessMode.WaitForExclusive)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();

assertEquals(producerImplWaitForExclusive.getProducers().size(), 10);
}

// AddPartitionAndLimitTest
@Test
public void testPtUpdateWithPartialPartition() throws Throwable {
final String topic = BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
admin.topics().createPartitionedTopic(topic, 2);

final Field field = PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
field.setAccessible(true);
@Cleanup
final PartitionedProducerImpl<byte[]> producerImpl = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(new PartialRoundRobinMessageRouterImpl(3))
.accessMode(ProducerAccessMode.Shared)
.autoUpdatePartitions(true)
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
.create();

final Supplier<Boolean> send = ()-> {
for (int i = 0; i < 10; i++) {
try {
producerImpl.newMessage().value("msg".getBytes()).send();
} catch (Throwable e) {
return false;
}
}
return true;
};

// create first producer at initialization step
assertEquals(producerImpl.getProducers().size(), 1);

assertTrue(send.get());
assertEquals(producerImpl.getProducers().size(), 2);

admin.topics().updatePartitionedTopic(topic, 3);
Awaitility.await().untilAsserted(() ->
assertEquals(((TopicMetadata) field.get(producerImpl)).numPartitions(), 3));
assertEquals(producerImpl.getProducers().size(), 2);

assertTrue(send.get());
assertEquals(producerImpl.getProducers().size(), 3);

admin.topics().updatePartitionedTopic(topic, 4);
Awaitility.await().untilAsserted(() ->
assertEquals(((TopicMetadata) field.get(producerImpl)).numPartitions(), 4));
assertTrue(send.get());
assertEquals(producerImpl.getProducers().size(), 3);
}

@Test
public void testPtUpdateNotSharedMode() throws Throwable {
final String topic = BrokerTestUtil.newUniqueName("pt-update-not-shared");
admin.topics().createPartitionedTopic(topic, 2);

final Field field = PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
field.setAccessible(true);
@Cleanup
final PartitionedProducerImpl<byte[]> producerImplExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.accessMode(ProducerAccessMode.Exclusive)
.autoUpdatePartitions(true)
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
.create();

assertEquals(producerImplExclusive.getProducers().size(), 2);

admin.topics().updatePartitionedTopic(topic, 3);
Awaitility.await().untilAsserted(() ->
assertEquals(((TopicMetadata) field.get(producerImplExclusive)).numPartitions(), 3));
assertEquals(producerImplExclusive.getProducers().size(), 3);

producerImplExclusive.close();

@Cleanup
final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableLazyStartPartitionedProducers(true)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.accessMode(ProducerAccessMode.WaitForExclusive)
.autoUpdatePartitions(true)
.autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
.create();

assertEquals(producerImplWaitForExclusive.getProducers().size(), 3);

admin.topics().updatePartitionedTopic(topic, 4);
Awaitility.await().untilAsserted(() ->
assertEquals(((TopicMetadata) field.get(producerImplWaitForExclusive)).numPartitions(), 4));
assertEquals(producerImplWaitForExclusive.getProducers().size(), 4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -576,4 +576,23 @@ public interface ProducerBuilder<T> extends Cloneable {
* @since 2.5.0
*/
ProducerBuilder<T> enableMultiSchema(boolean multiSchema);

/**
* This config affects Shared mode producers of partitioned topics only. It controls whether
* producers register and connect immediately to the owner broker of each partition
* or start lazily on demand. The internal producer of one partition is always
* started eagerly, chosen by the routing policy, but the internal producers of
* any additional partitions are started on demand, upon receiving their first
* message.
* Using this mode can reduce the strain on brokers for topics with large numbers of
* partitions and when the SinglePartition or some custom partial partition routing policy
* like PartialRoundRobinMessageRouterImpl is used without keyed messages.
* Because producer connection can be on demand, this can produce extra send latency
* for the first messages of a given partition.
*
* @param lazyStartPartitionedProducers
* true/false as to whether to start partition producers lazily
* @return the producer builder instance
*/
ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartPartitionedProducers);
}
Loading

0 comments on commit 1f8945a

Please sign in to comment.