diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java index cb818cffeb..b7b95619bc 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChain.java @@ -38,8 +38,11 @@ public RecordDispatcherMutatorChain(final RecordDispatcher next, final CloudEven @Override public Future dispatch(ConsumerRecord record) { - return next.dispatch( - KafkaConsumerRecordUtils.copyRecordAssigningValue(record, cloudEventMutator.apply(record))); + final var n = cloudEventMutator.apply(record); + if (n == record.value()) { + return next.dispatch(record); + } + return next.dispatch(KafkaConsumerRecordUtils.copyRecordAssigningValue(record, n)); } @Override diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java index b70fbf367b..ccfb0edbf4 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutator.java @@ -35,6 +35,9 @@ public CloudEventOverridesMutator(final DataPlaneContract.CloudEventOverrides cl @Override public CloudEvent apply(ConsumerRecord record) { + if (record.value() instanceof InvalidCloudEvent) { + return record.value(); + } final var builder = CloudEventBuilder.from(record.value()); applyKafkaMetadata(builder, record.partition(), record.offset()); applyCloudEventOverrides(builder); diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChainTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChainTest.java index 4b00532d84..f504dca99b 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChainTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherMutatorChainTest.java @@ -16,6 +16,7 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.mock; @@ -24,12 +25,14 @@ import static org.mockito.Mockito.when; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; +import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEvent; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.vertx.core.Future; import java.net.URI; import java.time.OffsetDateTime; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -74,4 +77,22 @@ public void shouldCloseInner() { assertThat(succeeded.succeeded()).isTrue(); verify(next, times(1)).close(); } + + @Test + public void shouldNotThrowOnInvalidCloudEvent() { + final var next = mock(RecordDispatcher.class); + + final var called = new AtomicInteger(0); + final var given = new InvalidCloudEvent(null); + final var chain = new RecordDispatcherMutatorChain(next, in -> { + assertThat(in.value()).isSameAs(given); + called.incrementAndGet(); + return given; + }); + + final var givenRecord = new ConsumerRecord<>("t1", 0, 0, (Object) "abc", (CloudEvent) given); + + assertThatCode(() -> chain.dispatch(givenRecord)).doesNotThrowAnyException(); + assertThat(called.get()).isEqualTo(1); + } } diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java index a96ec0580e..a0661cb77c 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/CloudEventOverridesMutatorTest.java @@ -16,8 +16,10 @@ package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import java.net.URI; import java.time.OffsetDateTime; @@ -56,6 +58,30 @@ public void shouldAddExtensions() { assertThat(got).isEqualTo(expected.build()); } + @Test + public void shouldNotThrowOnInvalidCloudEvent() { + final var extensions = Map.of( + "a", "foo", + "b", "bar"); + final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder() + .putAllExtensions(extensions) + .build(); + + final var mutator = new CloudEventOverridesMutator(ceOverrides); + + final var given = new InvalidCloudEvent(null); + + CloudEvent ce = null; + assertThatCode(() -> { + mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given)); + }) + .doesNotThrowAnyException(); + + final var got = mutator.apply(new ConsumerRecord<>("test-topic", 1, 1, "key", given)); + + assertThat(got).isSameAs(given); + } + @Test public void shouldAddKafkaExtensionsWhenNoOverrides() { final var ceOverrides = DataPlaneContract.CloudEventOverrides.newBuilder()