Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduces dspace prefix in signaling client #4468

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,23 @@ public class DataPlaneSignalingClient implements DataPlaneClient {
private final TypeTransformerRegistry transformerRegistry;
private final JsonLd jsonLd;

private final String jsonLdScope;
private final ObjectMapper mapper;

public DataPlaneSignalingClient(ControlApiHttpClient httpClient, TypeTransformerRegistry transformerRegistry, JsonLd jsonLd,
public DataPlaneSignalingClient(ControlApiHttpClient httpClient, TypeTransformerRegistry transformerRegistry, JsonLd jsonLd, String jsonLdScope,
ObjectMapper mapper, DataPlaneInstance dataPlane) {
this.httpClient = httpClient;
this.transformerRegistry = transformerRegistry;
this.jsonLd = jsonLd;
this.jsonLdScope = jsonLdScope;
this.mapper = mapper;
this.dataPlane = dataPlane;
}

private static <T> @NotNull StatusResult<T> failedResult(String processId, ServiceFailure failure) {
return StatusResult.failure(FATAL_ERROR, format("Transfer request for process %s failed: %s", processId, failure.getFailureDetail()));
}

@WithSpan
@Override
public StatusResult<DataFlowResponseMessage> start(DataFlowStartMessage message) {
Expand Down Expand Up @@ -102,13 +108,9 @@ public StatusResult<Void> checkAvailability() {
.orElse(failure -> failedResult(null, failure)));
}

private static <T> @NotNull StatusResult<T> failedResult(String processId, ServiceFailure failure) {
return StatusResult.failure(FATAL_ERROR, format("Transfer request for process %s failed: %s", processId, failure.getFailureDetail()));
}

private StatusResult<Request.Builder> createRequestBuilder(Object message, String url) {
return transformerRegistry.transform(message, JsonObject.class)
.compose(jsonLd::compact)
.compose(this::compact)
.compose(this::serializeMessage)
.map(rawBody -> RequestBody.create(rawBody, TYPE_JSON))
.map(body -> new Request.Builder().post(body).url(url))
Expand Down Expand Up @@ -143,6 +145,10 @@ private StatusResult<DataFlowResponseMessage> deserializeStartMessage(String res
}
}

private Result<JsonObject> compact(JsonObject object) {
return jsonLd.compact(object, jsonLdScope);
}

private Result<String> serializeMessage(Object message) {
try {
return Result.success(mapper.writeValueAsString(message));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@

import java.util.Objects;

import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD;

/**
Expand All @@ -37,6 +43,7 @@
@Extension(value = DataPlaneSignalingClientExtension.NAME)
public class DataPlaneSignalingClientExtension implements ServiceExtension {
public static final String NAME = "Data Plane Signaling Client";
public static final String CONTROL_CLIENT_SCOPE = "CONTROL_CLIENT_SCOPE";

@Inject(required = false)
private ControlApiHttpClient httpClient;
Expand All @@ -62,12 +69,16 @@ public DataPlaneClientFactory dataPlaneClientFactory(ServiceExtensionContext con
context.getMonitor().debug(() -> "Using embedded Data Plane client.");
return instance -> new EmbeddedDataPlaneClient(dataPlaneManager);
}

jsonLd.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, CONTROL_CLIENT_SCOPE);
jsonLd.registerNamespace(DSPACE_PREFIX, DSPACE_SCHEMA, CONTROL_CLIENT_SCOPE);
jsonLd.registerNamespace(VOCAB, EDC_NAMESPACE);

var mapper = typeManager.getMapper(JSON_LD);
context.getMonitor().debug(() -> "Using remote Data Plane client.");
Objects.requireNonNull(httpClient, "To use remote Data Plane client, a ControlApiHttpClient instance must be registered");
var signalingApiTypeTransformerRegistry = transformerRegistry.forContext("signaling-api");
return instance -> new DataPlaneSignalingClient(httpClient, signalingApiTypeTransformerRegistry, jsonLd, mapper,
return instance -> new DataPlaneSignalingClient(httpClient, signalingApiTypeTransformerRegistry, jsonLd, CONTROL_CLIENT_SCOPE, mapper,
instance);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,39 @@
import org.eclipse.edc.boot.system.injection.ObjectFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.client.DataPlaneSignalingClientExtension.CONTROL_CLIENT_SCOPE;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(DependencyInjectionExtension.class)
class DataPlaneSignalingClientExtensionTest {

@Test
void verifyDataPlaneClientFactory(ServiceExtensionContext context, ObjectFactory factory) {
var jsonLd = mock(JsonLd.class);
context.registerService(DataPlaneManager.class, null);
context.registerService(JsonLd.class, jsonLd);
var extension = factory.constructInstance(DataPlaneSignalingClientExtension.class);

var client = extension.dataPlaneClientFactory(context).createClient(createDataPlaneInstance());

assertThat(client).isInstanceOf(DataPlaneSignalingClient.class);
verify(jsonLd).registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, CONTROL_CLIENT_SCOPE);
verify(jsonLd).registerNamespace(DSPACE_PREFIX, DSPACE_SCHEMA, CONTROL_CLIENT_SCOPE);
verify(jsonLd).registerNamespace(VOCAB, EDC_NAMESPACE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,15 @@
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.edc.connector.dataplane.client.DataPlaneSignalingClientExtension.CONTROL_CLIENT_SCOPE;
import static org.eclipse.edc.http.client.testfixtures.HttpTestUtils.testHttpClient;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_PREFIX;
import static org.eclipse.edc.jsonld.spi.Namespaces.DSPACE_SCHEMA;
import static org.eclipse.edc.jsonld.util.JacksonJsonLd.createObjectMapper;
import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_PREFIX;
import static org.eclipse.edc.policy.model.OdrlNamespace.ODRL_SCHEMA;
import static org.eclipse.edc.spi.constants.CoreConstants.EDC_NAMESPACE;
import static org.eclipse.edc.util.io.Ports.getFreePort;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -94,13 +99,17 @@ class DataPlaneSignalingClientTest {
private final ControlApiHttpClient httpClient = new ControlApiHttpClientImpl(testHttpClient(), mock());

private final DataPlaneClient dataPlaneClient = new DataPlaneSignalingClient(httpClient, TRANSFORMER_REGISTRY,
JSON_LD, MAPPER, instance);
JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

@BeforeAll
public static void setUp() {
var factory = Json.createBuilderFactory(Map.of());

JSON_LD.registerNamespace(VOCAB, EDC_NAMESPACE);
JSON_LD.registerNamespace(VOCAB, EDC_NAMESPACE, CONTROL_CLIENT_SCOPE);
JSON_LD.registerNamespace(ODRL_PREFIX, ODRL_SCHEMA, CONTROL_CLIENT_SCOPE);
JSON_LD.registerNamespace(DSPACE_PREFIX, DSPACE_SCHEMA, CONTROL_CLIENT_SCOPE);

dataPlane = startClientAndServer(DATA_PLANE_API_PORT);
TRANSFORMER_REGISTRY.register(new JsonObjectFromDataFlowTerminateMessageTransformer(factory));
TRANSFORMER_REGISTRY.register(new JsonObjectFromDataFlowSuspendMessageTransformer(factory));
Expand All @@ -116,6 +125,10 @@ public static void tearDown() {
stopQuietly(dataPlane);
}

private static Result<JsonObject> compact(JsonObject input) {
return JSON_LD.compact(input, CONTROL_CLIENT_SCOPE);
}

@AfterEach
public void resetMockServer() {
dataPlane.reset();
Expand All @@ -129,7 +142,7 @@ void verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws JsonProcessing
var flowRequest = createDataFlowRequest();

var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected));
Expand All @@ -150,7 +163,7 @@ void verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProcessingExcep
var flowRequest = createDataFlowRequest();

var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected));
Expand All @@ -170,7 +183,7 @@ void verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProcessingExcep
void verifyReturnFatalErrorIfTransformFails() {
var flowRequest = createDataFlowRequest();
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, MAPPER, instance);
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

Expand All @@ -188,7 +201,7 @@ void verifyReturnFatalErrorIfTransformFails() {
void verifyReturnFatalError_whenBadResponse() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));


Expand All @@ -211,7 +224,7 @@ void verifyReturnFatalError_whenBadResponse() throws JsonProcessingException {
void verifyTransferSuccess() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var flowResponse = DataFlowResponseMessage.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("type").build()).build();
Expand All @@ -234,7 +247,7 @@ void verifyTransferSuccess() throws JsonProcessingException {
void verifyTransferSuccess_withoutDataAddress() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class)
.compose(JSON_LD::compact)
.compose(DataPlaneSignalingClientTest::compact)
.orElseThrow((e) -> new EdcException(e.getFailureDetail()));

var flowResponse = DataFlowResponseMessage.Builder.newInstance().build();
Expand Down Expand Up @@ -303,7 +316,7 @@ void shouldFail_whenConflictResponse() {
@Test
void verifyReturnFatalErrorIfTransformFails() {
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, MAPPER, instance);
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

Expand Down Expand Up @@ -346,7 +359,7 @@ void shouldFail_whenConflictResponse() {
@Test
void verifyReturnFatalErrorIfTransformFails() {
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, MAPPER, instance);
var dataPlaneClient = new DataPlaneSignalingClient(httpClient, registry, JSON_LD, CONTROL_CLIENT_SCOPE, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

Expand Down
Loading