Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pal 1023 s3 interceptor client #67

Merged
merged 27 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c195c85
PAL-1023-s3-interceptor-client initial commit
dev930018 May 24, 2021
378c444
PAL-1023 s3 server stands up and returns pair of example resources
dev930018 May 27, 2021
17c7655
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jun 15, 2021
562ad87
PAL-1023 added client acting as s3-api interface, tested against spark
dev930018 Jun 18, 2021
cae85a4
PAL-1023 put on hold without completing work for spark compatibility
dev930018 Jun 22, 2021
1b57822
PAL-1023 minor message change and port updates from testing
dev930018 Jun 23, 2021
1536f4b
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jun 24, 2021
877421a
PAL-1023 updated pom description for s3 client
dev930018 Jun 24, 2021
a2379c2
PAL-1023 Abstract- prefix for abstract class
dev930018 Jun 24, 2021
3cf3e69
PAL-1023 added config for service endpoints
dev930018 Jun 30, 2021
d839d93
PAL-1023 sonarqube code smells, mostly lambda types and documentation
dev930018 Jun 30, 2021
bf65e4b
PAL-1023 checkstyle fixes for docs
dev930018 Jun 30, 2021
6239c12
PAL-1023 import order
dev930018 Jun 30, 2021
74287b6
PAL-1023 import order and unused imports
dev930018 Jun 30, 2021
0644803
PAL-1023 further code-smells
dev930018 Jun 30, 2021
9d1c1a5
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jul 1, 2021
ce740e8
PAL-1023 documentation for code review
dev930018 Jul 6, 2021
a28a1eb
PAL-1023 better publisher subscription to stop tests hanging
dev930018 Jul 6, 2021
9184467
PAL-1023 removed unused imports
dev930018 Jul 6, 2021
7ad799a
PAL-1023 Update client-s3/src/main/java/uk/gov/gchq/palisade/client/s…
dev930018 Jul 6, 2021
25e4aab
PAL-1023 update in-line with Abstract- codesmell fix
dev930018 Jul 6, 2021
d9a5c41
Merge branch 'PAL-1023-s3-interceptor-client' of https://github.com/g…
dev930018 Jul 6, 2021
f722dc7
PAL-1023 dispose of disposable in effort to consume and close stream
dev930018 Jul 6, 2021
ef5c939
PAL-1023 added comment on why test is disabled, as well as another ef…
dev930018 Jul 7, 2021
6892cb8
PAL-1023 unused imports
dev930018 Jul 7, 2021
2cc96eb
PAL-1023 fixed test assertions
dev930018 Jul 7, 2021
d2b1924
PAL-1023 removed comment on disabled test
dev930018 Jul 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions NOTICES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@ List of third-party dependencies grouped by their license type
### [Apache Software License 2.0](./licenses/apache_software_license_2.0.txt):
* Jackson-annotations ([com.fasterxml.jackson.core:jackson-annotations:2.11.0](http://github.com/FasterXML/jackson))
* jackson-databind ([com.fasterxml.jackson.core:jackson-databind:2.11.0](http://github.com/FasterXML/jackson))
* Jackson-dataformat-XML ([com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.11.0](https://github.com/FasterXML/jackson-dataformat-xml))
* Jackson datatype: jdk8 ([com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.11.0](https://github.com/FasterXML/jackson-modules-java8/jackson-datatype-jdk8))
* akka-http-jackson ([com.typesafe.akka:akka-http-jackson_2.13:10.2.1](https://akka.io))
* akka-http ([com.typesafe.akka:akka-http_2.13:10.2.1](https://akka.io))
* akka-stream ([com.typesafe.akka:akka-stream_2.13:2.6.10](https://akka.io/))
* Micronaut ([io.micronaut:micronaut-http-server-netty:2.3.2](http://micronaut.io))
* Micronaut ([io.micronaut:micronaut-inject:2.3.2](http://micronaut.io))
* Micronaut ([io.micronaut:micronaut-inject-java:2.3.2](http://micronaut.io))
* Micronaut ([io.micronaut:micronaut-runtime:2.3.2](http://micronaut.io))
* Micronaut Test ([io.micronaut.test:micronaut-test-junit5:2.3.2](http://micronaut.io))
* Reactive Relational Database Connectivity - H2 ([io.r2dbc:r2dbc-h2:0.8.4.RELEASE](https://github.com/r2dbc/r2dbc-h2))
* RxJava ([io.reactivex.rxjava3:rxjava:3.0.8](https://github.com/ReactiveX/RxJava))
* AssertJ fluent assertions ([org.assertj:assertj-core:3.19.0](https://assertj.github.io/doc/assertj-core/))
* org.immutables.value ([org.immutables:value:2.8.2](http://immutables.org/value))
* spring-boot-autoconfigure ([org.springframework.boot:spring-boot-autoconfigure:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* spring-boot-starter-aop ([org.springframework.boot:spring-boot-starter-aop:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* spring-boot-starter-data-r2dbc ([org.springframework.boot:spring-boot-starter-data-r2dbc:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* spring-boot-starter-test ([org.springframework.boot:spring-boot-starter-test:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* Spring Shell Starter ([org.springframework.shell:spring-shell-starter:2.0.0.RELEASE](http://projects.spring.io/spring-boot/spring-shell-parent/spring-shell-starter/))
* GCHQ Palisade - Akka Client ([uk.gov.gchq.palisade:client-akka:0.5.0-SNAPSHOT](https://github.com/gchq/Palisade-clients/tree/develop/client-akka))
* GCHQ Palisade - Java Client ([uk.gov.gchq.palisade:client-java:0.5.0-SNAPSHOT](https://github.com/gchq/Palisade-clients/tree/develop/client-java))
* GCHQ Palisade Common Library ([uk.gov.gchq.palisade:common:0.5.0-SNAPSHOT](https://github.com/gchq/Palisade-common))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.WebSocketRequest;
import akka.http.javadsl.model.ws.WebSocketUpgradeResponse;
import akka.http.scaladsl.model.ws.TextMessage.Strict;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.BroadcastHub;
Expand Down Expand Up @@ -59,8 +57,35 @@
public class AkkaClient implements Client {
private static final ObjectMapper MAPPER = new ObjectMapper();

/**
* Specify URL schemes based on whether SSL/TLS encryption should be used over-the-wire.
* Ingress to the cluster may use TLS, while cluster-internal comms may use plaintext.
*/
public enum SSLMode {
NONE("http", "ws"),
SSL_TLS("https", "wss");

private final String httpScheme;
private final String wsScheme;

SSLMode(final String httpScheme, final String wsScheme) {
this.httpScheme = httpScheme;
this.wsScheme = wsScheme;
}

public String getHttpScheme() {
return httpScheme;
}

public String getWsScheme() {
return wsScheme;
}
}

private final String palisadeUrl;
private final String filteredResourceUrl;
private final Map<String, String> dataUrlMap;
private final SSLMode sslMode;
private final Materializer materializer;
private final Http http;

Expand All @@ -69,11 +94,15 @@ public class AkkaClient implements Client {
*
* @param palisadeUrl the location of the Palisade Service
* @param filteredResourceUrl the location of the Filtered Resource Service
* @param dataUrlMap lookup map from the names of Data Services to their locations
* @param actorSystem the akka Actor System bean
* @param sslMode whether the client should connect using SSL or not
*/
public AkkaClient(final String palisadeUrl, final String filteredResourceUrl, final ActorSystem actorSystem) {
public AkkaClient(final String palisadeUrl, final String filteredResourceUrl, final Map<String, String> dataUrlMap, final ActorSystem actorSystem, final SSLMode sslMode) {
this.palisadeUrl = palisadeUrl;
this.filteredResourceUrl = filteredResourceUrl;
this.dataUrlMap = dataUrlMap;
this.sslMode = sslMode;
this.materializer = Materializer.createMaterializer(actorSystem);
this.http = Http.get(actorSystem);
}
Expand All @@ -89,7 +118,7 @@ public AkkaClient(final String palisadeUrl, final String filteredResourceUrl, fi
*/
public CompletionStage<String> register(final String userId, final String resourceId, final Map<String, String> context) {
return http
.singleRequest(HttpRequest.POST(String.format("http://%s/api/registerDataRequest", palisadeUrl))
.singleRequest(HttpRequest.POST(String.format("%s://%s/api/registerDataRequest", sslMode.getHttpScheme(), palisadeUrl))
.withEntity(ContentTypes.APPLICATION_JSON, serialize(
PalisadeRequest.Builder.create()
.withUserId(userId)
Expand All @@ -111,14 +140,14 @@ public Source<LeafResource, CompletionStage<NotUsed>> fetchSource(final String t
WebSocketMessage cts = WebSocketMessage.Builder.create().withType(MessageType.CTS).noHeaders().noBody();

// Map inbound messages to outbound CTS until COMPLETE is seen
Flow<WebSocketMessage, WebSocketMessage, Pair<Sink<WebSocketMessage, NotUsed>, Source<WebSocketMessage, NotUsed>>> exposeSinkAndSource = Flow.<WebSocketMessage>create()
var exposeSinkAndSource = Flow.<WebSocketMessage>create()
// Merge ws upstream with our decoupled upstream - prefer our source of messages, complete when both ws and ours are complete
.mergePreferredMat(MergeHub.of(WebSocketMessage.class), true, false, Keep.right())
// Broadcast to both ws downstream and our downstream
.alsoToMat(BroadcastHub.of(WebSocketMessage.class), Keep.both());

// Ser/Des for messages to/from the websocket
Flow<Message, Message, Pair<Sink<WebSocketMessage, NotUsed>, Source<WebSocketMessage, NotUsed>>> clientFlow = Flow.<Message>create()
var clientFlow = Flow.<Message>create()
.map(msg -> AkkaClient.readWsMessage(msg, materializer))
// Expose source and sink to this stage in materialization
.viaMat(exposeSinkAndSource, Keep.right())
Expand All @@ -137,8 +166,8 @@ public Source<LeafResource, CompletionStage<NotUsed>> fetchSource(final String t
.map(AkkaClient::writeWsMessage);

// Make the request using the ser/des flow linked to the oscillator
Pair<CompletionStage<WebSocketUpgradeResponse>, Pair<Sink<WebSocketMessage, NotUsed>, Source<WebSocketMessage, NotUsed>>> wsResponse = http.singleWebSocketRequest(
WebSocketRequest.create(String.format("ws://%s/resource/%s", filteredResourceUrl, token)),
var wsResponse = http.singleWebSocketRequest(
WebSocketRequest.create(String.format("%s://%s/resource/%s", sslMode.getWsScheme(), filteredResourceUrl, token)),
clientFlow,
materializer);

Expand Down Expand Up @@ -182,11 +211,14 @@ public Publisher<LeafResource> fetch(final String token) {
* @return a stream of bytes representing the contents of the resource
*/
public Source<ByteString, CompletionStage<NotUsed>> readSource(final String token, final LeafResource resource) {
String createConn = resource.getConnectionDetail().createConnection();
String dataUrl = dataUrlMap.getOrDefault(createConn, createConn);
return Source.completionStageSource(http.singleRequest(
HttpRequest.POST(String.format("http://%s/read/chunked", resource.getConnectionDetail().createConnection()))
HttpRequest.POST(String.format("%s://%s/read/chunked", sslMode.getHttpScheme(), dataUrl))
.withEntity(ContentTypes.APPLICATION_JSON, serialize(DataRequest.Builder.create()
.withToken(token)
.withLeafResourceId(resource.getId()))))
.withLeafResourceId(resource.getId()))
))
.thenApply(response -> response.entity().getDataBytes()
.mapMaterializedValue(ignored -> NotUsed.notUsed())));
}
Expand Down
2 changes: 1 addition & 1 deletion client-fuse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
<!-- *********** Artifact ID *********** -->
<artifactId>client-fuse</artifactId>
<url>https://github.com/gchq/Palisade-clients/tree/develop/client-fuse</url>
<name>GCHQ Palisade - Attribute-Masking Service</name>
<name>GCHQ Palisade - FUSE FS Client</name>
<description>
The Fuse Palisade Client creates a software-controlled filesystem mount to represent the returned resources and data from a query.
Returned resources, data and metadata are all presented as a FUSE mounted local directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,26 @@

import io.micronaut.runtime.server.EmbeddedServer;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import org.assertj.core.api.Assertions;
import io.reactivex.rxjava3.internal.schedulers.IoScheduler;
import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.palisade.client.java.ClientManager;
import uk.gov.gchq.palisade.client.java.Download;
import uk.gov.gchq.palisade.client.java.QueryItem.ItemType;
import uk.gov.gchq.palisade.client.java.QueryResponse;

import javax.inject.Inject;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static uk.gov.gchq.palisade.client.java.testing.ClientTestData.FILE_NAME_0;
import static uk.gov.gchq.palisade.client.java.testing.ClientTestData.FILE_NAME_1;

/**
* @since 0.5.0
Expand All @@ -49,6 +48,12 @@ class FullTest {
@Inject
EmbeddedServer embeddedServer;

/**
* Register a request with the Palisade Service, fetch resources from the Filtered-Resource Service, and download from the Data Service.
* This test runs in a flatter, non-streaming, non-async manner, where the download is performed on the main thread.
*
* @throws Exception if no resources are returned, or the download fails
*/
@Test
void testWithDownloadOutsideStream() throws Exception {

Expand All @@ -57,81 +62,80 @@ void testWithDownloadOutsideStream() throws Exception {
var session = ClientManager.openSession(String.format("pal://localhost:%d/cluster?userid=alice", port));
var query = session.createQuery("resource_id");
var publisher = query
.execute()
.thenApply(QueryResponse::stream)
.get();
.execute()
.thenApply(QueryResponse::stream)
.get();

var resources = Flowable.fromPublisher(FlowAdapters.toPublisher(publisher))
.filter(m -> m.getType().equals(ItemType.RESOURCE))
.collect(Collectors.toList())
.timeout(10, TimeUnit.SECONDS)
.blockingGet();
.filter(m -> m.getType().equals(ItemType.RESOURCE))
.collect(Collectors.toList())
.timeout(10, TimeUnit.SECONDS)
.blockingGet();

assertThat(resources).as("check resource count").hasSizeGreaterThan(0);

var resource = resources.get(0);
assertThat(resource.asResource().getId())
.as("check leaf resource id")
.isEqualTo(FILE_NAME_0.asString());
var expectedCollection = Map.of(
FILE_NAME_0.asString(), FILE_NAME_0.createStream(),
FILE_NAME_1.asString(), FILE_NAME_1.createStream()
);

var download = session.fetch(resource);
assertThat(download).as("check download exists").isNotNull();
for (var resource : resources) {
assertThat(resource.asResource().getId())
.as("check leaf resource id")
.isIn(expectedCollection.keySet());

try (var actual = download.getInputStream();
var expected = FILE_NAME_0.createStream();
) {
assertThat(actual).as("check stream download").hasSameContentAs(expected);
var download = session.fetch(resource);
assertThat(download).as("check download exists").isNotNull();

try (var actual = download.getInputStream();
var expected = expectedCollection.get(resource.asResource().getId());
) {
assertThat(actual).as("check stream download").hasSameContentAs(expected);
}
}

}

/**
* Register a request with the Palisade Service, fetch resources from the Filtered-Resource Service, and download from the Data Service.
* This test runs in a nested, streaming, async manner, where the download is performed asynchronously on some reactor thread.
*
* @throws Exception if no resources are returned, or the download fails
*/
@Test
void testWithDownloadInsideStream() throws Exception {

var session = ClientManager.openSession(String.format("pal://localhost:%d/cluster?userid=alice", embeddedServer.getPort()));
var query = session.createQuery("resource_id");
var publisher = query
.execute()
.thenApply(QueryResponse::stream)
.get();

Flowable.fromPublisher(FlowAdapters.toPublisher(publisher))
.filter(m -> m.getType().equals(ItemType.RESOURCE))
.map(session::fetch)
.timeout(10, TimeUnit.SECONDS)
.subscribe(new FlowableSubscriber<>() {

@Override
public void onNext(final Download t) {
LOGGER.debug("## Got message: {}", t);
try (var is = t.getInputStream()) {
LOGGER.debug("## reading bytes");
var ba = is.readAllBytes();
LOGGER.debug("## read {} bytes", ba.length);
LOGGER.debug(new String(ba));
} catch (Throwable e) {
LOGGER.error("Got error reading input stream into byte array", e);
throw new IllegalStateException("Got error reading input stream into byte array", e);
.execute()
.thenApply(QueryResponse::stream)
.get();

var expectedCollection = Map.of(
FILE_NAME_0.asString(), FILE_NAME_0.createStream(),
FILE_NAME_1.asString(), FILE_NAME_1.createStream()
);

var disposable = Flowable.fromPublisher(FlowAdapters.toPublisher(publisher))
.filter(m -> m.getType().equals(ItemType.RESOURCE))
.timeout(10, TimeUnit.SECONDS)
.subscribeOn(new IoScheduler())
.subscribe((var resource) -> {
assertThat(resource.asResource().getId())
.as("check leaf resource id")
.isIn(expectedCollection.keySet());

var download = session.fetch(resource);
assertThat(download).as("check download exists").isNotNull();

try (var actual = download.getInputStream();
var expected = expectedCollection.get(resource.asResource().getId());
) {
assertThat(actual).as("check stream download").hasSameContentAs(expected);
}
}

@Override
public void onError(final Throwable t) {
LOGGER.error("## Error: {}", t.getMessage());
Assertions.fail("Failed due to:" + t.getMessage());
}

@Override
public void onComplete() {
LOGGER.debug("## complete");

}

@Override
public void onSubscribe(final org.reactivestreams.@NonNull Subscription s) {
s.request(Long.MAX_VALUE);
LOGGER.debug("## Subscribed");
}
});
});

disposable.dispose();
}
}
Loading