Skip to content

Commit

Permalink
Remove references to chatAsync (#16950)
Browse files Browse the repository at this point in the history
Remove references to chatAsync from Rabbit stream supervisors
  • Loading branch information
AmatyaAvadhanula authored Aug 23, 2024
1 parent 2abcb41 commit 8c8a4b2
Show file tree
Hide file tree
Showing 6 changed files with 0 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read RabbitMQ messages that are no longer available. Not supported. |no (default == false)|
|`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular RabbitMQ stream. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)|
|`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))|
|`chatAsync`|Boolean| If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == true) |
|`chatThreads`|Integer| The number of threads that will be used for communicating with indexing tasks. Ignored if `chatAsync` is `true` (the default).| no (default == min(10, taskCount * replicas))|
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.| no (default == 8)|
|`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)|
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
Expand Down
2 changes: 0 additions & 2 deletions docs/ingestion/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,6 @@ For configuration properties shared across all streaming ingestion methods, refe
|Property|Type|Description|Required|Default|
|--------|----|-----------|--------|-------|
|`numPersistThreads`|Integer|The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in a larger number of incremental segments, causing significant CPU time to be spent on the creation of the incremental segments on the disk. For datasources with number of columns running into hundreds or thousands, creation of the incremental segments may take up significant time, in the order of multiple seconds. In both of these scenarios, ingestion can stall or pause frequently, causing it to fall behind. You can use additional threads to parallelize the segment creation without blocking ingestion as long as there are sufficient CPU resources available.|No|1|
|`chatAsync`|Boolean|If `true`, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If `false`, use synchronous communication in a thread pool of size `chatThreads`.|No|`true`|
|`chatThreads`|Integer|The number of threads to use for communicating with indexing tasks. Ignored if `chatAsync` is `true`.|No|`min(10, taskCount * replicas)`|

## Deployment notes on Kafka partitions and Druid segments

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ public class RabbitStreamSupervisorTuningConfig extends RabbitStreamIndexTaskTun
implements SeekableStreamSupervisorTuningConfig
{
private final Integer workerThreads;
private final Boolean chatAsync;
private final Integer chatThreads;
private final Long chatRetries;
private final Duration httpTimeout;
private final Duration shutdownTimeout;
Expand Down Expand Up @@ -70,8 +68,6 @@ public static RabbitStreamSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null,
null
);
}
Expand All @@ -92,8 +88,6 @@ public RabbitStreamSupervisorTuningConfig(
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("workerThreads") Integer workerThreads,
@JsonProperty("chatAsync") Boolean chatAsync,
@JsonProperty("chatThreads") Integer chatThreads,
@JsonProperty("chatRetries") Long chatRetries,
@JsonProperty("httpTimeout") Period httpTimeout,
@JsonProperty("shutdownTimeout") Period shutdownTimeout,
Expand Down Expand Up @@ -133,8 +127,6 @@ public RabbitStreamSupervisorTuningConfig(
maxRecordsPerPoll
);
this.workerThreads = workerThreads;
this.chatAsync = chatAsync;
this.chatThreads = chatThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
this.httpTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(httpTimeout, DEFAULT_HTTP_TIMEOUT);
this.shutdownTimeout = SeekableStreamSupervisorTuningConfig.defaultDuration(
Expand Down Expand Up @@ -206,7 +198,6 @@ public String toString()
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
", workerThreads=" + workerThreads +
", chatThreads=" + chatThreads +
", chatRetries=" + chatRetries +
", httpTimeout=" + httpTimeout +
", shutdownTimeout=" + shutdownTimeout +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ public void testtoString() throws Exception
"resetOffsetAutomatically=false, " +
"segmentWriteOutMediumFactory=null, " +
"workerThreads=null, " +
"chatThreads=null, " +
"chatRetries=8, " +
"httpTimeout=PT10S, " +
"shutdownTimeout=PT80S, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class RabbitStreamSupervisorTest extends EasyMockSupport
false,
false);
private static final String DATASOURCE = "testDS";
private static final int TEST_CHAT_THREADS = 3;
private static final long TEST_CHAT_RETRIES = 9L;
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
Expand Down Expand Up @@ -148,8 +147,6 @@ public void setupTest()
null,
null,
numThreads, // worker threads
null,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"workerThreads\": 12,\n"
+ " \"chatThreads\": 13,\n"
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\",\n"
+ " \"shutdownTimeout\": \"PT95S\",\n"
Expand Down

0 comments on commit 8c8a4b2

Please sign in to comment.