build: update zeebe to version 8.1.2, Jdk set to 17 #346

Open · wants to merge 1 commit into main
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -22,10 +22,10 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
- name: Set up JDK 17
uses: actions/setup-java@v2
with:
java-version: '11'
java-version: '17'
distribution: 'temurin'
cache: 'maven'
- name: Build
25 changes: 18 additions & 7 deletions exporter/pom.xml
@@ -19,6 +19,17 @@
</properties>

<dependencies>
<!-- serialization -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<!-- runtime dependencies which must be packaged with the exporter -->
<dependency>
<groupId>io.zeebe</groupId>
@@ -43,6 +54,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-protocol-jackson</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
@@ -58,13 +75,7 @@
<!-- test dependencies -->
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-protocol-jackson</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-test</artifactId>
<artifactId>zeebe-exporter-test</artifactId>
<scope>test</scope>
</dependency>

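For context (not part of the diff): the newly declared jackson-databind/jackson-core pair and the provided-scope zeebe-protocol-jackson artifact are what the exporter combines to turn protocol records into JSON. Below is a minimal sketch of that combination; the class and method names are illustrative, while the ObjectMapper and ZeebeProtocolModule calls mirror the RecordSerializer change further down.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.Record;

final class RecordJsonSketch {
  // ZeebeProtocolModule teaches Jackson how to map the Zeebe protocol's record types.
  private static final ObjectMapper MAPPER =
      new ObjectMapper().registerModule(new ZeebeProtocolModule());

  static String toJson(final Record<?> record) throws JsonProcessingException {
    return MAPPER.writeValueAsString(record);
  }
}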
exporter/src/main/java/io/zeebe/exporters/kafka/record/RecordSerializer.java
@@ -15,8 +15,12 @@
*/
package io.zeebe.exporters.kafka.record;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import io.camunda.zeebe.protocol.record.Record;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

@@ -28,13 +32,15 @@
*/
public final class RecordSerializer implements Serializer<Record<?>> {
private final StringSerializer delegate;
private final ObjectMapper mapper;

public RecordSerializer() {
this(new StringSerializer());
}

public RecordSerializer(final StringSerializer delegate) {
this.delegate = delegate;
this.mapper = new ObjectMapper().registerModule(new ZeebeProtocolModule());
this.delegate = new StringSerializer();
}

@Override
@@ -44,7 +50,12 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {

@Override
public byte[] serialize(final String topic, final Record data) {
return delegate.serialize(topic, data.toJson());
try {
return delegate.serialize(topic, mapper.writeValueAsString(data));
} catch (JsonProcessingException e) {
throw new SerializationException(
String.format("Expected to serialize data for topic [%s], but failed", topic), e);
}
}

@Override
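As a usage sketch (again, not part of the diff), the rewritten serializer can be exercised on its own. The record fixture mirrors the recordFixture() helper in the tests below; the class name and topic string are illustrative.

import io.camunda.zeebe.protocol.record.ImmutableRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.value.ImmutableDeploymentRecordValue;
import io.zeebe.exporters.kafka.record.RecordSerializer;
import java.nio.charset.StandardCharsets;

final class RecordSerializerSketch {
  public static void main(final String[] args) {
    final Record<?> record =
        ImmutableRecord.builder()
            .withIntent(DeploymentIntent.CREATED)
            .withRecordType(RecordType.EVENT)
            .withValueType(ValueType.DEPLOYMENT)
            .withValue(ImmutableDeploymentRecordValue.builder().build())
            .build();

    // serialize() now routes through Jackson + ZeebeProtocolModule instead of Record#toJson()
    final byte[] json = new RecordSerializer().serialize("zeebe-records", record);
    System.out.println(new String(json, StandardCharsets.UTF_8));
  }
}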
160 changes: 92 additions & 68 deletions exporter/src/test/java/io/zeebe/exporters/kafka/KafkaExporterTest.java
@@ -19,9 +19,15 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.tuple;

import io.camunda.zeebe.exporter.test.ExporterTestConfiguration;
import io.camunda.zeebe.exporter.test.ExporterTestContext;
import io.camunda.zeebe.exporter.test.ExporterTestController;
import io.camunda.zeebe.protocol.record.ImmutableRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.test.exporter.ExporterTestHarness;
import io.camunda.zeebe.test.exporter.record.MockRecordMetadata;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.value.ImmutableDeploymentRecordValue;
import io.zeebe.exporters.kafka.config.Config;
import io.zeebe.exporters.kafka.config.parser.MockConfigParser;
import io.zeebe.exporters.kafka.config.parser.RawConfigParser;
@@ -31,7 +37,8 @@
import io.zeebe.exporters.kafka.producer.RecordBatchStub;
import io.zeebe.exporters.kafka.record.RecordHandler;
import io.zeebe.exporters.kafka.serde.RecordId;
import java.util.stream.Collectors;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
@@ -44,45 +51,49 @@ final class KafkaExporterTest {

private final RawConfig rawConfig = new RawConfig();
private final MockConfigParser<RawConfig, Config> mockConfigParser =
new MockConfigParser<>(new RawConfigParser());
new MockConfigParser<>(new RawConfigParser());

private final ExporterTestContext context =
new ExporterTestContext().setConfiguration(new ExporterTestConfiguration<>("test", rawConfig));
private final ExporterTestController controller = new ExporterTestController();

private final RecordBatchStub.Factory batchStubFactory = new RecordBatchStub.Factory();
private final KafkaExporter exporter = new KafkaExporter(batchStubFactory, mockConfigParser);
private final ExporterTestHarness testHarness = new ExporterTestHarness(exporter);

@Test
void shouldAddRecordToBatchOnExport() throws Exception {
void shouldAddRecordToBatchOnExport() {
// given
rawConfig.maxBatchSize = 5;
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);

final Record<?> record = recordFixture();

// when
final var records = testHarness.stream().export(5);
exporter.export(record);

// then
final var expectedIds =
records.stream()
.map(r -> new RecordId(r.getPartitionId(), r.getPosition()))
.collect(Collectors.toList());
final var expectedIds = new RecordId(record.getPartitionId(), record.getPosition());
assertThat(batchStubFactory.stub.getPendingRecords())
.as("the records were added to the batch in order")
.extracting(ProducerRecord::key)
.containsExactlyElementsOf(expectedIds);
.containsExactlyElementsOf(List.of(expectedIds));
assertThat(batchStubFactory.stub.getFlushedRecords())
.as("no records were flushed yet")
.isEmpty();
}

@Test
void shouldUseCorrectSerializer() throws Exception {
void shouldUseCorrectSerializer() {
// given
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);
final var recordHandler = new RecordHandler(mockConfigParser.config.getRecords());

final Record<?> record = recordFixture();

// when
final var json = "{\"a\": 1}";
final var record = testHarness.export(r -> r.setJson(json));
exporter.export(record);

// then
final var expectedRecord = recordHandler.transform(record);
@@ -93,123 +104,136 @@ final var record = testHarness.export(r -> r.setJson(json));
tuple(expectedRecord.topic(), expectedRecord.key(), expectedRecord.value()));
}


@Test
void shouldSkipDisallowedRecords() throws Exception {
void shouldSkipDisallowedRecords() {
// given
rawConfig.records = new RawRecordsConfig();
rawConfig.records.deployment = new RawRecordConfig();
rawConfig.records.deployment.type = "";
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);

final Record<?> record = recordFixture();

// when
testHarness.export(
r -> r.setMetadata(new MockRecordMetadata().setValueType(ValueType.DEPLOYMENT)));
exporter.export(record);

// then
assertThat(batchStubFactory.stub.getPendingRecords())
.as("disallowed record should not be added to the batch")
.isEmpty();
}


@Test
void shouldFlushOnScheduledTask() throws Exception {
void shouldFlushOnScheduledTask() {
// given
rawConfig.maxBatchSize = 5;
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);

final Record<?> record = recordFixture();

// when
final var records = testHarness.stream().export(5);
triggerFlushTask();
exporter.export(record);
controller.runScheduledTasks(Duration.ofSeconds(10));

// then
final var expectedIds =
records.stream()
.map(r -> new RecordId(r.getPartitionId(), r.getPosition()))
.collect(Collectors.toList());
final var expectedIds = new RecordId(record.getPartitionId(), record.getPosition());
assertThat(batchStubFactory.stub.getFlushedRecords())
.as("the records were added to the batch in order")
.extracting(ProducerRecord::key)
.containsExactlyElementsOf(expectedIds);
.containsExactlyElementsOf(List.of(expectedIds));
assertThat(batchStubFactory.stub.getPendingRecords())
.as("no pending records after flush")
.isEmpty();
}

@Test
void shouldUpdatePositionOnFlush() throws Exception {
void shouldUpdatePositionOnFlush() {
// given
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);

final Record<?> record = recordFixture();

// when
final var records = testHarness.stream().export(5);
triggerFlushTask();
exporter.export(record);
controller.runScheduledTasks(Duration.ofSeconds(10));

// then
assertThat(testHarness.getLastUpdatedPosition())
assertThat(controller.getPosition())
.as("position should be updated since after flush")
.isEqualTo(records.get(4).getPosition());
.isEqualTo(record.getPosition());
}

@Test
void shouldRescheduleFlushTaskEvenOnException() throws Exception {
void shouldRescheduleFlushTaskEvenOnException() {
// given
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);

final Record<?> record = recordFixture();

// when
final var records = testHarness.stream().export(2);
exporter.export(record);
batchStubFactory.stub.flushException = new RuntimeException("failed to flush");
assertThatThrownBy(this::triggerFlushTask).isEqualTo(batchStubFactory.stub.flushException);
assertThatThrownBy(() -> controller.runScheduledTasks(Duration.ofSeconds(10))).isEqualTo(batchStubFactory.stub.flushException);
batchStubFactory.stub.flushException = null;
triggerFlushTask();
controller.runScheduledTasks(Duration.ofSeconds(10));

// then
assertThat(testHarness.getLastUpdatedPosition())
assertThat(controller.getPosition())
.as("position should be updated since we managed to flush after the second try")
.isEqualTo(records.get(1).getPosition());
.isEqualTo(record.getPosition());
}

@Test
void shouldFlushBatchOnClose() throws Exception {
void shouldFlushBatchOnClose() {
// given
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);
final Record<?> record = recordFixture();

// when
final var records = testHarness.stream().export(2);
testHarness.close();
exporter.export(record);
exporter.close();

// then
assertThat(testHarness.getLastUpdatedPosition())
assertThat(controller.getPosition())
.as("position should be updated since we managed to flush after the second try")
.isEqualTo(records.get(1).getPosition());
.isEqualTo(record.getPosition());
assertThat(batchStubFactory.stub.isClosed())
.as("batch should be closed on exporter close")
.isTrue();
}

@Test
void shouldRescheduleFlush() throws Exception {
void shouldRescheduleFlush() {
// given
testHarness.configure(EXPORTER_ID, rawConfig);
testHarness.open();
exporter.configure(context);
exporter.open(controller);

// when
triggerFlushTask();
final var records = testHarness.stream().export(2);
triggerFlushTask();
controller.runScheduledTasks(Duration.ofSeconds(10));
exporter.export(recordFixture());
controller.runScheduledTasks(Duration.ofSeconds(10));

// then
assertThat(testHarness.getLastUpdatedPosition())
assertThat(controller.getPosition())
.as("position should be updated after triggering the second flush task")
.isEqualTo(records.get(1).getPosition());
.isEqualTo(0);
}

private void triggerFlushTask() {
mockConfigParser.parse(rawConfig);
testHarness.runScheduledTasks(mockConfigParser.config.getFlushInterval());
Record<?> recordFixture() {
return ImmutableRecord.builder()
.withIntent(DeploymentIntent.CREATED)
.withRecordType(RecordType.EVENT)
.withValueType(ValueType.DEPLOYMENT)
.withValue(ImmutableDeploymentRecordValue.builder().build())
.build();
}
}
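The rewritten tests above replace the removed zeebe-test ExporterTestHarness with the zeebe-exporter-test fixtures. Below is a condensed sketch of the lifecycle those tests rely on; the class name, drive method, Object-typed config parameter, and the io.camunda.zeebe.exporter.api.Exporter import (from the exporter API artifact) are assumptions for illustration, while the ExporterTest* calls are the ones that appear in the diff.

import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.test.ExporterTestConfiguration;
import io.camunda.zeebe.exporter.test.ExporterTestContext;
import io.camunda.zeebe.exporter.test.ExporterTestController;
import java.time.Duration;

final class ExporterLifecycleSketch {
  // `config` stands in for the RawConfig instance the real test builds.
  static void drive(final Exporter exporter, final Object config) throws Exception {
    final ExporterTestContext context =
        new ExporterTestContext()
            .setConfiguration(new ExporterTestConfiguration<>("test", config));
    final ExporterTestController controller = new ExporterTestController();

    exporter.configure(context);                           // was: testHarness.configure(EXPORTER_ID, rawConfig)
    exporter.open(controller);                             // was: testHarness.open()
    // call exporter.export(record) for one or more records, then run the scheduled flush task:
    controller.runScheduledTasks(Duration.ofSeconds(10));  // was: testHarness.runScheduledTasks(flushInterval)
    exporter.close();                                      // flushes any remaining batch on close
    // controller.getPosition() then reports the last acknowledged record position.
  }
}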
exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawRecordsConfigParserTest.java
@@ -31,7 +31,15 @@
@Execution(ExecutionMode.CONCURRENT)
final class RawRecordsConfigParserTest {
private static final Set<ValueType> EXPECTED_VALUE_TYPES =
EnumSet.complementOf(EnumSet.of(ValueType.NULL_VAL, ValueType.SBE_UNKNOWN));
EnumSet.complementOf(EnumSet.of(
ValueType.NULL_VAL,
ValueType.SBE_UNKNOWN,
ValueType.DECISION,
ValueType.DECISION_REQUIREMENTS,
ValueType.DECISION_EVALUATION,
ValueType.CHECKPOINT,
ValueType.PROCESS_INSTANCE_MODIFICATION
));

private final RawRecordsConfigParser parser = new RawRecordsConfigParser();
