Pal 1023 s3 interceptor client #67

Merged
merged 27 commits into from
Jul 7, 2021
Changes from 5 commits
c195c85
PAL-1023-s3-interceptor-client initial commit
dev930018 May 24, 2021
378c444
PAL-1023 s3 server stands up and returns pair of example resources
dev930018 May 27, 2021
17c7655
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jun 15, 2021
562ad87
PAL-1023 added client acting as s3-api interface, tested against spark
dev930018 Jun 18, 2021
cae85a4
PAL-1023 put on hold without completing work for spark compatibility
dev930018 Jun 22, 2021
1b57822
PAL-1023 minor message change and port updates from testing
dev930018 Jun 23, 2021
1536f4b
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jun 24, 2021
877421a
PAL-1023 updated pom description for s3 client
dev930018 Jun 24, 2021
a2379c2
PAL-1023 Abstract- prefix for abstract class
dev930018 Jun 24, 2021
3cf3e69
PAL-1023 added config for service endpoints
dev930018 Jun 30, 2021
d839d93
PAL-1023 sonarqube code smells, mostly lambda types and documentation
dev930018 Jun 30, 2021
bf65e4b
PAL-1023 checkstyle fixes for docs
dev930018 Jun 30, 2021
6239c12
PAL-1023 import order
dev930018 Jun 30, 2021
74287b6
PAL-1023 import order and unused imports
dev930018 Jun 30, 2021
0644803
PAL-1023 further code-smells
dev930018 Jun 30, 2021
9d1c1a5
Merge branch 'develop' of https://github.com/gchq/Palisade-clients in…
dev930018 Jul 1, 2021
ce740e8
PAL-1023 documentation for code review
dev930018 Jul 6, 2021
a28a1eb
PAL-1023 better publisher subscription to stop tests hanging
dev930018 Jul 6, 2021
9184467
PAL-1023 removed unused imports
dev930018 Jul 6, 2021
7ad799a
PAL-1023 Update client-s3/src/main/java/uk/gov/gchq/palisade/client/s…
dev930018 Jul 6, 2021
25e4aab
PAL-1023 update in-line with Abstract- codesmell fix
dev930018 Jul 6, 2021
d9a5c41
Merge branch 'PAL-1023-s3-interceptor-client' of https://github.com/g…
dev930018 Jul 6, 2021
f722dc7
PAL-1023 dispose of disposable in effort to consume and close stream
dev930018 Jul 6, 2021
ef5c939
PAL-1023 added comment on why test is disabled, as well as another ef…
dev930018 Jul 7, 2021
6892cb8
PAL-1023 unused imports
dev930018 Jul 7, 2021
2cc96eb
PAL-1023 fixed test assertions
dev930018 Jul 7, 2021
d2b1924
PAL-1023 removed comment on disabled test
dev930018 Jul 7, 2021
8 changes: 7 additions & 1 deletion NOTICES.md
@@ -3,21 +3,27 @@ List of third-party dependencies grouped by their license type
### [Apache Software License 2.0](./licenses/apache_software_license_2.0.txt):
* Jackson-annotations ([com.fasterxml.jackson.core:jackson-annotations:2.11.0](http://github.com/FasterXML/jackson))
* jackson-databind ([com.fasterxml.jackson.core:jackson-databind:2.11.0](http://github.com/FasterXML/jackson))
* Jackson-dataformat-XML ([com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.11.0](https://github.com/FasterXML/jackson-dataformat-xml))
* Jackson datatype: jdk8 ([com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.11.0](https://github.com/FasterXML/jackson-modules-java8/jackson-datatype-jdk8))
* akka-http-jackson ([com.typesafe.akka:akka-http-jackson_2.13:10.2.1](https://akka.io))
* akka-http ([com.typesafe.akka:akka-http_2.13:10.2.1](https://akka.io))
* akka-stream ([com.typesafe.akka:akka-stream_2.13:2.6.10](https://akka.io/))
* Micronaut ([io.micronaut:micronaut-http-server-netty:2.3.2](http://micronaut.io))
* Micronaut ([io.micronaut:micronaut-inject:2.3.2](http://micronaut.io))
* Micronaut ([io.micronaut:micronaut-inject-java:2.3.2](http://micronaut.io))
* Micronaut ([io.micronaut:micronaut-runtime:2.3.2](http://micronaut.io))
* Micronaut Test ([io.micronaut.test:micronaut-test-junit5:2.3.2](http://micronaut.io))
* Reactive Relational Database Connectivity - H2 ([io.r2dbc:r2dbc-h2:0.8.4.RELEASE](https://github.com/r2dbc/r2dbc-h2))
* RxJava ([io.reactivex.rxjava3:rxjava:3.0.8](https://github.com/ReactiveX/RxJava))
* AssertJ fluent assertions ([org.assertj:assertj-core:3.19.0](https://assertj.github.io/doc/assertj-core/))
* org.immutables.value ([org.immutables:value:2.8.2](http://immutables.org/value))
* spring-boot-autoconfigure ([org.springframework.boot:spring-boot-autoconfigure:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* spring-boot-starter-aop ([org.springframework.boot:spring-boot-starter-aop:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* spring-boot-starter-data-r2dbc ([org.springframework.boot:spring-boot-starter-data-r2dbc:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* spring-boot-starter-test ([org.springframework.boot:spring-boot-starter-test:2.3.1.RELEASE](https://spring.io/projects/spring-boot))
* Spring Shell Starter ([org.springframework.shell:spring-shell-starter:2.0.0.RELEASE](http://projects.spring.io/spring-boot/spring-shell-parent/spring-shell-starter/))
* client-java ([uk.gov.gchq.palisade:client-java:0.5.0-SNAPSHOT](https://github.com/gchq/Palisade-clients/tree/develop/client-java))
* GCHQ Palisade - Akka Client ([uk.gov.gchq.palisade:client-akka:0.5.0-SNAPSHOT](https://github.com/gchq/Palisade-clients/tree/develop/client-akka))
* GCHQ Palisade - Java Client ([uk.gov.gchq.palisade:client-java:0.5.0-SNAPSHOT](https://github.com/gchq/Palisade-clients/tree/develop/client-java))
* GCHQ Palisade Common Library ([uk.gov.gchq.palisade:common:0.5.0-SNAPSHOT](https://github.com/gchq/Palisade-common))

### [Eclipse Public License 1.0](./licenses/eclipse_public_license_1.0.html):
@@ -24,9 +24,7 @@
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.WebSocketRequest;
import akka.http.javadsl.model.ws.WebSocketUpgradeResponse;
import akka.http.scaladsl.model.ws.TextMessage.Strict;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.BroadcastHub;
@@ -59,8 +57,31 @@
public class AkkaClient implements Client {
private static final ObjectMapper MAPPER = new ObjectMapper();

public enum SSLMode {
NONE("http", "ws"),
SSL_TLS("https", "wss");

private final String httpScheme;
private final String wsScheme;

SSLMode(final String httpScheme, final String wsScheme) {
this.httpScheme = httpScheme;
this.wsScheme = wsScheme;
}

public String getHttpScheme() {
return httpScheme;
}

public String getWsScheme() {
return wsScheme;
}
}

private final String palisadeUrl;
private final String filteredResourceUrl;
private final Map<String, String> dataUrlMap;
private final SSLMode sslMode;
private final Materializer materializer;
private final Http http;

@@ -69,11 +90,15 @@ public class AkkaClient implements Client {
*
* @param palisadeUrl the location of the Palisade Service
* @param filteredResourceUrl the location of the Filtered Resource Service
* @param dataUrlMap lookup map from the names of Data Services to their locations
* @param actorSystem the akka Actor System bean
* @param sslMode whether the client should connect using SSL or not
*/
public AkkaClient(final String palisadeUrl, final String filteredResourceUrl, final ActorSystem actorSystem) {
public AkkaClient(final String palisadeUrl, final String filteredResourceUrl, final Map<String, String> dataUrlMap, final ActorSystem actorSystem, final SSLMode sslMode) {
this.palisadeUrl = palisadeUrl;
this.filteredResourceUrl = filteredResourceUrl;
this.dataUrlMap = dataUrlMap;
this.sslMode = sslMode;
this.materializer = Materializer.createMaterializer(actorSystem);
this.http = Http.get(actorSystem);
}
@@ -89,7 +114,7 @@ public AkkaClient(final String palisadeUrl, final String filteredResourceUrl, fi
*/
public CompletionStage<String> register(final String userId, final String resourceId, final Map<String, String> context) {
return http
.singleRequest(HttpRequest.POST(String.format("http://%s/api/registerDataRequest", palisadeUrl))
.singleRequest(HttpRequest.POST(String.format("%s://%s/api/registerDataRequest", sslMode.getHttpScheme(), palisadeUrl))
.withEntity(ContentTypes.APPLICATION_JSON, serialize(
PalisadeRequest.Builder.create()
.withUserId(userId)
@@ -111,14 +136,14 @@ public Source<LeafResource, CompletionStage<NotUsed>> fetchSource(final String t
WebSocketMessage cts = WebSocketMessage.Builder.create().withType(MessageType.CTS).noHeaders().noBody();

// Map inbound messages to outbound CTS until COMPLETE is seen
Flow<WebSocketMessage, WebSocketMessage, Pair<Sink<WebSocketMessage, NotUsed>, Source<WebSocketMessage, NotUsed>>> exposeSinkAndSource = Flow.<WebSocketMessage>create()
var exposeSinkAndSource = Flow.<WebSocketMessage>create()
// Merge ws upstream with our decoupled upstream - prefer our source of messages, complete when both ws and ours are complete
.mergePreferredMat(MergeHub.of(WebSocketMessage.class), true, false, Keep.right())
// Broadcast to both ws downstream and our downstream
.alsoToMat(BroadcastHub.of(WebSocketMessage.class), Keep.both());

// Ser/Des for messages to/from the websocket
Flow<Message, Message, Pair<Sink<WebSocketMessage, NotUsed>, Source<WebSocketMessage, NotUsed>>> clientFlow = Flow.<Message>create()
var clientFlow = Flow.<Message>create()
.map(msg -> AkkaClient.readWsMessage(msg, materializer))
// Expose source and sink to this stage in materialization
.viaMat(exposeSinkAndSource, Keep.right())
@@ -137,8 +162,8 @@ public Source<LeafResource, CompletionStage<NotUsed>> fetchSource(final String t
.map(AkkaClient::writeWsMessage);

// Make the request using the ser/des flow linked to the oscillator
Pair<CompletionStage<WebSocketUpgradeResponse>, Pair<Sink<WebSocketMessage, NotUsed>, Source<WebSocketMessage, NotUsed>>> wsResponse = http.singleWebSocketRequest(
WebSocketRequest.create(String.format("ws://%s/resource/%s", filteredResourceUrl, token)),
var wsResponse = http.singleWebSocketRequest(
WebSocketRequest.create(String.format("%s://%s/resource/%s", sslMode.getWsScheme(), filteredResourceUrl, token)),
clientFlow,
materializer);

@@ -182,11 +207,14 @@ public Publisher<LeafResource> fetch(final String token) {
* @return a stream of bytes representing the contents of the resource
*/
public Source<ByteString, CompletionStage<NotUsed>> readSource(final String token, final LeafResource resource) {
String createConn = resource.getConnectionDetail().createConnection();
String dataUrl = dataUrlMap.getOrDefault(createConn, createConn);
return Source.completionStageSource(http.singleRequest(
HttpRequest.POST(String.format("http://%s/read/chunked", resource.getConnectionDetail().createConnection()))
HttpRequest.POST(String.format("%s://%s/read/chunked", sslMode.getHttpScheme(), dataUrl))
.withEntity(ContentTypes.APPLICATION_JSON, serialize(DataRequest.Builder.create()
.withToken(token)
.withLeafResourceId(resource.getId()))))
.withLeafResourceId(resource.getId()))
))
.thenApply(response -> response.entity().getDataBytes()
.mapMaterializedValue(ignored -> NotUsed.notUsed())));
}
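
The URL construction introduced in this diff can be sketched in isolation as below. This is only an illustrative sketch: `UrlSchemeSketch`, `readUrl` and `resourceUrl` are hypothetical names, and the host values are made up. Only the `SSLMode` values, the `getOrDefault` lookup and the URL format strings are taken from the diff.

```java
import java.util.Map;

// Stand-alone sketch: UrlSchemeSketch and its helper methods are hypothetical names;
// only the SSLMode values and the URL formats are taken from the diff above.
final class UrlSchemeSketch {

    enum SSLMode {
        NONE("http", "ws"),
        SSL_TLS("https", "wss");

        private final String httpScheme;
        private final String wsScheme;

        SSLMode(final String httpScheme, final String wsScheme) {
            this.httpScheme = httpScheme;
            this.wsScheme = wsScheme;
        }

        String getHttpScheme() {
            return httpScheme;
        }

        String getWsScheme() {
            return wsScheme;
        }
    }

    // Resolve a Data Service name through the lookup map, falling back to the raw value
    static String readUrl(final SSLMode sslMode, final Map<String, String> dataUrlMap, final String connection) {
        String dataUrl = dataUrlMap.getOrDefault(connection, connection);
        return String.format("%s://%s/read/chunked", sslMode.getHttpScheme(), dataUrl);
    }

    // Build the websocket URL for a token, using the ws/wss scheme of the chosen mode
    static String resourceUrl(final SSLMode sslMode, final String filteredResourceUrl, final String token) {
        return String.format("%s://%s/resource/%s", sslMode.getWsScheme(), filteredResourceUrl, token);
    }

    public static void main(final String[] args) {
        // Assumed example values, not real service locations
        Map<String, String> dataUrlMap = Map.of("data-service", "localhost:8082");
        System.out.println(readUrl(SSLMode.NONE, dataUrlMap, "data-service"));
        System.out.println(readUrl(SSLMode.SSL_TLS, dataUrlMap, "unmapped-host:8082"));
        System.out.println(resourceUrl(SSLMode.SSL_TLS, "localhost:8083", "some-token"));
    }
}
```

A name missing from the map is used as the host directly, so callers that already pass a resolvable address keep working when the map is empty.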
2 changes: 1 addition & 1 deletion client-fuse/pom.xml
@@ -38,7 +38,7 @@
<!-- *********** Artifact ID *********** -->
<artifactId>client-fuse</artifactId>
<url>https://github.com/gchq/Palisade-clients/tree/develop/client-fuse</url>
<name>GCHQ Palisade - Attribute-Masking Service</name>
<name>GCHQ Palisade - FUSE FS Client</name>
<description>
The Fuse Palisade Client creates a software-controlled filesystem mount to represent the returned resources and data from a query.
Returned resources, data and metadata are all presented as a FUSE mounted local directory.
59 changes: 59 additions & 0 deletions client-s3/README.md
@@ -0,0 +1,59 @@
<!--
/*
* Copyright 2018-2021 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->

> :information_source: ***Work In Progress***
> There are some issues between what Spark expects and what the server returns for a `GetObject`.
> The server returns a valid AVRO object (which can be successfully read if written to a file).
> The server also returns what look to be all the correct headers.
> Despite this, a Spark `read` will return a data-frame with zero records.

# Palisade S3-Server Client

Presents an S3-compliant API wrapping a Palisade deployment.

Given a Spark job running against AWS S3 as follows:
```scala
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://s3.eu-west-2.amazonaws.com/")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
val nonrecursive = scala.io.Source.fromFile("/schema/nonrecursive.json").mkString
spark.read.format("avro").option("avroSchema", nonrecursive).load("s3a://palisade-application-dev/data/remote-data-store/data/employee_file0.avro").show()
```
_Note that we use a modified non-recursive AVRO schema `/schema/nonrecursive.json` (this excludes the managers field) as recursive schemas are not compatible with Spark SQL._

Adapt the Spark job to run against the Palisade S3 client:
```scala
import sys.process._;
// User 'Alice' wants 'file:/data/local-data-store/' directory for 'SALARY' purposes
// We get back the token '09d3a677-3d03-42e0-8cdb-f048f3929f8c', to be used as a bucket-name
val token = (Seq("curl", "-X", "POST", "http://localhost:8092/register?userId=Alice&resourceId=file%3A%2Fdata%2Flocal-data-store%2F&purpose=SALARY")!!).stripSuffix("\n")
Thread.sleep(5000)

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "localhost:8092/request")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
// These are not interpreted or validated by Palisade, but Spark requires them to be non-null
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "accesskey")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "secretkey")
// spark.read.format("avro").load("s3a://" + token + "/with-policy/employee_small.avro").show()
val nonrecursive = scala.io.Source.fromFile("/schema/nonrecursive.json").mkString
spark.read.format("avro").option("avroSchema", nonrecursive).load("s3a://" + token + "/data/employee_file0.avro").show()
```

The client currently requires the Palisade service URLs to be hard-coded in the `EndpointConfiguration`.
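
As a rough illustration of the two strings the Spark example above builds by hand, the following sketch URL-encodes the register query and uses the returned token as the bucket name. `S3ClientUrlSketch`, `registerUrl` and `s3aPath` are hypothetical helper names and not part of the client. The `/register` path and its parameters are taken from the `curl` command above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of client-s3: it only mirrors the strings used in the README example.
final class S3ClientUrlSketch {

    // Build the register URL, form-encoding each query value (e.g. ':' -> %3A, '/' -> %2F)
    static String registerUrl(final String host, final String userId, final String resourceId, final String purpose) {
        return String.format("http://%s/register?userId=%s&resourceId=%s&purpose=%s",
                host, encode(userId), encode(resourceId), encode(purpose));
    }

    // The token returned by the register call is used as the S3 bucket name
    static String s3aPath(final String token, final String key) {
        String trimmed = key.startsWith("/") ? key.substring(1) : key;
        return "s3a://" + token + "/" + trimmed;
    }

    private static String encode(final String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        System.out.println(registerUrl("localhost:8092", "Alice", "file:/data/local-data-store/", "SALARY"));
        System.out.println(s3aPath("09d3a677-3d03-42e0-8cdb-f048f3929f8c", "/data/employee_file0.avro"));
    }
}
```

`URLEncoder` applies form encoding, so a space in a value would become `+`; that is acceptable for the query string here but is an assumption about what the server accepts.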
61 changes: 61 additions & 0 deletions client-s3/mvn_dependency_tree.txt
@@ -0,0 +1,61 @@
uk.gov.gchq.palisade:client-s3:jar:0.5.0-SNAPSHOT
+- uk.gov.gchq.palisade:client-akka:jar:0.5.0-SNAPSHOT:compile
| +- uk.gov.gchq.palisade:common:jar:0.5.0-SNAPSHOT:compile
| | \- org.apache.avro:avro:jar:1.8.2:compile
| | +- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
| | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
| | +- com.thoughtworks.paranamer:paranamer:jar:2.7:compile
| | +- org.xerial.snappy:snappy-java:jar:1.1.1.3:compile
| | +- org.apache.commons:commons-compress:jar:1.8.1:compile
| | \- org.tukaani:xz:jar:1.5:compile
| \- com.fasterxml.jackson.core:jackson-annotations:jar:2.11.0:compile
+- com.typesafe.akka:akka-stream_2.13:jar:2.6.10:compile
| +- org.scala-lang:scala-library:jar:2.13.3:compile
| +- com.typesafe.akka:akka-actor_2.13:jar:2.6.10:compile
| | +- com.typesafe:config:jar:1.4.0:compile
| | \- org.scala-lang.modules:scala-java8-compat_2.13:jar:0.9.0:compile
| +- com.typesafe.akka:akka-protobuf-v3_2.13:jar:2.6.10:compile
| +- org.reactivestreams:reactive-streams:jar:1.0.3:compile
| \- com.typesafe:ssl-config-core_2.13:jar:0.4.2:compile
| \- org.scala-lang.modules:scala-parser-combinators_2.13:jar:1.1.2:compile
+- com.typesafe.akka:akka-http_2.13:jar:10.2.1:compile
| \- com.typesafe.akka:akka-http-core_2.13:jar:10.2.1:compile
| \- com.typesafe.akka:akka-parsing_2.13:jar:10.2.1:compile
+- com.typesafe.akka:akka-http-jackson_2.13:jar:10.2.1:compile
+- com.fasterxml.jackson.dataformat:jackson-dataformat-xml:jar:2.11.0:compile
| +- com.fasterxml.jackson.core:jackson-core:jar:2.11.0:compile
| +- com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.11.0:compile
| | +- jakarta.xml.bind:jakarta.xml.bind-api:jar:2.3.3:compile
| | \- jakarta.activation:jakarta.activation-api:jar:1.2.2:compile
| +- org.codehaus.woodstox:stax2-api:jar:4.2:compile
| \- com.fasterxml.woodstox:woodstox-core:jar:6.2.0:compile
+- com.fasterxml.jackson.core:jackson-databind:jar:2.11.0:compile
+- org.springframework.boot:spring-boot-autoconfigure:jar:2.3.1.RELEASE:compile
| \- org.springframework.boot:spring-boot:jar:2.3.1.RELEASE:compile
| +- org.springframework:spring-core:jar:5.2.7.RELEASE:compile
| | \- org.springframework:spring-jcl:jar:5.2.7.RELEASE:compile
| \- org.springframework:spring-context:jar:5.2.7.RELEASE:compile
| +- org.springframework:spring-aop:jar:5.2.7.RELEASE:compile
| \- org.springframework:spring-expression:jar:5.2.7.RELEASE:compile
+- org.springframework.boot:spring-boot-starter-data-r2dbc:jar:2.3.1.RELEASE:compile
| +- org.springframework.boot:spring-boot-starter:jar:2.3.1.RELEASE:compile
| | +- org.springframework.boot:spring-boot-starter-logging:jar:2.3.1.RELEASE:compile
| | | +- ch.qos.logback:logback-classic:jar:1.2.3:compile
| | | | \- ch.qos.logback:logback-core:jar:1.2.3:compile
| | | +- org.apache.logging.log4j:log4j-to-slf4j:jar:2.13.3:compile
| | | | \- org.apache.logging.log4j:log4j-api:jar:2.13.3:compile
| | | \- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
| | +- jakarta.annotation:jakarta.annotation-api:jar:1.3.5:compile
| | \- org.yaml:snakeyaml:jar:1.26:compile
| +- org.springframework.data:spring-data-r2dbc:jar:1.1.1.RELEASE:compile
| | +- org.springframework.data:spring-data-commons:jar:2.3.1.RELEASE:compile
| | +- org.springframework.data:spring-data-relational:jar:2.0.1.RELEASE:compile
| | +- org.springframework:spring-tx:jar:5.2.7.RELEASE:compile
| | +- org.springframework:spring-beans:jar:5.2.7.RELEASE:compile
| | \- org.slf4j:slf4j-api:jar:1.7.30:compile
| +- io.r2dbc:r2dbc-spi:jar:0.8.2.RELEASE:compile
| \- io.r2dbc:r2dbc-pool:jar:0.8.3.RELEASE:compile
| \- io.projectreactor.addons:reactor-pool:jar:0.1.4.RELEASE:compile
\- io.r2dbc:r2dbc-h2:jar:0.8.4.RELEASE:compile
+- com.h2database:h2:jar:1.4.200:compile
\- io.projectreactor:reactor-core:jar:3.3.6.RELEASE:compile