Skip to content

Commit

Permalink
[Fix apache#3763] Properly reading compressdata string header (apache…
Browse files Browse the repository at this point in the history
…#3764)

* [Fix apache#3763] Properly reading compressdata string header

* [Fix apache#3763] Adding unit test
  • Loading branch information
fjtirado authored and rgdoliveira committed Nov 7, 2024
1 parent cf64c1d commit cbe9c66
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ public MultipleProcessInstanceDataEvent(URI source, Collection<ProcessInstanceDa
}

public boolean isCompressed() {
Object extension = getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
return extension instanceof Boolean ? ((Boolean) extension).booleanValue() : false;
return isCompressed(getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA));
}

public static boolean isCompressed(Object extension) {
if (extension instanceof Boolean) {
return ((Boolean) extension).booleanValue();
} else if (extension instanceof String) {
return Boolean.parseBoolean((String) extension);
}
return false;
}

public void setCompressed(boolean compressed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,20 @@

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.data.PojoCloudEventData.ToBytes;

public class MultipleProcessDataInstanceConverterFactory {

private MultipleProcessDataInstanceConverterFactory() {
}

public static ToBytes<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> toCloudEvent(MultipleProcessInstanceDataEvent event, ObjectMapper objectMapper) {
if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(event.getDataContentType())) {
return event.isCompressed() ? compressedToBytes : binaryToBytes;
} else {
return objectMapper::writeValueAsBytes;
}
}

public static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> fromCloudEvent(CloudEvent cloudEvent, ObjectMapper objectMapper) {
if (MultipleProcessInstanceDataEvent.BINARY_CONTENT_TYPE.equals(cloudEvent.getDataContentType())) {
return isCompressed(cloudEvent) ? compressedConverter : binaryConverter;
Expand All @@ -49,10 +57,13 @@ public static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? ex
}

private static boolean isCompressed(CloudEvent event) {
Object value = event.getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA);
return value instanceof Boolean ? ((Boolean) value).booleanValue() : false;
return MultipleProcessInstanceDataEvent.isCompressed(event.getExtension(MultipleProcessInstanceDataEvent.COMPRESS_DATA));
}

private static ToBytes<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> compressedToBytes = data -> serialize(data, true);

private static ToBytes<Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> binaryToBytes = data -> serialize(data, false);

private static Converter<CloudEventData, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>>> binaryConverter =
data -> deserialize(data, false);

Expand All @@ -62,4 +73,9 @@ private static boolean isCompressed(CloudEvent event) {
private static Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> deserialize(CloudEventData data, boolean compress) throws IOException {
return MultipleProcessInstanceDataEventDeserializer.readFromBytes(Base64.getDecoder().decode(data.toBytes()), compress);
}

private static byte[] serialize(Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data,
boolean compress) throws IOException {
return Base64.getEncoder().encode(MultipleProcessInstanceDataEventSerializer.dataAsBytes(data, compress));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ public void serialize(MultipleProcessInstanceDataEvent value, JsonGenerator gen,
if (compress) {
gen.writeBooleanField(MultipleProcessInstanceDataEvent.COMPRESS_DATA, true);
}
gen.writeBinaryField("data", dataAsBytes(gen, value.getData(), compress));
gen.writeBinaryField("data", dataAsBytes(value.getData(), compress));
gen.writeEndObject();
} else {
defaultSerializer.serialize(value, gen, serializers);
}
}

private byte[] dataAsBytes(JsonGenerator gen, Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data, boolean compress) throws IOException {
static byte[] dataAsBytes(Collection<ProcessInstanceDataEvent<? extends KogitoMarshallEventSupport>> data, boolean compress) throws IOException {
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
try (DataOutputStream out = new DataOutputStream(compress ? new GZIPOutputStream(bytesOut) : bytesOut)) {
logger.trace("Writing size {}", data.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import org.junit.jupiter.api.Test;
Expand All @@ -40,8 +43,17 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.data.PojoCloudEventData;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;

import static org.assertj.core.api.Assertions.assertThat;
import static org.kie.kogito.event.process.KogitoEventBodySerializationHelper.toDate;
Expand Down Expand Up @@ -130,14 +142,103 @@ void processInstanceDataEvent() throws Exception {
@Test
void multipleInstanceDataEvent() throws IOException {
JsonNode expectedVarValue = OBJECT_MAPPER.createObjectNode().put("name", "John Doe");
int standard = processMultipleInstanceDataEvent(expectedVarValue, false, false);
int binary = processMultipleInstanceDataEvent(expectedVarValue, true, false);
int binaryCompressed = processMultipleInstanceDataEvent(expectedVarValue, true, true);
assertThat(standard).isGreaterThan(binary);
assertThat(binary).isGreaterThan(binaryCompressed);
processMultipleInstanceDataEvent(expectedVarValue, false, false, this::serializeAsStructured);
processMultipleInstanceDataEvent(expectedVarValue, true, false, this::serializeAsStructured);
processMultipleInstanceDataEvent(expectedVarValue, true, true, this::serializeAsStructured);
processMultipleInstanceDataEvent(expectedVarValue, false, false, this::serializeAsBinary);
processMultipleInstanceDataEvent(expectedVarValue, true, false, this::serializeAsBinary);
processMultipleInstanceDataEvent(expectedVarValue, true, true, this::serializeAsBinary);
}

private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean binary, boolean compress) throws IOException {
private MultipleProcessInstanceDataEvent serializeAsStructured(MultipleProcessInstanceDataEvent event) throws IOException {
return OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsBytes(event), MultipleProcessInstanceDataEvent.class);
}

private record CloudEventHolder(Map<String, String> headers, byte[] data) {
}

private static class TestMessageWriter implements CloudEventWriter<CloudEventHolder>, MessageWriter<CloudEventWriter<CloudEventHolder>, CloudEventHolder> {

private Map<String, String> headers = new HashMap<>();
private byte[] value;

@Override
public TestMessageWriter create(SpecVersion version) throws CloudEventRWException {
headers.put("specversion", version.toString());
return this;
}

@Override
public CloudEventHolder setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
this.value = value;
return this.end();
}

@Override
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
headers.put(name, value);
return this;
}

@Override
public CloudEventHolder end(CloudEventData data) throws CloudEventRWException {
this.value = data.toBytes();
return this.end();
}

@Override
public CloudEventHolder end() throws CloudEventRWException {
return new CloudEventHolder(headers, value);
}
}

private class TestMessageReader extends BaseGenericBinaryMessageReaderImpl<String, String> {
private Map<String, String> headers;

protected TestMessageReader(SpecVersion version, CloudEventHolder body) {
super(version, body.data() == null ? null : BytesCloudEventData.wrap(body.data()));
this.headers = body.headers();
}

@Override
protected boolean isContentTypeHeader(String key) {
return false;
}

@Override
protected boolean isCloudEventsHeader(String key) {
return true;
}

@Override
protected String toCloudEventsKey(String key) {
return key;
}

@Override
protected void forEachHeader(BiConsumer<String, String> fn) {
headers.forEach(fn);
}

@Override
protected String toCloudEventsValue(String value) {
return value;
}

}

private MultipleProcessInstanceDataEvent serializeAsBinary(MultipleProcessInstanceDataEvent event) throws IOException {
CloudEvent toSerialize = event.asCloudEvent(value -> PojoCloudEventData.wrap(value, MultipleProcessDataInstanceConverterFactory.toCloudEvent(event, OBJECT_MAPPER)));
CloudEventHolder holder = new TestMessageWriter().writeBinary(toSerialize);
CloudEvent deserialized = new TestMessageReader(SpecVersion.V1, holder).toEvent();
return DataEventFactory.from(new MultipleProcessInstanceDataEvent(), deserialized, MultipleProcessDataInstanceConverterFactory.fromCloudEvent(deserialized, OBJECT_MAPPER));
}

private static interface CheckedUnaryOperator<T> {
T apply(T obj) throws IOException;
}

private void processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean binary, boolean compress, CheckedUnaryOperator<MultipleProcessInstanceDataEvent> operator) throws IOException {
ProcessInstanceStateDataEvent stateEvent = new ProcessInstanceStateDataEvent();
setBaseEventValues(stateEvent, ProcessInstanceStateDataEvent.STATE_TYPE);
stateEvent.setData(ProcessInstanceStateEventBody.create().eventDate(toDate(TIME)).eventType(EVENT_TYPE).eventUser(SUBJECT)
Expand Down Expand Up @@ -185,20 +286,9 @@ private int processMultipleInstanceDataEvent(JsonNode expectedVarValue, boolean
event.setCompressed(compress);
}

byte[] json = OBJECT_MAPPER.writeValueAsBytes(event);
logger.info("Serialized chunk size is {}", json.length);

// cloud event structured mode check
MultipleProcessInstanceDataEvent deserializedEvent = OBJECT_MAPPER.readValue(json, MultipleProcessInstanceDataEvent.class);
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
assertMultipleIntance(deserializedEvent, expectedVarValue);

// cloud event binary mode check
CloudEvent cloudEvent = OBJECT_MAPPER.readValue(json, CloudEvent.class);
deserializedEvent = DataEventFactory.from(new MultipleProcessInstanceDataEvent(), cloudEvent, MultipleProcessDataInstanceConverterFactory.fromCloudEvent(cloudEvent, OBJECT_MAPPER));
MultipleProcessInstanceDataEvent deserializedEvent = operator.apply(event);
assertThat(deserializedEvent.getData()).hasSize(event.getData().size());
assertMultipleIntance(deserializedEvent, expectedVarValue);
return json.length;
}

private void assertMultipleIntance(MultipleProcessInstanceDataEvent deserializedEvent, JsonNode expectedVarValue) {
Expand Down

0 comments on commit cbe9c66

Please sign in to comment.