Skip to content

Commit

Permalink
Update to version 2.0.0-milestone4 of the CloudEvents SDK. (#75)
Browse files Browse the repository at this point in the history
The principal changes are that the payload of an event is now `CloudEventData`
instead of `byte[]`, which can save on copying; and that there is now better
support for reading and writing HTTP events.

A workaround was needed for a bug that has since been fixed, in
cloudevents/sdk-java#259.
  • Loading branch information
eamonnmcmanus authored Nov 24, 2020
1 parent 25a3f3f commit 0921c6c
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 157 deletions.
2 changes: 1 addition & 1 deletion functions-framework-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<version>2.0.0-milestone2</version>
<version>2.0.0-milestone4</version>
</dependency>
</dependencies>

Expand Down
10 changes: 5 additions & 5 deletions invoker/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-api</artifactId>
<version>2.0.0-milestone2</version>
<artifactId>cloudevents-core</artifactId>
<version>2.0.0-milestone4</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>2.0.0-milestone2</version>
<artifactId>cloudevents-http-basic</artifactId>
<version>2.0.0-milestone4</version>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-json-jackson</artifactId>
<version>2.0.0-milestone2</version>
<version>2.0.0-milestone4</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
import com.google.gson.TypeAdapter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.message.impl.UnknownEncodingMessageReader;
import io.cloudevents.http.HttpMessageFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.Type;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.http.HttpServlet;
Expand Down Expand Up @@ -256,7 +256,9 @@ void serviceLegacyEvent(Event legacyEvent) throws Exception {
@Override
void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
Context context = contextFromCloudEvent(cloudEvent);
String jsonData = cloudEvent.getData() == null ? "{}" : new String(cloudEvent.getData(), UTF_8);
String jsonData = (cloudEvent.getData() == null)
? "{}"
: new String(cloudEvent.getData().toBytes(), UTF_8);
function.accept(jsonData, context);
}
}
Expand Down Expand Up @@ -286,7 +288,7 @@ void serviceLegacyEvent(Event legacyEvent) throws Exception {
@Override
void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
if (cloudEvent.getData() != null) {
String data = new String(cloudEvent.getData(), UTF_8);
String data = new String(cloudEvent.getData().toBytes(), UTF_8);
T payload = new Gson().fromJson(data, type);
Context context = contextFromCloudEvent(cloudEvent);
function.accept(payload, context);
Expand Down Expand Up @@ -345,22 +347,27 @@ private enum CloudEventKind {BINARY, STRUCTURED}
private <CloudEventT> void serviceCloudEvent(HttpServletRequest req) throws Exception {
@SuppressWarnings("unchecked")
FunctionExecutor<CloudEventT> executor = (FunctionExecutor<CloudEventT>) functionExecutor;
Map<String, List<String>> headers = CloudEventsServletBinaryMessageReader.headerMap(req);
byte[] body = req.getInputStream().readAllBytes();
List<String> listOfNull = Collections.singletonList(null);
MessageReader reader = MessageUtils.parseStructuredOrBinaryMessage(
() -> headers.getOrDefault("content-type", listOfNull).get(0),
format -> new GenericStructuredMessageReader(format, body),
() -> headers.getOrDefault("ce-specversion", listOfNull).get(0),
unusedSpecVersion -> CloudEventsServletBinaryMessageReader.from(req, body),
UnknownEncodingMessageReader::new);
MessageReader reader = HttpMessageFactory.createReaderFromMultimap(headerMap(req), body);
// It's important not to set the context ClassLoader earlier, because MessageUtils will use
// ServiceLoader.load(EventFormat.class) to find a handler to deserialize a binary CloudEvent
// and if it finds something from the function ClassLoader then that something will implement
// the EventFormat interface as defined by that ClassLoader rather than ours. Then ServiceLoader.load
// will throw ServiceConfigurationError. At this point we're still running with the default
// context ClassLoader, which is the system ClassLoader that has loaded the code here.
runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent()));
runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent(data -> data)));
// The data->data is a workaround for a bug fixed since Milestone 4 of the SDK, in
// https://github.com/cloudevents/sdk-java/pull/259.
}

private static Map<String, List<String>> headerMap(HttpServletRequest req) {
Map<String, List<String>> headerMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
for (String header : Collections.list(req.getHeaderNames())) {
for (String value : Collections.list(req.getHeaders(header))) {
headerMap.computeIfAbsent(header, unused -> new ArrayList<>()).add(value);
}
}
return headerMap;
}

