diff --git a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java index 86a42aab1..402b5a43e 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java +++ b/api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java @@ -18,12 +18,12 @@ import dev.cel.common.types.StructType; import dev.cel.compiler.CelCompiler; import dev.cel.compiler.CelCompilerFactory; +import dev.cel.extensions.CelExtensions; import dev.cel.parser.CelStandardMacro; import dev.cel.runtime.CelEvaluationException; import dev.cel.runtime.CelRuntime; import dev.cel.runtime.CelRuntimeFactory; import io.kafbat.ui.exception.CelException; -import io.kafbat.ui.model.MessageFilterTypeDTO; import io.kafbat.ui.model.TopicMessageDTO; import java.util.HashMap; import java.util.Map; @@ -42,8 +42,7 @@ public class MessageFilters { private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName(); private static final CelCompiler CEL_COMPILER = createCompiler(); - private static final CelRuntime CEL_RUNTIME = CelRuntimeFactory.standardCelRuntimeBuilder() - .build(); + private static final CelRuntime CEL_RUNTIME = createRuntime(); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @@ -143,6 +142,7 @@ private static CelCompiler createCompiler() { return CelCompilerFactory.standardCelCompilerBuilder() .setOptions(CelOptions.DEFAULT) .setStandardMacros(CelStandardMacro.STANDARD_MACROS) + .addLibraries(CelExtensions.strings(), CelExtensions.encoders()) .addVar(CEL_RECORD_VAR_NAME, recordType) .setResultType(SimpleType.BOOL) .setTypeProvider(new CelTypeProvider() { @@ -159,6 +159,12 @@ public Optional findType(String typeName) { .build(); } + private static CelRuntime createRuntime() { + return CelRuntimeFactory.standardCelRuntimeBuilder() + .addLibraries(CelExtensions.strings(), CelExtensions.encoders()) + .build(); + } + @Nullable private static Object parseToJsonOrReturnAsIs(@Nullable String str) { if (str == null) { diff --git a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java index 7cdcf00f6..cae8629eb 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java @@ -10,10 +10,11 @@ import io.kafbat.ui.exception.CelException; import io.kafbat.ui.model.TopicMessageDTO; import java.time.OffsetDateTime; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.function.Predicate; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Nested; @@ -100,7 +101,7 @@ void canCheckTimestampMs() { var ts = OffsetDateTime.now(); var f = celScriptFilter("record.timestampMs == " + ts.toInstant().toEpochMilli()); assertTrue(f.test(msg().timestamp(ts))); - assertFalse(f.test(msg().timestamp(ts.plus(1L, ChronoUnit.SECONDS)))); + assertFalse(f.test(msg().timestamp(ts.plusSeconds(1L)))); } @Test @@ -177,6 +178,7 @@ void filterSpeedIsAtLeast5kPerSec() { toFilter.add(msg().content(jsonContent).key(randString)); } // first iteration for warmup + // noinspection ResultOfMethodCallIgnored toFilter.stream().filter(f).count(); long before = System.currentTimeMillis(); @@ -188,10 +190,15 @@ void filterSpeedIsAtLeast5kPerSec() { } } + @Test + void testBase64DecodingWorks() { + var uuid = UUID.randomUUID().toString(); + var msg = "test." + Base64.getEncoder().encodeToString(uuid.getBytes()); + var f = celScriptFilter("string(base64.decode(record.value.split('.')[1])).contains('" + uuid + "')"); + assertTrue(f.test(msg().content(msg))); + } + private TopicMessageDTO msg() { - return new TopicMessageDTO() - .timestamp(OffsetDateTime.now()) - .offset(-1L) - .partition(1); + return new TopicMessageDTO(1, -1L, OffsetDateTime.now()); } }