diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 0eb39bb..2cf9a82 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -22,10 +22,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v3
-      - name: Set up JDK 11
+      - name: Set up JDK 17
        uses: actions/setup-java@v2
        with:
-          java-version: '11'
+          java-version: '17'
          distribution: 'temurin'
          cache: 'maven'
      - name: Build
diff --git a/exporter/pom.xml b/exporter/pom.xml
index 33b5b5c..cd60cee 100644
--- a/exporter/pom.xml
+++ b/exporter/pom.xml
@@ -19,6 +19,17 @@
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.zeebe</groupId>
@@ -43,6 +54,12 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.camunda</groupId>
+      <artifactId>zeebe-protocol-jackson</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <artifactId>slf4j-api</artifactId>
       <groupId>org.slf4j</groupId>
@@ -58,13 +75,7 @@
     <dependency>
       <groupId>io.camunda</groupId>
-      <artifactId>zeebe-protocol-jackson</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>io.camunda</groupId>
-      <artifactId>zeebe-test</artifactId>
+      <artifactId>zeebe-exporter-test</artifactId>
       <scope>test</scope>
diff --git a/exporter/src/main/java/io/zeebe/exporters/kafka/record/RecordSerializer.java b/exporter/src/main/java/io/zeebe/exporters/kafka/record/RecordSerializer.java
index 2b3e46a..47ddea1 100644
--- a/exporter/src/main/java/io/zeebe/exporters/kafka/record/RecordSerializer.java
+++ b/exporter/src/main/java/io/zeebe/exporters/kafka/record/RecordSerializer.java
@@ -15,8 +15,12 @@
  */
 package io.zeebe.exporters.kafka.record;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
 import io.camunda.zeebe.protocol.record.Record;
 import java.util.Map;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
