Skip to content

Commit

Permalink
Improve stream metadata serialization.
Browse files Browse the repository at this point in the history
  • Loading branch information
YoEight committed Oct 10, 2023
1 parent 3fc8c1d commit 8b185d2
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,4 @@
/**
* Common access control list (ACL) interface.
*/
public interface Acl {
Object serialize();
}
public interface Acl {}
21 changes: 0 additions & 21 deletions db-client-java/src/main/java/com/eventstore/dbclient/Acls.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.eventstore.dbclient;

import java.util.HashMap;

/**
* Access control list (ACL) utility class.
*/
Expand Down Expand Up @@ -33,23 +31,4 @@ public static Acl newUserStreamAcl() {
public static Acl newSystemStreamAcl() {
return SystemStreamAcl.getInstance();
}

@SuppressWarnings("unchecked")
static Acl deserialize(Object source) {
Acl acl = null;

if (source != null) {
if (source instanceof HashMap) {
acl = StreamAcl.deserialize((HashMap<String, Object>) source);
} else if (source instanceof String) {
String str = (String) source;
acl = UserStreamAcl.deserialize(str);
acl = acl == null ? SystemStreamAcl.deserialize(str) : acl;
} else {
throw new RuntimeException("Unsupported type for ACL: " + source.getClass());
}
}

return acl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.eventstore.dbclient;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class CustomAclCodec {
public static class ListSerializer extends JsonSerializer<List<String>> {
@Override
public void serialize(List<String> value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
if (value.size() == 1) {
gen.writeString(value.get(0));
} else {
gen.writeStartArray();

for (String s: value) {
gen.writeString(s);
}

gen.writeEndArray();
}
}
}

public static class ListDeserializer extends JsonDeserializer<List<String>> {
@Override
public List<String> deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
if (p.isExpectedStartArrayToken()) {
List<String> list = new ArrayList<>();

while (!p.nextToken().isStructEnd()) {
list.add(p.getValueAsString());
}

return list;
}

return Collections.singletonList(p.getValueAsString());
}
}

public static class Serializer extends JsonSerializer<Acl> {
@Override
public void serialize(Acl value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
if (value instanceof SystemStreamAcl) {
gen.writeString(SystemStreamAcl.ACL_NAME);
return;
}

if (value instanceof UserStreamAcl) {
gen.writeString(UserStreamAcl.ACL_NAME);
return;
}

gen.writePOJO(value);
}
}

public static class Deserializer extends JsonDeserializer<Acl> {
@Override
public Acl deserialize(JsonParser p, DeserializationContext ctx) throws IOException {
if (p.currentToken() == JsonToken.VALUE_STRING) {
String value = p.getText();

if (value.equals(UserStreamAcl.ACL_NAME))
return UserStreamAcl.getInstance();

if (value.equals(SystemStreamAcl.ACL_NAME)) {
return SystemStreamAcl.getInstance();
}

throw new IOException(String.format("Unknown ACL type '%s'", value));
}

return p.readValueAs(StreamAcl.class);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.eventstore.dbclient;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* Represents EventStoreDB client for stream operations. A client instance maintains a two-way communication to EventStoreDB.
Expand Down Expand Up @@ -97,9 +95,14 @@ public CompletableFuture<WriteResult> setStreamMetadata(String streamName, Strea
* @return a write result if successful.
*/
public CompletableFuture<WriteResult> setStreamMetadata(String streamName, AppendToStreamOptions options, StreamMetadata metadata) {
EventData event = EventDataBuilder.json("$metadata", metadata.serialize()).build();
JsonMapper mapper = new JsonMapper();

return appendToStream("$$" + streamName, options, event);
try {
EventData event = EventDataBuilder.json("$metadata", mapper.writeValueAsBytes(metadata)).build();
return appendToStream("$$" + streamName, options, event);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

/**
Expand Down Expand Up @@ -132,32 +135,6 @@ public Publisher<ReadMessage> readStreamReactive(String streamName, ReadStreamOp
return new ReadStream(this.getGrpcClient(), streamName, options);
}

private static <A, B> Publisher<B> publisherMap(final Publisher<A> parent, Function<A, B> fun) {
return sub -> {
parent.subscribe(new Subscriber<A>() {
@Override
public void onSubscribe(org.reactivestreams.Subscription s) {
sub.onSubscribe(s);
}

@Override
public void onNext(A a) {
sub.onNext(fun.apply(a));
}

@Override
public void onError(Throwable t) {
sub.onError(t);
}

@Override
public void onComplete() {
sub.onComplete();
}
});
};
}

/**
* Reads stream's metadata.
* @param streamName stream's name.
Expand All @@ -181,10 +158,8 @@ public CompletableFuture<StreamMetadata> getStreamMetadata(String streamName, Re

try {
JsonMapper mapper = new JsonMapper();
@SuppressWarnings("unchecked")
HashMap<String, Object> source = mapper.readValue(event.getEventData(), HashMap.class);

out.complete(StreamMetadata.deserialize(source));
StreamMetadata metadata = mapper.readValue(event.getEventData(), StreamMetadata.class);
out.complete(metadata);
} catch (Throwable e) {
out.completeExceptionally(e);
}
Expand Down
100 changes: 35 additions & 65 deletions db-client-java/src/main/java/com/eventstore/dbclient/StreamAcl.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,41 @@
package com.eventstore.dbclient;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;

import java.util.*;

/**
* Stream-related access control list (ACL).
*/
@JsonInclude(JsonInclude.Include.NON_NULL)
public class StreamAcl implements Acl {
private ArrayList<String> readRoles;
private ArrayList<String> writeRoles;
private ArrayList<String> deleteRoles;
private ArrayList<String> metaReadRoles;
private ArrayList<String> metaWriteRoles;
@JsonProperty("$r")
@JsonSerialize(using = CustomAclCodec.ListSerializer.class)
@JsonDeserialize(using = CustomAclCodec.ListDeserializer.class)
private List<String> readRoles;

@JsonProperty("$w")
@JsonSerialize(using = CustomAclCodec.ListSerializer.class)
@JsonDeserialize(using = CustomAclCodec.ListDeserializer.class)
private List<String> writeRoles;

@JsonProperty("$d")
@JsonSerialize(using = CustomAclCodec.ListSerializer.class)
@JsonDeserialize(using = CustomAclCodec.ListDeserializer.class)
private List<String> deleteRoles;

@JsonProperty("$mr")
@JsonSerialize(using = CustomAclCodec.ListSerializer.class)
@JsonDeserialize(using = CustomAclCodec.ListDeserializer.class)
private List<String> metaReadRoles;

@JsonProperty("$mw")
@JsonSerialize(using = CustomAclCodec.ListSerializer.class)
@JsonDeserialize(using = CustomAclCodec.ListDeserializer.class)
private List<String> metaWriteRoles;

/**
* Adds read roles.
Expand Down Expand Up @@ -67,93 +92,38 @@ public StreamAcl addMetaWriteRoles(String... roles) {
return this;
}

private static void serializeRoles(HashMap<String, Object> output, String key, ArrayList<String> target) {
if (target == null)
return;

if (target.size() == 1) {
output.put(key, target.get(0));
} else {
output.put(key, target);
}
}

@SuppressWarnings("unchecked")
private static ArrayList<String> deserializeRoles(HashMap<String, Object> source, String key) {
ArrayList<String> list = null;
Object value = source.get(key);

if (value != null) {
list = new ArrayList<>();

if (value instanceof String) {
list.add((String) value);
} else if (value instanceof ArrayList) {
list.addAll((ArrayList<String>) value);
} else {
throw new RuntimeException("Unsupported role type: " + value.getClass());
}
}

return list;
}

@Override
public Object serialize() {
HashMap<String, Object> output = new HashMap<>();
serializeRoles(output, "$r", this.readRoles);
serializeRoles(output, "$w", this.writeRoles);
serializeRoles(output, "$d", this.deleteRoles);
serializeRoles(output, "$mr", this.metaReadRoles);
serializeRoles(output, "$mw", this.metaWriteRoles);

return output;
}

static StreamAcl deserialize(HashMap<String, Object> source) {
StreamAcl acl = new StreamAcl();

acl.readRoles = deserializeRoles(source, "$r");
acl.writeRoles = deserializeRoles(source, "$w");
acl.deleteRoles = deserializeRoles(source, "$d");
acl.metaReadRoles = deserializeRoles(source, "$mr");
acl.metaWriteRoles = deserializeRoles(source, "$mw");

return acl;
}

/**
* Returns read roles.
*/
public ArrayList<String> getReadRoles() {
public List<String> getReadRoles() {
return readRoles;
}

/**
* Returns write roles.
*/
public ArrayList<String> getWriteRoles() {
public List<String> getWriteRoles() {
return writeRoles;
}

/**
* Returns delete roles.
*/
public ArrayList<String> getDeleteRoles() {
public List<String> getDeleteRoles() {
return deleteRoles;
}

/**
* Return metadata read roles.
*/
public ArrayList<String> getMetaReadRoles() {
public List<String> getMetaReadRoles() {
return metaReadRoles;
}

/**
* Return metadata write roles.
*/
public ArrayList<String> getMetaWriteRoles() {
public List<String> getMetaWriteRoles() {
return metaWriteRoles;
}

Expand Down
Loading

0 comments on commit 8b185d2

Please sign in to comment.