private void serviceLegacyEvent(HttpServletRequest req) throws Exception {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void pubSubBinaryData() throws IOException {
public void pubSubWrapping() throws IOException {
Event legacyEvent = legacyEventForResource("legacy_pubsub.json");
CloudEvent cloudEvent = GcfEvents.convertToCloudEvent(legacyEvent);
assertThat(new String(cloudEvent.getData(), UTF_8))
assertThat(new String(cloudEvent.getData().toBytes(), UTF_8))
.isEqualTo("{\"message\":{\"@type\":\"type.googleapis.com/google.pubsub.v1.PubsubMessage\","
+ "\"attributes\":{\"attribute1\":\"value1\"},"
+ "\"data\":\"VGhpcyBpcyBhIHNhbXBsZSBtZXNzYWdl\"}}");
Expand All @@ -221,7 +221,8 @@ public void pubSubWrapping() throws IOException {
public void firestoreWildcards() throws IOException {
Event legacyEvent = legacyEventForResource("firestore_simple.json");
CloudEvent cloudEvent = GcfEvents.convertToCloudEvent(legacyEvent);
JsonObject payload = new Gson().fromJson(new String(cloudEvent.getData(), UTF_8), JsonObject.class);
JsonObject payload =
new Gson().fromJson(new String(cloudEvent.getData().toBytes(), UTF_8), JsonObject.class);
JsonObject wildcards = payload.getAsJsonObject("wildcards");
assertThat(wildcards.keySet()).containsExactly("doc");
assertThat(wildcards.getAsJsonPrimitive("doc").getAsString()).isEqualTo("2Vm2mI1d0wIaK2Waj5to");
Expand All @@ -236,7 +237,7 @@ private Event legacyEventForResource(String resourceName) throws IOException {
}

private static Map<String, Object> cloudEventDataJson(CloudEvent cloudEvent) {
String data = new String(cloudEvent.getData(), UTF_8);
String data = new String(cloudEvent.getData().toBytes(), UTF_8);
@SuppressWarnings("unchecked")
Map<String, Object> map = new Gson().fromJson(data, Map.class);
return map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,11 @@
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.http.HttpMessageFactory;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.rw.CloudEventWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
Expand All @@ -63,6 +60,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
Expand Down Expand Up @@ -353,11 +351,13 @@ private void backgroundTest(String target) throws Exception {

// A CloudEvent using the "binary content mode", where the metadata is in HTTP headers and the
// payload is the body of the HTTP request.
BinaryWriter binaryWriter = new BinaryWriter();
Map<String, String> headers = binaryWriter.writeBinary(sampleCloudEvent(snoopFile));
Map<String, String> headers = new TreeMap<>();
AtomicReference<byte[]> bodyRef = new AtomicReference<>();
HttpMessageFactory.createWriter(headers::put, bodyRef::set)
.writeBinary(sampleCloudEvent(snoopFile));
TestCase cloudEventsBinaryTestCase = TestCase.builder()
.setSnoopFile(snoopFile)
.setRequestText(new String(binaryWriter.body, UTF_8))
.setRequestText(new String(bodyRef.get(), UTF_8))
.setHttpContentType(headers.get("Content-Type"))
.setHttpHeaders(ImmutableMap.copyOf(headers))
.setExpectedJson(cloudEventExpectedJson)
Expand Down Expand Up @@ -388,11 +388,13 @@ public void nativeCloudEvent() throws Exception {

// A CloudEvent using the "binary content mode", where the metadata is in HTTP headers and the
// payload is the body of the HTTP request.
BinaryWriter binaryWriter = new BinaryWriter();
Map<String, String> headers = binaryWriter.writeBinary(cloudEvent);
Map<String, String> headers = new TreeMap<>();
AtomicReference<byte[]> bodyRef = new AtomicReference<>();
HttpMessageFactory.createWriter(headers::put, bodyRef::set)
.writeBinary(sampleCloudEvent(snoopFile));
TestCase cloudEventsBinaryTestCase = TestCase.builder()
.setSnoopFile(snoopFile)
.setRequestText(new String(binaryWriter.body, UTF_8))
.setRequestText(new String(bodyRef.get(), UTF_8))
.setHttpContentType(headers.get("Content-Type"))
.setHttpHeaders(ImmutableMap.copyOf(headers))
.setExpectedJson(cloudEventJsonObject)
Expand Down Expand Up @@ -688,57 +690,4 @@ private void monitorOutput(
throw new UncheckedIOException(e);
}
}

// I might be missing something, but as far as I can tell the V2 SDK forces us to go through all this
// rigmarole just so we can tell what HTTP headers should be set for a Binary CloudEvent. With the
// V1 SDK it was much simpler.
// https://github.com/cloudevents/sdk-java/issues/212
private static class BinaryWriter
implements MessageWriter<CloudEventWriter<Map<String, String>>, Map<String, String>> {

private static final Map<String, String> ATTRIBUTES_TO_HEADERS =
MessageUtils.generateAttributesToHeadersMapping(v ->
v.equals("datacontenttype") ? "Content-Type" : ("ce-" + v));

final Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
byte[] body;

@Override
public CloudEventWriter<Map<String, String>> create(SpecVersion version) {
headers.put("ce-specversion", version.toString());
return new EventWriter();
}

@Override
public Map<String, String> setEvent(EventFormat format, byte[] bytes) {
throw new UnsupportedOperationException("Only binary events supported, not structured");
}

private class EventWriter implements CloudEventWriter<Map<String, String>> {
@Override
public Map<String, String> end(byte[] bytes) {
body = bytes;
return headers;
}

@Override
public Map<String, String> end() {
return end(new byte[0]);
}

@Override
public void setAttribute(String name, String value) {
if (ATTRIBUTES_TO_HEADERS.containsKey(name)) {
headers.put(ATTRIBUTES_TO_HEADERS.get(name), value);
} else {
throw new IllegalArgumentException("Unknown attribute: " + name);
}
}

@Override
public void setExtension(String name, String value) {
headers.put("ce-" + name, value);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class CloudEventSnoop implements ExperimentalCloudEventsFunction {
@Override
public void accept(CloudEvent event) throws Exception {
String payloadJson = new String(event.getData(), UTF_8);
String payloadJson = new String(event.getData().toBytes(), UTF_8);
Gson gson = new Gson();
JsonObject jsonObject = gson.fromJson(payloadJson, JsonObject.class);
String targetFile = jsonObject.get("targetFile").getAsString();
Expand Down
2 changes: 1 addition & 1 deletion invoker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
<dependency>
<groupId>com.google.cloud.functions</groupId>
<artifactId>functions-framework-api</artifactId>
<version>1.0.2</version>
<version>1.0.3-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down

0 comments on commit 0921c6c

Please sign in to comment.