Skip to content

Commit

Permalink
Merge pull request #353 from camunda-community-hub/np-pass-receiver
Browse files Browse the repository at this point in the history
Allow configuring the debug receiver in general
  • Loading branch information
npepinpe authored Sep 24, 2022
2 parents 572438d + 4161f90 commit 33557a5
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 17 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: Continuous Integration
on:
pull_request:
branches: [ main ]
paths:
- '**/*.java'
- '**/pom.xml'
Expand All @@ -18,6 +19,13 @@ on:
- '.github/workflows/test.yml'
- '.github/workflows/lint.yml'
workflow_dispatch: { }

concurrency:
# add a sub-key using the run ID whenever this is called via workflow dispatch (i.e. manually by a user); in that
# case, we do want to have as many runs as a user wants. for all other cases, this will only run once.
group: ${{ github.workflow }}-${{ (github.event_name == 'workflow_dispatch' && github.run_id) || '' }}-${{ github.ref }}
cancel-in-progress: true

jobs:
lint:
name: Linting & Analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.zeebe.containers.ZeebeBrokerNode;
import io.zeebe.containers.ZeebeGatewayNode;
import io.zeebe.containers.cluster.ZeebeCluster;
import io.zeebe.containers.exporter.DebugReceiver;
import java.time.Duration;
import org.apiguardian.api.API;
import org.apiguardian.api.API.Status;
Expand All @@ -36,19 +37,6 @@
@API(status = Status.EXPERIMENTAL)
public interface ContainerEngine extends Startable, ZeebeTestEngine {

/**
* Marks all records with a position less than {@code position} on partition with ID {@code
* partitionId} as acknowledged, meaning they can now be deleted from Zeebe.
*
* <p>Note that this is not a synchronous operation, but instead will take effect when the next
* record is exported. See {@link io.zeebe.containers.exporter.DebugReceiver#acknowledge(int,
* long)} for more.
*
* @param partitionId the ID of the partition on which to acknowledge
* @param position the position up to which they should be acknowledged
*/
void acknowledge(final int partitionId, final long position);

/**
* Returns a default builder. Calling {@link Builder#build()} on a fresh builder will return a
* builder wrapping a default {@link io.zeebe.containers.ZeebeContainer}, with an idle period of 1
Expand All @@ -70,6 +58,19 @@ static ContainerEngine createDefault() {
return builder().build();
}

/**
* Marks all records with a position less than {@code position} on partition with ID {@code
* partitionId} as acknowledged, meaning they can now be deleted from Zeebe.
*
* <p>Note that this is not a synchronous operation, but instead will take effect when the next
* record is exported. See {@link io.zeebe.containers.exporter.DebugReceiver#acknowledge(int,
* long)} for more.
*
* @param partitionId the ID of the partition on which to acknowledge
* @param position the position up to which they should be acknowledged
*/
void acknowledge(final int partitionId, final long position);

/**
* A helper class to build {@link ContainerEngine} instances. A fresh, non-configured builder will
* always return one which has an idle period of 1 second, and uses a default {@link
Expand Down Expand Up @@ -161,7 +162,10 @@ interface Builder {
*
* @param acknowledge whether to automatically acknowledge exported records or not
* @return itself for chaining
* @deprecated since 3.5.2, will be removed in 3.7.0; use {@link
* #withDebugReceiver(DebugReceiver)} instead
*/
@Deprecated
Builder withAutoAcknowledge(final boolean acknowledge);

/**
Expand All @@ -170,9 +174,21 @@ interface Builder {
*
* @param port the port to assign to the receiver
* @return itself for chaining
* @deprecated since 3.5.2, will be removed in 3.7.0; use {@link
* #withDebugReceiver(DebugReceiver)} instead
*/
@Deprecated
Builder withDebugReceiverPort(final int port);

/**
* The pre-configured {@link DebugReceiver} instance to use. Useful if you want to pre-assign
* ports or have fine-grained control over the acknowledgment process.
*
* @param receiver the debug receiver to use
* @return itself for chaining
*/
Builder withDebugReceiver(final DebugReceiver receiver);

/**
* Builds a {@link ContainerEngine} based on the configuration. If nothing else was called, will
* build an engine using a default {@link io.zeebe.containers.ZeebeContainer}, an idle period of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ final class ContainerEngineBuilder implements Builder {
private Duration gracePeriod;
private boolean autoAcknowledge;
private int debugReceiverPort;
private DebugReceiver debugReceiver;

@Override
public <T extends GenericContainer<T> & ZeebeGatewayNode<T> & ZeebeBrokerNode<T>>
Expand Down Expand Up @@ -102,21 +103,34 @@ public Builder withAutoAcknowledge(final boolean autoAcknowledge) {

@Override
public Builder withDebugReceiverPort(final int debugReceiverPort) {
if (debugReceiverPort < 0) {
throw new IllegalArgumentException(
String.format(
"Debug receiver port must be greater than or equal to 0, but %d was given",
debugReceiverPort));
}

this.debugReceiverPort = debugReceiverPort;
return this;
}

@Override
public Builder withDebugReceiver(final DebugReceiver debugReceiver) {
this.debugReceiver = Objects.requireNonNull(debugReceiver, "must specify a debug receiver");
return this;
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public ContainerEngine build() {
final Duration listGracePeriod = Optional.ofNullable(gracePeriod).orElse(DEFAULT_GRACE_PERIOD);
final Duration receiveIdlePeriod = Optional.ofNullable(idlePeriod).orElse(DEFAULT_IDLE_PERIOD);
final InfiniteList<Record<?>> records = new InfiniteList<>(listGracePeriod);
final DebugReceiver receiver =
Optional.ofNullable(debugReceiver)
.orElse(new DebugReceiver(records::add, debugReceiverPort, autoAcknowledge));
final DebugReceiverStream recordStream =
new DebugReceiverStream(
records,
new DebugReceiver(records::add, debugReceiverPort, autoAcknowledge),
receiveIdlePeriod);
new DebugReceiverStream(records, receiver, receiveIdlePeriod);

try {
if (container != null) {
Expand Down

0 comments on commit 33557a5

Please sign in to comment.