From 8b185d285d56030f499f60ade283ebeaf84b4de3 Mon Sep 17 00:00:00 2001 From: YoEight Date: Fri, 6 Oct 2023 12:59:02 -0400 Subject: [PATCH] Improve stream metadata serialization. --- .../java/com/eventstore/dbclient/Acl.java | 4 +- .../java/com/eventstore/dbclient/Acls.java | 21 ---- .../eventstore/dbclient/CustomAclCodec.java | 87 ++++++++++++++ .../dbclient/EventStoreDBClient.java | 45 ++----- .../com/eventstore/dbclient/StreamAcl.java | 100 ++++++---------- .../eventstore/dbclient/StreamMetadata.java | 113 ++++++------------ .../eventstore/dbclient/SystemStreamAcl.java | 7 +- .../eventstore/dbclient/UserStreamAcl.java | 7 +- .../dbclient/misc/OfflineMetadataTests.java | 25 ++-- .../dbclient/streams/MetadataTests.java | 18 ++- 10 files changed, 193 insertions(+), 234 deletions(-) create mode 100644 db-client-java/src/main/java/com/eventstore/dbclient/CustomAclCodec.java diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/Acl.java b/db-client-java/src/main/java/com/eventstore/dbclient/Acl.java index e7eb50a0..ef849da0 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/Acl.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/Acl.java @@ -3,6 +3,4 @@ /** * Common access control list (ACL) interface. */ -public interface Acl { - Object serialize(); -} +public interface Acl {} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/Acls.java b/db-client-java/src/main/java/com/eventstore/dbclient/Acls.java index 647c677b..c0ac1a79 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/Acls.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/Acls.java @@ -1,7 +1,5 @@ package com.eventstore.dbclient; -import java.util.HashMap; - /** * Access control list (ACL) utility class. */ @@ -33,23 +31,4 @@ public static Acl newUserStreamAcl() { public static Acl newSystemStreamAcl() { return SystemStreamAcl.getInstance(); } - - @SuppressWarnings("unchecked") - static Acl deserialize(Object source) { - Acl acl = null; - - if (source != null) { - if (source instanceof HashMap) { - acl = StreamAcl.deserialize((HashMap) source); - } else if (source instanceof String) { - String str = (String) source; - acl = UserStreamAcl.deserialize(str); - acl = acl == null ? SystemStreamAcl.deserialize(str) : acl; - } else { - throw new RuntimeException("Unsupported type for ACL: " + source.getClass()); - } - } - - return acl; - } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/CustomAclCodec.java b/db-client-java/src/main/java/com/eventstore/dbclient/CustomAclCodec.java new file mode 100644 index 00000000..37e55c74 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/CustomAclCodec.java @@ -0,0 +1,87 @@ +package com.eventstore.dbclient; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public final class CustomAclCodec { + public static class ListSerializer extends JsonSerializer> { + @Override + public void serialize(List value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + if (value.size() == 1) { + gen.writeString(value.get(0)); + } else { + gen.writeStartArray(); + + for (String s: value) { + gen.writeString(s); + } + + gen.writeEndArray(); + } + } + } + + public static class ListDeserializer extends JsonDeserializer> { + @Override + public List deserialize(JsonParser p, DeserializationContext ctxt) throws IOException { + if (p.isExpectedStartArrayToken()) { + List list = new ArrayList<>(); + + while (!p.nextToken().isStructEnd()) { + list.add(p.getValueAsString()); + } + + return list; + } + + return Collections.singletonList(p.getValueAsString()); + } + } + + public static class Serializer extends JsonSerializer { + @Override + public void serialize(Acl value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + if (value instanceof SystemStreamAcl) { + gen.writeString(SystemStreamAcl.ACL_NAME); + return; + } + + if (value instanceof UserStreamAcl) { + gen.writeString(UserStreamAcl.ACL_NAME); + return; + } + + gen.writePOJO(value); + } + } + + public static class Deserializer extends JsonDeserializer { + @Override + public Acl deserialize(JsonParser p, DeserializationContext ctx) throws IOException { + if (p.currentToken() == JsonToken.VALUE_STRING) { + String value = p.getText(); + + if (value.equals(UserStreamAcl.ACL_NAME)) + return UserStreamAcl.getInstance(); + + if (value.equals(SystemStreamAcl.ACL_NAME)) { + return SystemStreamAcl.getInstance(); + } + + throw new IOException(String.format("Unknown ACL type '%s'", value)); + } + + return p.readValueAs(StreamAcl.class); + } + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreDBClient.java b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreDBClient.java index 6c243e8a..137afd6f 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreDBClient.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreDBClient.java @@ -1,16 +1,14 @@ package com.eventstore.dbclient; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.json.JsonMapper; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.function.Function; /** * Represents EventStoreDB client for stream operations. A client instance maintains a two-way communication to EventStoreDB. @@ -97,9 +95,14 @@ public CompletableFuture setStreamMetadata(String streamName, Strea * @return a write result if successful. */ public CompletableFuture setStreamMetadata(String streamName, AppendToStreamOptions options, StreamMetadata metadata) { - EventData event = EventDataBuilder.json("$metadata", metadata.serialize()).build(); + JsonMapper mapper = new JsonMapper(); - return appendToStream("$$" + streamName, options, event); + try { + EventData event = EventDataBuilder.json("$metadata", mapper.writeValueAsBytes(metadata)).build(); + return appendToStream("$$" + streamName, options, event); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } } /** @@ -132,32 +135,6 @@ public Publisher readStreamReactive(String streamName, ReadStreamOp return new ReadStream(this.getGrpcClient(), streamName, options); } - private static Publisher publisherMap(final Publisher parent, Function fun) { - return sub -> { - parent.subscribe(new Subscriber() { - @Override - public void onSubscribe(org.reactivestreams.Subscription s) { - sub.onSubscribe(s); - } - - @Override - public void onNext(A a) { - sub.onNext(fun.apply(a)); - } - - @Override - public void onError(Throwable t) { - sub.onError(t); - } - - @Override - public void onComplete() { - sub.onComplete(); - } - }); - }; - } - /** * Reads stream's metadata. * @param streamName stream's name. @@ -181,10 +158,8 @@ public CompletableFuture getStreamMetadata(String streamName, Re try { JsonMapper mapper = new JsonMapper(); - @SuppressWarnings("unchecked") - HashMap source = mapper.readValue(event.getEventData(), HashMap.class); - - out.complete(StreamMetadata.deserialize(source)); + StreamMetadata metadata = mapper.readValue(event.getEventData(), StreamMetadata.class); + out.complete(metadata); } catch (Throwable e) { out.completeExceptionally(e); } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/StreamAcl.java b/db-client-java/src/main/java/com/eventstore/dbclient/StreamAcl.java index 13d9886d..3c83f614 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/StreamAcl.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/StreamAcl.java @@ -1,16 +1,41 @@ package com.eventstore.dbclient; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + import java.util.*; /** * Stream-related access control list (ACL). */ +@JsonInclude(JsonInclude.Include.NON_NULL) public class StreamAcl implements Acl { - private ArrayList readRoles; - private ArrayList writeRoles; - private ArrayList deleteRoles; - private ArrayList metaReadRoles; - private ArrayList metaWriteRoles; + @JsonProperty("$r") + @JsonSerialize(using = CustomAclCodec.ListSerializer.class) + @JsonDeserialize(using = CustomAclCodec.ListDeserializer.class) + private List readRoles; + + @JsonProperty("$w") + @JsonSerialize(using = CustomAclCodec.ListSerializer.class) + @JsonDeserialize(using = CustomAclCodec.ListDeserializer.class) + private List writeRoles; + + @JsonProperty("$d") + @JsonSerialize(using = CustomAclCodec.ListSerializer.class) + @JsonDeserialize(using = CustomAclCodec.ListDeserializer.class) + private List deleteRoles; + + @JsonProperty("$mr") + @JsonSerialize(using = CustomAclCodec.ListSerializer.class) + @JsonDeserialize(using = CustomAclCodec.ListDeserializer.class) + private List metaReadRoles; + + @JsonProperty("$mw") + @JsonSerialize(using = CustomAclCodec.ListSerializer.class) + @JsonDeserialize(using = CustomAclCodec.ListDeserializer.class) + private List metaWriteRoles; /** * Adds read roles. @@ -67,93 +92,38 @@ public StreamAcl addMetaWriteRoles(String... roles) { return this; } - private static void serializeRoles(HashMap output, String key, ArrayList target) { - if (target == null) - return; - - if (target.size() == 1) { - output.put(key, target.get(0)); - } else { - output.put(key, target); - } - } - - @SuppressWarnings("unchecked") - private static ArrayList deserializeRoles(HashMap source, String key) { - ArrayList list = null; - Object value = source.get(key); - - if (value != null) { - list = new ArrayList<>(); - - if (value instanceof String) { - list.add((String) value); - } else if (value instanceof ArrayList) { - list.addAll((ArrayList) value); - } else { - throw new RuntimeException("Unsupported role type: " + value.getClass()); - } - } - - return list; - } - - @Override - public Object serialize() { - HashMap output = new HashMap<>(); - serializeRoles(output, "$r", this.readRoles); - serializeRoles(output, "$w", this.writeRoles); - serializeRoles(output, "$d", this.deleteRoles); - serializeRoles(output, "$mr", this.metaReadRoles); - serializeRoles(output, "$mw", this.metaWriteRoles); - - return output; - } - - static StreamAcl deserialize(HashMap source) { - StreamAcl acl = new StreamAcl(); - - acl.readRoles = deserializeRoles(source, "$r"); - acl.writeRoles = deserializeRoles(source, "$w"); - acl.deleteRoles = deserializeRoles(source, "$d"); - acl.metaReadRoles = deserializeRoles(source, "$mr"); - acl.metaWriteRoles = deserializeRoles(source, "$mw"); - - return acl; - } - /** * Returns read roles. */ - public ArrayList getReadRoles() { + public List getReadRoles() { return readRoles; } /** * Returns write roles. */ - public ArrayList getWriteRoles() { + public List getWriteRoles() { return writeRoles; } /** * Returns delete roles. */ - public ArrayList getDeleteRoles() { + public List getDeleteRoles() { return deleteRoles; } /** * Return metadata read roles. */ - public ArrayList getMetaReadRoles() { + public List getMetaReadRoles() { return metaReadRoles; } /** * Return metadata write roles. */ - public ArrayList getMetaWriteRoles() { + public List getMetaWriteRoles() { return metaWriteRoles; } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/StreamMetadata.java b/db-client-java/src/main/java/com/eventstore/dbclient/StreamMetadata.java index a1a4ca17..cc11858b 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/StreamMetadata.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/StreamMetadata.java @@ -1,6 +1,9 @@ package com.eventstore.dbclient; -import java.util.HashMap; +import com.fasterxml.jackson.annotation.*; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + import java.util.Map; import java.util.Objects; @@ -8,25 +11,39 @@ * Represents stream metadata with strongly typed properties for system values and a dictionary-like interface for * custom values. */ +@JsonInclude(JsonInclude.Include.NON_NULL) public class StreamMetadata { - private Integer maxAge; - private Integer truncateBefore; - private Integer cacheControl; + @JsonProperty("$maxAge") + private Long maxAge; + + @JsonProperty("$tb") + private Long truncateBefore; + + @JsonProperty("$cacheControl") + private Long cacheControl; + + @JsonProperty("$acl") + @JsonSerialize(using = CustomAclCodec.Serializer.class) + @JsonDeserialize(using = CustomAclCodec.Deserializer.class) private Acl acl; - private Integer maxCount; - private HashMap customProperties; + + @JsonProperty("$maxCount") + private Long maxCount; + + @JsonAnySetter + private Map customProperties; /** * The maximum age of events allowed in the stream. */ - public Integer getMaxAge() { + public Long getMaxAge() { return maxAge; } /** * The maximum age of events allowed in the stream. */ - public void setMaxAge(Integer maxAge) { + public void setMaxAge(Long maxAge) { this.maxAge = maxAge; } @@ -34,7 +51,7 @@ public void setMaxAge(Integer maxAge) { * The event number from which previous events can be scavenged. This is used to implement deletion of * streams. */ - public Integer getTruncateBefore() { + public Long getTruncateBefore() { return truncateBefore; } @@ -42,21 +59,21 @@ public Integer getTruncateBefore() { * The event number from which previous events can be scavenged. This is used to implement deletion of * streams. */ - public void setTruncateBefore(Integer truncateBefore) { + public void setTruncateBefore(Long truncateBefore) { this.truncateBefore = truncateBefore; } /** * The amount of time for which the stream head is cacheable (in seconds). */ - public Integer getCacheControl() { + public Long getCacheControl() { return cacheControl; } /** * The amount of time for which the stream head is cacheable (in seconds). */ - public void setCacheControl(Integer cacheControl) { + public void setCacheControl(Long cacheControl) { this.cacheControl = cacheControl; } @@ -77,94 +94,32 @@ public void setAcl(Acl acl) { /** * The maximum number of events allowed in the stream. */ - public Integer getMaxCount() { + public Long getMaxCount() { return maxCount; } /** * The maximum number of events allowed in the stream. */ - public void setMaxCount(Integer maxCount) { + public void setMaxCount(Long maxCount) { this.maxCount = maxCount; } /** * An enumerable of key-value pairs of keys to JSON text for user-provider metadata. */ - public HashMap getCustomProperties() { + @JsonAnyGetter + public Map getCustomProperties() { return customProperties; } /** * An enumerable of key-value pairs of keys to JSON text for user-provider metadata. */ - public void setCustomProperties(HashMap customProperties) { + public void setCustomProperties(Map customProperties) { this.customProperties = customProperties; } - static private void insertValue(HashMap output, String key, Object value) { - if (value != null) { - output.put(key, value); - } - } - - public Object serialize() { - HashMap output = new HashMap<>(); - - insertValue(output, "$maxAge", this.maxAge); - insertValue(output, "$maxCount", this.maxCount); - insertValue(output, "$tb", this.truncateBefore); - insertValue(output, "$cacheControl", this.cacheControl); - - if (this.acl != null) { - insertValue(output, "$acl", this.acl.serialize()); - } - - if (this.customProperties != null) { - this.customProperties.forEach((key, value) -> { - if (key.startsWith("$")) - return; - - insertValue(output, key, value); - }); - } - - return output; - } - - public static StreamMetadata deserialize(HashMap source) { - StreamMetadata metadata = new StreamMetadata(); - HashMap customProperties = null; - - for (Map.Entry entry : source.entrySet()) { - switch (entry.getKey()) { - case "$maxAge": - metadata.setMaxAge((Integer) entry.getValue()); - break; - case "$maxCount": - metadata.setMaxCount((Integer) entry.getValue()); - break; - case "$tb": - metadata.setTruncateBefore((Integer) entry.getValue()); - break; - case "$cacheControl": - metadata.setCacheControl((Integer) entry.getValue()); - break; - case "$acl": - metadata.setAcl(Acls.deserialize(entry.getValue())); - break; - default: - customProperties = customProperties == null ? new HashMap<>() : customProperties; - customProperties.put(entry.getKey(), entry.getValue()); - break; - } - } - - metadata.setCustomProperties(customProperties); - - return metadata; - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/SystemStreamAcl.java b/db-client-java/src/main/java/com/eventstore/dbclient/SystemStreamAcl.java index ba1d63ee..4e2adb51 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/SystemStreamAcl.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/SystemStreamAcl.java @@ -4,7 +4,7 @@ * Admin stream access control list (ACL). */ class SystemStreamAcl implements Acl { - private static final String ACL_NAME = "$systemStreamAcl"; + public static final String ACL_NAME = "$systemStreamAcl"; private static final SystemStreamAcl SINGLETON = new SystemStreamAcl(); private SystemStreamAcl() {} @@ -22,11 +22,6 @@ static SystemStreamAcl getInstance() { return SINGLETON; } - @Override - public Object serialize() { - return ACL_NAME; - } - @Override public int hashCode() { return ACL_NAME.hashCode(); diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/UserStreamAcl.java b/db-client-java/src/main/java/com/eventstore/dbclient/UserStreamAcl.java index b54a5a86..f0180d75 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/UserStreamAcl.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/UserStreamAcl.java @@ -4,7 +4,7 @@ * Default user stream access control list (ACL). */ class UserStreamAcl implements Acl { - private static final String ACL_NAME = "$userStreamAcl"; + public static final String ACL_NAME = "$userStreamAcl"; private static final UserStreamAcl SINGLETON = new UserStreamAcl(); private UserStreamAcl() {} @@ -22,11 +22,6 @@ static UserStreamAcl getInstance() { return SINGLETON; } - @Override - public Object serialize() { - return ACL_NAME; - } - @Override public int hashCode() { return ACL_NAME.hashCode(); diff --git a/db-client-java/src/test/java/com/eventstore/dbclient/misc/OfflineMetadataTests.java b/db-client-java/src/test/java/com/eventstore/dbclient/misc/OfflineMetadataTests.java index c6ff7b97..f71ecf94 100644 --- a/db-client-java/src/test/java/com/eventstore/dbclient/misc/OfflineMetadataTests.java +++ b/db-client-java/src/test/java/com/eventstore/dbclient/misc/OfflineMetadataTests.java @@ -9,7 +9,6 @@ import java.util.HashMap; -@SuppressWarnings("unchecked") public class OfflineMetadataTests { @Test public void testSerializationIsoMorphism() throws Throwable { @@ -18,10 +17,10 @@ public void testSerializationIsoMorphism() throws Throwable { custom.put("foo", "bar"); - expected.setMaxAge(2); - expected.setCacheControl(15); - expected.setTruncateBefore(1); - expected.setMaxCount(12); + expected.setMaxAge(2L); + expected.setCacheControl(15L); + expected.setTruncateBefore(1L); + expected.setMaxCount(12L); Acl acl = Acls.newStreamAcl() .addReadRoles("admin") @@ -34,9 +33,7 @@ public void testSerializationIsoMorphism() throws Throwable { expected.setCustomProperties(custom); ObjectMapper mapper = new ObjectMapper(); - HashMap source = mapper.readValue(mapper.writeValueAsString(expected.serialize()), HashMap.class); - StreamMetadata actual = StreamMetadata.deserialize(source); - + StreamMetadata actual = mapper.readValue(mapper.writeValueAsString(expected), StreamMetadata.class); Assertions.assertEquals(expected, actual); } @@ -45,15 +42,13 @@ public void testSerializationIsoMorphism() throws Throwable { public void testNullAcl() throws Throwable { StreamMetadata expected = new StreamMetadata(); - expected.setMaxAge(2); - expected.setCacheControl(15); - expected.setTruncateBefore(1); - expected.setMaxCount(12); + expected.setMaxAge(2L); + expected.setCacheControl(15L); + expected.setTruncateBefore(1L); + expected.setMaxCount(12L); ObjectMapper mapper = new ObjectMapper(); - HashMap source = mapper.readValue(mapper.writeValueAsString(expected.serialize()), HashMap.class); - StreamMetadata actual = StreamMetadata.deserialize(source); - + StreamMetadata actual = mapper.readValue(mapper.writeValueAsString(expected), StreamMetadata.class); Assertions.assertEquals(expected, actual); } diff --git a/db-client-java/src/test/java/com/eventstore/dbclient/streams/MetadataTests.java b/db-client-java/src/test/java/com/eventstore/dbclient/streams/MetadataTests.java index 32d963e0..b24b2949 100644 --- a/db-client-java/src/test/java/com/eventstore/dbclient/streams/MetadataTests.java +++ b/db-client-java/src/test/java/com/eventstore/dbclient/streams/MetadataTests.java @@ -13,10 +13,10 @@ default void testSetStreamMetadata() throws Throwable { StreamMetadata metadata = new StreamMetadata(); - metadata.setMaxAge(2); - metadata.setCacheControl(15); - metadata.setTruncateBefore(1); - metadata.setMaxCount(12); + metadata.setMaxAge(2L); + metadata.setCacheControl(15L); + metadata.setTruncateBefore(1L); + metadata.setMaxCount(12L); Acl acl = Acls.newStreamAcl() .addReadRoles("admin") @@ -49,4 +49,14 @@ default void testReadNoExistingMetadata() throws Throwable { Assertions.assertEquals(new StreamMetadata(), got); } + + @Test + default void testReadMetadataAfterStreamDeletion() throws Throwable { + EventStoreDBClient client = getDatabase().defaultClient(); + String streamName = generateName(); + client.appendToStream(streamName, EventDataBuilder.json("bar", new HashMap()).build()).get(); + + client.deleteStream(streamName).get(); + client.getStreamMetadata(streamName).get(); + } } \ No newline at end of file