@@ -28,13 +32,15 @@
  */
 public final class RecordSerializer implements Serializer<Record<?>> {
   private final StringSerializer delegate;
+  private final ObjectMapper mapper;
 
   public RecordSerializer() {
     this(new StringSerializer());
   }
 
   public RecordSerializer(final StringSerializer delegate) {
-    this.delegate = delegate;
+    this.mapper = new ObjectMapper().registerModule(new ZeebeProtocolModule());
+    this.delegate = new StringSerializer();
   }
 
@@ -44,7 +50,12 @@ public void configure(final Map<String, ?> configs, final boolean isKey) {
 
   @Override
   public byte[] serialize(final String topic, final Record<?> data) {
-    return delegate.serialize(topic, data.toJson());
+    try {
+      return delegate.serialize(topic, mapper.writeValueAsString(data));
+    } catch (JsonProcessingException e) {
+      throw new SerializationException(
+          String.format("Expected to serialize data for topic [%s], but failed", topic), e);
+    }
   }
 
   @Override
diff --git a/exporter/src/test/java/io/zeebe/exporters/kafka/KafkaExporterTest.java b/exporter/src/test/java/io/zeebe/exporters/kafka/KafkaExporterTest.java
index e426c0e..bc35333 100644
--- a/exporter/src/test/java/io/zeebe/exporters/kafka/KafkaExporterTest.java
+++ b/exporter/src/test/java/io/zeebe/exporters/kafka/KafkaExporterTest.java
@@ -19,9 +19,15 @@
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.tuple;
 
+import io.camunda.zeebe.exporter.test.ExporterTestConfiguration;
+import io.camunda.zeebe.exporter.test.ExporterTestContext;
+import io.camunda.zeebe.exporter.test.ExporterTestController;
+import io.camunda.zeebe.protocol.record.ImmutableRecord;
+import io.camunda.zeebe.protocol.record.Record;
+import io.camunda.zeebe.protocol.record.RecordType;
 import io.camunda.zeebe.protocol.record.ValueType;
-import io.camunda.zeebe.test.exporter.ExporterTestHarness;
-import io.camunda.zeebe.test.exporter.record.MockRecordMetadata;
+import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
+import io.camunda.zeebe.protocol.record.value.ImmutableDeploymentRecordValue;
 import io.zeebe.exporters.kafka.config.Config;
 import io.zeebe.exporters.kafka.config.parser.MockConfigParser;
 import io.zeebe.exporters.kafka.config.parser.RawConfigParser;
@@ -31,7 +37,8 @@
 import io.zeebe.exporters.kafka.producer.RecordBatchStub;
 import io.zeebe.exporters.kafka.record.RecordHandler;
 import io.zeebe.exporters.kafka.serde.RecordId;
-import java.util.stream.Collectors;
+import java.time.Duration;
+import java.util.List;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
@@ -44,45 +51,49 @@ final class KafkaExporterTest {
   private final RawConfig rawConfig = new RawConfig();
   private final MockConfigParser mockConfigParser =
-      new MockConfigParser<>(new RawConfigParser());
+      new MockConfigParser<>(new RawConfigParser());
+
+  private final ExporterTestContext context =
+      new ExporterTestContext().setConfiguration(new ExporterTestConfiguration<>("test", rawConfig));
+  private final ExporterTestController controller = new ExporterTestController();
+
   private final RecordBatchStub.Factory batchStubFactory = new RecordBatchStub.Factory();
   private final KafkaExporter exporter = new KafkaExporter(batchStubFactory, mockConfigParser);
-  private final ExporterTestHarness testHarness = new ExporterTestHarness(exporter);
 
   @Test
-  void shouldAddRecordToBatchOnExport() throws Exception {
+  void shouldAddRecordToBatchOnExport() {
     // given
     rawConfig.maxBatchSize = 5;
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
+
+    final Record record = recordFixture();
 
     // when
-    final var records = testHarness.stream().export(5);
+    exporter.export(record);
 
     // then
-    final var expectedIds =
-        records.stream()
-            .map(r -> new RecordId(r.getPartitionId(), r.getPosition()))
-            .collect(Collectors.toList());
+    final var expectedIds = new RecordId(record.getPartitionId(), record.getPosition());
     assertThat(batchStubFactory.stub.getPendingRecords())
         .as("the records were added to the batch in order")
         .extracting(ProducerRecord::key)
-        .containsExactlyElementsOf(expectedIds);
+        .containsExactlyElementsOf(List.of(expectedIds));
     assertThat(batchStubFactory.stub.getFlushedRecords())
         .as("no records were flushed yet")
         .isEmpty();
   }
 
   @Test
-  void shouldUseCorrectSerializer() throws Exception {
+  void shouldUseCorrectSerializer() {
     // given
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
     final var recordHandler = new RecordHandler(mockConfigParser.config.getRecords());
 
+    final Record record = recordFixture();
+
     // when
-    final var json = "{\"a\": 1}";
-    final var record = testHarness.export(r -> r.setJson(json));
+    exporter.export(record);
 
     // then
     final var expectedRecord = recordHandler.transform(record);
@@ -93,18 +104,20 @@ final var record = testHarness.export(r -> r.setJson(json));
             tuple(expectedRecord.topic(), expectedRecord.key(), expectedRecord.value()));
   }
 
+
   @Test
-  void shouldSkipDisallowedRecords() throws Exception {
+  void shouldSkipDisallowedRecords() {
     // given
     rawConfig.records = new RawRecordsConfig();
     rawConfig.records.deployment = new RawRecordConfig();
     rawConfig.records.deployment.type = "";
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
+
+    final Record record = recordFixture();
 
     // when
-    testHarness.export(
-        r -> r.setMetadata(new MockRecordMetadata().setValueType(ValueType.DEPLOYMENT)));
+    exporter.export(record);
 
     // then
     assertThat(batchStubFactory.stub.getPendingRecords())
@@ -112,104 +125,115 @@ void shouldSkipDisallowedRecords() throws Exception {
         .isEmpty();
   }
 
+
   @Test
-  void shouldFlushOnScheduledTask() throws Exception {
+  void shouldFlushOnScheduledTask() {
     // given
-    rawConfig.maxBatchSize = 5;
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
+
+    final Record record = recordFixture();
 
     // when
-    final var records = testHarness.stream().export(5);
-    triggerFlushTask();
+    exporter.export(record);
+    controller.runScheduledTasks(Duration.ofSeconds(10));
 
     // then
-    final var expectedIds =
-        records.stream()
-            .map(r -> new RecordId(r.getPartitionId(), r.getPosition()))
-            .collect(Collectors.toList());
+    final var expectedIds = new RecordId(record.getPartitionId(), record.getPosition());
     assertThat(batchStubFactory.stub.getFlushedRecords())
         .as("the records were added to the batch in order")
         .extracting(ProducerRecord::key)
-        .containsExactlyElementsOf(expectedIds);
+        .containsExactlyElementsOf(List.of(expectedIds));
     assertThat(batchStubFactory.stub.getPendingRecords())
         .as("no pending records after flush")
         .isEmpty();
   }
 
   @Test
-  void shouldUpdatePositionOnFlush() throws Exception {
+  void shouldUpdatePositionOnFlush() {
     // given
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
+
+    final Record record = recordFixture();
 
     // when
-    final var records = testHarness.stream().export(5);
-    triggerFlushTask();
+    exporter.export(record);
+    controller.runScheduledTasks(Duration.ofSeconds(10));
 
     // then
-    assertThat(testHarness.getLastUpdatedPosition())
+    assertThat(controller.getPosition())
         .as("position should be updated since after flush")
-        .isEqualTo(records.get(4).getPosition());
+        .isEqualTo(record.getPosition());
   }
 
   @Test
-  void shouldRescheduleFlushTaskEvenOnException() throws Exception {
+  void shouldRescheduleFlushTaskEvenOnException() {
     // given
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
+
+    final Record record = recordFixture();
 
     // when
-    final var records = testHarness.stream().export(2);
     batchStubFactory.stub.flushException = new RuntimeException("failed to flush");
-    assertThatThrownBy(this::triggerFlushTask).isEqualTo(batchStubFactory.stub.flushException);
+    exporter.export(record);
+    assertThatThrownBy(() -> controller.runScheduledTasks(Duration.ofSeconds(10))).isEqualTo(batchStubFactory.stub.flushException);
     batchStubFactory.stub.flushException = null;
-    triggerFlushTask();
+    controller.runScheduledTasks(Duration.ofSeconds(10));
 
     // then
-    assertThat(testHarness.getLastUpdatedPosition())
+    assertThat(controller.getPosition())
        .as("position should be updated since we managed to flush after the second try")
-        .isEqualTo(records.get(1).getPosition());
+        .isEqualTo(record.getPosition());
   }
 
   @Test
-  void shouldFlushBatchOnClose() throws Exception {
+  void shouldFlushBatchOnClose() {
     // given
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
+
+    final Record record = recordFixture();
 
     // when
-    final var records = testHarness.stream().export(2);
-    testHarness.close();
+    exporter.export(record);
+    exporter.close();
 
     // then
-    assertThat(testHarness.getLastUpdatedPosition())
+    assertThat(controller.getPosition())
        .as("position should be updated since we managed to flush after the second try")
-        .isEqualTo(records.get(1).getPosition());
+        .isEqualTo(record.getPosition());
     assertThat(batchStubFactory.stub.isClosed())
        .as("batch should be closed on exporter close")
        .isTrue();
   }
 
   @Test
-  void shouldRescheduleFlush() throws Exception {
+  void shouldRescheduleFlush() {
     // given
-    testHarness.configure(EXPORTER_ID, rawConfig);
-    testHarness.open();
+    exporter.configure(context);
+    exporter.open(controller);
 
     // when
-    triggerFlushTask();
-    final var records = testHarness.stream().export(2);
-    triggerFlushTask();
+    controller.runScheduledTasks(Duration.ofSeconds(10));
+    exporter.export(recordFixture());
+    controller.runScheduledTasks(Duration.ofSeconds(10));
 
     // then
-    assertThat(testHarness.getLastUpdatedPosition())
+    assertThat(controller.getPosition())
        .as("position should be updated after triggering the second flush task")
-        .isEqualTo(records.get(1).getPosition());
+        .isEqualTo(0);
   }
 
-  private void triggerFlushTask() {
-    mockConfigParser.parse(rawConfig);
-    testHarness.runScheduledTasks(mockConfigParser.config.getFlushInterval());
+  Record recordFixture() {
+    return ImmutableRecord.builder()
+        .withIntent(DeploymentIntent.CREATED)
+        .withRecordType(RecordType.EVENT)
+        .withValueType(ValueType.DEPLOYMENT)
+        .withValue(ImmutableDeploymentRecordValue.builder().build())
+        .build();
   }
 }
diff --git a/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawRecordsConfigParserTest.java b/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawRecordsConfigParserTest.java
index f6a665d..f6e224a 100644
--- a/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawRecordsConfigParserTest.java
+++ b/exporter/src/test/java/io/zeebe/exporters/kafka/config/parser/RawRecordsConfigParserTest.java
@@ -31,7 +31,15 @@
 @Execution(ExecutionMode.CONCURRENT)
 final class RawRecordsConfigParserTest {
   private static final Set EXPECTED_VALUE_TYPES =
-      EnumSet.complementOf(EnumSet.of(ValueType.NULL_VAL, ValueType.SBE_UNKNOWN));
+      EnumSet.complementOf(EnumSet.of(
+          ValueType.NULL_VAL,
+          ValueType.SBE_UNKNOWN,
+          ValueType.DECISION,
+          ValueType.DECISION_REQUIREMENTS,
+          ValueType.DECISION_EVALUATION,
+          ValueType.CHECKPOINT,
+          ValueType.PROCESS_INSTANCE_MODIFICATION
+      ));
 
   private final RawRecordsConfigParser parser = new RawRecordsConfigParser();
diff --git a/exporter/src/test/java/io/zeebe/exporters/kafka/record/RecordHandlerTest.java b/exporter/src/test/java/io/zeebe/exporters/kafka/record/RecordHandlerTest.java
index 57ee520..943118e 100644
--- a/exporter/src/test/java/io/zeebe/exporters/kafka/record/RecordHandlerTest.java
+++ b/exporter/src/test/java/io/zeebe/exporters/kafka/record/RecordHandlerTest.java
@@ -17,17 +17,19 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import io.camunda.zeebe.protocol.jackson.record.DeploymentRecordValueBuilder;
-import io.camunda.zeebe.protocol.jackson.record.RecordBuilder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
+import io.camunda.zeebe.protocol.record.ImmutableRecord;
 import io.camunda.zeebe.protocol.record.Record;
 import io.camunda.zeebe.protocol.record.RecordType;
 import io.camunda.zeebe.protocol.record.ValueType;
 import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
-import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
+import io.camunda.zeebe.protocol.record.value.ImmutableDeploymentRecordValue;
 import io.zeebe.exporters.kafka.config.RecordConfig;
 import io.zeebe.exporters.kafka.config.RecordsConfig;
 import io.zeebe.exporters.kafka.serde.RecordId;
-import java.nio.charset.StandardCharsets;
 import java.util.EnumSet;
 import java.util.Map;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -42,10 +44,10 @@ final class RecordHandlerTest {
       new RecordConfig(EnumSet.allOf(RecordType.class), "zeebe");
 
   @Test
-  void shouldTransformRecord() {
+  void shouldTransformRecord() throws JsonProcessingException {
     // given
-    final Record record =
-        buildDeploymentRecord().recordType(RecordType.COMMAND).build();
+    final ObjectMapper mapper = new ObjectMapper().registerModule(new ZeebeProtocolModule());
+    final Record record = buildDeploymentRecord().withRecordType(RecordType.COMMAND).build();
     final RecordConfig deploymentRecordConfig =
         new RecordConfig(EnumSet.allOf(RecordType.class), "topic");
     final RecordHandler recordHandler = new RecordHandler(newRecordsConfig(RecordType.COMMAND));
@@ -57,14 +59,13 @@ void shouldTransformRecord() {
     assertThat(transformed.topic()).isEqualTo(deploymentRecordConfig.getTopic());
     assertThat(transformed.key())
        .isEqualTo(new RecordId(record.getPartitionId(), record.getPosition()));
-    assertThat(transformed.value()).isEqualTo(record.toJson().getBytes(StandardCharsets.UTF_8));
+    assertThat(transformed.value()).isEqualTo(mapper.writeValueAsBytes(record));
   }
 
   @Test
   void shouldTestRecordAsNotAllowed() {
     // given
-    final Record record =
-        buildDeploymentRecord().recordType(RecordType.COMMAND).build();
+    final Record record = buildDeploymentRecord().withRecordType(RecordType.COMMAND).build();
     final RecordHandler recordHandler = new RecordHandler(newRecordsConfig(RecordType.EVENT));
 
     // when - then
@@ -74,8 +75,7 @@ void shouldTestRecordAsNotAllowed() {
   @Test
   void shouldTestRecordAsAllowed() {
     // given
-    final Record record =
-        buildDeploymentRecord().recordType(RecordType.EVENT).build();
+    final Record record = buildDeploymentRecord().withRecordType(RecordType.EVENT).build();
     final RecordHandler recordHandler = new RecordHandler(newRecordsConfig(RecordType.EVENT));
 
     // when - then
@@ -87,14 +87,15 @@ private RecordsConfig newRecordsConfig(final RecordType allowedType) {
     return new RecordsConfig(Map.of(ValueType.DEPLOYMENT, recordConfig), DEFAULT_RECORD_CONFIG);
   }
 
-  private RecordBuilder buildDeploymentRecord() {
-    return new RecordBuilder()
-        .valueType(ValueType.DEPLOYMENT)
-        .recordType(RecordType.EVENT)
-        .timestamp(System.currentTimeMillis())
-        .intent(DeploymentIntent.CREATE)
-        .value(new DeploymentRecordValueBuilder().build())
-        .partitionId(1)
-        .position(1);
+  private ImmutableRecord.Builder buildDeploymentRecord() {
+    return ImmutableRecord.builder()
+        .withIntent(DeploymentIntent.CREATE)
+        .withRecordType(RecordType.EVENT)
+        .withValueType(ValueType.DEPLOYMENT)
+        .withPartitionId(1)
+        .withPosition(1)
+        .withValue(ImmutableDeploymentRecordValue
+            .builder()
+            .build());
   }
 }
diff --git a/pom.xml b/pom.xml
index 5d74229..9195292 100644
--- a/pom.xml
+++ b/pom.xml
@@ -26,7 +26,7 @@
-    11
+    17
 
     https://artifacts.camunda.com/artifactory/zeebe-io-snapshots/
@@ -51,8 +51,8 @@
     1.7.36
     4.2.0
     1.17.4
-    1.3.6
     3.5.2
+    8.1.2
     1.22
@@ -60,11 +60,10 @@
     3.10.1
     3.2.0
     3.2.0
-    1.11.1
-    3.1.2
+    1.11.2
+    3.3.0
     3.1.0
     3.0.0-M7
-    2.13
     3.0.1
     3.4.1
     4.1
@@ -72,6 +71,7 @@
     1.0.6
     0.14.7
     3.0.0-M7
+    2.17.3
     1.6.1
@@ -88,6 +88,7 @@
     ${skipChecks}
     ${skipChecks}
     ${skipChecks}
+    ${skipChecks}
     ${skipChecks}
@@ -115,6 +116,24 @@
         <type>pom</type>
       </dependency>
 
+      <dependency>
+        <groupId>io.camunda</groupId>
+        <artifactId>zeebe-protocol-jackson</artifactId>
+        <version>${version.zeebe}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.camunda</groupId>
+        <artifactId>zeebe-protocol-asserts</artifactId>
+        <version>${version.zeebe}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.camunda</groupId>
+        <artifactId>zeebe-exporter-test</artifactId>
+        <version>${version.zeebe}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.testcontainers</groupId>
@@ -130,6 +149,12 @@
         <version>${version.zeebe-test-container}</version>
       </dependency>
 
+      <dependency>
+        <groupId>io.camunda</groupId>
+        <artifactId>zeebe-process-test-extension</artifactId>
+        <version>${version.zeebe}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.junit</groupId>
@@ -351,16 +376,17 @@
-        <plugin>
-          <groupId>com.coveo</groupId>
-          <artifactId>fmt-maven-plugin</artifactId>
-          <version>${plugin.version.fmt}</version>
-          <executions>
-            <execution>
-              <goals>
-                <goal>format</goal>
-              </goals>
-            </execution>
-          </executions>
-        </plugin>
+        <plugin>
+          <groupId>com.diffplug.spotless</groupId>
+          <artifactId>spotless-maven-plugin</artifactId>
+          <version>${plugin.version.spotless}</version>
+          <configuration>
+            <java>
+              <googleJavaFormat>
+                <version>1.12.0</version>
+              </googleJavaFormat>
+            </java>
+          </configuration>
+        </plugin>
@@ -403,13 +429,6 @@
         <artifactId>maven-dependency-plugin</artifactId>
         <version>${plugin.version.dependency}</version>
-        <dependencies>
-          <dependency>
-            <groupId>org.apache.maven.shared</groupId>
-            <artifactId>maven-dependency-analyzer</artifactId>
-            <version>${plugin.version.dependency-analyzer}</version>
-          </dependency>
-        </dependencies>
@@ -497,11 +516,6 @@
         <artifactId>maven-surefire-plugin</artifactId>
       </plugin>
 
-      <plugin>
-        <groupId>com.coveo</groupId>
-        <artifactId>fmt-maven-plugin</artifactId>
-      </plugin>
-
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-enforcer-plugin</artifactId>
diff --git a/qa/pom.xml b/qa/pom.xml
index fcc1574..56fb3df 100644
--- a/qa/pom.xml
+++ b/qa/pom.xml
@@ -29,12 +29,6 @@
       <scope>test</scope>
     </dependency>
 
-    <dependency>
-      <groupId>io.camunda</groupId>
-      <artifactId>zeebe-protocol-jackson</artifactId>
-      <scope>test</scope>
-    </dependency>
-
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
diff --git a/qa/src/test/java/io/zeebe/exporters/kafka/qa/DebugHttpExporterClient.java b/qa/src/test/java/io/zeebe/exporters/kafka/qa/DebugHttpExporterClient.java
index ce502a9..84f4a94 100644
--- a/qa/src/test/java/io/zeebe/exporters/kafka/qa/DebugHttpExporterClient.java
+++ b/qa/src/test/java/io/zeebe/exporters/kafka/qa/DebugHttpExporterClient.java
@@ -18,7 +18,6 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
-import io.camunda.zeebe.protocol.jackson.record.AbstractRecord;
 import io.camunda.zeebe.protocol.record.Record;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -42,7 +41,7 @@ final class DebugHttpExporterClient {
   private static final ObjectReader READER =
-      new ObjectMapper().readerFor(new TypeReference<List<AbstractRecord<?>>>() {});
+      new ObjectMapper().readerFor(new TypeReference<List<Record<?>>>() {});
 
   private final URL serverUrl;
 
@@ -53,7 +52,7 @@ final class DebugHttpExporterClient {
   Stream<Record<?>> streamRecords() {
     try {
       // the HTTP exporter returns records in reversed order, so flip them before returning
-      final List<AbstractRecord<?>> records = READER.readValue(serverUrl);
+      final List<Record<?>> records = READER.readValue(serverUrl);
       Collections.reverse(records);
 
       return records.stream().map(r -> r);
diff --git a/serde/pom.xml b/serde/pom.xml
index 469596d..88d08a0 100644
--- a/serde/pom.xml
+++ b/serde/pom.xml
@@ -22,11 +22,6 @@
-    <dependency>
-      <groupId>io.camunda</groupId>
-      <artifactId>zeebe-protocol-jackson</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
@@ -49,6 +44,11 @@
 
+    <dependency>
+      <groupId>io.camunda</groupId>
+      <artifactId>zeebe-protocol-jackson</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.camunda</groupId>
       <artifactId>zeebe-protocol</artifactId>
diff --git a/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordDeserializer.java b/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordDeserializer.java
index 76bb524..93af2aa 100644
--- a/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordDeserializer.java
+++ b/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordDeserializer.java
@@ -18,7 +18,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
-import io.camunda.zeebe.protocol.jackson.record.AbstractRecord;
+import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
 import io.camunda.zeebe.protocol.record.Record;
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -31,11 +31,11 @@ public final class RecordDeserializer extends JacksonDeserializer<Record<?>> {
 
   public RecordDeserializer() {
-    this(new ObjectMapper());
+    this(new ObjectMapper().registerModule(new ZeebeProtocolModule()));
   }
 
   public RecordDeserializer(final ObjectMapper objectMapper) {
-    this(objectMapper.readerFor(new TypeReference<AbstractRecord<?>>() {}));
+    this(objectMapper.registerModule(new ZeebeProtocolModule()).readerFor(new TypeReference<Record<?>>() {}));
   }
 
   public RecordDeserializer(final ObjectReader objectReader) {
diff --git a/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordSerializer.java b/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordSerializer.java
index 6ccf097..93865dd 100644
--- a/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordSerializer.java
+++ b/serde/src/main/java/io/zeebe/exporters/kafka/serde/RecordSerializer.java
@@ -18,7 +18,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
-import io.camunda.zeebe.protocol.jackson.record.AbstractRecord;
+import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
 import io.camunda.zeebe.protocol.record.Record;
 import org.apache.kafka.common.serialization.Serializer;
 
@@ -34,11 +34,11 @@
  */
 public final class RecordSerializer extends JacksonSerializer<Record<?>> {
   public RecordSerializer() {
-    this(new ObjectMapper());
+    this(new ObjectMapper().registerModule(new ZeebeProtocolModule()));
   }
 
   protected RecordSerializer(final ObjectMapper objectMapper) {
-    this(objectMapper.writerFor(new TypeReference<AbstractRecord<?>>() {}));
+    this(objectMapper.registerModule(new ZeebeProtocolModule()).writerFor(new TypeReference<Record<?>>() {}));
   }
 
   protected RecordSerializer(final ObjectWriter writer) {
diff --git a/serde/src/test/java/io/zeebe/exporters/kafka/serde/RecordTest.java b/serde/src/test/java/io/zeebe/exporters/kafka/serde/RecordTest.java
index efb671d..9dcf6a9 100644
--- a/serde/src/test/java/io/zeebe/exporters/kafka/serde/RecordTest.java
+++ b/serde/src/test/java/io/zeebe/exporters/kafka/serde/RecordTest.java
@@ -19,13 +19,12 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.cbor.databind.CBORMapper;
-import io.camunda.zeebe.protocol.jackson.record.DeploymentRecordValueBuilder;
-import io.camunda.zeebe.protocol.jackson.record.RecordBuilder;
+import io.camunda.zeebe.protocol.record.ImmutableRecord;
 import io.camunda.zeebe.protocol.record.Record;
 import io.camunda.zeebe.protocol.record.RecordType;
 import io.camunda.zeebe.protocol.record.ValueType;
 import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
-import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
+import io.camunda.zeebe.protocol.record.value.ImmutableDeploymentRecordValue;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
@@ -37,13 +36,15 @@ final class RecordTest {
   @Test
   void shouldSerialize() {
     // given
+    ImmutableDeploymentRecordValue.builder().build();
     final Record record =
-        new RecordBuilder()
-            .intent(DeploymentIntent.CREATED)
-            .recordType(RecordType.EVENT)
-            .valueType(ValueType.DEPLOYMENT)
-            .value(new DeploymentRecordValueBuilder().build())
-            .build();
+        ImmutableRecord.builder()
+            .withIntent(DeploymentIntent.CREATED)
+            .withRecordType(RecordType.EVENT)
+            .withValueType(ValueType.DEPLOYMENT)
+            .withValue(ImmutableDeploymentRecordValue.builder().build())
+            .build();
+
     final RecordSerializer serializer = new RecordSerializer();
     final RecordDeserializer deserializer = new RecordDeserializer();
 
@@ -62,12 +63,12 @@ void shouldSerializeOtherFormat() {
     // given
     final ObjectMapper cborMapper = new CBORMapper();
     final Record record =
-        new RecordBuilder()
-            .intent(DeploymentIntent.CREATED)
-            .recordType(RecordType.EVENT)
-            .valueType(ValueType.DEPLOYMENT)
-            .value(new DeploymentRecordValueBuilder().build())
-            .build();
+        ImmutableRecord.builder()
+            .withIntent(DeploymentIntent.CREATED)
+            .withRecordType(RecordType.EVENT)
+            .withValueType(ValueType.DEPLOYMENT)
+            .withValue(ImmutableDeploymentRecordValue.builder().build())
+            .build();
     final RecordSerializer serializer = new RecordSerializer(cborMapper);
     final RecordDeserializer deserializer = new RecordDeserializer(cborMapper);