Skip to content

Commit

Permalink
merge: #11631 #11649
Browse files Browse the repository at this point in the history
11631: fix: disable batch processing by default on 8.1 r=oleschoenburg a=oleschoenburg

Updates config templates and default config to disable batch processing by default

relates to #11539 

11649: deps(maven): bump netty-bom from 4.1.88.Final to 4.1.89.Final r=oleschoenburg a=dependabot[bot]

Bumps [netty-bom](https://github.com/netty/netty) from 4.1.88.Final to 4.1.89.Final.
<details>
<summary>Commits</summary>
<ul>
<li><a href="https://github.com/netty/netty/commit/263a745b93513b8f761daa007897af9ce57161ba"><code>263a745</code></a> [maven-release-plugin] prepare release netty-4.1.89.Final</li>
<li><a href="https://github.com/netty/netty/commit/ed425fe281f7f20a9d946d9f8213be0a64f3a778"><code>ed425fe</code></a> Don't fail on HttpObjectDecoder's maxHeaderSize greater then (Integer.MAX_VAL...</li>
<li><a href="https://github.com/netty/netty/commit/a803e107614bc8aa0886b9b7ae742463c6a156c1"><code>a803e10</code></a> Revert &quot;Revert &quot;Speed-up HTTP 1.1 header and line parsing (<a href="https://github-redirect.dependabot.com/netty/netty/issues/12321">#12321</a>)&quot;&quot;</li>
<li><a href="https://github.com/netty/netty/commit/9993e07356ea39edf14008789d3377d2bb62ea92"><code>9993e07</code></a> Revert &quot;Speed-up HTTP 1.1 header and line parsing (<a href="https://github-redirect.dependabot.com/netty/netty/issues/12321">#12321</a>)&quot;</li>
<li><a href="https://github.com/netty/netty/commit/4475b5c5719e7e7f36f2f01e25c139bcb2b048d4"><code>4475b5c</code></a> [maven-release-plugin] prepare for next development iteration</li>
<li>See full diff in <a href="https://github.com/netty/netty/compare/netty-4.1.88.Final...netty-4.1.89.Final">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=io.netty:netty-bom&package-manager=maven&previous-version=4.1.88.Final&new-version=4.1.89.Final)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting ``@dependabot` rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- ``@dependabot` rebase` will rebase this PR
- ``@dependabot` recreate` will recreate this PR, overwriting any edits that have been made to it
- ``@dependabot` merge` will merge this PR after your CI passes on it
- ``@dependabot` squash and merge` will squash and merge this PR after your CI passes on it
- ``@dependabot` cancel merge` will cancel a previously requested merge and block automerging
- ``@dependabot` reopen` will reopen this PR if it is closed
- ``@dependabot` close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- ``@dependabot` ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Feb 14, 2023
3 parents 4e950c4 + 97532f6 + 31870d9 commit be36dd4
Show file tree
Hide file tree
Showing 11 changed files with 146 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package io.camunda.zeebe.broker.system.configuration;

public final class ProcessingCfg implements ConfigurationEntry {
private static final int DEFAULT_PROCESSING_BATCH_LIMIT = 100;
private static final int DEFAULT_PROCESSING_BATCH_LIMIT = 1;
private Integer maxCommandsInBatch = DEFAULT_PROCESSING_BATCH_LIMIT;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void shouldUseDefaultMaxCommandsInBatch() {
final int limit = cfg.getMaxCommandsInBatch();

// then
assertThat(limit).isEqualTo(100);
assertThat(limit).isEqualTo(1);
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions dist/src/main/config/broker.standalone.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -602,13 +602,13 @@
# Sets the maximum number of commands that processed within one batch.
# The processor will process until no more follow up commands are created by the initial command
# or the configured limit is reached.
# By default, up to 100 commands are processed in one batch. Can be set to 1 to disable batch processing.
# By default, batch processing is disabled.
# Must be a positive integer number.
# Note that the resulting batch size will contain more entries than this limit because it includes follow up events.
# When resulting batch size is too large (see maxMessageSize), processing will be rolled back and retried with a smaller maximum batch size.
# Lowering the command limit can reduce the frequency of rollback and retry.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_PROCESSING_MAXCOMMANDSINBATCH
# maxCommandsInBatch = 100
# maxCommandsInBatch = 1
# experimental
# Be aware that all configuration's which are part of the experimental section
# are subject to change and can be dropped at any time.
Expand Down
4 changes: 2 additions & 2 deletions dist/src/main/config/broker.yaml.template
Original file line number Diff line number Diff line change
Expand Up @@ -540,13 +540,13 @@
# Sets the maximum number of commands that processed within one batch.
# The processor will process until no more follow up commands are created by the initial command
# or the configured limit is reached.
# By default, up to 100 commands are processed in one batch. Can be set to 1 to disable batch processing.
# By default, batch processing is disabled.
# Must be a positive integer number.
# Note that the resulting batch size will contain more entries than this limit because it includes follow up events.
# When resulting batch size is too large (see maxMessageSize), processing will be rolled back and retried with a smaller maximum batch size.
# Lowering the command limit can reduce the frequency of rollback and retry.
# This setting can also be overridden using the environment variable ZEEBE_BROKER_PROCESSING_MAXCOMMANDSINBATCH
# maxCommandsInBatch = 100
# maxCommandsInBatch = 1
# experimental
# Be aware that all configuration's which are part of the experimental section
# are subject to change and can be dropped at any time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

public final class StreamProcessorContext implements ReadonlyStreamProcessorContext {

public static final int DEFAULT_MAX_COMMANDS_IN_BATCH = 100;
public static final int DEFAULT_MAX_COMMANDS_IN_BATCH = 1;
private static final StreamProcessorListener NOOP_LISTENER =
new StreamProcessorListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ public void shouldOnlyExecuteOneBranchWithEqualTimers() {
.withProcessInstanceKey(processInstanceKey)
.onlyCommandRejections()
.getFirst())
.hasRejectionType(RejectionType.NOT_FOUND)
.hasRejectionType(RejectionType.INVALID_STATE)
.hasRecordType(RecordType.COMMAND_REJECTION);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public void shouldNotTerminateFlowScopeIfPendingActivation() {
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limit("A", ProcessInstanceIntent.ELEMENT_TERMINATED))
.limit("C", ProcessInstanceIntent.ELEMENT_ACTIVATED))
.extracting(
r -> r.getValue().getBpmnElementType(),
r -> r.getValue().getElementId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorContext;
import io.camunda.zeebe.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.util.FileUtil;
Expand Down Expand Up @@ -76,6 +77,7 @@ public final class StreamPlatform {
private final ZeebeDbFactory zeebeDbFactory;
private final StreamProcessorLifecycleAware mockProcessorLifecycleAware;
private final StreamProcessorListener mockStreamProcessorListener;
private int maxCommandsInBatch = StreamProcessorContext.DEFAULT_MAX_COMMANDS_IN_BATCH;

public StreamPlatform(
final Path dataDirectory,
Expand Down Expand Up @@ -116,6 +118,10 @@ public CommandResponseWriter getMockCommandResponseWriter() {
return mockCommandResponseWriter;
}

public void setMaxCommandsInBatch(final int maxCommandsInBatch) {
this.maxCommandsInBatch = maxCommandsInBatch;
}

public void createLogStream() {
final var logStorage = new ListLogStorage();
final var logStream =
Expand Down Expand Up @@ -223,6 +229,7 @@ public StreamProcessor buildStreamProcessor(
.eventApplierFactory(EventAppliers::new) // todo remove this soon
.streamProcessorMode(streamProcessorMode)
.listener(mockStreamProcessorListener)
.maxCommandsInBatch(maxCommandsInBatch)
.partitionCommandSender(mock(InterPartitionCommandSender.class));

builder.getLifecycleListeners().add(mockProcessorLifecycleAware);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.engine.util.RecordToWrite.command;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ACTIVATE_ELEMENT;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamPlatform;
import io.camunda.zeebe.engine.util.StreamPlatformExtension;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.verification.VerificationWithTimeout;

@ExtendWith(StreamPlatformExtension.class)
public class StreamProcessorBatchProcessingTest {

private static final long TIMEOUT_MILLIS = 2_000L;
private static final VerificationWithTimeout TIMEOUT = timeout(TIMEOUT_MILLIS);

private static final ProcessInstanceRecord RECORD = Records.processInstance(1);

@SuppressWarnings("unused") // injected by the extension
private StreamPlatform streamPlatform;

@Test
public void shouldProcessFollowUpEventsAndCommands() {
// given
streamPlatform.setMaxCommandsInBatch(100); // enable batch processing
final var defaultRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
final var resultBuilderCaptor = ArgumentCaptor.forClass(ProcessingResultBuilder.class);
when(defaultRecordProcessor.process(any(), resultBuilderCaptor.capture()))
.thenAnswer(
(invocation) -> {
final var resultBuilder = resultBuilderCaptor.getValue();
resultBuilder.appendRecordReturnEither(
1,
RecordType.EVENT,
ACTIVATE_ELEMENT,
RejectionType.NULL_VAL,
"",
Records.processInstance(1));
resultBuilder.appendRecordReturnEither(
2,
RecordType.COMMAND,
ACTIVATE_ELEMENT,
RejectionType.NULL_VAL,
"",
Records.processInstance(1));
return resultBuilder.build();
})
.thenAnswer(
(invocation) -> {
final var resultBuilder = resultBuilderCaptor.getValue();
resultBuilder.appendRecordReturnEither(
3,
RecordType.EVENT,
ACTIVATE_ELEMENT,
RejectionType.NULL_VAL,
"",
Records.processInstance(1));
return resultBuilder.build();
});

streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(defaultRecordProcessor, TIMEOUT.times(2)).process(any(), any());
await("Last written position should be updated")
.untilAsserted(
() -> assertThat(streamPlatform.getLogStream().getLastWrittenPosition()).isEqualTo(4));
await("Last processed position should be updated")
.untilAsserted(
() ->
assertThat(
streamPlatform.getStreamProcessor().getLastProcessedPositionAsync().join())
.isEqualTo(1));

final var logStreamReader = streamPlatform.getLogStream().newLogStreamReader();
logStreamReader.seekToFirstEvent();
final var firstRecord = logStreamReader.next();
assertThat(firstRecord.getSourceEventPosition()).isEqualTo(-1);
final var firstRecordPosition = firstRecord.getPosition();

await("should write follow up events")
.untilAsserted(() -> assertThat(logStreamReader.hasNext()).isTrue());
while (logStreamReader.hasNext()) {
assertThat(logStreamReader.next().getSourceEventPosition()).isEqualTo(firstRecordPosition);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.util.exception.RecoverableException;
import java.util.List;
Expand All @@ -55,6 +56,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.verification.VerificationWithTimeout;
Expand Down Expand Up @@ -228,74 +230,6 @@ public void shouldProcessOnlyCommands() {
verify(defaultRecordProcessor, TIMEOUT.times(1)).process(any(), any());
}

@Test
public void shouldProcessFollowUpEventsAndCommands() {
// given
final var defaultRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
final var resultBuilderCaptor = ArgumentCaptor.forClass(ProcessingResultBuilder.class);
when(defaultRecordProcessor.process(any(), resultBuilderCaptor.capture()))
.thenAnswer(
(invocation) -> {
final var resultBuilder = resultBuilderCaptor.getValue();
resultBuilder.appendRecordReturnEither(
1,
RecordType.EVENT,
ACTIVATE_ELEMENT,
RejectionType.NULL_VAL,
"",
Records.processInstance(1));
resultBuilder.appendRecordReturnEither(
2,
RecordType.COMMAND,
ACTIVATE_ELEMENT,
RejectionType.NULL_VAL,
"",
Records.processInstance(1));
return resultBuilder.build();
})
.thenAnswer(
(invocation) -> {
final var resultBuilder = resultBuilderCaptor.getValue();
resultBuilder.appendRecordReturnEither(
3,
RecordType.EVENT,
ACTIVATE_ELEMENT,
RejectionType.NULL_VAL,
"",
Records.processInstance(1));
return resultBuilder.build();
});

streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(defaultRecordProcessor, TIMEOUT.times(2)).process(any(), any());
await("Last written position should be updated")
.untilAsserted(
() -> assertThat(streamPlatform.getLogStream().getLastWrittenPosition()).isEqualTo(4));
await("Last processed position should be updated")
.untilAsserted(
() ->
assertThat(
streamPlatform.getStreamProcessor().getLastProcessedPositionAsync().join())
.isEqualTo(1));

final var logStreamReader = streamPlatform.getLogStream().newLogStreamReader();
logStreamReader.seekToFirstEvent();
final var firstRecord = logStreamReader.next();
assertThat(firstRecord.getSourceEventPosition()).isEqualTo(-1);
final var firstRecordPosition = firstRecord.getPosition();

await("should write follow up events")
.untilAsserted(() -> assertThat(logStreamReader.hasNext()).isTrue());
while (logStreamReader.hasNext()) {
assertThat(logStreamReader.next().getSourceEventPosition()).isEqualTo(firstRecordPosition);
}
}

@Test
public void shouldSetSourcePointerForFollowUpRecords() {
// given
Expand All @@ -316,7 +250,9 @@ public void shouldSetSourcePointerForFollowUpRecords() {
"",
Records.processInstance(1));

when(defaultRecordProcessor.process(any(), any())).thenReturn(resultBuilder.build());
when(defaultRecordProcessor.process(
ArgumentMatchers.argThat(new RecordIntentMatcher(ACTIVATE_ELEMENT)), any()))
.thenReturn(resultBuilder.build());

streamPlatform.startStreamProcessor();

Expand Down Expand Up @@ -875,4 +811,18 @@ public ProcessingResult onProcessingError(
return processingResultOnError;
}
}

private static final class RecordIntentMatcher implements ArgumentMatcher<TypedRecord> {

private final Intent toMatchIntent;

private RecordIntentMatcher(final Intent toMatchIntent) {
this.toMatchIntent = toMatchIntent;
}

@Override
public boolean matches(final TypedRecord typedRecord) {
return toMatchIntent.equals(typedRecord.getIntent());
}
}
}
2 changes: 1 addition & 1 deletion parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<version.mockito>4.8.0</version.mockito>
<version.model>7.7.0</version.model>
<version.msgpack>0.9.3</version.msgpack>
<version.netty>4.1.88.Final</version.netty>
<version.netty>4.1.89.Final</version.netty>
<version.objenesis>3.3</version.objenesis>
<version.prometheus>0.16.0</version.prometheus>
<version.protobuf>3.21.9</version.protobuf>
Expand Down

0 comments on commit be36dd4

Please sign in to comment.