Skip to content

Commit

Permalink
Fix pulsar-io-thread block forever (streamnative#946)
Browse files Browse the repository at this point in the history
When the requestQueue is full, the pulsar-io thread will always be blocked in: requestQueue.put(), which will cause the requestQueue.remove method to not be called, because the writeAndFlushResponseToClient method is also executed by the pulsar-io thread:
`                ctx.channel().eventLoop().execute(() -> {
                     writeAndFlushResponseToClient(channel);
                 });`
This will cause a lot of close_wait on the server side;

related to:
streamnative#942
  • Loading branch information
lordcheng10 authored Dec 3, 2021
1 parent 2b89562 commit c4ce316
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarService;
import org.eclipse.jetty.util.ssl.SslContextFactory;

Expand Down Expand Up @@ -59,6 +60,7 @@ public class KafkaChannelInitializer extends ChannelInitializer<SocketChannel> {
private final SslContextFactory.Server sslContextFactory;
@Getter
private final StatsLogger statsLogger;
private final OrderedScheduler sendResponseScheduler;

public KafkaChannelInitializer(PulsarService pulsarService,
KafkaServiceConfiguration kafkaConfig,
Expand All @@ -70,7 +72,8 @@ public KafkaChannelInitializer(PulsarService pulsarService,
boolean enableTLS,
EndPoint advertisedEndPoint,
boolean skipMessagesWithoutIndex,
StatsLogger statsLogger) {
StatsLogger statsLogger,
OrderedScheduler sendResponseScheduler) {
super();
this.pulsarService = pulsarService;
this.kafkaConfig = kafkaConfig;
Expand All @@ -88,6 +91,7 @@ public KafkaChannelInitializer(PulsarService pulsarService,
} else {
sslContextFactory = null;
}
this.sendResponseScheduler = sendResponseScheduler;
}

@Override
Expand All @@ -112,7 +116,7 @@ public KafkaRequestHandler newCnx() throws Exception {
return new KafkaRequestHandler(pulsarService, kafkaConfig,
tenantContextManager, kopBrokerLookupManager, adminManager,
producePurgatory, fetchPurgatory,
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, statsLogger);
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, statsLogger, sendResponseScheduler);
}

@VisibleForTesting
Expand All @@ -121,6 +125,6 @@ public KafkaRequestHandler newCnx(final TenantContextManager tenantContextManage
return new KafkaRequestHandler(pulsarService, kafkaConfig,
tenantContextManager, kopBrokerLookupManager, adminManager,
producePurgatory, fetchPurgatory,
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, statsLogger);
enableTls, advertisedEndPoint, skipMessagesWithoutIndex, statsLogger, sendResponseScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -65,11 +66,15 @@ public abstract class KafkaCommandDecoder extends ChannelInboundHandlerAdapter {
@Getter
protected final KafkaServiceConfiguration kafkaConfig;

private final OrderedScheduler sendResponseScheduler;

public KafkaCommandDecoder(StatsLogger statsLogger,
KafkaServiceConfiguration kafkaConfig) {
KafkaServiceConfiguration kafkaConfig,
OrderedScheduler sendResponseScheduler) {
this.requestStats = new RequestStats(statsLogger);
this.kafkaConfig = kafkaConfig;
this.requestQueue = new LinkedBlockingQueue<>(kafkaConfig.getMaxQueuedRequests());
this.sendResponseScheduler = sendResponseScheduler;
}

@Override
Expand Down Expand Up @@ -246,7 +251,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
registerRequestLatency.accept(kafkaHeaderAndRequest.getHeader().apiKey().name,
startProcessRequestTimestamp);

ctx.channel().eventLoop().execute(() -> {
sendResponseScheduler.executeOrdered(channel.remoteAddress().hashCode(), () -> {
writeAndFlushResponseToClient(channel);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public class KafkaProtocolHandler implements ProtocolHandler, TenantContextManag
private BrokerService brokerService;
@Getter
private KopEventManager kopEventManager;
private OrderedScheduler sendResponseScheduler;

private final Map<String, GroupCoordinator> groupCoordinatorsByTenant = new ConcurrentHashMap<>();
private final Map<String, TransactionCoordinator> transactionCoordinatorByTenant = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -441,6 +442,10 @@ public void initialize(ServiceConfiguration conf) throws Exception {
statsProvider = new PrometheusMetricsProvider();
rootStatsLogger = statsProvider.getStatsLogger("");
scopeStatsLogger = rootStatsLogger.scope(SERVER_SCOPE);
sendResponseScheduler = OrderedScheduler.newSchedulerBuilder()
.name("send-response")
.numThreads(kafkaConfig.getNumSendKafkaResponseThreads())
.build();
}

// This method is called after initialize
Expand Down Expand Up @@ -581,7 +586,8 @@ private KafkaChannelInitializer newKafkaChannelInitializer(final EndPoint endPoi
endPoint.isTlsEnabled(),
endPoint,
kafkaConfig.isSkipMessagesWithoutIndex(),
scopeStatsLogger);
scopeStatsLogger,
sendResponseScheduler);
}

// this is called after initialize, and with kafkaConfig, brokerService all set.
Expand Down Expand Up @@ -643,6 +649,7 @@ public void close() {
KafkaTopicManager.getTopics().clear();
kopBrokerLookupManager.close();
statsProvider.stop();
sendResponseScheduler.shutdown();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.stream.IntStream;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -284,8 +285,9 @@ public KafkaRequestHandler(PulsarService pulsarService,
Boolean tlsEnabled,
EndPoint advertisedEndPoint,
boolean skipMessagesWithoutIndex,
StatsLogger statsLogger) throws Exception {
super(statsLogger, kafkaConfig);
StatsLogger statsLogger,
OrderedScheduler sendResponseScheduler) throws Exception {
super(statsLogger, kafkaConfig, sendResponseScheduler);
this.pulsarService = pulsarService;
this.tenantContextManager = tenantContextManager;
this.kopBrokerLookupManager = kopBrokerLookupManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public class KafkaServiceConfiguration extends ServiceConfiguration {
//
// --- Kafka on Pulsar Broker configuration ---
//
@FieldContext(
category = CATEGORY_KOP,
doc = "The number of threads used to respond to the response."
)
private int numSendKafkaResponseThreads = 4;

@FieldContext(
category = CATEGORY_KOP,
Expand Down

0 comments on commit c4ce316

Please sign in to comment.