Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pal 1023 s3 interceptor client #67

Merged
merged 27 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c195c85
PAL-1023-s3-interceptor-client initial commit
dev930018 May 24, 2021
378c444
PAL-1023 s3 server stands up and returns pair of example resources
dev930018 May 27, 2021
17c7655
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jun 15, 2021
562ad87
PAL-1023 added client acting as s3-api interface, tested against spark
dev930018 Jun 18, 2021
cae85a4
PAL-1023 put on hold without completing work for spark compatibility
dev930018 Jun 22, 2021
1b57822
PAL-1023 minor message change and port updates from testing
dev930018 Jun 23, 2021
1536f4b
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jun 24, 2021
877421a
PAL-1023 updated pom description for s3 client
dev930018 Jun 24, 2021
a2379c2
PAL-1023 Abstract- prefix for abstract class
dev930018 Jun 24, 2021
3cf3e69
PAL-1023 added config for service endpoints
dev930018 Jun 30, 2021
d839d93
PAL-1023 sonarqube code smells, mostly lambda types and documentation
dev930018 Jun 30, 2021
bf65e4b
PAL-1023 checkstyle fixes for docs
dev930018 Jun 30, 2021
6239c12
PAL-1023 import order
dev930018 Jun 30, 2021
74287b6
PAL-1023 import order and unused imports
dev930018 Jun 30, 2021
0644803
PAL-1023 further code-smells
dev930018 Jun 30, 2021
9d1c1a5
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jul 1, 2021
ce740e8
PAL-1023 documentation for code review
dev930018 Jul 6, 2021
a28a1eb
PAL-1023 better publisher subscription to stop tests hanging
dev930018 Jul 6, 2021
9184467
PAL-1023 removed unused imports
dev930018 Jul 6, 2021
7ad799a
PAL-1023 Update client-s3/src/main/java/uk/gov/gchq/palisade/client/s…
dev930018 Jul 6, 2021
25e4aab
PAL-1023 update in-line with Abstract- codesmell fix
dev930018 Jul 6, 2021
d9a5c41
Merge branch 'PAL-1023-s3-interceptor-client' of https://github.com/g…
dev930018 Jul 6, 2021
f722dc7
PAL-1023 dispose of disposable in effort to consume and close stream
dev930018 Jul 6, 2021
ef5c939
PAL-1023 added comment on why test is disabled, as well as another ef…
dev930018 Jul 7, 2021
6892cb8
PAL-1023 unused imports
dev930018 Jul 7, 2021
2cc96eb
PAL-1023 fixed test assertions
dev930018 Jul 7, 2021
d2b1924
PAL-1023 removed comment on disabled test
dev930018 Jul 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
public class AkkaClient implements Client {
private static final ObjectMapper MAPPER = new ObjectMapper();

/**
* Specify URL schemes based on whether SSL/TLS encryption should be used over-the-wire.
* Ingress to the cluster may use TLS, while cluster-internal comms may use plaintext.
*/
public enum SSLMode {
NONE("http", "ws"),
SSL_TLS("https", "wss");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

import io.micronaut.runtime.server.EmbeddedServer;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,6 +82,8 @@ void testWithDownloadOutsideStream() throws Exception {

}

// RxJava .subscribe(..) returns an unused disposable
@SuppressWarnings("ResultOfMethodCallIgnored")
@Test
void testWithDownloadInsideStream() throws Exception {

Expand All @@ -99,10 +98,7 @@ void testWithDownloadInsideStream() throws Exception {
.filter(m -> m.getType().equals(ItemType.RESOURCE))
.map(session::fetch)
.timeout(10, TimeUnit.SECONDS)
.subscribe(new FlowableSubscriber<>() {

@Override
public void onNext(final Download t) {
.subscribe((final Download t) -> {
LOGGER.debug("## Got message: {}", t);
try (var is = t.getInputStream()) {
LOGGER.debug("## reading bytes");
Expand All @@ -113,25 +109,6 @@ public void onNext(final Download t) {
LOGGER.error("Got error reading input stream into byte array", e);
throw new IllegalStateException("Got error reading input stream into byte array", e);
}
}

@Override
public void onError(final Throwable t) {
LOGGER.error("## Error: {}", t.getMessage());
Assertions.fail("Failed due to:" + t.getMessage());
}

@Override
public void onComplete() {
LOGGER.debug("## complete");

}

@Override
public void onSubscribe(final org.reactivestreams.@NonNull Subscription s) {
s.request(Long.MAX_VALUE);
LOGGER.debug("## Subscribed");
}
});
});
}
}
4 changes: 4 additions & 0 deletions client-s3/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
-->

# <img src="../logos/logo.svg" width="180">

## A Tool for Complex and Scalable Data Access Policy Enforcement

> :information_source: ***Work In Progress***
> There are some issues between what Spark expects and what the server returns for a `GetObject`.
> The server returns a valid AVRO object (which can be successfully read if written to a file).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,25 @@
import uk.gov.gchq.palisade.Generated;
import uk.gov.gchq.palisade.client.akka.AkkaClient;
import uk.gov.gchq.palisade.client.akka.AkkaClient.SSLMode;
import uk.gov.gchq.palisade.client.s3.config.EndpointConfiguration.ClientMap;
import uk.gov.gchq.palisade.client.s3.config.ApplicationConfiguration.ClientMap;
import uk.gov.gchq.palisade.client.s3.repository.ContentLengthRepository;
import uk.gov.gchq.palisade.client.s3.repository.PersistenceLayer;
import uk.gov.gchq.palisade.client.s3.repository.ResourceRepository;
import uk.gov.gchq.palisade.client.s3.web.AkkaHttpServer;
import uk.gov.gchq.palisade.client.s3.web.DynamicS3ServerApi;
import uk.gov.gchq.palisade.client.s3.web.RouteSupplier;
import uk.gov.gchq.palisade.client.s3.web.S3ServerApi;

import java.net.InetAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

/**
* Spring bean dependency injection graph
*/
@Configuration
@EnableConfigurationProperties(ClientMap.class)
public class EndpointConfiguration {
public class ApplicationConfiguration {

@ConfigurationProperties(prefix = "web")
static class ClientMap {
Expand All @@ -53,6 +56,12 @@ public Map<String, String> getClient() {
return client;
}

/**
* Get a single url for a service name.
*
* @param key the name of a service
* @return the URL for that service
*/
@Generated
public String getClient(final String key) {
return client.get(key);
Expand Down Expand Up @@ -94,7 +103,7 @@ AkkaClient akkaClient(final ActorSystem actorSystem, final ClientMap clientMap)

@Bean
RouteSupplier s3ServerApi(final AkkaClient akkaClient, final Materializer materializer, final PersistenceLayer persistenceLayer) {
return new S3ServerApi(akkaClient, materializer, persistenceLayer);
return new DynamicS3ServerApi(akkaClient, materializer, persistenceLayer);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,25 @@
import java.util.Arrays;
import java.util.List;

public class JacksonXmlSupport {
/**
* Configure Jackson for XML serialisation and deserialisation.
* S3 is built on XML for message request/response types, whereas Jackson defaults to JSON.
*/
public final class JacksonXmlSupport {
private static final List<MediaType> XML_MEDIA_TYPES = Arrays.asList(MediaTypes.APPLICATION_XML, MediaTypes.TEXT_XML);
private static final ObjectMapper DEFAULT_XML_MAPPER = new XmlMapper()
.enable(SerializationFeature.WRAP_ROOT_VALUE);

private JacksonXmlSupport() {
// Hide public constructor for static-method-only class
}

/**
* Create a Jackson {@link Marshaller} for serialising to the XML media-type.
*
* @param <T> the domain type for the marshaller
* @return a new marshaller for converting objects to XML
*/
public static <T> Marshaller<T, RequestEntity> marshaller() {
return Marshaller.wrapEntity(
JacksonXmlSupport::toXML,
Expand All @@ -43,11 +57,25 @@ public static <T> Marshaller<T, RequestEntity> marshaller() {
);
}

/**
* Create a Jackson {@link Unmarshaller} for deserialising from the XML media-type.
*
* @param expectedType the expected type to deserialise the XML into
* @param <T> the domain type for the unmarshaller
* @return a new unmarshaller for converting XML to objects
*/
public static <T> Unmarshaller<HttpEntity, T> unmarshaller(final Class<T> expectedType) {
return Unmarshaller.forMediaTypes(XML_MEDIA_TYPES, Unmarshaller.entityToString())
.thenApply(xml -> fromXML(xml, expectedType));
}

/**
* Convert a Java object to XML.
*
* @param object the Java object to convert
* @param <T> the type of the Java object
* @return a {@link String} of XML data representing the serialised object
*/
private static <T> String toXML(final T object) {
try {
return JacksonXmlSupport.DEFAULT_XML_MAPPER.writeValueAsString(object);
Expand All @@ -56,6 +84,14 @@ private static <T> String toXML(final T object) {
}
}

/**
* Convert an XML String to a Java object.
*
* @param xml the Java object to convert
* @param expectedType the expected type to deserialise the XML into
* @param <T> the type of the Java object
* @return a Java object representing the deserialisation of the XML data
*/
private static <T> T fromXML(final String xml, final Class<T> expectedType) {
try {
return JacksonXmlSupport.DEFAULT_XML_MAPPER.readerFor(expectedType).readValue(xml);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.StringJoiner;


/**
* S3 model for the 'CanonicalUser' XML schema.
*/
public class CanonicalUser {

@JsonProperty(value = "ID", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
import java.util.List;
import java.util.StringJoiner;

/**
* S3 model for the 'ListBucketResult' XML schema.
*/
// Schema specifies field 'isTruncated', not 'truncated'
@SuppressWarnings("java:S2047")
@JacksonXmlRootElement(localName = "ListBucketResult", namespace = "http://s3.amazonaws.com/doc/2006-03-01/")
public class ListBucketResult {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.Date;
import java.util.StringJoiner;

/**
* S3 model for the 'ListEntry' XML schema.
*/
public class ListEntry {

@JsonProperty(value = "Key", required = true)
Expand Down Expand Up @@ -61,12 +64,12 @@ public void setLastModified(final Date lastModified) {
}

@Generated
public String geteTag() {
public String getETag() {
return eTag;
}

@Generated
public void seteTag(final String eTag) {
public void setETag(final String eTag) {
this.eTag = eTag;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import java.util.StringJoiner;


/**
* S3 model for the 'MetadataEntry' XML schema.
*/
public class MetadataEntry {

@JsonProperty(value = "Name", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import java.util.StringJoiner;

/**
* S3 model for the 'PrefixEntry' XML schema.
*/
public class PrefixEntry {

@JsonProperty(value = "Prefix", required = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,31 @@

package uk.gov.gchq.palisade.client.s3.domain;

/**
* S3 model for the 'StorageClass' XML schema.
*/
public enum StorageClass {

STANDARD,
REDUCED_REDUNDANCY,
GLACIER,
UNKNOWN;

/**
* Convert from Enum to String.
*
* @return a {@link String} representation of the {@link StorageClass}
*/
public String value() {
return name();
}

/**
* Convert from String to Enum.
*
* @param v the string to convert to a storageClass enum
* @return a {@link StorageClass}
*/
public static StorageClass fromValue(final String v) {
return valueOf(v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
* This includes, for any given persisted resource, its parent id, which will also be persisted.
* Erase parent when storing JSON, it will be rebuilt using the repository.
*/
// Must be abstract class, not interface, to be used as mixin
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS)
@SuppressWarnings("java:S1610")
// Must be abstract class, not interface, to be used as mixin
@SuppressWarnings({"java:S1610", "java:S1694"})
public abstract class AbstractOrphanedChildJsonMixin {
/**
* Ignore the parent field when serialising to JSON.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,55 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/**
* Repository for storing/caching content-lengths of resources after rules are applied.
* Used to inform a client of the S3 server on HTTP Content-Length (e.g. Spark is quite picky about it).
*/
public interface ContentLengthRepository extends ReactiveCrudRepository<ContentLengthEntity, String> {
int PARALLELISM = 1;

/**
* Is there a length stored for a given resourceId.
*
* @param id the resourceId to check if there is a length stored
* @return true if there is a length stored, false otherwise
*/
Mono<Boolean> existsById(String id);

/**
* Get a Content-Length for a resourceId.
*
* @param id the resourceId to get a length for
* @return a {@link ContentLengthEntity} containing the Content-Length
*/
Mono<ContentLengthEntity> getByResourceId(String id);

/**
* Is there a length stored for a given resourceId.
*
* @param id the resourceId to check if there is a length stored
* @return true if there is a length stored, false otherwise
*/
default CompletableFuture<Boolean> futureExistsById(String id) {
return existsById(id).toFuture();
}

/**
* Get a Content-Length for a resourceId.
*
* @param id the resourceId to get a length for
* @return a {@link ContentLengthEntity} containing the Content-Length
*/
default CompletableFuture<Optional<ContentLengthEntity>> futureGetByResourceId(String id) {
return this.getByResourceId(id)
.toFuture()
.thenApply(Optional::ofNullable);
}

/**
* Set a Content-Length for a given resourceId.
*
* @param entity a {@link ContentLengthEntity} that will be stored, indexed by its id
* @return the saved {@link ContentLengthEntity}, once it has been saved
*/
default CompletableFuture<ContentLengthEntity> futureSave(ContentLengthEntity entity) {
return this.save(entity)
.toFuture();
Expand Down
Loading